Gaia-ECS v0.9.3
A simple and powerful entity component system
threadpool.h
1#pragma once
2
3#include "gaia/config/config.h"
4#include "gaia/config/profiler.h"
5
6#if GAIA_PLATFORM_WINDOWS
7 #include <cstdio>
8 #include <windows.h>
9#endif
10
11#define GAIA_THREAD_OFF 0
12#define GAIA_THREAD_STD 1
13#define GAIA_THREAD_PTHREAD 2
14
15#if GAIA_PLATFORM_WINDOWS || GAIA_PLATFORM_WASM
16 #include <thread>
17 // Emscripten supports std::thread if compiled with -sUSE_PTHREADS=1.
18 // Otherwise, std::thread calls are no-ops that compile but do not run concurrently.
19 #define GAIA_THREAD std::thread
20 #define GAIA_THREAD_PLATFORM GAIA_THREAD_STD
21#elif GAIA_PLATFORM_APPLE
22 #include <pthread.h>
23 #include <pthread/sched.h>
24 #include <sys/qos.h>
25 #include <sys/sysctl.h>
26 #define GAIA_THREAD pthread_t
27 #define GAIA_THREAD_PLATFORM GAIA_THREAD_PTHREAD
28#elif GAIA_PLATFORM_LINUX
29 #include <dirent.h>
30 #include <fcntl.h>
31 #include <pthread.h>
32 #include <unistd.h>
33 #define GAIA_THREAD pthread_t
34 #define GAIA_THREAD_PLATFORM GAIA_THREAD_PTHREAD
35#elif GAIA_PLATFORM_FREEBSD
36 #include <pthread.h>
37 #include <sys/sysctl.h>
38 #define GAIA_THREAD pthread_t
39 #define GAIA_THREAD_PLATFORM GAIA_THREAD_PTHREAD
40#endif
41
42#if GAIA_PLATFORM_WINDOWS
43 #include <malloc.h>
44#else
45 #include <alloca.h>
46#endif
47#include <atomic>
48#include <thread>
49
50#include "gaia/cnt/sarray_ext.h"
51#include "gaia/core/span.h"
52#include "gaia/core/utility.h"
53#include "gaia/util/logging.h"
54
55#include "gaia/mt/event.h"
56#include "gaia/mt/futex.h"
57#include "gaia/mt/jobcommon.h"
58#include "gaia/mt/jobhandle.h"
59#include "gaia/mt/jobmanager.h"
60#include "gaia/mt/jobqueue.h"
61#include "gaia/mt/semaphore_fast.h"
62#include "gaia/mt/spinlock.h"
63
64namespace gaia {
65 namespace mt {
66#if GAIA_PLATFORM_WINDOWS
67 extern "C" typedef HRESULT(WINAPI* TOSApiFunc_SetThreadDescription)(HANDLE, PCWSTR);
68
69 #pragma pack(push, 8)
70 typedef struct tagTHREADNAME_INFO {
71 DWORD dwType; // Must be 0x1000.
72 LPCSTR szName; // Pointer to name (in user addr space).
73 DWORD dwThreadID; // Thread ID (-1=caller thread).
74 DWORD dwFlags; // Reserved for future use, must be zero.
75 } THREADNAME_INFO;
76 #pragma pack(pop)
77#endif
78
79 namespace detail {
80 //! Thread-local pointer to the context of the current worker thread
81 inline thread_local ThreadCtx* tl_workerCtx;
82 } // namespace detail
83
84 GAIA_MSVC_WARNING_PUSH()
85 GAIA_MSVC_WARNING_DISABLE(4324)
86
87 class GAIA_API ThreadPool final {
88 friend class JobManager;
89
94 static constexpr uint32_t MaxWorkers = JobState::DEP_BITS;
95
96 //! ID of the thread treated as the main thread
97 std::thread::id m_mainThreadId;
98
99 //! When true, the pool stops accepting work and winds down its workers
100 std::atomic_bool m_stop{};
101 //! Worker threads (the main thread is not included)
102 cnt::sarray_ext<GAIA_THREAD, MaxWorkers> m_workers;
103 //! Per-worker contexts. Index 0 belongs to the main thread.
104 GAIA_ALIGNAS(128) cnt::sarray_ext<ThreadCtx, MaxWorkers> m_workersCtx;
105 //! Global job queues, one per priority level
106 MpmcQueue<JobHandle, 1024> m_jobQueue[JobPriorityCnt];
107 //! Number of workers per priority level (the main thread included)
108 uint32_t m_workersCnt[JobPriorityCnt]{};
109 //! Semaphore used to wake up idle workers
110 SemaphoreFast m_sem;
111
112 //! Bit mask of workers currently blocked inside wait()
113 std::atomic_uint32_t m_blockedInWorkUntil;
114
115 //! Manages job allocation and dependency tracking
116 JobManager m_jobManager;
118 // NOTE: Allocs are done only from the main thread while there are no jobs running.
119 // Freeing can happen at any point from any thread, therefore we need a lock here.
120 // Access to job data is not thread-safe. No jobs should be added while any job is running.
121 GAIA_PROF_MUTEX(SpinLock, m_jobAllocMtx);
122
123 private:
124 ThreadPool() {
125 m_stop.store(false);
126
127 make_main_thread();
128
129 const auto hwThreads = hw_thread_cnt();
130 const auto hwEffThreads = hw_efficiency_cores_cnt();
131 uint32_t hiPrioWorkers = hwThreads;
132 if (hwEffThreads < hwThreads)
133 hiPrioWorkers -= hwEffThreads;
134
135 set_max_workers(hwThreads, hiPrioWorkers);
136 }
137
138 ThreadPool(ThreadPool&&) = delete;
139 ThreadPool(const ThreadPool&) = delete;
140 ThreadPool& operator=(ThreadPool&&) = delete;
141 ThreadPool& operator=(const ThreadPool&) = delete;
142
143 public:
144 static ThreadPool& get() {
145 static ThreadPool threadPool;
146 return threadPool;
147 }
148
149 ~ThreadPool() {
150 reset();
151 }
152
153 //! Make the calling thread the effective main thread from the thread pool perspective.
154 void make_main_thread() {
155 m_mainThreadId = std::this_thread::get_id();
156 }
157
158 //! Returns the number of worker threads.
159 GAIA_NODISCARD uint32_t workers() const {
160 return m_workers.size();
161 }
162
168 void set_max_workers(uint32_t count, uint32_t countHighPrio) {
169 const auto workersCnt = core::get_max(core::get_min(MaxWorkers, count), 1U);
170 if (countHighPrio > count)
171 countHighPrio = count;
172
173 // Stop all threads first
174 reset();
175
176 // Reset previous worker contexts
177 for (auto& ctx: m_workersCtx)
178 ctx.reset();
179
180 // Resize our arrays
181 m_workersCtx.resize(workersCnt);
182 // We also have the main thread so there's always one less worker spawned
183 m_workers.resize(workersCnt - 1);
184
185 // First worker is considered the main thread.
186 // It is also assigned high priority but it doesn't really matter.
187 // The main thread can steal any jobs, both low and high priority.
188 detail::tl_workerCtx = m_workersCtx.data();
189 m_workersCtx[0].tp = this;
190 m_workersCtx[0].workerIdx = 0;
191 m_workersCtx[0].prio = JobPriority::High;
192
193 // Reset the workers
194 for (auto& worker: m_workers)
195 worker = {};
196
197 // Create a new set of high and low priority threads (if any)
198 uint32_t workerIdx = 1;
199 set_workers_high_prio_inter(workerIdx, countHighPrio);
200 }
201
206 void set_workers_high_prio_inter(uint32_t& workerIdx, uint32_t count) {
207 count = gaia::core::get_min(count, m_workers.size());
208 if (count == 0) {
209 m_workersCnt[0] = 1; // main thread
210 m_workersCnt[1] = 0;
211 } else {
212 m_workersCnt[0] = count + 1; // Main thread is always a priority worker
213 m_workersCnt[1] = m_workers.size() - count;
214 }
215
216 // Create a new set of high and low priority threads (if any)
217 create_worker_threads(workerIdx, JobPriority::High, m_workersCnt[0] - 1);
218 create_worker_threads(workerIdx, JobPriority::Low, m_workersCnt[1]);
219 }
220
225 void set_workers_low_prio_inter(uint32_t& workerIdx, uint32_t count) {
226 const uint32_t realCnt = gaia::core::get_min(count, m_workers.size());
227 if (realCnt == 0) {
228 m_workersCnt[0] = 0;
229 m_workersCnt[1] = 1; // main thread
230 } else {
231 m_workersCnt[0] = m_workers.size() - realCnt;
232 m_workersCnt[1] = realCnt + 1; // Main thread is always counted as a priority worker
233 }
234
235 // Create a new set of high and low priority threads (if any)
236 create_worker_threads(workerIdx, JobPriority::High, m_workersCnt[0]);
237 create_worker_threads(workerIdx, JobPriority::Low, m_workersCnt[1] - 1);
238 }
239
242 void set_workers_high_prio(uint32_t count) {
243 // Stop all threads first
244 reset();
245
246 uint32_t workerIdx = 1;
247 set_workers_high_prio_inter(workerIdx, count);
248 }
249
252 void set_workers_low_prio(uint32_t count) {
253 // Stop all threads first
254 reset();
255
256 uint32_t workerIdx = 1;
257 set_workers_low_prio_inter(workerIdx, count);
258 }
259
265 void dep(JobHandle jobFirst, JobHandle jobSecond) {
266 GAIA_ASSERT(main_thread());
267
268 m_jobManager.dep(std::span(&jobFirst, 1), jobSecond);
269 }
270
276 void dep(std::span<JobHandle> jobsFirst, JobHandle jobSecond) {
277 GAIA_ASSERT(main_thread());
278
279 m_jobManager.dep(jobsFirst, jobSecond);
280 }
281
289 void dep_refresh(JobHandle jobFirst, JobHandle jobSecond) {
290 GAIA_ASSERT(main_thread());
291
292 m_jobManager.dep_refresh(std::span(&jobFirst, 1), jobSecond);
293 }
294
302 void dep_refresh(std::span<JobHandle> jobsFirst, JobHandle jobSecond) {
303 GAIA_ASSERT(main_thread());
304
305 m_jobManager.dep_refresh(jobsFirst, jobSecond);
306 }
307
312 template <typename TJob>
313 JobHandle add(TJob&& job) {
314 GAIA_ASSERT(main_thread());
315
316 job.priority = final_prio(job);
317
318 auto& mtx = GAIA_PROF_EXTRACT_MUTEX(m_jobAllocMtx);
319 core::lock_scope lock(mtx);
320 GAIA_PROF_LOCK_MARK(m_jobAllocMtx);
321
322 return m_jobManager.alloc_job(GAIA_FWD(job));
323 }
324
325 private:
326 void add_n(JobPriority prio, std::span<JobHandle> jobHandles) {
327 GAIA_ASSERT(main_thread());
328 GAIA_ASSERT(!jobHandles.empty());
329
330 auto& mtx = GAIA_PROF_EXTRACT_MUTEX(m_jobAllocMtx);
331 core::lock_scope lock(mtx);
332 GAIA_PROF_LOCK_MARK(m_jobAllocMtx);
333
334 for (auto& jobHandle: jobHandles)
335 jobHandle = m_jobManager.alloc_job({{}, prio, JobCreationFlags::Default});
336 }
337
338 public:
342 void del([[maybe_unused]] JobHandle jobHandle) {
343 GAIA_ASSERT(jobHandle != (JobHandle)JobNull_t{});
344
345#if GAIA_ASSERT_ENABLED
346 {
347 const auto& jobData = m_jobManager.data(jobHandle);
348 GAIA_ASSERT(jobData.state == 0 || m_jobManager.done(jobData));
349 }
350#endif
351
352 auto& mtx = GAIA_PROF_EXTRACT_MUTEX(m_jobAllocMtx);
353 core::lock_scope lock(mtx);
354 GAIA_PROF_LOCK_MARK(m_jobAllocMtx);
355
356 m_jobManager.free_job(jobHandle);
357 }
358
364 void submit(std::span<JobHandle> jobHandles) {
365 if (jobHandles.empty())
366 return;
367
368 GAIA_PROF_SCOPE(tp::submit);
369
370 auto* pHandles = (JobHandle*)alloca(sizeof(JobHandle) * jobHandles.size());
371
372 uint32_t cnt = 0;
373 for (auto handle: jobHandles) {
374 GAIA_ASSERT(handle != (JobHandle)JobNull_t{});
375
376 auto& jobData = m_jobManager.data(handle);
377 const auto state = m_jobManager.submit(jobData) & JobState::DEP_BITS_MASK;
378 // Jobs that were already submitted won't be submitted again.
379 // We can only accept the job if it has no pending dependencies.
380 if (state != 0)
381 continue;
382
383 pHandles[cnt++] = handle;
384 }
385
386 auto* ctx = detail::tl_workerCtx;
387 process(std::span(pHandles, cnt), ctx);
388 }
389
395 void submit(JobHandle jobHandle) {
396 submit(std::span(&jobHandle, 1));
397 }
398
399 void reset_state(std::span<JobHandle> jobHandles) {
400 if (jobHandles.empty())
401 return;
402
403 GAIA_PROF_SCOPE(tp::reset);
404
405 for (auto handle: jobHandles) {
406 auto& jobData = m_jobManager.data(handle);
407 m_jobManager.reset_state(jobData);
408 }
409 }
410
411 void reset_state(JobHandle jobHandle) {
412 reset_state(std::span(&jobHandle, 1));
413 }
414
415
416 //! Schedules a job to run on a worker thread.
417 //! \param job Job descriptor
418 //! \return Job handle of the scheduled job
419 //! \warning Must be called from the main thread.
420 JobHandle sched(Job& job) {
421 JobHandle jobHandle = add(job);
422 submit(jobHandle);
423 return jobHandle;
424 }
425
432 JobHandle sched(Job& job, JobHandle dependsOn) {
433 JobHandle jobHandle = add(job);
434 dep(jobHandle, dependsOn);
435 submit(jobHandle);
436 return jobHandle;
437 }
438
446 JobHandle sched_par(JobParallel& job, uint32_t itemsToProcess, uint32_t groupSize) {
447 GAIA_ASSERT(main_thread());
448
449 // An empty data set is considered invalid input
450 GAIA_ASSERT(itemsToProcess != 0);
451 if (itemsToProcess == 0)
452 return JobNull;
453
454 // Don't add new jobs once stop was requested
455 if GAIA_UNLIKELY (m_stop)
456 return JobNull;
457
458 // Make sure the right priority is selected
459 const auto prio = job.priority = final_prio(job);
460
461 // No group size was given, make a guess based on the set size
462 if (groupSize == 0) {
463 const auto cntWorkers = m_workersCnt[(uint32_t)prio];
464 groupSize = (itemsToProcess + cntWorkers - 1) / cntWorkers;
465
466 // If there are too many items we split them into multiple jobs.
467 // This way, if we wait for the result and some workers finish
468 // their share faster, the finished workers can pick up new
469 // jobs sooner.
470 // On the other hand, too few items probably don't deserve
471 // multiple jobs.
472 constexpr uint32_t maxUnitsOfWorkPerGroup = 8;
473 groupSize = groupSize / maxUnitsOfWorkPerGroup;
474 if (groupSize == 0)
475 groupSize = 1;
476 }
477
478 const auto jobs = (itemsToProcess + groupSize - 1) / groupSize;
479
480 // Only one job is created, use the job directly.
481 // Generally, this is a case we would want to avoid because it means this particular workload
482 // is not worth scheduling via sched_par. However, we can never know the caller's reasons
483 // for sure, so let's stay silent about it.
484 if (jobs == 1) {
485 const uint32_t groupJobIdxEnd = groupSize < itemsToProcess ? groupSize : itemsToProcess;
486 auto groupJobFunc = [job, groupJobIdxEnd]() {
487 JobArgs args;
488 args.idxStart = 0;
489 args.idxEnd = groupJobIdxEnd;
490 job.func(args);
491 };
492
493 auto handle = add(Job{groupJobFunc, prio, JobCreationFlags::Default});
494 submit(handle);
495 return handle;
496 }
497
498 // Multiple jobs need to be parallelized.
499 // Create a sync job and assign it as their dependency.
500 auto* pHandles = (JobHandle*)alloca(sizeof(JobHandle) * (jobs + 1));
501 std::span<JobHandle> handles(pHandles, jobs + 1);
502
503 add_n(prio, handles);
504
505#if GAIA_ASSERT_ENABLED
506 for (auto jobHandle: handles)
507 GAIA_ASSERT(m_jobManager.is_clear(jobHandle));
508#endif
509
510 // Work jobs
511 for (uint32_t jobIndex = 0; jobIndex < jobs; ++jobIndex) {
512 const uint32_t groupJobIdxStart = jobIndex * groupSize;
513 const uint32_t groupJobIdxStartPlusGroupSize = groupJobIdxStart + groupSize;
514 const uint32_t groupJobIdxEnd =
515 groupJobIdxStartPlusGroupSize < itemsToProcess ? groupJobIdxStartPlusGroupSize : itemsToProcess;
516
517 auto groupJobFunc = [job, groupJobIdxStart, groupJobIdxEnd]() {
518 JobArgs args;
519 args.idxStart = groupJobIdxStart;
520 args.idxEnd = groupJobIdxEnd;
521 job.func(args);
522 };
523
524 auto& jobData = m_jobManager.data(pHandles[jobIndex]);
525 jobData.func = groupJobFunc;
526 jobData.prio = prio;
527 }
528 // Sync job
529 {
530 auto& jobData = m_jobManager.data(pHandles[jobs]);
531 jobData.prio = prio;
532 }
533
534 // Make the sync job depend on all the work jobs
535 dep(handles.subspan(0, jobs), pHandles[jobs]);
536
537 // Submit the jobs to the threadpool.
538 // This is the point of no return. After this, no more changes to the jobs are possible.
539 submit(handles);
540 return pHandles[jobs];
541 }
542
546 void wait(JobHandle jobHandle) {
547 GAIA_PROF_SCOPE(tp::wait);
548
549 GAIA_ASSERT(main_thread());
550
551 auto* ctx = detail::tl_workerCtx;
552 auto& jobData = m_jobManager.data(jobHandle);
553 auto state = jobData.state.load();
554
555 // Waiting for a job that has not been initialized is nonsense.
556 GAIA_ASSERT(state != 0);
557
558 // Wait until done
559 for (; (state & JobState::STATE_BITS_MASK) < JobState::Done; state = jobData.state.load()) {
560 // The job we are waiting for is not finished yet, try running some other job in the meantime
561 JobHandle otherJobHandle;
562 if (try_fetch_job(*ctx, otherJobHandle)) {
563 if (run(otherJobHandle, ctx))
564 continue;
565 }
566
567 // The job we are waiting for is already running.
568 // Wait until it signals it's finished.
569 if ((state & JobState::STATE_BITS_MASK) == JobState::Executing) {
570 const auto workerId = (state & JobState::DEP_BITS_MASK);
571 auto* jobDoneEvent = &m_workersCtx[workerId].event;
572 jobDoneEvent->wait();
573 continue;
574 }
575
576 // The worst case scenario.
577 // We have nothing to do and the job we are waiting for still isn't executing.
578 // Let's wait for any job to start executing.
579 const auto workerBit = 1U << ctx->workerIdx;
580 const auto oldBlockedMask = m_blockedInWorkUntil.fetch_or(workerBit);
581 const auto newState = jobData.state.load();
582 if (newState == state) // still not JobState::Done?
583 Futex::wait(&m_blockedInWorkUntil, oldBlockedMask | workerBit, detail::WaitMaskAny);
584 m_blockedInWorkUntil.fetch_and(~workerBit);
585 }
586 }
587
589 void update() {
590 GAIA_ASSERT(main_thread());
591 main_thread_tick();
592 }
593
596 GAIA_NODISCARD static uint32_t hw_thread_cnt() {
597 auto hwThreads = (uint32_t)std::thread::hardware_concurrency();
598 return core::get_max(1U, hwThreads);
599 }
600
603 GAIA_NODISCARD static uint32_t hw_efficiency_cores_cnt() {
604 uint32_t efficiencyCores = 0;
605#if GAIA_PLATFORM_APPLE
606 size_t size = sizeof(efficiencyCores);
607 if (sysctlbyname("hw.perflevel1.logicalcpu", &efficiencyCores, &size, nullptr, 0) != 0)
608 return 0;
609#elif GAIA_PLATFORM_FREEBSD
610 int cpuIndex = 0;
611 char oidName[32];
612 int coreType;
613 size_t size = sizeof(coreType);
614 while (true) {
615 snprintf(oidName, sizeof(oidName), "dev.cpu.%d.coretype", cpuIndex);
616 if (sysctlbyname(oidName, &coreType, &size, nullptr, 0) != 0)
617 break; // Stop on the last CPU index
618
619 // 0 = performance core
620 // 1 = efficiency core
621 if (coreType == 1)
622 ++efficiencyCores;
623
624 ++cpuIndex;
625 }
626#elif GAIA_PLATFORM_WINDOWS
627 DWORD length = 0;
628
629 // First, determine the required buffer size. This call is expected to fail with ERROR_INSUFFICIENT_BUFFER and fill in the length.
630 if (GetLogicalProcessorInformationEx(RelationProcessorCore, nullptr, &length) || GetLastError() != ERROR_INSUFFICIENT_BUFFER)
631 return 0;
632
633 // Allocate enough memory
634 auto* pBuffer = (SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX*)malloc(length);
635 if (pBuffer == nullptr)
636 return 0;
637
638 // Retrieve the data
639 if (!GetLogicalProcessorInformationEx(RelationProcessorCore, pBuffer, &length)) {
640 free(pBuffer);
641 return 0;
642 }
643
644 uint32_t heterogenousCnt = 0;
645
646 // Iterate over processor core entries.
647 // On Windows we can't directly tell whether a core is an efficiency core or a performance core.
648 // Instead:
649 // - lower EfficiencyClass values correspond to more efficient cores
650 // - higher EfficiencyClass values correspond to higher performance cores
651 // - EfficiencyClass is zero for homogeneous CPU architectures
652 // Therefore, to count efficiency cores on Windows, we count cores where EfficiencyClass == 0.
653 // On heterogeneous architectures this gives us the correct result.
654 // On homogeneous architectures the value is always 0, so instead of the number of efficiency
655 // cores we would be counting performance cores. For the sake of correctness, if all cores
656 // report 0, we use 0 for the number of efficiency cores.
657 for (char* ptr = (char*)pBuffer; ptr < (char*)pBuffer + length;
658 ptr += ((SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX*)ptr)->Size) {
659 auto* entry = (SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX*)ptr;
660 if (entry->Relationship == RelationProcessorCore) {
661 if (entry->Processor.EfficiencyClass == 0)
662 ++efficiencyCores;
663 else
664 ++heterogenousCnt;
665 }
666 }
667
668 if (heterogenousCnt == 0)
669 efficiencyCores = 0;
670
671 free(pBuffer);
672#elif GAIA_PLATFORM_LINUX
673 {
674 // Hybrid Intel CPUs expose /sys/devices/cpu_core/cpus and /sys/devices/cpu_atom/cpus,
675 // text files holding a CPU list such as "8-15" or "4,6,8-11". Count the atom entries.
676 FILE* pFile = fopen("/sys/devices/cpu_atom/cpus", "r");
677 if (pFile == nullptr)
678 return 0;
679 uint32_t cur = 0, first = 0; bool range = false, num = false;
680 for (int c = 0; c != EOF;) {
681 c = fgetc(pFile);
682 if (c >= '0' && c <= '9') { cur = cur * 10 + uint32_t(c - '0'); num = true; continue; }
683 if (c == '-') { first = cur; cur = 0; range = true; num = false; continue; }
684 if (num) { efficiencyCores += range ? (cur - first + 1) : 1; cur = 0; range = num = false; }
685 }
686 fclose(pFile);
687 }
688 if (efficiencyCores == 0) {
689 // TODO: Go through all CPU packages and CPUs and determine the differences between them.
690 // There are many metrics.
691 // 1) We will assume all CPUs to be of the same architecture.
692 // 2) Same CPU architecture but different cache sizes. Smaller ones are "efficiency" cores.
693 // This is the AMD way. Still, these are about the same things so maybe we would just treat
694 // all such cores as performance cores.
695 // 3) Different max frequencies on different cores. This might be indicative enough.
696 // There is also an optional parameter present on ARM CPUs:
697 // https://www.kernel.org/doc/Documentation/devicetree/bindings/arm/cpu-capacity.txt
698 // In this case, we'd treat CPUs with the highest capacity-dmips-mhz as performance cores,
699 // and consider the rest as efficiency cores.
700
701 // ...
702 }
703#endif
704 return efficiencyCores;
705 }
706
707 private:
708 static void* thread_func(void* pCtx) {
709 auto& ctx = *(ThreadCtx*)pCtx;
710
711 detail::tl_workerCtx = &ctx;
712
713 // Set the worker thread name.
714 // Needs to be called from inside the thread because some platforms
715 // can change the name only when run from the specific thread.
716 ctx.tp->set_thread_name(ctx.workerIdx, ctx.prio);
717
718 // Set the worker thread priority
719 ctx.tp->set_thread_priority(ctx.workerIdx, ctx.prio);
720
721 // Process jobs
722 ctx.tp->worker_loop(ctx);
723
724 detail::tl_workerCtx = nullptr;
725
726 return nullptr;
727 }
728
732 void create_thread(uint32_t workerIdx, JobPriority prio) {
733 // Idx 0 is reserved for the main thread
734 GAIA_ASSERT(workerIdx > 0);
735
736 auto& ctx = m_workersCtx[workerIdx];
737 ctx.tp = this;
738 ctx.workerIdx = workerIdx;
739 ctx.prio = prio;
740
741#if GAIA_THREAD_PLATFORM == GAIA_THREAD_STD
742 m_workers[workerIdx - 1] = std::thread([&ctx]() {
743 thread_func((void*)&ctx);
744 });
745#else
746 pthread_attr_t attr{};
747 int ret = pthread_attr_init(&attr);
748 if (ret != 0) {
749 GAIA_LOG_W("pthread_attr_init failed for worker thread %u. ErrCode = %d", workerIdx, ret);
750 return;
751 }
752
754 // Apple's recommendation for Apple Silicon for games / high-perf software
755 // ========================================================================
756 // Per frame | Scheduling policy | QoS class / Priority
757 // ========================================================================
758 // Main thread | SCHED_OTHER | QOS_CLASS_USER_INTERACTIVE (47)
759 // Render/Audio thread | SCHED_RR | 45
760 // Workers High Prio | SCHED_RR | 39-41
761 // Workers Low Prio | SCHED_OTHER | QOS_CLASS_USER_INTERACTIVE (38)
762 // ========================================================================
763 // Multiple-frames | |
764 // ========================================================================
765 // Async Workers High Prio| SCHED_OTHER | QOS_CLASS_USER_INITIATED (37)
766 // Async Workers Low Prio | SCHED_OTHER | QOS_CLASS_DEFAULT (31)
767 // Prefetching/Streaming | SCHED_OTHER | QOS_CLASS_UTILITY (20)
768 // ========================================================================
769
770 if (prio == JobPriority::Low) {
771 #if GAIA_PLATFORM_APPLE
772 ret = pthread_attr_set_qos_class_np(&attr, QOS_CLASS_USER_INTERACTIVE, -9); // 47-9=38
773 if (ret != 0) {
774 GAIA_LOG_W(
775 "pthread_attr_set_qos_class_np failed for worker thread %u [prio=%u]. ErrCode = %d", workerIdx,
776 (uint32_t)prio, ret);
777 }
778 #else
779 ret = pthread_attr_setschedpolicy(&attr, SCHED_OTHER);
780 if (ret != 0) {
781 GAIA_LOG_W(
782 "pthread_attr_setschedpolicy SCHED_RR failed for worker thread %u [prio=%u]. ErrCode = %d", workerIdx,
783 (uint32_t)prio, ret);
784 }
785
786 int prioMax = core::get_min(38, sched_get_priority_max(SCHED_OTHER));
787 int prioMin = core::get_min(prioMax, sched_get_priority_min(SCHED_OTHER));
788 int prioUse = core::get_min(prioMin + 5, prioMax);
789 prioUse = core::get_max(prioUse, prioMin);
790 sched_param param{};
791 param.sched_priority = prioUse;
792
793 ret = pthread_attr_setschedparam(&attr, &param);
794 if (ret != 0) {
795 GAIA_LOG_W(
796 "pthread_attr_setschedparam %d failed for worker thread %u [prio=%u]. ErrCode = %d",
797 param.sched_priority, workerIdx, (uint32_t)prio, ret);
798 }
799 #endif
800 } else {
801 ret = pthread_attr_setschedpolicy(&attr, SCHED_RR);
802 if (ret != 0) {
803 GAIA_LOG_W(
804 "pthread_attr_setschedpolicy SCHED_RR failed for worker thread %u [prio=%u]. ErrCode = %d", workerIdx,
805 (uint32_t)prio, ret);
806 }
807
808 int prioMax = core::get_min(41, sched_get_priority_max(SCHED_RR));
809 int prioMin = core::get_min(prioMax, sched_get_priority_min(SCHED_RR));
810 int prioUse = core::get_max(prioMax - 5, prioMin);
811 prioUse = core::get_min(prioUse, prioMax);
812 sched_param param{};
813 param.sched_priority = prioUse;
814
815 ret = pthread_attr_setschedparam(&attr, &param);
816 if (ret != 0) {
817 GAIA_LOG_W(
818 "pthread_attr_setschedparam %d failed for worker thread %u [prio=%u]. ErrCode = %d",
819 param.sched_priority, workerIdx, (uint32_t)prio, ret);
820 }
821 }
822
823 // Create the thread with given attributes
824 ret = pthread_create(&m_workers[workerIdx - 1], &attr, thread_func, (void*)&ctx);
825 if (ret != 0) {
826 GAIA_LOG_W("pthread_create failed for worker thread %u. ErrCode = %d", workerIdx, ret);
827 }
828
829 pthread_attr_destroy(&attr);
830#endif
831
832 // Stick each thread to a specific CPU core if possible
833 set_thread_affinity(workerIdx);
834 }
835
838 void join_thread(uint32_t workerIdx) {
839 if GAIA_UNLIKELY (workerIdx == 0 || workerIdx > m_workers.size())
840 return;
841
842#if GAIA_THREAD_PLATFORM == GAIA_THREAD_STD
843 auto& t = m_workers[workerIdx - 1];
844 if (t.joinable())
845 t.join();
846#else
847 auto& t = m_workers[workerIdx - 1];
848 pthread_join(t, nullptr);
849#endif
850 }
851
852 void create_worker_threads(uint32_t& workerIdx, JobPriority prio, uint32_t count) {
853 for (uint32_t i = 0; i < count; ++i)
854 create_thread(workerIdx++, prio);
855 }
856
857 void set_thread_priority([[maybe_unused]] uint32_t workerIdx, [[maybe_unused]] JobPriority priority) {
858#if GAIA_PLATFORM_WINDOWS
859 HANDLE nativeHandle = (HANDLE)m_workers[workerIdx - 1].native_handle();
860
861 THREAD_POWER_THROTTLING_STATE state{};
862 state.Version = THREAD_POWER_THROTTLING_CURRENT_VERSION;
863 if (priority == JobPriority::High) {
864 // HighQoS
865 // Turn EXECUTION_SPEED throttling off.
866 // ControlMask selects the mechanism and StateMask is set to zero as mechanisms should be turned off.
867 state.ControlMask = THREAD_POWER_THROTTLING_EXECUTION_SPEED;
868 state.StateMask = 0;
869 } else {
870 // EcoQoS
871 // Turn EXECUTION_SPEED throttling on.
872 // ControlMask selects the mechanism and StateMask declares which mechanism should be on or off.
873 state.ControlMask = THREAD_POWER_THROTTLING_EXECUTION_SPEED;
874 state.StateMask = THREAD_POWER_THROTTLING_EXECUTION_SPEED;
875 }
876
877 BOOL ret = SetThreadInformation(nativeHandle, ThreadPowerThrottling, &state, sizeof(state));
878 if (ret != TRUE) {
879 GAIA_LOG_W("SetThreadInformation failed for thread %u", workerIdx);
880 return;
881 }
882#else
883 // Done when the thread is created
884#endif
885 }
886
887 void set_thread_affinity([[maybe_unused]] uint32_t workerIdx) {
888 // NOTE:
889 // Some cores might have multiple logical threads, there might be
890 // multiple sockets, and some cores might even be physically different
891 // from others (performance vs efficiency cores).
892 // Because of that, we do not handle affinity and let the OS figure it out.
893 // All threads created by the pool set their thread priority to make
894 // things easier for the OS.
895
896 // #if GAIA_PLATFORM_WINDOWS
897 // HANDLE nativeHandle = (HANDLE)m_workers[workerIdx-1].native_handle();
898 //
899 // auto mask = SetThreadAffinityMask(nativeHandle, 1ULL << workerIdx);
900 // if (mask <= 0)
901 // GAIA_LOG_W("Issue setting thread affinity for worker thread %u!", workerIdx);
902 // #elif GAIA_PLATFORM_APPLE
903 // // Do not do affinity for macOS. It is not supported on Apple Silicon and
904 // // Intel Macs are deprecated anyway.
905 // // TODO: Consider supporting this at least for Intel Macs as there are still
906 // // quite a few of them out there.
907 // #elif GAIA_PLATFORM_LINUX || GAIA_PLATFORM_FREEBSD
908 // pthread_t nativeHandle = (pthread_t)m_workers[workerIdx-1].native_handle();
909 //
910 // cpu_set_t cpuSet;
911 // CPU_ZERO(&cpuSet);
912 // CPU_SET(workerIdx, &cpuSet);
913 //
914 // auto ret = pthread_setaffinity_np(nativeHandle, sizeof(cpuSet), &cpuSet);
915 // if (ret != 0)
916 // GAIA_LOG_W("Issue setting thread affinity for worker thread %u!", workerIdx);
917 //
918 // ret = pthread_getaffinity_np(nativeHandle, sizeof(cpuSet), &cpuSet);
919 // if (ret != 0)
920 // GAIA_LOG_W("Thread affinity could not be set for worker thread %u!", workerIdx);
921 // #endif
922 }
923
927 void set_thread_name(uint32_t workerIdx, JobPriority prio) {
928#if GAIA_PROF_USE_PROFILER_THREAD_NAME
929 char threadName[16]{};
930 snprintf(threadName, 16, "worker_%s_%u", prio == JobPriority::High ? "HI" : "LO", workerIdx);
931 GAIA_PROF_THREAD_NAME(threadName);
932#elif GAIA_PLATFORM_WINDOWS
933 auto nativeHandle = (HANDLE)m_workers[workerIdx - 1].native_handle();
934
935 TOSApiFunc_SetThreadDescription pSetThreadDescFunc = nullptr;
936 if (auto* pModule = GetModuleHandleA("kernel32.dll")) {
937 auto* pFunc = GetProcAddress(pModule, "SetThreadDescription");
938 pSetThreadDescFunc = reinterpret_cast<TOSApiFunc_SetThreadDescription>(reinterpret_cast<void*>(pFunc));
939 }
940 if (pSetThreadDescFunc != nullptr) {
941 wchar_t threadName[16]{};
942 swprintf_s(threadName, L"worker_%s_%u", prio == JobPriority::High ? L"HI" : L"LO", workerIdx);
943
944 auto hr = pSetThreadDescFunc(nativeHandle, threadName);
945 if (FAILED(hr)) {
946 GAIA_LOG_W(
947 "Issue setting name for worker %s thread %u!", prio == JobPriority::High ? "HI" : "LO", workerIdx);
948 }
949 } else {
950 #if defined _MSC_VER
951 char threadName[16]{};
952 snprintf(threadName, 16, "worker_%s_%u", prio == JobPriority::High ? "HI" : "LO", workerIdx);
953
954 THREADNAME_INFO info{};
955 info.dwType = 0x1000;
956 info.szName = threadName;
957 info.dwThreadID = GetThreadId(nativeHandle);
958
959 __try {
960 RaiseException(0x406D1388, 0, sizeof(info) / sizeof(ULONG_PTR), (ULONG_PTR*)&info);
961 } __except (EXCEPTION_EXECUTE_HANDLER) {
962 }
963 #endif
964 }
965#elif GAIA_PLATFORM_APPLE
966 char threadName[16]{};
967 snprintf(threadName, 16, "worker_%s_%u", prio == JobPriority::High ? "HI" : "LO", workerIdx);
968 auto ret = pthread_setname_np(threadName);
969 if (ret != 0)
970 GAIA_LOG_W("Issue setting name for worker %s thread %u!", prio == JobPriority::High ? "HI" : "LO", workerIdx);
971#elif GAIA_PLATFORM_LINUX || GAIA_PLATFORM_FREEBSD
972 auto nativeHandle = m_workers[workerIdx - 1];
973
974 char threadName[16]{};
975 snprintf(threadName, 16, "worker_%s_%u", prio == JobPriority::High ? "HI" : "LO", workerIdx);
976 GAIA_PROF_THREAD_NAME(threadName);
977 auto ret = pthread_setname_np(nativeHandle, threadName);
978 if (ret != 0)
979 GAIA_LOG_W("Issue setting name for worker %s thread %u!", prio == JobPriority::High ? "HI" : "LO", workerIdx);
980#endif
981 }
982
985 GAIA_NODISCARD bool main_thread() const {
986 return std::this_thread::get_id() == m_mainThreadId;
987 }
988
993 void main_thread_tick() {
994 auto& ctx = *detail::tl_workerCtx;
995
996 // Keep executing while there is work
997 while (true) {
998 JobHandle jobHandle;
999 if (!try_fetch_job(ctx, jobHandle))
1000 break;
1001
1002 // NOTE: run() already deletes the job unless manual deletion was requested
1003 (void)run(jobHandle, &ctx);
1004 }
1005 }
1006
1007 bool try_fetch_job(ThreadCtx& ctx, JobHandle& jobHandle) {
1008 // Try getting a job from the local queue
1009 if (ctx.jobQueue.try_pop(jobHandle))
1010 return true;
1011
1012 // Try getting a job from the global queue
1013 if (m_jobQueue[(uint32_t)ctx.prio].try_pop(jobHandle))
1014 return true;
1015
1016 // Could not get a job, try stealing from other workers.
1017 const auto workerCnt = m_workersCtx.size();
1018 for (uint32_t i = 0; i < workerCnt;) {
1019 // We need to skip our worker
1020 if (i == ctx.workerIdx) {
1021 ++i;
1022 continue;
1023 }
1024
1025 // Try stealing a job
1026 const auto res = m_workersCtx[i].jobQueue.try_steal(jobHandle);
1027
1028 // Race condition, try again from the same context
1029 if (!res)
1030 continue;
1031
1032 // Stealing can succeed even when the queue is empty.
1033 // We return right away only if we received a valid handle,
1034 // meaning there really was a job waiting in the queue.
1035 if (jobHandle != (JobHandle)JobNull_t{})
1036 return true;
1037
1038 ++i;
1039 }
1040
1041 return false;
1042 }
1043
1047 void worker_loop(ThreadCtx& ctx) {
1048 while (true) {
1049 // Wait for work
1050 m_sem.wait();
1051
1052 // Keep executing while there is work
1053 while (true) {
1054 JobHandle jobHandle;
1055 if (!try_fetch_job(ctx, jobHandle))
1056 break;
1057
1058 (void)run(jobHandle, detail::tl_workerCtx);
1059 }
1060
1061 // Check if the worker can keep running
1062 const bool stop = m_stop.load();
1063 if (stop)
1064 break;
1065 }
1066 }
1067
1069 void reset() {
1070 if (m_workers.empty())
1071 return;
1072
1073 // Request stopping
1074 m_stop.store(true);
1075
1076 // Signal all threads
1077 m_sem.release((int32_t)m_workers.size());
1078
1079 auto* ctx = detail::tl_workerCtx;
1080
1081 // Finish remaining jobs
1082 JobHandle jobHandle;
1083 while (try_fetch_job(*ctx, jobHandle)) {
1084 run(jobHandle, ctx);
1085 }
1086
1087 detail::tl_workerCtx = nullptr;
1088
1089 // Join threads with the main one
1090 GAIA_FOR(m_workers.size()) join_thread(i + 1);
1091
1092 // All threads have been stopped. Allow new threads to run if necessary.
1093 m_stop.store(false);
1094 }
1095
1097 template <typename TJob>
1098 JobPriority final_prio(const TJob& job) {
1099 const auto cntWorkers = m_workersCnt[(uint32_t)job.priority];
1100 return cntWorkers > 0
1101 // If there are enough workers, keep the priority
1102 ? job.priority
1103 // Not enough workers, use the other priority that has workers
1104 : (JobPriority)(((uint32_t)job.priority + 1U) % (uint32_t)JobPriorityCnt);
1105 }
1106
1107 void signal_edges(JobContainer& jobData) {
1108 const auto max = jobData.edges.depCnt;
1109
1110 // Nothing to do if there are no dependencies
1111 if (max == 0)
1112 return;
1113
1114 auto* ctx = detail::tl_workerCtx;
1115
1116 // One dependency
1117 if (max == 1) {
1118 auto depHandle = jobData.edges.dep;
1119#if GAIA_LOG_JOB_STATES
1120 GAIA_LOG_N("SIGNAL %u.%u -> %u.%u", jobData.idx, jobData.gen, depHandle.id(), depHandle.gen());
1121#endif
1122
1123 // If the conditions for submitting the job can't be satisfied, skip it
1124 auto& depData = m_jobManager.data(depHandle);
1125 if (!JobManager::signal_edge(depData))
1126 return;
1127
1128 // Submit all jobs that are ready
1129 process(std::span(&depHandle, 1), ctx);
1130 return;
1131 }
1132
1133 // Multiple dependencies. The array has to be set
1134 GAIA_ASSERT(jobData.edges.pDeps != nullptr);
1135
1136 auto* pHandles = (JobHandle*)alloca(sizeof(JobHandle) * max);
1137 uint32_t cnt = 0;
1138 GAIA_FOR(max) {
1139 auto depHandle = jobData.edges.pDeps[i];
1140
1141 // See if all conditions were satisfied for us to submit the job
1142 auto& depData = m_jobManager.data(depHandle);
1143 if (!JobManager::signal_edge(depData))
1144 continue;
1145
1146 pHandles[cnt++] = depHandle;
1147 }
1148
1149 // Submit all jobs that are ready
1150 process(std::span(pHandles, cnt), ctx);
1151 }
1152
1153 void process(std::span<JobHandle> jobHandles, ThreadCtx* ctx) {
1154 auto* pHandles = (JobHandle*)alloca(sizeof(JobHandle) * jobHandles.size());
1155 uint32_t handlesCnt = 0;
1156
1157 for (auto handle: jobHandles) {
1158 auto& jobData = m_jobManager.data(handle);
1159 m_jobManager.processing(jobData);
1160
1161 // Jobs that have no functor assigned don't need to be enqueued.
1162 // We can "run" them right away. The only time where it makes
1163 // sense to create such a job is to create a sync job. E.g. when you
1164 // need to wait for N jobs, rather than waiting for each of them
1165 // separately you make them a dependency of a dummy/sync job and
1166 // wait just for that one.
1167 if (!jobData.func)
1168 (void)run(handle, ctx);
1169 else
1170 pHandles[handlesCnt++] = handle;
1171 }
1172
1173 std::span handles(pHandles, handlesCnt);
1174 while (!handles.empty()) {
1175 // Try pushing all jobs
1176 uint32_t pushed = 0;
1177 if (ctx != nullptr) {
1178 pushed = ctx->jobQueue.try_push(handles);
1179 } else {
1180 for (auto handle: handles) {
1181 if (m_jobQueue[(uint32_t)handle.prio()].try_push(handle))
1182 pushed++;
1183 else
1184 break;
1185 }
1186 }
1187
1188 // Wake as many workers as the number of jobs we managed to push; the worker count is the upper bound.
1189 const auto prio = ctx != nullptr ? ctx->prio : handles[0].prio();
1190 const auto cntWorkers = m_workersCnt[(uint32_t)prio];
1191 const auto cnt = (int32_t)core::get_min(pushed, cntWorkers);
1192 m_sem.release(cnt);
1193
1194 handles = handles.subspan(pushed);
1195 if (!handles.empty()) {
1196 // The queue was full. Execute the job right away.
1197 run(handles[0], ctx);
1198 handles = handles.subspan(1);
1199 }
1200 }
1201 }
1202
1203 bool run(JobHandle jobHandle, ThreadCtx* ctx) {
1204 if (jobHandle == (JobHandle)JobNull_t{})
1205 return false;
1206
1207 auto& jobData = m_jobManager.data(jobHandle);
1208 const bool manualDelete = (jobData.flags & JobCreationFlags::ManualDelete) != 0U;
1209 const bool canWait = (jobData.flags & JobCreationFlags::CanWait) != 0U;
1210
1211 m_jobManager.executing(jobData, ctx->workerIdx);
1212
1213 if (m_blockedInWorkUntil.load() != 0) {
1214 const auto blockedCnt = m_blockedInWorkUntil.exchange(0);
1215 if (blockedCnt != 0)
1216 Futex::wake(&m_blockedInWorkUntil, detail::WaitMaskAll);
1217 }
1218
1219 GAIA_ASSERT(jobData.idx != (uint32_t)-1 && jobData.data.gen != (uint32_t)-1);
1220
1221 // Run the functor associated with the job
1222 m_jobManager.run(jobData);
1223
1224 // Signal the edges and release memory allocated for them if possible
1225 signal_edges(jobData);
1226 JobManager::free_edges(jobData);
1227
1228 // Signal we finished
1229 ctx->event.set();
1230 if (canWait) {
1231 const auto* pFutexValue = &jobData.state;
1232 Futex::wake(pFutexValue, detail::WaitMaskAll);
1233 }
1234
1235 if (!manualDelete)
1236 del(jobHandle);
1237
1238 return true;
1239 }
1240 };
1241
1242 GAIA_MSVC_WARNING_POP()
1243 } // namespace mt
1244} // namespace gaia
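A minimal usage sketch of the public API above. This is illustrative only: the umbrella header name and the work functions are placeholders, and the exact parameter type of JobParallel::func may differ slightly from this sketch.

#include <gaia.h> // assumed single-header include

void example() {
	using namespace gaia::mt;
	auto& tp = ThreadPool::get();

	// Schedule a single job and wait for it to finish.
	Job job;
	job.func = []() { /* do_work(); */ };
	JobHandle handle = tp.sched(job);
	tp.wait(handle);

	// Process 1000 items in parallel. A groupSize of 0 lets the pool pick a batch size.
	JobParallel pjob;
	pjob.func = [](JobArgs args) {
		for (uint32_t i = args.idxStart; i < args.idxEnd; ++i) {
			/* do_work(i); */
		}
	};
	JobHandle syncHandle = tp.sched_par(pjob, 1000, 0);
	tp.wait(syncHandle);
}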
Array of elements of type.
Definition sarray_ext_impl.h:27
Definition span_impl.h:99
Definition jobmanager.h:147
static void run(JobContainer &jobData)
Execute the functor associated with the job container.
Definition jobmanager.h:196
void free_job(JobHandle jobHandle)
Invalidates jobHandle by resetting its index in the job pool. Every time a job is deallocated its gen...
Definition jobmanager.h:183
void dep(JobHandle jobFirst, JobHandle jobSecond)
Makes jobSecond depend on jobFirst. This means jobSecond will not run until jobFirst finishes.
Definition jobmanager.h:232
GAIA_NODISCARD JobHandle alloc_job(const Job &job)
Allocates a new job container identified by a unique JobHandle.
Definition jobmanager.h:162
void dep_refresh(std::span< JobHandle > jobsFirst, JobHandle jobSecond)
Makes jobSecond depend on the jobs listed in jobsFirst. This means jobSecond will not run until all j...
Definition jobmanager.h:272
An optimized version of Semaphore that avoids expensive system calls when the counter is greater than...
Definition semaphore_fast.h:12
bool wait()
Decrements semaphore count by 1. If the count is already 0, it waits indefinitely until semaphore cou...
Definition semaphore_fast.h:39
void release(int32_t count=1)
Increments semaphore count by the specified amount.
Definition semaphore_fast.h:26
Definition spinlock.h:8
Definition threadpool.h:87
void set_workers_high_prio_inter(uint32_t &workerIdx, uint32_t count)
Updates the number of worker threads participating at high priority workloads.
Definition threadpool.h:206
void update()
Uses the main thread to help with jobs processing.
Definition threadpool.h:589
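For instance, a frame-based application might let the main thread help drain the queues once per frame (a sketch; running and frame_logic are placeholders):

while (running) {
	frame_logic(); // schedules jobs via sched()/sched_par()
	gaia::mt::ThreadPool::get().update(); // main thread picks up queued jobs
}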
JobHandle sched_par(JobParallel &job, uint32_t itemsToProcess, uint32_t groupSize)
Schedules a job to run on worker threads in parallel.
Definition threadpool.h:446
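Worked example of the automatic batching: with 8 high-priority workers and itemsToProcess = 1024, passing groupSize = 0 first yields ceil(1024/8) = 128 items per worker, which is then divided by maxUnitsOfWorkPerGroup (8) to give a group size of 16. That produces 1024/16 = 64 work jobs plus one sync job, whose handle is returned.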
JobHandle sched(Job &job)
Schedules a job to run on a worker thread.
Definition threadpool.h:420
void dep(JobHandle jobFirst, JobHandle jobSecond)
Makes jobSecond depend on jobFirst. This means jobSecond will not run until jobFirst finishes.
Definition threadpool.h:265
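A sketch of a two-stage pipeline built from add(), dep() and submit(); the lambdas are placeholders, and the calls must run on the main thread:

auto& tp = gaia::mt::ThreadPool::get();

gaia::mt::Job first, second;
first.func = []() { /* produce(); */ };
second.func = []() { /* consume(); */ };

auto h1 = tp.add(first);
auto h2 = tp.add(second);
tp.dep(h1, h2); // h2 will not run until h1 finishes

gaia::mt::JobHandle handles[] = {h1, h2};
tp.submit(std::span(handles)); // point of no return
tp.wait(h2);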
void set_max_workers(uint32_t count, uint32_t countHighPrio)
Set the maximum number of workers for this system.
Definition threadpool.h:168
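A configuration sketch mirroring the constructor's default policy: performance cores take high-priority work, efficiency cores (if any) take the low-priority queue.

using gaia::mt::ThreadPool;
const uint32_t hwThreads = ThreadPool::hw_thread_cnt();
const uint32_t effCores = ThreadPool::hw_efficiency_cores_cnt();
uint32_t hiPrio = hwThreads;
if (effCores < hwThreads)
	hiPrio -= effCores;
ThreadPool::get().set_max_workers(hwThreads, hiPrio);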
void submit(JobHandle jobHandle)
Pushes jobHandle into the internal queue so worker threads can pick it up and execute it....
Definition threadpool.h:395
JobHandle sched(Job &job, JobHandle dependsOn)
Schedules a job to run on a worker thread.
Definition threadpool.h:432
void dep_refresh(std::span< JobHandle > jobsFirst, JobHandle jobSecond)
Makes jobSecond depend on the jobs listed in jobsFirst. This means jobSecond will not run until all j...
Definition threadpool.h:302
void dep_refresh(JobHandle jobFirst, JobHandle jobSecond)
Makes jobSecond depend on jobFirst. This means jobSecond will not run until jobFirst finishes.
Definition threadpool.h:289
void submit(std::span< JobHandle > jobHandles)
Pushes jobHandles into the internal queue so worker threads can pick them up and execute them....
Definition threadpool.h:364
static GAIA_NODISCARD uint32_t hw_efficiency_cores_cnt()
Returns the number of efficiency cores of the system.
Definition threadpool.h:603
void make_main_thread()
Make the calling thread the effective main thread from the thread pool perspective.
Definition threadpool.h:154
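If the pool is driven from a thread other than the one that first touched it, that thread should claim the main-thread role before scheduling (a sketch):

std::thread driver([]() {
	auto& tp = gaia::mt::ThreadPool::get();
	tp.make_main_thread(); // main-thread checks in add()/sched()/wait() now pass here
	// ... schedule and wait for jobs ...
});
driver.join();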
void wait(JobHandle jobHandle)
Wait until a job associated with the jobHandle finishes executing. Cleans up any job allocations and ...
Definition threadpool.h:546
JobHandle add(TJob &&job)
Creates a threadpool job from job.
Definition threadpool.h:313
void set_workers_low_prio(uint32_t count)
Updates the number of worker threads participating at low priority workloads.
Definition threadpool.h:252
void del(JobHandle jobHandle)
Deletes a job handle jobHandle from the threadpool.
Definition threadpool.h:342
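Explicit del() is only needed for jobs created with JobCreationFlags::ManualDelete; otherwise run() frees the job automatically. A sketch under that assumption:

auto& tp = gaia::mt::ThreadPool::get();
gaia::mt::Job job{[]() { /* do_work(); */ }, gaia::mt::JobPriority::High, gaia::mt::JobCreationFlags::ManualDelete};
auto handle = tp.sched(job);
tp.wait(handle); // the job is done, but its container is still allocated
tp.del(handle); // release the container manually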
void set_workers_high_prio(uint32_t count)
Updates the number of worker threads participating at high priority workloads.
Definition threadpool.h:242
GAIA_NODISCARD uint32_t workers() const
Returns the number of worker threads.
Definition threadpool.h:159
void dep(std::span< JobHandle > jobsFirst, JobHandle jobSecond)
Makes jobSecond depend on the jobs listed in jobsFirst. This means jobSecond will not run until all j...
Definition threadpool.h:276
void set_workers_low_prio_inter(uint32_t &workerIdx, uint32_t count)
Updates the number of worker threads participating at low priority workloads.
Definition threadpool.h:225
static GAIA_NODISCARD uint32_t hw_thread_cnt()
Returns the number of HW threads available on the system. 1 is minimum.
Definition threadpool.h:596
Checks if endianness was detected correctly at compile-time.
Definition bitset.h:9
Definition utility.h:123
static Result wait(const std::atomic_uint32_t *pFutexValue, uint32_t expected, uint32_t waitMask)
Suspends the caller on the futex while its value remains expected.
Definition futex.h:65
static uint32_t wake(const std::atomic_uint32_t *pFutexValue, uint32_t wakeCount, uint32_t wakeMask=detail::WaitMaskAny)
Wakes up to wakeCount waiters whose waitMask matches wakeMask.
Definition futex.h:97
Definition jobcommon.h:39
std::function< void()> func
Function to execute when running the job.
Definition jobmanager.h:81
JobPriority prio
Job priority.
Definition jobmanager.h:75
std::atomic_uint32_t state
Current state of the job Consist of upper and bottom part. Least significant bits = special purpose....
Definition jobmanager.h:73
Definition jobhandle.h:13
Definition jobhandle.h:81
Definition jobcommon.h:44
Definition jobcommon.h:33
Definition jobcommon.h:51