#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wconversion"

#ifdef MCDBGQ_USE_RELACY
#pragma GCC diagnostic ignored "-Wint-to-pointer-cast"
#endif

#if defined(__APPLE__)
#include "TargetConditionals.h"
#endif

#ifdef MCDBGQ_USE_RELACY
#include "relacy/relacy_std.hpp"
#include "relacy_shims.h"
#endif

#include <type_traits>

#if defined(MCDBGQ_USE_RELACY)
    static inline thread_id_t thread_id() { return rl::thread_index(); }
#elif defined(_WIN32) || defined(__WINDOWS__) || defined(__WIN32__)
// No sense pulling in windows.h in a header; manually declare the one function we need
extern "C" __declspec(dllimport) unsigned long __stdcall GetCurrentThreadId(void);
    static_assert(sizeof(unsigned long) == sizeof(std::uint32_t), "Expected size of unsigned long to be 32 bits on Windows");
    static inline thread_id_t thread_id() { return static_cast<thread_id_t>(::GetCurrentThreadId()); }
#elif defined(__arm__) || defined(_M_ARM) || defined(__aarch64__) || (defined(__APPLE__) && TARGET_OS_IPHONE)
    static_assert(sizeof(std::thread::id) == 4 || sizeof(std::thread::id) == 8, "std::thread::id is expected to be either 4 or 8 bytes");

    static inline thread_id_t thread_id() { return std::this_thread::get_id(); }

    template<std::size_t> struct thread_id_size { };
    template<> struct thread_id_size<4> { typedef std::uint32_t numeric_t; };
    template<> struct thread_id_size<8> { typedef std::uint64_t numeric_t; };
    static thread_id_hash_t prehash(thread_id_t const& x)
    {
#ifndef __APPLE__
        return std::hash<std::thread::id>()(x);
#else
        return *reinterpret_cast<thread_id_hash_t const*>(&x);
#endif
    }
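    // prehash() reduces a platform thread ID to an integral value; the implicit-producer hash
    // below mixes it further. On the Apple/ARM branch the ID is assumed to be a raw integral
    // handle of the same width as thread_id_hash_t, so a reinterpret_cast suffices.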
#if defined(__GNUC__) || defined(__INTEL_COMPILER)
#define MOODYCAMEL_THREADLOCAL __thread
#elif defined(_MSC_VER)
#define MOODYCAMEL_THREADLOCAL __declspec(thread)
#else
#define MOODYCAMEL_THREADLOCAL thread_local
#endif

#ifndef MOODYCAMEL_EXCEPTIONS_ENABLED
#if (defined(_MSC_VER) && defined(_CPPUNWIND)) || (defined(__GNUC__) && defined(__EXCEPTIONS)) || (!defined(_MSC_VER) && !defined(__GNUC__))
#define MOODYCAMEL_EXCEPTIONS_ENABLED
#endif
#endif

#if (defined(LIBCARLA_NO_EXCEPTIONS) && defined(MOODYCAMEL_EXCEPTIONS_ENABLED))
#  undef MOODYCAMEL_EXCEPTIONS_ENABLED
#endif

#ifdef MOODYCAMEL_EXCEPTIONS_ENABLED
#define MOODYCAMEL_TRY try
#define MOODYCAMEL_CATCH(...) catch(__VA_ARGS__)
#define MOODYCAMEL_RETHROW throw
#define MOODYCAMEL_THROW(expr) ::carla::throw_exception(expr)
#else
#define MOODYCAMEL_TRY if (true)
#define MOODYCAMEL_CATCH(...) else if (false)
#define MOODYCAMEL_RETHROW
#define MOODYCAMEL_THROW(expr) ::carla::throw_exception(expr)
#endif

#ifndef MOODYCAMEL_NOEXCEPT
#if !defined(MOODYCAMEL_EXCEPTIONS_ENABLED)
#define MOODYCAMEL_NOEXCEPT
#define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) true
#define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) true
#elif defined(_MSC_VER) && defined(_NOEXCEPT) && _MSC_VER < 1800
#define MOODYCAMEL_NOEXCEPT _NOEXCEPT
#define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) (std::is_rvalue_reference<valueType>::value && std::is_move_constructible<type>::value ? std::is_trivially_move_constructible<type>::value : std::is_trivially_copy_constructible<type>::value)
#define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) ((std::is_rvalue_reference<valueType>::value && std::is_move_assignable<type>::value ? std::is_trivially_move_assignable<type>::value || std::is_nothrow_move_assignable<type>::value : std::is_trivially_copy_assignable<type>::value || std::is_nothrow_copy_assignable<type>::value) && MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr))
#elif defined(_MSC_VER) && defined(_NOEXCEPT) && _MSC_VER < 1900
#define MOODYCAMEL_NOEXCEPT _NOEXCEPT
#define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) (std::is_rvalue_reference<valueType>::value && std::is_move_constructible<type>::value ? std::is_trivially_move_constructible<type>::value || std::is_nothrow_move_constructible<type>::value : std::is_trivially_copy_constructible<type>::value || std::is_nothrow_copy_constructible<type>::value)
#define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) ((std::is_rvalue_reference<valueType>::value && std::is_move_assignable<type>::value ? std::is_trivially_move_assignable<type>::value || std::is_nothrow_move_assignable<type>::value : std::is_trivially_copy_assignable<type>::value || std::is_nothrow_copy_assignable<type>::value) && MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr))
#else
#define MOODYCAMEL_NOEXCEPT noexcept
#define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) noexcept(expr)
#define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) noexcept(expr)
#endif
#endif

#ifndef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
#ifdef MCDBGQ_USE_RELACY
#define MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
#else
#if (!defined(_MSC_VER) || _MSC_VER >= 1900) && (!defined(__MINGW32__) && !defined(__MINGW64__) || !defined(__WINPTHREADS_VERSION)) && (!defined(__GNUC__) || __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 8)) && (!defined(__APPLE__) || !TARGET_OS_IPHONE) && !defined(__arm__) && !defined(_M_ARM) && !defined(__aarch64__)
// Assume `thread_local` is fully supported in all other C++11 compilers/platforms
//#define MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED    // always disabled for now
#endif
#endif
#endif

#ifndef MOODYCAMEL_DELETE_FUNCTION
#if defined(_MSC_VER) && _MSC_VER < 1800
#define MOODYCAMEL_DELETE_FUNCTION
#else
#define MOODYCAMEL_DELETE_FUNCTION = delete
#endif
#endif

#if defined(__GNUC__)
    static inline bool (likely)(bool x) { return __builtin_expect((x), true); }
    static inline bool (unlikely)(bool x) { return __builtin_expect((x), false); }
#else
    static inline bool (likely)(bool x) { return x; }
    static inline bool (unlikely)(bool x) { return x; }
#endif
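    // likely()/unlikely() are branch-prediction hints: on GCC-compatible compilers they map to
    // __builtin_expect, elsewhere they are identity functions. The extra parentheses around the
    // names guard against function-like macros named `likely`/`unlikely`.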
#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
#include "internal/concurrentqueue_internal_debug.h"
#endif

    template<typename T>
    struct const_numeric_max {
        static_assert(std::is_integral<T>::value, "const_numeric_max can only be used with integers");
        static const T value = std::numeric_limits<T>::is_signed
            ? (static_cast<T>(1) << (sizeof(T) * CHAR_BIT - 1)) - static_cast<T>(1)
            : static_cast<T>(-1);
    };
304 static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD = 32;
308 static const size_t EXPLICIT_INITIAL_INDEX_SIZE = 32;
312 static const size_t IMPLICIT_INITIAL_INDEX_SIZE = 32;
318 static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE = 32;
323 static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE = 256;
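    // These are the default traits constants; they can be tuned by passing a custom traits
    // struct as the queue's second template parameter. A minimal sketch (illustrative, not
    // part of this header):
    //
    //   struct MyTraits : public moodycamel::ConcurrentQueueDefaultTraits {
    //       static const size_t BLOCK_SIZE = 256;   // bigger blocks, fewer allocations
    //   };
    //   moodycamel::ConcurrentQueue<int, MyTraits> q;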
#ifndef MCDBGQ_USE_RELACY
#if defined(malloc) || defined(free)
    // Some platforms define malloc/free as macros; route around that without naming the macros directly
    static inline void* WORKAROUND_malloc(size_t size) { return malloc(size); }
    static inline void WORKAROUND_free(void* ptr) { return free(ptr); }
    static inline void* (malloc)(size_t size) { return WORKAROUND_malloc(size); }
    static inline void (free)(void* ptr) { return WORKAROUND_free(ptr); }
#else
    static inline void* malloc(size_t size) { return std::malloc(size); }
    static inline void free(void* ptr) { return std::free(ptr); }
#endif
#else
    // Debug versions when running under the Relacy race detector
    static inline void* malloc(size_t size) { return rl::rl_malloc(size, $); }
    static inline void free(void* ptr) { return rl::rl_free(ptr, $); }
#endif
    class ConcurrentQueueTests;

    struct ConcurrentQueueProducerTypelessBase
    {
        ConcurrentQueueProducerTypelessBase()
            : next(nullptr), inactive(false), token(nullptr)
        {
        }
    };

    static inline std::uint32_t hash(std::uint32_t h)
    {
        h ^= h >> 16;
        h *= 0x85ebca6b;
        h ^= h >> 13;
        h *= 0xc2b2ae35;
        return h ^ (h >> 16);
    }

    static inline std::uint64_t hash(std::uint64_t h)
    {
        h ^= h >> 33;
        h *= 0xff51afd7ed558ccd;
        h ^= h >> 33;
        h *= 0xc4ceb9fe1a85ec53;
        return h ^ (h >> 33);
    }
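    // Both hash overloads are MurmurHash3-style finalizers: the thread ID is already unique,
    // so the only goal is to spread that uniqueness evenly across all bits before the value is
    // masked down to a slot in the implicit-producer hash table.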
    static_assert(sizeof(thread_id_t) <= 8, "Expected a platform where thread IDs are at most 64-bit values");

    template<typename T>
    static inline bool circular_less_than(T a, T b)
    {
#ifdef _MSC_VER
#pragma warning(push)
#pragma warning(disable: 4554)
#endif
        static_assert(std::is_integral<T>::value && !std::numeric_limits<T>::is_signed, "circular_less_than is intended to be used only with unsigned integer types");
        return static_cast<T>(a - b) > static_cast<T>(static_cast<T>(1) << static_cast<T>(sizeof(T) * CHAR_BIT - 1));
#ifdef _MSC_VER
#pragma warning(pop)
#endif
    }
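    // circular_less_than(a, b) is true when `a` precedes `b` in wrap-around (modular) order,
    // i.e. when b - a, taken modulo 2^N, is nonzero and smaller than half the range. For
    // example, with T = std::uint8_t: circular_less_than(250, 3) is true (3 is 9 steps ahead
    // of 250 after wrap-around), while circular_less_than(3, 250) is false.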
    template<typename U>
    static inline char* align_for(char* ptr)
    {
        const std::size_t alignment = std::alignment_of<U>::value;
        return ptr + (alignment - (reinterpret_cast<std::uintptr_t>(ptr) % alignment)) % alignment;
    }
    template<typename T>
    static inline T ceil_to_pow_2(T x)
    {
        static_assert(std::is_integral<T>::value && !std::numeric_limits<T>::is_signed, "ceil_to_pow_2 is intended to be used only with unsigned integer types");
        --x;
        x |= x >> 1;
        x |= x >> 2;
        x |= x >> 4;
        for (std::size_t i = 1; i < sizeof(T); i <<= 1) {
            x |= x >> (i << 3);
        }
        ++x;
        return x;
    }
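    // ceil_to_pow_2(x) rounds x up to the nearest power of two using the classic bit-smearing
    // trick (decrement, OR in shifted copies, increment). For example,
    // ceil_to_pow_2<std::uint32_t>(5) == 8 and ceil_to_pow_2<std::uint32_t>(64) == 64.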
    template<typename T>
    static inline void swap_relaxed(std::atomic<T>& left, std::atomic<T>& right)
    {
        T temp = std::move(left.load(std::memory_order_relaxed));
        left.store(std::move(right.load(std::memory_order_relaxed)), std::memory_order_relaxed);
        right.store(std::move(temp), std::memory_order_relaxed);
    }
    template<typename T>
    static inline T const& nomove(T const& x)
    {
        return x;
    }

    template<bool Enable>
    struct nomove_if
    {
        template<typename T>
        static inline T const& eval(T const& x)
        {
            return x;
        }
    };

    template<>
    struct nomove_if<false>
    {
        template<typename U>
        static inline auto eval(U&& x)
            -> decltype(std::forward<U>(x))
        {
            return std::forward<U>(x);
        }
    };

    template<typename It>
    static inline auto deref_noexcept(It& it) MOODYCAMEL_NOEXCEPT -> decltype(*it)
    {
        return *it;
    }

#if defined(__clang__) || !defined(__GNUC__) || __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 8)
    template<typename T> struct is_trivially_destructible : std::is_trivially_destructible<T> { };
#else
    template<typename T> struct is_trivially_destructible : std::has_trivial_destructor<T> { };
#endif

#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
#ifdef MCDBGQ_USE_RELACY
    typedef RelacyThreadExitListener ThreadExitListener;
    typedef RelacyThreadExitNotifier ThreadExitNotifier;
#else
    struct ThreadExitListener
    {
        typedef void (*callback_t)(void*);
        callback_t callback;
        void* userData;

        ThreadExitListener* next;   // reserved for use by the ThreadExitNotifier
    };

    class ThreadExitNotifier
    {
    public:
        static void subscribe(ThreadExitListener* listener)
        {
            auto& tlsInst = instance();
            listener->next = tlsInst.tail;
            tlsInst.tail = listener;
        }

        static void unsubscribe(ThreadExitListener* listener)
        {
            auto& tlsInst = instance();
            ThreadExitListener** prev = &tlsInst.tail;
            for (auto ptr = tlsInst.tail; ptr != nullptr; ptr = ptr->next) {
                if (ptr == listener) {
                    *prev = ptr->next;
                    break;
                }
                prev = &ptr->next;
            }
        }

    private:
        ThreadExitNotifier() : tail(nullptr) { }

        ~ThreadExitNotifier()
        {
            assert(this == &instance() && "If this assert fails, you likely have a buggy compiler! Change the preprocessor conditions such that MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED is no longer defined.");
            for (auto ptr = tail; ptr != nullptr; ptr = ptr->next) {
                ptr->callback(ptr->userData);
            }
        }

        static inline ThreadExitNotifier& instance()
        {
            static thread_local ThreadExitNotifier notifier;
            return notifier;
        }

        ThreadExitListener* tail;
    };
#endif
#endif
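    // ThreadExitNotifier keeps one thread_local instance per thread; its destructor runs the
    // registered callbacks as the thread exits. The queue uses this so that implicit producers
    // can mark themselves inactive (and become recyclable) once their owning thread is gone.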
struct ProducerToken
{
    template<typename T, typename Traits>
    explicit ProducerToken(ConcurrentQueue<T, Traits>& queue);

    template<typename T, typename Traits>
    explicit ProducerToken(BlockingConcurrentQueue<T, Traits>& queue);

    ProducerToken(ProducerToken&& other) MOODYCAMEL_NOEXCEPT
        : producer(other.producer)
    {
        other.producer = nullptr;
        if (producer != nullptr) {
            producer->token = this;
        }
    }

    void swap(ProducerToken& other) MOODYCAMEL_NOEXCEPT
    {
        std::swap(producer, other.producer);
        if (producer != nullptr) {
            producer->token = this;
        }
        if (other.producer != nullptr) {
            other.producer->token = &other;
        }
    }

    // A token is valid unless it was moved from, its queue was destroyed, or memory allocation
    // failed during its construction.
    inline bool valid() const { return producer != nullptr; }

    ~ProducerToken()
    {
        if (producer != nullptr) {
            producer->token = nullptr;
            producer->inactive.store(true, std::memory_order_release);
        }
    }

    friend class ConcurrentQueueTests;
};

struct ConsumerToken
{
    template<typename T, typename Traits>
    explicit ConsumerToken(ConcurrentQueue<T, Traits>& q);

    template<typename T, typename Traits>
    explicit ConsumerToken(BlockingConcurrentQueue<T, Traits>& q);

    ConsumerToken(ConsumerToken&& other) MOODYCAMEL_NOEXCEPT
        : initialOffset(other.initialOffset), lastKnownGlobalOffset(other.lastKnownGlobalOffset), itemsConsumedFromCurrent(other.itemsConsumedFromCurrent), currentProducer(other.currentProducer), desiredProducer(other.desiredProducer)
    {
    }

    void swap(ConsumerToken& other) MOODYCAMEL_NOEXCEPT
    {
        std::swap(initialOffset, other.initialOffset);
        std::swap(lastKnownGlobalOffset, other.lastKnownGlobalOffset);
        std::swap(itemsConsumedFromCurrent, other.itemsConsumedFromCurrent);
        std::swap(currentProducer, other.currentProducer);
        std::swap(desiredProducer, other.desiredProducer);
    }

    friend class ConcurrentQueueTests;
};
template<typename T, typename Traits>
inline void swap(typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP& a, typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP& b) MOODYCAMEL_NOEXCEPT;

template<typename T, typename Traits = ConcurrentQueueDefaultTraits>
class ConcurrentQueue
{
public:
    static const size_t BLOCK_SIZE = static_cast<size_t>(Traits::BLOCK_SIZE);
    static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD = static_cast<size_t>(Traits::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD);
    static const size_t EXPLICIT_INITIAL_INDEX_SIZE = static_cast<size_t>(Traits::EXPLICIT_INITIAL_INDEX_SIZE);
    static const size_t IMPLICIT_INITIAL_INDEX_SIZE = static_cast<size_t>(Traits::IMPLICIT_INITIAL_INDEX_SIZE);
    static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE = static_cast<size_t>(Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE);
    static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE = static_cast<std::uint32_t>(Traits::EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE);

#ifdef _MSC_VER
#pragma warning(push)
#pragma warning(disable: 4307)  // + integral constant overflow (that's what the ternary expression is for!)
#pragma warning(disable: 4309)  // static_cast: Truncation of constant value
#endif

    static_assert(!std::numeric_limits<size_t>::is_signed && std::is_integral<size_t>::value, "Traits::size_t must be an unsigned integral type");
    static_assert(!std::numeric_limits<index_t>::is_signed && std::is_integral<index_t>::value, "Traits::index_t must be an unsigned integral type");
    static_assert(sizeof(index_t) >= sizeof(size_t), "Traits::index_t must be at least as wide as Traits::size_t");
    static_assert((BLOCK_SIZE > 1) && !(BLOCK_SIZE & (BLOCK_SIZE - 1)), "Traits::BLOCK_SIZE must be a power of 2 (and at least 2)");
    static_assert((EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD > 1) && !(EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD & (EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD - 1)), "Traits::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD must be a power of 2 (and greater than 1)");
    static_assert((EXPLICIT_INITIAL_INDEX_SIZE > 1) && !(EXPLICIT_INITIAL_INDEX_SIZE & (EXPLICIT_INITIAL_INDEX_SIZE - 1)), "Traits::EXPLICIT_INITIAL_INDEX_SIZE must be a power of 2 (and greater than 1)");
    static_assert((IMPLICIT_INITIAL_INDEX_SIZE > 1) && !(IMPLICIT_INITIAL_INDEX_SIZE & (IMPLICIT_INITIAL_INDEX_SIZE - 1)), "Traits::IMPLICIT_INITIAL_INDEX_SIZE must be a power of 2 (and greater than 1)");
    static_assert((INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) || !(INITIAL_IMPLICIT_PRODUCER_HASH_SIZE & (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE - 1)), "Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE must be a power of 2");
    static_assert(INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0 || INITIAL_IMPLICIT_PRODUCER_HASH_SIZE >= 1, "Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE must be at least 1 (or 0 to disable implicit enqueueing)");
    explicit ConcurrentQueue(size_t capacity = 6 * BLOCK_SIZE)
        : producerListTail(nullptr),
        initialBlockPoolIndex(0),
        nextExplicitConsumerId(0),
        globalExplicitConsumerOffset(0)
    {
        implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
        populate_initial_implicit_producer_hash();
        populate_initial_block_list(capacity / BLOCK_SIZE + ((capacity & (BLOCK_SIZE - 1)) == 0 ? 0 : 1));

#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
        explicitProducers.store(nullptr, std::memory_order_relaxed);
        implicitProducers.store(nullptr, std::memory_order_relaxed);
#endif
    }

    ConcurrentQueue(size_t minCapacity, size_t maxExplicitProducers, size_t maxImplicitProducers)
        : producerListTail(nullptr),
        initialBlockPoolIndex(0),
        nextExplicitConsumerId(0),
        globalExplicitConsumerOffset(0)
    {
        implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
        populate_initial_implicit_producer_hash();
        size_t blocks = (((minCapacity + BLOCK_SIZE - 1) / BLOCK_SIZE) - 1) * (maxExplicitProducers + 1) + 2 * (maxExplicitProducers + maxImplicitProducers);
        populate_initial_block_list(blocks);

#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
        explicitProducers.store(nullptr, std::memory_order_relaxed);
        implicitProducers.store(nullptr, std::memory_order_relaxed);
#endif
    }

    ~ConcurrentQueue()
    {
        // Destroy producers
        auto ptr = producerListTail.load(std::memory_order_relaxed);
        while (ptr != nullptr) {
            auto next = ptr->next_prod();
            if (ptr->token != nullptr) {
                ptr->token->producer = nullptr;
            }
            destroy(ptr);
            ptr = next;
        }

        // Destroy implicit producer hash tables
        if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE != 0) {
            auto hash = implicitProducerHash.load(std::memory_order_relaxed);
            while (hash != nullptr) {
                auto prev = hash->prev;
                if (prev != nullptr) {  // the last hash is part of this object and needs no freeing
                    for (size_t i = 0; i != hash->capacity; ++i) {
                        hash->entries[i].~ImplicitProducerKVP();
                    }
                    hash->~ImplicitProducerHash();
                    (Traits::free)(hash);
                }
                hash = prev;
            }
        }

        // Destroy global free list
        auto block = freeList.head_unsafe();
        while (block != nullptr) {
            auto next = block->freeListNext.load(std::memory_order_relaxed);
            if (block->dynamicallyAllocated) {
                destroy(block);
            }
            block = next;
        }

        // Destroy initial pool of blocks
        destroy_array(initialBlockPool, initialBlockPoolSize);
    }

    ConcurrentQueue(ConcurrentQueue&& other) MOODYCAMEL_NOEXCEPT
        : producerListTail(other.producerListTail.load(std::memory_order_relaxed)),
        producerCount(other.producerCount.load(std::memory_order_relaxed)),
        initialBlockPoolIndex(other.initialBlockPoolIndex.load(std::memory_order_relaxed)),
        initialBlockPool(other.initialBlockPool),
        initialBlockPoolSize(other.initialBlockPoolSize),
        freeList(std::move(other.freeList)),
        nextExplicitConsumerId(other.nextExplicitConsumerId.load(std::memory_order_relaxed)),
        globalExplicitConsumerOffset(other.globalExplicitConsumerOffset.load(std::memory_order_relaxed))
    {
        // Move the other queue's state into this one, leaving the other one empty
        implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
        populate_initial_implicit_producer_hash();
        swap_implicit_producer_hashes(other);

        other.producerListTail.store(nullptr, std::memory_order_relaxed);
        other.producerCount.store(0, std::memory_order_relaxed);
        other.nextExplicitConsumerId.store(0, std::memory_order_relaxed);
        other.globalExplicitConsumerOffset.store(0, std::memory_order_relaxed);

#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
        explicitProducers.store(other.explicitProducers.load(std::memory_order_relaxed), std::memory_order_relaxed);
        other.explicitProducers.store(nullptr, std::memory_order_relaxed);
        implicitProducers.store(other.implicitProducers.load(std::memory_order_relaxed), std::memory_order_relaxed);
        other.implicitProducers.store(nullptr, std::memory_order_relaxed);
#endif

        other.initialBlockPoolIndex.store(0, std::memory_order_relaxed);
        other.initialBlockPoolSize = 0;
        other.initialBlockPool = nullptr;
    }

    inline ConcurrentQueue& operator=(ConcurrentQueue&& other) MOODYCAMEL_NOEXCEPT
    {
        return swap_internal(other);
    }

    // Swaps this queue's state with the other's (not thread-safe)
    inline void swap(ConcurrentQueue& other) MOODYCAMEL_NOEXCEPT
    {
        swap_internal(other);
    }

private:
    ConcurrentQueue& swap_internal(ConcurrentQueue& other)
    {
        if (this == &other) {
            return *this;
        }
        // ...
        swap_implicit_producer_hashes(other);
        // ...
#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
        // ...
#endif
        return *this;
    }

public:
    inline bool enqueue(T const& item)
    {
        if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
        return inner_enqueue<CanAlloc>(item);
    }

    inline bool enqueue(T&& item)
    {
        if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
        return inner_enqueue<CanAlloc>(std::move(item));
    }

    inline bool enqueue(producer_token_t const& token, T const& item)
    {
        return inner_enqueue<CanAlloc>(token, item);
    }

    inline bool enqueue(producer_token_t const& token, T&& item)
    {
        return inner_enqueue<CanAlloc>(token, std::move(item));
    }

    template<typename It>
    bool enqueue_bulk(It itemFirst, size_t count)
    {
        if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
        return inner_enqueue_bulk<CanAlloc>(itemFirst, count);
    }

    template<typename It>
    bool enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
    {
        return inner_enqueue_bulk<CanAlloc>(token, itemFirst, count);
    }

    inline bool try_enqueue(T const& item)
    {
        if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
        return inner_enqueue<CannotAlloc>(item);
    }

    inline bool try_enqueue(T&& item)
    {
        if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
        return inner_enqueue<CannotAlloc>(std::move(item));
    }

    inline bool try_enqueue(producer_token_t const& token, T const& item)
    {
        return inner_enqueue<CannotAlloc>(token, item);
    }

    inline bool try_enqueue(producer_token_t const& token, T&& item)
    {
        return inner_enqueue<CannotAlloc>(token, std::move(item));
    }

    template<typename It>
    bool try_enqueue_bulk(It itemFirst, size_t count)
    {
        if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
        return inner_enqueue_bulk<CannotAlloc>(itemFirst, count);
    }

    template<typename It>
    bool try_enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
    {
        return inner_enqueue_bulk<CannotAlloc>(token, itemFirst, count);
    }
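    // Typical usage of the non-blocking API (illustrative sketch, not part of this header):
    //
    //   moodycamel::ConcurrentQueue<int> q;
    //   q.enqueue(1);                          // implicit producer; may allocate
    //   moodycamel::ProducerToken ptok(q);
    //   q.try_enqueue(ptok, 2);                // explicit producer; never allocates
    //   int v;
    //   if (q.try_dequeue(v)) { /* got an element */ }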
    template<typename U>
    bool try_dequeue(U& item)
    {
        // Pick a likely-non-empty producer first to reduce needless contention
        size_t nonEmptyCount = 0;
        ProducerBase* best = nullptr;
        size_t bestSize = 0;
        for (auto ptr = producerListTail.load(std::memory_order_acquire); nonEmptyCount < 3 && ptr != nullptr; ptr = ptr->next_prod()) {
            auto size = ptr->size_approx();
            if (size > 0) {
                if (size > bestSize) {
                    bestSize = size;
                    best = ptr;
                }
                ++nonEmptyCount;
            }
        }

        // If there was at least one non-empty queue but it appears empty at the time we try to
        // dequeue from it, we need to make sure every queue's been tried
        if (nonEmptyCount > 0) {
            if ((details::likely)(best->dequeue(item))) {
                return true;
            }
            for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
                if (ptr != best && ptr->dequeue(item)) {
                    return true;
                }
            }
        }
        return false;
    }

    template<typename U>
    bool try_dequeue_non_interleaved(U& item)
    {
        for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
            if (ptr->dequeue(item)) {
                return true;
            }
        }
        return false;
    }

    template<typename U>
    bool try_dequeue(consumer_token_t& token, U& item)
    {
        if (token.desiredProducer == nullptr || token.lastKnownGlobalOffset != globalExplicitConsumerOffset.load(std::memory_order_relaxed)) {
            if (!update_current_producer_after_rotation(token)) {
                return false;
            }
        }

        if (static_cast<ProducerBase*>(token.currentProducer)->dequeue(item)) {
            if (++token.itemsConsumedFromCurrent == EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE) {
                globalExplicitConsumerOffset.fetch_add(1, std::memory_order_relaxed);
            }
            return true;
        }

        auto tail = producerListTail.load(std::memory_order_acquire);
        auto ptr = static_cast<ProducerBase*>(token.currentProducer)->next_prod();
        if (ptr == nullptr) {
            ptr = tail;
        }
        while (ptr != static_cast<ProducerBase*>(token.currentProducer)) {
            if (ptr->dequeue(item)) {
                token.currentProducer = ptr;
                token.itemsConsumedFromCurrent = 1;
                return true;
            }
            ptr = ptr->next_prod();
            if (ptr == nullptr) {
                ptr = tail;
            }
        }
        return false;
    }
    template<typename It>
    size_t try_dequeue_bulk(It itemFirst, size_t max)
    {
        size_t count = 0;
        for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
            count += ptr->dequeue_bulk(itemFirst, max - count);
            if (count == max) {
                break;
            }
        }
        return count;
    }

    template<typename It>
    size_t try_dequeue_bulk(consumer_token_t& token, It itemFirst, size_t max)
    {
        if (token.desiredProducer == nullptr || token.lastKnownGlobalOffset != globalExplicitConsumerOffset.load(std::memory_order_relaxed)) {
            if (!update_current_producer_after_rotation(token)) {
                return 0;
            }
        }

        size_t count = static_cast<ProducerBase*>(token.currentProducer)->dequeue_bulk(itemFirst, max);
        if (count == max) {
            if ((token.itemsConsumedFromCurrent += static_cast<std::uint32_t>(max)) >= EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE) {
                globalExplicitConsumerOffset.fetch_add(1, std::memory_order_relaxed);
            }
            return max;
        }
        token.itemsConsumedFromCurrent += static_cast<std::uint32_t>(count);
        max -= count;

        auto tail = producerListTail.load(std::memory_order_acquire);
        auto ptr = static_cast<ProducerBase*>(token.currentProducer)->next_prod();
        if (ptr == nullptr) {
            ptr = tail;
        }
        while (ptr != static_cast<ProducerBase*>(token.currentProducer)) {
            auto dequeued = ptr->dequeue_bulk(itemFirst, max);
            count += dequeued;
            if (dequeued != 0) {
                token.currentProducer = ptr;
                token.itemsConsumedFromCurrent = static_cast<std::uint32_t>(dequeued);
            }
            if (dequeued == max) {
                break;
            }
            max -= dequeued;
            ptr = ptr->next_prod();
            if (ptr == nullptr) {
                ptr = tail;
            }
        }
        return count;
    }
    template<typename U>
    inline bool try_dequeue_from_producer(producer_token_t const& producer, U& item)
    {
        return static_cast<ExplicitProducer*>(producer.producer)->dequeue(item);
    }

    template<typename It>
    inline size_t try_dequeue_bulk_from_producer(producer_token_t const& producer, It itemFirst, size_t max)
    {
        return static_cast<ExplicitProducer*>(producer.producer)->dequeue_bulk(itemFirst, max);
    }

    size_t size_approx() const
    {
        size_t size = 0;
        for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
            size += ptr->size_approx();
        }
        return size;
    }
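    // size_approx() sums the per-producer estimates; with concurrent producers and consumers it
    // is only ever an approximation and may briefly over- or under-count in-flight elements.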
private:
    struct ExplicitProducer;
    friend struct ExplicitProducer;
    struct ImplicitProducer;
    friend struct ImplicitProducer;
    friend class ConcurrentQueueTests;

    enum AllocationMode { CanAlloc, CannotAlloc };

    template<AllocationMode canAlloc, typename U>
    inline bool inner_enqueue(producer_token_t const& token, U&& element)
    {
        return static_cast<ExplicitProducer*>(token.producer)->ConcurrentQueue::ExplicitProducer::template enqueue<canAlloc>(std::forward<U>(element));
    }

    template<AllocationMode canAlloc, typename U>
    inline bool inner_enqueue(U&& element)
    {
        auto producer = get_or_add_implicit_producer();
        return producer == nullptr ? false : producer->ConcurrentQueue::ImplicitProducer::template enqueue<canAlloc>(std::forward<U>(element));
    }

    template<AllocationMode canAlloc, typename It>
    inline bool inner_enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
    {
        return static_cast<ExplicitProducer*>(token.producer)->ConcurrentQueue::ExplicitProducer::template enqueue_bulk<canAlloc>(itemFirst, count);
    }

    template<AllocationMode canAlloc, typename It>
    inline bool inner_enqueue_bulk(It itemFirst, size_t count)
    {
        auto producer = get_or_add_implicit_producer();
        return producer == nullptr ? false : producer->ConcurrentQueue::ImplicitProducer::template enqueue_bulk<canAlloc>(itemFirst, count);
    }
    bool update_current_producer_after_rotation(consumer_token_t& token)
    {
        // There's been a rotation; figure out where this consumer should be
        auto tail = producerListTail.load(std::memory_order_acquire);
        if (token.desiredProducer == nullptr && tail == nullptr) {
            return false;
        }
        auto prodCount = producerCount.load(std::memory_order_relaxed);
        auto globalOffset = globalExplicitConsumerOffset.load(std::memory_order_relaxed);
        if ((details::unlikely)(token.desiredProducer == nullptr)) {
            // First dequeue with this token: figure out our starting producer.
            // Note: the offset is from the start, but we traverse from the end, so subtract from the count first.
            std::uint32_t offset = prodCount - 1 - (token.initialOffset % prodCount);
            token.desiredProducer = tail;
            for (std::uint32_t i = 0; i != offset; ++i) {
                token.desiredProducer = static_cast<ProducerBase*>(token.desiredProducer)->next_prod();
                if (token.desiredProducer == nullptr) {
                    token.desiredProducer = tail;
                }
            }
        }

        std::uint32_t delta = globalOffset - token.lastKnownGlobalOffset;
        if (delta >= prodCount) {
            delta = delta % prodCount;
        }
        for (std::uint32_t i = 0; i != delta; ++i) {
            token.desiredProducer = static_cast<ProducerBase*>(token.desiredProducer)->next_prod();
            if (token.desiredProducer == nullptr) {
                token.desiredProducer = tail;
            }
        }

        token.lastKnownGlobalOffset = globalOffset;
        token.currentProducer = token.desiredProducer;
        token.itemsConsumedFromCurrent = 0;
        return true;
    }
    template <typename N>
    struct FreeListNode
    {
        FreeListNode() : freeListRefs(0), freeListNext(nullptr) { }

        std::atomic<std::uint32_t> freeListRefs;
        std::atomic<N*> freeListNext;
    };

    // A simple CAS-based lock-free free list; not the fastest under heavy contention, but simple
    // and correct (assuming nodes are never freed until after the free list itself is destroyed).
    template<typename N>    // N must inherit FreeListNode or have the same fields (and initialization)
    struct FreeList
    {
        FreeList() : freeListHead(nullptr) { }
        FreeList(FreeList&& other) : freeListHead(other.freeListHead.load(std::memory_order_relaxed)) { other.freeListHead.store(nullptr, std::memory_order_relaxed); }
        void swap(FreeList& other) { details::swap_relaxed(freeListHead, other.freeListHead); }

        inline void add(N* node)
        {
#if MCDBGQ_NOLOCKFREE_FREELIST
            debug::DebugLock lock(mutex);
#endif
            // The should-be-on-freelist bit is known to be 0 here, so it's safe to set it with a fetch_add
            if (node->freeListRefs.fetch_add(SHOULD_BE_ON_FREELIST, std::memory_order_acq_rel) == 0) {
                // We were the last ones referencing this node, and we want it on the free list, so add it
                add_knowing_refcount_is_zero(node);
            }
        }

        inline N* try_get()
        {
#if MCDBGQ_NOLOCKFREE_FREELIST
            debug::DebugLock lock(mutex);
#endif
            auto head = freeListHead.load(std::memory_order_acquire);
            while (head != nullptr) {
                auto prevHead = head;
                auto refs = head->freeListRefs.load(std::memory_order_relaxed);
                if ((refs & REFS_MASK) == 0 || !head->freeListRefs.compare_exchange_strong(refs, refs + 1, std::memory_order_acquire, std::memory_order_relaxed)) {
                    head = freeListHead.load(std::memory_order_acquire);
                    continue;
                }

                // The reference count was incremented (it wasn't zero), so `next` can be read safely
                auto next = head->freeListNext.load(std::memory_order_relaxed);
                if (freeListHead.compare_exchange_strong(head, next, std::memory_order_acquire, std::memory_order_relaxed)) {
                    // Got the node; it was on the list, so its should-be-on-freelist bit must be clear
                    assert((head->freeListRefs.load(std::memory_order_relaxed) & SHOULD_BE_ON_FREELIST) == 0);

                    // Decrease the refcount twice: once for our ref, once for the list's ref
                    head->freeListRefs.fetch_sub(2, std::memory_order_release);
                    return head;
                }

                // The head changed under us; undo the refcount we added
                refs = prevHead->freeListRefs.fetch_sub(1, std::memory_order_acq_rel);
                if (refs == SHOULD_BE_ON_FREELIST + 1) {
                    add_knowing_refcount_is_zero(prevHead);
                }
            }

            return nullptr;
        }

        // Useful for traversing the list when there's no contention (e.g. to destroy remaining nodes)
        N* head_unsafe() const { return freeListHead.load(std::memory_order_relaxed); }

    private:
        inline void add_knowing_refcount_is_zero(N* node)
        {
            auto head = freeListHead.load(std::memory_order_relaxed);
            while (true) {
                node->freeListNext.store(head, std::memory_order_relaxed);
                node->freeListRefs.store(1, std::memory_order_release);
                if (!freeListHead.compare_exchange_strong(head, node, std::memory_order_release, std::memory_order_relaxed)) {
                    // The push failed; whoever next brings the refcount back to zero re-attempts the add
                    if (node->freeListRefs.fetch_add(SHOULD_BE_ON_FREELIST - 1, std::memory_order_release) == 1) {
                        continue;
                    }
                }
                return;
            }
        }

        std::atomic<N*> freeListHead;

        static const std::uint32_t REFS_MASK = 0x7FFFFFFF;
        static const std::uint32_t SHOULD_BE_ON_FREELIST = 0x80000000;

#if MCDBGQ_NOLOCKFREE_FREELIST
        debug::DebugMutex mutex;
#endif
    };
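    // The free list uses a tagged reference count per node: the low 31 bits (REFS_MASK) count
    // in-flight readers, while the high bit (SHOULD_BE_ON_FREELIST) records that the node was
    // logically freed while still referenced. Whoever drops the count to zero with that bit set
    // re-adds the node, which sidesteps the classic lock-free-stack ABA problem.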
    struct Block
    {
        Block()
            : next(nullptr), elementsCompletelyDequeued(0), freeListRefs(0), freeListNext(nullptr), shouldBeOnFreeList(false), dynamicallyAllocated(true)
        {
        }

        template<InnerQueueContext context>
        inline bool is_empty() const
        {
            if (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
                // Check flags
                for (size_t i = 0; i < BLOCK_SIZE; ++i) {
                    if (!emptyFlags[i].load(std::memory_order_relaxed)) {
                        return false;
                    }
                }
                // Empty; make sure memory effects from before the flags were set are visible
                std::atomic_thread_fence(std::memory_order_acquire);
                return true;
            }
            else {
                // Check counter
                if (elementsCompletelyDequeued.load(std::memory_order_relaxed) == BLOCK_SIZE) {
                    std::atomic_thread_fence(std::memory_order_acquire);
                    return true;
                }
                assert(elementsCompletelyDequeued.load(std::memory_order_relaxed) <= BLOCK_SIZE);
                return false;
            }
        }

        // Returns true if the block is now empty (does not apply in explicit context)
        template<InnerQueueContext context>
        inline bool set_empty(index_t i)
        {
            if (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
                // Set flag
                assert(!emptyFlags[BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1))].load(std::memory_order_relaxed));
                emptyFlags[BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1))].store(true, std::memory_order_release);
                return false;
            }
            else {
                // Increment counter
                auto prevVal = elementsCompletelyDequeued.fetch_add(1, std::memory_order_release);
                assert(prevVal < BLOCK_SIZE);
                return prevVal == BLOCK_SIZE - 1;
            }
        }

        // Sets multiple contiguous item statuses to 'empty' (assumes no wrapping and count > 0).
        // Returns true if the block is now empty (does not apply in explicit context).
        template<InnerQueueContext context>
        inline bool set_many_empty(index_t i, size_t count)
        {
            if (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
                // Set flags
                std::atomic_thread_fence(std::memory_order_release);
                i = BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1)) - count + 1;
                for (size_t j = 0; j != count; ++j) {
                    assert(!emptyFlags[i + j].load(std::memory_order_relaxed));
                    emptyFlags[i + j].store(true, std::memory_order_relaxed);
                }
                return false;
            }
            else {
                // Increment counter
                auto prevVal = elementsCompletelyDequeued.fetch_add(count, std::memory_order_release);
                assert(prevVal + count <= BLOCK_SIZE);
                return prevVal + count == BLOCK_SIZE;
            }
        }

        template<InnerQueueContext context>
        inline void set_all_empty()
        {
            if (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
                // Set all flags
                for (size_t i = 0; i != BLOCK_SIZE; ++i) {
                    emptyFlags[i].store(true, std::memory_order_relaxed);
                }
            }
            else {
                // Reset counter
                elementsCompletelyDequeued.store(BLOCK_SIZE, std::memory_order_relaxed);
            }
        }

        template<InnerQueueContext context>
        inline void reset_empty()
        {
            if (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
                // Reset flags
                for (size_t i = 0; i != BLOCK_SIZE; ++i) {
                    emptyFlags[i].store(false, std::memory_order_relaxed);
                }
            }
            else {
                // Reset counter
                elementsCompletelyDequeued.store(0, std::memory_order_relaxed);
            }
        }

        inline T* operator[](index_t idx) MOODYCAMEL_NOEXCEPT { return static_cast<T*>(static_cast<void*>(elements)) + static_cast<size_t>(idx & static_cast<index_t>(BLOCK_SIZE - 1)); }
        inline T const* operator[](index_t idx) const MOODYCAMEL_NOEXCEPT { return static_cast<T const*>(static_cast<void const*>(elements)) + static_cast<size_t>(idx & static_cast<index_t>(BLOCK_SIZE - 1)); }

    private:
        static_assert(std::alignment_of<T>::value <= std::alignment_of<details::max_align_t>::value, "The queue does not support super-aligned types at this time");
        char elements[sizeof(T) * BLOCK_SIZE];

    public:
        Block* next;
        std::atomic<size_t> elementsCompletelyDequeued;
        std::atomic<bool> emptyFlags[BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD ? BLOCK_SIZE : 1];
        std::atomic<std::uint32_t> freeListRefs;
        std::atomic<Block*> freeListNext;
        std::atomic<bool> shouldBeOnFreeList;
        bool dynamicallyAllocated;
    };
    static_assert(std::alignment_of<Block>::value >= std::alignment_of<details::max_align_t>::value, "Internal error: Blocks must be at least as aligned as the type they are wrapping");
    struct ProducerBase : public details::ConcurrentQueueProducerTypelessBase
    {
        ProducerBase(ConcurrentQueue* parent_, bool isExplicit_) :
            tailIndex(0),
            headIndex(0),
            dequeueOptimisticCount(0),
            dequeueOvercommit(0),
            tailBlock(nullptr),
            isExplicit(isExplicit_),
            parent(parent_)
        {
        }

        template<typename U>
        inline bool dequeue(U& element)
        {
            return isExplicit
                ? static_cast<ExplicitProducer*>(this)->dequeue(element)
                : static_cast<ImplicitProducer*>(this)->dequeue(element);
        }

        template<typename It>
        inline size_t dequeue_bulk(It& itemFirst, size_t max)
        {
            return isExplicit
                ? static_cast<ExplicitProducer*>(this)->dequeue_bulk(itemFirst, max)
                : static_cast<ImplicitProducer*>(this)->dequeue_bulk(itemFirst, max);
        }

        inline ProducerBase* next_prod() const { return static_cast<ProducerBase*>(next); }

        size_t size_approx() const
        {
            auto tail = tailIndex.load(std::memory_order_relaxed);
            auto head = headIndex.load(std::memory_order_relaxed);
            return details::circular_less_than(head, tail) ? static_cast<size_t>(tail - head) : 0;
        }

        inline index_t getTail() const { return tailIndex.load(std::memory_order_relaxed); }

    protected:
        std::atomic<index_t> tailIndex;     // Where to enqueue to next
        std::atomic<index_t> headIndex;     // Where to dequeue from next
        std::atomic<index_t> dequeueOptimisticCount;
        std::atomic<index_t> dequeueOvercommit;
        Block* tailBlock;

    public:
        bool isExplicit;
        ConcurrentQueue* parent;

#ifdef MCDBGQ_TRACKMEM
        friend struct MemStats;
#endif
    };
    struct ExplicitProducer : public ProducerBase
    {
        explicit ExplicitProducer(ConcurrentQueue* parent) :
            ProducerBase(parent, true),
            blockIndex(nullptr),
            pr_blockIndexSlotsUsed(0),
            pr_blockIndexSize(EXPLICIT_INITIAL_INDEX_SIZE >> 1),
            pr_blockIndexFront(0),
            pr_blockIndexEntries(nullptr),
            pr_blockIndexRaw(nullptr)
        {
            size_t poolBasedIndexSize = details::ceil_to_pow_2(parent->initialBlockPoolSize) >> 1;
            if (poolBasedIndexSize > pr_blockIndexSize) {
                pr_blockIndexSize = poolBasedIndexSize;
            }
            new_block_index(0);
        }

        ~ExplicitProducer()
        {
            // Destruct any elements not yet dequeued
            if (this->tailBlock != nullptr) {
                // First find the block that's partially dequeued, if any
                Block* halfDequeuedBlock = nullptr;
                if ((this->headIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1)) != 0) {
                    size_t i = (pr_blockIndexFront - pr_blockIndexSlotsUsed) & (pr_blockIndexSize - 1);
                    while (details::circular_less_than<index_t>(pr_blockIndexEntries[i].base + BLOCK_SIZE, this->headIndex.load(std::memory_order_relaxed))) {
                        i = (i + 1) & (pr_blockIndexSize - 1);
                    }
                    assert(details::circular_less_than<index_t>(pr_blockIndexEntries[i].base, this->headIndex.load(std::memory_order_relaxed)));
                    halfDequeuedBlock = pr_blockIndexEntries[i].block;
                }

                // Start at the head block and destroy remaining elements
                auto block = this->tailBlock;
                do {
                    block = block->next;
                    if (block->ConcurrentQueue::Block::template is_empty<explicit_context>()) {
                        continue;
                    }

                    size_t i = 0;
                    if (block == halfDequeuedBlock) {
                        i = static_cast<size_t>(this->headIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1));
                    }

                    auto lastValidIndex = (this->tailIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 ? BLOCK_SIZE : static_cast<size_t>(this->tailIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1));
                    while (i != BLOCK_SIZE && (block != this->tailBlock || i != lastValidIndex)) {
                        (*block)[i++]->~T();
                    }
                } while (block != this->tailBlock);
            }

            // Destroy all blocks that we own
            if (this->tailBlock != nullptr) {
                auto block = this->tailBlock;
                do {
                    auto nextBlock = block->next;
                    if (block->dynamicallyAllocated) {
                        destroy(block);
                    }
                    else {
                        this->parent->add_block_to_free_list(block);
                    }
                    block = nextBlock;
                } while (block != this->tailBlock);
            }

            // Destroy the block indices
            auto header = static_cast<BlockIndexHeader*>(pr_blockIndexRaw);
            while (header != nullptr) {
                auto prev = static_cast<BlockIndexHeader*>(header->prev);
                header->~BlockIndexHeader();
                (Traits::free)(header);
                header = prev;
            }
        }
        template<AllocationMode allocMode, typename U>
        inline bool enqueue(U&& element)
        {
            index_t currentTailIndex = this->tailIndex.load(std::memory_order_relaxed);
            index_t newTailIndex = 1 + currentTailIndex;
            if ((currentTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
                // We reached the end of a block, start a new one
                auto startBlock = this->tailBlock;
                auto originalBlockIndexSlotsUsed = pr_blockIndexSlotsUsed;
                if (this->tailBlock != nullptr && this->tailBlock->next->ConcurrentQueue::Block::template is_empty<explicit_context>()) {
                    // The block ahead of us is empty; re-use it
                    this->tailBlock = this->tailBlock->next;
                    this->tailBlock->ConcurrentQueue::Block::template reset_empty<explicit_context>();
                }
                else {
                    auto head = this->headIndex.load(std::memory_order_relaxed);
                    assert(!details::circular_less_than<index_t>(currentTailIndex, head));
                    if (!details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE)
                        || (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value && (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head))) {
                        // Not enough leeway (or the size limit would be exceeded)
                        return false;
                    }
                    // We're going to need a new block; check that the block index has room
                    if (pr_blockIndexRaw == nullptr || pr_blockIndexSlotsUsed == pr_blockIndexSize) {
                        if (allocMode == CannotAlloc || !new_block_index(pr_blockIndexSlotsUsed)) {
                            return false;
                        }
                    }

                    // Insert a new block in the circular linked list
                    auto newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>();
                    if (newBlock == nullptr) {
                        return false;
                    }
#if MCDBGQ_TRACKMEM
                    newBlock->owner = this;
#endif
                    newBlock->ConcurrentQueue::Block::template reset_empty<explicit_context>();
                    if (this->tailBlock == nullptr) {
                        newBlock->next = newBlock;
                    }
                    else {
                        newBlock->next = this->tailBlock->next;
                        this->tailBlock->next = newBlock;
                    }
                    this->tailBlock = newBlock;
                    ++pr_blockIndexSlotsUsed;
                }

                if (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new ((T*)nullptr) T(std::forward<U>(element)))) {
                    // The constructor may throw; make sure the element doesn't appear in the queue in that case
                    MOODYCAMEL_TRY {
                        new ((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));
                    }
                    MOODYCAMEL_CATCH (...) {
                        // Revert change to the current block, but leave the new block available for next time
                        pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
                        this->tailBlock = startBlock == nullptr ? this->tailBlock : startBlock;
                        MOODYCAMEL_RETHROW;
                    }
                }
                else {
                    (void)startBlock;
                    (void)originalBlockIndexSlotsUsed;
                }

                // Add block to block index
                auto& entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront];
                entry.base = currentTailIndex;
                entry.block = this->tailBlock;
                blockIndex.load(std::memory_order_relaxed)->front.store(pr_blockIndexFront, std::memory_order_release);
                pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);

                if (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new ((T*)nullptr) T(std::forward<U>(element)))) {
                    this->tailIndex.store(newTailIndex, std::memory_order_release);
                    return true;
                }
            }

            // Enqueue
            new ((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));

            this->tailIndex.store(newTailIndex, std::memory_order_release);
            return true;
        }
        template<typename U>
        bool dequeue(U& element)
        {
            auto tail = this->tailIndex.load(std::memory_order_relaxed);
            auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
            if (details::circular_less_than<index_t>(this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit, tail)) {
                // There might be something to dequeue; claim a slot optimistically
                std::atomic_thread_fence(std::memory_order_acquire);

                auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(1, std::memory_order_relaxed);

                tail = this->tailIndex.load(std::memory_order_acquire);
                if ((details::likely)(details::circular_less_than<index_t>(myDequeueCount - overcommit, tail))) {
                    auto index = this->headIndex.fetch_add(1, std::memory_order_acq_rel);

                    // Determine which block the element is in
                    auto localBlockIndex = blockIndex.load(std::memory_order_acquire);
                    auto localBlockIndexHead = localBlockIndex->front.load(std::memory_order_acquire);

                    auto headBase = localBlockIndex->entries[localBlockIndexHead].base;
                    auto blockBaseIndex = index & ~static_cast<index_t>(BLOCK_SIZE - 1);
                    auto offset = static_cast<size_t>(static_cast<typename std::make_signed<index_t>::type>(blockBaseIndex - headBase) / BLOCK_SIZE);
                    auto block = localBlockIndex->entries[(localBlockIndexHead + offset) & (localBlockIndex->size - 1)].block;

                    // Dequeue
                    auto& el = *((*block)[index]);
                    if (!MOODYCAMEL_NOEXCEPT_ASSIGN(T, T&&, element = std::move(el))) {
                        // Make sure the element is destroyed and the slot marked empty even if the assignment throws
                        struct Guard {
                            Block* block;
                            index_t index;

                            ~Guard()
                            {
                                (*block)[index]->~T();
                                block->ConcurrentQueue::Block::template set_empty<explicit_context>(index);
                            }
                        } guard = { block, index };

                        element = std::move(el);
                    }
                    else {
                        element = std::move(el);
                        el.~T();
                        block->ConcurrentQueue::Block::template set_empty<explicit_context>(index);
                    }

                    return true;
                }
                else {
                    // Nothing to dequeue after all; record the miss so the counts stay consistent
                    this->dequeueOvercommit.fetch_add(1, std::memory_order_release);
                }
            }

            return false;
        }
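        // Dequeue bookkeeping: dequeueOptimisticCount is bumped before an element is actually
        // claimed; if the queue turns out to be empty the miss is recorded in dequeueOvercommit
        // rather than rolling the head back, so (optimistic - overcommit) always tracks the
        // number of successful claims against the tail index.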
        template<AllocationMode allocMode, typename It>
        bool enqueue_bulk(It itemFirst, size_t count)
        {
            // First make sure we have enough room for all the elements: pre-allocate blocks and
            // put them in the block index (but only publish if all allocations succeed).
            index_t startTailIndex = this->tailIndex.load(std::memory_order_relaxed);
            auto startBlock = this->tailBlock;
            auto originalBlockIndexFront = pr_blockIndexFront;
            auto originalBlockIndexSlotsUsed = pr_blockIndexSlotsUsed;

            Block* firstAllocatedBlock = nullptr;

            // Figure out how many blocks we'll need, and acquire them
            size_t blockBaseDiff = ((startTailIndex + count - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1)) - ((startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1));
            index_t currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
            if (blockBaseDiff > 0) {
                // Re-use as many empty blocks ahead of us as possible
                while (blockBaseDiff > 0 && this->tailBlock != nullptr && this->tailBlock->next != firstAllocatedBlock && this->tailBlock->next->ConcurrentQueue::Block::template is_empty<explicit_context>()) {
                    blockBaseDiff -= static_cast<index_t>(BLOCK_SIZE);
                    currentTailIndex += static_cast<index_t>(BLOCK_SIZE);

                    this->tailBlock = this->tailBlock->next;
                    firstAllocatedBlock = firstAllocatedBlock == nullptr ? this->tailBlock : firstAllocatedBlock;

                    auto& entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront];
                    entry.base = currentTailIndex;
                    entry.block = this->tailBlock;
                    pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);
                }

                // Allocate as many blocks as necessary from the block pool
                while (blockBaseDiff > 0) {
                    blockBaseDiff -= static_cast<index_t>(BLOCK_SIZE);
                    currentTailIndex += static_cast<index_t>(BLOCK_SIZE);

                    auto head = this->headIndex.load(std::memory_order_relaxed);
                    assert(!details::circular_less_than<index_t>(currentTailIndex, head));
                    bool full = !details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE) || (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value && (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head));
                    if (pr_blockIndexRaw == nullptr || pr_blockIndexSlotsUsed == pr_blockIndexSize || full) {
                        if (allocMode == CannotAlloc || full || !new_block_index(originalBlockIndexSlotsUsed)) {
                            // Failed to allocate; undo changes (but keep the injected blocks)
                            pr_blockIndexFront = originalBlockIndexFront;
                            pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
                            this->tailBlock = startBlock == nullptr ? firstAllocatedBlock : startBlock;
                            return false;
                        }

                        // pr_blockIndexFront is updated inside new_block_index, so update the fallback value too
                        originalBlockIndexFront = originalBlockIndexSlotsUsed;
                    }

                    // Insert a new block into the circular linked list
                    auto newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>();
                    if (newBlock == nullptr) {
                        pr_blockIndexFront = originalBlockIndexFront;
                        pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
                        this->tailBlock = startBlock == nullptr ? firstAllocatedBlock : startBlock;
                        return false;
                    }

#if MCDBGQ_TRACKMEM
                    newBlock->owner = this;
#endif
                    newBlock->ConcurrentQueue::Block::template set_all_empty<explicit_context>();
                    if (this->tailBlock == nullptr) {
                        newBlock->next = newBlock;
                    }
                    else {
                        newBlock->next = this->tailBlock->next;
                        this->tailBlock->next = newBlock;
                    }
                    this->tailBlock = newBlock;
                    firstAllocatedBlock = firstAllocatedBlock == nullptr ? this->tailBlock : firstAllocatedBlock;

                    ++pr_blockIndexSlotsUsed;

                    auto& entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront];
                    entry.base = currentTailIndex;
                    entry.block = this->tailBlock;
                    pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);
                }

                // All allocations succeeded; reset each block's emptiness and publish the index front
                auto block = firstAllocatedBlock;
                while (true) {
                    block->ConcurrentQueue::Block::template reset_empty<explicit_context>();
                    if (block == this->tailBlock) {
                        break;
                    }
                    block = block->next;
                }

                if (MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new ((T*)nullptr) T(details::deref_noexcept(itemFirst)))) {
                    blockIndex.load(std::memory_order_relaxed)->front.store((pr_blockIndexFront - 1) & (pr_blockIndexSize - 1), std::memory_order_release);
                }
            }

            // Enqueue, one block at a time
            index_t newTailIndex = startTailIndex + static_cast<index_t>(count);
            currentTailIndex = startTailIndex;
            auto endBlock = this->tailBlock;
            this->tailBlock = startBlock;
            assert((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) != 0 || firstAllocatedBlock != nullptr || count == 0);
            if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 && firstAllocatedBlock != nullptr) {
                this->tailBlock = firstAllocatedBlock;
            }
            while (true) {
                auto stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
                if (details::circular_less_than<index_t>(newTailIndex, stopIndex)) {
                    stopIndex = newTailIndex;
                }
                if (MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new ((T*)nullptr) T(details::deref_noexcept(itemFirst)))) {
                    while (currentTailIndex != stopIndex) {
                        new ((*this->tailBlock)[currentTailIndex++]) T(*itemFirst++);
                    }
                }
                else {
                    MOODYCAMEL_TRY {
                        while (currentTailIndex != stopIndex) {
                            new ((*this->tailBlock)[currentTailIndex]) T(details::nomove_if<(bool)!MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new ((T*)nullptr) T(details::deref_noexcept(itemFirst)))>::eval(*itemFirst));
                            ++currentTailIndex;
                            ++itemFirst;
                        }
                    }
                    MOODYCAMEL_CATCH (...) {
                        // An element constructor threw: destroy everything enqueued so far and
                        // revert the whole bulk operation (allocated blocks stay in the index)
                        auto constructedStopIndex = currentTailIndex;
                        auto lastBlockEnqueued = this->tailBlock;

                        pr_blockIndexFront = originalBlockIndexFront;
                        pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
                        this->tailBlock = startBlock == nullptr ? firstAllocatedBlock : startBlock;

                        if (!details::is_trivially_destructible<T>::value) {
                            auto block = startBlock;
                            if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
                                block = firstAllocatedBlock;
                            }
                            currentTailIndex = startTailIndex;
                            while (true) {
                                stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
                                if (details::circular_less_than<index_t>(constructedStopIndex, stopIndex)) {
                                    stopIndex = constructedStopIndex;
                                }
                                while (currentTailIndex != stopIndex) {
                                    (*block)[currentTailIndex++]->~T();
                                }
                                if (block == lastBlockEnqueued) {
                                    break;
                                }
                                block = block->next;
                            }
                        }
                        MOODYCAMEL_RETHROW;
                    }
                }

                if (this->tailBlock == endBlock) {
                    assert(currentTailIndex == newTailIndex);
                    break;
                }
                this->tailBlock = this->tailBlock->next;
            }

            if (!MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new ((T*)nullptr) T(details::deref_noexcept(itemFirst))) && firstAllocatedBlock != nullptr) {
                blockIndex.load(std::memory_order_relaxed)->front.store((pr_blockIndexFront - 1) & (pr_blockIndexSize - 1), std::memory_order_release);
            }

            this->tailIndex.store(newTailIndex, std::memory_order_release);
            return true;
        }
        template<typename It>
        size_t dequeue_bulk(It& itemFirst, size_t max)
        {
            auto tail = this->tailIndex.load(std::memory_order_relaxed);
            auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
            auto desiredCount = static_cast<size_t>(tail - (this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit));
            if (details::circular_less_than<size_t>(0, desiredCount)) {
                desiredCount = desiredCount < max ? desiredCount : max;
                std::atomic_thread_fence(std::memory_order_acquire);

                auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(desiredCount, std::memory_order_relaxed);

                tail = this->tailIndex.load(std::memory_order_acquire);
                auto actualCount = static_cast<size_t>(tail - (myDequeueCount - overcommit));
                if (details::circular_less_than<size_t>(0, actualCount)) {
                    actualCount = desiredCount < actualCount ? desiredCount : actualCount;
                    if (actualCount < desiredCount) {
                        this->dequeueOvercommit.fetch_add(desiredCount - actualCount, std::memory_order_release);
                    }

                    // Get the first index; since there are at least actualCount elements, this never exceeds tail
                    auto firstIndex = this->headIndex.fetch_add(actualCount, std::memory_order_acq_rel);

                    // Determine which block the first element is in
                    auto localBlockIndex = blockIndex.load(std::memory_order_acquire);
                    auto localBlockIndexHead = localBlockIndex->front.load(std::memory_order_acquire);

                    auto headBase = localBlockIndex->entries[localBlockIndexHead].base;
                    auto firstBlockBaseIndex = firstIndex & ~static_cast<index_t>(BLOCK_SIZE - 1);
                    auto offset = static_cast<size_t>(static_cast<typename std::make_signed<index_t>::type>(firstBlockBaseIndex - headBase) / BLOCK_SIZE);
                    auto indexIndex = (localBlockIndexHead + offset) & (localBlockIndex->size - 1);

                    // Iterate the blocks and dequeue
                    auto index = firstIndex;
                    do {
                        auto firstIndexInBlock = index;
                        auto endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
                        endIndex = details::circular_less_than<index_t>(firstIndex + static_cast<index_t>(actualCount), endIndex) ? firstIndex + static_cast<index_t>(actualCount) : endIndex;
                        auto block = localBlockIndex->entries[indexIndex].block;
                        if (MOODYCAMEL_NOEXCEPT_ASSIGN(T, T&&, details::deref_noexcept(itemFirst) = std::move((*(*block)[index])))) {
                            while (index != endIndex) {
                                auto& el = *((*block)[index]);
                                *itemFirst++ = std::move(el);
                                el.~T();
                                ++index;
                            }
                        }
                        else {
                            MOODYCAMEL_TRY {
                                while (index != endIndex) {
                                    auto& el = *((*block)[index]);
                                    *itemFirst = std::move(el);
                                    ++itemFirst;
                                    el.~T();
                                    ++index;
                                }
                            }
                            MOODYCAMEL_CATCH (...) {
                                // Too late to revert the dequeue, but ensure all dequeued objects are
                                // destroyed and the block index stays consistent before re-throwing
                                do {
                                    block = localBlockIndex->entries[indexIndex].block;
                                    while (index != endIndex) {
                                        (*block)[index++]->~T();
                                    }
                                    block->ConcurrentQueue::Block::template set_many_empty<explicit_context>(firstIndexInBlock, static_cast<size_t>(endIndex - firstIndexInBlock));
                                    indexIndex = (indexIndex + 1) & (localBlockIndex->size - 1);

                                    firstIndexInBlock = index;
                                    endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
                                    endIndex = details::circular_less_than<index_t>(firstIndex + static_cast<index_t>(actualCount), endIndex) ? firstIndex + static_cast<index_t>(actualCount) : endIndex;
                                } while (index != firstIndex + actualCount);

                                MOODYCAMEL_RETHROW;
                            }
                        }
                        block->ConcurrentQueue::Block::template set_many_empty<explicit_context>(firstIndexInBlock, static_cast<size_t>(endIndex - firstIndexInBlock));
                        indexIndex = (indexIndex + 1) & (localBlockIndex->size - 1);
                    } while (index != firstIndex + actualCount);

                    return actualCount;
                }
                else {
                    // Nothing to dequeue after all; record the misses so the counts stay consistent
                    this->dequeueOvercommit.fetch_add(desiredCount, std::memory_order_release);
                }
            }

            return 0;
        }
        bool new_block_index(size_t numberOfFilledSlotsToExpose)
        {
            auto prevBlockSizeMask = pr_blockIndexSize - 1;

            // Create the new block index, twice the size of the old one
            pr_blockIndexSize <<= 1;
            auto newRawPtr = static_cast<char*>((Traits::malloc)(sizeof(BlockIndexHeader) + std::alignment_of<BlockIndexEntry>::value - 1 + sizeof(BlockIndexEntry) * pr_blockIndexSize));
            if (newRawPtr == nullptr) {
                pr_blockIndexSize >>= 1;    // Reset to allow graceful retry
                return false;
            }

            auto newBlockIndexEntries = reinterpret_cast<BlockIndexEntry*>(details::align_for<BlockIndexEntry>(newRawPtr + sizeof(BlockIndexHeader)));

            // Copy in all the old entries, if any
            size_t j = 0;
            if (pr_blockIndexSlotsUsed != 0) {
                auto i = (pr_blockIndexFront - pr_blockIndexSlotsUsed) & prevBlockSizeMask;
                do {
                    newBlockIndexEntries[j++] = pr_blockIndexEntries[i];
                    i = (i + 1) & prevBlockSizeMask;
                } while (i != pr_blockIndexFront);
            }

            // Update everything
            auto header = new (newRawPtr) BlockIndexHeader;
            header->size = pr_blockIndexSize;
            header->front.store(numberOfFilledSlotsToExpose - 1, std::memory_order_relaxed);
            header->entries = newBlockIndexEntries;
            header->prev = pr_blockIndexRaw;    // link to the old index so it can be freed later

            pr_blockIndexFront = j;
            pr_blockIndexEntries = newBlockIndexEntries;
            pr_blockIndexRaw = newRawPtr;
            blockIndex.store(header, std::memory_order_release);

            return true;
        }

#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
        friend struct MemStats;
#endif
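        // The explicit producer's new_block_index() above allocates a header plus an entry array
        // of double the previous capacity in one raw chunk, copies the still-live entries across,
        // and publishes the new header with a release store; old headers remain linked via `prev`
        // so outstanding readers stay valid until the producer is destroyed.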
    struct ImplicitProducer : public ProducerBase
    {
        ImplicitProducer(ConcurrentQueue* parent) :
            ProducerBase(parent, false),
            nextBlockIndexCapacity(IMPLICIT_INITIAL_INDEX_SIZE),
            blockIndex(nullptr)
        {
            new_block_index();
        }

        ~ImplicitProducer()
        {
            // No enqueue/dequeue operations can be in flight on this producer any more
#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
            // Unregister ourselves from thread-exit notification
            if (!this->inactive.load(std::memory_order_relaxed)) {
                details::ThreadExitNotifier::unsubscribe(&threadExitListener);
            }
#endif

            // Destroy all remaining elements
            auto tail = this->tailIndex.load(std::memory_order_relaxed);
            auto index = this->headIndex.load(std::memory_order_relaxed);
            Block* block = nullptr;
            assert(index == tail || details::circular_less_than(index, tail));
            bool forceFreeLastBlock = index != tail;    // if the loop is entered, the tail block is not freed inside it
            while (index != tail) {
                if ((index & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 || block == nullptr) {
                    if (block != nullptr) {
                        // Free the old block
                        this->parent->add_block_to_free_list(block);
                    }
                    block = get_block_index_entry_for_index(index)->value.load(std::memory_order_relaxed);
                }

                ((*block)[index])->~T();
                ++index;
            }
            // Even if the queue is empty, one block may still not be on the free list
            if (this->tailBlock != nullptr && (forceFreeLastBlock || (tail & static_cast<index_t>(BLOCK_SIZE - 1)) != 0)) {
                this->parent->add_block_to_free_list(this->tailBlock);
            }

            // Destroy block index
            auto localBlockIndex = blockIndex.load(std::memory_order_relaxed);
            if (localBlockIndex != nullptr) {
                for (size_t i = 0; i != localBlockIndex->capacity; ++i) {
                    localBlockIndex->index[i]->~BlockIndexEntry();
                }
                do {
                    auto prev = localBlockIndex->prev;
                    localBlockIndex->~BlockIndexHeader();
                    (Traits::free)(localBlockIndex);
                    localBlockIndex = prev;
                } while (localBlockIndex != nullptr);
            }
        }
        template<AllocationMode allocMode, typename U>
        inline bool enqueue(U&& element)
        {
            index_t currentTailIndex = this->tailIndex.load(std::memory_order_relaxed);
            index_t newTailIndex = 1 + currentTailIndex;
            if ((currentTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
                // We reached the end of a block, start a new one
                auto head = this->headIndex.load(std::memory_order_relaxed);
                assert(!details::circular_less_than<index_t>(currentTailIndex, head));
                if (!details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE) || (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value && (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head))) {
                    return false;
                }
#if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
                debug::DebugLock lock(mutex);
#endif
                // Find out where this block goes in the block index
                BlockIndexEntry* idxEntry;
                if (!insert_block_index_entry<allocMode>(idxEntry, currentTailIndex)) {
                    return false;
                }

                // Get ahold of a new block
                auto newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>();
                if (newBlock == nullptr) {
                    rewind_block_index_tail();
                    idxEntry->value.store(nullptr, std::memory_order_relaxed);
                    return false;
                }
#if MCDBGQ_TRACKMEM
                newBlock->owner = this;
#endif
                newBlock->ConcurrentQueue::Block::template reset_empty<implicit_context>();

                if (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new ((T*)nullptr) T(std::forward<U>(element)))) {
                    // The constructor may throw; construct before publishing the new block
                    MOODYCAMEL_TRY {
                        new ((*newBlock)[currentTailIndex]) T(std::forward<U>(element));
                    }
                    MOODYCAMEL_CATCH (...) {
                        rewind_block_index_tail();
                        idxEntry->value.store(nullptr, std::memory_order_relaxed);
                        this->parent->add_block_to_free_list(newBlock);
                        MOODYCAMEL_RETHROW;
                    }
                }

                // Insert the new block into the index
                idxEntry->value.store(newBlock, std::memory_order_relaxed);

                this->tailBlock = newBlock;

                if (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new ((T*)nullptr) T(std::forward<U>(element)))) {
                    this->tailIndex.store(newTailIndex, std::memory_order_release);
                    return true;
                }
            }

            // Enqueue
            new ((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));

            this->tailIndex.store(newTailIndex, std::memory_order_release);
            return true;
        }
        template<typename U>
        bool dequeue(U& element)
        {
            // See ExplicitProducer::dequeue for rationale and explanation
            index_t tail = this->tailIndex.load(std::memory_order_relaxed);
            index_t overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
            if (details::circular_less_than<index_t>(this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit, tail)) {
                std::atomic_thread_fence(std::memory_order_acquire);

                index_t myDequeueCount = this->dequeueOptimisticCount.fetch_add(1, std::memory_order_relaxed);
                tail = this->tailIndex.load(std::memory_order_acquire);
                if ((details::likely)(details::circular_less_than<index_t>(myDequeueCount - overcommit, tail))) {
                    index_t index = this->headIndex.fetch_add(1, std::memory_order_acq_rel);

                    // Determine which block the element is in
                    auto entry = get_block_index_entry_for_index(index);

                    // Dequeue
                    auto block = entry->value.load(std::memory_order_relaxed);
                    auto& el = *((*block)[index]);

                    if (!MOODYCAMEL_NOEXCEPT_ASSIGN(T, T&&, element = std::move(el))) {
#if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
                        debug::DebugLock lock(producer->mutex);
#endif
                        struct Guard {
                            Block* block;
                            index_t index;
                            BlockIndexEntry* entry;
                            ConcurrentQueue* parent;

                            ~Guard()
                            {
                                (*block)[index]->~T();
                                if (block->ConcurrentQueue::Block::template set_empty<implicit_context>(index)) {
                                    entry->value.store(nullptr, std::memory_order_relaxed);
                                    parent->add_block_to_free_list(block);
                                }
                            }
                        } guard = { block, index, entry, this->parent };

                        element = std::move(el);
                    }
                    else {
                        element = std::move(el);
                        el.~T();

                        if (block->ConcurrentQueue::Block::template set_empty<implicit_context>(index)) {
                            {
#if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
                                debug::DebugLock lock(mutex);
#endif
                                // Remove the block from the index and return it to the global free pool
                                entry->value.store(nullptr, std::memory_order_relaxed);
                            }
                            this->parent->add_block_to_free_list(block);    // releases the above store
                        }
                    }

                    return true;
                }
                else {
                    this->dequeueOvercommit.fetch_add(1, std::memory_order_release);
                }
            }

            return false;
        }
        template<AllocationMode allocMode, typename It>
        bool enqueue_bulk(It itemFirst, size_t count)
        {
            // First make sure we have enough room: pre-allocate blocks and insert them into the
            // block index (but only if all the allocations succeed).
            index_t startTailIndex = this->tailIndex.load(std::memory_order_relaxed);
            auto startBlock = this->tailBlock;
            Block* firstAllocatedBlock = nullptr;
            auto endBlock = this->tailBlock;

            // Figure out how many blocks we'll need, and acquire them
            size_t blockBaseDiff = ((startTailIndex + count - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1)) - ((startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1));
            index_t currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
            if (blockBaseDiff > 0) {
#if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
                debug::DebugLock lock(mutex);
#endif
                do {
                    blockBaseDiff -= static_cast<index_t>(BLOCK_SIZE);
                    currentTailIndex += static_cast<index_t>(BLOCK_SIZE);

                    // Find out where this block goes in the block index
                    BlockIndexEntry* idxEntry = nullptr;
                    Block* newBlock;
                    bool indexInserted = false;
                    auto head = this->headIndex.load(std::memory_order_relaxed);
                    assert(!details::circular_less_than<index_t>(currentTailIndex, head));
                    bool full = !details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE) || (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value && (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head));
                    if (full || !(indexInserted = insert_block_index_entry<allocMode>(idxEntry, currentTailIndex)) || (newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>()) == nullptr) {
                        // Index or block allocation failed; revert everything done so far for this operation
                        if (indexInserted) {
                            rewind_block_index_tail();
                            idxEntry->value.store(nullptr, std::memory_order_relaxed);
                        }
                        currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
                        for (auto block = firstAllocatedBlock; block != nullptr; block = block->next) {
                            currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
                            idxEntry = get_block_index_entry_for_index(currentTailIndex);
                            idxEntry->value.store(nullptr, std::memory_order_relaxed);
                            rewind_block_index_tail();
                        }
                        this->parent->add_blocks_to_free_list(firstAllocatedBlock);
                        this->tailBlock = startBlock;

                        return false;
                    }

#if MCDBGQ_TRACKMEM
                    newBlock->owner = this;
#endif
                    newBlock->ConcurrentQueue::Block::template reset_empty<implicit_context>();
                    newBlock->next = nullptr;

                    // Insert the new block into the index
                    idxEntry->value.store(newBlock, std::memory_order_relaxed);

                    // Chain the blocks together so they can be found (and undone) later
                    if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) != 0 || firstAllocatedBlock != nullptr) {
                        assert(this->tailBlock != nullptr);
                        this->tailBlock->next = newBlock;
                    }
                    this->tailBlock = newBlock;
                    endBlock = newBlock;
                    firstAllocatedBlock = firstAllocatedBlock == nullptr ? newBlock : firstAllocatedBlock;
                } while (blockBaseDiff > 0);
            }

            // Enqueue, one block at a time
            index_t newTailIndex = startTailIndex + static_cast<index_t>(count);
            currentTailIndex = startTailIndex;
            this->tailBlock = startBlock;
            assert((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) != 0 || firstAllocatedBlock != nullptr || count == 0);
            if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 && firstAllocatedBlock != nullptr) {
                this->tailBlock = firstAllocatedBlock;
            }
            while (true) {
                auto stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
                if (details::circular_less_than<index_t>(newTailIndex, stopIndex)) {
                    stopIndex = newTailIndex;
                }
                if (MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new ((T*)nullptr) T(details::deref_noexcept(itemFirst)))) {
                    while (currentTailIndex != stopIndex) {
                        new ((*this->tailBlock)[currentTailIndex++]) T(*itemFirst++);
                    }
                }
                else {
                    MOODYCAMEL_TRY {
                        while (currentTailIndex != stopIndex) {
                            new ((*this->tailBlock)[currentTailIndex]) T(details::nomove_if<(bool)!MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new ((T*)nullptr) T(details::deref_noexcept(itemFirst)))>::eval(*itemFirst));
                            ++currentTailIndex;
                            ++itemFirst;
                        }
                    }
                    MOODYCAMEL_CATCH (...) {
                        auto constructedStopIndex = currentTailIndex;
                        auto lastBlockEnqueued = this->tailBlock;

                        if (!details::is_trivially_destructible<T>::value) {
                            auto block = startBlock;
                            if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
                                block = firstAllocatedBlock;
                            }
                            currentTailIndex = startTailIndex;
                            while (true) {
                                stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
                                if (details::circular_less_than<index_t>(constructedStopIndex, stopIndex)) {
                                    stopIndex = constructedStopIndex;
                                }
                                while (currentTailIndex != stopIndex) {
                                    (*block)[currentTailIndex++]->~T();
                                }
                                if (block == lastBlockEnqueued) {
                                    break;
                                }
                                block = block->next;
                            }
                        }

                        currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
                        for (auto block = firstAllocatedBlock; block != nullptr; block = block->next) {
                            currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
                            auto idxEntry = get_block_index_entry_for_index(currentTailIndex);
                            idxEntry->value.store(nullptr, std::memory_order_relaxed);
                            rewind_block_index_tail();
                        }
                        this->parent->add_blocks_to_free_list(firstAllocatedBlock);
                        this->tailBlock = startBlock;
                        MOODYCAMEL_RETHROW;
                    }
                }

                if (this->tailBlock == endBlock) {
                    assert(currentTailIndex == newTailIndex);
                    break;
                }
                this->tailBlock = this->tailBlock->next;
            }
            this->tailIndex.store(newTailIndex, std::memory_order_release);
            return true;
        }
2706 template<
typename It>
2709 auto tail = this->tailIndex.load(std::memory_order_relaxed);
2710 auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
2711 auto desiredCount =
static_cast<size_t>(tail - (this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit));
2712 if (details::circular_less_than<size_t>(0, desiredCount)) {
2713 desiredCount = desiredCount < max ? desiredCount : max;
2714 std::atomic_thread_fence(std::memory_order_acquire);
2716 auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(desiredCount, std::memory_order_relaxed);
2718 tail = this->tailIndex.load(std::memory_order_acquire);
2719 auto actualCount =
static_cast<size_t>(tail - (myDequeueCount - overcommit));
2720 if (details::circular_less_than<size_t>(0, actualCount)) {
2721 actualCount = desiredCount < actualCount ? desiredCount : actualCount;
2722 if (actualCount < desiredCount) {
2723 this->dequeueOvercommit.fetch_add(desiredCount - actualCount, std::memory_order_release);
2728 auto firstIndex = this->headIndex.fetch_add(actualCount, std::memory_order_acq_rel);
2731 auto index = firstIndex;
2733 auto indexIndex = get_block_index_index_for_index(index, localBlockIndex);
2735 auto blockStartIndex = index;
2736 auto endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) +
static_cast<index_t
>(BLOCK_SIZE);
2737 endIndex = details::circular_less_than<index_t>(firstIndex +
static_cast<index_t
>(actualCount), endIndex) ? firstIndex +
static_cast<index_t
>(actualCount) : endIndex;
2739 auto entry = localBlockIndex->
index[indexIndex];
2740 auto block = entry->
value.load(std::memory_order_relaxed);
2742 while (index != endIndex) {
2743 auto& el = *((*block)[index]);
2744 *itemFirst++ = std::move(el);
2751 while (index != endIndex) {
2752 auto& el = *((*block)[index]);
2753 *itemFirst = std::move(el);
2761 entry = localBlockIndex->
index[indexIndex];
2762 block = entry->
value.load(std::memory_order_relaxed);
2763 while (index != endIndex) {
2764 (*block)[index++]->~T();
2767 if (block->ConcurrentQueue::Block::template set_many_empty<implicit_context>(blockStartIndex, static_cast<size_t>(endIndex - blockStartIndex))) {
2768 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX 2769 debug::DebugLock lock(mutex);
2771 entry->value.store(
nullptr, std::memory_order_relaxed);
2772 this->parent->add_block_to_free_list(block);
2774 indexIndex = (indexIndex + 1) & (localBlockIndex->
capacity - 1);
2776 blockStartIndex = index;
2777 endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) +
static_cast<index_t
>(BLOCK_SIZE);
2778 endIndex = details::circular_less_than<index_t>(firstIndex + static_cast<index_t>(actualCount), endIndex) ? firstIndex + static_cast<index_t>(actualCount) : endIndex;
2779 } while (index != firstIndex + actualCount);
2784 if (block->ConcurrentQueue::Block::template set_many_empty<implicit_context>(blockStartIndex, static_cast<size_t>(endIndex - blockStartIndex))) {
2786 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2787 debug::DebugLock lock(mutex);
2791 entry->value.store(nullptr, std::memory_order_relaxed);
2793 this->parent->add_block_to_free_list(block);
2795 indexIndex = (indexIndex + 1) & (localBlockIndex->capacity - 1);
2796 } while (index != firstIndex + actualCount);
2801 this->dequeueOvercommit.fetch_add(desiredCount, std::memory_order_release);
2810 static const index_t INVALID_BLOCK_BASE = 1;
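// Claims the next slot of the circular block index for a block whose first
// element has index blockStartIndex. If the slot after the current tail is
// still occupied, the index is grown via new_block_index() first (unless the
// AllocationMode template argument forbids allocating).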
2827 template<AllocationMode allocMode>
2830 auto localBlockIndex = blockIndex.load(std::memory_order_relaxed);
2831 if (localBlockIndex == nullptr) {
2834 auto newTail = (localBlockIndex->tail.load(std::memory_order_relaxed) + 1) & (localBlockIndex->capacity - 1);
2835 idxEntry = localBlockIndex->index[newTail];
2836 if (idxEntry->key.load(std::memory_order_relaxed) == INVALID_BLOCK_BASE ||
2837 idxEntry->value.load(std::memory_order_relaxed) == nullptr) {
2839 idxEntry->key.store(blockStartIndex, std::memory_order_relaxed);
2840 localBlockIndex->tail.store(newTail, std::memory_order_release);
2845 if (allocMode == CannotAlloc || !new_block_index()) {
2848 localBlockIndex = blockIndex.load(std::memory_order_relaxed);
2849 newTail = (localBlockIndex->tail.load(std::memory_order_relaxed) + 1) & (localBlockIndex->capacity - 1);
2850 idxEntry = localBlockIndex->index[newTail];
2851 assert(idxEntry->key.load(std::memory_order_relaxed) == INVALID_BLOCK_BASE);
2852 idxEntry->key.store(blockStartIndex, std::memory_order_relaxed);
2853 localBlockIndex->tail.store(newTail, std::memory_order_release);
2859 auto localBlockIndex = blockIndex.load(std::memory_order_relaxed);
2860 localBlockIndex->tail.store((localBlockIndex->tail.load(std::memory_order_relaxed) - 1) & (localBlockIndex->capacity - 1), std::memory_order_relaxed);
2866 auto idx = get_block_index_index_for_index(index, localBlockIndex);
2867 return localBlockIndex->index[idx];
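// Translates an element index into a slot of the circular block index by
// measuring the (signed, block-sized) distance from the entry currently at the
// index tail.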
2872 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2873 debug::DebugLock lock(mutex);
2875 index &= ~static_cast<index_t>(BLOCK_SIZE - 1);
2876 localBlockIndex = blockIndex.load(std::memory_order_acquire);
2877 auto tail = localBlockIndex->tail.load(std::memory_order_acquire);
2878 auto tailBase = localBlockIndex->index[tail]->key.load(std::memory_order_relaxed);
2879 assert(tailBase != INVALID_BLOCK_BASE);
2882 auto offset = static_cast<size_t>(static_cast<typename std::make_signed<index_t>::type>(index - tailBase) / BLOCK_SIZE);
2883 size_t idx = (tail + offset) & (localBlockIndex->capacity - 1);
2884 assert(localBlockIndex->index[idx]->key.load(std::memory_order_relaxed) == index && localBlockIndex->index[idx]->value.load(std::memory_order_relaxed) != nullptr);
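// Grows this producer's block index: allocates a header with twice the previous
// capacity, copies over the pointers to the still-live entries of the old index,
// and publishes the new header while keeping the old one reachable through
// `prev` so that concurrent readers are not invalidated.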
2890 auto prev = blockIndex.load(std::memory_order_relaxed);
2891 size_t prevCapacity = prev == nullptr ? 0 : prev->capacity;
2892 auto entryCount = prev == nullptr ? nextBlockIndexCapacity : prevCapacity;
2893 auto raw = static_cast<char*>((Traits::malloc)(
2895 std::alignment_of<BlockIndexEntry>::value - 1 + sizeof(BlockIndexEntry) * entryCount +
2896 std::alignment_of<BlockIndexEntry*>::value - 1 + sizeof(BlockIndexEntry*) * nextBlockIndexCapacity));
2897 if (raw == nullptr) {
2903 auto index = reinterpret_cast<BlockIndexEntry**>(details::align_for<BlockIndexEntry*>(reinterpret_cast<char*>(entries) + sizeof(BlockIndexEntry) * entryCount));
2904 if (prev != nullptr) {
2905 auto prevTail = prev->tail.load(std::memory_order_relaxed);
2906 auto prevPos = prevTail;
2909 prevPos = (prevPos + 1) & (prev->capacity - 1);
2910 index[i++] = prev->index[prevPos];
2911 } while (prevPos != prevTail);
2912 assert(i == prevCapacity);
2914 for (size_t i = 0; i != entryCount; ++i) {
2916 entries[i].key.store(INVALID_BLOCK_BASE, std::memory_order_relaxed);
2917 index[prevCapacity + i] = entries + i;
2919 header->prev = prev;
2920 header->entries = entries;
2921 header->index = index;
2922 header->capacity = nextBlockIndexCapacity;
2923 header->tail.store((prevCapacity - 1) & (nextBlockIndexCapacity - 1), std::memory_order_relaxed);
2925 blockIndex.store(header, std::memory_order_release);
2927 nextBlockIndexCapacity <<= 1;
2936 #ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
2938 details::ThreadExitListener threadExitListener;
2942 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
2948 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2949 mutable debug::DebugMutex mutex;
2952 friend struct MemStats;
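// Pre-allocates the contiguous pool of blocks requested at construction time.
// These blocks are flagged as not dynamically allocated so they are released
// together with the pool instead of being freed one by one.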
2963 initialBlockPoolSize = blockCount;
2964 if (initialBlockPoolSize == 0) {
2965 initialBlockPool = nullptr;
2969 initialBlockPool = create_array<Block>(blockCount);
2970 if (initialBlockPool == nullptr) {
2971 initialBlockPoolSize = 0;
2973 for (size_t i = 0; i < initialBlockPoolSize; ++i) {
2974 initialBlockPool[i].dynamicallyAllocated = false;
2980 if (initialBlockPoolIndex.load(std::memory_order_relaxed) >= initialBlockPoolSize) {
2984 auto index = initialBlockPoolIndex.fetch_add(1, std::memory_order_relaxed);
2986 return index < initialBlockPoolSize ? (initialBlockPool + index) : nullptr;
2992 block->owner = nullptr;
2994 freeList.add(block);
2999 while (block != nullptr) {
3000 auto next = block->next;
3001 add_block_to_free_list(block);
3008 return freeList.try_get();
3012 template<AllocationMode canAlloc>
3015 auto block = try_get_block_from_initial_pool();
3016 if (block != nullptr) {
3020 block = try_get_block_from_free_list();
3021 if (block != nullptr) {
3025 if (canAlloc == CanAlloc) {
3026 return create<Block>();
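// Debug-only memory statistics (only compiled in when memory tracking,
// MCDBGQ_TRACKMEM, is enabled): walks the global free list and every producer
// to tally allocated and owned blocks plus the bytes spent on block-index
// bookkeeping.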
3036 size_t allocatedBlocks;
3039 size_t ownedBlocksExplicit;
3040 size_t ownedBlocksImplicit;
3041 size_t implicitProducers;
3042 size_t explicitProducers;
3043 size_t elementsEnqueued;
3044 size_t blockClassBytes;
3045 size_t queueClassBytes;
3046 size_t implicitBlockIndexBytes;
3047 size_t explicitBlockIndexBytes;
3054 MemStats stats = { 0 };
3058 auto block = q->freeList.head_unsafe();
3059 while (block != nullptr) {
3060 ++stats.allocatedBlocks;
3062 block = block->freeListNext.load(std::memory_order_relaxed);
3065 for (auto ptr = q->producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
3066 bool implicit = dynamic_cast<ImplicitProducer*>(ptr) != nullptr;
3067 stats.implicitProducers += implicit ? 1 : 0;
3068 stats.explicitProducers += implicit ? 0 : 1;
3071 auto prod = static_cast<ImplicitProducer*>(ptr);
3072 stats.queueClassBytes += sizeof(ImplicitProducer);
3073 auto head = prod->headIndex.load(std::memory_order_relaxed);
3074 auto tail = prod->tailIndex.load(std::memory_order_relaxed);
3075 auto hash = prod->blockIndex.load(std::memory_order_relaxed);
3076 if (hash != nullptr) {
3077 for (size_t i = 0; i != hash->capacity; ++i) {
3078 if (hash->index[i]->key.load(std::memory_order_relaxed) != ImplicitProducer::INVALID_BLOCK_BASE && hash->index[i]->value.load(std::memory_order_relaxed) != nullptr) {
3079 ++stats.allocatedBlocks;
3080 ++stats.ownedBlocksImplicit;
3083 stats.implicitBlockIndexBytes += hash->capacity * sizeof(typename ImplicitProducer::BlockIndexEntry);
3084 for (; hash != nullptr; hash = hash->prev) {
3085 stats.implicitBlockIndexBytes += sizeof(typename ImplicitProducer::BlockIndexHeader) + hash->capacity * sizeof(typename ImplicitProducer::BlockIndexEntry*);
3088 for (; details::circular_less_than<index_t>(head, tail); head += BLOCK_SIZE) {
3094 auto prod = static_cast<ExplicitProducer*>(ptr);
3095 stats.queueClassBytes += sizeof(ExplicitProducer);
3096 auto tailBlock = prod->tailBlock;
3097 bool wasNonEmpty = false;
3098 if (tailBlock != nullptr) {
3099 auto block = tailBlock;
3101 ++stats.allocatedBlocks;
3102 if (!block->ConcurrentQueue::Block::template is_empty<explicit_context>() || wasNonEmpty) {
3104 wasNonEmpty = wasNonEmpty || block != tailBlock;
3106 ++stats.ownedBlocksExplicit;
3107 block = block->next;
3108 } while (block != tailBlock);
3110 auto index = prod->blockIndex.load(std::memory_order_relaxed);
3111 while (index != nullptr) {
3112 stats.explicitBlockIndexBytes += sizeof(typename ExplicitProducer::BlockIndexHeader) + index->size * sizeof(typename ExplicitProducer::BlockIndexEntry);
3113 index = static_cast<typename ExplicitProducer::BlockIndexHeader*>(index->prev);
3119 stats.allocatedBlocks += freeOnInitialPool;
3120 stats.freeBlocks += freeOnInitialPool;
3122 stats.blockClassBytes = sizeof(Block) * stats.allocatedBlocks;
3130 MemStats getMemStats()
3132 return MemStats::getFor(this);
3135 friend struct MemStats;
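// Producers are never unlinked from the producer list. When the token or thread
// that owned one goes away, the producer is merely marked inactive, so the first
// step here is to try to re-activate (recycle) an inactive producer of the
// requested kind via a CAS on its `inactive` flag; only if that fails is a new
// producer heap-allocated and appended with add_producer().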
3146 return recycle_or_create_producer(isExplicit, recycled);
3151 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
3152 debug::DebugLock lock(implicitProdMutex);
3155 for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
3156 if (ptr->inactive.load(std::memory_order_relaxed) && ptr->isExplicit == isExplicit) {
3157 bool expected = true;
3158 if (ptr->inactive.compare_exchange_strong(expected, false, std::memory_order_acquire, std::memory_order_relaxed)) {
3167 return add_producer(isExplicit ? static_cast<ProducerBase*>(create<ExplicitProducer>(this)) : create<ImplicitProducer>(this));
3173 if (producer == nullptr) {
3177 producerCount.fetch_add(1, std::memory_order_relaxed);
3180 auto prevTail = producerListTail.load(std::memory_order_relaxed);
3182 producer->next = prevTail;
3183 } while (!producerListTail.compare_exchange_weak(prevTail, producer, std::memory_order_release, std::memory_order_relaxed));
3185 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
3186 if (producer->isExplicit) {
3187 auto prevTailExplicit = explicitProducers.load(std::memory_order_relaxed);
3189 static_cast<ExplicitProducer*>(producer)->nextExplicitProducer = prevTailExplicit;
3190 } while (!explicitProducers.compare_exchange_weak(prevTailExplicit, static_cast<ExplicitProducer*>(producer), std::memory_order_release, std::memory_order_relaxed));
3193 auto prevTailImplicit = implicitProducers.load(std::memory_order_relaxed);
3195 static_cast<ImplicitProducer*>(producer)->nextImplicitProducer = prevTailImplicit;
3196 } while (!implicitProducers.compare_exchange_weak(prevTailImplicit, static_cast<ImplicitProducer*>(producer), std::memory_order_release, std::memory_order_relaxed));
3208 for (auto ptr = producerListTail.load(std::memory_order_relaxed); ptr != nullptr; ptr = ptr->next_prod()) {
3220 std::atomic<details::thread_id_t> key;
3227 key.store(other.key.load(std::memory_order_relaxed), std::memory_order_relaxed);
3228 value = other.value;
3239 if (this != &other) {
3246 template<typename XT, typename XTraits>
3258 if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return;
3260 implicitProducerHashCount.store(0, std::memory_order_relaxed);
3261 auto hash = &initialImplicitProducerHash;
3262 hash->capacity = INITIAL_IMPLICIT_PRODUCER_HASH_SIZE;
3263 hash->entries = &initialImplicitProducerHashEntries[0];
3264 for (size_t i = 0; i != INITIAL_IMPLICIT_PRODUCER_HASH_SIZE; ++i) {
3267 hash->prev = nullptr;
3268 implicitProducerHash.store(hash, std::memory_order_relaxed);
3273 if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return;
3277 initialImplicitProducerHash.entries = &initialImplicitProducerHashEntries[0];
3284 implicitProducerHash.store(&initialImplicitProducerHash, std::memory_order_relaxed);
3287 ImplicitProducerHash* hash;
3291 hash->prev = &initialImplicitProducerHash;
3293 if (other.implicitProducerHash.load(std::memory_order_relaxed) == &initialImplicitProducerHash) {
3297 ImplicitProducerHash* hash;
3298 for (hash = other.implicitProducerHash.load(std::memory_order_relaxed); hash->prev != &initialImplicitProducerHash; hash = hash->prev) {
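// Returns the implicit producer bound to the calling thread, creating it on
// first use. Thread ids are kept in a lock-free, open-addressed hash table;
// once a table is roughly half full a larger one is allocated and chained in
// front of the old one, which is kept alive for readers still probing it.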
3318 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
3319 debug::DebugLock lock(implicitProdMutex);
3325 auto mainHash = implicitProducerHash.load(std::memory_order_acquire);
3326 for (auto hash = mainHash; hash != nullptr; hash = hash->prev) {
3328 auto index = hashedId;
3330 index &= hash->capacity - 1;
3332 auto probedKey = hash->entries[index].key.load(std::memory_order_relaxed);
3333 if (probedKey == id) {
3339 auto value = hash->entries[index].value;
3340 if (hash != mainHash) {
3343 index &= mainHash->capacity - 1;
3344 probedKey = mainHash->entries[index].key.load(std::memory_order_relaxed);
3346 #ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3348 if ((probedKey == empty && mainHash->entries[index].key.compare_exchange_strong(empty, id, std::memory_order_relaxed, std::memory_order_relaxed)) ||
3349 (probedKey == reusable && mainHash->entries[index].key.compare_exchange_strong(reusable, id, std::memory_order_acquire, std::memory_order_acquire))) {
3351 if ((probedKey == empty && mainHash->entries[index].key.compare_exchange_strong(empty, id, std::memory_order_relaxed, std::memory_order_relaxed))) {
3353 mainHash->entries[index].value = value;
3370 auto newCount = 1 + implicitProducerHashCount.fetch_add(1, std::memory_order_relaxed);
3372 if (newCount >= (mainHash->capacity >> 1) && !implicitProducerHashResizeInProgress.test_and_set(std::memory_order_acquire)) {
3377 mainHash = implicitProducerHash.load(std::memory_order_acquire);
3378 if (newCount >= (mainHash->capacity >> 1)) {
3379 auto newCapacity = mainHash->capacity << 1;
3380 while (newCount >= (newCapacity >> 1)) {
3383 auto raw = static_cast<char*>((Traits::malloc)(sizeof(ImplicitProducerHash) + std::alignment_of<ImplicitProducerKVP>::value - 1 + sizeof(ImplicitProducerKVP) * newCapacity));
3384 if (raw == nullptr) {
3386 implicitProducerHashCount.fetch_sub(1, std::memory_order_relaxed);
3387 implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
3391 auto newHash = new (raw) ImplicitProducerHash;
3392 newHash->capacity = newCapacity;
3393 newHash->entries = reinterpret_cast<ImplicitProducerKVP*>(details::align_for<ImplicitProducerKVP>(raw + sizeof(ImplicitProducerHash)));
3394 for (size_t i = 0; i != newCapacity; ++i) {
3395 new (newHash->entries + i) ImplicitProducerKVP;
3398 newHash->prev = mainHash;
3399 implicitProducerHash.store(newHash, std::memory_order_release);
3400 implicitProducerHashResizeInProgress.clear(std::memory_order_release);
3404 implicitProducerHashResizeInProgress.clear(std::memory_order_release);
3411 if (newCount < (mainHash->capacity >> 1) + (mainHash->capacity >> 2)) {
3413 auto producer = static_cast<ImplicitProducer*>(recycle_or_create_producer(false, recycled));
3414 if (producer == nullptr) {
3415 implicitProducerHashCount.fetch_sub(1, std::memory_order_relaxed);
3419 implicitProducerHashCount.fetch_sub(1, std::memory_order_relaxed);
3422 #ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3423 producer->threadExitListener.callback = &ConcurrentQueue::implicit_producer_thread_exited_callback;
3424 producer->threadExitListener.userData = producer;
3425 details::ThreadExitNotifier::subscribe(&producer->threadExitListener);
3428 auto index = hashedId;
3430 index &= mainHash->capacity - 1;
3431 auto probedKey = mainHash->entries[index].key.load(std::memory_order_relaxed);
3434 #ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3436 if ((probedKey == empty && mainHash->entries[index].key.compare_exchange_strong(empty, id, std::memory_order_relaxed, std::memory_order_relaxed)) ||
3437 (probedKey == reusable && mainHash->entries[index].key.compare_exchange_strong(reusable, id, std::memory_order_acquire, std::memory_order_acquire))) {
3439 if ((probedKey == empty && mainHash->entries[index].key.compare_exchange_strong(empty, id, std::memory_order_relaxed, std::memory_order_relaxed))) {
3441 mainHash->entries[index].value = producer;
3452 mainHash = implicitProducerHash.load(std::memory_order_acquire);
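// Invoked through the thread-exit listener when a thread that owns an implicit
// producer terminates: the thread's entry in the hash chain is made available
// for reuse and the producer itself is flagged inactive so that
// recycle_or_create_producer() can hand it to a future thread.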
3456 #ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3457 void implicit_producer_thread_exited(ImplicitProducer* producer)
3460 details::ThreadExitNotifier::unsubscribe(&producer->threadExitListener);
3463 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
3464 debug::DebugLock lock(implicitProdMutex);
3466 auto hash = implicitProducerHash.load(std::memory_order_acquire);
3467 assert(hash != nullptr);
3474 for (; hash != nullptr; hash = hash->prev) {
3475 auto index = hashedId;
3477 index &= hash->capacity - 1;
3478 probedKey = hash->entries[index].key.load(std::memory_order_relaxed);
3479 if (probedKey == id) {
3488 producer->inactive.store(true, std::memory_order_release);
3491 static void implicit_producer_thread_exited_callback(void* userData)
3493 auto producer = static_cast<ImplicitProducer*>(userData);
3494 auto queue = producer->parent;
3495 queue->implicit_producer_thread_exited(producer);
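// Allocation helpers: every dynamic allocation in the queue funnels through
// Traits::malloc / Traits::free, so a custom traits class can supply its own
// allocator for blocks, producers, and index structures alike.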
3503 template<typename U>
3507 auto p = static_cast<U*>((Traits::malloc)(sizeof(U) * count));
3512 for (size_t i = 0; i != count; ++i) {
3518 template<typename U>
3523 for (size_t i = count; i != 0; ) {
3530 template<typename U>
3533 auto p = (Traits::malloc)(sizeof(U));
3534 return p != nullptr ? new (p) U : nullptr;
3537 template<typename U, typename A1>
3540 auto p = (Traits::malloc)(sizeof(U));
3541 return p != nullptr ? new (p) U(std::forward<A1>(a1)) : nullptr;
3544 template<typename U>
3561 #if !MCDBGQ_USEDEBUGFREELIST
3564 debug::DebugFreeList<Block> freeList;
3576 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
3577 debug::DebugMutex implicitProdMutex;
3580 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
3581 std::atomic<ExplicitProducer*> explicitProducers;
3582 std::atomic<ImplicitProducer*> implicitProducers;
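// Token implementations. A ProducerToken binds to a dedicated explicit producer
// at construction time (recycling an inactive one when possible), whereas a
// ConsumerToken only carries rotation state and latches onto a producer the
// first time it is used for a dequeue.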
3587 template<typename T, typename Traits>
3589 : producer(queue.recycle_or_create_producer(true))
3596 template<typename T, typename Traits>
3605 template<typename T, typename Traits>
3607 : itemsConsumedFromCurrent(0), currentProducer(nullptr), desiredProducer(nullptr)
3613 template<typename T, typename Traits>
3621 template<typename T, typename Traits>
3637 template<typename T, typename Traits>
3645 #if defined(__GNUC__)
3646 #pragma GCC diagnostic pop