CARLA
ConcurrentQueue.h
1 // Provides a C++11 implementation of a multi-producer, multi-consumer lock-free queue.
2 // An overview, including benchmark results, is provided here:
3 // http://moodycamel.com/blog/2014/a-fast-general-purpose-lock-free-queue-for-c++
4 // The full design is also described in excruciating detail at:
5 // http://moodycamel.com/blog/2014/detailed-design-of-a-lock-free-queue
6 
7 // Simplified BSD license:
8 // Copyright (c) 2013-2016, Cameron Desrochers.
9 // All rights reserved.
10 //
11 // Redistribution and use in source and binary forms, with or without modification,
12 // are permitted provided that the following conditions are met:
13 //
14 // - Redistributions of source code must retain the above copyright notice, this list of
15 // conditions and the following disclaimer.
16 // - Redistributions in binary form must reproduce the above copyright notice, this list of
17 // conditions and the following disclaimer in the documentation and/or other materials
18 // provided with the distribution.
19 //
20 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
21 // EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
22 // MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
23 // THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
25 // OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
26 // HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
27 // TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
28 // EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 
30 // Notice: This file has been slightly adapted for its use by CARLA.
31 
32 #pragma once
33 
34 #if defined(__GNUC__)
35 // Disable -Wconversion warnings (spuriously triggered when Traits::size_t and
36 // Traits::index_t are set to < 32 bits, causing integer promotion, causing warnings
37 // upon assigning any computed values)
38 #pragma GCC diagnostic push
39 #pragma GCC diagnostic ignored "-Wconversion"
40 
41 #ifdef MCDBGQ_USE_RELACY
42 #pragma GCC diagnostic ignored "-Wint-to-pointer-cast"
43 #endif
44 #endif
45 
46 #if defined(__APPLE__)
47 #include "TargetConditionals.h"
48 #endif
49 
50 #ifdef MCDBGQ_USE_RELACY
51 #include "relacy/relacy_std.hpp"
52 #include "relacy_shims.h"
53 // We only use malloc/free anyway, and the delete macro messes up `= delete` method declarations.
54 // We'll override the default trait malloc ourselves without a macro.
55 #undef new
56 #undef delete
57 #undef malloc
58 #undef free
59 #else
60 #include <atomic> // Requires C++11. Sorry VS2010.
61 #include <cassert>
62 #endif
63 #include <cstddef> // for max_align_t
64 #include <cstdint>
65 #include <cstdlib>
66 #include <type_traits>
67 #include <algorithm>
68 #include <utility>
69 #include <limits>
70 #include <climits> // for CHAR_BIT
71 #include <array>
72 #include <thread> // partly for __WINPTHREADS_VERSION if on MinGW-w64 w/ POSIX threading
73 
74 // Platform-specific definitions of a numeric thread ID type and an invalid value
75 namespace moodycamel { namespace details {
76  template<typename thread_id_t> struct thread_id_converter {
77  typedef thread_id_t thread_id_numeric_size_t;
78  typedef thread_id_t thread_id_hash_t;
79  static thread_id_hash_t prehash(thread_id_t const& x) { return x; }
80  };
81 } }
82 #if defined(MCDBGQ_USE_RELACY)
83 namespace moodycamel { namespace details {
84  typedef std::uint32_t thread_id_t;
85  static const thread_id_t invalid_thread_id = 0xFFFFFFFFU;
86  static const thread_id_t invalid_thread_id2 = 0xFFFFFFFEU;
87  static inline thread_id_t thread_id() { return rl::thread_index(); }
88 } }
89 #elif defined(_WIN32) || defined(__WINDOWS__) || defined(__WIN32__)
90 // No sense pulling in windows.h in a header, we'll manually declare the function
91 // we use and rely on backwards-compatibility for this not to break
92 extern "C" __declspec(dllimport) unsigned long __stdcall GetCurrentThreadId(void);
93 namespace moodycamel { namespace details {
94  static_assert(sizeof(unsigned long) == sizeof(std::uint32_t), "Expected size of unsigned long to be 32 bits on Windows");
95  typedef std::uint32_t thread_id_t;
96  static const thread_id_t invalid_thread_id = 0; // See http://blogs.msdn.com/b/oldnewthing/archive/2004/02/23/78395.aspx
97  static const thread_id_t invalid_thread_id2 = 0xFFFFFFFFU; // Not technically guaranteed to be invalid, but is never used in practice. Note that all Win32 thread IDs are presently multiples of 4.
98  static inline thread_id_t thread_id() { return static_cast<thread_id_t>(::GetCurrentThreadId()); }
99 } }
100 #elif defined(__arm__) || defined(_M_ARM) || defined(__aarch64__) || (defined(__APPLE__) && TARGET_OS_IPHONE)
101 namespace moodycamel { namespace details {
102  static_assert(sizeof(std::thread::id) == 4 || sizeof(std::thread::id) == 8, "std::thread::id is expected to be either 4 or 8 bytes");
103 
104  typedef std::thread::id thread_id_t;
105  static const thread_id_t invalid_thread_id; // Default ctor creates invalid ID
106 
107  // Note we don't define an invalid_thread_id2 since std::thread::id doesn't have one; it's
108  // only used if MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED is defined anyway, which it won't
109  // be.
110  static inline thread_id_t thread_id() { return std::this_thread::get_id(); }
111 
112  template<std::size_t> struct thread_id_size { };
113  template<> struct thread_id_size<4> { typedef std::uint32_t numeric_t; };
114  template<> struct thread_id_size<8> { typedef std::uint64_t numeric_t; };
115 
116  template<> struct thread_id_converter<thread_id_t> {
117  typedef thread_id_size<sizeof(thread_id_t)>::numeric_t thread_id_numeric_size_t;
118 #ifndef __APPLE__
119  typedef std::size_t thread_id_hash_t;
120 #else
121  typedef thread_id_numeric_size_t thread_id_hash_t;
122 #endif
123 
124  static thread_id_hash_t prehash(thread_id_t const& x)
125  {
126 #ifndef __APPLE__
127  return std::hash<std::thread::id>()(x);
128 #else
129  return *reinterpret_cast<thread_id_hash_t const*>(&x);
130 #endif
131  }
132  };
133 } }
134 #else
135 // Use a nice trick from this answer: http://stackoverflow.com/a/8438730/21475
136 // In order to get a numeric thread ID in a platform-independent way, we use a thread-local
137 // static variable's address as a thread identifier :-)
138 #if defined(__GNUC__) || defined(__INTEL_COMPILER)
139 #define MOODYCAMEL_THREADLOCAL __thread
140 #elif defined(_MSC_VER)
141 #define MOODYCAMEL_THREADLOCAL __declspec(thread)
142 #else
143 // Assume C++11 compliant compiler
144 #define MOODYCAMEL_THREADLOCAL thread_local
145 #endif
146 namespace moodycamel { namespace details {
147  typedef std::uintptr_t thread_id_t;
148  static const thread_id_t invalid_thread_id = 0; // Address can't be nullptr
149  static const thread_id_t invalid_thread_id2 = 1; // Member accesses off a null pointer are also generally invalid. Plus it's not aligned.
150  static inline thread_id_t thread_id() { static MOODYCAMEL_THREADLOCAL int x; return reinterpret_cast<thread_id_t>(&x); }
151 } }
152 #endif
153 
154 // Exceptions
155 #ifndef MOODYCAMEL_EXCEPTIONS_ENABLED
156 #if (defined(_MSC_VER) && defined(_CPPUNWIND)) || (defined(__GNUC__) && defined(__EXCEPTIONS)) || (!defined(_MSC_VER) && !defined(__GNUC__))
157 #define MOODYCAMEL_EXCEPTIONS_ENABLED
158 #endif
159 #endif
160 
161 // ~~~ @begin Modified for CARLA ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
162 
163 #include <carla/Exception.h>
164 
165 #if (defined(LIBCARLA_NO_EXCEPTIONS) && defined(MOODYCAMEL_EXCEPTIONS_ENABLED))
166 # undef MOODYCAMEL_EXCEPTIONS_ENABLED
167 #endif
168 
169 #ifdef MOODYCAMEL_EXCEPTIONS_ENABLED
170 #define MOODYCAMEL_TRY try
171 #define MOODYCAMEL_CATCH(...) catch(__VA_ARGS__)
172 #define MOODYCAMEL_RETHROW throw
173 #define MOODYCAMEL_THROW(expr) ::carla::throw_exception(expr)
174 #else
175 #define MOODYCAMEL_TRY if (true)
176 #define MOODYCAMEL_CATCH(...) else if (false)
177 #define MOODYCAMEL_RETHROW
178 #define MOODYCAMEL_THROW(expr) ::carla::throw_exception(expr)
179 #endif
180 
181 // ~~~ @end Modified for CARLA ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
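// For reference, the macros above compose as follows inside the queue implementation
// (illustrative sketch only; `ptr` and `element` are placeholder names, not identifiers
// from this file):
//
//   MOODYCAMEL_TRY {
//     new (ptr) T(std::forward<U>(element));  // may throw when exceptions are enabled
//   }
//   MOODYCAMEL_CATCH (...) {
//     // roll back any partial state, then propagate
//     MOODYCAMEL_RETHROW;
//   }
//
// With LIBCARLA_NO_EXCEPTIONS this expands to `if (true) { ... } else if (false) { ... }`,
// so the catch body is compiled but never executed, and MOODYCAMEL_THROW reports the
// error through ::carla::throw_exception instead of throwing.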
182 
183 #ifndef MOODYCAMEL_NOEXCEPT
184 #if !defined(MOODYCAMEL_EXCEPTIONS_ENABLED)
185 #define MOODYCAMEL_NOEXCEPT
186 #define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) true
187 #define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) true
188 #elif defined(_MSC_VER) && defined(_NOEXCEPT) && _MSC_VER < 1800
189 // VS2012's std::is_nothrow_[move_]constructible is broken and returns true when it shouldn't :-(
190 // We have to assume *all* non-trivial constructors may throw on VS2012!
191 #define MOODYCAMEL_NOEXCEPT _NOEXCEPT
192 #define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) (std::is_rvalue_reference<valueType>::value && std::is_move_constructible<type>::value ? std::is_trivially_move_constructible<type>::value : std::is_trivially_copy_constructible<type>::value)
193 #define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) ((std::is_rvalue_reference<valueType>::value && std::is_move_assignable<type>::value ? std::is_trivially_move_assignable<type>::value || std::is_nothrow_move_assignable<type>::value : std::is_trivially_copy_assignable<type>::value || std::is_nothrow_copy_assignable<type>::value) && MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr))
194 #elif defined(_MSC_VER) && defined(_NOEXCEPT) && _MSC_VER < 1900
195 #define MOODYCAMEL_NOEXCEPT _NOEXCEPT
196 #define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) (std::is_rvalue_reference<valueType>::value && std::is_move_constructible<type>::value ? std::is_trivially_move_constructible<type>::value || std::is_nothrow_move_constructible<type>::value : std::is_trivially_copy_constructible<type>::value || std::is_nothrow_copy_constructible<type>::value)
197 #define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) ((std::is_rvalue_reference<valueType>::value && std::is_move_assignable<type>::value ? std::is_trivially_move_assignable<type>::value || std::is_nothrow_move_assignable<type>::value : std::is_trivially_copy_assignable<type>::value || std::is_nothrow_copy_assignable<type>::value) && MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr))
198 #else
199 #define MOODYCAMEL_NOEXCEPT noexcept
200 #define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) noexcept(expr)
201 #define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) noexcept(expr)
202 #endif
203 #endif
204 
205 #ifndef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
206 #ifdef MCDBGQ_USE_RELACY
207 #define MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
208 #else
209 // VS2013 doesn't support `thread_local`, and MinGW-w64 w/ POSIX threading has a crippling bug: http://sourceforge.net/p/mingw-w64/bugs/445
210 // g++ <=4.7 doesn't support thread_local either.
211 // Finally, iOS/ARM doesn't have support for it either, and g++/ARM allows it to compile but it's unconfirmed to actually work
212 #if (!defined(_MSC_VER) || _MSC_VER >= 1900) && (!defined(__MINGW32__) && !defined(__MINGW64__) || !defined(__WINPTHREADS_VERSION)) && (!defined(__GNUC__) || __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 8)) && (!defined(__APPLE__) || !TARGET_OS_IPHONE) && !defined(__arm__) && !defined(_M_ARM) && !defined(__aarch64__)
213 // Assume `thread_local` is fully supported in all other C++11 compilers/platforms
214 //#define MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED // always disabled for now since several users report having problems with it on
215 #endif
216 #endif
217 #endif
218 
219 // VS2012 doesn't support deleted functions.
220 // In this case, we declare the function normally but don't define it. A link error will be generated if the function is called.
221 #ifndef MOODYCAMEL_DELETE_FUNCTION
222 #if defined(_MSC_VER) && _MSC_VER < 1800
223 #define MOODYCAMEL_DELETE_FUNCTION
224 #else
225 #define MOODYCAMEL_DELETE_FUNCTION = delete
226 #endif
227 #endif
228 
229 // Compiler-specific likely/unlikely hints
230 namespace moodycamel { namespace details {
231 #if defined(__GNUC__)
232  static inline bool (likely)(bool x) { return __builtin_expect((x), true); }
233  static inline bool (unlikely)(bool x) { return __builtin_expect((x), false); }
234 #else
235  static inline bool (likely)(bool x) { return x; }
236  static inline bool (unlikely)(bool x) { return x; }
237 #endif
238 } }
239 
240 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
241 #include "internal/concurrentqueue_internal_debug.h"
242 #endif
243 
244 namespace moodycamel {
245 namespace details {
246  template<typename T>
247  struct const_numeric_max {
248  static_assert(std::is_integral<T>::value, "const_numeric_max can only be used with integers");
249  static const T value = std::numeric_limits<T>::is_signed
250  ? (static_cast<T>(1) << (sizeof(T) * CHAR_BIT - 1)) - static_cast<T>(1)
251  : static_cast<T>(-1);
252  };
253 
254 #if defined(__GLIBCXX__)
255  typedef ::max_align_t std_max_align_t; // libstdc++ forgot to add it to std:: for a while
256 #else
257  typedef std::max_align_t std_max_align_t; // Others (e.g. MSVC) insist it can *only* be accessed via std::
258 #endif
259 
260  // Some platforms have incorrectly set max_align_t to a type with <8 bytes alignment even while supporting
261  // 8-byte aligned scalar values (*cough* 32-bit iOS). Work around this with our own union. See issue #64.
262  typedef union {
263  std_max_align_t x;
264  long long y;
265  void* z;
266  } max_align_t;
267 }
268 
269 // Default traits for the ConcurrentQueue. To change some of the
270 // traits without re-implementing all of them, inherit from this
271 // struct and shadow the declarations you wish to be different;
272 // since the traits are used as a template type parameter, the
273 // shadowed declarations will be used where defined, and the defaults
274 // otherwise. An example of customizing these traits follows the struct definition below.
275 struct ConcurrentQueueDefaultTraits
276 {
277  // General-purpose size type. std::size_t is strongly recommended.
278  typedef std::size_t size_t;
279 
280  // The type used for the enqueue and dequeue indices. Must be at least as
281  // large as size_t. Should be significantly larger than the number of elements
282  // you expect to hold at once, especially if you have a high turnover rate;
283  // for example, on 32-bit x86, if you expect to have over a hundred million
284  // elements or pump several million elements through your queue in a very
285  // short space of time, using a 32-bit type *may* trigger a race condition.
286  // A 64-bit int type is recommended in that case, and in practice will
287  // prevent a race condition no matter the usage of the queue. Note that
288  // whether the queue is lock-free with a 64-bit type depends on whether
289  // std::atomic<std::uint64_t> is lock-free, which is platform-specific.
290  typedef std::size_t index_t;
291 
292  // Internally, all elements are enqueued and dequeued from multi-element
293  // blocks; this is the smallest controllable unit. If you expect few elements
294  // but many producers, a smaller block size should be favoured. For few producers
295  // and/or many elements, a larger block size is preferred. A sane default
296  // is provided. Must be a power of 2.
297  static const size_t BLOCK_SIZE = 32;
298 
299  // For explicit producers (i.e. when using a producer token), the block is
300  // checked for being empty by iterating through a list of flags, one per element.
301  // For large block sizes, this is too inefficient, and switching to an atomic
302  // counter-based approach is faster. The switch is made for block sizes strictly
303  // larger than this threshold.
304  static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD = 32;
305 
306  // How many full blocks can be expected for a single explicit producer? This should
307  // reflect that number's maximum for optimal performance. Must be a power of 2.
308  static const size_t EXPLICIT_INITIAL_INDEX_SIZE = 32;
309 
310  // How many full blocks can be expected for a single implicit producer? This should
311  // reflect that number's maximum for optimal performance. Must be a power of 2.
312  static const size_t IMPLICIT_INITIAL_INDEX_SIZE = 32;
313 
314  // The initial size of the hash table mapping thread IDs to implicit producers.
315  // Note that the hash is resized every time it becomes half full.
316  // Must be a power of two, and either 0 or at least 1. If 0, implicit production
317  // (using the enqueue methods without an explicit producer token) is disabled.
318  static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE = 32;
319 
320  // Controls the number of items that an explicit consumer (i.e. one with a token)
321  // must consume before it causes all consumers to rotate and move on to the next
322  // internal queue.
323  static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE = 256;
324 
325  // The maximum number of elements (inclusive) that can be enqueued to a sub-queue.
326  // Enqueue operations that would cause this limit to be surpassed will fail. Note
327  // that this limit is enforced at the block level (for performance reasons), i.e.
328  // it's rounded up to the nearest block size.
329  static const size_t MAX_SUBQUEUE_SIZE = details::const_numeric_max<size_t>::value;
330 
331 
332 #ifndef MCDBGQ_USE_RELACY
333  // Memory allocation can be customized if needed.
334  // malloc should return nullptr on failure, and handle alignment like std::malloc.
335 #if defined(malloc) || defined(free)
336  // Gah, this is 2015, stop defining macros that break standard code already!
337  // Work around malloc/free being special macros:
338  static inline void* WORKAROUND_malloc(size_t size) { return malloc(size); }
339  static inline void WORKAROUND_free(void* ptr) { return free(ptr); }
340  static inline void* (malloc)(size_t size) { return WORKAROUND_malloc(size); }
341  static inline void (free)(void* ptr) { return WORKAROUND_free(ptr); }
342 #else
343  static inline void* malloc(size_t size) { return std::malloc(size); }
344  static inline void free(void* ptr) { return std::free(ptr); }
345 #endif
346 #else
347  // Debug versions when running under the Relacy race detector (ignore
348  // these in user code)
349  static inline void* malloc(size_t size) { return rl::rl_malloc(size, $); }
350  static inline void free(void* ptr) { return rl::rl_free(ptr, $); }
351 #endif
352 };
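// Example of customizing the traits above by inheriting from them and shadowing
// individual members (illustrative sketch; `MyQueueTraits` and the values chosen
// here are placeholders, not part of this header):
//
//   struct MyQueueTraits : public moodycamel::ConcurrentQueueDefaultTraits
//   {
//     // Only the shadowed members change; everything else keeps its default.
//     static const size_t BLOCK_SIZE = 256;          // must be a power of 2
//     static const size_t MAX_SUBQUEUE_SIZE = 65536; // enforced per sub-queue, rounded to blocks
//   };
//
//   moodycamel::ConcurrentQueue<int, MyQueueTraits> queue;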
353 
354 
355 // When producing or consuming many elements, the most efficient way is to:
356 // 1) Use one of the bulk-operation methods of the queue with a token
357 // 2) Failing that, use the bulk-operation methods without a token
358 // 3) Failing that, create a token and use that with the single-item methods
359 // 4) Failing that, use the single-parameter methods of the queue
360 // Having said that, don't create tokens willy-nilly -- ideally there should be
361 // a maximum of one token per thread (of each kind).
362 struct ProducerToken;
363 struct ConsumerToken;
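// The recommendation above might translate to user code like this (illustrative
// sketch; the element type, buffer size, and variable names are placeholders):
//
//   moodycamel::ConcurrentQueue<int> q;
//
//   // Producer thread: one ProducerToken per thread, bulk enqueue.
//   moodycamel::ProducerToken ptok(q);
//   int in[64] = {0};
//   q.enqueue_bulk(ptok, in, 64);
//
//   // Consumer thread: one ConsumerToken per thread, bulk dequeue.
//   moodycamel::ConsumerToken ctok(q);
//   int out[64];
//   size_t n = q.try_dequeue_bulk(ctok, out, 64);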
364 
365 template<typename T, typename Traits> class ConcurrentQueue;
366 template<typename T, typename Traits> class BlockingConcurrentQueue;
367 class ConcurrentQueueTests;
368 
369 
370 namespace details
371 {
372  struct ConcurrentQueueProducerTypelessBase
373  {
374  ConcurrentQueueProducerTypelessBase* next;
375  std::atomic<bool> inactive;
376  ProducerToken* token;
377 
378  ConcurrentQueueProducerTypelessBase()
379  : next(nullptr), inactive(false), token(nullptr)
380  {
381  }
382  };
383 
384  template<bool use32> struct _hash_32_or_64 {
385  static inline std::uint32_t hash(std::uint32_t h)
386  {
387  // MurmurHash3 finalizer -- see https://code.google.com/p/smhasher/source/browse/trunk/MurmurHash3.cpp
388  // Since the thread ID is already unique, all we really want to do is propagate that
389  // uniqueness evenly across all the bits, so that we can use a subset of the bits while
390  // reducing collisions significantly
391  h ^= h >> 16;
392  h *= 0x85ebca6b;
393  h ^= h >> 13;
394  h *= 0xc2b2ae35;
395  return h ^ (h >> 16);
396  }
397  };
398  template<> struct _hash_32_or_64<1> {
399  static inline std::uint64_t hash(std::uint64_t h)
400  {
401  h ^= h >> 33;
402  h *= 0xff51afd7ed558ccd;
403  h ^= h >> 33;
404  h *= 0xc4ceb9fe1a85ec53;
405  return h ^ (h >> 33);
406  }
407  };
408  template<std::size_t size> struct hash_32_or_64 : public _hash_32_or_64<(size > 4)> { };
409 
410  static inline size_t hash_thread_id(thread_id_t id)
411  {
412  static_assert(sizeof(thread_id_t) <= 8, "Expected a platform where thread IDs are at most 64-bit values");
413  return static_cast<size_t>(hash_32_or_64<sizeof(thread_id_converter<thread_id_t>::thread_id_hash_t)>::hash(
414  thread_id_converter<thread_id_t>::prehash(id)));
415  }
416 
417  template<typename T>
418  static inline bool circular_less_than(T a, T b)
419  {
420 #ifdef _MSC_VER
421 #pragma warning(push)
422 #pragma warning(disable: 4554)
423 #endif
424  static_assert(std::is_integral<T>::value && !std::numeric_limits<T>::is_signed, "circular_less_than is intended to be used only with unsigned integer types");
425  return static_cast<T>(a - b) > static_cast<T>(static_cast<T>(1) << static_cast<T>(sizeof(T) * CHAR_BIT - 1));
426 #ifdef _MSC_VER
427 #pragma warning(pop)
428 #endif
429  }
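// Worked example with T = std::uint8_t (indices wrap at 256):
//   circular_less_than<std::uint8_t>(3, 5)   == true   // ordinary case, no wrap involved
//   circular_less_than<std::uint8_t>(250, 5) == true   // 5 has wrapped around, so it is "after" 250
//   circular_less_than<std::uint8_t>(5, 250) == false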
430 
431  template<typename U>
432  static inline char* align_for(char* ptr)
433  {
434  const std::size_t alignment = std::alignment_of<U>::value;
435  return ptr + (alignment - (reinterpret_cast<std::uintptr_t>(ptr) % alignment)) % alignment;
436  }
437 
438  template<typename T>
439  static inline T ceil_to_pow_2(T x)
440  {
441  static_assert(std::is_integral<T>::value && !std::numeric_limits<T>::is_signed, "ceil_to_pow_2 is intended to be used only with unsigned integer types");
442 
443  // Adapted from http://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2
444  --x;
445  x |= x >> 1;
446  x |= x >> 2;
447  x |= x >> 4;
448  for (std::size_t i = 1; i < sizeof(T); i <<= 1) {
449  x |= x >> (i << 3);
450  }
451  ++x;
452  return x;
453  }
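// For example: ceil_to_pow_2<std::uint32_t>(5) == 8 and ceil_to_pow_2<std::uint32_t>(32) == 32.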
454 
455  template<typename T>
456  static inline void swap_relaxed(std::atomic<T>& left, std::atomic<T>& right)
457  {
458  T temp = std::move(left.load(std::memory_order_relaxed));
459  left.store(std::move(right.load(std::memory_order_relaxed)), std::memory_order_relaxed);
460  right.store(std::move(temp), std::memory_order_relaxed);
461  }
462 
463  template<typename T>
464  static inline T const& nomove(T const& x)
465  {
466  return x;
467  }
468 
469  template<bool Enable>
470  struct nomove_if
471  {
472  template<typename T>
473  static inline T const& eval(T const& x)
474  {
475  return x;
476  }
477  };
478 
479  template<>
480  struct nomove_if<false>
481  {
482  template<typename U>
483  static inline auto eval(U&& x)
484  -> decltype(std::forward<U>(x))
485  {
486  return std::forward<U>(x);
487  }
488  };
489 
490  template<typename It>
491  static inline auto deref_noexcept(It& it) MOODYCAMEL_NOEXCEPT -> decltype(*it)
492  {
493  return *it;
494  }
495 
496 #if defined(__clang__) || !defined(__GNUC__) || __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 8)
497  template<typename T> struct is_trivially_destructible : std::is_trivially_destructible<T> { };
498 #else
499  template<typename T> struct is_trivially_destructible : std::has_trivial_destructor<T> { };
500 #endif
501 
502 #ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
503 #ifdef MCDBGQ_USE_RELACY
504  typedef RelacyThreadExitListener ThreadExitListener;
505  typedef RelacyThreadExitNotifier ThreadExitNotifier;
506 #else
507  struct ThreadExitListener
508  {
509  typedef void (*callback_t)(void*);
510  callback_t callback;
511  void* userData;
512 
513  ThreadExitListener* next; // reserved for use by the ThreadExitNotifier
514  };
515 
516 
517  class ThreadExitNotifier
518  {
519  public:
520  static void subscribe(ThreadExitListener* listener)
521  {
522  auto& tlsInst = instance();
523  listener->next = tlsInst.tail;
524  tlsInst.tail = listener;
525  }
526 
527  static void unsubscribe(ThreadExitListener* listener)
528  {
529  auto& tlsInst = instance();
530  ThreadExitListener** prev = &tlsInst.tail;
531  for (auto ptr = tlsInst.tail; ptr != nullptr; ptr = ptr->next) {
532  if (ptr == listener) {
533  *prev = ptr->next;
534  break;
535  }
536  prev = &ptr->next;
537  }
538  }
539 
540  private:
541  ThreadExitNotifier() : tail(nullptr) { }
542  ThreadExitNotifier(ThreadExitNotifier const&) MOODYCAMEL_DELETE_FUNCTION;
543  ThreadExitNotifier& operator=(ThreadExitNotifier const&) MOODYCAMEL_DELETE_FUNCTION;
544 
545  ~ThreadExitNotifier()
546  {
547  // This thread is about to exit, let everyone know!
548  assert(this == &instance() && "If this assert fails, you likely have a buggy compiler! Change the preprocessor conditions such that MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED is no longer defined.");
549  for (auto ptr = tail; ptr != nullptr; ptr = ptr->next) {
550  ptr->callback(ptr->userData);
551  }
552  }
553 
554  // Thread-local
555  static inline ThreadExitNotifier& instance()
556  {
557  static thread_local ThreadExitNotifier notifier;
558  return notifier;
559  }
560 
561  private:
562  ThreadExitListener* tail;
563  };
564 #endif
565 #endif
566 
567  template<typename T> struct static_is_lock_free_num { enum { value = 0 }; };
568  template<> struct static_is_lock_free_num<signed char> { enum { value = ATOMIC_CHAR_LOCK_FREE }; };
569  template<> struct static_is_lock_free_num<short> { enum { value = ATOMIC_SHORT_LOCK_FREE }; };
570  template<> struct static_is_lock_free_num<int> { enum { value = ATOMIC_INT_LOCK_FREE }; };
571  template<> struct static_is_lock_free_num<long> { enum { value = ATOMIC_LONG_LOCK_FREE }; };
572  template<> struct static_is_lock_free_num<long long> { enum { value = ATOMIC_LLONG_LOCK_FREE }; };
573  template<typename T> struct static_is_lock_free : static_is_lock_free_num<typename std::make_signed<T>::type> { };
574  template<> struct static_is_lock_free<bool> { enum { value = ATOMIC_BOOL_LOCK_FREE }; };
575  template<typename U> struct static_is_lock_free<U*> { enum { value = ATOMIC_POINTER_LOCK_FREE }; };
576 }
577 
578 
579 struct ProducerToken
580 {
581  template<typename T, typename Traits>
582  explicit ProducerToken(ConcurrentQueue<T, Traits>& queue);
583 
584  template<typename T, typename Traits>
585  explicit ProducerToken(BlockingConcurrentQueue<T, Traits>& queue);
586 
587  ProducerToken(ProducerToken&& other) MOODYCAMEL_NOEXCEPT
588  : producer(other.producer)
589  {
590  other.producer = nullptr;
591  if (producer != nullptr) {
592  producer->token = this;
593  }
594  }
595 
596  inline ProducerToken& operator=(ProducerToken&& other) MOODYCAMEL_NOEXCEPT
597  {
598  swap(other);
599  return *this;
600  }
601 
602  void swap(ProducerToken& other) MOODYCAMEL_NOEXCEPT
603  {
604  std::swap(producer, other.producer);
605  if (producer != nullptr) {
606  producer->token = this;
607  }
608  if (other.producer != nullptr) {
609  other.producer->token = &other;
610  }
611  }
612 
613  // A token is always valid unless:
614  // 1) Memory allocation failed during construction
615  // 2) It was moved via the move constructor
616  // (Note: assignment does a swap, leaving both potentially valid)
617  // 3) The associated queue was destroyed
618  // Note that if valid() returns true, that only indicates
619  // that the token is valid for use with a specific queue,
620  // but not which one; that's up to the user to track.
621  inline bool valid() const { return producer != nullptr; }
622 
623  ~ProducerToken()
624  {
625  if (producer != nullptr) {
626  producer->token = nullptr;
627  producer->inactive.store(true, std::memory_order_release);
628  }
629  }
630 
631  // Disable copying and assignment
632  ProducerToken(ProducerToken const&) MOODYCAMEL_DELETE_FUNCTION;
633  ProducerToken& operator=(ProducerToken const&) MOODYCAMEL_DELETE_FUNCTION;
634 
635 private:
636  template<typename T, typename Traits> friend class ConcurrentQueue;
637  friend class ConcurrentQueueTests;
638 
639 protected:
640  details::ConcurrentQueueProducerTypelessBase* producer;
641 };
642 
643 
644 struct ConsumerToken
645 {
646  template<typename T, typename Traits>
647  explicit ConsumerToken(ConcurrentQueue<T, Traits>& q);
648 
649  template<typename T, typename Traits>
650  explicit ConsumerToken(BlockingConcurrentQueue<T, Traits>& q);
651 
652  ConsumerToken(ConsumerToken&& other) MOODYCAMEL_NOEXCEPT
653  : initialOffset(other.initialOffset), lastKnownGlobalOffset(other.lastKnownGlobalOffset), itemsConsumedFromCurrent(other.itemsConsumedFromCurrent), currentProducer(other.currentProducer), desiredProducer(other.desiredProducer)
654  {
655  }
656 
657  inline ConsumerToken& operator=(ConsumerToken&& other) MOODYCAMEL_NOEXCEPT
658  {
659  swap(other);
660  return *this;
661  }
662 
663  void swap(ConsumerToken& other) MOODYCAMEL_NOEXCEPT
664  {
665  std::swap(initialOffset, other.initialOffset);
666  std::swap(lastKnownGlobalOffset, other.lastKnownGlobalOffset);
667  std::swap(itemsConsumedFromCurrent, other.itemsConsumedFromCurrent);
668  std::swap(currentProducer, other.currentProducer);
669  std::swap(desiredProducer, other.desiredProducer);
670  }
671 
672  // Disable copying and assignment
673  ConsumerToken(ConsumerToken const&) MOODYCAMEL_DELETE_FUNCTION;
674  ConsumerToken& operator=(ConsumerToken const&) MOODYCAMEL_DELETE_FUNCTION;
675 
676 private:
677  template<typename T, typename Traits> friend class ConcurrentQueue;
678  friend class ConcurrentQueueTests;
679 
680 private: // but shared with ConcurrentQueue
681  std::uint32_t initialOffset;
682  std::uint32_t lastKnownGlobalOffset;
683  std::uint32_t itemsConsumedFromCurrent;
684  details::ConcurrentQueueProducerTypelessBase* currentProducer;
685  details::ConcurrentQueueProducerTypelessBase* desiredProducer;
686 };
687 
688 // Need to forward-declare this swap because it's in a namespace.
689 // See http://stackoverflow.com/questions/4492062/why-does-a-c-friend-class-need-a-forward-declaration-only-in-other-namespaces
690 template<typename T, typename Traits>
691 inline void swap(typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP& a, typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP& b) MOODYCAMEL_NOEXCEPT;
692 
693 
694 template<typename T, typename Traits = ConcurrentQueueDefaultTraits>
695 class ConcurrentQueue
696 {
697 public:
698  typedef ::moodycamel::ProducerToken producer_token_t;
699  typedef ::moodycamel::ConsumerToken consumer_token_t;
700 
701  typedef typename Traits::index_t index_t;
702  typedef typename Traits::size_t size_t;
703 
704  static const size_t BLOCK_SIZE = static_cast<size_t>(Traits::BLOCK_SIZE);
705  static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD = static_cast<size_t>(Traits::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD);
706  static const size_t EXPLICIT_INITIAL_INDEX_SIZE = static_cast<size_t>(Traits::EXPLICIT_INITIAL_INDEX_SIZE);
707  static const size_t IMPLICIT_INITIAL_INDEX_SIZE = static_cast<size_t>(Traits::IMPLICIT_INITIAL_INDEX_SIZE);
708  static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE = static_cast<size_t>(Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE);
709  static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE = static_cast<std::uint32_t>(Traits::EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE);
710 #ifdef _MSC_VER
711 #pragma warning(push)
712 #pragma warning(disable: 4307) // + integral constant overflow (that's what the ternary expression is for!)
713 #pragma warning(disable: 4309) // static_cast: Truncation of constant value
714 #endif
715  static const size_t MAX_SUBQUEUE_SIZE = (details::const_numeric_max<size_t>::value - static_cast<size_t>(Traits::MAX_SUBQUEUE_SIZE) < BLOCK_SIZE) ? details::const_numeric_max<size_t>::value : ((static_cast<size_t>(Traits::MAX_SUBQUEUE_SIZE) + (BLOCK_SIZE - 1)) / BLOCK_SIZE * BLOCK_SIZE);
716 #ifdef _MSC_VER
717 #pragma warning(pop)
718 #endif
719 
720  static_assert(!std::numeric_limits<size_t>::is_signed && std::is_integral<size_t>::value, "Traits::size_t must be an unsigned integral type");
721  static_assert(!std::numeric_limits<index_t>::is_signed && std::is_integral<index_t>::value, "Traits::index_t must be an unsigned integral type");
722  static_assert(sizeof(index_t) >= sizeof(size_t), "Traits::index_t must be at least as wide as Traits::size_t");
723  static_assert((BLOCK_SIZE > 1) && !(BLOCK_SIZE & (BLOCK_SIZE - 1)), "Traits::BLOCK_SIZE must be a power of 2 (and at least 2)");
724  static_assert((EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD > 1) && !(EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD & (EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD - 1)), "Traits::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD must be a power of 2 (and greater than 1)");
725  static_assert((EXPLICIT_INITIAL_INDEX_SIZE > 1) && !(EXPLICIT_INITIAL_INDEX_SIZE & (EXPLICIT_INITIAL_INDEX_SIZE - 1)), "Traits::EXPLICIT_INITIAL_INDEX_SIZE must be a power of 2 (and greater than 1)");
726  static_assert((IMPLICIT_INITIAL_INDEX_SIZE > 1) && !(IMPLICIT_INITIAL_INDEX_SIZE & (IMPLICIT_INITIAL_INDEX_SIZE - 1)), "Traits::IMPLICIT_INITIAL_INDEX_SIZE must be a power of 2 (and greater than 1)");
727  static_assert((INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) || !(INITIAL_IMPLICIT_PRODUCER_HASH_SIZE & (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE - 1)), "Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE must be a power of 2");
728  static_assert(INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0 || INITIAL_IMPLICIT_PRODUCER_HASH_SIZE >= 1, "Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE must be at least 1 (or 0 to disable implicit enqueueing)");
729 
730 public:
731  // Creates a queue with at least `capacity` element slots; note that the
732  // actual number of elements that can be inserted without additional memory
733  // allocation depends on the number of producers and the block size (e.g. if
734  // the block size is equal to `capacity`, only a single block will be allocated
735  // up-front, which means only a single producer will be able to enqueue elements
736  // without an extra allocation -- blocks aren't shared between producers).
737  // This method is not thread safe -- it is up to the user to ensure that the
738  // queue is fully constructed before it starts being used by other threads (this
739  // includes making the memory effects of construction visible, possibly with a
740  // memory barrier).
741  explicit ConcurrentQueue(size_t capacity = 6 * BLOCK_SIZE)
742  : producerListTail(nullptr),
743  producerCount(0),
744  initialBlockPoolIndex(0),
745  nextExplicitConsumerId(0),
746  globalExplicitConsumerOffset(0)
747  {
748  implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
749  populate_initial_implicit_producer_hash();
750  populate_initial_block_list(capacity / BLOCK_SIZE + ((capacity & (BLOCK_SIZE - 1)) == 0 ? 0 : 1));
751 
752 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
753  // Track all the producers using a fully-resolved typed list for
754  // each kind; this makes it possible to debug them starting from
755  // the root queue object (otherwise wacky casts are needed that
756  // don't compile in the debugger's expression evaluator).
757  explicitProducers.store(nullptr, std::memory_order_relaxed);
758  implicitProducers.store(nullptr, std::memory_order_relaxed);
759 #endif
760  }
761 
762  // Computes the correct number of pre-allocated blocks for you based
763  // on the minimum number of elements you want available at any given
764  // time, and the maximum concurrent number of each type of producer.
765  ConcurrentQueue(size_t minCapacity, size_t maxExplicitProducers, size_t maxImplicitProducers)
766  : producerListTail(nullptr),
767  producerCount(0),
768  initialBlockPoolIndex(0),
769  nextExplicitConsumerId(0),
770  globalExplicitConsumerOffset(0)
771  {
772  implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
773  populate_initial_implicit_producer_hash();
774  size_t blocks = (((minCapacity + BLOCK_SIZE - 1) / BLOCK_SIZE) - 1) * (maxExplicitProducers + 1) + 2 * (maxExplicitProducers + maxImplicitProducers);
775  populate_initial_block_list(blocks);
776 
777 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
778  explicitProducers.store(nullptr, std::memory_order_relaxed);
779  implicitProducers.store(nullptr, std::memory_order_relaxed);
780 #endif
781  }
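// Construction examples for the two constructors above (illustrative sketch;
// the capacities and producer counts are placeholder values):
//
//   // At least 1024 element slots, i.e. ceil(1024 / BLOCK_SIZE) pre-allocated blocks:
//   moodycamel::ConcurrentQueue<int> q1(1024);
//
//   // Pre-allocates for >= 2048 elements with at most 4 explicit and 8 implicit producers:
//   moodycamel::ConcurrentQueue<int> q2(2048, 4, 8);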
782 
783  // Note: The queue should not be accessed concurrently while it's
784  // being deleted. It's up to the user to synchronize this.
785  // This method is not thread safe.
786  ~ConcurrentQueue()
787  {
788  // Destroy producers
789  auto ptr = producerListTail.load(std::memory_order_relaxed);
790  while (ptr != nullptr) {
791  auto next = ptr->next_prod();
792  if (ptr->token != nullptr) {
793  ptr->token->producer = nullptr;
794  }
795  destroy(ptr);
796  ptr = next;
797  }
798 
799  // Destroy implicit producer hash tables
800  if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE != 0) {
801  auto hash = implicitProducerHash.load(std::memory_order_relaxed);
802  while (hash != nullptr) {
803  auto prev = hash->prev;
804  if (prev != nullptr) { // The last hash is part of this object and was not allocated dynamically
805  for (size_t i = 0; i != hash->capacity; ++i) {
806  hash->entries[i].~ImplicitProducerKVP();
807  }
808  hash->~ImplicitProducerHash();
809  (Traits::free)(hash);
810  }
811  hash = prev;
812  }
813  }
814 
815  // Destroy global free list
816  auto block = freeList.head_unsafe();
817  while (block != nullptr) {
818  auto next = block->freeListNext.load(std::memory_order_relaxed);
819  if (block->dynamicallyAllocated) {
820  destroy(block);
821  }
822  block = next;
823  }
824 
825  // Destroy initial free list
826  destroy_array(initialBlockPool, initialBlockPoolSize);
827  }
828 
829  // Disable copying and copy assignment
830  ConcurrentQueue(ConcurrentQueue const&) MOODYCAMEL_DELETE_FUNCTION;
831  ConcurrentQueue& operator=(ConcurrentQueue const&) MOODYCAMEL_DELETE_FUNCTION;
832 
833  // Moving is supported, but note that it is *not* a thread-safe operation.
834  // Nobody can use the queue while it's being moved, and the memory effects
835  // of that move must be propagated to other threads before they can use it.
836  // Note: When a queue is moved, its tokens are still valid but can only be
837  // used with the destination queue (i.e. semantically they are moved along
838  // with the queue itself).
839  ConcurrentQueue(ConcurrentQueue&& other) MOODYCAMEL_NOEXCEPT
840  : producerListTail(other.producerListTail.load(std::memory_order_relaxed)),
841  producerCount(other.producerCount.load(std::memory_order_relaxed)),
842  initialBlockPoolIndex(other.initialBlockPoolIndex.load(std::memory_order_relaxed)),
843  initialBlockPool(other.initialBlockPool),
844  initialBlockPoolSize(other.initialBlockPoolSize),
845  freeList(std::move(other.freeList)),
846  nextExplicitConsumerId(other.nextExplicitConsumerId.load(std::memory_order_relaxed)),
847  globalExplicitConsumerOffset(other.globalExplicitConsumerOffset.load(std::memory_order_relaxed))
848  {
849  // Move the other one into this, and leave the other one as an empty queue
850  implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
851  populate_initial_implicit_producer_hash();
852  swap_implicit_producer_hashes(other);
853 
854  other.producerListTail.store(nullptr, std::memory_order_relaxed);
855  other.producerCount.store(0, std::memory_order_relaxed);
856  other.nextExplicitConsumerId.store(0, std::memory_order_relaxed);
857  other.globalExplicitConsumerOffset.store(0, std::memory_order_relaxed);
858 
859 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
860  explicitProducers.store(other.explicitProducers.load(std::memory_order_relaxed), std::memory_order_relaxed);
861  other.explicitProducers.store(nullptr, std::memory_order_relaxed);
862  implicitProducers.store(other.implicitProducers.load(std::memory_order_relaxed), std::memory_order_relaxed);
863  other.implicitProducers.store(nullptr, std::memory_order_relaxed);
864 #endif
865 
866  other.initialBlockPoolIndex.store(0, std::memory_order_relaxed);
867  other.initialBlockPoolSize = 0;
868  other.initialBlockPool = nullptr;
869 
870  reown_producers();
871  }
872 
873  inline ConcurrentQueue& operator=(ConcurrentQueue&& other) MOODYCAMEL_NOEXCEPT
874  {
875  return swap_internal(other);
876  }
877 
878  // Swaps this queue's state with the other's. Not thread-safe.
879  // Swapping two queues does not invalidate their tokens, however
880  // the tokens that were created for one queue must be used with
881  // only the swapped queue (i.e. the tokens are tied to the
882  // queue's movable state, not the object itself).
883  inline void swap(ConcurrentQueue& other) MOODYCAMEL_NOEXCEPT
884  {
885  swap_internal(other);
886  }
887 
888 private:
889  ConcurrentQueue& swap_internal(ConcurrentQueue& other)
890  {
891  if (this == &other) {
892  return *this;
893  }
894 
895  details::swap_relaxed(producerListTail, other.producerListTail);
896  details::swap_relaxed(producerCount, other.producerCount);
897  details::swap_relaxed(initialBlockPoolIndex, other.initialBlockPoolIndex);
898  std::swap(initialBlockPool, other.initialBlockPool);
899  std::swap(initialBlockPoolSize, other.initialBlockPoolSize);
900  freeList.swap(other.freeList);
901  details::swap_relaxed(nextExplicitConsumerId, other.nextExplicitConsumerId);
902  details::swap_relaxed(globalExplicitConsumerOffset, other.globalExplicitConsumerOffset);
903 
904  swap_implicit_producer_hashes(other);
905 
906  reown_producers();
907  other.reown_producers();
908 
909 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
910  details::swap_relaxed(explicitProducers, other.explicitProducers);
911  details::swap_relaxed(implicitProducers, other.implicitProducers);
912 #endif
913 
914  return *this;
915  }
916 
917 public:
918  // Enqueues a single item (by copying it).
919  // Allocates memory if required. Only fails if memory allocation fails (or implicit
920  // production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0,
921  // or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
922  // Thread-safe.
923  inline bool enqueue(T const& item)
924  {
925  if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
926  return inner_enqueue<CanAlloc>(item);
927  }
928 
929  // Enqueues a single item (by moving it, if possible).
930  // Allocates memory if required. Only fails if memory allocation fails (or implicit
931  // production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0,
932  // or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
933  // Thread-safe.
934  inline bool enqueue(T&& item)
935  {
936  if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
937  return inner_enqueue<CanAlloc>(std::move(item));
938  }
939 
940  // Enqueues a single item (by copying it) using an explicit producer token.
941  // Allocates memory if required. Only fails if memory allocation fails (or
942  // Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
943  // Thread-safe.
944  inline bool enqueue(producer_token_t const& token, T const& item)
945  {
946  return inner_enqueue<CanAlloc>(token, item);
947  }
948 
949  // Enqueues a single item (by moving it, if possible) using an explicit producer token.
950  // Allocates memory if required. Only fails if memory allocation fails (or
951  // Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
952  // Thread-safe.
953  inline bool enqueue(producer_token_t const& token, T&& item)
954  {
955  return inner_enqueue<CanAlloc>(token, std::move(item));
956  }
957 
958  // Enqueues several items.
959  // Allocates memory if required. Only fails if memory allocation fails (or
960  // implicit production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
961  // is 0, or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
962  // Note: Use std::make_move_iterator if the elements should be moved instead of copied.
963  // Thread-safe.
964  template<typename It>
965  bool enqueue_bulk(It itemFirst, size_t count)
966  {
967  if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
968  return inner_enqueue_bulk<CanAlloc>(itemFirst, count);
969  }
970 
971  // Enqueues several items using an explicit producer token.
972  // Allocates memory if required. Only fails if memory allocation fails
973  // (or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
974  // Note: Use std::make_move_iterator if the elements should be moved
975  // instead of copied.
976  // Thread-safe.
977  template<typename It>
978  bool enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
979  {
980  return inner_enqueue_bulk<CanAlloc>(token, itemFirst, count);
981  }
982 
983  // Enqueues a single item (by copying it).
984  // Does not allocate memory. Fails if not enough room to enqueue (or implicit
985  // production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
986  // is 0).
987  // Thread-safe.
988  inline bool try_enqueue(T const& item)
989  {
990  if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
991  return inner_enqueue<CannotAlloc>(item);
992  }
993 
994  // Enqueues a single item (by moving it, if possible).
995  // Does not allocate memory (except for one-time implicit producer).
996  // Fails if not enough room to enqueue (or implicit production is
997  // disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0).
998  // Thread-safe.
999  inline bool try_enqueue(T&& item)
1000  {
1001  if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
1002  return inner_enqueue<CannotAlloc>(std::move(item));
1003  }
1004 
1005  // Enqueues a single item (by copying it) using an explicit producer token.
1006  // Does not allocate memory. Fails if not enough room to enqueue.
1007  // Thread-safe.
1008  inline bool try_enqueue(producer_token_t const& token, T const& item)
1009  {
1010  return inner_enqueue<CannotAlloc>(token, item);
1011  }
1012 
1013  // Enqueues a single item (by moving it, if possible) using an explicit producer token.
1014  // Does not allocate memory. Fails if not enough room to enqueue.
1015  // Thread-safe.
1016  inline bool try_enqueue(producer_token_t const& token, T&& item)
1017  {
1018  return inner_enqueue<CannotAlloc>(token, std::move(item));
1019  }
1020 
1021  // Enqueues several items.
1022  // Does not allocate memory (except for one-time implicit producer).
1023  // Fails if not enough room to enqueue (or implicit production is
1024  // disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0).
1025  // Note: Use std::make_move_iterator if the elements should be moved
1026  // instead of copied.
1027  // Thread-safe.
1028  template<typename It>
1029  bool try_enqueue_bulk(It itemFirst, size_t count)
1030  {
1031  if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
1032  return inner_enqueue_bulk<CannotAlloc>(itemFirst, count);
1033  }
1034 
1035  // Enqueues several items using an explicit producer token.
1036  // Does not allocate memory. Fails if not enough room to enqueue.
1037  // Note: Use std::make_move_iterator if the elements should be moved
1038  // instead of copied.
1039  // Thread-safe.
1040  template<typename It>
1041  bool try_enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
1042  {
1043  return inner_enqueue_bulk<CannotAlloc>(token, itemFirst, count);
1044  }
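// Usage sketch for the enqueue/try_enqueue family above (illustrative; `q`, `s`
// and `tok` are placeholder names):
//
//   moodycamel::ConcurrentQueue<std::string> q;
//   std::string s("hello");
//   q.enqueue(s);                           // copies; may allocate a new block
//   q.enqueue(std::move(s));                // moves; may allocate a new block
//   q.try_enqueue(std::string("again"));    // fails rather than allocate a new block
//
//   moodycamel::ProducerToken tok(q);
//   q.enqueue(tok, std::string("tokenized"));  // goes through tok's explicit producer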
1045 
1046 
1047 
1048  // Attempts to dequeue from the queue.
1049  // Returns false if all producer streams appeared empty at the time they
1050  // were checked (so, the queue is likely but not guaranteed to be empty).
1051  // Never allocates. Thread-safe.
1052  template<typename U>
1053  bool try_dequeue(U& item)
1054  {
1055  // Instead of simply trying each producer in turn (which could cause needless contention on the first
1056  // producer), we score them heuristically.
1057  size_t nonEmptyCount = 0;
1058  ProducerBase* best = nullptr;
1059  size_t bestSize = 0;
1060  for (auto ptr = producerListTail.load(std::memory_order_acquire); nonEmptyCount < 3 && ptr != nullptr; ptr = ptr->next_prod()) {
1061  auto size = ptr->size_approx();
1062  if (size > 0) {
1063  if (size > bestSize) {
1064  bestSize = size;
1065  best = ptr;
1066  }
1067  ++nonEmptyCount;
1068  }
1069  }
1070 
1071  // If there was at least one non-empty queue but it appears empty at the time
1072  // we try to dequeue from it, we need to make sure every queue's been tried
1073  if (nonEmptyCount > 0) {
1074  if ((details::likely)(best->dequeue(item))) {
1075  return true;
1076  }
1077  for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
1078  if (ptr != best && ptr->dequeue(item)) {
1079  return true;
1080  }
1081  }
1082  }
1083  return false;
1084  }
1085 
1086  // Attempts to dequeue from the queue.
1087  // Returns false if all producer streams appeared empty at the time they
1088  // were checked (so, the queue is likely but not guaranteed to be empty).
1089  // This differs from the try_dequeue(item) method in that this one does
1090  // not attempt to reduce contention by interleaving the order that producer
1091  // streams are dequeued from. So, using this method can reduce overall throughput
1092  // under contention, but will give more predictable results in single-threaded
1093  // consumer scenarios. This is mostly only useful for internal unit tests.
1094  // Never allocates. Thread-safe.
1095  template<typename U>
1096  bool try_dequeue_non_interleaved(U& item)
1097  {
1098  for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
1099  if (ptr->dequeue(item)) {
1100  return true;
1101  }
1102  }
1103  return false;
1104  }
1105 
1106  // Attempts to dequeue from the queue using an explicit consumer token.
1107  // Returns false if all producer streams appeared empty at the time they
1108  // were checked (so, the queue is likely but not guaranteed to be empty).
1109  // Never allocates. Thread-safe.
1110  template<typename U>
1111  bool try_dequeue(consumer_token_t& token, U& item)
1112  {
1113  // The idea is roughly as follows:
1114  // Every 256 items from one producer, make everyone rotate (increase the global offset) -> this means the highest efficiency consumer dictates the rotation speed of everyone else, more or less
1115  // If you see that the global offset has changed, you must reset your consumption counter and move to your designated place
1116  // If there's no items where you're supposed to be, keep moving until you find a producer with some items
1117  // If the global offset has not changed but you've run out of items to consume, move over from your current position until you find a producer with something in it
1118 
1119  if (token.desiredProducer == nullptr || token.lastKnownGlobalOffset != globalExplicitConsumerOffset.load(std::memory_order_relaxed)) {
1120  if (!update_current_producer_after_rotation(token)) {
1121  return false;
1122  }
1123  }
1124 
1125  // If there was at least one non-empty queue but it appears empty at the time
1126  // we try to dequeue from it, we need to make sure every queue's been tried
1127  if (static_cast<ProducerBase*>(token.currentProducer)->dequeue(item)) {
1128  if (++token.itemsConsumedFromCurrent == EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE) {
1129  globalExplicitConsumerOffset.fetch_add(1, std::memory_order_relaxed);
1130  }
1131  return true;
1132  }
1133 
1134  auto tail = producerListTail.load(std::memory_order_acquire);
1135  auto ptr = static_cast<ProducerBase*>(token.currentProducer)->next_prod();
1136  if (ptr == nullptr) {
1137  ptr = tail;
1138  }
1139  while (ptr != static_cast<ProducerBase*>(token.currentProducer)) {
1140  if (ptr->dequeue(item)) {
1141  token.currentProducer = ptr;
1142  token.itemsConsumedFromCurrent = 1;
1143  return true;
1144  }
1145  ptr = ptr->next_prod();
1146  if (ptr == nullptr) {
1147  ptr = tail;
1148  }
1149  }
1150  return false;
1151  }
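// Typical consumer loop for the token overload above (illustrative sketch; `q`,
// `running` and `process` are placeholders for user code):
//
//   moodycamel::ConsumerToken ctok(q);
//   int item;
//   while (running) {
//     while (q.try_dequeue(ctok, item)) {
//       process(item);
//     }
//   }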
1152 
1153  // Attempts to dequeue several elements from the queue.
1154  // Returns the number of items actually dequeued.
1155  // Returns 0 if all producer streams appeared empty at the time they
1156  // were checked (so, the queue is likely but not guaranteed to be empty).
1157  // Never allocates. Thread-safe.
1158  template<typename It>
1159  size_t try_dequeue_bulk(It itemFirst, size_t max)
1160  {
1161  size_t count = 0;
1162  for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
1163  count += ptr->dequeue_bulk(itemFirst, max - count);
1164  if (count == max) {
1165  break;
1166  }
1167  }
1168  return count;
1169  }
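// Batch-draining sketch for the method above (illustrative; the batch size and
// `process` are placeholders):
//
//   int buffer[64];
//   size_t n;
//   while ((n = q.try_dequeue_bulk(buffer, 64)) != 0) {
//     for (size_t i = 0; i != n; ++i) {
//       process(buffer[i]);
//     }
//   }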
1170 
1171  // Attempts to dequeue several elements from the queue using an explicit consumer token.
1172  // Returns the number of items actually dequeued.
1173  // Returns 0 if all producer streams appeared empty at the time they
1174  // were checked (so, the queue is likely but not guaranteed to be empty).
1175  // Never allocates. Thread-safe.
1176  template<typename It>
1177  size_t try_dequeue_bulk(consumer_token_t& token, It itemFirst, size_t max)
1178  {
1179  if (token.desiredProducer == nullptr || token.lastKnownGlobalOffset != globalExplicitConsumerOffset.load(std::memory_order_relaxed)) {
1180  if (!update_current_producer_after_rotation(token)) {
1181  return 0;
1182  }
1183  }
1184 
1185  size_t count = static_cast<ProducerBase*>(token.currentProducer)->dequeue_bulk(itemFirst, max);
1186  if (count == max) {
1187  if ((token.itemsConsumedFromCurrent += static_cast<std::uint32_t>(max)) >= EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE) {
1188  globalExplicitConsumerOffset.fetch_add(1, std::memory_order_relaxed);
1189  }
1190  return max;
1191  }
1192  token.itemsConsumedFromCurrent += static_cast<std::uint32_t>(count);
1193  max -= count;
1194 
1195  auto tail = producerListTail.load(std::memory_order_acquire);
1196  auto ptr = static_cast<ProducerBase*>(token.currentProducer)->next_prod();
1197  if (ptr == nullptr) {
1198  ptr = tail;
1199  }
1200  while (ptr != static_cast<ProducerBase*>(token.currentProducer)) {
1201  auto dequeued = ptr->dequeue_bulk(itemFirst, max);
1202  count += dequeued;
1203  if (dequeued != 0) {
1204  token.currentProducer = ptr;
1205  token.itemsConsumedFromCurrent = static_cast<std::uint32_t>(dequeued);
1206  }
1207  if (dequeued == max) {
1208  break;
1209  }
1210  max -= dequeued;
1211  ptr = ptr->next_prod();
1212  if (ptr == nullptr) {
1213  ptr = tail;
1214  }
1215  }
1216  return count;
1217  }
1218 
1219 
1220 
1221  // Attempts to dequeue from a specific producer's inner queue.
1222  // If you happen to know which producer you want to dequeue from, this
1223  // is significantly faster than using the general-case try_dequeue methods.
1224  // Returns false if the producer's queue appeared empty at the time it
1225  // was checked (so, the queue is likely but not guaranteed to be empty).
1226  // Never allocates. Thread-safe.
1227  template<typename U>
1228  inline bool try_dequeue_from_producer(producer_token_t const& producer, U& item)
1229  {
1230  return static_cast<ExplicitProducer*>(producer.producer)->dequeue(item);
1231  }
1232 
1233  // Attempts to dequeue several elements from a specific producer's inner queue.
1234  // Returns the number of items actually dequeued.
1235  // If you happen to know which producer you want to dequeue from, this
1236  // is significantly faster than using the general-case try_dequeue methods.
1237  // Returns 0 if the producer's queue appeared empty at the time it
1238  // was checked (so, the queue is likely but not guaranteed to be empty).
1239  // Never allocates. Thread-safe.
1240  template<typename It>
1241  inline size_t try_dequeue_bulk_from_producer(producer_token_t const& producer, It itemFirst, size_t max)
1242  {
1243  return static_cast<ExplicitProducer*>(producer.producer)->dequeue_bulk(itemFirst, max);
1244  }
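// Sketch of dequeueing from a known producer with the two methods above
// (illustrative; `q` and `ptok` are placeholders):
//
//   moodycamel::ProducerToken ptok(q);
//   q.enqueue(ptok, 42);
//
//   int item;
//   if (q.try_dequeue_from_producer(ptok, item)) {
//     // `item` came from ptok's sub-queue only
//   }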
1245 
1246 
1247  // Returns an estimate of the total number of elements currently in the queue. This
1248  // estimate is only accurate if the queue has completely stabilized before it is called
1249  // (i.e. all enqueue and dequeue operations have completed and their memory effects are
1250  // visible on the calling thread, and no further operations start while this method is
1251  // being called).
1252  // Thread-safe.
1253  size_t size_approx() const
1254  {
1255  size_t size = 0;
1256  for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
1257  size += ptr->size_approx();
1258  }
1259  return size;
1260  }
1261 
1262 
1263  // Returns true if the underlying atomic variables used by
1264  // the queue are lock-free (they should be on most platforms).
1265  // Thread-safe.
1266  static bool is_lock_free()
1267  {
1268  return
1269  details::static_is_lock_free<bool>::value == 2 &&
1270  details::static_is_lock_free<size_t>::value == 2 &&
1271  details::static_is_lock_free<std::uint32_t>::value == 2 &&
1272  details::static_is_lock_free<index_t>::value == 2 &&
1273  details::static_is_lock_free<void*>::value == 2 &&
1274  details::static_is_lock_free<typename details::thread_id_converter<details::thread_id_t>::thread_id_numeric_size_t>::value == 2;
1275  }
1276 
1277 
1278 private:
1279  friend struct ProducerToken;
1280  friend struct ConsumerToken;
1281  struct ExplicitProducer;
1282  friend struct ExplicitProducer;
1283  struct ImplicitProducer;
1284  friend struct ImplicitProducer;
1285  friend class ConcurrentQueueTests;
1286 
1287  enum AllocationMode { CanAlloc, CannotAlloc };
1288 
1289 
1290  ///////////////////////////////
1291  // Queue methods
1292  ///////////////////////////////
1293 
1294  template<AllocationMode canAlloc, typename U>
1295  inline bool inner_enqueue(producer_token_t const& token, U&& element)
1296  {
1297  return static_cast<ExplicitProducer*>(token.producer)->ConcurrentQueue::ExplicitProducer::template enqueue<canAlloc>(std::forward<U>(element));
1298  }
1299 
1300  template<AllocationMode canAlloc, typename U>
1301  inline bool inner_enqueue(U&& element)
1302  {
1303  auto producer = get_or_add_implicit_producer();
1304  return producer == nullptr ? false : producer->ConcurrentQueue::ImplicitProducer::template enqueue<canAlloc>(std::forward<U>(element));
1305  }
1306 
1307  template<AllocationMode canAlloc, typename It>
1308  inline bool inner_enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
1309  {
1310  return static_cast<ExplicitProducer*>(token.producer)->ConcurrentQueue::ExplicitProducer::template enqueue_bulk<canAlloc>(itemFirst, count);
1311  }
1312 
1313  template<AllocationMode canAlloc, typename It>
1314  inline bool inner_enqueue_bulk(It itemFirst, size_t count)
1315  {
1316  auto producer = get_or_add_implicit_producer();
1317  return producer == nullptr ? false : producer->ConcurrentQueue::ImplicitProducer::template enqueue_bulk<canAlloc>(itemFirst, count);
1318  }
1319 
1320  inline bool update_current_producer_after_rotation(consumer_token_t& token)
1321  {
1322  // Ah, there's been a rotation, figure out where we should be!
1323  auto tail = producerListTail.load(std::memory_order_acquire);
1324  if (token.desiredProducer == nullptr && tail == nullptr) {
1325  return false;
1326  }
1327  auto prodCount = producerCount.load(std::memory_order_relaxed);
1328  auto globalOffset = globalExplicitConsumerOffset.load(std::memory_order_relaxed);
1329  if ((details::unlikely)(token.desiredProducer == nullptr)) {
1330  // Aha, first time we're dequeueing anything.
1331  // Figure out our local position
1332  // Note: offset is from start, not end, but we're traversing from end -- subtract from count first
1333  std::uint32_t offset = prodCount - 1 - (token.initialOffset % prodCount);
1334  token.desiredProducer = tail;
1335  for (std::uint32_t i = 0; i != offset; ++i) {
1336  token.desiredProducer = static_cast<ProducerBase*>(token.desiredProducer)->next_prod();
1337  if (token.desiredProducer == nullptr) {
1338  token.desiredProducer = tail;
1339  }
1340  }
1341  }
1342 
1343  std::uint32_t delta = globalOffset - token.lastKnownGlobalOffset;
1344  if (delta >= prodCount) {
1345  delta = delta % prodCount;
1346  }
1347  for (std::uint32_t i = 0; i != delta; ++i) {
1348  token.desiredProducer = static_cast<ProducerBase*>(token.desiredProducer)->next_prod();
1349  if (token.desiredProducer == nullptr) {
1350  token.desiredProducer = tail;
1351  }
1352  }
1353 
1354  token.lastKnownGlobalOffset = globalOffset;
1355  token.currentProducer = token.desiredProducer;
1356  token.itemsConsumedFromCurrent = 0;
1357  return true;
1358  }
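 // Worked example of the arithmetic above (hypothetical numbers): with three producers
 // P2 -> P1 -> P0 reachable from `tail` (P2 newest) and a token whose initialOffset is 1,
 // offset = 3 - 1 - (1 % 3) = 1, so the token starts at P1, one hop from the tail. Each time
 // the global offset advances past the token's last known value, the token walks that many
 // additional producers down the list, wrapping back to `tail` when it falls off the end.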
1359 
1360 
1361  ///////////////////////////
1362  // Free list
1363  ///////////////////////////
1364 
1365  template <typename N>
1366  struct FreeListNode
1367  {
1368  FreeListNode() : freeListRefs(0), freeListNext(nullptr) { }
1369 
1370  std::atomic<std::uint32_t> freeListRefs;
1371  std::atomic<N*> freeListNext;
1372  };
1373 
1374  // A simple CAS-based lock-free free list. Not the fastest thing in the world under heavy contention, but
1375  // simple and correct (assuming nodes are never freed until after the free list is destroyed), and fairly
1376  // speedy under low contention.
1377  template<typename N> // N must inherit FreeListNode or have the same fields (and initialization of them)
1378  struct FreeList
1379  {
1380  FreeList() : freeListHead(nullptr) { }
1381  FreeList(FreeList&& other) : freeListHead(other.freeListHead.load(std::memory_order_relaxed)) { other.freeListHead.store(nullptr, std::memory_order_relaxed); }
1382  void swap(FreeList& other) { details::swap_relaxed(freeListHead, other.freeListHead); }
1383 
1384  FreeList(FreeList const&) MOODYCAMEL_DELETE_FUNCTION;
1385  FreeList& operator=(FreeList const&) MOODYCAMEL_DELETE_FUNCTION;
1386 
1387  inline void add(N* node)
1388  {
1389 #if MCDBGQ_NOLOCKFREE_FREELIST
1390  debug::DebugLock lock(mutex);
1391 #endif
1392  // We know that the should-be-on-freelist bit is 0 at this point, so it's safe to
1393  // set it using a fetch_add
1394  if (node->freeListRefs.fetch_add(SHOULD_BE_ON_FREELIST, std::memory_order_acq_rel) == 0) {
1395  // Oh look! We were the last ones referencing this node, and we know
1396  // we want to add it to the free list, so let's do it!
1397  add_knowing_refcount_is_zero(node);
1398  }
1399  }
1400 
1401  inline N* try_get()
1402  {
1403 #if MCDBGQ_NOLOCKFREE_FREELIST
1404  debug::DebugLock lock(mutex);
1405 #endif
1406  auto head = freeListHead.load(std::memory_order_acquire);
1407  while (head != nullptr) {
1408  auto prevHead = head;
1409  auto refs = head->freeListRefs.load(std::memory_order_relaxed);
1410  if ((refs & REFS_MASK) == 0 || !head->freeListRefs.compare_exchange_strong(refs, refs + 1, std::memory_order_acquire, std::memory_order_relaxed)) {
1411  head = freeListHead.load(std::memory_order_acquire);
1412  continue;
1413  }
1414 
1415  // Good, reference count has been incremented (it wasn't at zero), which means we can read the
1416  // next and not worry about it changing between now and the time we do the CAS
1417  auto next = head->freeListNext.load(std::memory_order_relaxed);
1418  if (freeListHead.compare_exchange_strong(head, next, std::memory_order_acquire, std::memory_order_relaxed)) {
1419  // Yay, got the node. This means it was on the list, which means shouldBeOnFreeList must be false no
1420  // matter the refcount (because nobody else knows it's been taken off yet, it can't have been put back on).
1421  assert((head->freeListRefs.load(std::memory_order_relaxed) & SHOULD_BE_ON_FREELIST) == 0);
1422 
1423  // Decrease refcount twice, once for our ref, and once for the list's ref
1424  head->freeListRefs.fetch_sub(2, std::memory_order_release);
1425  return head;
1426  }
1427 
1428  // OK, the head must have changed on us, but we still need to decrease the refcount we increased.
1429  // Note that we don't need to release any memory effects, but we do need to ensure that the reference
1430  // count decrement happens-after the CAS on the head.
1431  refs = prevHead->freeListRefs.fetch_sub(1, std::memory_order_acq_rel);
1432  if (refs == SHOULD_BE_ON_FREELIST + 1) {
1433  add_knowing_refcount_is_zero(prevHead);
1434  }
1435  }
1436 
1437  return nullptr;
1438  }
1439 
1440  // Useful for traversing the list when there's no contention (e.g. to destroy remaining nodes)
1441  N* head_unsafe() const { return freeListHead.load(std::memory_order_relaxed); }
1442 
1443  private:
1444  inline void add_knowing_refcount_is_zero(N* node)
1445  {
1446  // Since the refcount is zero, and nobody can increase it once it's zero (except us, and we run
1447  // only one copy of this method per node at a time, i.e. the single thread case), then we know
1448  // we can safely change the next pointer of the node; however, once the refcount is back above
1449  // zero, then other threads could increase it (happens under heavy contention, when the refcount
1450  // goes to zero in between a load and a refcount increment of a node in try_get, then back up to
1451  // something non-zero, then the refcount increment is done by the other thread) -- so, if the CAS
1452  // to add the node to the actual list fails, decrease the refcount and leave the add operation to
1453  // the next thread who puts the refcount back at zero (which could be us, hence the loop).
1454  auto head = freeListHead.load(std::memory_order_relaxed);
1455  while (true) {
1456  node->freeListNext.store(head, std::memory_order_relaxed);
1457  node->freeListRefs.store(1, std::memory_order_release);
1458  if (!freeListHead.compare_exchange_strong(head, node, std::memory_order_release, std::memory_order_relaxed)) {
1459  // Hmm, the add failed, but we can only try again when the refcount goes back to zero
1460  if (node->freeListRefs.fetch_add(SHOULD_BE_ON_FREELIST - 1, std::memory_order_release) == 1) {
1461  continue;
1462  }
1463  }
1464  return;
1465  }
1466  }
1467 
1468  private:
1469  // Implemented like a stack, but where node order doesn't matter (nodes are inserted out of order under contention)
1470  std::atomic<N*> freeListHead;
1471 
1472  static const std::uint32_t REFS_MASK = 0x7FFFFFFF;
1473  static const std::uint32_t SHOULD_BE_ON_FREELIST = 0x80000000;
1474 
1475 #if MCDBGQ_NOLOCKFREE_FREELIST
1476  debug::DebugMutex mutex;
1477 #endif
1478  };
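 // For orientation: the structure above is a reference-counted variation on the classic
 // Treiber-stack push/pop. A minimal standalone sketch of the plain (non-refcounted) push
 // loop, for comparison only -- `Node`, `head` and `push` are hypothetical and unrelated to
 // the queue internals:
 //
 //   struct Node { std::atomic<Node*> next{nullptr}; };
 //   std::atomic<Node*> head{nullptr};
 //
 //   void push(Node* n) {
 //     Node* h = head.load(std::memory_order_relaxed);
 //     do {
 //       n->next.store(h, std::memory_order_relaxed);
 //       // release on success so the node's contents are visible to whoever pops it
 //     } while (!head.compare_exchange_weak(h, n, std::memory_order_release, std::memory_order_relaxed));
 //   }
 //
 // The extra freeListRefs counter in the real FreeList is what lets try_get() read
 // head->freeListNext safely while another thread may be concurrently removing or re-adding
 // that node.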
1479 
1480 
1481  ///////////////////////////
1482  // Block
1483  ///////////////////////////
1484 
1485  enum InnerQueueContext { implicit_context = 0, explicit_context = 1 };
1486 
1487  struct Block
1488  {
1489  Block()
1490  : next(nullptr), elementsCompletelyDequeued(0), freeListRefs(0), freeListNext(nullptr), shouldBeOnFreeList(false), dynamicallyAllocated(true)
1491  {
1492 #if MCDBGQ_TRACKMEM
1493  owner = nullptr;
1494 #endif
1495  }
1496 
1497  template<InnerQueueContext context>
1498  inline bool is_empty() const
1499  {
1500  if (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
1501  // Check flags
1502  for (size_t i = 0; i < BLOCK_SIZE; ++i) {
1503  if (!emptyFlags[i].load(std::memory_order_relaxed)) {
1504  return false;
1505  }
1506  }
1507 
1508  // Aha, empty; make sure we have all other memory effects that happened before the empty flags were set
1509  std::atomic_thread_fence(std::memory_order_acquire);
1510  return true;
1511  }
1512  else {
1513  // Check counter
1514  if (elementsCompletelyDequeued.load(std::memory_order_relaxed) == BLOCK_SIZE) {
1515  std::atomic_thread_fence(std::memory_order_acquire);
1516  return true;
1517  }
1518  assert(elementsCompletelyDequeued.load(std::memory_order_relaxed) <= BLOCK_SIZE);
1519  return false;
1520  }
1521  }
1522 
1523  // Returns true if the block is now empty (does not apply in explicit context)
1524  template<InnerQueueContext context>
1525  inline bool set_empty(index_t i)
1526  {
1527  if (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
1528  // Set flag
1529  assert(!emptyFlags[BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1))].load(std::memory_order_relaxed));
1530  emptyFlags[BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1))].store(true, std::memory_order_release);
1531  return false;
1532  }
1533  else {
1534  // Increment counter
1535  auto prevVal = elementsCompletelyDequeued.fetch_add(1, std::memory_order_release);
1536  assert(prevVal < BLOCK_SIZE);
1537  return prevVal == BLOCK_SIZE - 1;
1538  }
1539  }
1540 
1541  // Sets multiple contiguous item statuses to 'empty' (assumes no wrapping and count > 0).
1542  // Returns true if the block is now empty (does not apply in explicit context).
1543  template<InnerQueueContext context>
1544  inline bool set_many_empty(index_t i, size_t count)
1545  {
1546  if (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
1547  // Set flags
1548  std::atomic_thread_fence(std::memory_order_release);
1549  i = BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1)) - count + 1;
1550  for (size_t j = 0; j != count; ++j) {
1551  assert(!emptyFlags[i + j].load(std::memory_order_relaxed));
1552  emptyFlags[i + j].store(true, std::memory_order_relaxed);
1553  }
1554  return false;
1555  }
1556  else {
1557  // Increment counter
1558  auto prevVal = elementsCompletelyDequeued.fetch_add(count, std::memory_order_release);
1559  assert(prevVal + count <= BLOCK_SIZE);
1560  return prevVal + count == BLOCK_SIZE;
1561  }
1562  }
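 // Worked example of the reversed flag indexing (assuming BLOCK_SIZE == 32): for i == 3 and
 // count == 3 (items 3, 4 and 5), the start slot becomes 31 - 3 - 3 + 1 == 26, so flags
 // 26, 27 and 28 are set -- the same slots that three individual set_empty() calls (which
 // store at 31 - (i & 31)) would have touched for items 5, 4 and 3 respectively.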
1563 
1564  template<InnerQueueContext context>
1565  inline void set_all_empty()
1566  {
1567  if (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
1568  // Set all flags
1569  for (size_t i = 0; i != BLOCK_SIZE; ++i) {
1570  emptyFlags[i].store(true, std::memory_order_relaxed);
1571  }
1572  }
1573  else {
1574  // Reset counter
1575  elementsCompletelyDequeued.store(BLOCK_SIZE, std::memory_order_relaxed);
1576  }
1577  }
1578 
1579  template<InnerQueueContext context>
1580  inline void reset_empty()
1581  {
1582  if (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
1583  // Reset flags
1584  for (size_t i = 0; i != BLOCK_SIZE; ++i) {
1585  emptyFlags[i].store(false, std::memory_order_relaxed);
1586  }
1587  }
1588  else {
1589  // Reset counter
1590  elementsCompletelyDequeued.store(0, std::memory_order_relaxed);
1591  }
1592  }
1593 
1594  inline T* operator[](index_t idx) MOODYCAMEL_NOEXCEPT { return static_cast<T*>(static_cast<void*>(elements)) + static_cast<size_t>(idx & static_cast<index_t>(BLOCK_SIZE - 1)); }
1595  inline T const* operator[](index_t idx) const MOODYCAMEL_NOEXCEPT { return static_cast<T const*>(static_cast<void const*>(elements)) + static_cast<size_t>(idx & static_cast<index_t>(BLOCK_SIZE - 1)); }
1596 
1597  private:
1598  // IMPORTANT: This must be the first member in Block, so that if T depends on the alignment of
1599  // addresses returned by malloc, that alignment will be preserved. Apparently clang actually
1600  // generates code that uses this assumption for AVX instructions in some cases. Ideally, we
1601  // should also align Block to the alignment of T in case it's higher than malloc's 16-byte
1602  // alignment, but this is hard to do in a cross-platform way. Assert for this case:
1603  static_assert(std::alignment_of<T>::value <= std::alignment_of<details::max_align_t>::value, "The queue does not support super-aligned types at this time");
1604  // Additionally, we need the alignment of Block itself to be a multiple of max_align_t since
1605  // otherwise the appropriate padding will not be added at the end of Block in order to make
1606  // arrays of Blocks all be properly aligned (not just the first one). We use a union to force
1607  // this.
1608  union {
1609  char elements[sizeof(T) * BLOCK_SIZE];
1610  details::max_align_t dummy;
1611  };
1612  public:
1613  Block* next;
1614  std::atomic<size_t> elementsCompletelyDequeued;
1615  std::atomic<bool> emptyFlags[BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD ? BLOCK_SIZE : 1];
1616  public:
1617  std::atomic<std::uint32_t> freeListRefs;
1618  std::atomic<Block*> freeListNext;
1619  std::atomic<bool> shouldBeOnFreeList;
1620  bool dynamicallyAllocated; // Perhaps a better name for this would be 'isNotPartOfInitialBlockPool'
1621 
1622 #if MCDBGQ_TRACKMEM
1623  void* owner;
1624 #endif
1625  };
1626  static_assert(std::alignment_of<Block>::value >= std::alignment_of<details::max_align_t>::value, "Internal error: Blocks must be at least as aligned as the type they are wrapping");
1627 
1628 
1629 #if MCDBGQ_TRACKMEM
1630 public:
1631  struct MemStats;
1632 private:
1633 #endif
1634 
1635  ///////////////////////////
1636  // Producer base
1637  ///////////////////////////
1638 
1639  struct ProducerBase : public details::ConcurrentQueueProducerTypelessBase
1640  {
1641  ProducerBase(ConcurrentQueue* parent_, bool isExplicit_) :
1642  tailIndex(0),
1643  headIndex(0),
1644  dequeueOptimisticCount(0),
1645  dequeueOvercommit(0),
1646  tailBlock(nullptr),
1647  isExplicit(isExplicit_),
1648  parent(parent_)
1649  {
1650  }
1651 
1652  virtual ~ProducerBase() { }
1653 
1654  template<typename U>
1655  inline bool dequeue(U& element)
1656  {
1657  if (isExplicit) {
1658  return static_cast<ExplicitProducer*>(this)->dequeue(element);
1659  }
1660  else {
1661  return static_cast<ImplicitProducer*>(this)->dequeue(element);
1662  }
1663  }
1664 
1665  template<typename It>
1666  inline size_t dequeue_bulk(It& itemFirst, size_t max)
1667  {
1668  if (isExplicit) {
1669  return static_cast<ExplicitProducer*>(this)->dequeue_bulk(itemFirst, max);
1670  }
1671  else {
1672  return static_cast<ImplicitProducer*>(this)->dequeue_bulk(itemFirst, max);
1673  }
1674  }
1675 
1676  inline ProducerBase* next_prod() const { return static_cast<ProducerBase*>(next); }
1677 
1678  inline size_t size_approx() const
1679  {
1680  auto tail = tailIndex.load(std::memory_order_relaxed);
1681  auto head = headIndex.load(std::memory_order_relaxed);
1682  return details::circular_less_than(head, tail) ? static_cast<size_t>(tail - head) : 0;
1683  }
1684 
1685  inline index_t getTail() const { return tailIndex.load(std::memory_order_relaxed); }
1686  protected:
1687  std::atomic<index_t> tailIndex; // Where to enqueue to next
1688  std::atomic<index_t> headIndex; // Where to dequeue from next
1689 
1690  std::atomic<index_t> dequeueOptimisticCount;
1691  std::atomic<index_t> dequeueOvercommit;
1692 
1693  Block* tailBlock;
1694 
1695  public:
1696  bool isExplicit;
1697  ConcurrentQueue* parent;
1698 
1699  protected:
1700 #if MCDBGQ_TRACKMEM
1701  friend struct MemStats;
1702 #endif
1703  };
1704 
1705 
1706  ///////////////////////////
1707  // Explicit queue
1708  ///////////////////////////
1709 
1710  struct ExplicitProducer : public ProducerBase
1711  {
1712  explicit ExplicitProducer(ConcurrentQueue* parent) :
1713  ProducerBase(parent, true),
1714  blockIndex(nullptr),
1715  pr_blockIndexSlotsUsed(0),
1716  pr_blockIndexSize(EXPLICIT_INITIAL_INDEX_SIZE >> 1),
1717  pr_blockIndexFront(0),
1718  pr_blockIndexEntries(nullptr),
1719  pr_blockIndexRaw(nullptr)
1720  {
1721  size_t poolBasedIndexSize = details::ceil_to_pow_2(parent->initialBlockPoolSize) >> 1;
1722  if (poolBasedIndexSize > pr_blockIndexSize) {
1723  pr_blockIndexSize = poolBasedIndexSize;
1724  }
1725 
1726  new_block_index(0); // This creates an index with double the number of current entries, i.e. EXPLICIT_INITIAL_INDEX_SIZE
1727  }
1728 
1729  ~ExplicitProducer()
1730  {
1731  // Destruct any elements not yet dequeued.
1732  // Since we're in the destructor, we can assume all elements
1733  // are either completely dequeued or completely not (no halfways).
1734  if (this->tailBlock != nullptr) { // Note this means there must be a block index too
1735  // First find the block that's partially dequeued, if any
1736  Block* halfDequeuedBlock = nullptr;
1737  if ((this->headIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1)) != 0) {
1738  // The head's not on a block boundary, meaning a block somewhere is partially dequeued
1739  // (or the head block is the tail block and was fully dequeued, but the head/tail are still not on a boundary)
1740  size_t i = (pr_blockIndexFront - pr_blockIndexSlotsUsed) & (pr_blockIndexSize - 1);
1741  while (details::circular_less_than<index_t>(pr_blockIndexEntries[i].base + BLOCK_SIZE, this->headIndex.load(std::memory_order_relaxed))) {
1742  i = (i + 1) & (pr_blockIndexSize - 1);
1743  }
1744  assert(details::circular_less_than<index_t>(pr_blockIndexEntries[i].base, this->headIndex.load(std::memory_order_relaxed)));
1745  halfDequeuedBlock = pr_blockIndexEntries[i].block;
1746  }
1747 
1748  // Start at the head block (note the first line in the loop gives us the head from the tail on the first iteration)
1749  auto block = this->tailBlock;
1750  do {
1751  block = block->next;
1752  if (block->ConcurrentQueue::Block::template is_empty<explicit_context>()) {
1753  continue;
1754  }
1755 
1756  size_t i = 0; // Offset into block
1757  if (block == halfDequeuedBlock) {
1758  i = static_cast<size_t>(this->headIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1));
1759  }
1760 
1761  // Walk through all the items in the block; if this is the tail block, we need to stop when we reach the tail index
1762  auto lastValidIndex = (this->tailIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 ? BLOCK_SIZE : static_cast<size_t>(this->tailIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1));
1763  while (i != BLOCK_SIZE && (block != this->tailBlock || i != lastValidIndex)) {
1764  (*block)[i++]->~T();
1765  }
1766  } while (block != this->tailBlock);
1767  }
1768 
1769  // Destroy all blocks that we own
1770  if (this->tailBlock != nullptr) {
1771  auto block = this->tailBlock;
1772  do {
1773  auto nextBlock = block->next;
1774  if (block->dynamicallyAllocated) {
1775  destroy(block);
1776  }
1777  else {
1778  this->parent->add_block_to_free_list(block);
1779  }
1780  block = nextBlock;
1781  } while (block != this->tailBlock);
1782  }
1783 
1784  // Destroy the block indices
1785  auto header = static_cast<BlockIndexHeader*>(pr_blockIndexRaw);
1786  while (header != nullptr) {
1787  auto prev = static_cast<BlockIndexHeader*>(header->prev);
1788  header->~BlockIndexHeader();
1789  (Traits::free)(header);
1790  header = prev;
1791  }
1792  }
1793 
1794  template<AllocationMode allocMode, typename U>
1795  inline bool enqueue(U&& element)
1796  {
1797  index_t currentTailIndex = this->tailIndex.load(std::memory_order_relaxed);
1798  index_t newTailIndex = 1 + currentTailIndex;
1799  if ((currentTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
1800  // We reached the end of a block, start a new one
1801  auto startBlock = this->tailBlock;
1802  auto originalBlockIndexSlotsUsed = pr_blockIndexSlotsUsed;
1803  if (this->tailBlock != nullptr && this->tailBlock->next->ConcurrentQueue::Block::template is_empty<explicit_context>()) {
1804  // We can re-use the block ahead of us, it's empty!
1805  this->tailBlock = this->tailBlock->next;
1806  this->tailBlock->ConcurrentQueue::Block::template reset_empty<explicit_context>();
1807 
1808  // We'll put the block on the block index (guaranteed to be room since we're conceptually removing the
1809  // last block from it first -- except instead of removing then adding, we can just overwrite).
1810  // Note that there must be a valid block index here, since even if allocation failed in the ctor,
1811  // it would have been re-attempted when adding the first block to the queue; since there is such
1812  // a block, a block index must have been successfully allocated.
1813  }
1814  else {
1815  // Whatever head value we see here is >= the last value we saw here (relatively),
1816  // and <= its current value. Since we have the most recent tail, the head must be
1817  // <= to it.
1818  auto head = this->headIndex.load(std::memory_order_relaxed);
1819  assert(!details::circular_less_than<index_t>(currentTailIndex, head));
1820  if (!details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE)
1821  || (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value && (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head))) {
1822  // We can't enqueue in another block because there's not enough leeway -- the
1823  // tail could surpass the head by the time the block fills up! (Or we'll exceed
1824  // the size limit, if the second part of the condition was true.)
1825  return false;
1826  }
1827  // We're going to need a new block; check that the block index has room
1828  if (pr_blockIndexRaw == nullptr || pr_blockIndexSlotsUsed == pr_blockIndexSize) {
1829  // Hmm, the circular block index is already full -- we'll need
1830  // to allocate a new index. Note pr_blockIndexRaw can only be nullptr if
1831  // the initial allocation failed in the constructor.
1832 
1833  if (allocMode == CannotAlloc || !new_block_index(pr_blockIndexSlotsUsed)) {
1834  return false;
1835  }
1836  }
1837 
1838  // Insert a new block in the circular linked list
1839  auto newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>();
1840  if (newBlock == nullptr) {
1841  return false;
1842  }
1843 #if MCDBGQ_TRACKMEM
1844  newBlock->owner = this;
1845 #endif
1846  newBlock->ConcurrentQueue::Block::template reset_empty<explicit_context>();
1847  if (this->tailBlock == nullptr) {
1848  newBlock->next = newBlock;
1849  }
1850  else {
1851  newBlock->next = this->tailBlock->next;
1852  this->tailBlock->next = newBlock;
1853  }
1854  this->tailBlock = newBlock;
1855  ++pr_blockIndexSlotsUsed;
1856  }
1857 
1858  if (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (nullptr) T(std::forward<U>(element)))) {
1859  // The constructor may throw. We want the element not to appear in the queue in
1860  // that case (without corrupting the queue):
1861  MOODYCAMEL_TRY {
1862  new ((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));
1863  }
1864  MOODYCAMEL_CATCH (...) {
1865  // Revert change to the current block, but leave the new block available
1866  // for next time
1867  pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
1868  this->tailBlock = startBlock == nullptr ? this->tailBlock : startBlock;
1869  MOODYCAMEL_RETHROW;
1870  }
1871  }
1872  else {
1873  (void)startBlock;
1874  (void)originalBlockIndexSlotsUsed;
1875  }
1876 
1877  // Add block to block index
1878  auto& entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront];
1879  entry.base = currentTailIndex;
1880  entry.block = this->tailBlock;
1881  blockIndex.load(std::memory_order_relaxed)->front.store(pr_blockIndexFront, std::memory_order_release);
1882  pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);
1883 
1884  if (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (nullptr) T(std::forward<U>(element)))) {
1885  this->tailIndex.store(newTailIndex, std::memory_order_release);
1886  return true;
1887  }
1888  }
1889 
1890  // Enqueue
1891  new ((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));
1892 
1893  this->tailIndex.store(newTailIndex, std::memory_order_release);
1894  return true;
1895  }
1896 
1897  template<typename U>
1898  bool dequeue(U& element)
1899  {
1900  auto tail = this->tailIndex.load(std::memory_order_relaxed);
1901  auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
1902  if (details::circular_less_than<index_t>(this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit, tail)) {
1903  // Might be something to dequeue, let's give it a try
1904 
1905  // Note that this if is purely for performance purposes in the common case when the queue is
1906  // empty and the values are eventually consistent -- we may enter here spuriously.
1907 
1908  // Note that whatever the values of overcommit and tail are, they are not going to change (unless we
1909  // change them) and must be the same value at this point (inside the if) as when the if condition was
1910  // evaluated.
1911 
1912  // We insert an acquire fence here to synchronize-with the release upon incrementing dequeueOvercommit below.
1913  // This ensures that whatever value got loaded into overcommit, the load of dequeueOptimisticCount in
1914  // the fetch_add below will result in a value at least as recent as that (and therefore at least as large).
1915  // Note that I believe a compiler (signal) fence here would be sufficient due to the nature of fetch_add (all
1916  // read-modify-write operations are guaranteed to work on the latest value in the modification order), but
1917  // unfortunately that can't be shown to be correct using only the C++11 standard.
1918  // See http://stackoverflow.com/questions/18223161/what-are-the-c11-memory-ordering-guarantees-in-this-corner-case
1919  std::atomic_thread_fence(std::memory_order_acquire);
1920 
1921  // Increment optimistic counter, then check if it went over the boundary
1922  auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(1, std::memory_order_relaxed);
1923 
1924  // Note that since dequeueOvercommit must be <= dequeueOptimisticCount (because dequeueOvercommit is only ever
1925  // incremented after dequeueOptimisticCount -- this is enforced in the `else` block below), and since we now
1926  // have a version of dequeueOptimisticCount that is at least as recent as overcommit (due to the release upon
1927  // incrementing dequeueOvercommit and the acquire above that synchronizes with it), overcommit <= myDequeueCount.
1928  // However, we can't assert this since both dequeueOptimisticCount and dequeueOvercommit may (independently)
1929  // overflow; in such a case, though, the logic still holds since the difference between the two is maintained.
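 // A small illustration of the counting scheme with hypothetical numbers: with tail == 10,
 // myDequeueCount == 9 and overcommit == 1, the effective count 9 - 1 == 8 is less than 10,
 // so this dequeuer may claim an element; had they been 10 and 0, the check below would fail
 // and we would bump dequeueOvercommit instead.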
1930 
1931  // Note that we reload tail here in case it changed; it will be the same value as before or greater, since
1932  // this load is sequenced after (happens after) the earlier load above. This is supported by read-read
1933  // coherency (as defined in the standard), explained here: http://en.cppreference.com/w/cpp/atomic/memory_order
1934  tail = this->tailIndex.load(std::memory_order_acquire);
1935  if ((details::likely)(details::circular_less_than<index_t>(myDequeueCount - overcommit, tail))) {
1936  // Guaranteed to be at least one element to dequeue!
1937 
1938  // Get the index. Note that since there's guaranteed to be at least one element, this
1939  // will never exceed tail. We need to do an acquire-release fence here since it's possible
1940  // that whatever condition got us to this point was for an earlier enqueued element (that
1941  // we already see the memory effects for), but that by the time we increment somebody else
1942  // has incremented it, and we need to see the memory effects for *that* element, which
1943  // in such a case is necessarily visible on the thread that incremented it in the first
1944  // place with the more current condition (they must have acquired a tail that is at least
1945  // as recent).
1946  auto index = this->headIndex.fetch_add(1, std::memory_order_acq_rel);
1947 
1948 
1949  // Determine which block the element is in
1950 
1951  auto localBlockIndex = blockIndex.load(std::memory_order_acquire);
1952  auto localBlockIndexHead = localBlockIndex->front.load(std::memory_order_acquire);
1953 
1954  // We need to be careful here about subtracting and dividing because of index wrap-around.
1955  // When an index wraps, we need to preserve the sign of the offset when dividing it by the
1956  // block size (in order to get a correct signed block count offset in all cases):
1957  auto headBase = localBlockIndex->entries[localBlockIndexHead].base;
1958  auto blockBaseIndex = index & ~static_cast<index_t>(BLOCK_SIZE - 1);
1959  auto offset = static_cast<size_t>(static_cast<typename std::make_signed<index_t>::type>(blockBaseIndex - headBase) / BLOCK_SIZE);
1960  auto block = localBlockIndex->entries[(localBlockIndexHead + offset) & (localBlockIndex->size - 1)].block;
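 // Worked example of the offset computation (assuming BLOCK_SIZE == 32): `front` names the
 // newest block-index entry, so headBase is typically ahead of the index being dequeued.
 // With headBase == 96 and index == 70, blockBaseIndex == 64 and the signed difference is
 // -32, giving offset == -1: the wanted block sits one entry behind the front entry, and the
 // masked addition above wraps that negative offset around the power-of-two-sized index.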
1961 
1962  // Dequeue
1963  auto& el = *((*block)[index]);
1964  if (!MOODYCAMEL_NOEXCEPT_ASSIGN(T, T&&, element = std::move(el))) {
1965  // Make sure the element is still fully dequeued and destroyed even if the assignment
1966  // throws
1967  struct Guard {
1968  Block* block;
1969  index_t index;
1970 
1971  ~Guard()
1972  {
1973  (*block)[index]->~T();
1974  block->ConcurrentQueue::Block::template set_empty<explicit_context>(index);
1975  }
1976  } guard = { block, index };
1977 
1978  element = std::move(el);
1979  }
1980  else {
1981  element = std::move(el);
1982  el.~T();
1983  block->ConcurrentQueue::Block::template set_empty<explicit_context>(index);
1984  }
1985 
1986  return true;
1987  }
1988  else {
1989  // Wasn't anything to dequeue after all; make the effective dequeue count eventually consistent
1990  this->dequeueOvercommit.fetch_add(1, std::memory_order_release); // Release so that the fetch_add on dequeueOptimisticCount is guaranteed to happen before this write
1991  }
1992  }
1993 
1994  return false;
1995  }
1996 
1997  template<AllocationMode allocMode, typename It>
1998  bool enqueue_bulk(It itemFirst, size_t count)
1999  {
2000  // First, we need to make sure we have enough room to enqueue all of the elements;
2001  // this means pre-allocating blocks and putting them in the block index (but only if
2002  // all the allocations succeeded).
2003  index_t startTailIndex = this->tailIndex.load(std::memory_order_relaxed);
2004  auto startBlock = this->tailBlock;
2005  auto originalBlockIndexFront = pr_blockIndexFront;
2006  auto originalBlockIndexSlotsUsed = pr_blockIndexSlotsUsed;
2007 
2008  Block* firstAllocatedBlock = nullptr;
2009 
2010  // Figure out how many blocks we'll need to allocate, and do so
2011  size_t blockBaseDiff = ((startTailIndex + count - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1)) - ((startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1));
2012  index_t currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
2013  if (blockBaseDiff > 0) {
2014  // Allocate as many blocks as possible from ahead
2015  while (blockBaseDiff > 0 && this->tailBlock != nullptr && this->tailBlock->next != firstAllocatedBlock && this->tailBlock->next->ConcurrentQueue::Block::template is_empty<explicit_context>()) {
2016  blockBaseDiff -= static_cast<index_t>(BLOCK_SIZE);
2017  currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
2018 
2019  this->tailBlock = this->tailBlock->next;
2020  firstAllocatedBlock = firstAllocatedBlock == nullptr ? this->tailBlock : firstAllocatedBlock;
2021 
2022  auto& entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront];
2023  entry.base = currentTailIndex;
2024  entry.block = this->tailBlock;
2025  pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);
2026  }
2027 
2028  // Now allocate as many blocks as necessary from the block pool
2029  while (blockBaseDiff > 0) {
2030  blockBaseDiff -= static_cast<index_t>(BLOCK_SIZE);
2031  currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
2032 
2033  auto head = this->headIndex.load(std::memory_order_relaxed);
2034  assert(!details::circular_less_than<index_t>(currentTailIndex, head));
2035  bool full = !details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE) || (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value && (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head));
2036  if (pr_blockIndexRaw == nullptr || pr_blockIndexSlotsUsed == pr_blockIndexSize || full) {
2037  if (allocMode == CannotAlloc || full || !new_block_index(originalBlockIndexSlotsUsed)) {
2038  // Failed to allocate, undo changes (but keep injected blocks)
2039  pr_blockIndexFront = originalBlockIndexFront;
2040  pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
2041  this->tailBlock = startBlock == nullptr ? firstAllocatedBlock : startBlock;
2042  return false;
2043  }
2044 
2045  // pr_blockIndexFront is updated inside new_block_index, so we need to
2046  // update our fallback value too (since we keep the new index even if we
2047  // later fail)
2048  originalBlockIndexFront = originalBlockIndexSlotsUsed;
2049  }
2050 
2051  // Insert a new block in the circular linked list
2052  auto newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>();
2053  if (newBlock == nullptr) {
2054  pr_blockIndexFront = originalBlockIndexFront;
2055  pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
2056  this->tailBlock = startBlock == nullptr ? firstAllocatedBlock : startBlock;
2057  return false;
2058  }
2059 
2060 #if MCDBGQ_TRACKMEM
2061  newBlock->owner = this;
2062 #endif
2063  newBlock->ConcurrentQueue::Block::template set_all_empty<explicit_context>();
2064  if (this->tailBlock == nullptr) {
2065  newBlock->next = newBlock;
2066  }
2067  else {
2068  newBlock->next = this->tailBlock->next;
2069  this->tailBlock->next = newBlock;
2070  }
2071  this->tailBlock = newBlock;
2072  firstAllocatedBlock = firstAllocatedBlock == nullptr ? this->tailBlock : firstAllocatedBlock;
2073 
2074  ++pr_blockIndexSlotsUsed;
2075 
2076  auto& entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront];
2077  entry.base = currentTailIndex;
2078  entry.block = this->tailBlock;
2079  pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);
2080  }
2081 
2082  // Excellent, all allocations succeeded. Reset each block's emptiness before we fill them up, and
2083  // publish the new block index front
2084  auto block = firstAllocatedBlock;
2085  while (true) {
2086  block->ConcurrentQueue::Block::template reset_empty<explicit_context>();
2087  if (block == this->tailBlock) {
2088  break;
2089  }
2090  block = block->next;
2091  }
2092 
2093  if (MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new (nullptr) T(details::deref_noexcept(itemFirst)))) {
2094  blockIndex.load(std::memory_order_relaxed)->front.store((pr_blockIndexFront - 1) & (pr_blockIndexSize - 1), std::memory_order_release);
2095  }
2096  }
2097 
2098  // Enqueue, one block at a time
2099  index_t newTailIndex = startTailIndex + static_cast<index_t>(count);
2100  currentTailIndex = startTailIndex;
2101  auto endBlock = this->tailBlock;
2102  this->tailBlock = startBlock;
2103  assert((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) != 0 || firstAllocatedBlock != nullptr || count == 0);
2104  if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 && firstAllocatedBlock != nullptr) {
2105  this->tailBlock = firstAllocatedBlock;
2106  }
2107  while (true) {
2108  auto stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2109  if (details::circular_less_than<index_t>(newTailIndex, stopIndex)) {
2110  stopIndex = newTailIndex;
2111  }
2112  if (MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new (nullptr) T(details::deref_noexcept(itemFirst)))) {
2113  while (currentTailIndex != stopIndex) {
2114  new ((*this->tailBlock)[currentTailIndex++]) T(*itemFirst++);
2115  }
2116  }
2117  else {
2118  MOODYCAMEL_TRY {
2119  while (currentTailIndex != stopIndex) {
2120  // Must use copy constructor even if move constructor is available
2121  // because we may have to revert if there's an exception.
2122  // Sorry about the horrible templated next line, but it was the only way
2123  // to disable moving *at compile time*, which is important because a type
2124  // may only define a (noexcept) move constructor, and so calls to the
2125  // cctor will not compile, even if they are in an if branch that will never
2126  // copy constructor will not compile, even if they are in an if branch that will never
2127  new ((*this->tailBlock)[currentTailIndex]) T(details::nomove_if<(bool)!MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new (nullptr) T(details::deref_noexcept(itemFirst)))>::eval(*itemFirst));
2128  ++currentTailIndex;
2129  ++itemFirst;
2130  }
2131  }
2132  MOODYCAMEL_CATCH (...) {
2133  // Oh dear, an exception's been thrown -- destroy the elements that
2134  // were enqueued so far and revert the entire bulk operation (we'll keep
2135  // any allocated blocks in our linked list for later, though).
2136  auto constructedStopIndex = currentTailIndex;
2137  auto lastBlockEnqueued = this->tailBlock;
2138 
2139  pr_blockIndexFront = originalBlockIndexFront;
2140  pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
2141  this->tailBlock = startBlock == nullptr ? firstAllocatedBlock : startBlock;
2142 
2143  if (!details::is_trivially_destructible<T>::value) {
2144  auto block = startBlock;
2145  if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
2146  block = firstAllocatedBlock;
2147  }
2148  currentTailIndex = startTailIndex;
2149  while (true) {
2150  stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2151  if (details::circular_less_than<index_t>(constructedStopIndex, stopIndex)) {
2152  stopIndex = constructedStopIndex;
2153  }
2154  while (currentTailIndex != stopIndex) {
2155  (*block)[currentTailIndex++]->~T();
2156  }
2157  if (block == lastBlockEnqueued) {
2158  break;
2159  }
2160  block = block->next;
2161  }
2162  }
2163  MOODYCAMEL_RETHROW;
2164  }
2165  }
2166 
2167  if (this->tailBlock == endBlock) {
2168  assert(currentTailIndex == newTailIndex);
2169  break;
2170  }
2171  this->tailBlock = this->tailBlock->next;
2172  }
2173 
2174  if (!MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new (nullptr) T(details::deref_noexcept(itemFirst))) && firstAllocatedBlock != nullptr) {
2175  blockIndex.load(std::memory_order_relaxed)->front.store((pr_blockIndexFront - 1) & (pr_blockIndexSize - 1), std::memory_order_release);
2176  }
2177 
2178  this->tailIndex.store(newTailIndex, std::memory_order_release);
2179  return true;
2180  }
2181 
2182  template<typename It>
2183  size_t dequeue_bulk(It& itemFirst, size_t max)
2184  {
2185  auto tail = this->tailIndex.load(std::memory_order_relaxed);
2186  auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
2187  auto desiredCount = static_cast<size_t>(tail - (this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit));
2188  if (details::circular_less_than<size_t>(0, desiredCount)) {
2189  desiredCount = desiredCount < max ? desiredCount : max;
2190  std::atomic_thread_fence(std::memory_order_acquire);
2191 
2192  auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(desiredCount, std::memory_order_relaxed);
2193 
2194  tail = this->tailIndex.load(std::memory_order_acquire);
2195  auto actualCount = static_cast<size_t>(tail - (myDequeueCount - overcommit));
2196  if (details::circular_less_than<size_t>(0, actualCount)) {
2197  actualCount = desiredCount < actualCount ? desiredCount : actualCount;
2198  if (actualCount < desiredCount) {
2199  this->dequeueOvercommit.fetch_add(desiredCount - actualCount, std::memory_order_release);
2200  }
2201 
2202  // Get the first index. Note that since there's guaranteed to be at least actualCount elements, this
2203  // will never exceed tail.
2204  auto firstIndex = this->headIndex.fetch_add(actualCount, std::memory_order_acq_rel);
2205 
2206  // Determine which block the first element is in
2207  auto localBlockIndex = blockIndex.load(std::memory_order_acquire);
2208  auto localBlockIndexHead = localBlockIndex->front.load(std::memory_order_acquire);
2209 
2210  auto headBase = localBlockIndex->entries[localBlockIndexHead].base;
2211  auto firstBlockBaseIndex = firstIndex & ~static_cast<index_t>(BLOCK_SIZE - 1);
2212  auto offset = static_cast<size_t>(static_cast<typename std::make_signed<index_t>::type>(firstBlockBaseIndex - headBase) / BLOCK_SIZE);
2213  auto indexIndex = (localBlockIndexHead + offset) & (localBlockIndex->size - 1);
2214 
2215  // Iterate the blocks and dequeue
2216  auto index = firstIndex;
2217  do {
2218  auto firstIndexInBlock = index;
2219  auto endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2220  endIndex = details::circular_less_than<index_t>(firstIndex + static_cast<index_t>(actualCount), endIndex) ? firstIndex + static_cast<index_t>(actualCount) : endIndex;
2221  auto block = localBlockIndex->entries[indexIndex].block;
2222  if (MOODYCAMEL_NOEXCEPT_ASSIGN(T, T&&, details::deref_noexcept(itemFirst) = std::move((*(*block)[index])))) {
2223  while (index != endIndex) {
2224  auto& el = *((*block)[index]);
2225  *itemFirst++ = std::move(el);
2226  el.~T();
2227  ++index;
2228  }
2229  }
2230  else {
2231  MOODYCAMEL_TRY {
2232  while (index != endIndex) {
2233  auto& el = *((*block)[index]);
2234  *itemFirst = std::move(el);
2235  ++itemFirst;
2236  el.~T();
2237  ++index;
2238  }
2239  }
2240  MOODYCAMEL_CATCH (...) {
2241  // It's too late to revert the dequeue, but we can make sure that all
2242  // the dequeued objects are properly destroyed and the block index
2243  // (and empty count) are properly updated before we propagate the exception
2244  do {
2245  block = localBlockIndex->entries[indexIndex].block;
2246  while (index != endIndex) {
2247  (*block)[index++]->~T();
2248  }
2249  block->ConcurrentQueue::Block::template set_many_empty<explicit_context>(firstIndexInBlock, static_cast<size_t>(endIndex - firstIndexInBlock));
2250  indexIndex = (indexIndex + 1) & (localBlockIndex->size - 1);
2251 
2252  firstIndexInBlock = index;
2253  endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2254  endIndex = details::circular_less_than<index_t>(firstIndex + static_cast<index_t>(actualCount), endIndex) ? firstIndex + static_cast<index_t>(actualCount) : endIndex;
2255  } while (index != firstIndex + actualCount);
2256 
2257  MOODYCAMEL_RETHROW;
2258  }
2259  }
2260  block->ConcurrentQueue::Block::template set_many_empty<explicit_context>(firstIndexInBlock, static_cast<size_t>(endIndex - firstIndexInBlock));
2261  indexIndex = (indexIndex + 1) & (localBlockIndex->size - 1);
2262  } while (index != firstIndex + actualCount);
2263 
2264  return actualCount;
2265  }
2266  else {
2267  // Wasn't anything to dequeue after all; make the effective dequeue count eventually consistent
2268  this->dequeueOvercommit.fetch_add(desiredCount, std::memory_order_release);
2269  }
2270  }
2271 
2272  return 0;
2273  }
2274 
2275  private:
2276  struct BlockIndexEntry
2277  {
2278  index_t base;
2279  Block* block;
2280  };
2281 
2282  struct BlockIndexHeader
2283  {
2284  size_t size;
2285  std::atomic<size_t> front; // Current slot (not next, like pr_blockIndexFront)
2286  BlockIndexEntry* entries;
2287  void* prev;
2288  };
2289 
2290 
2291  bool new_block_index(size_t numberOfFilledSlotsToExpose)
2292  {
2293  auto prevBlockSizeMask = pr_blockIndexSize - 1;
2294 
2295  // Create the new block
2296  pr_blockIndexSize <<= 1;
2297  auto newRawPtr = static_cast<char*>((Traits::malloc)(sizeof(BlockIndexHeader) + std::alignment_of<BlockIndexEntry>::value - 1 + sizeof(BlockIndexEntry) * pr_blockIndexSize));
2298  if (newRawPtr == nullptr) {
2299  pr_blockIndexSize >>= 1; // Reset to allow graceful retry
2300  return false;
2301  }
2302 
2303  auto newBlockIndexEntries = reinterpret_cast<BlockIndexEntry*>(details::align_for<BlockIndexEntry>(newRawPtr + sizeof(BlockIndexHeader)));
2304 
2305  // Copy in all the old indices, if any
2306  size_t j = 0;
2307  if (pr_blockIndexSlotsUsed != 0) {
2308  auto i = (pr_blockIndexFront - pr_blockIndexSlotsUsed) & prevBlockSizeMask;
2309  do {
2310  newBlockIndexEntries[j++] = pr_blockIndexEntries[i];
2311  i = (i + 1) & prevBlockSizeMask;
2312  } while (i != pr_blockIndexFront);
2313  }
2314 
2315  // Update everything
2316  auto header = new (newRawPtr) BlockIndexHeader;
2317  header->size = pr_blockIndexSize;
2318  header->front.store(numberOfFilledSlotsToExpose - 1, std::memory_order_relaxed);
2319  header->entries = newBlockIndexEntries;
2320  header->prev = pr_blockIndexRaw; // we link the new block to the old one so we can free it later
2321 
2322  pr_blockIndexFront = j;
2323  pr_blockIndexEntries = newBlockIndexEntries;
2324  pr_blockIndexRaw = newRawPtr;
2325  blockIndex.store(header, std::memory_order_release);
2326 
2327  return true;
2328  }
2329 
2330  private:
2331  std::atomic<BlockIndexHeader*> blockIndex;
2332 
2333  // To be used by producer only -- consumer must use the ones referenced by blockIndex
2334  size_t pr_blockIndexSlotsUsed;
2335  size_t pr_blockIndexSize;
2336  size_t pr_blockIndexFront; // Next slot (not current)
2337  BlockIndexEntry* pr_blockIndexEntries;
2338  void* pr_blockIndexRaw;
2339 
2340 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
2341  public:
2342  ExplicitProducer* nextExplicitProducer;
2343  private:
2344 #endif
2345 
2346 #if MCDBGQ_TRACKMEM
2347  friend struct MemStats;
2348 #endif
2349  };
2350 
2351 
2352  //////////////////////////////////
2353  // Implicit queue
2354  //////////////////////////////////
2355 
2356  struct ImplicitProducer : public ProducerBase
2357  {
2358  ImplicitProducer(ConcurrentQueue* parent) :
2359  ProducerBase(parent, false),
2360  nextBlockIndexCapacity(IMPLICIT_INITIAL_INDEX_SIZE),
2361  blockIndex(nullptr)
2362  {
2363  new_block_index();
2364  }
2365 
2366  ~ImplicitProducer()
2367  {
2368  // Note that since we're in the destructor we can assume that all enqueue/dequeue operations
2369  // completed already; this means that all undequeued elements are placed contiguously across
2370  // contiguous blocks, and that only the first and last remaining blocks can be only partially
2371  // empty (all other remaining blocks must be completely full).
2372 
2373 #ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
2374  // Unregister ourselves for thread termination notification
2375  if (!this->inactive.load(std::memory_order_relaxed)) {
2376  details::ThreadExitNotifier::unsubscribe(&threadExitListener);
2377  }
2378 #endif
2379 
2380  // Destroy all remaining elements!
2381  auto tail = this->tailIndex.load(std::memory_order_relaxed);
2382  auto index = this->headIndex.load(std::memory_order_relaxed);
2383  Block* block = nullptr;
2384  assert(index == tail || details::circular_less_than(index, tail));
2385  bool forceFreeLastBlock = index != tail; // If we enter the loop, then the last (tail) block will not be freed
2386  while (index != tail) {
2387  if ((index & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 || block == nullptr) {
2388  if (block != nullptr) {
2389  // Free the old block
2390  this->parent->add_block_to_free_list(block);
2391  }
2392 
2393  block = get_block_index_entry_for_index(index)->value.load(std::memory_order_relaxed);
2394  }
2395 
2396  ((*block)[index])->~T();
2397  ++index;
2398  }
2399  // Even if the queue is empty, there's still one block that's not on the free list
2400  // (unless the head index reached the end of it, in which case the tail will be poised
2401  // to create a new block).
2402  if (this->tailBlock != nullptr && (forceFreeLastBlock || (tail & static_cast<index_t>(BLOCK_SIZE - 1)) != 0)) {
2403  this->parent->add_block_to_free_list(this->tailBlock);
2404  }
2405 
2406  // Destroy block index
2407  auto localBlockIndex = blockIndex.load(std::memory_order_relaxed);
2408  if (localBlockIndex != nullptr) {
2409  for (size_t i = 0; i != localBlockIndex->capacity; ++i) {
2410  localBlockIndex->index[i]->~BlockIndexEntry();
2411  }
2412  do {
2413  auto prev = localBlockIndex->prev;
2414  localBlockIndex->~BlockIndexHeader();
2415  (Traits::free)(localBlockIndex);
2416  localBlockIndex = prev;
2417  } while (localBlockIndex != nullptr);
2418  }
2419  }
2420 
2421  template<AllocationMode allocMode, typename U>
2422  inline bool enqueue(U&& element)
2423  {
2424  index_t currentTailIndex = this->tailIndex.load(std::memory_order_relaxed);
2425  index_t newTailIndex = 1 + currentTailIndex;
2426  if ((currentTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
2427  // We reached the end of a block, start a new one
2428  auto head = this->headIndex.load(std::memory_order_relaxed);
2429  assert(!details::circular_less_than<index_t>(currentTailIndex, head));
2430  if (!details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE) || (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value && (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head))) {
2431  return false;
2432  }
2433 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2434  debug::DebugLock lock(mutex);
2435 #endif
2436  // Find out where we'll be inserting this block in the block index
2437  BlockIndexEntry* idxEntry;
2438  if (!insert_block_index_entry<allocMode>(idxEntry, currentTailIndex)) {
2439  return false;
2440  }
2441 
2442  // Get ahold of a new block
2443  auto newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>();
2444  if (newBlock == nullptr) {
2445  rewind_block_index_tail();
2446  idxEntry->value.store(nullptr, std::memory_order_relaxed);
2447  return false;
2448  }
2449 #if MCDBGQ_TRACKMEM
2450  newBlock->owner = this;
2451 #endif
2452  newBlock->ConcurrentQueue::Block::template reset_empty<implicit_context>();
2453 
2454  if (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (nullptr) T(std::forward<U>(element)))) {
2455  // May throw, try to insert now before we publish the fact that we have this new block
2456  MOODYCAMEL_TRY {
2457  new ((*newBlock)[currentTailIndex]) T(std::forward<U>(element));
2458  }
2459  MOODYCAMEL_CATCH (...) {
2460  rewind_block_index_tail();
2461  idxEntry->value.store(nullptr, std::memory_order_relaxed);
2462  this->parent->add_block_to_free_list(newBlock);
2463  MOODYCAMEL_RETHROW;
2464  }
2465  }
2466 
2467  // Insert the new block into the index
2468  idxEntry->value.store(newBlock, std::memory_order_relaxed);
2469 
2470  this->tailBlock = newBlock;
2471 
2472  if (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (nullptr) T(std::forward<U>(element)))) {
2473  this->tailIndex.store(newTailIndex, std::memory_order_release);
2474  return true;
2475  }
2476  }
2477 
2478  // Enqueue
2479  new ((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));
2480 
2481  this->tailIndex.store(newTailIndex, std::memory_order_release);
2482  return true;
2483  }
2484 
2485  template<typename U>
2486  bool dequeue(U& element)
2487  {
2488  // See ExplicitProducer::dequeue for rationale and explanation
2489  index_t tail = this->tailIndex.load(std::memory_order_relaxed);
2490  index_t overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
2491  if (details::circular_less_than<index_t>(this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit, tail)) {
2492  std::atomic_thread_fence(std::memory_order_acquire);
2493 
2494  index_t myDequeueCount = this->dequeueOptimisticCount.fetch_add(1, std::memory_order_relaxed);
2495  tail = this->tailIndex.load(std::memory_order_acquire);
2496  if ((details::likely)(details::circular_less_than<index_t>(myDequeueCount - overcommit, tail))) {
2497  index_t index = this->headIndex.fetch_add(1, std::memory_order_acq_rel);
2498 
2499  // Determine which block the element is in
2500  auto entry = get_block_index_entry_for_index(index);
2501 
2502  // Dequeue
2503  auto block = entry->value.load(std::memory_order_relaxed);
2504  auto& el = *((*block)[index]);
2505 
2506  if (!MOODYCAMEL_NOEXCEPT_ASSIGN(T, T&&, element = std::move(el))) {
2507 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2508  // Note: Acquiring the mutex with every dequeue instead of only when a block
2509  // is released is very sub-optimal, but it is, after all, purely debug code.
2510  debug::DebugLock lock(producer->mutex);
2511 #endif
2512  struct Guard {
2513  Block* block;
2514  index_t index;
2515  BlockIndexEntry* entry;
2516  ConcurrentQueue* parent;
2517 
2518  ~Guard()
2519  {
2520  (*block)[index]->~T();
2521  if (block->ConcurrentQueue::Block::template set_empty<implicit_context>(index)) {
2522  entry->value.store(nullptr, std::memory_order_relaxed);
2523  parent->add_block_to_free_list(block);
2524  }
2525  }
2526  } guard = { block, index, entry, this->parent };
2527 
2528  element = std::move(el);
2529  }
2530  else {
2531  element = std::move(el);
2532  el.~T();
2533 
2534  if (block->ConcurrentQueue::Block::template set_empty<implicit_context>(index)) {
2535  {
2536 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2537  debug::DebugLock lock(mutex);
2538 #endif
2539  // Add the block back into the global free pool (and remove from block index)
2540  entry->value.store(nullptr, std::memory_order_relaxed);
2541  }
2542  this->parent->add_block_to_free_list(block); // releases the above store
2543  }
2544  }
2545 
2546  return true;
2547  }
2548  else {
2549  this->dequeueOvercommit.fetch_add(1, std::memory_order_release);
2550  }
2551  }
2552 
2553  return false;
2554  }
2555 
2556  template<AllocationMode allocMode, typename It>
2557  bool enqueue_bulk(It itemFirst, size_t count)
2558  {
2559  // First, we need to make sure we have enough room to enqueue all of the elements;
2560  // this means pre-allocating blocks and putting them in the block index (but only if
2561  // all the allocations succeeded).
2562 
2563  // Note that the tailBlock we start off with may not be owned by us any more;
2564  // this happens if it was filled up exactly to the top (setting tailIndex to
2565  // the first index of the next block which is not yet allocated), then dequeued
2566  // completely (putting it on the free list) before we enqueue again.
2567 
2568  index_t startTailIndex = this->tailIndex.load(std::memory_order_relaxed);
2569  auto startBlock = this->tailBlock;
2570  Block* firstAllocatedBlock = nullptr;
2571  auto endBlock = this->tailBlock;
2572 
2573  // Figure out how many blocks we'll need to allocate, and do so
2574  size_t blockBaseDiff = ((startTailIndex + count - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1)) - ((startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1));
2575  index_t currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
2576  if (blockBaseDiff > 0) {
2577 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2578  debug::DebugLock lock(mutex);
2579 #endif
2580  do {
2581  blockBaseDiff -= static_cast<index_t>(BLOCK_SIZE);
2582  currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
2583 
2584  // Find out where we'll be inserting this block in the block index
2585  BlockIndexEntry* idxEntry = nullptr; // initialization here unnecessary but compiler can't always tell
2586  Block* newBlock;
2587  bool indexInserted = false;
2588  auto head = this->headIndex.load(std::memory_order_relaxed);
2589  assert(!details::circular_less_than<index_t>(currentTailIndex, head));
2590  bool full = !details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE) || (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value && (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head));
2591  if (full || !(indexInserted = insert_block_index_entry<allocMode>(idxEntry, currentTailIndex)) || (newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>()) == nullptr) {
2592  // Index allocation or block allocation failed; revert any other allocations
2593  // and index insertions done so far for this operation
2594  if (indexInserted) {
2595  rewind_block_index_tail();
2596  idxEntry->value.store(nullptr, std::memory_order_relaxed);
2597  }
2598  currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
2599  for (auto block = firstAllocatedBlock; block != nullptr; block = block->next) {
2600  currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
2601  idxEntry = get_block_index_entry_for_index(currentTailIndex);
2602  idxEntry->value.store(nullptr, std::memory_order_relaxed);
2603  rewind_block_index_tail();
2604  }
2605  this->parent->add_blocks_to_free_list(firstAllocatedBlock);
2606  this->tailBlock = startBlock;
2607 
2608  return false;
2609  }
2610 
2611 #if MCDBGQ_TRACKMEM
2612  newBlock->owner = this;
2613 #endif
2614  newBlock->ConcurrentQueue::Block::template reset_empty<implicit_context>();
2615  newBlock->next = nullptr;
2616 
2617  // Insert the new block into the index
2618  idxEntry->value.store(newBlock, std::memory_order_relaxed);
2619 
2620  // Store the chain of blocks so that we can undo if later allocations fail,
2621  // and so that we can find the blocks when we do the actual enqueueing
2622  if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) != 0 || firstAllocatedBlock != nullptr) {
2623  assert(this->tailBlock != nullptr);
2624  this->tailBlock->next = newBlock;
2625  }
2626  this->tailBlock = newBlock;
2627  endBlock = newBlock;
2628  firstAllocatedBlock = firstAllocatedBlock == nullptr ? newBlock : firstAllocatedBlock;
2629  } while (blockBaseDiff > 0);
2630  }
2631 
2632  // Enqueue, one block at a time
2633  index_t newTailIndex = startTailIndex + static_cast<index_t>(count);
2634  currentTailIndex = startTailIndex;
2635  this->tailBlock = startBlock;
2636  assert((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) != 0 || firstAllocatedBlock != nullptr || count == 0);
2637  if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 && firstAllocatedBlock != nullptr) {
2638  this->tailBlock = firstAllocatedBlock;
2639  }
2640  while (true) {
2641  auto stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2642  if (details::circular_less_than<index_t>(newTailIndex, stopIndex)) {
2643  stopIndex = newTailIndex;
2644  }
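  // Fast path: if constructing T from the iterator's value can't throw, elements are
  // constructed in place with no rollback bookkeeping; otherwise the loop below is wrapped
  // in a try/catch so partially constructed elements can be destroyed and the operation reverted.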
2645  if (MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new (nullptr) T(details::deref_noexcept(itemFirst)))) {
2646  while (currentTailIndex != stopIndex) {
2647  new ((*this->tailBlock)[currentTailIndex++]) T(*itemFirst++);
2648  }
2649  }
2650  else {
2651  MOODYCAMEL_TRY {
2652  while (currentTailIndex != stopIndex) {
2653  new ((*this->tailBlock)[currentTailIndex]) T(details::nomove_if<(bool)!MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new (nullptr) T(details::deref_noexcept(itemFirst)))>::eval(*itemFirst));
2654  ++currentTailIndex;
2655  ++itemFirst;
2656  }
2657  }
2658  MOODYCAMEL_CATCH (...) {
2659  auto constructedStopIndex = currentTailIndex;
2660  auto lastBlockEnqueued = this->tailBlock;
2661 
2662  if (!details::is_trivially_destructible<T>::value) {
2663  auto block = startBlock;
2664  if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
2665  block = firstAllocatedBlock;
2666  }
2667  currentTailIndex = startTailIndex;
2668  while (true) {
2669  stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2670  if (details::circular_less_than<index_t>(constructedStopIndex, stopIndex)) {
2671  stopIndex = constructedStopIndex;
2672  }
2673  while (currentTailIndex != stopIndex) {
2674  (*block)[currentTailIndex++]->~T();
2675  }
2676  if (block == lastBlockEnqueued) {
2677  break;
2678  }
2679  block = block->next;
2680  }
2681  }
2682 
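  // Un-publish the block index entries added for this operation and return the freshly
  // allocated blocks to the free list before re-throwing.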
2683  currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
2684  for (auto block = firstAllocatedBlock; block != nullptr; block = block->next) {
2685  currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
2686  auto idxEntry = get_block_index_entry_for_index(currentTailIndex);
2687  idxEntry->value.store(nullptr, std::memory_order_relaxed);
2688  rewind_block_index_tail();
2689  }
2690  this->parent->add_blocks_to_free_list(firstAllocatedBlock);
2691  this->tailBlock = startBlock;
2692  MOODYCAMEL_RETHROW;
2693  }
2694  }
2695 
2696  if (this->tailBlock == endBlock) {
2697  assert(currentTailIndex == newTailIndex);
2698  break;
2699  }
2700  this->tailBlock = this->tailBlock->next;
2701  }
2702  this->tailIndex.store(newTailIndex, std::memory_order_release);
2703  return true;
2704  }
2705 
2706  template<typename It>
2707  size_t dequeue_bulk(It& itemFirst, size_t max)
2708  {
2709  auto tail = this->tailIndex.load(std::memory_order_relaxed);
2710  auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
2711  auto desiredCount = static_cast<size_t>(tail - (this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit));
2712  if (details::circular_less_than<size_t>(0, desiredCount)) {
2713  desiredCount = desiredCount < max ? desiredCount : max;
2714  std::atomic_thread_fence(std::memory_order_acquire);
2715 
2716  auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(desiredCount, std::memory_order_relaxed);
2717 
2718  tail = this->tailIndex.load(std::memory_order_acquire);
2719  auto actualCount = static_cast<size_t>(tail - (myDequeueCount - overcommit));
2720  if (details::circular_less_than<size_t>(0, actualCount)) {
2721  actualCount = desiredCount < actualCount ? desiredCount : actualCount;
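  // If fewer elements were available than optimistically claimed, credit the difference back
  // via dequeueOvercommit so that subsequent dequeue attempts compute the correct count.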
2722  if (actualCount < desiredCount) {
2723  this->dequeueOvercommit.fetch_add(desiredCount - actualCount, std::memory_order_release);
2724  }
2725 
2726  // Get the first index. Note that since there are guaranteed to be at least actualCount elements, this
2727  // will never exceed tail.
2728  auto firstIndex = this->headIndex.fetch_add(actualCount, std::memory_order_acq_rel);
2729 
2730  // Iterate the blocks and dequeue
2731  auto index = firstIndex;
2732  BlockIndexHeader* localBlockIndex;
2733  auto indexIndex = get_block_index_index_for_index(index, localBlockIndex);
2734  do {
2735  auto blockStartIndex = index;
2736  auto endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2737  endIndex = details::circular_less_than<index_t>(firstIndex + static_cast<index_t>(actualCount), endIndex) ? firstIndex + static_cast<index_t>(actualCount) : endIndex;
2738 
2739  auto entry = localBlockIndex->index[indexIndex];
2740  auto block = entry->value.load(std::memory_order_relaxed);
2741  if (MOODYCAMEL_NOEXCEPT_ASSIGN(T, T&&, details::deref_noexcept(itemFirst) = std::move((*(*block)[index])))) {
2742  while (index != endIndex) {
2743  auto& el = *((*block)[index]);
2744  *itemFirst++ = std::move(el);
2745  el.~T();
2746  ++index;
2747  }
2748  }
2749  else {
2750  MOODYCAMEL_TRY {
2751  while (index != endIndex) {
2752  auto& el = *((*block)[index]);
2753  *itemFirst = std::move(el);
2754  ++itemFirst;
2755  el.~T();
2756  ++index;
2757  }
2758  }
2759  MOODYCAMEL_CATCH (...) {
2760  do {
2761  entry = localBlockIndex->index[indexIndex];
2762  block = entry->value.load(std::memory_order_relaxed);
2763  while (index != endIndex) {
2764  (*block)[index++]->~T();
2765  }
2766 
2767  if (block->ConcurrentQueue::Block::template set_many_empty<implicit_context>(blockStartIndex, static_cast<size_t>(endIndex - blockStartIndex))) {
2768 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2769  debug::DebugLock lock(mutex);
2770 #endif
2771  entry->value.store(nullptr, std::memory_order_relaxed);
2772  this->parent->add_block_to_free_list(block);
2773  }
2774  indexIndex = (indexIndex + 1) & (localBlockIndex->capacity - 1);
2775 
2776  blockStartIndex = index;
2777  endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2778  endIndex = details::circular_less_than<index_t>(firstIndex + static_cast<index_t>(actualCount), endIndex) ? firstIndex + static_cast<index_t>(actualCount) : endIndex;
2779  } while (index != firstIndex + actualCount);
2780 
2781  MOODYCAMEL_RETHROW;
2782  }
2783  }
2784  if (block->ConcurrentQueue::Block::template set_many_empty<implicit_context>(blockStartIndex, static_cast<size_t>(endIndex - blockStartIndex))) {
2785  {
2786 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2787  debug::DebugLock lock(mutex);
2788 #endif
2789  // Note that the set_many_empty above did a release, meaning that anybody who acquires the block
2790  // we're about to free can use it safely since our writes (and reads!) will have happened-before then.
2791  entry->value.store(nullptr, std::memory_order_relaxed);
2792  }
2793  this->parent->add_block_to_free_list(block); // releases the above store
2794  }
2795  indexIndex = (indexIndex + 1) & (localBlockIndex->capacity - 1);
2796  } while (index != firstIndex + actualCount);
2797 
2798  return actualCount;
2799  }
2800  else {
2801  this->dequeueOvercommit.fetch_add(desiredCount, std::memory_order_release);
2802  }
2803  }
2804 
2805  return 0;
2806  }
2807 
2808  private:
2809  // The block size must be > 1, so any number with the low bit set is an invalid block base index
2810  static const index_t INVALID_BLOCK_BASE = 1;
2811 
2812  struct BlockIndexEntry
2813  {
2814  std::atomic<index_t> key;
2815  std::atomic<Block*> value;
2816  };
2817 
2818  struct BlockIndexHeader
2819  {
2820  size_t capacity;
2821  std::atomic<size_t> tail;
2822  BlockIndexEntry* entries;
2823  BlockIndexEntry** index;
2824  BlockIndexHeader* prev;
2825  };
2826 
2827  template<AllocationMode allocMode>
2828  inline bool insert_block_index_entry(BlockIndexEntry*& idxEntry, index_t blockStartIndex)
2829  {
2830  auto localBlockIndex = blockIndex.load(std::memory_order_relaxed); // We're the only writer thread, relaxed is OK
2831  if (localBlockIndex == nullptr) {
2832  return false; // this can happen if new_block_index failed in the constructor
2833  }
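  // The block index is a power-of-two-sized circular buffer, so masking with (capacity - 1)
  // is equivalent to a modulo. The slot after the current tail is reusable if it was never
  // filled (INVALID_BLOCK_BASE) or its block has since been released (value == nullptr).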
2834  auto newTail = (localBlockIndex->tail.load(std::memory_order_relaxed) + 1) & (localBlockIndex->capacity - 1);
2835  idxEntry = localBlockIndex->index[newTail];
2836  if (idxEntry->key.load(std::memory_order_relaxed) == INVALID_BLOCK_BASE ||
2837  idxEntry->value.load(std::memory_order_relaxed) == nullptr) {
2838 
2839  idxEntry->key.store(blockStartIndex, std::memory_order_relaxed);
2840  localBlockIndex->tail.store(newTail, std::memory_order_release);
2841  return true;
2842  }
2843 
2844  // No room in the old block index, try to allocate another one!
2845  if (allocMode == CannotAlloc || !new_block_index()) {
2846  return false;
2847  }
2848  localBlockIndex = blockIndex.load(std::memory_order_relaxed);
2849  newTail = (localBlockIndex->tail.load(std::memory_order_relaxed) + 1) & (localBlockIndex->capacity - 1);
2850  idxEntry = localBlockIndex->index[newTail];
2851  assert(idxEntry->key.load(std::memory_order_relaxed) == INVALID_BLOCK_BASE);
2852  idxEntry->key.store(blockStartIndex, std::memory_order_relaxed);
2853  localBlockIndex->tail.store(newTail, std::memory_order_release);
2854  return true;
2855  }
2856 
2857  inline void rewind_block_index_tail()
2858  {
2859  auto localBlockIndex = blockIndex.load(std::memory_order_relaxed);
2860  localBlockIndex->tail.store((localBlockIndex->tail.load(std::memory_order_relaxed) - 1) & (localBlockIndex->capacity - 1), std::memory_order_relaxed);
2861  }
2862 
2863  inline BlockIndexEntry* get_block_index_entry_for_index(index_t index) const
2864  {
2865  BlockIndexHeader* localBlockIndex;
2866  auto idx = get_block_index_index_for_index(index, localBlockIndex);
2867  return localBlockIndex->index[idx];
2868  }
2869 
2870  inline size_t get_block_index_index_for_index(index_t index, BlockIndexHeader*& localBlockIndex) const
2871  {
2872 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2873  debug::DebugLock lock(mutex);
2874 #endif
2875  index &= ~static_cast<index_t>(BLOCK_SIZE - 1);
2876  localBlockIndex = blockIndex.load(std::memory_order_acquire);
2877  auto tail = localBlockIndex->tail.load(std::memory_order_acquire);
2878  auto tailBase = localBlockIndex->index[tail]->key.load(std::memory_order_relaxed);
2879  assert(tailBase != INVALID_BLOCK_BASE);
2880  // Note: Must use division instead of shift because the index may wrap around, causing a negative
2881  // offset, whose negativity we want to preserve
2882  auto offset = static_cast<size_t>(static_cast<typename std::make_signed<index_t>::type>(index - tailBase) / BLOCK_SIZE);
2883  size_t idx = (tail + offset) & (localBlockIndex->capacity - 1);
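  // Illustrative example, assuming BLOCK_SIZE == 32: with tailBase == 96 and a block-aligned
  // index of 32, the signed difference is -64 and offset == -2, so the masked addition steps
  // two entries backwards from the tail, modulo the index capacity.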
2884  assert(localBlockIndex->index[idx]->key.load(std::memory_order_relaxed) == index && localBlockIndex->index[idx]->value.load(std::memory_order_relaxed) != nullptr);
2885  return idx;
2886  }
2887 
2888  bool new_block_index()
2889  {
2890  auto prev = blockIndex.load(std::memory_order_relaxed);
2891  size_t prevCapacity = prev == nullptr ? 0 : prev->capacity;
2892  auto entryCount = prev == nullptr ? nextBlockIndexCapacity : prevCapacity;
2893  auto raw = static_cast<char*>((Traits::malloc)(
2894  sizeof(BlockIndexHeader) +
2895  std::alignment_of<BlockIndexEntry>::value - 1 + sizeof(BlockIndexEntry) * entryCount +
2896  std::alignment_of<BlockIndexEntry*>::value - 1 + sizeof(BlockIndexEntry*) * nextBlockIndexCapacity));
2897  if (raw == nullptr) {
2898  return false;
2899  }
2900 
2901  auto header = new (raw) BlockIndexHeader;
2902  auto entries = reinterpret_cast<BlockIndexEntry*>(details::align_for<BlockIndexEntry>(raw + sizeof(BlockIndexHeader)));
2903  auto index = reinterpret_cast<BlockIndexEntry**>(details::align_for<BlockIndexEntry*>(reinterpret_cast<char*>(entries) + sizeof(BlockIndexEntry) * entryCount));
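  // Single allocation layout: header, then the newly constructed entries, then the pointer
  // array. The first prevCapacity pointer slots are filled below with the entries carried
  // over from the previous, smaller index so that still-live blocks remain reachable.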
2904  if (prev != nullptr) {
2905  auto prevTail = prev->tail.load(std::memory_order_relaxed);
2906  auto prevPos = prevTail;
2907  size_t i = 0;
2908  do {
2909  prevPos = (prevPos + 1) & (prev->capacity - 1);
2910  index[i++] = prev->index[prevPos];
2911  } while (prevPos != prevTail);
2912  assert(i == prevCapacity);
2913  }
2914  for (size_t i = 0; i != entryCount; ++i) {
2915  new (entries + i) BlockIndexEntry;
2916  entries[i].key.store(INVALID_BLOCK_BASE, std::memory_order_relaxed);
2917  index[prevCapacity + i] = entries + i;
2918  }
2919  header->prev = prev;
2920  header->entries = entries;
2921  header->index = index;
2922  header->capacity = nextBlockIndexCapacity;
2923  header->tail.store((prevCapacity - 1) & (nextBlockIndexCapacity - 1), std::memory_order_relaxed);
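  // The carried-over entries occupy pointer slots [0, prevCapacity), so the tail starts at the
  // last of them; with no previous index this wraps to capacity - 1, meaning the next insertion
  // lands in slot 0.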
2924 
2925  blockIndex.store(header, std::memory_order_release);
2926 
2927  nextBlockIndexCapacity <<= 1;
2928 
2929  return true;
2930  }
2931 
2932  private:
2933  size_t nextBlockIndexCapacity;
2934  std::atomic<BlockIndexHeader*> blockIndex;
2935 
2936 #ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
2937  public:
2938  details::ThreadExitListener threadExitListener;
2939  private:
2940 #endif
2941 
2942 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
2943  public:
2944  ImplicitProducer* nextImplicitProducer;
2945  private:
2946 #endif
2947 
2948 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2949  mutable debug::DebugMutex mutex;
2950 #endif
2951 #if MCDBGQ_TRACKMEM
2952  friend struct MemStats;
2953 #endif
2954  };
2955 
2956 
2957  //////////////////////////////////
2958  // Block pool manipulation
2959  //////////////////////////////////
2960 
2961  void populate_initial_block_list(size_t blockCount)
2962  {
2963  initialBlockPoolSize = blockCount;
2964  if (initialBlockPoolSize == 0) {
2965  initialBlockPool = nullptr;
2966  return;
2967  }
2968 
2969  initialBlockPool = create_array<Block>(blockCount);
2970  if (initialBlockPool == nullptr) {
2971  initialBlockPoolSize = 0;
2972  }
2973  for (size_t i = 0; i < initialBlockPoolSize; ++i) {
2974  initialBlockPool[i].dynamicallyAllocated = false;
2975  }
2976  }
2977 
2978  inline Block* try_get_block_from_initial_pool()
2979  {
2980  if (initialBlockPoolIndex.load(std::memory_order_relaxed) >= initialBlockPoolSize) {
2981  return nullptr;
2982  }
2983 
2984  auto index = initialBlockPoolIndex.fetch_add(1, std::memory_order_relaxed);
2985 
2986  return index < initialBlockPoolSize ? (initialBlockPool + index) : nullptr;
2987  }
2988 
2989  inline void add_block_to_free_list(Block* block)
2990  {
2991 #if MCDBGQ_TRACKMEM
2992  block->owner = nullptr;
2993 #endif
2994  freeList.add(block);
2995  }
2996 
2997  inline void add_blocks_to_free_list(Block* block)
2998  {
2999  while (block != nullptr) {
3000  auto next = block->next;
3001  add_block_to_free_list(block);
3002  block = next;
3003  }
3004  }
3005 
3006  inline Block* try_get_block_from_free_list()
3007  {
3008  return freeList.try_get();
3009  }
3010 
3011  // Gets a free block from one of the memory pools, or allocates a new one (if applicable)
3012  template<AllocationMode canAlloc>
3013  Block* requisition_block()
3014  {
3015  auto block = try_get_block_from_initial_pool();
3016  if (block != nullptr) {
3017  return block;
3018  }
3019 
3020  block = try_get_block_from_free_list();
3021  if (block != nullptr) {
3022  return block;
3023  }
3024 
3025  if (canAlloc == CanAlloc) {
3026  return create<Block>();
3027  }
3028 
3029  return nullptr;
3030  }
3031 
3032 
3033 #if MCDBGQ_TRACKMEM
3034  public:
3035  struct MemStats {
3036  size_t allocatedBlocks;
3037  size_t usedBlocks;
3038  size_t freeBlocks;
3039  size_t ownedBlocksExplicit;
3040  size_t ownedBlocksImplicit;
3041  size_t implicitProducers;
3042  size_t explicitProducers;
3043  size_t elementsEnqueued;
3044  size_t blockClassBytes;
3045  size_t queueClassBytes;
3046  size_t implicitBlockIndexBytes;
3047  size_t explicitBlockIndexBytes;
3048 
3049  friend class ConcurrentQueue;
3050 
3051  private:
3052  static MemStats getFor(ConcurrentQueue* q)
3053  {
3054  MemStats stats = { 0 };
3055 
3056  stats.elementsEnqueued = q->size_approx();
3057 
3058  auto block = q->freeList.head_unsafe();
3059  while (block != nullptr) {
3060  ++stats.allocatedBlocks;
3061  ++stats.freeBlocks;
3062  block = block->freeListNext.load(std::memory_order_relaxed);
3063  }
3064 
3065  for (auto ptr = q->producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
3066  bool implicit = dynamic_cast<ImplicitProducer*>(ptr) != nullptr;
3067  stats.implicitProducers += implicit ? 1 : 0;
3068  stats.explicitProducers += implicit ? 0 : 1;
3069 
3070  if (implicit) {
3071  auto prod = static_cast<ImplicitProducer*>(ptr);
3072  stats.queueClassBytes += sizeof(ImplicitProducer);
3073  auto head = prod->headIndex.load(std::memory_order_relaxed);
3074  auto tail = prod->tailIndex.load(std::memory_order_relaxed);
3075  auto hash = prod->blockIndex.load(std::memory_order_relaxed);
3076  if (hash != nullptr) {
3077  for (size_t i = 0; i != hash->capacity; ++i) {
3078  if (hash->index[i]->key.load(std::memory_order_relaxed) != ImplicitProducer::INVALID_BLOCK_BASE && hash->index[i]->value.load(std::memory_order_relaxed) != nullptr) {
3079  ++stats.allocatedBlocks;
3080  ++stats.ownedBlocksImplicit;
3081  }
3082  }
3083  stats.implicitBlockIndexBytes += hash->capacity * sizeof(typename ImplicitProducer::BlockIndexEntry);
3084  for (; hash != nullptr; hash = hash->prev) {
3085  stats.implicitBlockIndexBytes += sizeof(typename ImplicitProducer::BlockIndexHeader) + hash->capacity * sizeof(typename ImplicitProducer::BlockIndexEntry*);
3086  }
3087  }
3088  for (; details::circular_less_than<index_t>(head, tail); head += BLOCK_SIZE) {
3089  //auto block = prod->get_block_index_entry_for_index(head);
3090  ++stats.usedBlocks;
3091  }
3092  }
3093  else {
3094  auto prod = static_cast<ExplicitProducer*>(ptr);
3095  stats.queueClassBytes += sizeof(ExplicitProducer);
3096  auto tailBlock = prod->tailBlock;
3097  bool wasNonEmpty = false;
3098  if (tailBlock != nullptr) {
3099  auto block = tailBlock;
3100  do {
3101  ++stats.allocatedBlocks;
3102  if (!block->ConcurrentQueue::Block::template is_empty<explicit_context>() || wasNonEmpty) {
3103  ++stats.usedBlocks;
3104  wasNonEmpty = wasNonEmpty || block != tailBlock;
3105  }
3106  ++stats.ownedBlocksExplicit;
3107  block = block->next;
3108  } while (block != tailBlock);
3109  }
3110  auto index = prod->blockIndex.load(std::memory_order_relaxed);
3111  while (index != nullptr) {
3112  stats.explicitBlockIndexBytes += sizeof(typename ExplicitProducer::BlockIndexHeader) + index->size * sizeof(typename ExplicitProducer::BlockIndexEntry);
3113  index = static_cast<typename ExplicitProducer::BlockIndexHeader*>(index->prev);
3114  }
3115  }
3116  }
3117 
3118  auto freeOnInitialPool = q->initialBlockPoolIndex.load(std::memory_order_relaxed) >= q->initialBlockPoolSize ? 0 : q->initialBlockPoolSize - q->initialBlockPoolIndex.load(std::memory_order_relaxed);
3119  stats.allocatedBlocks += freeOnInitialPool;
3120  stats.freeBlocks += freeOnInitialPool;
3121 
3122  stats.blockClassBytes = sizeof(Block) * stats.allocatedBlocks;
3123  stats.queueClassBytes += sizeof(ConcurrentQueue);
3124 
3125  return stats;
3126  }
3127  };
3128 
3129  // For debugging only. Not thread-safe.
3130  MemStats getMemStats()
3131  {
3132  return MemStats::getFor(this);
3133  }
3134  private:
3135  friend struct MemStats;
3136 #endif
3137 
3138 
3139  //////////////////////////////////
3140  // Producer list manipulation
3141  //////////////////////////////////
3142 
3143  ProducerBase* recycle_or_create_producer(bool isExplicit)
3144  {
3145  bool recycled;
3146  return recycle_or_create_producer(isExplicit, recycled);
3147  }
3148 
3149  ProducerBase* recycle_or_create_producer(bool isExplicit, bool& recycled)
3150  {
3151 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
3152  debug::DebugLock lock(implicitProdMutex);
3153 #endif
3154  // Try to re-use one first
3155  for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
3156  if (ptr->inactive.load(std::memory_order_relaxed) && ptr->isExplicit == isExplicit) {
3157  bool expected = true;
3158  if (ptr->inactive.compare_exchange_strong(expected, /* desired */ false, std::memory_order_acquire, std::memory_order_relaxed)) {
3159  // We caught one! It's been marked as activated; the caller can have it
3160  recycled = true;
3161  return ptr;
3162  }
3163  }
3164  }
3165 
3166  recycled = false;
3167  return add_producer(isExplicit ? static_cast<ProducerBase*>(create<ExplicitProducer>(this)) : create<ImplicitProducer>(this));
3168  }
3169 
3170  ProducerBase* add_producer(ProducerBase* producer)
3171  {
3172  // Handle failed memory allocation
3173  if (producer == nullptr) {
3174  return nullptr;
3175  }
3176 
3177  producerCount.fetch_add(1, std::memory_order_relaxed);
3178 
3179  // Add it to the lock-free list
3180  auto prevTail = producerListTail.load(std::memory_order_relaxed);
3181  do {
3182  producer->next = prevTail;
3183  } while (!producerListTail.compare_exchange_weak(prevTail, producer, std::memory_order_release, std::memory_order_relaxed));
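  // Lock-free list push: link the new producer in front of the current tail, then publish it
  // with a CAS; on failure the CAS refreshes prevTail and the link is retried.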
3184 
3185 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
3186  if (producer->isExplicit) {
3187  auto prevTailExplicit = explicitProducers.load(std::memory_order_relaxed);
3188  do {
3189  static_cast<ExplicitProducer*>(producer)->nextExplicitProducer = prevTailExplicit;
3190  } while (!explicitProducers.compare_exchange_weak(prevTailExplicit, static_cast<ExplicitProducer*>(producer), std::memory_order_release, std::memory_order_relaxed));
3191  }
3192  else {
3193  auto prevTailImplicit = implicitProducers.load(std::memory_order_relaxed);
3194  do {
3195  static_cast<ImplicitProducer*>(producer)->nextImplicitProducer = prevTailImplicit;
3196  } while (!implicitProducers.compare_exchange_weak(prevTailImplicit, static_cast<ImplicitProducer*>(producer), std::memory_order_release, std::memory_order_relaxed));
3197  }
3198 #endif
3199 
3200  return producer;
3201  }
3202 
3203  void reown_producers()
3204  {
3205  // After another instance is moved-into/swapped-with this one, all the
3206  // producers we stole still think their parents are the other queue.
3207  // So fix them up!
3208  for (auto ptr = producerListTail.load(std::memory_order_relaxed); ptr != nullptr; ptr = ptr->next_prod()) {
3209  ptr->parent = this;
3210  }
3211  }
3212 
3213 
3214  //////////////////////////////////
3215  // Implicit producer hash
3216  //////////////////////////////////
3217 
3218  struct ImplicitProducerKVP
3219  {
3220  std::atomic<details::thread_id_t> key;
3221  ImplicitProducer* value; // No need for atomicity since it's only read by the thread that sets it in the first place
3222 
3223  ImplicitProducerKVP() : value(nullptr) { }
3224 
3225  ImplicitProducerKVP(ImplicitProducerKVP&& other) MOODYCAMEL_NOEXCEPT
3226  {
3227  key.store(other.key.load(std::memory_order_relaxed), std::memory_order_relaxed);
3228  value = other.value;
3229  }
3230 
3231  inline ImplicitProducerKVP& operator=(ImplicitProducerKVP&& other) MOODYCAMEL_NOEXCEPT
3232  {
3233  swap(other);
3234  return *this;
3235  }
3236 
3237  inline void swap(ImplicitProducerKVP& other) MOODYCAMEL_NOEXCEPT
3238  {
3239  if (this != &other) {
3240  details::swap_relaxed(key, other.key);
3241  std::swap(value, other.value);
3242  }
3243  }
3244  };
3245 
3246  template<typename XT, typename XTraits>
3247  friend void moodycamel::swap(typename ConcurrentQueue<XT, XTraits>::ImplicitProducerKVP&, typename ConcurrentQueue<XT, XTraits>::ImplicitProducerKVP&) MOODYCAMEL_NOEXCEPT;
3248
3249  struct ImplicitProducerHash
3250  {
3251  size_t capacity;
3252  ImplicitProducerKVP* entries;
3253  ImplicitProducerHash* prev;
3254  };
3255 
3256  void populate_initial_implicit_producer_hash()
3257  {
3258  if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return;
3259 
3260  implicitProducerHashCount.store(0, std::memory_order_relaxed);
3261  auto hash = &initialImplicitProducerHash;
3262  hash->capacity = INITIAL_IMPLICIT_PRODUCER_HASH_SIZE;
3263  hash->entries = &initialImplicitProducerHashEntries[0];
3264  for (size_t i = 0; i != INITIAL_IMPLICIT_PRODUCER_HASH_SIZE; ++i) {
3265  initialImplicitProducerHashEntries[i].key.store(details::invalid_thread_id, std::memory_order_relaxed);
3266  }
3267  hash->prev = nullptr;
3268  implicitProducerHash.store(hash, std::memory_order_relaxed);
3269  }
3270 
3271  void swap_implicit_producer_hashes(ConcurrentQueue& other)
3272  {
3273  if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return;
3274 
3275  // Swap (assumes our implicit producer hash is initialized)
3276  initialImplicitProducerHashEntries.swap(other.initialImplicitProducerHashEntries);
3277  initialImplicitProducerHash.entries = &initialImplicitProducerHashEntries[0];
3278  other.initialImplicitProducerHash.entries = &other.initialImplicitProducerHashEntries[0];
3279
3280  details::swap_relaxed(implicitProducerHashCount, other.implicitProducerHashCount);
3281 
3282  details::swap_relaxed(implicitProducerHash, other.implicitProducerHash);
3283  if (implicitProducerHash.load(std::memory_order_relaxed) == &other.initialImplicitProducerHash) {
3284  implicitProducerHash.store(&initialImplicitProducerHash, std::memory_order_relaxed);
3285  }
3286  else {
3287  ImplicitProducerHash* hash;
3288  for (hash = implicitProducerHash.load(std::memory_order_relaxed); hash->prev != &other.initialImplicitProducerHash; hash = hash->prev) {
3289  continue;
3290  }
3291  hash->prev = &initialImplicitProducerHash;
3292  }
3293  if (other.implicitProducerHash.load(std::memory_order_relaxed) == &initialImplicitProducerHash) {
3294  other.implicitProducerHash.store(&other.initialImplicitProducerHash, std::memory_order_relaxed);
3295  }
3296  else {
3297  ImplicitProducerHash* hash;
3298  for (hash = other.implicitProducerHash.load(std::memory_order_relaxed); hash->prev != &initialImplicitProducerHash; hash = hash->prev) {
3299  continue;
3300  }
3301  hash->prev = &other.initialImplicitProducerHash;
3302  }
3303  }
3304 
3305  // Only fails (returns nullptr) if memory allocation fails
3306  ImplicitProducer* get_or_add_implicit_producer()
3307  {
3308  // Note that since the data is essentially thread-local (key is thread ID),
3309  // there's a reduced need for fences (memory ordering is already consistent
3310  // for any individual thread), except for the current table itself.
3311 
3312  // Start by looking for the thread ID in the current and all previous hash tables.
3313  // If it's not found, it must not be in there yet, since this same thread would
3314  // have added it previously to one of the tables that we traversed.
3315 
3316  // Code and algorithm adapted from http://preshing.com/20130605/the-worlds-simplest-lock-free-hash-table
3317 
3318 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
3319  debug::DebugLock lock(implicitProdMutex);
3320 #endif
3321 
3322  auto id = details::thread_id();
3323  auto hashedId = details::hash_thread_id(id);
3324 
3325  auto mainHash = implicitProducerHash.load(std::memory_order_acquire);
3326  for (auto hash = mainHash; hash != nullptr; hash = hash->prev) {
3327  // Look for the id in this hash
3328  auto index = hashedId;
3329  while (true) { // Not an infinite loop because at least one slot is free in the hash table
3330  index &= hash->capacity - 1;
3331 
3332  auto probedKey = hash->entries[index].key.load(std::memory_order_relaxed);
3333  if (probedKey == id) {
3334  // Found it! If we had to search several hashes deep, though, we should lazily add it
3335  // to the current main hash table to avoid the extended search next time.
3336  // Note there's guaranteed to be room in the current hash table since every subsequent
3337  // table implicitly reserves space for all previous tables (there's only one
3338  // implicitProducerHashCount).
3339  auto value = hash->entries[index].value;
3340  if (hash != mainHash) {
3341  index = hashedId;
3342  while (true) {
3343  index &= mainHash->capacity - 1;
3344  probedKey = mainHash->entries[index].key.load(std::memory_order_relaxed);
3345  auto empty = details::invalid_thread_id;
3346 #ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3347  auto reusable = details::invalid_thread_id2;
3348  if ((probedKey == empty && mainHash->entries[index].key.compare_exchange_strong(empty, id, std::memory_order_relaxed, std::memory_order_relaxed)) ||
3349  (probedKey == reusable && mainHash->entries[index].key.compare_exchange_strong(reusable, id, std::memory_order_acquire, std::memory_order_acquire))) {
3350 #else
3351  if ((probedKey == empty && mainHash->entries[index].key.compare_exchange_strong(empty, id, std::memory_order_relaxed, std::memory_order_relaxed))) {
3352 #endif
3353  mainHash->entries[index].value = value;
3354  break;
3355  }
3356  ++index;
3357  }
3358  }
3359 
3360  return value;
3361  }
3362  if (probedKey == details::invalid_thread_id) {
3363  break; // Not in this hash table
3364  }
3365  ++index;
3366  }
3367  }
3368 
3369  // Insert!
3370  auto newCount = 1 + implicitProducerHashCount.fetch_add(1, std::memory_order_relaxed);
3371  while (true) {
3372  if (newCount >= (mainHash->capacity >> 1) && !implicitProducerHashResizeInProgress.test_and_set(std::memory_order_acquire)) {
3373  // We've acquired the resize lock, try to allocate a bigger hash table.
3374  // Note the acquire fence synchronizes with the release fence at the end of this block, and hence when
3375  // we reload implicitProducerHash it must be the most recent version (it only gets changed within this
3376  // locked block).
3377  mainHash = implicitProducerHash.load(std::memory_order_acquire);
3378  if (newCount >= (mainHash->capacity >> 1)) {
3379  auto newCapacity = mainHash->capacity << 1;
3380  while (newCount >= (newCapacity >> 1)) {
3381  newCapacity <<= 1;
3382  }
3383  auto raw = static_cast<char*>((Traits::malloc)(sizeof(ImplicitProducerHash) + std::alignment_of<ImplicitProducerKVP>::value - 1 + sizeof(ImplicitProducerKVP) * newCapacity));
3384  if (raw == nullptr) {
3385  // Allocation failed
3386  implicitProducerHashCount.fetch_sub(1, std::memory_order_relaxed);
3387  implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
3388  return nullptr;
3389  }
3390 
3391  auto newHash = new (raw) ImplicitProducerHash;
3392  newHash->capacity = newCapacity;
3393  newHash->entries = reinterpret_cast<ImplicitProducerKVP*>(details::align_for<ImplicitProducerKVP>(raw + sizeof(ImplicitProducerHash)));
3394  for (size_t i = 0; i != newCapacity; ++i) {
3395  new (newHash->entries + i) ImplicitProducerKVP;
3396  newHash->entries[i].key.store(details::invalid_thread_id, std::memory_order_relaxed);
3397  }
3398  newHash->prev = mainHash;
3399  implicitProducerHash.store(newHash, std::memory_order_release);
3400  implicitProducerHashResizeInProgress.clear(std::memory_order_release);
3401  mainHash = newHash;
3402  }
3403  else {
3404  implicitProducerHashResizeInProgress.clear(std::memory_order_release);
3405  }
3406  }
3407 
3408  // If it's < three-quarters full, add to the old one anyway so that we don't have to wait for the next table
3409  // to finish being allocated by another thread (and if we just finished allocating above, the condition will
3410  // always be true)
3411  if (newCount < (mainHash->capacity >> 1) + (mainHash->capacity >> 2)) {
3412  bool recycled;
3413  auto producer = static_cast<ImplicitProducer*>(recycle_or_create_producer(false, recycled));
3414  if (producer == nullptr) {
3415  implicitProducerHashCount.fetch_sub(1, std::memory_order_relaxed);
3416  return nullptr;
3417  }
3418  if (recycled) {
3419  implicitProducerHashCount.fetch_sub(1, std::memory_order_relaxed);
3420  }
3421 
3422 #ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3423  producer->threadExitListener.callback = &ConcurrentQueue::implicit_producer_thread_exited_callback;
3424  producer->threadExitListener.userData = producer;
3425  details::ThreadExitNotifier::subscribe(&producer->threadExitListener);
3426 #endif
3427 
3428  auto index = hashedId;
3429  while (true) {
3430  index &= mainHash->capacity - 1;
3431  auto probedKey = mainHash->entries[index].key.load(std::memory_order_relaxed);
3432 
3433  auto empty = details::invalid_thread_id;
3434 #ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3435  auto reusable = details::invalid_thread_id2;
3436  if ((probedKey == empty && mainHash->entries[index].key.compare_exchange_strong(empty, id, std::memory_order_relaxed, std::memory_order_relaxed)) ||
3437  (probedKey == reusable && mainHash->entries[index].key.compare_exchange_strong(reusable, id, std::memory_order_acquire, std::memory_order_acquire))) {
3438 #else
3439  if ((probedKey == empty && mainHash->entries[index].key.compare_exchange_strong(empty, id, std::memory_order_relaxed, std::memory_order_relaxed))) {
3440 #endif
3441  mainHash->entries[index].value = producer;
3442  break;
3443  }
3444  ++index;
3445  }
3446  return producer;
3447  }
3448 
3449  // Hmm, the old hash is quite full and somebody else is busy allocating a new one.
3450  // We need to wait for the allocating thread to finish (if it succeeds, we add, if not,
3451  // we try to allocate ourselves).
3452  mainHash = implicitProducerHash.load(std::memory_order_acquire);
3453  }
3454  }
3455 
3456 #ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3457  void implicit_producer_thread_exited(ImplicitProducer* producer)
3458  {
3459  // Remove from thread exit listeners
3460  details::ThreadExitNotifier::unsubscribe(&producer->threadExitListener);
3461 
3462  // Remove from hash
3463 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
3464  debug::DebugLock lock(implicitProdMutex);
3465 #endif
3466  auto hash = implicitProducerHash.load(std::memory_order_acquire);
3467  assert(hash != nullptr); // The thread exit listener is only registered if we were added to a hash in the first place
3468  auto id = details::thread_id();
3469  auto hashedId = details::hash_thread_id(id);
3470  details::thread_id_t probedKey;
3471 
3472  // We need to traverse all the hashes just in case other threads aren't on the current one yet and are
3473  // trying to add an entry thinking there's a free slot (because they reused a producer)
3474  for (; hash != nullptr; hash = hash->prev) {
3475  auto index = hashedId;
3476  do {
3477  index &= hash->capacity - 1;
3478  probedKey = hash->entries[index].key.load(std::memory_order_relaxed);
3479  if (probedKey == id) {
3480  hash->entries[index].key.store(details::invalid_thread_id2, std::memory_order_release);
3481  break;
3482  }
3483  ++index;
3484  } while (probedKey != details::invalid_thread_id); // Can happen if the hash has changed but we weren't put back in it yet, or if we weren't added to this hash in the first place
3485  }
3486 
3487  // Mark the queue as being recyclable
3488  producer->inactive.store(true, std::memory_order_release);
3489  }
3490 
3491  static void implicit_producer_thread_exited_callback(void* userData)
3492  {
3493  auto producer = static_cast<ImplicitProducer*>(userData);
3494  auto queue = producer->parent;
3495  queue->implicit_producer_thread_exited(producer);
3496  }
3497 #endif
3498 
3499  //////////////////////////////////
3500  // Utility functions
3501  //////////////////////////////////
3502 
3503  template<typename U>
3504  static inline U* create_array(size_t count)
3505  {
3506  assert(count > 0);
3507  auto p = static_cast<U*>((Traits::malloc)(sizeof(U) * count));
3508  if (p == nullptr) {
3509  return nullptr;
3510  }
3511 
3512  for (size_t i = 0; i != count; ++i) {
3513  new (p + i) U();
3514  }
3515  return p;
3516  }
3517 
3518  template<typename U>
3519  static inline void destroy_array(U* p, size_t count)
3520  {
3521  if (p != nullptr) {
3522  assert(count > 0);
3523  for (size_t i = count; i != 0; ) {
3524  (p + --i)->~U();
3525  }
3526  (Traits::free)(p);
3527  }
3528  }
3529 
3530  template<typename U>
3531  static inline U* create()
3532  {
3533  auto p = (Traits::malloc)(sizeof(U));
3534  return p != nullptr ? new (p) U : nullptr;
3535  }
3536 
3537  template<typename U, typename A1>
3538  static inline U* create(A1&& a1)
3539  {
3540  auto p = (Traits::malloc)(sizeof(U));
3541  return p != nullptr ? new (p) U(std::forward<A1>(a1)) : nullptr;
3542  }
3543 
3544  template<typename U>
3545  static inline void destroy(U* p)
3546  {
3547  if (p != nullptr) {
3548  p->~U();
3549  }
3550  (Traits::free)(p);
3551  }
3552 
3553 private:
3554  std::atomic<ProducerBase*> producerListTail;
3555  std::atomic<std::uint32_t> producerCount;
3556 
3557  std::atomic<size_t> initialBlockPoolIndex;
3558  Block* initialBlockPool;
3559  size_t initialBlockPoolSize;
3560
3561 #if !MCDBGQ_USEDEBUGFREELIST
3562  FreeList<Block> freeList;
3563 #else
3564  debug::DebugFreeList<Block> freeList;
3565 #endif
3566 
3567  std::atomic<ImplicitProducerHash*> implicitProducerHash;
3568  std::atomic<size_t> implicitProducerHashCount; // Number of slots logically used
3569  ImplicitProducerHash initialImplicitProducerHash;
3570  std::array<ImplicitProducerKVP, INITIAL_IMPLICIT_PRODUCER_HASH_SIZE> initialImplicitProducerHashEntries;
3571  std::atomic_flag implicitProducerHashResizeInProgress;
3572
3573  std::atomic<std::uint32_t> nextExplicitConsumerId;
3574  std::atomic<std::uint32_t> globalExplicitConsumerOffset;
3575 
3576 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
3577  debug::DebugMutex implicitProdMutex;
3578 #endif
3579 
3580 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
3581  std::atomic<ExplicitProducer*> explicitProducers;
3582  std::atomic<ImplicitProducer*> implicitProducers;
3583 #endif
3584 };
3585 
3586 
3587 template<typename T, typename Traits>
3588 ProducerToken::ProducerToken(ConcurrentQueue<T, Traits>& queue)
3589  : producer(queue.recycle_or_create_producer(true))
3590 {
3591  if (producer != nullptr) {
3592  producer->token = this;
3593  }
3594 }
3595 
3596 template<typename T, typename Traits>
3597 ProducerToken::ProducerToken(BlockingConcurrentQueue<T, Traits>& queue)
3598  : producer(reinterpret_cast<ConcurrentQueue<T, Traits>*>(&queue)->recycle_or_create_producer(true))
3599 {
3600  if (producer != nullptr) {
3601  producer->token = this;
3602  }
3603 }
3604 
3605 template<typename T, typename Traits>
3606 ConsumerToken::ConsumerToken(ConcurrentQueue<T, Traits>& queue)
3607  : itemsConsumedFromCurrent(0), currentProducer(nullptr), desiredProducer(nullptr)
3608 {
3609  initialOffset = queue.nextExplicitConsumerId.fetch_add(1, std::memory_order_release);
3610  lastKnownGlobalOffset = -1;
3611 }
3612 
3613 template<typename T, typename Traits>
3614 ConsumerToken::ConsumerToken(BlockingConcurrentQueue<T, Traits>& queue)
3615  : itemsConsumedFromCurrent(0), currentProducer(nullptr), desiredProducer(nullptr)
3616 {
3617  initialOffset = reinterpret_cast<ConcurrentQueue<T, Traits>*>(&queue)->nextExplicitConsumerId.fetch_add(1, std::memory_order_release);
3618  lastKnownGlobalOffset = -1;
3619 }
3620 
3621 template<typename T, typename Traits>
3622 inline void swap(ConcurrentQueue<T, Traits>& a, ConcurrentQueue<T, Traits>& b) MOODYCAMEL_NOEXCEPT
3623 {
3624  a.swap(b);
3625 }
3626 
3627 inline void swap(ProducerToken& a, ProducerToken& b) MOODYCAMEL_NOEXCEPT
3628 {
3629  a.swap(b);
3630 }
3631 
3632 inline void swap(ConsumerToken& a, ConsumerToken& b) MOODYCAMEL_NOEXCEPT
3633 {
3634  a.swap(b);
3635 }
3636 
3637 template<typename T, typename Traits>
3638 inline void swap(typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP& a, typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP& b) MOODYCAMEL_NOEXCEPT
3639 {
3640  a.swap(b);
3641 }
3642 
3643 }
3644 
3645 #if defined(__GNUC__)
3646 #pragma GCC diagnostic pop
3647 #endif