mirror of https://github.com/wolfpld/tracy, synced 2025-04-29 12:23:53 +00:00

Remove dead code from concurrentqueue.

commit ca198e44d3 (parent b5590ed197)
@@ -243,7 +243,6 @@ struct ProducerToken;
 struct ConsumerToken;
 
 template<typename T, typename Traits> class ConcurrentQueue;
-class ConcurrentQueueTests;
 
 
 namespace details
@@ -413,7 +412,6 @@ struct ProducerToken
 
 private:
     template<typename T, typename Traits> friend class ConcurrentQueue;
-    friend class ConcurrentQueueTests;
 
 protected:
     details::ConcurrentQueueProducerTypelessBase* producer;
@@ -451,7 +449,6 @@ struct ConsumerToken
 
 private:
     template<typename T, typename Traits> friend class ConcurrentQueue;
-    friend class ConcurrentQueueTests;
 
 private: // but shared with ConcurrentQueue
     std::uint32_t initialOffset;
@@ -562,231 +559,15 @@ public:
 
     // Disable copying and copy assignment
     ConcurrentQueue(ConcurrentQueue const&) MOODYCAMEL_DELETE_FUNCTION;
+    ConcurrentQueue(ConcurrentQueue&& other) MOODYCAMEL_DELETE_FUNCTION;
     ConcurrentQueue& operator=(ConcurrentQueue const&) MOODYCAMEL_DELETE_FUNCTION;
+    ConcurrentQueue& operator=(ConcurrentQueue&& other) MOODYCAMEL_DELETE_FUNCTION;
-    // Moving is supported, but note that it is *not* a thread-safe operation.
-    // Nobody can use the queue while it's being moved, and the memory effects
-    // of that move must be propagated to other threads before they can use it.
-    // Note: When a queue is moved, its tokens are still valid but can only be
-    // used with the destination queue (i.e. semantically they are moved along
-    // with the queue itself).
-    ConcurrentQueue(ConcurrentQueue&& other) MOODYCAMEL_NOEXCEPT
-        : producerListTail(other.producerListTail.load(std::memory_order_relaxed)),
-        producerCount(other.producerCount.load(std::memory_order_relaxed)),
-        initialBlockPoolIndex(other.initialBlockPoolIndex.load(std::memory_order_relaxed)),
-        initialBlockPool(other.initialBlockPool),
-        initialBlockPoolSize(other.initialBlockPoolSize),
-        freeList(std::move(other.freeList)),
-        nextExplicitConsumerId(other.nextExplicitConsumerId.load(std::memory_order_relaxed)),
-        globalExplicitConsumerOffset(other.globalExplicitConsumerOffset.load(std::memory_order_relaxed))
-    {
-        other.producerListTail.store(nullptr, std::memory_order_relaxed);
-        other.producerCount.store(0, std::memory_order_relaxed);
-        other.nextExplicitConsumerId.store(0, std::memory_order_relaxed);
-        other.globalExplicitConsumerOffset.store(0, std::memory_order_relaxed);
-
-        other.initialBlockPoolIndex.store(0, std::memory_order_relaxed);
-        other.initialBlockPoolSize = 0;
-        other.initialBlockPool = nullptr;
-
-        reown_producers();
-    }
-
-    inline ConcurrentQueue& operator=(ConcurrentQueue&& other) MOODYCAMEL_NOEXCEPT
-    {
-        return swap_internal(other);
-    }
-
-    // Swaps this queue's state with the other's. Not thread-safe.
-    // Swapping two queues does not invalidate their tokens, however
-    // the tokens that were created for one queue must be used with
-    // only the swapped queue (i.e. the tokens are tied to the
-    // queue's movable state, not the object itself).
-    inline void swap(ConcurrentQueue& other) MOODYCAMEL_NOEXCEPT
-    {
-        swap_internal(other);
-    }
-
-private:
-    ConcurrentQueue& swap_internal(ConcurrentQueue& other)
-    {
-        if (this == &other) {
-            return *this;
-        }
-
-        details::swap_relaxed(producerListTail, other.producerListTail);
-        details::swap_relaxed(producerCount, other.producerCount);
-        details::swap_relaxed(initialBlockPoolIndex, other.initialBlockPoolIndex);
-        std::swap(initialBlockPool, other.initialBlockPool);
-        std::swap(initialBlockPoolSize, other.initialBlockPoolSize);
-        freeList.swap(other.freeList);
-        details::swap_relaxed(nextExplicitConsumerId, other.nextExplicitConsumerId);
-        details::swap_relaxed(globalExplicitConsumerOffset, other.globalExplicitConsumerOffset);
-
-        reown_producers();
-        other.reown_producers();
-
-        return *this;
-    }
-
 public:
-    // Enqueues a single item (by copying it) using an explicit producer token.
-    // Allocates memory if required. Only fails if memory allocation fails (or
-    // Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
-    // Thread-safe.
-    inline bool enqueue(producer_token_t const& token, T const& item)
-    {
-        return inner_enqueue(token, item);
-    }
-
-    // Enqueues a single item (by moving it, if possible) using an explicit producer token.
-    // Allocates memory if required. Only fails if memory allocation fails (or
-    // Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
-    // Thread-safe.
-    inline bool enqueue(producer_token_t const& token, T&& item)
-    {
-        return inner_enqueue(token, std::move(item));
-    }
-
     tracy_force_inline T* enqueue_begin(producer_token_t const& token, index_t& currentTailIndex)
     {
-        return inner_enqueue_begin(token, currentTailIndex);
+        return static_cast<ExplicitProducer*>(token.producer)->ConcurrentQueue::ExplicitProducer::enqueue_begin(currentTailIndex);
     }
 
-    // Enqueues several items using an explicit producer token.
-    // Allocates memory if required. Only fails if memory allocation fails
-    // (or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
-    // Note: Use std::make_move_iterator if the elements should be moved
-    // instead of copied.
-    // Thread-safe.
-    template<typename It>
-    bool enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
-    {
-        return inner_enqueue_bulk(token, itemFirst, count);
-    }
-
-    // Attempts to dequeue from the queue.
-    // Returns false if all producer streams appeared empty at the time they
-    // were checked (so, the queue is likely but not guaranteed to be empty).
-    // Never allocates. Thread-safe.
-    template<typename U>
-    bool try_dequeue(U& item)
-    {
-        // Instead of simply trying each producer in turn (which could cause needless contention on the first
-        // producer), we score them heuristically.
-        size_t nonEmptyCount = 0;
-        ProducerBase* best = nullptr;
-        size_t bestSize = 0;
-        for (auto ptr = producerListTail.load(std::memory_order_acquire); nonEmptyCount < 3 && ptr != nullptr; ptr = ptr->next_prod()) {
-            auto size = ptr->size_approx();
-            if (size > 0) {
-                if (size > bestSize) {
-                    bestSize = size;
-                    best = ptr;
-                }
-                ++nonEmptyCount;
-            }
-        }
-
-        // If there was at least one non-empty queue but it appears empty at the time
-        // we try to dequeue from it, we need to make sure every queue's been tried
-        if (nonEmptyCount > 0) {
-            if (details::cqLikely(best->dequeue(item))) {
-                return true;
-            }
-            for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
-                if (ptr != best && ptr->dequeue(item)) {
-                    return true;
-                }
-            }
-        }
-        return false;
-    }
-
-    // Attempts to dequeue from the queue.
-    // Returns false if all producer streams appeared empty at the time they
-    // were checked (so, the queue is likely but not guaranteed to be empty).
-    // This differs from the try_dequeue(item) method in that this one does
-    // not attempt to reduce contention by interleaving the order that producer
-    // streams are dequeued from. So, using this method can reduce overall throughput
-    // under contention, but will give more predictable results in single-threaded
-    // consumer scenarios. This is mostly only useful for internal unit tests.
-    // Never allocates. Thread-safe.
-    template<typename U>
-    bool try_dequeue_non_interleaved(U& item)
-    {
-        for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
-            if (ptr->dequeue(item)) {
-                return true;
-            }
-        }
-        return false;
-    }
-
-    // Attempts to dequeue from the queue using an explicit consumer token.
-    // Returns false if all producer streams appeared empty at the time they
-    // were checked (so, the queue is likely but not guaranteed to be empty).
-    // Never allocates. Thread-safe.
-    template<typename U>
-    bool try_dequeue(consumer_token_t& token, U& item)
-    {
-        // The idea is roughly as follows:
-        // Every 256 items from one producer, make everyone rotate (increase the global offset) -> this means the highest efficiency consumer dictates the rotation speed of everyone else, more or less
-        // If you see that the global offset has changed, you must reset your consumption counter and move to your designated place
-        // If there's no items where you're supposed to be, keep moving until you find a producer with some items
-        // If the global offset has not changed but you've run out of items to consume, move over from your current position until you find a producer with something in it
-
-        if (token.desiredProducer == nullptr || token.lastKnownGlobalOffset != globalExplicitConsumerOffset.load(std::memory_order_relaxed)) {
-            if (!update_current_producer_after_rotation(token)) {
-                return false;
-            }
-        }
-
-        // If there was at least one non-empty queue but it appears empty at the time
-        // we try to dequeue from it, we need to make sure every queue's been tried
-        if (static_cast<ProducerBase*>(token.currentProducer)->dequeue(item)) {
-            if (++token.itemsConsumedFromCurrent == EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE) {
-                globalExplicitConsumerOffset.fetch_add(1, std::memory_order_relaxed);
-            }
-            return true;
-        }
-
-        auto tail = producerListTail.load(std::memory_order_acquire);
-        auto ptr = static_cast<ProducerBase*>(token.currentProducer)->next_prod();
-        if (ptr == nullptr) {
-            ptr = tail;
-        }
-        while (ptr != static_cast<ProducerBase*>(token.currentProducer)) {
-            if (ptr->dequeue(item)) {
-                token.currentProducer = ptr;
-                token.itemsConsumedFromCurrent = 1;
-                return true;
-            }
-            ptr = ptr->next_prod();
-            if (ptr == nullptr) {
-                ptr = tail;
-            }
-        }
-        return false;
-    }
-
-    // Attempts to dequeue several elements from the queue.
-    // Returns the number of items actually dequeued.
-    // Returns 0 if all producer streams appeared empty at the time they
-    // were checked (so, the queue is likely but not guaranteed to be empty).
-    // Never allocates. Thread-safe.
-    template<typename It>
-    size_t try_dequeue_bulk(It itemFirst, size_t max)
-    {
-        size_t count = 0;
-        for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
-            count += ptr->dequeue_bulk(itemFirst, max - count);
-            if (count == max) {
-                break;
-            }
-        }
-        return count;
-    }
-
 
     // Attempts to dequeue several elements from the queue using an explicit consumer token.
     // Returns the number of items actually dequeued.
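Note: the token-based enqueue overloads and the heuristic/rotating try_dequeue paths deleted above still exist in upstream moodycamel::ConcurrentQueue; tracy's fork only keeps the single code path it actually uses. For reference, a minimal usage sketch against the upstream single-header API (the header path is the upstream default, and the 256-item rotation quota mentioned in the removed comment is EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE); this is not tracy's trimmed API:

```cpp
// Minimal sketch against upstream moodycamel::ConcurrentQueue
// (github.com/cameron314/concurrentqueue), not tracy's fork.
#include "concurrentqueue.h"
#include <cstdio>

int main()
{
    moodycamel::ConcurrentQueue<int> q;
    moodycamel::ProducerToken ptok(q);   // pins one producer stream, avoiding the implicit-producer hash lookup
    moodycamel::ConsumerToken ctok(q);   // participates in the 256-item rotation scheme described above

    for (int i = 0; i < 100; i++) {
        q.enqueue(ptok, i);              // token overload: one of the paths removed in this commit
    }

    int item;
    size_t dequeued = 0;
    while (q.try_dequeue(ctok, item)) {  // token overload: also removed in this commit
        dequeued++;
    }
    printf("dequeued %zu items\n", dequeued);
}
```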
@@ -886,31 +667,6 @@ public:
         }
     }
 
-    // Attempts to dequeue from a specific producer's inner queue.
-    // If you happen to know which producer you want to dequeue from, this
-    // is significantly faster than using the general-case try_dequeue methods.
-    // Returns false if the producer's queue appeared empty at the time it
-    // was checked (so, the queue is likely but not guaranteed to be empty).
-    // Never allocates. Thread-safe.
-    template<typename U>
-    inline bool try_dequeue_from_producer(producer_token_t const& producer, U& item)
-    {
-        return static_cast<ExplicitProducer*>(producer.producer)->dequeue(item);
-    }
-
-    // Attempts to dequeue several elements from a specific producer's inner queue.
-    // Returns the number of items actually dequeued.
-    // If you happen to know which producer you want to dequeue from, this
-    // is significantly faster than using the general-case try_dequeue methods.
-    // Returns 0 if the producer's queue appeared empty at the time it
-    // was checked (so, the queue is likely but not guaranteed to be empty).
-    // Never allocates. Thread-safe.
-    template<typename It>
-    inline size_t try_dequeue_bulk_from_producer(producer_token_t const& producer, It itemFirst, size_t max)
-    {
-        return static_cast<ExplicitProducer*>(producer.producer)->dequeue_bulk(itemFirst, max);
-    }
-
 
     // Returns an estimate of the total number of elements currently in the queue. This
     // estimate is only accurate if the queue has completely stabilized before it is called
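The producer-pinned dequeue paths removed in this hunk also survive upstream. A sketch of the pattern they enable, assuming the upstream header; drain_one_producer is an illustrative name, not an API:

```cpp
// Sketch of try_dequeue_from_producer usage against upstream moodycamel.
// Useful when one consumer drains exactly one producer's stream,
// e.g. a per-thread event queue.
#include "concurrentqueue.h"
#include <vector>

std::vector<int> drain_one_producer(moodycamel::ConcurrentQueue<int>& q,
                                    moodycamel::ProducerToken const& ptok)
{
    std::vector<int> out;
    int item;
    // Reads only ptok's subqueue; skips the producer-selection heuristics entirely.
    while (q.try_dequeue_from_producer(ptok, item)) {
        out.push_back(item);
    }
    return out;
}
```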
@@ -946,31 +702,12 @@ private:
     friend struct ProducerToken;
     friend struct ConsumerToken;
     friend struct ExplicitProducer;
-    friend class ConcurrentQueueTests;
 
 
     ///////////////////////////////
     // Queue methods
     ///////////////////////////////
 
-    template<typename U>
-    inline bool inner_enqueue(producer_token_t const& token, U&& element)
-    {
-        return static_cast<ExplicitProducer*>(token.producer)->ConcurrentQueue::ExplicitProducer::enqueue(std::forward<U>(element));
-    }
-
-    tracy_force_inline T* inner_enqueue_begin(producer_token_t const& token, index_t& currentTailIndex)
-    {
-        return static_cast<ExplicitProducer*>(token.producer)->ConcurrentQueue::ExplicitProducer::enqueue_begin(currentTailIndex);
-    }
-
-    template<typename It>
-    inline bool inner_enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
-    {
-        return static_cast<ExplicitProducer*>(token.producer)->ConcurrentQueue::ExplicitProducer::enqueue_bulk(itemFirst, count);
-    }
-
     inline bool update_current_producer_after_rotation(consumer_token_t& token)
     {
         // Ah, there's been a rotation, figure out where we should be!
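The new enqueue_begin body in the first public hunk, and the inner_* forwarders deleted here, rely on the same trick: a fully qualified member call bypasses virtual dispatch, so the hot path can be inlined. A standalone sketch of that mechanism (all names below are illustrative, not tracy's):

```cpp
// Qualified member calls suppress the vtable lookup, letting the compiler
// inline the callee. The cast is only sound because the fork has exactly
// one concrete producer kind.
#include <cstdio>

struct ProducerBase {
    virtual ~ProducerBase() {}
    virtual bool produce() { return false; }
};

struct ExplicitProducer : ProducerBase {
    bool produce() override { return true; }
};

bool call(ProducerBase* p)
{
    // Virtual dispatch: goes through the vtable at runtime.
    bool a = p->produce();
    // Qualified call: resolved at compile time and inlinable. This mirrors
    // static_cast<ExplicitProducer*>(token.producer)
    //     ->ConcurrentQueue::ExplicitProducer::enqueue_begin(...)
    bool b = static_cast<ExplicitProducer*>(p)->ExplicitProducer::produce();
    return a && b;
}

int main()
{
    ExplicitProducer e;
    printf("%d\n", call(&e)); // prints 1
}
```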
@@ -1274,12 +1011,6 @@ private:
 
         virtual ~ProducerBase() { };
 
-        template<typename U>
-        inline bool dequeue(U& element)
-        {
-            return static_cast<ExplicitProducer*>(this)->dequeue(element);
-        }
-
         template<typename It>
         inline size_t dequeue_bulk(It& itemFirst, size_t max)
         {
@@ -1398,106 +1129,6 @@ private:
             }
         }
 
-        template<typename U>
-        inline bool enqueue(U&& element)
-        {
-            index_t currentTailIndex = this->tailIndex.load(std::memory_order_relaxed);
-            index_t newTailIndex = 1 + currentTailIndex;
-            if ((currentTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
-                // We reached the end of a block, start a new one
-                auto startBlock = this->tailBlock;
-                auto originalBlockIndexSlotsUsed = pr_blockIndexSlotsUsed;
-                if (this->tailBlock != nullptr && this->tailBlock->next->ConcurrentQueue::Block::is_empty()) {
-                    // We can re-use the block ahead of us, it's empty!
-                    this->tailBlock = this->tailBlock->next;
-                    this->tailBlock->ConcurrentQueue::Block::reset_empty();
-
-                    // We'll put the block on the block index (guaranteed to be room since we're conceptually removing the
-                    // last block from it first -- except instead of removing then adding, we can just overwrite).
-                    // Note that there must be a valid block index here, since even if allocation failed in the ctor,
-                    // it would have been re-attempted when adding the first block to the queue; since there is such
-                    // a block, a block index must have been successfully allocated.
-                }
-                else {
-                    // Whatever head value we see here is >= the last value we saw here (relatively),
-                    // and <= its current value. Since we have the most recent tail, the head must be
-                    // <= to it.
-                    auto head = this->headIndex.load(std::memory_order_relaxed);
-                    assert(!details::circular_less_than<index_t>(currentTailIndex, head));
-                    if (!details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE)
-                        || (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value && (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head))) {
-                        // We can't enqueue in another block because there's not enough leeway -- the
-                        // tail could surpass the head by the time the block fills up! (Or we'll exceed
-                        // the size limit, if the second part of the condition was true.)
-                        return false;
-                    }
-                    // We're going to need a new block; check that the block index has room
-                    if (pr_blockIndexRaw == nullptr || pr_blockIndexSlotsUsed == pr_blockIndexSize) {
-                        // Hmm, the circular block index is already full -- we'll need
-                        // to allocate a new index. Note pr_blockIndexRaw can only be nullptr if
-                        // the initial allocation failed in the constructor.
-
-                        if (!new_block_index(pr_blockIndexSlotsUsed)) {
-                            return false;
-                        }
-                    }
-
-                    // Insert a new block in the circular linked list
-                    auto newBlock = this->parent->ConcurrentQueue::requisition_block();
-                    if (newBlock == nullptr) {
-                        return false;
-                    }
-                    newBlock->ConcurrentQueue::Block::reset_empty();
-                    if (this->tailBlock == nullptr) {
-                        newBlock->next = newBlock;
-                    }
-                    else {
-                        newBlock->next = this->tailBlock->next;
-                        this->tailBlock->next = newBlock;
-                    }
-                    this->tailBlock = newBlock;
-                    ++pr_blockIndexSlotsUsed;
-                }
-
-                if (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (nullptr) T(std::forward<U>(element)))) {
-                    // The constructor may throw. We want the element not to appear in the queue in
-                    // that case (without corrupting the queue):
-                    MOODYCAMEL_TRY {
-                        new ((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));
-                    }
-                    MOODYCAMEL_CATCH (...) {
-                        // Revert change to the current block, but leave the new block available
-                        // for next time
-                        pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
-                        this->tailBlock = startBlock == nullptr ? this->tailBlock : startBlock;
-                        MOODYCAMEL_RETHROW;
-                    }
-                }
-                else {
-                    (void)startBlock;
-                    (void)originalBlockIndexSlotsUsed;
-                }
-
-                // Add block to block index
-                auto& entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront];
-                entry.base = currentTailIndex;
-                entry.block = this->tailBlock;
-                blockIndex.load(std::memory_order_relaxed)->front.store(pr_blockIndexFront, std::memory_order_release);
-                pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);
-
-                if (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (nullptr) T(std::forward<U>(element)))) {
-                    this->tailIndex.store(newTailIndex, std::memory_order_release);
-                    return true;
-                }
-            }
-
-            // Enqueue
-            new ((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));
-
-            this->tailIndex.store(newTailIndex, std::memory_order_release);
-            return true;
-        }
-
         inline void enqueue_begin_alloc(index_t currentTailIndex)
         {
             // We reached the end of a block, start a new one
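The deleted enqueue leans on BLOCK_SIZE being a power of two: bit masks stand in for division and modulo when testing block boundaries and computing block bases. A standalone sketch of that arithmetic, assuming an illustrative BLOCK_SIZE of 64 (tracy configures the real value through its Traits):

```cpp
// The index arithmetic from the removed enqueue paths, in isolation.
#include <cassert>
#include <cstdint>

using index_t = std::uint64_t;
constexpr index_t BLOCK_SIZE = 64; // illustrative; must be a power of two

int main()
{
    index_t tail = 128;
    // Start-of-block test: the low bits are all zero.
    assert((tail & (BLOCK_SIZE - 1)) == 0);
    // Round an index down to its block's base, as the bulk path does.
    assert(((tail + 5) & ~(BLOCK_SIZE - 1)) == 128);
    // Number of new block bases needed to hold `count` items, mirroring the
    // removed enqueue_bulk's blockBaseDiff computation.
    index_t count = 100;
    index_t blockBaseDiff = ((tail + count - 1) & ~(BLOCK_SIZE - 1))
                          - ((tail - 1) & ~(BLOCK_SIZE - 1));
    assert(blockBaseDiff / BLOCK_SIZE == 2); // 100 items from a boundary span 2 blocks
    return 0;
}
```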
@@ -1556,287 +1187,6 @@ private:
         {
             return this->tailIndex;
         }
 
-        template<typename U>
-        bool dequeue(U& element)
-        {
-            auto tail = this->tailIndex.load(std::memory_order_relaxed);
-            auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
-            if (details::circular_less_than<index_t>(this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit, tail)) {
-                // Might be something to dequeue, let's give it a try
-
-                // Note that this if is purely for performance purposes in the common case when the queue is
-                // empty and the values are eventually consistent -- we may enter here spuriously.
-
-                // Note that whatever the values of overcommit and tail are, they are not going to change (unless we
-                // change them) and must be the same value at this point (inside the if) as when the if condition was
-                // evaluated.
-
-                // We insert an acquire fence here to synchronize-with the release upon incrementing dequeueOvercommit below.
-                // This ensures that whatever the value we got loaded into overcommit, the load of dequeueOptimisticCount in
-                // the fetch_add below will result in a value at least as recent as that (and therefore at least as large).
-                // Note that I believe a compiler (signal) fence here would be sufficient due to the nature of fetch_add (all
-                // read-modify-write operations are guaranteed to work on the latest value in the modification order), but
-                // unfortunately that can't be shown to be correct using only the C++11 standard.
-                // See http://stackoverflow.com/questions/18223161/what-are-the-c11-memory-ordering-guarantees-in-this-corner-case
-                std::atomic_thread_fence(std::memory_order_acquire);
-
-                // Increment optimistic counter, then check if it went over the boundary
-                auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(1, std::memory_order_relaxed);
-
-                // Note that since dequeueOvercommit must be <= dequeueOptimisticCount (because dequeueOvercommit is only ever
-                // incremented after dequeueOptimisticCount -- this is enforced in the `else` block below), and since we now
-                // have a version of dequeueOptimisticCount that is at least as recent as overcommit (due to the release upon
-                // incrementing dequeueOvercommit and the acquire above that synchronizes with it), overcommit <= myDequeueCount.
-                assert(overcommit <= myDequeueCount);
-
-                // Note that we reload tail here in case it changed; it will be the same value as before or greater, since
-                // this load is sequenced after (happens after) the earlier load above. This is supported by read-read
-                // coherency (as defined in the standard), explained here: http://en.cppreference.com/w/cpp/atomic/memory_order
-                tail = this->tailIndex.load(std::memory_order_acquire);
-                if (details::cqLikely(details::circular_less_than<index_t>(myDequeueCount - overcommit, tail))) {
-                    // Guaranteed to be at least one element to dequeue!
-
-                    // Get the index. Note that since there's guaranteed to be at least one element, this
-                    // will never exceed tail. We need to do an acquire-release fence here since it's possible
-                    // that whatever condition got us to this point was for an earlier enqueued element (that
-                    // we already see the memory effects for), but that by the time we increment somebody else
-                    // has incremented it, and we need to see the memory effects for *that* element, which in
-                    // such a case is necessarily visible on the thread that incremented it in the first
-                    // place with the more current condition (they must have acquired a tail that is at least
-                    // as recent).
-                    auto index = this->headIndex.fetch_add(1, std::memory_order_acq_rel);
-
-                    // Determine which block the element is in
-                    auto localBlockIndex = blockIndex.load(std::memory_order_acquire);
-                    auto localBlockIndexHead = localBlockIndex->front.load(std::memory_order_acquire);
-
-                    // We need to be careful here about subtracting and dividing because of index wrap-around.
-                    // When an index wraps, we need to preserve the sign of the offset when dividing it by the
-                    // block size (in order to get a correct signed block count offset in all cases):
-                    auto headBase = localBlockIndex->entries[localBlockIndexHead].base;
-                    auto blockBaseIndex = index & ~static_cast<index_t>(BLOCK_SIZE - 1);
-                    auto offset = static_cast<size_t>(static_cast<typename std::make_signed<index_t>::type>(blockBaseIndex - headBase) / BLOCK_SIZE);
-                    auto block = localBlockIndex->entries[(localBlockIndexHead + offset) & (localBlockIndex->size - 1)].block;
-
-                    // Dequeue
-                    auto& el = *((*block)[index]);
-                    if (!MOODYCAMEL_NOEXCEPT_ASSIGN(T, T&&, element = std::move(el))) {
-                        // Make sure the element is still fully dequeued and destroyed even if the assignment
-                        // throws
-                        struct Guard {
-                            Block* block;
-                            index_t index;
-
-                            ~Guard()
-                            {
-                                (*block)[index]->~T();
-                                block->ConcurrentQueue::Block::set_empty(index);
-                            }
-                        } guard = { block, index };
-
-                        element = std::move(el);
-                    }
-                    else {
-                        element = std::move(el);
-                        el.~T();
-                        block->ConcurrentQueue::Block::set_empty(index);
-                    }
-
-                    return true;
-                }
-                else {
-                    // Wasn't anything to dequeue after all; make the effective dequeue count eventually consistent
-                    this->dequeueOvercommit.fetch_add(1, std::memory_order_release); // Release so that the fetch_add on dequeueOptimisticCount is guaranteed to happen before this write
-                }
-            }
-
-            return false;
-        }
-
-        template<typename It>
-        bool enqueue_bulk(It itemFirst, size_t count)
-        {
-            // First, we need to make sure we have enough room to enqueue all of the elements;
-            // this means pre-allocating blocks and putting them in the block index (but only if
-            // all the allocations succeeded).
-            index_t startTailIndex = this->tailIndex.load(std::memory_order_relaxed);
-            auto startBlock = this->tailBlock;
-            auto originalBlockIndexFront = pr_blockIndexFront;
-            auto originalBlockIndexSlotsUsed = pr_blockIndexSlotsUsed;
-
-            Block* firstAllocatedBlock = nullptr;
-
-            // Figure out how many blocks we'll need to allocate, and do so
-            size_t blockBaseDiff = ((startTailIndex + count - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1)) - ((startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1));
-            index_t currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
-            if (blockBaseDiff > 0) {
-                // Allocate as many blocks as possible from ahead
-                while (blockBaseDiff > 0 && this->tailBlock != nullptr && this->tailBlock->next != firstAllocatedBlock && this->tailBlock->next->ConcurrentQueue::Block::is_empty()) {
-                    blockBaseDiff -= static_cast<index_t>(BLOCK_SIZE);
-                    currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
-
-                    this->tailBlock = this->tailBlock->next;
-                    firstAllocatedBlock = firstAllocatedBlock == nullptr ? this->tailBlock : firstAllocatedBlock;
-
-                    auto& entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront];
-                    entry.base = currentTailIndex;
-                    entry.block = this->tailBlock;
-                    pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);
-                }
-
-                // Now allocate as many blocks as necessary from the block pool
-                while (blockBaseDiff > 0) {
-                    blockBaseDiff -= static_cast<index_t>(BLOCK_SIZE);
-                    currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
-
-                    auto head = this->headIndex.load(std::memory_order_relaxed);
-                    assert(!details::circular_less_than<index_t>(currentTailIndex, head));
-                    bool full = !details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE) || (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value && (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head));
-                    if (pr_blockIndexRaw == nullptr || pr_blockIndexSlotsUsed == pr_blockIndexSize || full) {
-                        if (full || !new_block_index(originalBlockIndexSlotsUsed)) {
-                            // Failed to allocate, undo changes (but keep injected blocks)
-                            pr_blockIndexFront = originalBlockIndexFront;
-                            pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
-                            this->tailBlock = startBlock == nullptr ? firstAllocatedBlock : startBlock;
-                            return false;
-                        }
-
-                        // pr_blockIndexFront is updated inside new_block_index, so we need to
-                        // update our fallback value too (since we keep the new index even if we
-                        // later fail)
-                        originalBlockIndexFront = originalBlockIndexSlotsUsed;
-                    }
-
-                    // Insert a new block in the circular linked list
-                    auto newBlock = this->parent->ConcurrentQueue::requisition_block();
-                    if (newBlock == nullptr) {
-                        pr_blockIndexFront = originalBlockIndexFront;
-                        pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
-                        this->tailBlock = startBlock == nullptr ? firstAllocatedBlock : startBlock;
-                        return false;
-                    }
-
-                    newBlock->ConcurrentQueue::Block::set_all_empty();
-                    if (this->tailBlock == nullptr) {
-                        newBlock->next = newBlock;
-                    }
-                    else {
-                        newBlock->next = this->tailBlock->next;
-                        this->tailBlock->next = newBlock;
-                    }
-                    this->tailBlock = newBlock;
-                    firstAllocatedBlock = firstAllocatedBlock == nullptr ? this->tailBlock : firstAllocatedBlock;
-
-                    ++pr_blockIndexSlotsUsed;
-
-                    auto& entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront];
-                    entry.base = currentTailIndex;
-                    entry.block = this->tailBlock;
-                    pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);
-                }
-
-                // Excellent, all allocations succeeded. Reset each block's emptiness before we fill them up, and
-                // publish the new block index front
-                auto block = firstAllocatedBlock;
-                while (true) {
-                    block->ConcurrentQueue::Block::reset_empty();
-                    if (block == this->tailBlock) {
-                        break;
-                    }
-                    block = block->next;
-                }
-
-                if (MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new (nullptr) T(details::deref_noexcept(itemFirst)))) {
-                    blockIndex.load(std::memory_order_relaxed)->front.store((pr_blockIndexFront - 1) & (pr_blockIndexSize - 1), std::memory_order_release);
-                }
-            }
-
-            // Enqueue, one block at a time
-            index_t newTailIndex = startTailIndex + static_cast<index_t>(count);
-            currentTailIndex = startTailIndex;
-            auto endBlock = this->tailBlock;
-            this->tailBlock = startBlock;
-            assert((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) != 0 || firstAllocatedBlock != nullptr || count == 0);
-            if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 && firstAllocatedBlock != nullptr) {
-                this->tailBlock = firstAllocatedBlock;
-            }
-            while (true) {
-                auto stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
-                if (details::circular_less_than<index_t>(newTailIndex, stopIndex)) {
-                    stopIndex = newTailIndex;
-                }
-                if (MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new (nullptr) T(details::deref_noexcept(itemFirst)))) {
-                    while (currentTailIndex != stopIndex) {
-                        new ((*this->tailBlock)[currentTailIndex++]) T(*itemFirst++);
-                    }
-                }
-                else {
-                    MOODYCAMEL_TRY {
-                        while (currentTailIndex != stopIndex) {
-                            // Must use copy constructor even if move constructor is available
-                            // because we may have to revert if there's an exception.
-                            // Sorry about the horrible templated next line, but it was the only way
-                            // to disable moving *at compile time*, which is important because a type
-                            // may only define a (noexcept) move constructor, and so calls to the
-                            // cctor will not compile, even if they are in an if branch that will never
-                            // be executed
-                            new ((*this->tailBlock)[currentTailIndex]) T(details::nomove_if<(bool)!MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new (nullptr) T(details::deref_noexcept(itemFirst)))>::eval(*itemFirst));
-                            ++currentTailIndex;
-                            ++itemFirst;
-                        }
-                    }
-                    MOODYCAMEL_CATCH (...) {
-                        // Oh dear, an exception's been thrown -- destroy the elements that
-                        // were enqueued so far and revert the entire bulk operation (we'll keep
-                        // any allocated blocks in our linked list for later, though).
-                        auto constructedStopIndex = currentTailIndex;
-                        auto lastBlockEnqueued = this->tailBlock;
-
-                        pr_blockIndexFront = originalBlockIndexFront;
-                        pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
-                        this->tailBlock = startBlock == nullptr ? firstAllocatedBlock : startBlock;
-
-                        if (!details::is_trivially_destructible<T>::value) {
-                            auto block = startBlock;
-                            if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
-                                block = firstAllocatedBlock;
-                            }
-                            currentTailIndex = startTailIndex;
-                            while (true) {
-                                stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
-                                if (details::circular_less_than<index_t>(constructedStopIndex, stopIndex)) {
-                                    stopIndex = constructedStopIndex;
-                                }
-                                while (currentTailIndex != stopIndex) {
-                                    (*block)[currentTailIndex++]->~T();
-                                }
-                                if (block == lastBlockEnqueued) {
-                                    break;
-                                }
-                                block = block->next;
-                            }
-                        }
-                        MOODYCAMEL_RETHROW;
-                    }
-                }
-
-                if (this->tailBlock == endBlock) {
-                    assert(currentTailIndex == newTailIndex);
-                    break;
-                }
-                this->tailBlock = this->tailBlock->next;
-            }
-
-            if (!MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new (nullptr) T(details::deref_noexcept(itemFirst))) && firstAllocatedBlock != nullptr) {
-                blockIndex.load(std::memory_order_relaxed)->front.store((pr_blockIndexFront - 1) & (pr_blockIndexSize - 1), std::memory_order_release);
-            }
-
-            this->tailIndex.store(newTailIndex, std::memory_order_release);
-            return true;
-        }
-
         template<typename It>
         size_t dequeue_bulk(It& itemFirst, size_t max)
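The removed dequeue's block lookup depends on wrap-around-safe index comparisons and on preserving the sign of block offsets across a wrap. A sketch that reproduces circular_less_than in spirit (details::circular_less_than in this file) and the signed-division step, with illustrative values:

```cpp
// Wrap-around-safe index math from the removed ExplicitProducer::dequeue.
#include <cassert>
#include <cstdint>
#include <type_traits>

using index_t  = std::uint32_t; // narrow on purpose so wrap-around is easy to exercise
using sindex_t = std::make_signed<index_t>::type;
constexpr index_t BLOCK_SIZE = 64; // illustrative value

// Reproduced in spirit from details::circular_less_than:
// a < b in modular arithmetic, i.e. b is ahead of a by less than half the range.
bool circular_less_than(index_t a, index_t b)
{
    return static_cast<index_t>(a - b) > (index_t(1) << (sizeof(index_t) * 8 - 1));
}

int main()
{
    // Near the wrap point a plain < gives the wrong answer; the circular form doesn't.
    index_t head = 0xFFFFFFF0u, tail = 0x10u; // tail has wrapped past head
    assert(!(head < tail));                   // plain comparison: tail looks behind
    assert(circular_less_than(head, tail));   // circularly, head is still before tail

    // The block-offset computation must keep its sign across a wrap, as in the
    // removed dequeue: cast the (wrapped) unsigned difference to signed first.
    index_t headBase = 0x40u, blockBase = 0x0u;
    auto offset = static_cast<sindex_t>(blockBase - headBase)
                / static_cast<sindex_t>(BLOCK_SIZE);
    assert(offset == -1); // the unsigned subtraction wrapped, but the sign survives
    return 0;
}
```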