Gaia-ECS v0.9.3
A simple and powerful entity component system
Loading...
Searching...
No Matches
jobqueue.h
1#pragma once
2
3#include "gaia/config/config.h"
4
5#include <atomic>
6#include <mutex>
7
8#include "gaia/cnt/sarray.h"
9#include "gaia/config/profiler.h"
10#include "gaia/core/utility.h"
11#include "gaia/mt/jobhandle.h"
12
13// MSVC might warn about applying additional padding around alignas usage.
14// This is perfectly fine but can cause builds with warning-as-error turned on to fail.
15GAIA_MSVC_WARNING_PUSH()
16GAIA_MSVC_WARNING_DISABLE(4324)
17
18namespace gaia {
19 namespace mt {
20
23 template <const uint32_t N = 1 << 12>
24 class JobQueue {
25 static_assert(N >= 2);
26 static_assert((N & (N - 1)) == 0, "Extent of JobQueue must be a power of 2");
27 static constexpr uint32_t MASK = N - 1;
28
29 static_assert(sizeof(std::atomic_uint32_t) == sizeof(JobHandle));
30 cnt::sarray<std::atomic_uint32_t, N> m_buffer;
31 GAIA_ALIGNAS(GAIA_CACHELINE_SIZE) std::atomic_uint32_t m_bottom;
32 GAIA_ALIGNAS(GAIA_CACHELINE_SIZE) std::atomic_uint32_t m_top;
33
34 public:
35 JobQueue() {
36 clear();
37 }
38
39 ~JobQueue() = default;
40 JobQueue(const JobQueue&) = default;
41 JobQueue& operator=(const JobQueue&) = default;
42 JobQueue(JobQueue&&) noexcept = default;
43 JobQueue& operator=(JobQueue&&) noexcept = default;
44
45 void clear() {
46 m_bottom.store(0);
47 m_top.store(0);
48 for (auto& val: m_buffer)
49 val.store(((JobHandle)JobNull_t()).value());
50 }
51
54 bool empty() const {
55 GAIA_PROF_SCOPE(JobQueue::empty);
56
57 const uint32_t b = m_bottom.load(std::memory_order_relaxed);
58 const uint32_t t = m_top.load(std::memory_order_relaxed);
59 return int32_t(b - t) <= 0; // b<=t, but handles overflows, too
60 }
61
64 GAIA_NODISCARD bool try_push(JobHandle jobHandle) {
65 GAIA_PROF_SCOPE(JobQueue::try_push);
66
67 const uint32_t b = m_bottom.load(std::memory_order_relaxed);
68 const uint32_t t = m_top.load(std::memory_order_acquire);
69 const uint32_t used = b - t;
70 if (used > MASK)
71 return false;
72
73 m_buffer[b & MASK].store(jobHandle.value(), std::memory_order_relaxed);
74 // Make sure the handle is written before we update the bottom
75 std::atomic_thread_fence(std::memory_order_release);
76 m_bottom.store(b + 1, std::memory_order_relaxed);
77
78 return true;
79 }
80
83 GAIA_NODISCARD uint32_t try_push(std::span<JobHandle> jobHandles) {
84 GAIA_PROF_SCOPE(JobQueue::try_push);
85
86 const uint32_t cnt = (uint32_t)jobHandles.size();
87 uint32_t b = m_bottom.load(std::memory_order_relaxed);
88 const uint32_t t = m_top.load(std::memory_order_acquire);
89 const uint32_t used = b - t;
90 const uint32_t free = (MASK + 1) - used;
91 const uint32_t freeFinal = core::get_min(cnt, free);
92
93 for (uint32_t i = 0; i < freeFinal; i++, b++)
94 m_buffer[b & MASK].store(jobHandles[i].value(), std::memory_order_relaxed);
95 // Make sure handles are written before we update the bottom
96 std::atomic_thread_fence(std::memory_order_release);
97 m_bottom.store(b, std::memory_order_relaxed);
98
99 return freeFinal;
100 }
101
104 GAIA_NODISCARD bool try_pop(JobHandle& jobHandle) {
105 GAIA_PROF_SCOPE(JobQueue::try_pop);
106
107 uint32_t jobHandleValue = ((JobHandle)JobNull_t{}).value();
108
109 const uint32_t b = m_bottom.load(std::memory_order_relaxed) - 1;
110 m_bottom.store(b, std::memory_order_relaxed);
111 std::atomic_thread_fence(std::memory_order_seq_cst);
112 uint32_t t = m_top.load(std::memory_order_relaxed);
113
114 if (int(t - b) <= 0) { // t <= b, but handles overflows, too
115 // non-empty queue
116 jobHandleValue = m_buffer[b & MASK].load(std::memory_order_relaxed);
117
118 if (t == b) {
119 // last element in the queue
120 const bool ret =
121 m_top.compare_exchange_strong(t, t + 1, std::memory_order_seq_cst, std::memory_order_relaxed);
122 m_bottom.store(b + 1, std::memory_order_relaxed);
123 jobHandle = JobHandle(jobHandleValue);
124 GAIA_ASSERT(jobHandle != (JobHandle)JobNull_t{});
125 return ret; // false = failed race, don't use jobHandle; true = found a result
126 }
127
128 jobHandle = JobHandle(jobHandleValue);
129 GAIA_ASSERT(jobHandle != (JobHandle)JobNull_t{});
130 return true;
131 }
132
133 // empty queue
134 m_bottom.store(b + 1, std::memory_order_relaxed);
135 return false; // false = empty, don't use jobHandle
136 }
137
140 GAIA_NODISCARD bool try_steal(JobHandle& jobHandle) {
141 GAIA_PROF_SCOPE(JobQueue::try_steal);
142
143 uint32_t t = m_top.load(std::memory_order_acquire);
144 std::atomic_thread_fence(std::memory_order_seq_cst);
145 const uint32_t b = m_bottom.load(std::memory_order_acquire);
146
147 if (int(b - t) <= 0) { // t >= b, but handles overflows, too
148 jobHandle = (JobHandle)JobNull_t{};
149 return true; // true + JobNull = empty, don't use jobHandle
150 }
151
152 const uint32_t jobHandleValue = m_buffer[t & MASK].load(std::memory_order_relaxed);
153
154 // We fail if concurrent pop()/steal() operation changed the current top
155 const bool ret = m_top.compare_exchange_strong(t, t + 1, std::memory_order_seq_cst, std::memory_order_relaxed);
156 jobHandle = JobHandle(jobHandleValue);
157 GAIA_ASSERT(jobHandle != (JobHandle)JobNull_t{});
158 return ret; // false = failed race, don't use jobHandle; true = found a result
159 }
160 };
161
164 template <class T, const uint32_t N = 1 << 12>
165 class MpmcQueue {
166 static_assert(N >= 2);
167 static_assert((N & (N - 1)) == 0, "Extent of MpmcQueue must be a power of 2");
168 static constexpr uint32_t MASK = N - 1;
169
170 struct Node {
171 std::atomic_uint32_t sequence{};
172 T item;
173 };
174 using view_policy = mem::data_view_policy_aos<Node>;
175
176 static constexpr uint32_t extent = N;
177 static constexpr uint32_t allocated_bytes = view_policy::get_min_byte_size(0, N);
178
179 // MSVC might warn about applying additional padding to an instance of StackAllocator.
180 // This is perfectly fine, but might make builds with warning-as-error turned on to fail.
181 GAIA_MSVC_WARNING_PUSH()
182 GAIA_MSVC_WARNING_DISABLE(4324)
183
184 mem::raw_data_holder<Node, allocated_bytes> m_data;
185 GAIA_ALIGNAS(GAIA_CACHELINE_SIZE) std::atomic_uint32_t m_pushPos;
186 GAIA_ALIGNAS(GAIA_CACHELINE_SIZE) std::atomic_uint32_t m_popPos;
187
188 GAIA_MSVC_WARNING_POP()
189
190 public:
191 MpmcQueue() {
192 init();
193 }
194 ~MpmcQueue() {
195 free();
196 }
197
198 MpmcQueue(MpmcQueue&&) = delete;
199 MpmcQueue(const MpmcQueue&) = delete;
200 MpmcQueue& operator=(MpmcQueue&&) = delete;
201 MpmcQueue& operator=(const MpmcQueue&) = delete;
202
203 private:
204 GAIA_NODISCARD constexpr Node* data() noexcept {
205 return GAIA_ACC((Node*)&m_data[0]);
206 }
207
208 GAIA_NODISCARD constexpr const Node* data() const noexcept {
209 return GAIA_ACC((const Node*)&m_data[0]);
210 }
211
212 void init() {
213 Node* pNodes = data();
214
215 GAIA_FOR(extent) {
216 Node* pNode = &pNodes[i];
217 core::call_ctor(&pNode->sequence, i);
218 }
219
220 m_pushPos.store(0, std::memory_order_relaxed);
221 m_popPos.store(0, std::memory_order_relaxed);
222 }
223
224 void free() {
225 Node* pNodes = data();
226
227 uint32_t enqPos = m_pushPos.load(std::memory_order_relaxed);
228 uint32_t deqPos = m_popPos.load(std::memory_order_relaxed);
229 for (uint32_t pos = deqPos; pos != enqPos; ++pos) {
230 Node* pNode = &pNodes[pos & MASK];
231 if (pNode->sequence.load(std::memory_order_relaxed) == pos + 1)
232 pNode->item.~T();
233 }
234
235 GAIA_FOR(extent) {
236 Node* pNode = &pNodes[i];
237 core::call_dtor(&pNode->sequence);
238 }
239 }
240
241 public:
244 bool empty() const {
245 GAIA_PROF_SCOPE(MpmcQueue::empty);
246
247 const uint32_t pos = m_popPos.load(std::memory_order_relaxed);
248 const auto* pNode = &data()[pos & MASK];
249 const uint32_t seq = pNode->sequence.load(std::memory_order_acquire);
250 return pos >= seq;
251 }
252
256 template <typename TT>
257 bool try_push(TT&& item) {
258 GAIA_PROF_SCOPE(MpmcQueue::try_push);
259
260 Node* pNodes = data();
261
262 Node* pNode = nullptr;
263 uint32_t pos = m_pushPos.load(std::memory_order_relaxed);
264 while (true) {
265 pNode = &pNodes[pos & MASK];
266 uint32_t seq = pNode->sequence.load(std::memory_order_acquire);
267 int32_t diff = int32_t(seq) - int32_t(pos);
268 if (diff == 0) {
269 if (m_pushPos.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
270 break;
271 } else if (diff < 0) {
272 // The queue is full, we can't push
273 return false;
274 } else {
275 pos = m_pushPos.load(std::memory_order_relaxed);
276 }
277 }
278
279 core::call_ctor(&pNode->item, GAIA_FWD(item));
280 pNode->sequence.store(pos + 1, std::memory_order_release);
281 return true;
282 }
283
287 bool try_pop(T& item) {
288 GAIA_PROF_SCOPE(MpmcQueue::try_pop);
289
290 Node* pNodes = data();
291
292 Node* pNode = nullptr;
293 uint32_t pos = m_popPos.load(std::memory_order_relaxed);
294 while (true) {
295 pNode = &pNodes[pos & MASK];
296 uint32_t seq = pNode->sequence.load(std::memory_order_acquire);
297 int32_t diff = int32_t(seq) - int32_t(pos + 1);
298 if (diff == 0) {
299 if (m_popPos.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
300 break;
301 } else if (diff < 0) {
302 // The queue is empty, we can't pop
303 return false;
304 } else {
305 pos = m_popPos.load(std::memory_order_relaxed);
306 }
307 }
308
309 item = GAIA_MOV(pNode->item);
310 core::call_dtor(&pNode->item);
311 pNode->sequence.store(pos + MASK + 1, std::memory_order_release);
312 return true;
313 }
314 };
315 } // namespace mt
316} // namespace gaia
317
318GAIA_MSVC_WARNING_POP()
Checks if endianness was detected correctly at compile-time.
Definition bitset.h:9