xenium
Loading...
Searching...
No Matches
vyukov_bounded_queue.hpp
1//
2// Copyright (c) 2018-2020 Manuel Pöter.
3// Licensed under the MIT License. See LICENSE file in the project root for full license information.
4//
5
6#ifndef XENIUM_VYUKOV_BOUNDED_QUEUE_HPP
7#define XENIUM_VYUKOV_BOUNDED_QUEUE_HPP
8
9#include <xenium/utils.hpp>
10#include <xenium/parameter.hpp>
11
12#include <atomic>
13#include <cassert>
14#include <cstdint>
15#include <memory>
16
17#ifdef _MSC_VER
18#pragma warning(push)
19#pragma warning(disable: 4324) // structure was padded due to alignment specifier
20#endif
21
22namespace xenium {
23
24namespace policy {
30 template <bool Value>
32
33}
58template<class T, class... Policies>
60public:
61 using value_type = T;
62
63 static constexpr bool default_to_weak =
64 parameter::value_param_t<bool, policy::default_to_weak, false, Policies...>::value;;
65
70 vyukov_bounded_queue(std::size_t size) :
71 cells(new cell[size]),
72 index_mask(size - 1)
73 {
74 assert(size >= 2 && utils::is_power_of_two(size));
75 for (std::size_t i = 0; i < size; ++i)
76 cells[i].sequence.store(i, std::memory_order_relaxed);
77 enqueue_pos.store(0, std::memory_order_relaxed);
78 dequeue_pos.store(0, std::memory_order_relaxed);
79 }
80
83
84 vyukov_bounded_queue& operator= (const vyukov_bounded_queue&) = delete;
85 vyukov_bounded_queue& operator= (vyukov_bounded_queue&&) = delete;
86
99 template <class... Args>
100 bool try_push(Args&&... args) {
101 return do_try_push<default_to_weak>(std::forward<Args>(args)...);
102 }
103
121 template <class... Args>
122 bool try_push_strong(Args&&... args) {
123 return do_try_push<false>(std::forward<Args>(args)...);
124 }
125
143 template <class... Args>
144 bool try_push_weak(Args&&... args) {
145 return do_try_push<true>(std::forward<Args>(args)...);
146 }
147
159 [[nodiscard]] bool try_pop(T& result) {
160 return do_try_pop<default_to_weak>(result);
161 }
162
174 [[nodiscard]] bool try_pop_strong(T& result) {
175 return do_try_pop<false>(result);
176 }
177
189 [[nodiscard]] bool try_pop_weak(T& result) {
190 return do_try_pop<true>(result);
191 }
192
193private:
194 template <bool Weak, class... Args>
195 bool do_try_push(Args&&... args) {
196 cell* c;
197 std::size_t pos = enqueue_pos.load(std::memory_order_relaxed);
198 for (;;) {
199 c = &cells[pos & index_mask];
200 // (3) - this acquire-load synchronizes-with the release-store (2)
201 std::size_t seq = c->sequence.load(std::memory_order_acquire);
202 if (seq == pos) {
203 if (enqueue_pos.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
204 break;
205 } else {
206 if (Weak) {
207 if (seq < pos)
208 return false;
209 else
210 pos = enqueue_pos.load(std::memory_order_relaxed);
211 } else {
212 auto pos2 = enqueue_pos.load(std::memory_order_relaxed);
213 if (pos2 == pos && dequeue_pos.load(std::memory_order_relaxed) + index_mask + 1 == pos)
214 return false;
215 pos = pos2;
216 }
217 }
218 }
219 assign_value(c->value, std::forward<Args>(args)...);
220 // (4) - this release-store synchronizes-with the acquire-load (1)
221 c->sequence.store(pos + 1, std::memory_order_release);
222 return true;
223 }
224
225 template <bool Weak>
226 bool do_try_pop(T& result) {
227 cell* c;
228 std::size_t pos = dequeue_pos.load(std::memory_order_relaxed);
229 for (;;) {
230 c = &cells[pos & index_mask];
231 // (1) - this acquire-load synchronizes-with the release-store (4)
232 std::size_t seq = c->sequence.load(std::memory_order_acquire);
233 auto new_pos = pos + 1;
234 if (seq == new_pos) {
235 if (dequeue_pos.compare_exchange_weak(pos, new_pos, std::memory_order_relaxed))
236 break;
237 } else {
238 if (Weak) {
239 if (seq < new_pos)
240 return false;
241 pos = dequeue_pos.load(std::memory_order_relaxed);
242 } else {
243 auto pos2 = dequeue_pos.load(std::memory_order_relaxed);
244 if (pos2 == pos && enqueue_pos.load(std::memory_order_relaxed) == pos)
245 return false;
246 pos = pos2;
247 }
248 }
249 }
250 result = std::move(c->value);
251 // (2) - this release-store synchronizes-with the acquire-load (3)
252 c->sequence.store(pos + index_mask + 1, std::memory_order_release);
253 return true;
254 }
255
256 void assign_value(T& v, const T& source) { v = source; }
257 void assign_value(T& v, T&& source) { v = std::move(source); }
258 template <class... Args>
259 void assign_value(T& v, Args&&... args) { v = T{std::forward<Args>(args)...}; }
260
261 // TODO - add optional padding via policy
262 struct cell {
263 std::atomic<std::size_t> sequence;
264 T value;
265 };
266
267 std::unique_ptr<cell[]> cells;
268 const std::size_t index_mask;
269 alignas(64) std::atomic<size_t> enqueue_pos;
270 alignas(64) std::atomic<size_t> dequeue_pos;
271};
272}
273
274#ifdef _MSC_VER
275#pragma warning(pop)
276#endif
277
278#endif
Policy to configure whether try_push/try_pop in vyukov_bounded_queue should default to try_push_weak/...
Definition vyukov_bounded_queue.hpp:31
A bounded generic multi-producer/multi-consumer FIFO queue.
Definition vyukov_bounded_queue.hpp:59
bool try_push_strong(Args &&... args)
Tries to push a new element to the queue.
Definition vyukov_bounded_queue.hpp:122
bool try_pop_strong(T &result)
Tries to pop an element from the queue as long as the queue is not empty.
Definition vyukov_bounded_queue.hpp:174
vyukov_bounded_queue(std::size_t size)
Constructs a new instance with the specified maximum size.
Definition vyukov_bounded_queue.hpp:70
bool try_pop(T &result)
Tries to pop an element from the queue.
Definition vyukov_bounded_queue.hpp:159
bool try_pop_weak(T &result)
Tries to pop an element from the queue.
Definition vyukov_bounded_queue.hpp:189
bool try_push(Args &&... args)
Tries to push a new element to the queue.
Definition vyukov_bounded_queue.hpp:100
bool try_push_weak(Args &&... args)
Tries to push a new element to the queue.
Definition vyukov_bounded_queue.hpp:144