kirsch_bounded_kfifo_queue.hpp
//
// Copyright (c) 2018-2020 Manuel Pöter.
// Licensed under the MIT License. See LICENSE file in the project root for full license information.
//

#ifndef XENIUM_KIRSCH_BOUNDED_KFIFO_QUEUE_HPP
#define XENIUM_KIRSCH_BOUNDED_KFIFO_QUEUE_HPP

#include <xenium/marked_ptr.hpp>
#include <xenium/parameter.hpp>
#include <xenium/policy.hpp>
#include <xenium/utils.hpp>

#include <xenium/detail/pointer_queue_traits.hpp>

#include <algorithm>
#include <atomic>
#include <cstdint>
#include <memory>
#include <stdexcept>
#include <type_traits>

#ifdef _MSC_VER
#pragma warning(push)
#pragma warning(disable: 26495) // uninitialized member variable
#endif

namespace xenium {

/// A bounded lock-free multi-producer/multi-consumer k-FIFO queue.
template <class T, class... Policies>
class kirsch_bounded_kfifo_queue {
private:
  using traits = detail::pointer_queue_traits_t<T, Policies...>;
  using raw_value_type = typename traits::raw_type;
public:
  using value_type = T;
  static constexpr unsigned padding_bytes =
    parameter::value_param_t<unsigned, policy::padding_bytes, sizeof(raw_value_type), Policies...>::value;

  kirsch_bounded_kfifo_queue(uint64_t k, uint64_t num_segments);
  ~kirsch_bounded_kfifo_queue();

  kirsch_bounded_kfifo_queue(const kirsch_bounded_kfifo_queue&) = delete;
  kirsch_bounded_kfifo_queue(kirsch_bounded_kfifo_queue&&) = delete;

  kirsch_bounded_kfifo_queue& operator= (const kirsch_bounded_kfifo_queue&) = delete;
  kirsch_bounded_kfifo_queue& operator= (kirsch_bounded_kfifo_queue&&) = delete;

  /// Tries to push a new element to the queue. Progress guarantees: lock-free.
  bool try_push(value_type value);

  /// Tries to pop an element from the queue. Progress guarantees: lock-free.
  bool try_pop(value_type& result);

private:
  using marked_value = xenium::marked_ptr<std::remove_pointer_t<raw_value_type>, 16>;

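  // Each slot stores an atomic marked pointer. The padded variant adds
  // padding_bytes of padding per entry, which can be used to reduce false
  // sharing between adjacent slots.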
  struct padded_entry {
    std::atomic<marked_value> value;
    // we use max here to avoid arrays of size zero which are not allowed by Visual C++
    char padding[std::max(padding_bytes, 1u)];
  };

  struct unpadded_entry {
    std::atomic<marked_value> value;
  };

  using entry = std::conditional_t<padding_bytes == 0, unpadded_entry, padded_entry>;

public:
  /// Provides the effective size of a single queue entry (including padding).
  static constexpr std::size_t entry_size = sizeof(entry);

private:
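  // Index into queue_, packed together with a mark that is incremented on
  // every update to avoid ABA problems. Example: marked_idx{5, 3} stores
  // 5 | (3 << 16), so get() == 5 and mark() == 3.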
  struct marked_idx {
    marked_idx() = default;
    marked_idx(uint64_t val, uint64_t mark) noexcept { val_ = val | (mark << bits); }

    uint64_t get() const noexcept { return val_ & val_mask; }
    uint64_t mark() const noexcept { return val_ >> bits; }
    bool operator==(const marked_idx& other) const noexcept { return this->val_ == other.val_; }
    bool operator!=(const marked_idx& other) const noexcept { return this->val_ != other.val_; }
  private:
    static constexpr unsigned bits = 16;
    static constexpr uint64_t val_mask = (static_cast<uint64_t>(1) << bits) - 1;
    uint64_t val_ = 0;
  };

  template <bool Empty>
  bool find_index(uint64_t start_index, uint64_t& index, marked_value& old);
  bool queue_full(const marked_idx& head_old, const marked_idx& tail_old) const;
  bool segment_empty(const marked_idx& head_old) const;
  bool not_in_valid_region(uint64_t tail_old, uint64_t tail_current, uint64_t head_current) const;
  bool in_valid_region(uint64_t tail_old, uint64_t tail_current, uint64_t head_current) const;
  bool committed(const marked_idx& tail_old, marked_value new_value, uint64_t index);

  std::uint64_t queue_size_;
  std::size_t k_;
  // all operations on head/tail are synchronized via the value operations and
  // can therefore use memory_order_relaxed.
  std::atomic<marked_idx> head_;
  std::atomic<marked_idx> tail_;
  std::unique_ptr<entry[]> queue_;
};

template <class T, class... Policies>
kirsch_bounded_kfifo_queue<T, Policies...>::kirsch_bounded_kfifo_queue(uint64_t k, uint64_t num_segments) :
  queue_size_(k * num_segments),
  k_(k),
  head_(),
  tail_(),
  queue_(new entry[k * num_segments]())
{}

template <class T, class... Policies>
kirsch_bounded_kfifo_queue<T, Policies...>::~kirsch_bounded_kfifo_queue() {
  for (unsigned i = 0; i < queue_size_; ++i)
    traits::delete_value(queue_[i].value.load(std::memory_order_relaxed).get());
}

template <class T, class... Policies>
bool kirsch_bounded_kfifo_queue<T, Policies...>::try_push(value_type value) {
  if (value == nullptr)
    throw std::invalid_argument("value can not be nullptr");

  raw_value_type raw_value = traits::get_raw(value);
  for (;;) {
    marked_idx tail_old = tail_.load(std::memory_order_relaxed);
    marked_idx head_old = head_.load(std::memory_order_relaxed);

    uint64_t idx;
    marked_value old_value;
    bool found_idx = find_index<true>(tail_old.get(), idx, old_value);
    if (tail_old != tail_.load(std::memory_order_relaxed))
      continue;

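    // A free slot was found in the tail segment: try to claim it with a CAS
    // and validate the insert via committed().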
    if (found_idx) {
      assert(old_value.get() == nullptr);
      const marked_value new_value(raw_value, old_value.mark() + 1);
      // (1) - this release-CAS synchronizes with the acquire-load (3, 4)
      if (queue_[idx].value.compare_exchange_strong(
            old_value, new_value, std::memory_order_release, std::memory_order_relaxed) &&
          committed(tail_old, new_value, idx)) {
        traits::release(value);
        return true;
      }
    } else {
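      // No free slot in the tail segment. If the queue looks full, either help
      // advance the head past a drained segment or return false; otherwise
      // move the tail forward by one segment and retry.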
      if (queue_full(head_old, tail_old)) {
        if (segment_empty(head_old)) {
          // increment head by k
          marked_idx new_head((head_old.get() + k_) % queue_size_, head_old.mark() + 1);
          head_.compare_exchange_strong(head_old, new_head, std::memory_order_relaxed);
        } else if (head_old == head_.load(std::memory_order_relaxed)) {
          // queue is full
          return false;
        }
      }
      // increment tail by k
      marked_idx new_tail((tail_old.get() + k_) % queue_size_, tail_old.mark() + 1);
      tail_.compare_exchange_strong(tail_old, new_tail, std::memory_order_relaxed);
    }
  }
}

template <class T, class... Policies>
bool kirsch_bounded_kfifo_queue<T, Policies...>::try_pop(value_type& result) {
  for (;;) {
    marked_idx head_old = head_.load(std::memory_order_relaxed);
    marked_idx tail_old = tail_.load(std::memory_order_relaxed);

    uint64_t idx;
    marked_value old_value;
    bool found_idx = find_index<false>(head_old.get(), idx, old_value);
    if (head_old != head_.load(std::memory_order_relaxed))
      continue;

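    // A value was found in the head segment: remove it with a CAS. If head and
    // tail refer to the same segment, the tail is advanced first so that it
    // stays ahead of the head.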
    if (found_idx) {
      assert(old_value.get() != nullptr);
      if (head_old.get() == tail_old.get()) {
        marked_idx new_tail((tail_old.get() + k_) % queue_size_, tail_old.mark() + 1);
        tail_.compare_exchange_strong(tail_old, new_tail, std::memory_order_relaxed);
      }
      marked_value new_value(nullptr, old_value.mark() + 1);
      // (2) - this release-CAS synchronizes with the acquire-load (3, 4)
      if (queue_[idx].value.compare_exchange_strong(
            old_value, new_value, std::memory_order_release, std::memory_order_relaxed)) {
        traits::store(result, old_value.get());
        return true;
      }
    } else {
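      // The head segment is empty: if head and tail refer to the same segment
      // and the tail has not changed, the queue is empty; otherwise advance
      // the head to the next segment and retry.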
      if (head_old.get() == tail_old.get() && tail_old == tail_.load(std::memory_order_relaxed))
        return false;

      marked_idx new_head((head_old.get() + k_) % queue_size_, head_old.mark() + 1);
      head_.compare_exchange_strong(head_old, new_head, std::memory_order_relaxed);
    }
  }
}

template <class T, class... Policies>
template <bool Empty>
bool kirsch_bounded_kfifo_queue<T, Policies...>::find_index(
  uint64_t start_index, uint64_t& value_index, marked_value& old)
{
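  // Probe the k slots of the segment beginning at a random offset, so that
  // concurrent threads are less likely to compete for the same slot.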
  const uint64_t random_index = utils::random() % k_;
  for (size_t i = 0; i < k_; i++) {
    // TODO - this can be simplified if queue_size is a multiple of k!
    uint64_t index = (start_index + ((random_index + i) % k_)) % queue_size_;
    // (3) - this acquire-load synchronizes-with the release-CAS (1, 2)
    old = queue_[index].value.load(std::memory_order_acquire);
    if ((Empty && old.get() == nullptr) || (!Empty && old.get() != nullptr)) {
      value_index = index;
      return true;
    }
  }
  return false;
}

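// committed is called after try_push has installed a value: it checks whether
// the slot still lies in the valid region between head and tail. If the value
// has already been overwritten, a concurrent pop took it, so the push counts.
// If the slot has dropped out of the valid region, the value is removed again
// (unless a concurrent pop got there first, in which case the push also
// counts).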
template <class T, class... Policies>
bool kirsch_bounded_kfifo_queue<T, Policies...>::committed(
  const marked_idx& tail_old, marked_value value, uint64_t index)
{
  if (queue_[index].value.load(std::memory_order_relaxed) != value)
    return true;

  marked_idx tail_current = tail_.load(std::memory_order_relaxed);
  marked_idx head_current = head_.load(std::memory_order_relaxed);
  if (in_valid_region(tail_old.get(), tail_current.get(), head_current.get())) {
    return true;
  } else if (not_in_valid_region(tail_old.get(), tail_current.get(), head_current.get())) {
    marked_value new_value(nullptr, value.mark() + 1);
    if (!queue_[index].value.compare_exchange_strong(value, new_value, std::memory_order_relaxed))
      return true;
  } else {
    marked_idx new_head(head_current.get(), head_current.mark() + 1);
    if (head_.compare_exchange_strong(head_current, new_head, std::memory_order_relaxed))
      return true;

    marked_value new_value(nullptr, value.mark() + 1);
    if (!queue_[index].value.compare_exchange_strong(value, new_value, std::memory_order_relaxed))
      return true;
  }
  return false;
}

template <class T, class... Policies>
bool kirsch_bounded_kfifo_queue<T, Policies...>::queue_full(
  const marked_idx& head_old, const marked_idx& tail_old) const
{
  if (((tail_old.get() + k_) % queue_size_) == head_old.get() &&
      (head_old == head_.load(std::memory_order_relaxed)))
    return true;
  return false;
}

template <class T, class... Policies>
bool kirsch_bounded_kfifo_queue<T, Policies...>::segment_empty(const marked_idx& head_old) const {
  const uint64_t start = head_old.get();
  for (size_t i = 0; i < k_; i++) {
    // TODO - this can be simplified if queue_size is a multiple of k!
    // (4) - this acquire-load synchronizes-with the release-CAS (1, 2)
    if (queue_[(start + i) % queue_size_].value.load(std::memory_order_acquire).get() != nullptr)
      return false;
  }
  return true;
}

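// An insert position tail_old is valid if it lies in the half-open interval
// (head_current, tail_current] on the circular buffer. Example without
// wrap-around: with head_current = 10 and tail_current = 30, tail_old = 20 is
// valid but tail_old = 5 is not. When the tail has wrapped around
// (tail_current < head_current), the interval itself wraps, so the two
// comparisons are combined with || instead of &&.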
template <class T, class... Policies>
bool kirsch_bounded_kfifo_queue<T, Policies...>::in_valid_region(uint64_t tail_old,
  uint64_t tail_current, uint64_t head_current) const
{
  bool wrap_around = tail_current < head_current;
  if (!wrap_around)
    return head_current < tail_old && tail_old <= tail_current;
  return head_current < tail_old || tail_old <= tail_current;
}

template <class T, class... Policies>
bool kirsch_bounded_kfifo_queue<T, Policies...>::not_in_valid_region(uint64_t tail_old,
  uint64_t tail_current, uint64_t head_current) const
{
  bool wrap_around = tail_current < head_current;
  if (!wrap_around)
    return tail_old < tail_current || head_current < tail_old;
  return tail_old < tail_current && head_current < tail_old;
}
}
#ifdef _MSC_VER
#pragma warning(pop)
#endif

#endif
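A minimal usage sketch (not part of the header above), assuming the raw-pointer
specialization of detail::pointer_queue_traits, i.e. T = int*; with raw
pointers the caller remains responsible for deleting popped values:

#include <xenium/kirsch_bounded_kfifo_queue.hpp>

#include <cassert>

int main() {
  // k = 4 slots per segment, 10 segments -> room for 40 entries in total.
  xenium::kirsch_bounded_kfifo_queue<int*> queue(4, 10);

  bool pushed = queue.try_push(new int(42)); // returns false only if the queue is full
  assert(pushed);

  int* value = nullptr;
  if (queue.try_pop(value)) { // returns false only if the queue is empty
    assert(*value == 42);
    delete value;
  }
}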