Gaia-ECS v0.9.3
A simple and powerful entity component system
Loading...
Searching...
No Matches
jobqueue.h
1#pragma once
2
3#include "gaia/config/config.h"
4
5#include <atomic>
6
7#include "gaia/cnt/sarray.h"
8#include "gaia/config/profiler.h"
9#include "gaia/core/utility.h"
10#include "gaia/mt/jobhandle.h"
11
12// MSVC might warn about applying additional padding around alignas usage.
13// This is perfectly fine but can cause builds with warning-as-error turned on to fail.
14GAIA_MSVC_WARNING_PUSH()
15GAIA_MSVC_WARNING_DISABLE(4324)
16
17namespace gaia {
18 namespace mt {
19
		//! Work-stealing queue of job handles (Chase-Lev-style deque).
		//! The owning thread pushes and pops at the bottom; other worker threads
		//! steal from the top. Backed by a fixed-size, power-of-two ring buffer.
		//! \tparam N Queue capacity; must be a power of 2 and at least 2.
		template <const uint32_t N = 1 << 12>
		class JobQueue {
			static_assert(N >= 2);
			static_assert((N & (N - 1)) == 0, "Extent of JobQueue must be a power of 2");
			//! Mask used to wrap ring-buffer indices (valid because N is a power of 2).
			static constexpr uint32_t MASK = N - 1;

			// JobHandle values are stored as raw integers inside atomics, so sizes must match.
			static_assert(sizeof(std::atomic_uint32_t) == sizeof(JobHandle));
			//! Ring buffer holding raw JobHandle values.
			cnt::sarray<std::atomic_uint32_t, N> m_buffer;
			//! One past the index of the most recently pushed item. Written only by the owner.
			//! Cache-line aligned so it does not share a line with m_top (avoids false sharing).
			GAIA_ALIGNAS(GAIA_CACHELINE_SIZE) std::atomic_uint32_t m_bottom;
			//! Index of the oldest item. Advanced by stealers (and by pop on the last item).
			GAIA_ALIGNAS(GAIA_CACHELINE_SIZE) std::atomic_uint32_t m_top;

		public:
			JobQueue() {
				clear();
			}

			~JobQueue() = default;
			// NOTE(review): the defaulted copy/move operations below are implicitly deleted
			// because std::atomic members are neither copyable nor movable; the defaults
			// compile only as long as they are never actually used.
			JobQueue(const JobQueue&) = default;
			JobQueue& operator=(const JobQueue&) = default;
			JobQueue(JobQueue&&) noexcept = default;
			JobQueue& operator=(JobQueue&&) noexcept = default;

			//! Resets the queue to an empty state and fills every slot with the null handle.
			//! Not thread-safe; call only while no other thread accesses the queue.
			void clear() {
				m_bottom.store(0);
				m_top.store(0);
				for (auto& val: m_buffer)
					val.store(((JobHandle)JobNull_t()).value());
			}

			//! Checks if the queue appears empty. Uses relaxed loads, so the answer is
			//! only a snapshot and may already be stale under concurrent pushes/steals.
			bool empty() const {
				GAIA_PROF_SCOPE(JobQueue::empty);

				const uint32_t b = m_bottom.load(std::memory_order_relaxed);
				const uint32_t t = m_top.load(std::memory_order_relaxed);
				return int32_t(b - t) <= 0; // b<=t, but handles overflows, too
			}

			//! Tries to push one job handle onto the bottom of the queue.
			//! Must be called from the owning thread only.
			//! \return False if the queue is full, true otherwise.
			GAIA_NODISCARD bool try_push(JobHandle jobHandle) {
				GAIA_PROF_SCOPE(JobQueue::try_push);

				const uint32_t b = m_bottom.load(std::memory_order_relaxed);
				const uint32_t t = m_top.load(std::memory_order_acquire);
				// The number of used slots can never exceed N (== MASK + 1),
				// so used > MASK means the buffer is full.
				const uint32_t used = b - t;
				if (used > MASK)
					return false;

				m_buffer[b & MASK].store(jobHandle.value(), std::memory_order_relaxed);
				// Make sure the handle is written before we update the bottom
				std::atomic_thread_fence(std::memory_order_release);
				m_bottom.store(b + 1, std::memory_order_relaxed);

				return true;
			}

			//! Tries to push a batch of job handles onto the bottom of the queue.
			//! Must be called from the owning thread only.
			//! \return How many handles were actually pushed. May be smaller than the
			//!         batch size if the queue runs out of free slots.
			GAIA_NODISCARD uint32_t try_push(std::span<JobHandle> jobHandles) {
				GAIA_PROF_SCOPE(JobQueue::try_push);

				const uint32_t cnt = (uint32_t)jobHandles.size();
				uint32_t b = m_bottom.load(std::memory_order_relaxed);
				const uint32_t t = m_top.load(std::memory_order_acquire);
				const uint32_t used = b - t;
				const uint32_t free = (MASK + 1) - used;
				// Push as many handles as fit into the remaining space.
				const uint32_t freeFinal = core::get_min(cnt, free);

				for (uint32_t i = 0; i < freeFinal; i++, b++)
					m_buffer[b & MASK].store(jobHandles[i].value(), std::memory_order_relaxed);
				// Make sure handles are written before we update the bottom
				std::atomic_thread_fence(std::memory_order_release);
				m_bottom.store(b, std::memory_order_relaxed);

				return freeFinal;
			}

			//! Tries to pop a job handle from the bottom of the queue.
			//! Must be called from the owning thread only.
			//! \return True and a valid handle on success. False if the queue was empty
			//!         or a concurrent steal won the race for the last item; in either
			//!         false case jobHandle must not be used.
			GAIA_NODISCARD bool try_pop(JobHandle& jobHandle) {
				GAIA_PROF_SCOPE(JobQueue::try_pop);

				uint32_t jobHandleValue = ((JobHandle)JobNull_t{}).value();

				// Reserve the bottom slot first...
				const uint32_t b = m_bottom.load(std::memory_order_relaxed) - 1;
				m_bottom.store(b, std::memory_order_relaxed);
				// ...and only then read the top. The seq_cst fence orders this against
				// the matching fence in try_steal so pop and steal cannot both succeed
				// on the same (last) item unnoticed.
				std::atomic_thread_fence(std::memory_order_seq_cst);
				uint32_t t = m_top.load(std::memory_order_relaxed);

				if (int(t - b) <= 0) { // t <= b, but handles overflows, too
					// non-empty queue
					jobHandleValue = m_buffer[b & MASK].load(std::memory_order_relaxed);

					if (t == b) {
						// last element in the queue; race concurrent stealers via a CAS on top
						const bool ret =
								m_top.compare_exchange_strong(t, t + 1, std::memory_order_seq_cst, std::memory_order_relaxed);
						// Whether we won or lost the race, the queue is now empty;
						// restore bottom to its canonical position.
						m_bottom.store(b + 1, std::memory_order_relaxed);
						jobHandle = JobHandle(jobHandleValue);
						GAIA_ASSERT(jobHandle != (JobHandle)JobNull_t{});
						return ret; // false = failed race, don't use jobHandle; true = found a result
					}

					jobHandle = JobHandle(jobHandleValue);
					GAIA_ASSERT(jobHandle != (JobHandle)JobNull_t{});
					return true;
				}

				// empty queue; undo the bottom reservation made above
				m_bottom.store(b + 1, std::memory_order_relaxed);
				return false; // false = empty, don't use jobHandle
			}

			//! Tries to steal a job handle from the top of the queue.
			//! Safe to call from any thread.
			//! \return True with a null handle when the queue is empty (don't use the
			//!         result), true with a valid handle on success, or false when a
			//!         concurrent pop()/steal() won the race (don't use the result).
			GAIA_NODISCARD bool try_steal(JobHandle& jobHandle) {
				GAIA_PROF_SCOPE(JobQueue::try_steal);

				uint32_t t = m_top.load(std::memory_order_acquire);
				// Pairs with the seq_cst fence in try_pop: top must be read before bottom.
				std::atomic_thread_fence(std::memory_order_seq_cst);
				const uint32_t b = m_bottom.load(std::memory_order_acquire);

				if (int(b - t) <= 0) { // t >= b, but handles overflows, too
					jobHandle = (JobHandle)JobNull_t{};
					return true; // true + JobNull = empty, don't use jobHandle
				}

				const uint32_t jobHandleValue = m_buffer[t & MASK].load(std::memory_order_relaxed);

				// We fail if concurrent pop()/steal() operation changed the current top
				const bool ret = m_top.compare_exchange_strong(t, t + 1, std::memory_order_seq_cst, std::memory_order_relaxed);
				jobHandle = JobHandle(jobHandleValue);
				GAIA_ASSERT(jobHandle != (JobHandle)JobNull_t{});
				return ret; // false = failed race, don't use jobHandle; true = found a result
			}
		};
160
163 template <class T, const uint32_t N = 1 << 12>
164 class MpmcQueue {
165 static_assert(N >= 2);
166 static_assert((N & (N - 1)) == 0, "Extent of MpmcQueue must be a power of 2");
167 static constexpr uint32_t MASK = N - 1;
168
169 struct Node {
170 std::atomic_uint32_t sequence{};
171 T item;
172 };
173 using view_policy = mem::data_view_policy_aos<Node>;
174
175 static constexpr uint32_t extent = N;
176 static constexpr uint32_t allocated_bytes = view_policy::get_min_byte_size(0, N);
177
178 // MSVC might warn about applying additional padding to an instance of StackAllocator.
179 // This is perfectly fine, but might make builds with warning-as-error turned on to fail.
180 GAIA_MSVC_WARNING_PUSH()
181 GAIA_MSVC_WARNING_DISABLE(4324)
182
183 mem::raw_data_holder<Node, allocated_bytes> m_data;
184 GAIA_ALIGNAS(GAIA_CACHELINE_SIZE) std::atomic_uint32_t m_pushPos;
185 GAIA_ALIGNAS(GAIA_CACHELINE_SIZE) std::atomic_uint32_t m_popPos;
186
187 GAIA_MSVC_WARNING_POP()
188
189 public:
190 MpmcQueue() {
191 init();
192 }
193 ~MpmcQueue() {
194 free();
195 }
196
197 MpmcQueue(MpmcQueue&&) = delete;
198 MpmcQueue(const MpmcQueue&) = delete;
199 MpmcQueue& operator=(MpmcQueue&&) = delete;
200 MpmcQueue& operator=(const MpmcQueue&) = delete;
201
202 private:
203 GAIA_NODISCARD constexpr Node* data() noexcept {
204 return GAIA_ACC((Node*)&m_data[0]);
205 }
206
207 GAIA_NODISCARD constexpr const Node* data() const noexcept {
208 return GAIA_ACC((const Node*)&m_data[0]);
209 }
210
211 void init() {
212 Node* pNodes = data();
213
214 GAIA_FOR(extent) {
215 Node* pNode = &pNodes[i];
216 core::call_ctor(&pNode->sequence, i);
217 }
218
219 m_pushPos.store(0, std::memory_order_relaxed);
220 m_popPos.store(0, std::memory_order_relaxed);
221 }
222
223 void free() {
224 Node* pNodes = data();
225
226 uint32_t enqPos = m_pushPos.load(std::memory_order_relaxed);
227 uint32_t deqPos = m_popPos.load(std::memory_order_relaxed);
228 for (uint32_t pos = deqPos; pos != enqPos; ++pos) {
229 Node* pNode = &pNodes[pos & MASK];
230 if (pNode->sequence.load(std::memory_order_relaxed) == pos + 1)
231 pNode->item.~T();
232 }
233
234 GAIA_FOR(extent) {
235 Node* pNode = &pNodes[i];
236 core::call_dtor(&pNode->sequence);
237 }
238 }
239
240 public:
243 bool empty() const {
244 GAIA_PROF_SCOPE(MpmcQueue::empty);
245
246 const uint32_t pos = m_popPos.load(std::memory_order_relaxed);
247 const auto* pNode = &data()[pos & MASK];
248 const uint32_t seq = pNode->sequence.load(std::memory_order_acquire);
249 return pos >= seq;
250 }
251
255 template <typename TT>
256 bool try_push(TT&& item) {
257 GAIA_PROF_SCOPE(MpmcQueue::try_push);
258
259 Node* pNodes = data();
260
261 Node* pNode = nullptr;
262 uint32_t pos = m_pushPos.load(std::memory_order_relaxed);
263 while (true) {
264 pNode = &pNodes[pos & MASK];
265 uint32_t seq = pNode->sequence.load(std::memory_order_acquire);
266 int32_t diff = int32_t(seq) - int32_t(pos);
267 if (diff == 0) {
268 if (m_pushPos.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
269 break;
270 } else if (diff < 0) {
271 // The queue is full, we can't push
272 return false;
273 } else {
274 pos = m_pushPos.load(std::memory_order_relaxed);
275 }
276 }
277
278 core::call_ctor(&pNode->item, GAIA_FWD(item));
279 pNode->sequence.store(pos + 1, std::memory_order_release);
280 return true;
281 }
282
286 bool try_pop(T& item) {
287 GAIA_PROF_SCOPE(MpmcQueue::try_pop);
288
289 Node* pNodes = data();
290
291 Node* pNode = nullptr;
292 uint32_t pos = m_popPos.load(std::memory_order_relaxed);
293 while (true) {
294 pNode = &pNodes[pos & MASK];
295 uint32_t seq = pNode->sequence.load(std::memory_order_acquire);
296 int32_t diff = int32_t(seq) - int32_t(pos + 1);
297 if (diff == 0) {
298 if (m_popPos.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
299 break;
300 } else if (diff < 0) {
301 // The queue is empty, we can't pop
302 return false;
303 } else {
304 pos = m_popPos.load(std::memory_order_relaxed);
305 }
306 }
307
308 item = GAIA_MOV(pNode->item);
309 core::call_dtor(&pNode->item);
310 pNode->sequence.store(pos + MASK + 1, std::memory_order_release);
311 return true;
312 }
313 };
314 } // namespace mt
315} // namespace gaia
316
317GAIA_MSVC_WARNING_POP()
Checks if endianness was detected correctly at compile-time.
Definition bitset.h:9