3#include "gaia/config/config.h"
7#include "gaia/cnt/sarray.h"
8#include "gaia/config/profiler.h"
9#include "gaia/core/utility.h"
10#include "gaia/mt/jobhandle.h"
14GAIA_MSVC_WARNING_PUSH()
15GAIA_MSVC_WARNING_DISABLE(4324)
22 template <const u
int32_t N = 1 << 12>
24 static_assert(N >= 2);
25 static_assert((N & (N - 1)) == 0, "Extent of JobQueue must be a power of 2");
26 static constexpr u
int32_t MASK = N - 1;
28 static_assert(sizeof(std::atomic_u
int32_t) == sizeof(JobHandle));
29 cnt::sarray<std::atomic_u
int32_t, N> m_buffer;
30 GAIA_ALIGNAS(GAIA_CACHELINE_SIZE) std::atomic_u
int32_t m_bottom;
31 GAIA_ALIGNAS(GAIA_CACHELINE_SIZE) std::atomic_u
int32_t m_top;
38 ~JobQueue() = default;
39 JobQueue(const JobQueue&) = default;
40 JobQueue& operator=(const JobQueue&) = default;
41 JobQueue(JobQueue&&) noexcept = default;
42 JobQueue& operator=(JobQueue&&) noexcept = default;
47 for (auto& val: m_buffer)
48 val.store(((JobHandle)JobNull_t()).value());
54 GAIA_PROF_SCOPE(JobQueue::empty);
56 const u
int32_t b = m_bottom.load(std::memory_order_relaxed);
57 const u
int32_t t = m_top.load(std::memory_order_relaxed);
58 return
int32_t(b - t) <= 0;
63 GAIA_NODISCARD
bool try_push(JobHandle jobHandle) {
64 GAIA_PROF_SCOPE(JobQueue::try_push);
66 const u
int32_t b = m_bottom.load(std::memory_order_relaxed);
67 const u
int32_t t = m_top.load(std::memory_order_acquire);
68 const u
int32_t used = b - t;
72 m_buffer[b & MASK].store(jobHandle.value(), std::memory_order_relaxed);
74 std::atomic_thread_fence(std::memory_order_release);
75 m_bottom.store(b + 1, std::memory_order_relaxed);
82 GAIA_NODISCARD u
int32_t try_push(std::span<JobHandle> jobHandles) {
83 GAIA_PROF_SCOPE(JobQueue::try_push);
85 const u
int32_t cnt = (u
int32_t)jobHandles.size();
86 u
int32_t b = m_bottom.load(std::memory_order_relaxed);
87 const u
int32_t t = m_top.load(std::memory_order_acquire);
88 const u
int32_t used = b - t;
89 const u
int32_t free = (MASK + 1) - used;
90 const u
int32_t freeFinal = core::get_min(cnt, free);
92 for (u
int32_t i = 0; i < freeFinal; i++, b++)
93 m_buffer[b & MASK].store(jobHandles[i].value(), std::memory_order_relaxed);
95 std::atomic_thread_fence(std::memory_order_release);
96 m_bottom.store(b, std::memory_order_relaxed);
103 GAIA_NODISCARD
bool try_pop(JobHandle& jobHandle) {
104 GAIA_PROF_SCOPE(JobQueue::try_pop);
106 u
int32_t jobHandleValue = ((JobHandle)JobNull_t{}).value();
108 const u
int32_t b = m_bottom.load(std::memory_order_relaxed) - 1;
109 m_bottom.store(b, std::memory_order_relaxed);
110 std::atomic_thread_fence(std::memory_order_seq_cst);
111 u
int32_t t = m_top.load(std::memory_order_relaxed);
113 if (
int(t - b) <= 0) {
115 jobHandleValue = m_buffer[b & MASK].load(std::memory_order_relaxed);
120 m_top.compare_exchange_strong(t, t + 1, std::memory_order_seq_cst, std::memory_order_relaxed);
121 m_bottom.store(b + 1, std::memory_order_relaxed);
122 jobHandle = JobHandle(jobHandleValue);
123 GAIA_ASSERT(jobHandle != (JobHandle)JobNull_t{});
127 jobHandle = JobHandle(jobHandleValue);
128 GAIA_ASSERT(jobHandle != (JobHandle)JobNull_t{});
133 m_bottom.store(b + 1, std::memory_order_relaxed);
139 GAIA_NODISCARD
bool try_steal(JobHandle& jobHandle) {
140 GAIA_PROF_SCOPE(JobQueue::try_steal);
142 u
int32_t t = m_top.load(std::memory_order_acquire);
143 std::atomic_thread_fence(std::memory_order_seq_cst);
144 const u
int32_t b = m_bottom.load(std::memory_order_acquire);
146 if (
int(b - t) <= 0) {
147 jobHandle = (JobHandle)JobNull_t{};
151 const u
int32_t jobHandleValue = m_buffer[t & MASK].load(std::memory_order_relaxed);
154 const
bool ret = m_top.compare_exchange_strong(t, t + 1, std::memory_order_seq_cst, std::memory_order_relaxed);
155 jobHandle = JobHandle(jobHandleValue);
156 GAIA_ASSERT(jobHandle != (JobHandle)JobNull_t{});
163 template <
class T, const u
int32_t N = 1 << 12>
165 static_assert(N >= 2);
166 static_assert((N & (N - 1)) == 0, "Extent of MpmcQueue must be a power of 2");
167 static constexpr u
int32_t MASK = N - 1;
170 std::atomic_u
int32_t sequence{};
173 using view_policy = mem::data_view_policy_aos<Node>;
175 static constexpr u
int32_t extent = N;
176 static constexpr u
int32_t allocated_
bytes = view_policy::get_min_
byte_size(0, N);
180 GAIA_MSVC_WARNING_PUSH()
181 GAIA_MSVC_WARNING_DISABLE(4324)
183 mem::raw_data_holder<Node, allocated_
bytes> m_data;
184 GAIA_ALIGNAS(GAIA_CACHELINE_SIZE) std::atomic_u
int32_t m_pushPos;
185 GAIA_ALIGNAS(GAIA_CACHELINE_SIZE) std::atomic_u
int32_t m_popPos;
187 GAIA_MSVC_WARNING_POP()
197 MpmcQueue(MpmcQueue&&) = delete;
198 MpmcQueue(const MpmcQueue&) = delete;
199 MpmcQueue& operator=(MpmcQueue&&) = delete;
200 MpmcQueue& operator=(const MpmcQueue&) = delete;
203 GAIA_NODISCARD constexpr Node* data() noexcept {
204 return GAIA_ACC((Node*)&m_data[0]);
207 GAIA_NODISCARD constexpr const Node* data() const noexcept {
208 return GAIA_ACC((const Node*)&m_data[0]);
212 Node* pNodes = data();
215 Node* pNode = &pNodes[i];
216 core::call_ctor(&pNode->sequence, i);
219 m_pushPos.store(0, std::memory_order_relaxed);
220 m_popPos.store(0, std::memory_order_relaxed);
224 Node* pNodes = data();
226 uint32_t enqPos = m_pushPos.load(std::memory_order_relaxed);
227 uint32_t deqPos = m_popPos.load(std::memory_order_relaxed);
228 for (uint32_t pos = deqPos; pos != enqPos; ++pos) {
229 Node* pNode = &pNodes[pos & MASK];
230 if (pNode->sequence.load(std::memory_order_relaxed) == pos + 1)
235 Node* pNode = &pNodes[i];
236 core::call_dtor(&pNode->sequence);
244 GAIA_PROF_SCOPE(MpmcQueue::empty);
246 const uint32_t pos = m_popPos.load(std::memory_order_relaxed);
247 const auto* pNode = &data()[pos & MASK];
248 const uint32_t seq = pNode->sequence.load(std::memory_order_acquire);
255 template <
typename TT>
256 bool try_push(TT&& item) {
257 GAIA_PROF_SCOPE(MpmcQueue::try_push);
259 Node* pNodes = data();
261 Node* pNode =
nullptr;
262 uint32_t pos = m_pushPos.load(std::memory_order_relaxed);
264 pNode = &pNodes[pos & MASK];
265 uint32_t seq = pNode->sequence.load(std::memory_order_acquire);
266 int32_t diff = int32_t(seq) - int32_t(pos);
268 if (m_pushPos.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
270 }
else if (diff < 0) {
274 pos = m_pushPos.load(std::memory_order_relaxed);
278 core::call_ctor(&pNode->item, GAIA_FWD(item));
279 pNode->sequence.store(pos + 1, std::memory_order_release);
286 bool try_pop(T& item) {
287 GAIA_PROF_SCOPE(MpmcQueue::try_pop);
289 Node* pNodes = data();
291 Node* pNode =
nullptr;
292 uint32_t pos = m_popPos.load(std::memory_order_relaxed);
294 pNode = &pNodes[pos & MASK];
295 uint32_t seq = pNode->sequence.load(std::memory_order_acquire);
296 int32_t diff = int32_t(seq) - int32_t(pos + 1);
298 if (m_popPos.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
300 }
else if (diff < 0) {
304 pos = m_popPos.load(std::memory_order_relaxed);
308 item = GAIA_MOV(pNode->item);
309 core::call_dtor(&pNode->item);
310 pNode->sequence.store(pos + MASK + 1, std::memory_order_release);
317GAIA_MSVC_WARNING_POP()
Checks if endianess was detected correctly at compile-time.
Definition bitset.h:9