I wanted to try using atomics and implement a lock-free multiple-producer, single-consumer (MPSC) queue.
From what I understood about memory barriers, reads happen after acquire barrier and writes happen before release barrier i.e. those can't be reordered by the compiler. Relaxed memory barrier means the operation is not dependent on other operations. So, I have doubts about empty()/front() implementation, I have used relaxed barrier for now. Should I use acquire barrier instead?
Also, in push(), the else block runs a while loop. In the while loop, next.store() is called. Is there any chance it will mess up the pointers if compare_exchange_strong fails?
#include <atomic>
#include <cassert>
#include <iostream>
#include <thread>
#include <vector>
template <typename T>
struct mpsc_queue_node {
T value;
std::atomic<mpsc_queue_node<T>*> next;
};
template <typename T>
class mpsc_queue {
std::atomic<mpsc_queue_node<T>*> m_head{nullptr};
std::atomic<mpsc_queue_node<T>*> m_tail{nullptr};
public:
mpsc_queue() = default;
~mpsc_queue() {
while (!empty()) {
pop();
}
}
mpsc_queue(const mpsc_queue& other) = delete;
mpsc_queue(mpsc_queue&& other) = delete;
mpsc_queue& operator=(const mpsc_queue& other) = delete;
mpsc_queue& operator=(mpsc_queue&& other) = delete;
void push(T&& value) {
mpsc_queue_node<T>* node =
new mpsc_queue_node<T>{std::move(value), nullptr};
if (!node) {
throw std::bad_alloc{};
}
mpsc_queue_node<T>* expected_empty_value = nullptr;
auto tail_ptr = m_tail.load(std::memory_order_acquire);
if (tail_ptr == nullptr) {
if (m_tail.compare_exchange_strong(expected_empty_value, node,
std::memory_order_release,
std::memory_order_relaxed)) {
m_head.compare_exchange_strong(expected_empty_value, node,
std::memory_order_release,
std::memory_order_relaxed);
}
} else {
while (true) {
auto tail_ptr = m_tail.load(std::memory_order_acquire);
tail_ptr->next.store(node, std::memory_order_release);
if (m_tail.compare_exchange_strong(tail_ptr, node,
std::memory_order_release,
std::memory_order_relaxed)) {
break;
}
}
}
}
void pop() {
auto head_ptr = m_head.load(std::memory_order_acquire);
if (head_ptr == nullptr) {
return;
}
auto tail_ptr = m_tail.load(std::memory_order_acquire);
if (head_ptr == tail_ptr) {
if (m_tail.compare_exchange_strong(tail_ptr, nullptr,
std::memory_order_acq_rel,
std::memory_order_relaxed)) {
m_head.store(nullptr, std::memory_order_release);
delete head_ptr;
return;
}
}
mpsc_queue_node<T>* head_next_ptr = nullptr;
// tail_ptr may have been updated by a new producer, but head->next may be still
// nullptr, wait for it to get updated
while ((head_next_ptr = head_ptr->next.load(
std::memory_order_acquire)) == nullptr) {
; // run until the next ptr gets updated
}
m_head.store(head_next_ptr, std::memory_order_release);
delete head_ptr;
}
bool empty() const { return front() == nullptr; }
mpsc_queue_node<T>* front() const {
return m_head.load(std::memory_order_relaxed);
}
};
int main() {
mpsc_queue<int> q;
int i = 0;
auto push_fn = [&q](int i) {
std::cout << "Pushing " << i << '\n';
q.push(std::move(i));
};
std::vector<std::thread> threads(8);
for (auto& t : threads) {
t = std::thread(push_fn, i);
i++;
}
while (!q.empty())
{
std::cout << "Popping " << q.front()->value << '\n';
q.pop();
}
for (auto& t : threads) {
t.join();
}
}
What can be improved?
EDIT: I made some changes based on the reviews. Please check here. Thank you all :-)
Please check the continuation of this post here.
m_head.loadinfront()definitely needs to beacquireat minimum. That silences the sanitizer. \$\endgroup\$