#ifndef XENIUM_KIRSCH_BOUNDED_KFIFO_QUEUE_HPP
#define XENIUM_KIRSCH_BOUNDED_KFIFO_QUEUE_HPP

#include <xenium/marked_ptr.hpp>
#include <xenium/parameter.hpp>
#include <xenium/policy.hpp>
#include <xenium/utils.hpp>

#include <xenium/detail/pointer_queue_traits.hpp>

#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <stdexcept>
#include <type_traits>

#ifdef _MSC_VER
  #pragma warning(push)
  #pragma warning(disable : 26495) // uninitialized member variable
#endif

namespace xenium {
/// A bounded lock-free multi-producer/multi-consumer k-FIFO queue.
///
/// The queue manages pointer values; nullptr is used internally to mark empty
/// entries and is therefore not a valid value.
template <class T, class... Policies>
class kirsch_bounded_kfifo_queue {
private:
  using traits = detail::pointer_queue_traits_t<T, Policies...>;
  using raw_value_type = typename traits::raw_type;

public:
  using value_type = T;

  static constexpr unsigned padding_bytes =
    parameter::value_param_t<unsigned, policy::padding_bytes, sizeof(raw_value_type), Policies...>::value;

  /// Constructs a queue with `num_segments` segments of `k` entries each,
  /// i.e., a total capacity of `k * num_segments`.
  kirsch_bounded_kfifo_queue(uint64_t k, uint64_t num_segments);
  ~kirsch_bounded_kfifo_queue();

  /// Tries to push a new element to the queue. Progress guarantees: lock-free.
  /// Returns `true` if the operation was successful, otherwise `false`.
  bool try_push(value_type value);
  /// Tries to pop an element from the queue. Progress guarantees: lock-free.
  /// Returns `true` if the operation was successful, otherwise `false`.
  bool try_pop(value_type& result);
private:
  using marked_value = xenium::marked_ptr<std::remove_pointer_t<raw_value_type>, 16>;

  struct padded_entry {
    std::atomic<marked_value> value;
    // std::max avoids a zero-size array, which is not allowed by Visual C++;
    // padding entries apart (e.g., to one cache line each) can reduce false
    // sharing between threads operating on neighbouring slots.
    char padding[std::max(padding_bytes, 1u)];
  };

  struct unpadded_entry {
    std::atomic<marked_value> value;
  };
  using entry = std::conditional_t<padding_bytes == 0, unpadded_entry, padded_entry>;

public:
  /// Provides the effective size of a single queue entry (including padding).
  static constexpr std::size_t entry_size = sizeof(entry);

private:
  struct marked_idx {
    marked_idx() = default;
    marked_idx(uint64_t val, uint64_t mark) noexcept { val_ = val | (mark << bits); }

    uint64_t get() const noexcept { return val_ & val_mask; }
    uint64_t mark() const noexcept { return val_ >> bits; }
    bool operator==(const marked_idx& other) const noexcept { return this->val_ == other.val_; }
    bool operator!=(const marked_idx& other) const noexcept { return this->val_ != other.val_; }

  private:
    static constexpr unsigned bits = 16;
    static constexpr uint64_t val_mask = (static_cast<uint64_t>(1) << bits) - 1;
    uint64_t val_ = 0;
  };
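  // Note (added commentary): marked_idx packs a queue index into the low 16
  // bits of a single 64-bit word and a version counter ("mark") into the
  // remaining 48 bits. Bumping the mark on every head_/tail_ update makes
  // stale CAS attempts fail and thus avoids ABA problems on the indexes.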
  template <bool Empty>
  bool find_index(uint64_t start_index, uint64_t& index, marked_value& old);
  bool queue_full(const marked_idx& head_old, const marked_idx& tail_old) const;
  bool segment_empty(const marked_idx& head_old) const;
  bool not_in_valid_region(uint64_t tail_old, uint64_t tail_current, uint64_t head_current) const;
  bool in_valid_region(uint64_t tail_old, uint64_t tail_current, uint64_t head_current) const;
  bool committed(const marked_idx& tail_old, marked_value new_value, uint64_t index);

  std::uint64_t queue_size_;
  std::uint64_t k_;
  std::atomic<marked_idx> head_;
  std::atomic<marked_idx> tail_;
  std::unique_ptr<entry[]> queue_;
};
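// Usage sketch (illustrative only; the queue stores pointer values, and the
// names below are hypothetical):
//
//   xenium::kirsch_bounded_kfifo_queue<int*> queue(4, 10); // k = 4, 10 segments
//   queue.try_push(new int(42));   // returns false if the queue is full
//   int* value = nullptr;
//   if (queue.try_pop(value)) {    // returns false if the queue is empty
//     delete value;                // ownership passes back to the caller
//   }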
template <class T, class... Policies>
kirsch_bounded_kfifo_queue<T, Policies...>::kirsch_bounded_kfifo_queue(uint64_t k, uint64_t num_segments) :
    queue_size_(k * num_segments),
    k_(k),
    head_(),
    tail_(),
    queue_(new entry[k * num_segments]()) {}
template <class T, class... Policies>
kirsch_bounded_kfifo_queue<T, Policies...>::~kirsch_bounded_kfifo_queue() {
  for (unsigned i = 0; i < queue_size_; ++i) {
    traits::delete_value(queue_[i].value.load(std::memory_order_relaxed).get());
  }
}
template <class T, class... Policies>
bool kirsch_bounded_kfifo_queue<T, Policies...>::try_push(value_type value) {
  if (value == nullptr) {
    throw std::invalid_argument("value can not be nullptr");
  }

  raw_value_type raw_value = traits::get_raw(value);
  for (;;) {
    marked_idx tail_old = tail_.load(std::memory_order_relaxed);
    marked_idx head_old = head_.load(std::memory_order_relaxed);

    uint64_t idx;
    marked_value old_value;
    bool found_idx = find_index<true>(tail_old.get(), idx, old_value);
    if (tail_old != tail_.load(std::memory_order_relaxed)) {
      continue; // tail has moved - retry
    }

    if (found_idx) {
      assert(old_value.get() == nullptr);
      const marked_value new_value(raw_value, old_value.mark() + 1);
      // this release-CAS synchronizes-with the acquire-loads in find_index
      // and segment_empty
      if (queue_[idx].value.compare_exchange_strong(
            old_value, new_value, std::memory_order_release, std::memory_order_relaxed) &&
          committed(tail_old, new_value, idx)) {
        traits::release(value);
        return true;
      }
    } else {
      if (queue_full(head_old, tail_old)) {
        if (segment_empty(head_old)) {
          // the head segment is empty - advance head by one segment
          marked_idx new_head((head_old.get() + k_) % queue_size_, head_old.mark() + 1);
          head_.compare_exchange_strong(head_old, new_head, std::memory_order_relaxed);
        } else if (head_old == head_.load(std::memory_order_relaxed)) {
          return false; // the queue is full
        }
      }
      // advance tail by one segment
      marked_idx new_tail((tail_old.get() + k_) % queue_size_, tail_old.mark() + 1);
      tail_.compare_exchange_strong(tail_old, new_tail, std::memory_order_relaxed);
    }
  }
}
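// Note (added commentary): in a k-FIFO queue, head_ and tail_ address segments
// of k consecutive slots rather than single entries, and a push may fill any
// free slot of the tail segment. Elements within a segment can therefore be
// dequeued in any order, i.e., FIFO ordering is relaxed by up to k - 1 positions.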
template <class T, class... Policies>
bool kirsch_bounded_kfifo_queue<T, Policies...>::try_pop(value_type& result) {
  for (;;) {
    marked_idx head_old = head_.load(std::memory_order_relaxed);
    marked_idx tail_old = tail_.load(std::memory_order_relaxed);

    uint64_t idx;
    marked_value old_value;
    bool found_idx = find_index<false>(head_old.get(), idx, old_value);
    if (head_old != head_.load(std::memory_order_relaxed)) {
      continue; // head has moved - retry
    }

    if (found_idx) {
      assert(old_value.get() != nullptr);
      if (head_old.get() == tail_old.get()) {
        // head and tail address the same segment - advance tail by one segment
        marked_idx new_tail((tail_old.get() + k_) % queue_size_, tail_old.mark() + 1);
        tail_.compare_exchange_strong(tail_old, new_tail, std::memory_order_relaxed);
      }

      marked_value new_value(nullptr, old_value.mark() + 1);
      // this release-CAS synchronizes-with the acquire-loads in find_index
      // and segment_empty
      if (queue_[idx].value.compare_exchange_strong(
            old_value, new_value, std::memory_order_release, std::memory_order_relaxed)) {
        traits::store(result, old_value.get());
        return true;
      }
    } else {
      if (head_old.get() == tail_old.get() && tail_old == tail_.load(std::memory_order_relaxed)) {
        return false; // the queue is empty
      }

      // advance head by one segment
      marked_idx new_head((head_old.get() + k_) % queue_size_, head_old.mark() + 1);
      head_.compare_exchange_strong(head_old, new_head, std::memory_order_relaxed);
    }
  }
}
template <class T, class... Policies>
template <bool Empty>
bool kirsch_bounded_kfifo_queue<T, Policies...>::find_index(uint64_t start_index,
                                                            uint64_t& value_index,
                                                            marked_value& old) {
  const uint64_t random_index = utils::random() % k_;
  for (size_t i = 0; i < k_; i++) {
    uint64_t index = (start_index + ((random_index + i) % k_)) % queue_size_;
    // this acquire-load synchronizes-with the release-CAS operations in
    // try_push and try_pop
    old = queue_[index].value.load(std::memory_order_acquire);
    if ((Empty && old.get() == nullptr) || (!Empty && old.get() != nullptr)) {
      value_index = index;
      return true;
    }
  }
  return false;
}
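// Note (added commentary): the scan starts at a random offset within the k
// candidate slots, so concurrent producers (or consumers) tend to probe
// different slots first, which reduces CAS contention on individual entries.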
template <class T, class... Policies>
bool kirsch_bounded_kfifo_queue<T, Policies...>::committed(const marked_idx& tail_old,
                                                           marked_value value,
                                                           uint64_t index) {
  if (queue_[index].value.load(std::memory_order_relaxed) != value) {
    return true; // the entry has already been changed again - nothing to do
  }

  marked_idx tail_current = tail_.load(std::memory_order_relaxed);
  marked_idx head_current = head_.load(std::memory_order_relaxed);
  if (in_valid_region(tail_old.get(), tail_current.get(), head_current.get())) {
    return true;
  } else if (not_in_valid_region(tail_old.get(), tail_current.get(), head_current.get())) {
    // the inserted entry is no longer in the valid region - try to remove it again
    marked_value new_value(nullptr, value.mark() + 1);
    if (!queue_[index].value.compare_exchange_strong(value, new_value, std::memory_order_relaxed)) {
      return true;
    }
  } else {
    marked_idx new_head(head_current.get(), head_current.mark() + 1);
    if (head_.compare_exchange_strong(head_current, new_head, std::memory_order_relaxed)) {
      return true;
    }

    marked_value new_value(nullptr, value.mark() + 1);
    if (!queue_[index].value.compare_exchange_strong(value, new_value, std::memory_order_relaxed)) {
      return true;
    }
  }
  return false;
}
template <class T, class... Policies>
bool kirsch_bounded_kfifo_queue<T, Policies...>::queue_full(const marked_idx& head_old,
                                                            const marked_idx& tail_old) const {
  return ((tail_old.get() + k_) % queue_size_) == head_old.get() &&
         head_old == head_.load(std::memory_order_relaxed);
}
template <class T, class... Policies>
bool kirsch_bounded_kfifo_queue<T, Policies...>::segment_empty(const marked_idx& head_old) const {
  const uint64_t start = head_old.get();
  for (size_t i = 0; i < k_; i++) {
    // this acquire-load synchronizes-with the release-CAS operations in
    // try_push and try_pop
    if (queue_[(start + i) % queue_size_].value.load(std::memory_order_acquire).get() != nullptr) {
      return false;
    }
  }
  return true;
}
template <class T, class... Policies>
bool kirsch_bounded_kfifo_queue<T, Policies...>::in_valid_region(uint64_t tail_old,
                                                                 uint64_t tail_current,
                                                                 uint64_t head_current) const {
  bool wrap_around = tail_current < head_current;
  if (!wrap_around) {
    return head_current < tail_old && tail_old <= tail_current;
  }
  return head_current < tail_old || tail_old <= tail_current;
}
template <class T, class... Policies>
bool kirsch_bounded_kfifo_queue<T, Policies...>::not_in_valid_region(uint64_t tail_old,
                                                                     uint64_t tail_current,
                                                                     uint64_t head_current) const {
  bool wrap_around = tail_current < head_current;
  if (!wrap_around) {
    return tail_old < tail_current || head_current < tail_old;
  }
  return tail_old < tail_current && head_current < tail_old;
}
} // namespace xenium

#ifdef _MSC_VER
  #pragma warning(pop)
#endif

#endif