8
\$\begingroup\$

I wanted to try using atomics and implement a lock-free multiple-producer, single-consumer (MPSC) queue.

From what I understand about memory barriers, reads happen after an acquire barrier and writes happen before a release barrier, i.e. they can't be reordered across it by the compiler. A relaxed memory order means the operation does not depend on other operations. So I have doubts about the empty()/front() implementation: I have used relaxed ordering for now. Should I use acquire instead?

Also, in push(), the else block runs a while loop. In the while loop, next.store() is called. Is there any chance it will mess up the pointers if compare_exchange_strong fails?

#include <atomic>
#include <cassert>
#include <iostream>
#include <thread>
#include <vector>

template <typename T>
struct mpsc_queue_node {
    T value;
    std::atomic<mpsc_queue_node<T>*> next;
};

template <typename T>
class mpsc_queue {
    std::atomic<mpsc_queue_node<T>*> m_head{nullptr};
    std::atomic<mpsc_queue_node<T>*> m_tail{nullptr};

   public:
    mpsc_queue() = default;
    ~mpsc_queue() {
        while (!empty()) {
            pop();
        }
    }

    mpsc_queue(const mpsc_queue& other) = delete;
    mpsc_queue(mpsc_queue&& other) = delete;
    mpsc_queue& operator=(const mpsc_queue& other) = delete;
    mpsc_queue& operator=(mpsc_queue&& other) = delete;

    void push(T&& value) {
        mpsc_queue_node<T>* node =
            new mpsc_queue_node<T>{std::move(value), nullptr};
        if (!node) {
            throw std::bad_alloc{};
        }
        mpsc_queue_node<T>* expected_empty_value = nullptr;
        auto tail_ptr = m_tail.load(std::memory_order_acquire);
        if (tail_ptr == nullptr) {
            if (m_tail.compare_exchange_strong(expected_empty_value, node,
                                               std::memory_order_release,
                                               std::memory_order_relaxed)) {
                m_head.compare_exchange_strong(expected_empty_value, node,
                                               std::memory_order_release,
                                               std::memory_order_relaxed);
            }
        } else {
            while (true) {
                auto tail_ptr = m_tail.load(std::memory_order_acquire);
                tail_ptr->next.store(node, std::memory_order_release);
                if (m_tail.compare_exchange_strong(tail_ptr, node,
                                                   std::memory_order_release,
                                                   std::memory_order_relaxed)) {
                    break;
                }
            }
        }
    }

    void pop() {
        auto head_ptr = m_head.load(std::memory_order_acquire);
        if (head_ptr == nullptr) {
            return;
        }
        auto tail_ptr = m_tail.load(std::memory_order_acquire);
        if (head_ptr == tail_ptr) {
            if (m_tail.compare_exchange_strong(tail_ptr, nullptr,
                                               std::memory_order_acq_rel,
                                               std::memory_order_relaxed)) {
                m_head.store(nullptr, std::memory_order_release);
                delete head_ptr;
                return;
            }
        }
        mpsc_queue_node<T>* head_next_ptr = nullptr;
        // tail_ptr may have been updated by a new producer, but head->next may be still
        // nullptr, wait for it to get updated
        while ((head_next_ptr = head_ptr->next.load(
                    std::memory_order_acquire)) == nullptr) {
            ;  // run until the next ptr gets updated
        }
        m_head.store(head_next_ptr, std::memory_order_release);
        delete head_ptr;
    }

    bool empty() const { return front() == nullptr; }

    mpsc_queue_node<T>* front() const {
        return m_head.load(std::memory_order_relaxed);
    }
};

int main() {
    mpsc_queue<int> q;

    int i = 0;
    auto push_fn = [&q](int i) {
        std::cout << "Pushing " << i << '\n';
        q.push(std::move(i));
    };

    std::vector<std::thread> threads(8);

    for (auto& t : threads) {
        t = std::thread(push_fn, i);
        i++;
    }
    while (!q.empty())
    {
        std::cout << "Popping " << q.front()->value << '\n';
        q.pop();
    }
    for (auto& t : threads) {
        t.join();
    }
}

What can be improved?

EDIT: I made some changes based on the reviews. Please check here. Thank you all :-)

Please check the continuation of this post here.

\$\endgroup\$
4
  • 2
    \$\begingroup\$ The first thing I'm worried about when seeing a linked-list queue is the ABA problem potentially letting a CAS succeed after freeing and reallocating the same block of memory. I don't see any comments mention it; are you sure it's a non-problem? (I don't have time at the moment to write a proper answer; I might have another look tomorrow.) \$\endgroup\$ Commented yesterday
  • \$\begingroup\$ @PeterCordes, I just read about ABA. I am honestly not sure if it can occur anywhere in code. I am looking at the updated code btw (godbolt link is in question). \$\endgroup\$ Commented yesterday
  • 2
    \$\begingroup\$ ThreadSanitizer reports a data race. Oh, m_head.load in front() definitely needs to be acquire at minimum. That silences the sanitizer. \$\endgroup\$ Commented 22 hours ago
  • \$\begingroup\$ @NateEldredge, got it, thanks :-) \$\endgroup\$ Commented 22 hours ago

2 Answers

9
\$\begingroup\$

Remove empty()

A big issue with thread-safe data structures is that they are going to be accessed by multiple threads. You cannot assume in which order or at what times those threads will actually run. And while a single operation might look atomic, a combination of several atomic operations is no longer atomic as a whole.

While empty() itself atomically checks whether the queue is empty, the answer it returns might no longer be true when the caller checks it. That matters even in a single-consumer queue, where there is only one thread calling empty(). Consider your main(): first you are starting 8 threads, and then you are checking if the queue is empty. You are assuming that the first time you call q.empty(), some of the producer threads have already got a time slice and pushed an item. But that's not necessarily the case; the operating system is free to delay starting those producer threads until after your main thread calls q.empty().

Note that even adding a while (q.empty()) {} right after starting the producer threads is not enough to be safe: your consumer thread might outpace the 8 producer threads at any time.

You should remove the member function empty(), because you just cannot trust the return value. You have to find some other way to signal that all your producers are done producing.
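One possible way to signal completion, as a rough sketch: keep an atomic count of producers that are still running, and let the consumer stop only when a pop fails after that count has reached zero. Everything named here (locked_queue, producers_left, run_demo) is hypothetical, and a mutex-guarded std::queue stands in for the lock-free queue so that the example is self-contained.

```cpp
#include <atomic>
#include <cassert>
#include <mutex>
#include <optional>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Stand-in for the lock-free queue (assumption): only the interface matters.
template <typename T>
class locked_queue {
    std::mutex m_mutex;
    std::queue<T> m_items;

   public:
    void push(T value) {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_items.push(std::move(value));
    }
    std::optional<T> try_pop() {
        std::lock_guard<std::mutex> lock(m_mutex);
        if (m_items.empty()) {
            return std::nullopt;
        }
        std::optional<T> value = std::move(m_items.front());
        m_items.pop();
        return value;
    }
};

// Starts `producer_count` producers that each push one int, and returns the
// sum of everything the consumer (the calling thread) received.
int run_demo(int producer_count) {
    locked_queue<int> q;
    std::atomic<int> producers_left{producer_count};

    std::vector<std::thread> threads;
    for (int i = 0; i < producer_count; ++i) {
        threads.emplace_back([&q, &producers_left, i] {
            q.push(i);
            // Decrement only after the push has completed.
            producers_left.fetch_sub(1, std::memory_order_release);
        });
    }

    int sum = 0;
    for (;;) {
        // Read the counter *before* trying to pop: if it was already zero,
        // a failed pop means nothing more will ever arrive.
        const bool done = producers_left.load(std::memory_order_acquire) == 0;
        if (auto value = q.try_pop()) {
            sum += *value;
        } else if (done) {
            break;
        } else {
            std::this_thread::yield();
        }
    }
    for (auto& t : threads) {
        t.join();
    }
    return sum;
}
```

The order of the two checks is the important part: testing the counter after a failed pop instead would leave a window in which the last producer pushes and finishes between the two steps, and its item would be missed.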

Consider making pop() return a std::optional<T>

Your push() moves a value into a new mpsc_queue_node. Since you are already paying that overhead, you could have pop() move the value from the node into the return value. And to handle the case of an empty queue, wrap this in a std::optional. That way, your while-loop in main() could be rewritten to:

while (auto value = q.pop()) {
    std::cout << "Popping " << *value << '\n';
}

pop() could then look like this:

std::optional<T> pop() {
    auto head_ptr = m_head.load(std::memory_order_acquire);
    if (head_ptr == nullptr) {
        return {}; // empty optional
    }
    …
    std::optional<T> value = std::move(head_ptr->value);
    delete head_ptr;
    return value;
}
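
To show the move-then-delete pattern end to end without guessing at the elided lock-free part, here is the same interface on a plain linked list. simple_queue and drain are hypothetical names, and this stand-in is deliberately single-threaded; it only illustrates how pop() can hand the value back before the node is freed.

```cpp
#include <cassert>
#include <optional>
#include <string>
#include <utility>

// NOT thread-safe: a single-threaded stand-in (assumption) for the queue.
template <typename T>
class simple_queue {
    struct node {
        T value;
        node* next;
    };
    node* m_head = nullptr;
    node* m_tail = nullptr;

   public:
    simple_queue() = default;
    simple_queue(const simple_queue&) = delete;
    simple_queue& operator=(const simple_queue&) = delete;
    ~simple_queue() {
        while (pop()) {
        }
    }

    void push(T value) {
        node* n = new node{std::move(value), nullptr};
        if (m_tail != nullptr) {
            m_tail->next = n;
        } else {
            m_head = n;
        }
        m_tail = n;
    }

    std::optional<T> pop() {
        node* head = m_head;
        if (head == nullptr) {
            return std::nullopt;  // empty queue
        }
        m_head = head->next;
        if (m_head == nullptr) {
            m_tail = nullptr;
        }
        // Move the payload out first, then free the node that held it.
        std::optional<T> value = std::move(head->value);
        delete head;
        return value;
    }
};

// Pops everything and concatenates it, using the loop form shown earlier.
inline std::string drain(simple_queue<std::string>& q) {
    std::string out;
    while (auto value = q.pop()) {
        out += *value;
    }
    return out;
}
```

Note that the while condition tests whether the optional holds a value, not the value itself, so popping a 0 or an empty string does not end the loop early.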
\$\endgroup\$
5
  • \$\begingroup\$ Why return a std::optional instead of just a pointer? front() currently returns a pointer which is nullptr in the empty case, and that would work for your while(auto val = q.pop()) loop since null pointers are false. (Agreed that it's weird that pop doesn't return the element it popped, though, requiring the caller to call front() separately!) I'd normally expect pop() to wait for an element in the empty case (spin-wait and/or .wait/.notify), but the OP's is already a try_pop() that returns immediately without doing anything if the queue is empty. \$\endgroup\$ Commented yesterday
  • 3
    \$\begingroup\$ Oh right, in the current design the caller has to do front()->value since they get a pointer to a queue node and have to deal with its internals, not a pointer to a value directly! Yeah, move to a return value could avoid that, but then you lose the in-band signalling for empty, which is needed for a try_pop() function like this. Or maybe just return a T* which is either null or a pointer to ->value. Like return p ? &(p->value) : nullptr... or no, that doesn't solve the deallocation problem. I can see now how they got to this front() then pop() awkward API design. \$\endgroup\$ Commented yesterday
  • \$\begingroup\$ @PeterCordes front() and pop() makes sense in the non-multi-threaded case: you avoid having to move at all, so it's very efficient. This is why the standard library uses it. But in all other regards having pop() return the value is just better, both for multi-threaded code where you want to atomically pop a value, and just because you can write much more readable code with it. \$\endgroup\$ Commented yesterday
  • 1
    \$\begingroup\$ front and pop_back both make sense to exist separately for containers like std::vector. For something that's only ever intended to be used as a queue, it's less nice. But yeah, good point it's the only way to avoid copying in the abstract machine. Anything else would require the compiler to optimize away temporaries and just read the T value; member of the node struct. (For T=int it can obviously just load it into a register. For T=int[100], not copying matters. For T=std::string and similar, move semantics help since any large data is only pointed-to, not held directly.) \$\endgroup\$ Commented yesterday
  • \$\begingroup\$ @PeterCordes, yes front() shouldn't have returned a mpsc_queue_node<T>*, it should have returned the actual front value. I fixed that in the updated code, please check link in the question. \$\endgroup\$ Commented yesterday
8
\$\begingroup\$

First impressions - it looks generally good, and compiles cleanly with a comprehensive set of warnings - nice.


Minor - given that we declare copy construction and assignment (as deleted), the move equivalents are not implicitly declared, so the class is already non-movable and there's no need to delete those explicitly.
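
A quick compile-time check of that rule, using a hypothetical type that declares only the copy operations:

```cpp
#include <type_traits>

// Hypothetical type: only the copy operations are declared (as deleted).
struct copy_deleted_only {
    copy_deleted_only() = default;
    copy_deleted_only(const copy_deleted_only&) = delete;
    copy_deleted_only& operator=(const copy_deleted_only&) = delete;
};

// No move constructor or move assignment is implicitly declared, so an
// rvalue argument falls back to the copy operations, which are deleted.
static_assert(!std::is_move_constructible_v<copy_deleted_only>);
static_assert(!std::is_move_assignable_v<copy_deleted_only>);
static_assert(!std::is_copy_constructible_v<copy_deleted_only>);
```

Keeping the explicit `= delete` on the move operations is harmless; it just doesn't change anything.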


push() should probably take its argument by value, so we can copy lvalues into the queue without having to manually invoke T's copy constructor. That would be in line with user expectations.
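
As an illustration of what the by-value signature costs, here is a small sketch that counts copies and moves. The names tracked, by_value_queue and the two counting functions are made up for the example, and a std::deque stands in for the node allocation.

```cpp
#include <cassert>
#include <deque>
#include <utility>

// Hypothetical payload that counts how it gets copied and moved.
struct tracked {
    static inline int copies = 0;
    static inline int moves = 0;
    tracked() = default;
    tracked(const tracked&) { ++copies; }
    tracked(tracked&&) noexcept { ++moves; }
};

// Stand-in container (assumption): only the push() signature matters here.
template <typename T>
class by_value_queue {
    std::deque<T> m_items;

   public:
    void push(T value) { m_items.push_back(std::move(value)); }
};

struct push_counts {
    int copies;
    int moves;
};

push_counts count_for_lvalue() {
    tracked::copies = 0;
    tracked::moves = 0;
    by_value_queue<tracked> q;
    tracked t;
    q.push(t);  // one copy into the parameter, one move into storage
    return {tracked::copies, tracked::moves};
}

push_counts count_for_rvalue() {
    tracked::copies = 0;
    tracked::moves = 0;
    by_value_queue<tracked> q;
    tracked t;
    q.push(std::move(t));  // one move into the parameter, one into storage
    return {tracked::copies, tracked::moves};
}
```

So compared with the T&& overload, callers with an rvalue pay one extra move, and callers with an lvalue no longer have to spell out the copy themselves.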


This test is unnecessary:

        mpsc_queue_node<T>* node =
            new mpsc_queue_node<T>{std::move(value), nullptr};
        if (!node) {
            throw std::bad_alloc{};
        }

A plain new-expression (without std::nothrow) never returns a null pointer; if the allocation fails, it throws std::bad_alloc itself.
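
A small demonstration of why the null check is dead code. To make the failure deterministic, the sketch uses a hypothetical type whose class-specific operator new always throws, which simulates an out-of-memory condition rather than provoking a real one.

```cpp
#include <cassert>
#include <cstddef>
#include <new>

// Hypothetical type whose allocation always fails (simulated failure).
struct always_fails {
    static void* operator new(std::size_t) { throw std::bad_alloc{}; }
    static void operator delete(void*) noexcept {}
};

// Returns true if the failure surfaced as an exception rather than as a
// null pointer.
bool failure_is_reported_by_exception() {
    try {
        always_fails* p = new always_fails;
        // Never reached: the new-expression has already thrown.
        delete p;
        return false;
    } catch (const std::bad_alloc&) {
        return true;
    }
}
```

A null check only makes sense with the `new (std::nothrow) T` form, which reports failure by returning a null pointer instead.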


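I'm concerned about the special case in push() to an empty queue. If two threads arrive there, I don't see how we get the "loser" to re-try: when its compare_exchange_strong on m_tail fails, push() simply returns, so the new node is never linked into the queue (and is leaked).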

To show this, let's make each thread push to many queues, then check whether they all have the expected content:

#include <cstdlib>
#include <print>
int main()
{
    std::vector<mpsc_queue<int>> queues(10000);

    auto push_fn = [&](int i) {
        for (auto& q: queues) {
            q.push(i);
        }
    };

    std::vector<std::thread> threads(8);

    int i = 0;
    for (auto& t : threads) {
        t = std::thread(push_fn, i++);
    }
    for (auto& t : threads) {
        t.join();
    }
    unsigned failures = 0;
    for (auto& q: queues) {
        unsigned mask = 0;
        while (!q.empty()) {
            mask |= ( 1u << q.front()->value);
            q.pop();
        }
        if (mask != 0xff) {
            std::println(std::cerr, "{:08b}", mask);
            ++failures;
        }
    }

    if (failures) {
        std::println(std::cerr, "Got {} failure(s) from {} tests.",
                     failures, queues.size());
        return EXIT_FAILURE;
    }
}

Sample output:

10111111
10111111
10111111
10111111
01111111
11101111
11101111
11011111
01111111
11011111
Got 10 failure(s) from 10000 tests.
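
One way to avoid the empty-queue special case altogether is to keep a permanent dummy node and have producers claim their position with a single atomic exchange, in the style of Dmitry Vyukov's non-intrusive MPSC queue. To be clear, this is a different algorithm from the one under review, not a patch to it, and stub_mpsc_queue and pushed_mask are names invented for this sketch.

```cpp
#include <atomic>
#include <cassert>
#include <optional>
#include <thread>
#include <utility>
#include <vector>

template <typename T>
class stub_mpsc_queue {
    struct node {
        std::atomic<node*> next{nullptr};
        std::optional<T> value;  // empty in the dummy node
    };

    std::atomic<node*> m_back;  // last node; producers swap themselves in
    node* m_front;              // current dummy node; consumer thread only

   public:
    stub_mpsc_queue() : m_back(new node), m_front(m_back.load()) {}
    stub_mpsc_queue(const stub_mpsc_queue&) = delete;
    stub_mpsc_queue& operator=(const stub_mpsc_queue&) = delete;
    ~stub_mpsc_queue() {
        while (try_pop()) {
        }
        delete m_front;
    }

    void push(T value) {
        node* n = new node;
        n->value.emplace(std::move(value));
        // Each producer receives a distinct predecessor from the exchange,
        // so there is no CAS to lose and nothing to retry.
        node* prev = m_back.exchange(n, std::memory_order_acq_rel);
        prev->next.store(n, std::memory_order_release);
    }

    std::optional<T> try_pop() {
        node* next = m_front->next.load(std::memory_order_acquire);
        if (next == nullptr) {
            return std::nullopt;  // empty, or a push is still linking
        }
        std::optional<T> value = std::move(next->value);
        next->value.reset();  // `next` becomes the new dummy node
        delete m_front;
        m_front = next;
        return value;
    }
};

// Mirrors the test above: 8 threads each push their index; returns the
// bitmask of the values found once all producers have been joined.
unsigned pushed_mask() {
    stub_mpsc_queue<int> q;
    std::vector<std::thread> threads;
    for (int i = 0; i < 8; ++i) {
        threads.emplace_back([&q, i] { q.push(i); });
    }
    for (auto& t : threads) {
        t.join();
    }
    unsigned mask = 0;
    while (auto value = q.try_pop()) {
        mask |= 1u << *value;
    }
    return mask;
}
```

The trade-off is that try_pop() may report "empty" while a producer sits between its exchange and its store to next, so a failed pop means "nothing visible yet", not "nothing pushed". Since there is no compare-exchange on node pointers, the ABA concern raised in the comments does not arise in this design.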

There's also something wrong when we push from a single thread:

#include <algorithm>
#include <array>
#include <cassert>
#include <cstdlib>
#include <functional>  // std::identity
#include <thread>

int main()
{
    std::array<char, 1000000> a = {};
    mpsc_queue<char*> q;

    auto push_fn = [&]() {
        for (auto& c: a) {
            q.push(&c);
        }
    };

    auto t = std::thread(push_fn);

    for (auto i = 0u;  i < a.size();  ++i) {
        while (q.empty())
            ;
        *q.front()->value = 1;
        q.pop();
    }

    t.join();
    assert(q.empty());
    assert(std::ranges::all_of(a, std::identity{}));
}

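Sometimes this segfaults in the while loop in push() - evidently m_tail becomes null while it's executing: the loop reloads m_tail and dereferences it without a null check, while pop() can concurrently reset m_tail to nullptr when it removes the last element.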

Some other times, the pop() loop doesn't complete - I haven't determined whether not all values are successfully pushed or whether they are not all retrieved (or both).


In the test program, we should show the need to continue fetching from the queue after joining all the threads (if we don't do that, we destroy the queue with elements still in it).

    for (auto& t : threads) {
        t.join();
    }
    while (!q.empty()) {
        std::cout << "Popping " << q.front()->value << '\n';
        q.pop();
    }

Also in the test program, there's a possibility of the threads becoming serialised due to hidden locks on std::cout (but I do get interrupted output lines between the string and the integer and the newline, so there's hope that's not interfering too much).


Missing functionality:

  • We could do with a function for the consumer to wait for an item to become available. Polling front() is inefficient.

  • It would be handy to have an emplace() function, for types that can't be moved.
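
For the waiting function, one simple option is to pair the queue with a counter guarded by a mutex and a condition variable. This is only a sketch: item_signal and consume_announced are hypothetical names, and taking a mutex on every push gives up strict lock-freedom on the producer side. With C++20, std::atomic's wait()/notify_one() or std::counting_semaphore would be alternatives worth considering.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical helper: counts items announced but not yet claimed.
class item_signal {
    std::mutex m_mutex;
    std::condition_variable m_cv;
    unsigned m_available = 0;

   public:
    // Producer side: call after the item has been pushed.
    void notify_item() {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            ++m_available;
        }
        m_cv.notify_one();
    }

    // Consumer side: blocks until at least one item has been announced,
    // then claims it.
    void wait_item() {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_cv.wait(lock, [this] { return m_available != 0; });
        --m_available;
    }
};

// A helper thread announces `n` items; the calling thread blocks in
// wait_item() for each of them and returns how many it claimed.
unsigned consume_announced(unsigned n) {
    item_signal signal;
    std::thread producer([&signal, n] {
        for (unsigned i = 0; i < n; ++i) {
            signal.notify_item();
        }
    });
    unsigned claimed = 0;
    for (unsigned i = 0; i < n; ++i) {
        signal.wait_item();
        ++claimed;
    }
    producer.join();
    return claimed;
}
```

In the queue itself, push() would call notify_item() after linking its node, and a blocking pop() would call wait_item() before taking the front element.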

\$\endgroup\$
4
  • 1
    \$\begingroup\$ Great point about the test program. With line-buffered cout on a terminal, each thread probably spends most of its time inside write system calls, so pops that are actually concurrent with a push might be a lot rarer than with higher-throughput testing conditions. (Especially with the push threads also printing between each element.) \$\endgroup\$ Commented yesterday
  • \$\begingroup\$ I fixed the push logic (I think). Also, I made some changes to main() function. Please check the updated code in the link I added to the question. \$\endgroup\$ Commented yesterday
  • 1
    \$\begingroup\$ I'm guessing you don't want a review of the updated code. If you do, you can post it as a new question. Provide a link to this one to help with the context. \$\endgroup\$ Commented yesterday
  • 1
    \$\begingroup\$ @TobySpeight, posted a new question here. \$\endgroup\$ Commented 22 hours ago
