Gaia-ECS v0.9.3
A simple and powerful entity component system
Loading...
Searching...
No Matches
jobmanager.h
1#pragma once
2
3#include "gaia/config/config.h"
4#include "gaia/config/profiler.h"
5
6#include <atomic>
7#include <cinttypes>
8
9#include "gaia/cnt/ilist.h"
10#include "gaia/core/span.h"
11#include "gaia/core/utility.h"
12#include "gaia/mem/mem_alloc.h"
13#include "gaia/mt/jobcommon.h"
14#include "gaia/mt/jobhandle.h"
15
16#define GAIA_LOG_JOB_STATES 0
17
18namespace gaia {
19 namespace mt {
//! Bit layout and lifecycle values packed into JobContainer::state.
//! The lowest DEP_BITS bits hold the dependency counter (executing() stores the worker index there),
//! the STATE_BITS bits right above them hold the lifecycle state.
enum JobState : uint32_t {
	//! Index of the first counter bit
	DEP_BITS_START = 0,
	//! Number of counter bits
	DEP_BITS = 27,
	//! Mask selecting the counter bits
	DEP_BITS_MASK = (uint32_t)((uint32_t(1) << DEP_BITS) - 1u),

	//! Index of the first lifecycle-state bit
	STATE_BITS_START = DEP_BITS_START + DEP_BITS,
	//! Number of lifecycle-state bits
	STATE_BITS = 4,
	//! Mask selecting the lifecycle-state bits
	STATE_BITS_MASK = (uint32_t)(((uint32_t(1) << STATE_BITS) - 1u) << STATE_BITS_START),

	// STATE

	//! Set by submit()
	Submitted = 1u << STATE_BITS_START,
	//! Set by processing()
	Processing = 2u << STATE_BITS_START,
	//! Set by executing()
	Executing = 3u << STATE_BITS_START,
	//! Set by finalize()
	Done = 4u << STATE_BITS_START,
	//! Set by free_job()
	Released = 5u << STATE_BITS_START
};
43
44 struct JobContainer;
45
46 namespace detail {
47 inline void signal_edge(JobContainer& jobData);
48 }
49
52 struct JobEdges {
55 union {
56 JobHandle dep;
57 JobHandle* pDeps;
58 };
60 uint32_t depCnt = 0;
61
62 JobEdges() {
63 dep = {};
64 }
65 };
66
79 std::atomic_uint32_t state;
81 JobPriority prio;
83 JobCreationFlags flags;
88
89 JobContainer() = default;
90 ~JobContainer() = default;
91
92 JobContainer(const JobContainer& other) = delete;
93 JobContainer& operator=(const JobContainer& other) = delete;
94
95 JobContainer(JobContainer&& other): cnt::ilist_item(GAIA_MOV(other)) {
96 state = other.state.load();
97 prio = other.prio;
98 flags = other.flags;
99 func = GAIA_MOV(other.func);
100
101 // if (edges.depCnt > 0)
102 // detail::signal_edge(*this);
103 edges = other.edges;
104 other.edges.depCnt = 0;
105 }
106 JobContainer& operator=(JobContainer&& other) {
107 GAIA_ASSERT(core::addressof(other) != this);
108 cnt::ilist_item::operator=(GAIA_MOV(other));
109 state = other.state.load();
110 prio = other.prio;
111 flags = other.flags;
112 func = GAIA_MOV(other.func);
113
114 // if (edges.depCnt > 0)
115 // detail::signal_edge(*this);
116 edges = other.edges;
117 other.edges.depCnt = 0;
118
119 return *this;
120 }
121
127 GAIA_NODISCARD static JobContainer create(uint32_t index, uint32_t generation, void* pCtx) {
128 auto* ctx = (JobAllocCtx*)pCtx;
129
130 JobContainer jc{};
131 jc.idx = index;
132 jc.data.gen = generation;
133 jc.prio = ctx->priority;
134
135 return jc;
136 }
137
141 GAIA_NODISCARD static JobHandle handle(const JobContainer& jc) {
142 return JobHandle(jc.idx, jc.data.gen, (jc.prio == JobPriority::Low) != 0);
143 }
144 };
145
148 static constexpr uint32_t IdMask = uint32_t(-1);
149
150 uint32_t m_id = IdMask;
151 uint32_t m_gen = 0;
152
158 ParallelCallbackHandle(uint32_t id, uint32_t gen): m_id(id), m_gen(gen) {}
159
161 GAIA_NODISCARD uint32_t id() const {
162 return m_id;
163 }
164
166 GAIA_NODISCARD uint32_t gen() const {
167 return m_gen;
168 }
169
171 GAIA_NODISCARD bool operator==(const ParallelCallbackHandle& other) const {
172 return m_id == other.m_id && m_gen == other.m_gen;
173 }
174 };
175
178 JobArgsFunc callback;
179 uint32_t refs = 0;
180 };
181
184 JobArgsFunc callback;
185 std::atomic_uint32_t refs = 0;
186
191
193 ParallelCallbackRecord& operator=(const ParallelCallbackRecord&) = delete;
194
196 cnt::ilist_item(GAIA_MOV(other)), callback(GAIA_MOV(other.callback)) {
197 refs.store(other.refs.load(std::memory_order_relaxed), std::memory_order_relaxed);
198 }
199
200 ParallelCallbackRecord& operator=(ParallelCallbackRecord&& other) noexcept {
201 GAIA_ASSERT(core::addressof(other) != this);
202 cnt::ilist_item::operator=(GAIA_MOV(other));
203 callback = GAIA_MOV(other.callback);
204 refs.store(other.refs.load(std::memory_order_relaxed), std::memory_order_relaxed);
205 return *this;
206 }
207
213 GAIA_NODISCARD static ParallelCallbackRecord create(uint32_t index, uint32_t generation, void* pCtx) {
214 auto* ctx = (ParallelCallbackAllocCtx*)pCtx;
215
216 ParallelCallbackRecord record{};
217 record.idx = index;
218 record.data.gen = generation;
219 record.callback = GAIA_MOV(ctx->callback);
220 record.refs.store(ctx->refs, std::memory_order_relaxed);
221 return record;
222 }
223
227 GAIA_NODISCARD static ParallelCallbackHandle handle(const ParallelCallbackRecord& record) {
228 return ParallelCallbackHandle(record.idx, record.data.gen);
229 }
230 };
231
235 static constexpr uint32_t JobDataPageCount = JobDataLayout::page_count_for_capacity(JobHandle::IdMask);
236
242
243 public:
247 return m_jobData.live_unsafe(jobHandle.id());
248 }
251 const JobContainer& data(JobHandle jobHandle) const {
252 return m_jobData.live_unsafe(jobHandle.id());
253 }
254
258 GAIA_NODISCARD JobHandle alloc_job(Job job) {
259 JobAllocCtx ctx{job.priority};
260
261 auto handle = m_jobData.alloc(&ctx);
262 auto& j = m_jobData[handle.id()];
263
264 // Make sure there is not state yet
265 GAIA_ASSERT(j.state == 0 || j.state == JobState::Released);
266
267 j.edges = {};
268 j.prio = ctx.priority;
269 j.state.store(0);
270 j.func = GAIA_MOV(job.func);
271 j.flags = job.flags;
272 return handle;
273 }
274
279 GAIA_NODISCARD ParallelCallbackHandle alloc_parallel_callback(JobArgsFunc callback, uint32_t refs) {
281 ctx.callback = GAIA_MOV(callback);
282 ctx.refs = refs;
283 return m_parallelCallbacks.alloc(&ctx);
284 }
285
290 void free_job(JobHandle jobHandle) {
291 auto& jobData = m_jobData.live_unsafe(jobHandle.id());
292 GAIA_ASSERT(done(jobData));
293 jobData.state.store(JobState::Released);
294 m_jobData.free_keep_live(jobHandle);
295 }
296
300 auto& record = m_parallelCallbacks[handle.id()];
301 record.callback.reset();
302 record.refs.store(0, std::memory_order_relaxed);
303 m_parallelCallbacks.free(handle);
304 }
305
307 void reset() {
308 m_jobData.clear();
309 m_parallelCallbacks.clear();
310 }
311
314 static void run(JobContainer& jobData) {
315 if (jobData.func.operator bool())
316 jobData.func();
317
318 finalize(jobData);
319 }
320
324 static bool signal_edge(JobContainer& jobData) {
325 // Subtract from dependency counter
326 const auto state = jobData.state.fetch_sub(1) - 1;
327
328 // If the job is not submitted, we can't accept it
329 const auto s = state & JobState::STATE_BITS_MASK;
330 if (s != JobState::Submitted)
331 return false;
332
333 // If the job still has some dependencies left we can't accept it
334 const auto deps = state & JobState::DEP_BITS_MASK;
335 return deps == 0;
336 }
337
340 static void free_edges(JobContainer& jobData) {
341 // We only allocate an array for 2 and more dependencies
342 if (jobData.edges.depCnt <= 1)
343 return;
344
345 mem::AllocHelper::free(jobData.edges.pDeps);
346 // jobData.edges.depCnt = 0;
347 // jobData.edges.pDeps = nullptr;
348 }
349
355 void dep(JobHandle jobFirst, JobHandle jobSecond) {
356 dep(std::span(&jobFirst, 1), jobSecond);
357 }
358
364 void dep(std::span<JobHandle> jobsFirst, JobHandle jobSecond) {
365 GAIA_ASSERT(!jobsFirst.empty());
366
367 GAIA_PROF_SCOPE(JobManager::dep);
368
369 auto& secondData = data(jobSecond);
370
371#if GAIA_ASSERT_ENABLED
372 GAIA_ASSERT(!busy(const_cast<const JobContainer&>(secondData)));
373 for (auto jobFirst: jobsFirst) {
374 const auto& firstData = data(jobFirst);
375 GAIA_ASSERT(!busy(firstData));
376 }
377#endif
378
379 for (auto jobFirst: jobsFirst)
380 dep_internal(jobFirst, jobSecond);
381
382 // Tell jobSecond that it has new dependencies
383 // secondData.canWait = true;
384 const uint32_t cnt = (uint32_t)jobsFirst.size();
385 [[maybe_unused]] const uint32_t statePrev = secondData.state.fetch_add(cnt);
386 GAIA_ASSERT((statePrev & JobState::DEP_BITS_MASK) < DEP_BITS_MASK - 1);
387 }
388
395 void dep_refresh(std::span<JobHandle> jobsFirst, JobHandle jobSecond) {
396 GAIA_ASSERT(!jobsFirst.empty());
397
398 GAIA_PROF_SCOPE(JobManager::dep_refresh);
399
400 auto& secondData = data(jobSecond);
401
402#if GAIA_ASSERT_ENABLED
403 GAIA_ASSERT(!busy(const_cast<const JobContainer&>(secondData)));
404 for (auto jobFirst: jobsFirst) {
405 const auto& firstData = data(jobFirst);
406 GAIA_ASSERT(!busy(firstData));
407 }
408
409 for (auto jobFirst: jobsFirst)
410 dep_refresh_internal(jobFirst, jobSecond);
411#endif
412
413 // Tell jobSecond that it has new dependencies
414 // secondData.canWait = true;
415 const uint32_t cnt = (uint32_t)jobsFirst.size();
416 [[maybe_unused]] const uint32_t statePrev = secondData.state.fetch_add(cnt);
417 GAIA_ASSERT((statePrev & JobState::DEP_BITS_MASK) < DEP_BITS_MASK - 1);
418 }
419
423 static uint32_t submit(JobContainer& jobData) {
424 [[maybe_unused]] const auto state = jobData.state.load() & JobState::STATE_BITS_MASK;
425 GAIA_ASSERT(state < JobState::Submitted);
426 const auto val = jobData.state.fetch_add(JobState::Submitted) + (uint32_t)JobState::Submitted;
427#if GAIA_LOG_JOB_STATES
428 GAIA_LOG_N("JobHandle %u.%u - SUBMITTED", jobData.idx, jobData.gen);
429#endif
430 return val;
431 }
432
435 static void processing(JobContainer& jobData) {
436 GAIA_ASSERT(submitted(const_cast<const JobContainer&>(jobData)));
437 jobData.state.store(JobState::Processing);
438#if GAIA_LOG_JOB_STATES
439 GAIA_LOG_N("JobHandle %u.%u - PROCESSING", jobData.idx, jobData.gen);
440#endif
441 }
442
446 static void executing(JobContainer& jobData, uint32_t workerIdx) {
447 GAIA_ASSERT(processing(const_cast<const JobContainer&>(jobData)));
448 jobData.state.store(JobState::Executing | workerIdx);
449#if GAIA_LOG_JOB_STATES
450 GAIA_LOG_N("JobHandle %u.%u - EXECUTING", jobData.idx, jobData.gen);
451#endif
452 }
453
456 static void finalize(JobContainer& jobData) {
457 jobData.state.store(JobState::Done);
458#if GAIA_LOG_JOB_STATES
459 GAIA_LOG_N("JobHandle %u.%u - DONE", jobData.idx, jobData.gen);
460#endif
461 }
462
465 static void reset_state(JobContainer& jobData) {
466 [[maybe_unused]] const auto state = jobData.state.load() & JobState::STATE_BITS_MASK;
467 // The job needs to be either clear or finalize for us to allow a reset
468 GAIA_ASSERT(state == 0 || state == JobState::Done);
469 jobData.state.store(0);
470#if GAIA_LOG_JOB_STATES
471 GAIA_LOG_N("JobHandle %u.%u - RESET_STATE", jobData.idx, jobData.gen);
472#endif
473 }
474
478 GAIA_NODISCARD bool is_clear(JobHandle jobHandle) const {
479 const auto& jobData = data(jobHandle);
480 const auto state = jobData.state.load();
481 return state == 0;
482 }
483
487 GAIA_NODISCARD static bool is_clear(JobContainer& jobData) {
488 const auto state = jobData.state.load();
489 return state == 0;
490 }
491
495 GAIA_NODISCARD static bool submitted(const JobContainer& jobData) {
496 const auto state = jobData.state.load() & JobState::STATE_BITS_MASK;
497 return state == JobState::Submitted;
498 }
499
503 GAIA_NODISCARD static bool processing(const JobContainer& jobData) {
504 const auto state = jobData.state.load() & JobState::STATE_BITS_MASK;
505 return state == JobState::Processing;
506 }
507
511 GAIA_NODISCARD static bool busy(const JobContainer& jobData) {
512 const auto state = jobData.state.load() & JobState::STATE_BITS_MASK;
513 return state == JobState::Executing || state == JobState::Processing;
514 }
515
519 GAIA_NODISCARD static bool done(const JobContainer& jobData) {
520 const auto state = jobData.state.load() & JobState::STATE_BITS_MASK;
521 return state == JobState::Done;
522 }
523
528 auto& record = m_parallelCallbacks[handle.id()];
529 GAIA_ASSERT(record.data.gen == handle.gen());
530 record.callback(args);
531 }
532
537 auto& record = m_parallelCallbacks[handle.id()];
538 GAIA_ASSERT(record.data.gen == handle.gen());
539 return record.refs.fetch_sub(1, std::memory_order_acq_rel) == 1;
540 }
541
542 private:
543 void dep_internal(JobHandle jobFirst, JobHandle jobSecond) {
544 GAIA_ASSERT(jobFirst != (JobHandle)JobNull_t{});
545 GAIA_ASSERT(jobSecond != (JobHandle)JobNull_t{});
546
547 auto& firstData = data(jobFirst);
548 const auto depCnt0 = firstData.edges.depCnt;
549 const auto depCnt1 = ++firstData.edges.depCnt;
550
551#if GAIA_LOG_JOB_STATES
552 GAIA_LOG_N(
553 "DEP %u.%u, %u -> %u.%u", jobFirst.id(), jobFirst.gen(), firstData.edges.depCnt, jobSecond.id(),
554 jobSecond.gen());
555#endif
556
557 if (depCnt1 <= 1) {
558 firstData.edges.dep = jobSecond;
559 } else if (depCnt1 == 2) {
560 auto prev = firstData.edges.dep;
561 // TODO: Use custom allocator
562 firstData.edges.pDeps = mem::AllocHelper::alloc<JobHandle>(depCnt1);
563 firstData.edges.pDeps[0] = prev;
564 firstData.edges.pDeps[1] = jobSecond;
565 } else {
566 // Reallocate if the previous value was a power of 2
567 const bool isPow2 = core::is_pow2(depCnt0);
568 if (isPow2) {
569 const auto nextPow2 = depCnt0 << 1;
570 auto* pPrev = firstData.edges.pDeps;
571 // TODO: Use custom allocator
572 firstData.edges.pDeps = mem::AllocHelper::alloc<JobHandle>(nextPow2);
573 if (pPrev != nullptr) {
574 GAIA_FOR(depCnt0) firstData.edges.pDeps[i] = pPrev[i];
575 mem::AllocHelper::free(pPrev);
576 }
577 }
578
579 // Append new dependencies
580 firstData.edges.pDeps[depCnt0] = jobSecond;
581 }
582 }
583
#if GAIA_ASSERT_ENABLED
//! Verifies that \a jobSecond is already recorded as an outgoing edge of \a jobFirst.
//! Only asserts, nothing is modified.
//! \param jobFirst The job \a jobSecond depends on. Must not be a null handle.
//! \param jobSecond The dependent job. Must not be a null handle.
void dep_refresh_internal(JobHandle jobFirst, JobHandle jobSecond) const {
	GAIA_ASSERT(jobFirst != (JobHandle)JobNull_t{});
	GAIA_ASSERT(jobSecond != (JobHandle)JobNull_t{});

	const auto& edges = data(jobFirst).edges;

	// Inline storage: the only edge has to be jobSecond
	if (edges.depCnt <= 1) {
		GAIA_ASSERT(edges.dep == jobSecond);
		return;
	}

	// Heap storage: jobSecond has to be somewhere in the array
	GAIA_ASSERT(edges.pDeps != nullptr);
	bool found = false;
	for (uint32_t i = 0; i < edges.depCnt && !found; ++i)
		found = edges.pDeps[i] == jobSecond;
	GAIA_ASSERT(found);
}
#endif
607 };
608
609 namespace detail {
610 void signal_edge(JobContainer& jobData) {
612 }
613 } // namespace detail
614 } // namespace mt
615} // namespace gaia
Array with variable size of elements of type.
Definition darray_impl.h:25
Definition span_impl.h:99
Move-only callback wrapper specialized for parallel job ranges.
Definition jobcommon.h:51
Storage and lifecycle manager for internal job and parallel callback records.
Definition jobmanager.h:233
static GAIA_NODISCARD bool busy(const JobContainer &jobData)
Checks whether jobData currently is executing or queued for execution.
Definition jobmanager.h:511
void reset()
Resets the job pool.
Definition jobmanager.h:307
static void free_edges(JobContainer &jobData)
Releases heap storage used for the dependency list of jobData.
Definition jobmanager.h:340
static void reset_state(JobContainer &jobData)
Resets a completed or never-submitted job back to the clear state.
Definition jobmanager.h:465
static GAIA_NODISCARD bool done(const JobContainer &jobData)
Checks whether jobData has finished executing.
Definition jobmanager.h:519
static GAIA_NODISCARD bool submitted(const JobContainer &jobData)
Checks whether jobData has been submitted but not yet queued for execution.
Definition jobmanager.h:495
static void processing(JobContainer &jobData)
Marks a job as queued for worker processing.
Definition jobmanager.h:435
GAIA_NODISCARD bool is_clear(JobHandle jobHandle) const
Checks whether the job referenced by jobHandle is in the clear state.
Definition jobmanager.h:478
static void finalize(JobContainer &jobData)
Marks a job as finished.
Definition jobmanager.h:456
void dep(std::span< JobHandle > jobsFirst, JobHandle jobSecond)
Makes jobSecond depend on the jobs listed in jobsFirst. This means jobSecond will not run until all j...
Definition jobmanager.h:364
static void run(JobContainer &jobData)
Execute the functor associated with the job container.
Definition jobmanager.h:314
GAIA_NODISCARD ParallelCallbackHandle alloc_parallel_callback(JobArgsFunc callback, uint32_t refs)
Allocates a shared callback record used by parallel jobs.
Definition jobmanager.h:279
static void executing(JobContainer &jobData, uint32_t workerIdx)
Marks a job as executing on the worker identified by workerIdx.
Definition jobmanager.h:446
void free_parallel_callback(ParallelCallbackHandle handle)
Releases a shared callback record.
Definition jobmanager.h:299
void free_job(JobHandle jobHandle)
Invalidates jobHandle by resetting its index in the job pool. Every time a job is deallocated its gen...
Definition jobmanager.h:290
static GAIA_NODISCARD bool is_clear(JobContainer &jobData)
Checks whether jobData is in the clear state.
Definition jobmanager.h:487
JobContainer & data(JobHandle jobHandle)
Returns mutable internal storage for jobHandle.
Definition jobmanager.h:246
void invoke_parallel_callback(ParallelCallbackHandle handle, const JobArgs &args)
Invokes the shared callback referenced by handle.
Definition jobmanager.h:527
static GAIA_NODISCARD bool processing(const JobContainer &jobData)
Checks whether jobData is queued for worker processing.
Definition jobmanager.h:503
void dep(JobHandle jobFirst, JobHandle jobSecond)
Makes jobSecond depend on jobFirst. This means jobSecond will not run until jobFirst finishes.
Definition jobmanager.h:355
const JobContainer & data(JobHandle jobHandle) const
Returns immutable internal storage for jobHandle.
Definition jobmanager.h:251
GAIA_NODISCARD bool release_parallel_callback_ref(ParallelCallbackHandle handle)
Releases one reference to the callback referenced by handle.
Definition jobmanager.h:536
void dep_refresh(std::span< JobHandle > jobsFirst, JobHandle jobSecond)
Makes jobSecond depend on the jobs listed in jobsFirst. This means jobSecond will not run until all j...
Definition jobmanager.h:395
GAIA_NODISCARD JobHandle alloc_job(Job job)
Allocates a new job container identified by a unique JobHandle.
Definition jobmanager.h:258
static uint32_t submit(JobContainer &jobData)
Marks a job as submitted.
Definition jobmanager.h:423
static bool signal_edge(JobContainer &jobData)
Signals that one dependency edge of jobData has completed.
Definition jobmanager.h:324
Move-only function wrapper with inline storage and optional SmallBlockAllocator spill storage....
Definition small_func.h:19
Checks if endianness was detected correctly at compile-time.
Definition bitset.h:9
uint32_t gen
Generation ID.
Definition ilist.h:125
Definition ilist.h:122
uint32_t idx
Allocated items: Index in the list. Deleted items: Index of the next deleted item in the list.
Definition ilist.h:130
ItemData data
Item data.
Definition ilist.h:132
Definition jobcommon.h:35
Definition jobcommon.h:45
Internal storage for a scheduled job and its dependency metadata.
Definition jobmanager.h:68
static GAIA_NODISCARD JobContainer create(uint32_t index, uint32_t generation, void *pCtx)
Creates a new job container for the intrusive storage.
Definition jobmanager.h:127
static GAIA_NODISCARD JobHandle handle(const JobContainer &jc)
Returns the public handle associated with jc.
Definition jobmanager.h:141
util::SmallFunc func
Function to execute when running the job.
Definition jobmanager.h:87
JobPriority prio
Job priority.
Definition jobmanager.h:81
std::atomic_uint32_t state
Current state of the job. Consists of an upper and a lower part. Least significant bits = special purpose....
Definition jobmanager.h:79
JobEdges edges
Dependency graph.
Definition jobmanager.h:85
JobCreationFlags flags
Job flags.
Definition jobmanager.h:83
Outgoing dependency edges for a job. Stores either a single dependent job handle or a heap-allocated ...
Definition jobmanager.h:52
uint32_t depCnt
Number of dependencies.
Definition jobmanager.h:60
Definition jobhandle.h:13
Definition jobhandle.h:85
Definition jobcommon.h:39
Allocation context used when creating shared parallel callbacks.
Definition jobmanager.h:177
Handle identifying a shared callback used by parallel jobs.
Definition jobmanager.h:147
ParallelCallbackHandle()=default
Creates an invalid callback handle.
ParallelCallbackHandle(uint32_t id, uint32_t gen)
Creates a callback handle from the given identifier and generation.
Definition jobmanager.h:158
GAIA_NODISCARD bool operator==(const ParallelCallbackHandle &other) const
Checks whether two callback handles reference the same record.
Definition jobmanager.h:171
GAIA_NODISCARD uint32_t id() const
Returns the slot identifier.
Definition jobmanager.h:161
GAIA_NODISCARD uint32_t gen() const
Returns the slot generation.
Definition jobmanager.h:166
Internal storage record for a shared parallel callback.
Definition jobmanager.h:183
static GAIA_NODISCARD ParallelCallbackRecord create(uint32_t index, uint32_t generation, void *pCtx)
Creates a new callback record for the intrusive storage.
Definition jobmanager.h:213
ParallelCallbackRecord()=default
Creates an empty callback record.
~ParallelCallbackRecord()=default
Destroys the callback record.
static GAIA_NODISCARD ParallelCallbackHandle handle(const ParallelCallbackRecord &record)
Returns the public handle associated with record.
Definition jobmanager.h:227