3#include "gaia/config/config.h"
8#include "gaia/cnt/sarray.h"
9#include "gaia/config/profiler.h"
10#include "gaia/core/utility.h"
11#include "gaia/mt/jobhandle.h"
15GAIA_MSVC_WARNING_PUSH()
16GAIA_MSVC_WARNING_DISABLE(4324)
23 template <const u
int32_t N = 1 << 12>
25 static_assert(N >= 2);
26 static_assert((N & (N - 1)) == 0, "Extent of JobQueue must be a power of 2");
27 static constexpr u
int32_t MASK = N - 1;
29 static_assert(sizeof(std::atomic_u
int32_t) == sizeof(JobHandle));
30 cnt::sarray<std::atomic_u
int32_t, N> m_buffer;
31 GAIA_ALIGNAS(GAIA_CACHELINE_SIZE) std::atomic_u
int32_t m_bottom;
32 GAIA_ALIGNAS(GAIA_CACHELINE_SIZE) std::atomic_u
int32_t m_top;
39 ~JobQueue() = default;
40 JobQueue(const JobQueue&) = default;
41 JobQueue& operator=(const JobQueue&) = default;
42 JobQueue(JobQueue&&) noexcept = default;
43 JobQueue& operator=(JobQueue&&) noexcept = default;
48 for (auto& val: m_buffer)
49 val.store(((JobHandle)JobNull_t()).value());
55 GAIA_PROF_SCOPE(JobQueue::empty);
57 const u
int32_t b = m_bottom.load(std::memory_order_relaxed);
58 const u
int32_t t = m_top.load(std::memory_order_relaxed);
59 return
int32_t(b - t) <= 0;
64 GAIA_NODISCARD
bool try_push(JobHandle jobHandle) {
65 GAIA_PROF_SCOPE(JobQueue::try_push);
67 const u
int32_t b = m_bottom.load(std::memory_order_relaxed);
68 const u
int32_t t = m_top.load(std::memory_order_acquire);
69 const u
int32_t used = b - t;
73 m_buffer[b & MASK].store(jobHandle.value(), std::memory_order_relaxed);
75 std::atomic_thread_fence(std::memory_order_release);
76 m_bottom.store(b + 1, std::memory_order_relaxed);
83 GAIA_NODISCARD u
int32_t try_push(std::span<JobHandle> jobHandles) {
84 GAIA_PROF_SCOPE(JobQueue::try_push);
86 const u
int32_t cnt = (u
int32_t)jobHandles.size();
87 u
int32_t b = m_bottom.load(std::memory_order_relaxed);
88 const u
int32_t t = m_top.load(std::memory_order_acquire);
89 const u
int32_t used = b - t;
90 const u
int32_t free = (MASK + 1) - used;
91 const u
int32_t freeFinal = core::get_min(cnt, free);
93 for (u
int32_t i = 0; i < freeFinal; i++, b++)
94 m_buffer[b & MASK].store(jobHandles[i].value(), std::memory_order_relaxed);
96 std::atomic_thread_fence(std::memory_order_release);
97 m_bottom.store(b, std::memory_order_relaxed);
104 GAIA_NODISCARD
bool try_pop(JobHandle& jobHandle) {
105 GAIA_PROF_SCOPE(JobQueue::try_pop);
107 u
int32_t jobHandleValue = ((JobHandle)JobNull_t{}).value();
109 const u
int32_t b = m_bottom.load(std::memory_order_relaxed) - 1;
110 m_bottom.store(b, std::memory_order_relaxed);
111 std::atomic_thread_fence(std::memory_order_seq_cst);
112 u
int32_t t = m_top.load(std::memory_order_relaxed);
114 if (
int(t - b) <= 0) {
116 jobHandleValue = m_buffer[b & MASK].load(std::memory_order_relaxed);
121 m_top.compare_exchange_strong(t, t + 1, std::memory_order_seq_cst, std::memory_order_relaxed);
122 m_bottom.store(b + 1, std::memory_order_relaxed);
123 jobHandle = JobHandle(jobHandleValue);
124 GAIA_ASSERT(jobHandle != (JobHandle)JobNull_t{});
128 jobHandle = JobHandle(jobHandleValue);
129 GAIA_ASSERT(jobHandle != (JobHandle)JobNull_t{});
134 m_bottom.store(b + 1, std::memory_order_relaxed);
140 GAIA_NODISCARD
bool try_steal(JobHandle& jobHandle) {
141 GAIA_PROF_SCOPE(JobQueue::try_steal);
143 u
int32_t t = m_top.load(std::memory_order_acquire);
144 std::atomic_thread_fence(std::memory_order_seq_cst);
145 const u
int32_t b = m_bottom.load(std::memory_order_acquire);
147 if (
int(b - t) <= 0) {
148 jobHandle = (JobHandle)JobNull_t{};
152 const u
int32_t jobHandleValue = m_buffer[t & MASK].load(std::memory_order_relaxed);
155 const
bool ret = m_top.compare_exchange_strong(t, t + 1, std::memory_order_seq_cst, std::memory_order_relaxed);
156 jobHandle = JobHandle(jobHandleValue);
157 GAIA_ASSERT(jobHandle != (JobHandle)JobNull_t{});
164 template <
class T, const u
int32_t N = 1 << 12>
166 static_assert(N >= 2);
167 static_assert((N & (N - 1)) == 0, "Extent of MpmcQueue must be a power of 2");
168 static constexpr u
int32_t MASK = N - 1;
171 std::atomic_u
int32_t sequence{};
174 using view_policy = mem::data_view_policy_aos<Node>;
176 static constexpr u
int32_t extent = N;
177 static constexpr u
int32_t allocated_
bytes = view_policy::get_min_
byte_size(0, N);
181 GAIA_MSVC_WARNING_PUSH()
182 GAIA_MSVC_WARNING_DISABLE(4324)
184 mem::raw_data_holder<Node, allocated_
bytes> m_data;
185 GAIA_ALIGNAS(GAIA_CACHELINE_SIZE) std::atomic_u
int32_t m_pushPos;
186 GAIA_ALIGNAS(GAIA_CACHELINE_SIZE) std::atomic_u
int32_t m_popPos;
188 GAIA_MSVC_WARNING_POP()
198 MpmcQueue(MpmcQueue&&) = delete;
199 MpmcQueue(const MpmcQueue&) = delete;
200 MpmcQueue& operator=(MpmcQueue&&) = delete;
201 MpmcQueue& operator=(const MpmcQueue&) = delete;
204 GAIA_NODISCARD constexpr Node* data() noexcept {
205 return GAIA_ACC((Node*)&m_data[0]);
208 GAIA_NODISCARD constexpr const Node* data() const noexcept {
209 return GAIA_ACC((const Node*)&m_data[0]);
213 Node* pNodes = data();
216 Node* pNode = &pNodes[i];
217 core::call_ctor(&pNode->sequence, i);
220 m_pushPos.store(0, std::memory_order_relaxed);
221 m_popPos.store(0, std::memory_order_relaxed);
225 Node* pNodes = data();
227 uint32_t enqPos = m_pushPos.load(std::memory_order_relaxed);
228 uint32_t deqPos = m_popPos.load(std::memory_order_relaxed);
229 for (uint32_t pos = deqPos; pos != enqPos; ++pos) {
230 Node* pNode = &pNodes[pos & MASK];
231 if (pNode->sequence.load(std::memory_order_relaxed) == pos + 1)
236 Node* pNode = &pNodes[i];
237 core::call_dtor(&pNode->sequence);
245 GAIA_PROF_SCOPE(MpmcQueue::empty);
247 const uint32_t pos = m_popPos.load(std::memory_order_relaxed);
248 const auto* pNode = &data()[pos & MASK];
249 const uint32_t seq = pNode->sequence.load(std::memory_order_acquire);
256 template <
typename TT>
257 bool try_push(TT&& item) {
258 GAIA_PROF_SCOPE(MpmcQueue::try_push);
260 Node* pNodes = data();
262 Node* pNode =
nullptr;
263 uint32_t pos = m_pushPos.load(std::memory_order_relaxed);
265 pNode = &pNodes[pos & MASK];
266 uint32_t seq = pNode->sequence.load(std::memory_order_acquire);
267 int32_t diff = int32_t(seq) - int32_t(pos);
269 if (m_pushPos.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
271 }
else if (diff < 0) {
275 pos = m_pushPos.load(std::memory_order_relaxed);
279 core::call_ctor(&pNode->item, GAIA_FWD(item));
280 pNode->sequence.store(pos + 1, std::memory_order_release);
287 bool try_pop(T& item) {
288 GAIA_PROF_SCOPE(MpmcQueue::try_pop);
290 Node* pNodes = data();
292 Node* pNode =
nullptr;
293 uint32_t pos = m_popPos.load(std::memory_order_relaxed);
295 pNode = &pNodes[pos & MASK];
296 uint32_t seq = pNode->sequence.load(std::memory_order_acquire);
297 int32_t diff = int32_t(seq) - int32_t(pos + 1);
299 if (m_popPos.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
301 }
else if (diff < 0) {
305 pos = m_popPos.load(std::memory_order_relaxed);
309 item = GAIA_MOV(pNode->item);
310 core::call_dtor(&pNode->item);
311 pNode->sequence.store(pos + MASK + 1, std::memory_order_release);
318GAIA_MSVC_WARNING_POP()
Checks if endianess was detected correctly at compile-time.
Definition bitset.h:9