3#include "gaia/config/config.h"
4#include "gaia/config/profiler.h"
9#include "gaia/cnt/ilist.h"
10#include "gaia/core/span.h"
11#include "gaia/core/utility.h"
12#include "gaia/mem/mem_alloc.h"
13#include "gaia/mt/jobcommon.h"
14#include "gaia/mt/jobhandle.h"
16#define GAIA_LOG_JOB_STATES 0
21 enum JobState : uint32_t {
24 DEP_BITS_MASK = (uint32_t)((1u << DEP_BITS) - 1),
26 STATE_BITS_START = DEP_BITS_START + DEP_BITS,
28 STATE_BITS_MASK = (uint32_t)(((1u << STATE_BITS) - 1) << STATE_BITS_START),
33 Submitted = 0x01 << STATE_BITS_START,
35 Processing = 0x02 << STATE_BITS_START,
37 Executing = 0x03 << STATE_BITS_START,
39 Done = 0x04 << STATE_BITS_START,
41 Released = 0x05 << STATE_BITS_START
47 inline void signal_edge(JobContainer& jobData);
96 state = other.state.load();
99 func = GAIA_MOV(other.func);
107 GAIA_ASSERT(core::addressof(other) !=
this);
108 cnt::ilist_item::operator=(GAIA_MOV(other));
109 state = other.state.load();
112 func = GAIA_MOV(other.func);
132 jc.data.gen = generation;
133 jc.prio = ctx->priority;
148 static constexpr uint32_t IdMask = uint32_t(-1);
150 uint32_t m_id = IdMask;
161 GAIA_NODISCARD uint32_t
id()
const {
166 GAIA_NODISCARD uint32_t
gen()
const {
172 return m_id == other.m_id && m_gen == other.m_gen;
185 std::atomic_uint32_t refs = 0;
197 refs.store(other.refs.load(std::memory_order_relaxed), std::memory_order_relaxed);
201 GAIA_ASSERT(core::addressof(other) !=
this);
202 cnt::ilist_item::operator=(GAIA_MOV(other));
203 callback = GAIA_MOV(other.callback);
204 refs.store(other.refs.load(std::memory_order_relaxed), std::memory_order_relaxed);
218 record.data.gen = generation;
219 record.callback = GAIA_MOV(ctx->callback);
220 record.refs.store(ctx->refs, std::memory_order_relaxed);
235 static constexpr uint32_t JobDataPageCount = JobDataLayout::page_count_for_capacity(JobHandle::IdMask);
247 return m_jobData.live_unsafe(jobHandle.id());
252 return m_jobData.live_unsafe(jobHandle.id());
261 auto handle = m_jobData.alloc(&ctx);
262 auto& j = m_jobData[handle.id()];
265 GAIA_ASSERT(j.state == 0 || j.state == JobState::Released);
268 j.prio = ctx.priority;
270 j.func = GAIA_MOV(job.func);
281 ctx.callback = GAIA_MOV(callback);
283 return m_parallelCallbacks.alloc(&ctx);
291 auto& jobData = m_jobData.live_unsafe(jobHandle.id());
292 GAIA_ASSERT(
done(jobData));
293 jobData.state.store(JobState::Released);
294 m_jobData.free_keep_live(jobHandle);
300 auto& record = m_parallelCallbacks[handle.
id()];
301 record.callback.reset();
302 record.refs.store(0, std::memory_order_relaxed);
303 m_parallelCallbacks.free(handle);
309 m_parallelCallbacks.clear();
315 if (jobData.
func.operator
bool())
326 const auto state = jobData.
state.fetch_sub(1) - 1;
329 const auto s = state & JobState::STATE_BITS_MASK;
330 if (s != JobState::Submitted)
334 const auto deps = state & JobState::DEP_BITS_MASK;
345 mem::AllocHelper::free(jobData.
edges.pDeps);
365 GAIA_ASSERT(!jobsFirst.empty());
369 auto& secondData =
data(jobSecond);
371#if GAIA_ASSERT_ENABLED
373 for (
auto jobFirst: jobsFirst) {
374 const auto& firstData =
data(jobFirst);
375 GAIA_ASSERT(!
busy(firstData));
379 for (
auto jobFirst: jobsFirst)
380 dep_internal(jobFirst, jobSecond);
384 const uint32_t cnt = (uint32_t)jobsFirst.size();
385 [[maybe_unused]]
const uint32_t statePrev = secondData.state.fetch_add(cnt);
386 GAIA_ASSERT((statePrev & JobState::DEP_BITS_MASK) < DEP_BITS_MASK - 1);
396 GAIA_ASSERT(!jobsFirst.empty());
400 auto& secondData =
data(jobSecond);
402#if GAIA_ASSERT_ENABLED
404 for (
auto jobFirst: jobsFirst) {
405 const auto& firstData =
data(jobFirst);
406 GAIA_ASSERT(!
busy(firstData));
409 for (
auto jobFirst: jobsFirst)
410 dep_refresh_internal(jobFirst, jobSecond);
415 const uint32_t cnt = (uint32_t)jobsFirst.size();
416 [[maybe_unused]]
const uint32_t statePrev = secondData.state.fetch_add(cnt);
417 GAIA_ASSERT((statePrev & JobState::DEP_BITS_MASK) < DEP_BITS_MASK - 1);
424 [[maybe_unused]]
const auto state = jobData.
state.load() & JobState::STATE_BITS_MASK;
425 GAIA_ASSERT(state < JobState::Submitted);
426 const auto val = jobData.
state.fetch_add(JobState::Submitted) + (uint32_t)JobState::Submitted;
427#if GAIA_LOG_JOB_STATES
428 GAIA_LOG_N(
"JobHandle %u.%u - SUBMITTED", jobData.
idx, jobData.gen);
437 jobData.
state.store(JobState::Processing);
438#if GAIA_LOG_JOB_STATES
439 GAIA_LOG_N(
"JobHandle %u.%u - PROCESSING", jobData.
idx, jobData.gen);
448 jobData.
state.store(JobState::Executing | workerIdx);
449#if GAIA_LOG_JOB_STATES
450 GAIA_LOG_N(
"JobHandle %u.%u - EXECUTING", jobData.
idx, jobData.gen);
457 jobData.
state.store(JobState::Done);
458#if GAIA_LOG_JOB_STATES
459 GAIA_LOG_N(
"JobHandle %u.%u - DONE", jobData.
idx, jobData.gen);
466 [[maybe_unused]]
const auto state = jobData.
state.load() & JobState::STATE_BITS_MASK;
468 GAIA_ASSERT(state == 0 || state == JobState::Done);
469 jobData.
state.store(0);
470#if GAIA_LOG_JOB_STATES
471 GAIA_LOG_N(
"JobHandle %u.%u - RESET_STATE", jobData.
idx, jobData.gen);
479 const auto& jobData =
data(jobHandle);
480 const auto state = jobData.state.load();
488 const auto state = jobData.
state.load();
496 const auto state = jobData.
state.load() & JobState::STATE_BITS_MASK;
497 return state == JobState::Submitted;
504 const auto state = jobData.
state.load() & JobState::STATE_BITS_MASK;
505 return state == JobState::Processing;
512 const auto state = jobData.
state.load() & JobState::STATE_BITS_MASK;
513 return state == JobState::Executing || state == JobState::Processing;
520 const auto state = jobData.
state.load() & JobState::STATE_BITS_MASK;
521 return state == JobState::Done;
528 auto& record = m_parallelCallbacks[handle.
id()];
529 GAIA_ASSERT(record.data.gen == handle.
gen());
530 record.callback(args);
537 auto& record = m_parallelCallbacks[handle.
id()];
538 GAIA_ASSERT(record.data.gen == handle.
gen());
539 return record.refs.fetch_sub(1, std::memory_order_acq_rel) == 1;
545 GAIA_ASSERT(jobSecond != (JobHandle)JobNull_t{});
547 auto& firstData =
data(jobFirst);
548 const auto depCnt0 = firstData.edges.depCnt;
549 const auto depCnt1 = ++firstData.edges.depCnt;
551#if GAIA_LOG_JOB_STATES
553 "DEP %u.%u, %u -> %u.%u", jobFirst.id(), jobFirst.gen(), firstData.edges.depCnt, jobSecond.id(),
558 firstData.edges.dep = jobSecond;
559 }
else if (depCnt1 == 2) {
560 auto prev = firstData.edges.dep;
562 firstData.edges.pDeps = mem::AllocHelper::alloc<JobHandle>(depCnt1);
563 firstData.edges.pDeps[0] = prev;
564 firstData.edges.pDeps[1] = jobSecond;
567 const bool isPow2 = core::is_pow2(depCnt0);
569 const auto nextPow2 = depCnt0 << 1;
570 auto* pPrev = firstData.edges.pDeps;
572 firstData.edges.pDeps = mem::AllocHelper::alloc<JobHandle>(nextPow2);
573 if (pPrev !=
nullptr) {
574 GAIA_FOR(depCnt0) firstData.edges.pDeps[i] = pPrev[i];
575 mem::AllocHelper::free(pPrev);
580 firstData.edges.pDeps[depCnt0] = jobSecond;
584#if GAIA_ASSERT_ENABLED
585 void dep_refresh_internal(JobHandle jobFirst, JobHandle jobSecond)
const {
586 GAIA_ASSERT(jobFirst != (JobHandle)JobNull_t{});
587 GAIA_ASSERT(jobSecond != (JobHandle)JobNull_t{});
589 const auto& firstData =
data(jobFirst);
590 const auto depCnt = firstData.edges.depCnt;
593 GAIA_ASSERT(firstData.edges.dep == jobSecond);
595 GAIA_ASSERT(firstData.edges.pDeps !=
nullptr);
597 GAIA_FOR(firstData.edges.depCnt) {
598 if (firstData.edges.pDeps[i] == jobSecond) {
610 void signal_edge(JobContainer& jobData) {
Array with variable size of elements of type T.
Definition darray_impl.h:25
Definition span_impl.h:99
Move-only callback wrapper specialized for parallel job ranges.
Definition jobcommon.h:51
Storage and lifecycle manager for internal job and parallel callback records.
Definition jobmanager.h:233
static GAIA_NODISCARD bool busy(const JobContainer &jobData)
Checks whether jobData currently is executing or queued for execution.
Definition jobmanager.h:511
void reset()
Resets the job pool.
Definition jobmanager.h:307
static void free_edges(JobContainer &jobData)
Releases heap storage used for the dependency list of jobData.
Definition jobmanager.h:340
static void reset_state(JobContainer &jobData)
Resets a completed or never-submitted job back to the clear state.
Definition jobmanager.h:465
static GAIA_NODISCARD bool done(const JobContainer &jobData)
Checks whether jobData has finished executing.
Definition jobmanager.h:519
static GAIA_NODISCARD bool submitted(const JobContainer &jobData)
Checks whether jobData has been submitted but not yet queued for execution.
Definition jobmanager.h:495
static void processing(JobContainer &jobData)
Marks a job as queued for worker processing.
Definition jobmanager.h:435
GAIA_NODISCARD bool is_clear(JobHandle jobHandle) const
Checks whether the job referenced by jobHandle is in the clear state.
Definition jobmanager.h:478
static void finalize(JobContainer &jobData)
Marks a job as finished.
Definition jobmanager.h:456
void dep(std::span< JobHandle > jobsFirst, JobHandle jobSecond)
Makes jobSecond depend on the jobs listed in jobsFirst. This means jobSecond will not run until all j...
Definition jobmanager.h:364
static void run(JobContainer &jobData)
Execute the functor associated with the job container.
Definition jobmanager.h:314
GAIA_NODISCARD ParallelCallbackHandle alloc_parallel_callback(JobArgsFunc callback, uint32_t refs)
Allocates a shared callback record used by parallel jobs.
Definition jobmanager.h:279
static void executing(JobContainer &jobData, uint32_t workerIdx)
Marks a job as executing on the worker identified by workerIdx.
Definition jobmanager.h:446
void free_parallel_callback(ParallelCallbackHandle handle)
Releases a shared callback record.
Definition jobmanager.h:299
void free_job(JobHandle jobHandle)
Invalidates jobHandle by resetting its index in the job pool. Every time a job is deallocated its gen...
Definition jobmanager.h:290
static GAIA_NODISCARD bool is_clear(JobContainer &jobData)
Checks whether jobData is in the clear state.
Definition jobmanager.h:487
JobContainer & data(JobHandle jobHandle)
Returns mutable internal storage for jobHandle.
Definition jobmanager.h:246
void invoke_parallel_callback(ParallelCallbackHandle handle, const JobArgs &args)
Invokes the shared callback referenced by handle.
Definition jobmanager.h:527
static GAIA_NODISCARD bool processing(const JobContainer &jobData)
Checks whether jobData is queued for worker processing.
Definition jobmanager.h:503
void dep(JobHandle jobFirst, JobHandle jobSecond)
Makes jobSecond depend on jobFirst. This means jobSecond will not run until jobFirst finishes.
Definition jobmanager.h:355
const JobContainer & data(JobHandle jobHandle) const
Returns immutable internal storage for jobHandle.
Definition jobmanager.h:251
GAIA_NODISCARD bool release_parallel_callback_ref(ParallelCallbackHandle handle)
Releases one reference to the callback referenced by handle.
Definition jobmanager.h:536
void dep_refresh(std::span< JobHandle > jobsFirst, JobHandle jobSecond)
Makes jobSecond depend on the jobs listed in jobsFirst. This means jobSecond will not run until all j...
Definition jobmanager.h:395
GAIA_NODISCARD JobHandle alloc_job(Job job)
Allocates a new job container identified by a unique JobHandle.
Definition jobmanager.h:258
static uint32_t submit(JobContainer &jobData)
Marks a job as submitted.
Definition jobmanager.h:423
static bool signal_edge(JobContainer &jobData)
Signals that one dependency edge of jobData has completed.
Definition jobmanager.h:324
Move-only function wrapper with inline storage and optional SmallBlockAllocator spill storage....
Definition small_func.h:19
Checks if endianness was detected correctly at compile-time.
Definition bitset.h:9
uint32_t gen
Generation ID.
Definition ilist.h:125
uint32_t idx
Allocated items: Index in the list. Deleted items: Index of the next deleted item in the list.
Definition ilist.h:130
ItemData data
Item data.
Definition ilist.h:132
Definition jobcommon.h:35
Definition jobcommon.h:45
Internal storage for a scheduled job and its dependency metadata.
Definition jobmanager.h:68
static GAIA_NODISCARD JobContainer create(uint32_t index, uint32_t generation, void *pCtx)
Creates a new job container for the intrusive storage.
Definition jobmanager.h:127
static GAIA_NODISCARD JobHandle handle(const JobContainer &jc)
Returns the public handle associated with jc.
Definition jobmanager.h:141
util::SmallFunc func
Function to execute when running the job.
Definition jobmanager.h:87
JobPriority prio
Job priority.
Definition jobmanager.h:81
std::atomic_uint32_t state
Current state of the job. Consists of an upper and a bottom part. Least significant bits = special purpose....
Definition jobmanager.h:79
JobEdges edges
Dependency graph.
Definition jobmanager.h:85
JobCreationFlags flags
Job flags.
Definition jobmanager.h:83
Outgoing dependency edges for a job. Stores either a single dependent job handle or a heap-allocated ...
Definition jobmanager.h:52
uint32_t depCnt
Number of dependencies.
Definition jobmanager.h:60
Definition jobhandle.h:13
Definition jobhandle.h:85
Definition jobcommon.h:39
Allocation context used when creating shared parallel callbacks.
Definition jobmanager.h:177
Handle identifying a shared callback used by parallel jobs.
Definition jobmanager.h:147
ParallelCallbackHandle()=default
Creates an invalid callback handle.
ParallelCallbackHandle(uint32_t id, uint32_t gen)
Creates a callback handle from the given identifier and generation.
Definition jobmanager.h:158
GAIA_NODISCARD bool operator==(const ParallelCallbackHandle &other) const
Checks whether two callback handles reference the same record.
Definition jobmanager.h:171
GAIA_NODISCARD uint32_t id() const
Returns the slot identifier.
Definition jobmanager.h:161
GAIA_NODISCARD uint32_t gen() const
Returns the slot generation.
Definition jobmanager.h:166
Internal storage record for a shared parallel callback.
Definition jobmanager.h:183
static GAIA_NODISCARD ParallelCallbackRecord create(uint32_t index, uint32_t generation, void *pCtx)
Creates a new callback record for the intrusive storage.
Definition jobmanager.h:213
ParallelCallbackRecord()=default
Creates an empty callback record.
~ParallelCallbackRecord()=default
Destroys the callback record.
static GAIA_NODISCARD ParallelCallbackHandle handle(const ParallelCallbackRecord &record)
Returns the public handle associated with record.
Definition jobmanager.h:227