Gaia-ECS v0.9.3
A simple and powerful entity component system
Loading...
Searching...
No Matches
threadpool.h
1#pragma once
2
3#include "gaia/config/config.h"
4#include "gaia/config/profiler.h"
5
6#if GAIA_PLATFORM_WINDOWS
7 #include <cstdio>
8 #include <windows.h>
9#endif
10
11#define GAIA_THREAD_OFF 0
12#define GAIA_THREAD_STD 1
13#define GAIA_THREAD_PTHREAD 2
14
15#if GAIA_PLATFORM_WINDOWS || GAIA_PLATFORM_WASM
16 #include <thread>
17 // Emscripten supports std::thread if compiled with -sUSE_PTHREADS=1.
18 // Otherwise, std::thread calls are no-ops that compile but do not run concurrently.
19 #define GAIA_THREAD std::thread
20 #define GAIA_THREAD_PLATFORM GAIA_THREAD_STD
21#elif GAIA_PLATFORM_APPLE
22 #include <pthread.h>
23 #include <pthread/sched.h>
24 #include <sys/qos.h>
25 #include <sys/sysctl.h>
26 #define GAIA_THREAD pthread_t
27 #define GAIA_THREAD_PLATFORM GAIA_THREAD_PTHREAD
28#elif GAIA_PLATFORM_LINUX
29 #include <dirent.h>
30 #include <fcntl.h>
31 #include <pthread.h>
32 #include <unistd.h>
33 #define GAIA_THREAD pthread_t
34 #define GAIA_THREAD_PLATFORM GAIA_THREAD_PTHREAD
35#elif GAIA_PLATFORM_FREEBSD
36 #include <pthread.h>
37 #include <sys/sysctl.h>
38 #define GAIA_THREAD pthread_t
39 #define GAIA_THREAD_PLATFORM GAIA_THREAD_PTHREAD
40#endif
41
42#if GAIA_PLATFORM_WINDOWS
43 #include <malloc.h>
44#else
45 #include <alloca.h>
46#endif
47#include <atomic>
48#include <thread>
49
50#include "gaia/cnt/sarray_ext.h"
51#include "gaia/core/span.h"
52#include "gaia/core/utility.h"
53#include "gaia/util/logging.h"
54
55#include "gaia/mt/event.h"
56#include "gaia/mt/futex.h"
57#include "gaia/mt/jobcommon.h"
58#include "gaia/mt/jobhandle.h"
59#include "gaia/mt/jobmanager.h"
60#include "gaia/mt/jobqueue.h"
61#include "gaia/mt/semaphore_fast.h"
62#include "gaia/mt/spinlock.h"
63
64namespace gaia {
65 namespace mt {
#if GAIA_PLATFORM_WINDOWS
		// Signature of kernel32's SetThreadDescription. Resolved at runtime because the
		// API is only available from Windows 10 1607 onwards.
		extern "C" typedef HRESULT(WINAPI* TOSApiFunc_SetThreadDescription)(HANDLE, PCWSTR);

	#pragma pack(push, 8)
		// Payload of the MSVC "SetThreadName" exception (0x406D1388). Used as a fallback
		// so older debuggers can pick up thread names when SetThreadDescription is missing.
		typedef struct tagTHREADNAME_INFO {
			DWORD dwType; // Must be 0x1000.
			LPCSTR szName; // Pointer to name (in user addr space).
			DWORD dwThreadID; // Thread ID (-1=caller thread).
			DWORD dwFlags; // Reserved for future use, must be zero.
		} THREADNAME_INFO;
	#pragma pack(pop)
#endif

		namespace detail {
			//! Per-thread pointer to the worker context this thread is bound to.
			//! Null on threads that never entered the pool.
			inline thread_local ThreadCtx* tl_workerCtx;
		} // namespace detail
83
		GAIA_MSVC_WARNING_PUSH()
		// 4324: structure was padded due to alignment specifier (intentional, see GAIA_ALIGNAS below)
		GAIA_MSVC_WARNING_DISABLE(4324)

		class GAIA_API ThreadPool final {
			friend class JobManager;

			//! Maximum number of workers the pool can manage, limited by the
			//! dependency-bit storage in JobState.
			static constexpr uint32_t MaxWorkers = JobState::DEP_BITS;

			//! Id of the thread the pool was created on; used to enforce main-thread-only calls.
			std::thread::id m_mainThreadId;

			//! Set to true when a stop is requested; workers exit their loops once drained.
			std::atomic_bool m_stop{};
			//! Per-worker contexts. Index 0 belongs to the main thread.
			//! Over-aligned, presumably to keep contexts on separate cache lines.
			GAIA_ALIGNAS(128) cnt::sarray_ext<ThreadCtx, MaxWorkers> m_workersCtx;
			//! Global job queues, one per priority level.
			MpmcQueue<JobHandle, 1024> m_jobQueue[JobPriorityCnt];
			//! Number of workers per priority level (the main thread is included in a count).
			uint32_t m_workersCnt[JobPriorityCnt]{};
			//! Semaphore workers sleep on while there is no work.
			SemaphoreFast m_sem;

			//! Bitmask of workers currently blocked inside wait() on the futex.
			std::atomic_uint32_t m_blockedInWorkUntil;

			//! Manages job allocation, state transitions and dependencies.
			JobManager m_jobManager;
			// NOTE: Allocs are done only from the main thread while there are no jobs running.
			// Freeing can happen at any point from any thread. Therefore, we need to lock this point.
			// Access to job data is not thread-safe. No jobs should be added while there is any job running.
			GAIA_PROF_MUTEX(SpinLock, m_jobAllocMtx);
		private:
			//! Initializes the pool: registers the calling thread as the main thread and
			//! spawns workers based on the detected hardware topology.
			ThreadPool() {
				m_stop.store(false);

				make_main_thread();

				// Reserve efficiency cores (if any) for low-priority workers and keep
				// the remaining cores for high-priority ones.
				const auto hwThreads = hw_thread_cnt();
				const auto hwEffThreads = hw_efficiency_cores_cnt();
				uint32_t hiPrioWorkers = hwThreads;
				if (hwEffThreads < hwThreads)
					hiPrioWorkers -= hwEffThreads;

				set_max_workers(hwThreads, hiPrioWorkers);
			}

			// The pool is a singleton; it can be neither copied nor moved.
			ThreadPool(ThreadPool&&) = delete;
			ThreadPool(const ThreadPool&) = delete;
			ThreadPool& operator=(ThreadPool&&) = delete;
			ThreadPool& operator=(const ThreadPool&) = delete;
142
143 public:
144 static ThreadPool& get() {
145 static ThreadPool threadPool;
146 return threadPool;
147 }
148
			//! Stops all workers and finishes any remaining queued jobs before destruction.
			~ThreadPool() {
				reset();
			}
152
155 m_mainThreadId = std::this_thread::get_id();
156 }
157
			//! Returns the number of spawned worker threads (the main thread is not counted).
			GAIA_NODISCARD uint32_t workers() const {
				return m_workers.size();
			}
162
			//! Sets the number of workers and their priority split, restarting all worker threads.
			//! \param count Total number of workers (clamped to [1, MaxWorkers]); one of them is the main thread.
			//! \param countHighPrio Number of high-priority workers (clamped to count).
			void set_max_workers(uint32_t count, uint32_t countHighPrio) {
				const auto workersCnt = core::get_max(core::get_min(MaxWorkers, count), 1U);
				if (countHighPrio > count)
					countHighPrio = count;

				// Stop all threads first
				reset();

				// Reset previous worker contexts
				for (auto& ctx: m_workersCtx)
					ctx.reset();

				// Resize our arrays
				m_workersCtx.resize(workersCnt);
				// We also have the main thread so there's always one less worker spawned
				m_workers.resize(workersCnt - 1);

				// First worker is considered the main thread.
				// It is also assigned high priority but it doesn't really matter.
				// The main thread can steal any jobs, both low and high priority.
				detail::tl_workerCtx = m_workersCtx.data();
				m_workersCtx[0].tp = this;
				m_workersCtx[0].workerIdx = 0;
				m_workersCtx[0].prio = JobPriority::High;

				// Reset the workers
				for (auto& worker: m_workers)
					worker = {};

				// Create a new set of high and low priority threads (if any)
				uint32_t workerIdx = 1;
				set_workers_high_prio_inter(workerIdx, countHighPrio);
			}
201
206 void set_workers_high_prio_inter(uint32_t& workerIdx, uint32_t count) {
207 count = gaia::core::get_min(count, m_workers.size());
208 if (count == 0) {
209 m_workersCnt[0] = 1; // main thread
210 m_workersCnt[1] = 0;
211 } else {
212 m_workersCnt[0] = count + 1; // Main thread is always a priority worker
213 m_workersCnt[1] = m_workers.size() - count;
214 }
215
216 // Create a new set of high and low priority threads (if any)
217 create_worker_threads(workerIdx, JobPriority::High, m_workersCnt[0] - 1);
218 create_worker_threads(workerIdx, JobPriority::Low, m_workersCnt[1]);
219 }
220
225 void set_workers_low_prio_inter(uint32_t& workerIdx, uint32_t count) {
226 const uint32_t realCnt = gaia::core::get_max(count, m_workers.size());
227 if (realCnt == 0) {
228 m_workersCnt[0] = 0;
229 m_workersCnt[1] = 1; // main thread
230 } else {
231 m_workersCnt[0] = m_workers.size() - realCnt;
232 m_workersCnt[1] = realCnt + 1; // Main thread is always a priority worker;
233 }
234
235 // Create a new set of high and low priority threads (if any)
236 create_worker_threads(workerIdx, JobPriority::High, m_workersCnt[0]);
237 create_worker_threads(workerIdx, JobPriority::Low, m_workersCnt[1]);
238 }
239
242 void set_workers_high_prio(uint32_t count) {
243 // Stop all threads first
244 reset();
245 detail::tl_workerCtx = m_workersCtx.data();
246
247 uint32_t workerIdx = 1;
248 set_workers_high_prio_inter(workerIdx, count);
249 }
250
253 void set_workers_low_prio(uint32_t count) {
254 // Stop all threads first
255 reset();
256 detail::tl_workerCtx = m_workersCtx.data();
257
258 uint32_t workerIdx = 1;
259 set_workers_low_prio_inter(workerIdx, count);
260 }
261
			//! Makes \param jobSecond depend on \param jobFirst. Main thread only.
			void dep(JobHandle jobFirst, JobHandle jobSecond) {
				GAIA_ASSERT(main_thread());

				m_jobManager.dep(std::span(&jobFirst, 1), jobSecond);
			}

			//! Makes \param jobSecond depend on all jobs in \param jobsFirst. Main thread only.
			void dep(std::span<JobHandle> jobsFirst, JobHandle jobSecond) {
				GAIA_ASSERT(main_thread());

				m_jobManager.dep(jobsFirst, jobSecond);
			}

			//! Re-establishes the dependency of \param jobSecond on \param jobFirst
			//! (for reused job handles). Main thread only.
			void dep_refresh(JobHandle jobFirst, JobHandle jobSecond) {
				GAIA_ASSERT(main_thread());

				m_jobManager.dep_refresh(std::span(&jobFirst, 1), jobSecond);
			}

			//! Re-establishes the dependency of \param jobSecond on all jobs in
			//! \param jobsFirst (for reused job handles). Main thread only.
			void dep_refresh(std::span<JobHandle> jobsFirst, JobHandle jobSecond) {
				GAIA_ASSERT(main_thread());

				m_jobManager.dep_refresh(jobsFirst, jobSecond);
			}
309
			//! Allocates a new job and returns its handle. Main thread only.
			//! \param job Job descriptor; its priority is adjusted to match the available workers.
			//! \return Handle of the allocated job.
			template <typename TJob>
			JobHandle add(TJob&& job) {
				GAIA_ASSERT(main_thread());

				// Make sure a priority with available workers is selected
				job.priority = final_prio(job);

				// Allocation must be guarded because freeing may happen concurrently from any thread
				auto& mtx = GAIA_PROF_EXTRACT_MUTEX(m_jobAllocMtx);
				core::lock_scope lock(mtx);
				GAIA_PROF_LOCK_MARK(m_jobAllocMtx);

				return m_jobManager.alloc_job(GAIA_FWD(job));
			}
326
327 private:
			//! Allocates jobHandles.size() empty jobs of priority \param prio and writes
			//! their handles into \param jobHandles. Main thread only.
			void add_n(JobPriority prio, std::span<JobHandle> jobHandles) {
				GAIA_ASSERT(main_thread());
				GAIA_ASSERT(!jobHandles.empty());

				// Allocation must be guarded because freeing may happen concurrently from any thread
				auto& mtx = GAIA_PROF_EXTRACT_MUTEX(m_jobAllocMtx);
				core::lock_scope lock(mtx);
				GAIA_PROF_LOCK_MARK(m_jobAllocMtx);

				for (auto& jobHandle: jobHandles)
					jobHandle = m_jobManager.alloc_job({{}, prio, JobCreationFlags::Default});
			}
339
340 public:
			//! Releases a job handle back to the manager.
			//! The job must be either unused (state 0) or fully finished.
			//! \param jobHandle Job to release.
			void del([[maybe_unused]] JobHandle jobHandle) {
				GAIA_ASSERT(jobHandle != (JobHandle)JobNull_t{});

#if GAIA_ASSERT_ENABLED
				{
					const auto& jobData = m_jobManager.data(jobHandle);
					GAIA_ASSERT(jobData.state == 0 || m_jobManager.done(jobData));
				}
#endif

				// Freeing can be called from any thread, so it has to be guarded
				auto& mtx = GAIA_PROF_EXTRACT_MUTEX(m_jobAllocMtx);
				core::lock_scope lock(mtx);
				GAIA_PROF_LOCK_MARK(m_jobAllocMtx);

				m_jobManager.free_job(jobHandle);
			}
360
			//! Submits jobs for execution. Jobs that still have pending dependencies are
			//! held back by the manager and are not handed to the workers here.
			//! \param jobHandles Jobs to submit.
			void submit(std::span<JobHandle> jobHandles) {
				if (jobHandles.empty())
					return;

				GAIA_PROF_SCOPE(tp::submit);

				// Stack buffer for the subset of jobs that can run right away
				auto* pHandles = (JobHandle*)alloca(sizeof(JobHandle) * jobHandles.size());

				uint32_t cnt = 0;
				for (auto handle: jobHandles) {
					GAIA_ASSERT(handle != (JobHandle)JobNull_t{});

					auto& jobData = m_jobManager.data(handle);
					const auto state = m_jobManager.submit(jobData) & JobState::DEP_BITS_MASK;
					// Jobs that were already submitted won't be submitted again.
					// We can only accept the job if it has no pending dependencies.
					if (state != 0)
						continue;

					pHandles[cnt++] = handle;
				}

				auto* ctx = detail::tl_workerCtx;
				process(std::span(pHandles, cnt), ctx);
			}
391
397 void submit(JobHandle jobHandle) {
398 submit(std::span(&jobHandle, 1));
399 }
400
401 void reset_state(std::span<JobHandle> jobHandles) {
402 if (jobHandles.empty())
403 return;
404
405 GAIA_PROF_SCOPE(tp::reset);
406
407 for (auto handle: jobHandles) {
408 auto& jobData = m_jobManager.data(handle);
409 m_jobManager.reset_state(jobData);
410 }
411 }
412
413 void reset_state(JobHandle jobHandle) {
414 reset_state(std::span(&jobHandle, 1));
415 }
416
			//! Waits for the given jobs to finish and resets their state so the handles
			//! can be scheduled again. Main thread only.
			//! \param jobHandles Jobs to wait for and reset; null handles are skipped.
			void reset(std::span<JobHandle> jobHandles) {
				if (jobHandles.empty())
					return;

				GAIA_ASSERT(main_thread());
				GAIA_PROF_SCOPE(tp::reset_wait);

				// Wait first to avoid resetting one handle while another one still depends on it.
				for (auto handle: jobHandles) {
					if (handle == (JobHandle)JobNull_t{})
						continue;
					wait(handle);
				}

				for (auto handle: jobHandles) {
					if (handle == (JobHandle)JobNull_t{})
						continue;

					auto& jobData = m_jobManager.data(handle);
					const auto state = jobData.state.load() & JobState::STATE_BITS_MASK;
					// Auto-deleted jobs are released and cannot be reused through reset_state().
					if (state == JobState::Released)
						continue;

					m_jobManager.reset_state(jobData);
				}
			}
446
448 void reset(JobHandle jobHandle) {
449 reset(std::span(&jobHandle, 1));
450 }
451
458 JobHandle jobHandle = add(job);
459 submit(jobHandle);
460 return jobHandle;
461 }
462
469 JobHandle sched(Job& job, JobHandle dependsOn) {
470 JobHandle jobHandle = add(job);
471 dep(jobHandle, dependsOn);
472 submit(jobHandle);
473 return jobHandle;
474 }
475
483 JobHandle sched_par(JobParallel& job, uint32_t itemsToProcess, uint32_t groupSize) {
484 GAIA_ASSERT(main_thread());
485
486 // Empty data set are considered wrong inputs
487 GAIA_ASSERT(itemsToProcess != 0);
488 if (itemsToProcess == 0)
489 return JobNull;
490
491 // Don't add new jobs once stop was requested
492 if GAIA_UNLIKELY (m_stop)
493 return JobNull;
494
495 // Make sure the right priority is selected
496 const auto prio = job.priority = final_prio(job);
497
498 // No group size was given, make a guess based on the set size
499 if (groupSize == 0) {
500 const auto cntWorkers = m_workersCnt[(uint32_t)prio];
501 groupSize = (itemsToProcess + cntWorkers - 1) / cntWorkers;
502
503 // If there are too many items we split them into multiple jobs.
504 // This way, if we wait for the result and some workers finish
505 // with our task faster, the finished worker can pick up a new
506 // job faster.
507 // On the other hand, too little items probably don't deserve
508 // multiple jobs.
509 constexpr uint32_t maxUnitsOfWorkPerGroup = 8;
510 groupSize = groupSize / maxUnitsOfWorkPerGroup;
511 if (groupSize <= 0)
512 groupSize = 1;
513 }
514
515 const auto jobs = (itemsToProcess + groupSize - 1) / groupSize;
516
517 // Only one job is created, use the job directly.
518 // Generally, this is the case we would want to avoid because it means this particular case
519 // is not worth of being scheduled via sched_par. However, we can never know for sure what
520 // the reason for that is so let's stay silent.
521 if (jobs == 1) {
522 const uint32_t groupJobIdxEnd = groupSize < itemsToProcess ? groupSize : itemsToProcess;
523 auto groupJobFunc = [job, groupJobIdxEnd]() {
524 JobArgs args;
525 args.idxStart = 0;
526 args.idxEnd = groupJobIdxEnd;
527 job.func(args);
528 };
529
530 auto handle = add(Job{groupJobFunc, prio, JobCreationFlags::Default});
531 submit(handle);
532 return handle;
533 }
534
535 // Multiple jobs need to be parallelized.
536 // Create a sync job and assign it as their dependency.
537 auto* pHandles = (JobHandle*)alloca(sizeof(JobHandle) * (jobs + 1));
538 std::span<JobHandle> handles(pHandles, jobs + 1);
539
540 add_n(prio, handles);
541
542#if GAIA_ASSERT_ENABLED
543 for (auto jobHandle: handles)
544 GAIA_ASSERT(m_jobManager.is_clear(jobHandle));
545#endif
546
547 // Work jobs
548 for (uint32_t jobIndex = 0; jobIndex < jobs; ++jobIndex) {
549 const uint32_t groupJobIdxStart = jobIndex * groupSize;
550 const uint32_t groupJobIdxStartPlusGroupSize = groupJobIdxStart + groupSize;
551 const uint32_t groupJobIdxEnd =
552 groupJobIdxStartPlusGroupSize < itemsToProcess ? groupJobIdxStartPlusGroupSize : itemsToProcess;
553
554 auto groupJobFunc = [job, groupJobIdxStart, groupJobIdxEnd]() {
555 JobArgs args;
556 args.idxStart = groupJobIdxStart;
557 args.idxEnd = groupJobIdxEnd;
558 job.func(args);
559 };
560
561 auto& jobData = m_jobManager.data(pHandles[jobIndex]);
562 jobData.func = groupJobFunc;
563 jobData.prio = prio;
564 }
565 // Sync job
566 {
567 auto& jobData = m_jobManager.data(pHandles[jobs]);
568 jobData.prio = prio;
569 }
570
571 // Assign the sync jobs as a dependency for work jobs
572 dep(handles.subspan(0, jobs), pHandles[jobs]);
573
574 // Sumbit the jobs to the threadpool.
575 // This is a point of no return. After this point no more changes to jobs are possible.
576 submit(handles);
577 return pHandles[jobs];
578 }
579
			//! Blocks until \param jobHandle finishes. While waiting, the calling thread
			//! helps out by executing other queued jobs. Main thread only.
			void wait(JobHandle jobHandle) {
				GAIA_PROF_SCOPE(tp::wait);

				GAIA_ASSERT(main_thread());

				// Skip waiting for unset job handles.
				if (jobHandle == (JobHandle)JobNull_t{})
					return;

				auto* ctx = detail::tl_workerCtx;
				auto& jobData = m_jobManager.data(jobHandle);
				auto state = jobData.state.load();

				// Waiting for a job that has not been initialized is nonsense.
				GAIA_ASSERT(state != 0);

				// Wait until done
				for (; (state & JobState::STATE_BITS_MASK) < JobState::Done; state = jobData.state.load()) {
					// The job we are waiting for is not finished yet, try running some other job in the meantime
					JobHandle otherJobHandle;
					if (try_fetch_job(*ctx, otherJobHandle)) {
						if (run(otherJobHandle, ctx))
							continue;
					}

					// The job we are waiting for is already running.
					// Wait until it signals it's finished.
					// NOTE(review): while Executing, the DEP bits appear to hold the executing
					// worker's index (we wait on that worker's done-event) — confirm in JobManager.
					if ((state & JobState::STATE_BITS_MASK) == JobState::Executing) {
						const auto workerId = (state & JobState::DEP_BITS_MASK);
						auto* jobDoneEvent = &m_workersCtx[workerId].event;
						jobDoneEvent->wait();
						continue;
					}

					// The worst case scenario.
					// We have nothing to do and the job we are waiting for is not executing still.
					// Let's wait for any job to start executing.
					// Mark ourselves blocked first, then re-check the state to avoid a lost wakeup.
					const auto workerBit = 1U << ctx->workerIdx;
					const auto oldBlockedMask = m_blockedInWorkUntil.fetch_or(workerBit);
					const auto newState = jobData.state.load();
					if (newState == state) // still not JobState::Done?
						Futex::wait(&m_blockedInWorkUntil, oldBlockedMask | workerBit, detail::WaitMaskAny);
					m_blockedInWorkUntil.fetch_and(~workerBit);
				}
			}
629
			//! Gives the main thread a chance to help execute queued jobs.
			//! Must be called from the main thread (e.g. once per frame).
			void update() {
				GAIA_ASSERT(main_thread());
				main_thread_tick();
			}
635
638 GAIA_NODISCARD static uint32_t hw_thread_cnt() {
639 auto hwThreads = (uint32_t)std::thread::hardware_concurrency();
640 return core::get_max(1U, hwThreads);
641 }
642
645 GAIA_NODISCARD static uint32_t hw_efficiency_cores_cnt() {
646 uint32_t efficiencyCores = 0;
647#if GAIA_PLATFORM_APPLE
648 size_t size = sizeof(efficiencyCores);
649 if (sysctlbyname("hw.perflevel1.logicalcpu", &efficiencyCores, &size, nullptr, 0) != 0)
650 return 0;
651#elif GAIA_PLATFORM_FREEBSD
652 int cpuIndex = 0;
653 char oidName[32];
654 int coreType;
655 size_t size = sizeof(coreType);
656 while (true) {
657 GAIA_STRFMT(oidName, sizeof(oidName), "dev.cpu.%d.coretype", cpuIndex);
658 if (sysctlbyname(oidName, &coreType, &size, nullptr, 0) != 0)
659 break; // Stop on the last CPU index
660
661 // 0 = performance core
662 // 1 = efficiency core
663 if (coreType == 1)
664 ++efficiencyCores;
665
666 ++cpuIndex;
667 }
668#elif GAIA_PLATFORM_WINDOWS
669 DWORD length = 0;
670
671 // First, determine required buffer size
672 if (!GetLogicalProcessorInformationEx(RelationProcessorCore, nullptr, &length))
673 return 0;
674
675 // Allocate enough memory
676 auto* pBuffer = (SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX*)malloc(length);
677 if (pBuffer == nullptr)
678 return 0;
679
680 // Retrieve the data
681 if (!GetLogicalProcessorInformationEx(RelationProcessorCore, pBuffer, &length)) {
682 free(pBuffer);
683 return 0;
684 }
685
686 uint32_t heterogenousCnt = 0;
687
688 // Iterate over processor core entries.
689 // On Windows we can't directly tell whether a core is an efficiency core or a performance core.
690 // Instead:
691 // - lower EfficiencyClass values correspond to more efficient cores
692 // - higher EfficiencyClass values correspond to higher performance cores
693 // - EfficiencyClass is zero for homogeneous CPU architectures
694 // Therefore, to count efficiency cores on Windows, we will count cores where EfficiencyClass == 0.
695 // On heterogeneous this should gives us the correct results.
696 // On homogenous architectures, the value is always 0 so rather than calculating the number of efficiency
697 // cores, we would calculate the number of performance cores. For the sake of correctness, if all cores return
698 // 0, we use 0 for the number of efficiency cores.
699 for (char* ptr = (char*)pBuffer; ptr < (char*)pBuffer + length;
700 ptr += ((SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX*)ptr)->Size) {
701 auto* entry = (SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX*)ptr;
702 if (entry->Relationship == RelationProcessorCore) {
703 if (entry->Processor.EfficiencyClass == 0)
704 ++efficiencyCores;
705 else
706 ++heterogenousCnt;
707 }
708 }
709
710 if (heterogenousCnt == 0)
711 efficiencyCores = 0;
712
713 free(pBuffer);
714#elif GAIA_PLATFORM_LINUX
715 {
716 // Intel has /sys/devices/cpu_core/cpus, /sys/devices/cpu_atom/cpus on some systems
717 DIR* dir = opendir("/sys/devices/cpu_atom/cpus/");
718 if (dir == nullptr)
719 return 0;
720
721 dirent* entry;
722 while ((entry = readdir(dir)) != nullptr) {
723 if (strncmp(entry->d_name, "cpu", 3) == 0 && entry->d_name[3] >= '0' && entry->d_name[3] <= '9')
724 ++efficiencyCores;
725 }
726
727 closedir(dir);
728 }
729
730 if (efficiencyCores == 0) {
731 // TODO: Go through all CPUs packages and CPUs and determine the differences between them.
732 // There are many metrics.
733 // 1) We will assume all CPUs to be of the same architecture.
734 // 2) Same CPU architecture but different cache sizes. Smaller ones are "efficiency" cores.
735 // This is the AMD way. Still, these are about the same things so maybe we would just treat
736 // all such cores as performance cores.
737 // 3) Different max frequencies on different cores. This might be indicative enough.
738 // There is also an optional parameter present on ARM CPUs:
739 // https://www.kernel.org/doc/Documentation/devicetree/bindings/arm/cpu-capacity.txt
740 // In this case, we'd treat CPUs with the highest capacity-dmips-mhz as performance cores,
741 // and consider the rest as efficiency cores.
742
743 // ...
744 }
745#endif
746 return efficiencyCores;
747 }
748
749 private:
750 static void* thread_func(void* pCtx) {
751 auto& ctx = *(ThreadCtx*)pCtx;
752
753 detail::tl_workerCtx = &ctx;
754
755 // Set the worker thread name.
756 // Needs to be called from inside the thread because some platforms
757 // can change the name only when run from the specific thread.
758 ctx.tp->set_thread_name(ctx.workerIdx, ctx.prio);
759
760 // Set the worker thread priority
761 ctx.tp->set_thread_priority(ctx.workerIdx, ctx.prio);
762
763 // Process jobs
764 ctx.tp->worker_loop(ctx);
765
766 detail::tl_workerCtx = nullptr;
767
768 return nullptr;
769 }
770
			//! Creates the worker thread at index \param workerIdx with priority \param prio
			//! and stores its handle in m_workers[workerIdx - 1].
			void create_thread(uint32_t workerIdx, JobPriority prio) {
				// Idx 0 is reserved for the main thread
				GAIA_ASSERT(workerIdx > 0);

				auto& ctx = m_workersCtx[workerIdx];
				ctx.tp = this;
				ctx.workerIdx = workerIdx;
				ctx.prio = prio;
				ctx.threadCreated = false;

#if GAIA_THREAD_PLATFORM == GAIA_THREAD_STD
				m_workers[workerIdx - 1] = std::thread([&ctx]() {
					thread_func((void*)&ctx);
				});
#else
				pthread_attr_t attr{};
				int ret = pthread_attr_init(&attr);
				if (ret != 0) {
					GAIA_LOG_W("pthread_attr_init failed for worker thread %u. ErrCode = %d", workerIdx, ret);
					return;
				}

				// Apple's recommendation for Apple Silicon for games / high-perf software
				// ========================================================================
				// Per frame              | Scheduling policy | QoS class / Priority
				// ========================================================================
				// Main thread            | SCHED_OTHER       | QOS_CLASS_USER_INTERACTIVE (47)
				// Render/Audio thread    | SCHED_RR          | 45
				// Workers High Prio      | SCHED_RR          | 39-41
				// Workers Low Prio       | SCHED_OTHER       | QOS_CLASS_USER_INTERACTIVE (38)
				// ========================================================================
				// Multiple-frames        |                   |
				// ========================================================================
				// Async Workers High Prio| SCHED_OTHER       | QOS_CLASS_USER_INITIATED (37)
				// Async Workers Low Prio | SCHED_OTHER       | QOS_CLASS_DEFAULT (31)
				// Prefetching/Streaming  | SCHED_OTHER       | QOS_CLASS_UTILITY (20)
				// ========================================================================

				// NOTE(review): on POSIX, scheduling policy/params set on `attr` are only
				// honored when PTHREAD_EXPLICIT_SCHED is set via pthread_attr_setinheritsched();
				// by default threads inherit the creator's scheduling — verify intent here.
				if (prio == JobPriority::Low) {
	#if GAIA_PLATFORM_APPLE
					ret = pthread_attr_set_qos_class_np(&attr, QOS_CLASS_USER_INTERACTIVE, -9); // 47-9=38
					if (ret != 0) {
						GAIA_LOG_W(
								"pthread_attr_set_qos_class_np failed for worker thread %u [prio=%u]. ErrCode = %d", workerIdx,
								(uint32_t)prio, ret);
					}
	#else
					ret = pthread_attr_setschedpolicy(&attr, SCHED_OTHER);
					if (ret != 0) {
						GAIA_LOG_W(
								"pthread_attr_setschedpolicy SCHED_RR failed for worker thread %u [prio=%u]. ErrCode = %d", workerIdx,
								(uint32_t)prio, ret);
					}

					// Aim slightly above the policy minimum, capped at 38 (see the table above)
					int prioMax = core::get_min(38, sched_get_priority_max(SCHED_OTHER));
					int prioMin = core::get_min(prioMax, sched_get_priority_min(SCHED_OTHER));
					int prioUse = core::get_min(prioMin + 5, prioMax);
					prioUse = core::get_max(prioUse, prioMin);
					sched_param param{};
					param.sched_priority = prioUse;

					ret = pthread_attr_setschedparam(&attr, &param);
					if (ret != 0) {
						GAIA_LOG_W(
								"pthread_attr_setschedparam %d failed for worker thread %u [prio=%u]. ErrCode = %d",
								param.sched_priority, workerIdx, (uint32_t)prio, ret);
					}
	#endif
				} else {
					ret = pthread_attr_setschedpolicy(&attr, SCHED_RR);
					if (ret != 0) {
						GAIA_LOG_W(
								"pthread_attr_setschedpolicy SCHED_RR failed for worker thread %u [prio=%u]. ErrCode = %d", workerIdx,
								(uint32_t)prio, ret);
					}

					// Aim slightly below the policy maximum, capped at 41 (see the table above)
					int prioMax = core::get_min(41, sched_get_priority_max(SCHED_RR));
					int prioMin = core::get_min(prioMax, sched_get_priority_min(SCHED_RR));
					int prioUse = core::get_max(prioMax - 5, prioMin);
					prioUse = core::get_min(prioUse, prioMax);
					sched_param param{};
					param.sched_priority = prioUse;

					ret = pthread_attr_setschedparam(&attr, &param);
					if (ret != 0) {
						GAIA_LOG_W(
								"pthread_attr_setschedparam %d failed for worker thread %u [prio=%u]. ErrCode = %d",
								param.sched_priority, workerIdx, (uint32_t)prio, ret);
					}
				}

				// Create the thread with given attributes
				ret = pthread_create(&m_workers[workerIdx - 1], &attr, thread_func, (void*)&ctx);
				if (ret != 0) {
					GAIA_LOG_W("pthread_create failed for worker thread %u. ErrCode = %d", workerIdx, ret);
				} else {
					ctx.threadCreated = true;
				}

				pthread_attr_destroy(&attr);
#endif

				// Stick each thread to a specific CPU core if possible
				set_thread_affinity(workerIdx);
			}
880
883 void join_thread(uint32_t workerIdx) {
884 if GAIA_UNLIKELY (workerIdx > m_workers.size())
885 return;
886
887#if GAIA_THREAD_PLATFORM == GAIA_THREAD_STD
888 auto& t = m_workers[workerIdx - 1];
889 if (t.joinable())
890 t.join();
891#else
892 auto& ctx = m_workersCtx[workerIdx];
893 if (!ctx.threadCreated)
894 return;
895
896 auto& t = m_workers[workerIdx - 1];
897 pthread_join(t, nullptr);
898 ctx.threadCreated = false;
899#endif
900 }
901
902 void create_worker_threads(uint32_t& workerIdx, JobPriority prio, uint32_t count) {
903 for (uint32_t i = 0; i < count; ++i)
904 create_thread(workerIdx++, prio);
905 }
906
			//! Applies power-throttling (QoS) settings to the given worker thread.
			//! On non-Windows platforms this is a no-op because priorities are
			//! configured at thread-creation time instead.
			void set_thread_priority([[maybe_unused]] uint32_t workerIdx, [[maybe_unused]] JobPriority priority) {
#if GAIA_PLATFORM_WINDOWS
				HANDLE nativeHandle = (HANDLE)m_workers[workerIdx - 1].native_handle();

				THREAD_POWER_THROTTLING_STATE state{};
				state.Version = THREAD_POWER_THROTTLING_CURRENT_VERSION;
				if (priority == JobPriority::High) {
					// HighQoS
					// Turn EXECUTION_SPEED throttling off.
					// ControlMask selects the mechanism and StateMask is set to zero as mechanisms should be turned off.
					state.ControlMask = THREAD_POWER_THROTTLING_EXECUTION_SPEED;
					state.StateMask = 0;
				} else {
					// EcoQoS
					// Turn EXECUTION_SPEED throttling on.
					// ControlMask selects the mechanism and StateMask declares which mechanism should be on or off.
					state.ControlMask = THREAD_POWER_THROTTLING_EXECUTION_SPEED;
					state.StateMask = THREAD_POWER_THROTTLING_EXECUTION_SPEED;
				}

				BOOL ret = SetThreadInformation(nativeHandle, ThreadPowerThrottling, &state, sizeof(state));
				if (ret != TRUE) {
					GAIA_LOG_W("SetThreadInformation failed for thread %u", workerIdx);
					return;
				}
#else
				// Done when the thread is created
#endif
			}
936
			//! Intentionally a no-op: pinning is left to the OS scheduler (see the NOTE).
			void set_thread_affinity([[maybe_unused]] uint32_t workerIdx) {
				// NOTE:
				// Some cores might have multiple logic threads, there might be
				// more sockets and some cores might even be physically different
				// from others (performance vs efficiency cores).
				// Because of that, do not handle affinity and let the OS figure it out.
				// All threads created by the pool are setting thread priorities to make
				// it easier for the OS.

				// #if GAIA_PLATFORM_WINDOWS
				// 	HANDLE nativeHandle = (HANDLE)m_workers[workerIdx-1].native_handle();
				//
				// 	auto mask = SetThreadAffinityMask(nativeHandle, 1ULL << workerIdx);
				// 	if (mask <= 0)
				// 		GAIA_LOG_W("Issue setting thread affinity for worker thread %u!", workerIdx);
				// #elif GAIA_PLATFORM_APPLE
				// 	// Do not do affinity for MacOS. It is not supported for Apple Silicon and
				// 	// Intel MACs are deprecated anyway.
				// 	// TODO: Consider supporting this at least for Intel MAC as there are still
				// 	// quite a few of them out there.
				// #elif GAIA_PLATFORM_LINUX || GAIA_PLATFORM_FREEBSD
				// 	pthread_t nativeHandle = (pthread_t)m_workers[workerIdx-1].native_handle();
				//
				// 	cpu_set_t cpuSet;
				// 	CPU_ZERO(&cpuSet);
				// 	CPU_SET(workerIdx, &cpuSet);
				//
				// 	auto ret = pthread_setaffinity_np(nativeHandle, sizeof(cpuSet), &cpuSet);
				// 	if (ret != 0)
				// 		GAIA_LOG_W("Issue setting thread affinity for worker thread %u!", workerIdx);
				//
				// 	ret = pthread_getaffinity_np(nativeHandle, sizeof(cpuSet), &cpuSet);
				// 	if (ret != 0)
				// 		GAIA_LOG_W("Thread affinity could not be set for worker thread %u!", workerIdx);
				// #endif
			}
973
			//! Gives the worker thread a readable name ("worker_HI_N" / "worker_LO_N")
			//! for debuggers and profilers. Must run on the thread being named on some
			//! platforms (see thread_func).
			void set_thread_name(uint32_t workerIdx, JobPriority prio) {
#if GAIA_PROF_USE_PROFILER_THREAD_NAME
				char threadName[16]{};
				GAIA_STRFMT(threadName, 16, "worker_%s_%u", prio == JobPriority::High ? "HI" : "LO", workerIdx);
				GAIA_PROF_THREAD_NAME(threadName);
#elif GAIA_PLATFORM_WINDOWS
				auto nativeHandle = (HANDLE)m_workers[workerIdx - 1].native_handle();

				// Prefer SetThreadDescription (Windows 10 1607+), resolved dynamically
				TOSApiFunc_SetThreadDescription pSetThreadDescFunc = nullptr;
				if (auto* pModule = GetModuleHandleA("kernel32.dll")) {
					auto* pFunc = GetProcAddress(pModule, "SetThreadDescription");
					pSetThreadDescFunc = reinterpret_cast<TOSApiFunc_SetThreadDescription>(reinterpret_cast<void*>(pFunc));
				}
				if (pSetThreadDescFunc != nullptr) {
					wchar_t threadName[16]{};
					swprintf_s(threadName, L"worker_%s_%u", prio == JobPriority::High ? L"HI" : L"LO", workerIdx);

					auto hr = pSetThreadDescFunc(nativeHandle, threadName);
					if (FAILED(hr)) {
						GAIA_LOG_W(
								"Issue setting name for worker %s thread %u!", prio == JobPriority::High ? "HI" : "LO", workerIdx);
					}
				} else {
	#if defined _MSC_VER
					// Fallback: raise the MSVC "SetThreadName" exception understood by older debuggers
					char threadName[16]{};
					GAIA_STRFMT(threadName, 16, "worker_%s_%u", prio == JobPriority::High ? "HI" : "LO", workerIdx);

					THREADNAME_INFO info{};
					info.dwType = 0x1000;
					info.szName = threadName;
					info.dwThreadID = GetThreadId(nativeHandle);

					__try {
						RaiseException(0x406D1388, 0, sizeof(info) / sizeof(ULONG_PTR), (ULONG_PTR*)&info);
					} __except (EXCEPTION_EXECUTE_HANDLER) {
					}
	#endif
				}
#elif GAIA_PLATFORM_APPLE
				char threadName[16]{};
				GAIA_STRFMT(threadName, 16, "worker_%s_%u", prio == JobPriority::High ? "HI" : "LO", workerIdx);
				// On Apple, pthread_setname_np() takes no thread argument and names the calling thread
				auto ret = pthread_setname_np(threadName);
				if (ret != 0)
					GAIA_LOG_W("Issue setting name for worker %s thread %u!", prio == JobPriority::High ? "HI" : "LO", workerIdx);
#elif GAIA_PLATFORM_LINUX || GAIA_PLATFORM_FREEBSD
				auto nativeHandle = m_workers[workerIdx - 1];

				char threadName[16]{};
				GAIA_STRFMT(threadName, 16, "worker_%s_%u", prio == JobPriority::High ? "HI" : "LO", workerIdx);
				GAIA_PROF_THREAD_NAME(threadName);
				auto ret = pthread_setname_np(nativeHandle, threadName);
				if (ret != 0)
					GAIA_LOG_W("Issue setting name for worker %s thread %u!", prio == JobPriority::High ? "HI" : "LO", workerIdx);
#endif
			}
1032
1035 GAIA_NODISCARD bool main_thread() const {
1036 return std::this_thread::get_id() == m_mainThreadId;
1037 }
1038
1043 void main_thread_tick() {
1044 auto& ctx = *detail::tl_workerCtx;
1045
1046 // Keep executing while there is work
1047 while (true) {
1048 JobHandle jobHandle;
1049 if (!try_fetch_job(ctx, jobHandle))
1050 break;
1051
1052 (void)run(jobHandle, &ctx);
1053 }
1054 }
1055
1056 bool try_fetch_job(ThreadCtx& ctx, JobHandle& jobHandle) {
1057 // Try getting a job from the local queue
1058 if (ctx.jobQueue.try_pop(jobHandle))
1059 return true;
1060
1061 // Try getting a job from the global queue
1062 if (m_jobQueue[(uint32_t)ctx.prio].try_pop(jobHandle))
1063 return true;
1064
1065 // Could not get a job, try stealing from other workers.
1066 const auto workerCnt = m_workersCtx.size();
1067 for (uint32_t i = 0; i < workerCnt;) {
1068 // We need to skip our worker
1069 if (i == ctx.workerIdx) {
1070 ++i;
1071 continue;
1072 }
1073
1074 // Try stealing a job
1075 const auto res = m_workersCtx[i].jobQueue.try_steal(jobHandle);
1076
1077 // Race condition, try again from the same context
1078 if (!res)
1079 continue;
1080
1081 // Stealing can return true if the queue is empty.
1082 // We return right away only if we receive a valid handle which means
1083 // when there was an idle job in the queue.
1084 if (res && jobHandle != (JobHandle)JobNull_t{})
1085 return true;
1086
1087 ++i;
1088 }
1089
1090 return false;
1091 }
1092
1096 void worker_loop(ThreadCtx& ctx) {
1097 while (true) {
1098 // Wait for work
1099 m_sem.wait();
1100
1101 // Keep executing while there is work
1102 while (true) {
1103 JobHandle jobHandle;
1104 if (!try_fetch_job(ctx, jobHandle))
1105 break;
1106
1107 (void)run(jobHandle, detail::tl_workerCtx);
1108 }
1109
1110 // Check if the worker can keep running
1111 const bool stop = m_stop.load();
1112 if (stop)
1113 break;
1114 }
1115 }
1116
//! Stops all workers, drains any remaining jobs on the calling thread and
//! joins the worker threads. Afterwards the pool can spin up workers again.
void reset() {
	// Nothing to do if no workers were ever started
	if (m_workers.empty())
		return;

	// Request stopping
	m_stop.store(true);

	// Signal all threads so each of them wakes up, sees the stop flag and exits
	m_sem.release((int32_t)m_workers.size());

	auto* ctx = detail::tl_workerCtx;
	if (ctx == nullptr) {
		// reset() can be reached during static teardown from a thread that never
		// entered the pool and therefore has no TLS worker context bound.
		ctx = &m_workersCtx[0];
		detail::tl_workerCtx = ctx;
	}

	// Finish remaining jobs on the calling thread so no queued work is lost
	JobHandle jobHandle;
	while (try_fetch_job(*ctx, jobHandle)) {
		run(jobHandle, ctx);
	}

	// Unbind the (possibly borrowed) worker context from this thread
	detail::tl_workerCtx = nullptr;

	// Join threads with the main one.
	// Index 0 is the main thread, hence worker indices start at 1.
	GAIA_FOR(m_workers.size()) join_thread(i + 1);

	// All threads have been stopped. Allow new threads to run if necessary.
	m_stop.store(false);
}
1150
1152 template <typename TJob>
1153 JobPriority final_prio(const TJob& job) {
1154 const auto cntWorkers = m_workersCnt[(uint32_t)job.priority];
1155 return cntWorkers > 0
1156 // If there is enough workers, keep the priority
1157 ? job.priority
1158 // Not enough workers, use the other priority that has workers
1159 : (JobPriority)(((uint32_t)job.priority + 1U) % (uint32_t)JobPriorityCnt);
1160 }
1161
1162 void signal_edges(JobContainer& jobData) {
1163 const auto max = jobData.edges.depCnt;
1164
1165 // Nothing to do if there are no dependencies
1166 if (max == 0)
1167 return;
1168
1169 auto* ctx = detail::tl_workerCtx;
1170
1171 // One dependency
1172 if (max == 1) {
1173 auto depHandle = jobData.edges.dep;
1174#if GAIA_LOG_JOB_STATES
1175 GAIA_LOG_N("SIGNAL %u.%u -> %u.%u", jobData.idx, jobData.gen, depHandle.id(), depHandle.gen());
1176#endif
1177
1178 // See the conditions can't be satisfied for us to submit the job we skip
1179 auto& depData = m_jobManager.data(depHandle);
1180 if (!JobManager::signal_edge(depData))
1181 return;
1182
1183 // Submit all jobs that can are ready
1184 process(std::span(&depHandle, 1), ctx);
1185 return;
1186 }
1187
1188 // Multiple dependencies. The array has to be set
1189 GAIA_ASSERT(jobData.edges.pDeps != nullptr);
1190
1191 auto* pHandles = (JobHandle*)alloca(sizeof(JobHandle) * max);
1192 uint32_t cnt = 0;
1193 GAIA_FOR(max) {
1194 auto depHandle = jobData.edges.pDeps[i];
1195
1196 // See if all conditions were satisfied for us to submit the job
1197 auto& depData = m_jobManager.data(depHandle);
1198 if (!JobManager::signal_edge(depData))
1199 continue;
1200
1201 pHandles[cnt++] = depHandle;
1202 }
1203
1204 // Submit all jobs that can are ready
1205 process(std::span(pHandles, cnt), ctx);
1206 }
1207
1208 void process(std::span<JobHandle> jobHandles, ThreadCtx* ctx) {
1209 auto* pHandles = (JobHandle*)alloca(sizeof(JobHandle) * jobHandles.size());
1210 uint32_t handlesCnt = 0;
1211
1212 for (auto handle: jobHandles) {
1213 auto& jobData = m_jobManager.data(handle);
1214 m_jobManager.processing(jobData);
1215
1216 // Jobs that have no functor assigned don't need to be enqueued.
1217 // We can "run" them right away. The only time where it makes
1218 // sense to create such a job is to create a sync job. E.g. when you
1219 // need to wait for N jobs, rather than waiting for each of them
1220 // separately you make them a dependency of a dummy/sync job and
1221 // wait just for that one.
1222 if (!jobData.func.operator bool())
1223 (void)run(handle, ctx);
1224 else
1225 pHandles[handlesCnt++] = handle;
1226 }
1227
1228 std::span handles(pHandles, handlesCnt);
1229 while (!handles.empty()) {
1230 // Try pushing all jobs
1231 uint32_t pushed = 0;
1232 if (ctx != nullptr) {
1233 pushed = ctx->jobQueue.try_push(handles);
1234 } else {
1235 for (auto handle: handles) {
1236 if (m_jobQueue[(uint32_t)handle.prio()].try_push(handle))
1237 pushed++;
1238 else
1239 break;
1240 }
1241 }
1242
1243 // Lock the semaphore with the number of jobs me managed to push.
1244 // Number of workers if the upper bound.
1245 const auto cntWorkers = ctx != nullptr ? m_workersCnt[(uint32_t)ctx->prio] : m_workers.size();
1246 const auto cnt = (int32_t)core::get_min(pushed, cntWorkers);
1247 m_sem.release(cnt);
1248
1249 handles = handles.subspan(pushed);
1250 if (!handles.empty()) {
1251 // The queue was full. Execute the job right away.
1252 run(handles[0], ctx);
1253 handles = handles.subspan(1);
1254 }
1255 }
1256 }
1257
//! Runs the job associated with \param jobHandle on the calling thread,
//! signals its dependents and (unless manually managed) deletes it.
//! \param jobHandle Job to execute. A null handle is a no-op.
//! \param ctx Thread context of the executing thread.
//! \return True when a job was executed, false for a null handle.
//! NOTE(review): ctx is dereferenced unconditionally below, yet process()
//! accepts a null ctx and forwards it here — confirm every caller path
//! guarantees a non-null context.
bool run(JobHandle jobHandle, ThreadCtx* ctx) {
	// Null handles represent "no job" and are ignored
	if (jobHandle == (JobHandle)JobNull_t{})
		return false;

	auto& jobData = m_jobManager.data(jobHandle);
	// Cache the flags up front; they are needed after the job ran and after
	// its edges were signaled/freed.
	const bool manualDelete = (jobData.flags & JobCreationFlags::ManualDelete) != 0U;
	const bool canWait = (jobData.flags & JobCreationFlags::CanWait) != 0U;

	// Mark the job as executing on this worker
	m_jobManager.executing(jobData, ctx->workerIdx);

	// Wake threads blocked in work-until waits so they can re-evaluate
	// their conditions now that a job is about to run.
	if (m_blockedInWorkUntil.load() != 0) {
		const auto blockedCnt = m_blockedInWorkUntil.exchange(0);
		if (blockedCnt != 0)
			Futex::wake(&m_blockedInWorkUntil, detail::WaitMaskAll);
	}

	GAIA_ASSERT(jobData.idx != (uint32_t)-1 && jobData.data.gen != (uint32_t)-1);

	// Run the functor associated with the job
	m_jobManager.run(jobData);

	// Signal the edges and release memory allocated for them if possible
	signal_edges(jobData);
	JobManager::free_edges(jobData);

	// Signal we finished
	ctx->event.set();
	if (canWait) {
		// Wake everyone blocked on this job's state futex
		const auto* pFutexValue = &jobData.state;
		Futex::wake(pFutexValue, detail::WaitMaskAll);
	}

	// Unless the job is managed manually by the user, release it now
	if (!manualDelete)
		del(jobHandle);

	return true;
}
1295 };
1296
1297 GAIA_MSVC_WARNING_POP()
1298 } // namespace mt
1299} // namespace gaia
Array of elements of type.
Definition sarray_ext_impl.h:27
Definition span_impl.h:99
Definition jobmanager.h:152
static void run(JobContainer &jobData)
Execute the functor associated with the job container.
Definition jobmanager.h:201
void free_job(JobHandle jobHandle)
Invalidates jobHandle by resetting its index in the job pool. Every time a job is deallocated its gen...
Definition jobmanager.h:188
void dep(JobHandle jobFirst, JobHandle jobSecond)
Makes jobSecond depend on jobFirst. This means jobSecond will not run until jobFirst finishes.
Definition jobmanager.h:237
GAIA_NODISCARD JobHandle alloc_job(const Job &job)
Allocates a new job container identified by a unique JobHandle.
Definition jobmanager.h:167
void dep_refresh(std::span< JobHandle > jobsFirst, JobHandle jobSecond)
Makes jobSecond depend on the jobs listed in jobsFirst. This means jobSecond will not run until all j...
Definition jobmanager.h:277
An optimized version of Semaphore that avoids expensive system calls when the counter is greater than...
Definition semaphore_fast.h:12
bool wait()
Decrements semaphore count by 1. If the count is already 0, it waits indefinitely until semaphore cou...
Definition semaphore_fast.h:39
void release(int32_t count=1)
Increments semaphore count by the specified amount.
Definition semaphore_fast.h:26
Definition spinlock.h:8
Definition threadpool.h:87
void set_workers_high_prio_inter(uint32_t &workerIdx, uint32_t count)
Updates the number of worker threads participating at high priority workloads.
Definition threadpool.h:206
void update()
Uses the main thread to help with jobs processing.
Definition threadpool.h:631
JobHandle sched_par(JobParallel &job, uint32_t itemsToProcess, uint32_t groupSize)
Schedules a job to run on worker threads in parallel.
Definition threadpool.h:483
void reset(std::span< JobHandle > jobHandles)
Waits for jobHandles to finish and resets them to a reusable state.
Definition threadpool.h:419
JobHandle sched(Job &job)
Schedules a job to run on a worker thread.
Definition threadpool.h:457
void dep(JobHandle jobFirst, JobHandle jobSecond)
Makes jobSecond depend on jobFirst. This means jobSecond will not run until jobFirst finishes.
Definition threadpool.h:267
void set_max_workers(uint32_t count, uint32_t countHighPrio)
Set the maximum number of workers for this system.
Definition threadpool.h:168
void submit(JobHandle jobHandle)
Pushes jobHandle into the internal queue so worker threads can pick it up and execute it....
Definition threadpool.h:397
JobHandle sched(Job &job, JobHandle dependsOn)
Schedules a job to run on a worker thread.
Definition threadpool.h:469
void dep_refresh(std::span< JobHandle > jobsFirst, JobHandle jobSecond)
Makes jobSecond depend on the jobs listed in jobsFirst. This means jobSecond will not run until all j...
Definition threadpool.h:304
void dep_refresh(JobHandle jobFirst, JobHandle jobSecond)
Makes jobSecond depend on jobFirst. This means jobSecond will not run until jobFirst finishes.
Definition threadpool.h:291
void submit(std::span< JobHandle > jobHandles)
Pushes jobHandles into the internal queue so worker threads can pick them up and execute them....
Definition threadpool.h:366
static GAIA_NODISCARD uint32_t hw_efficiency_cores_cnt()
Returns the number of efficiency cores of the system.
Definition threadpool.h:645
void make_main_thread()
Make the calling thread the effective main thread from the thread pool perspective.
Definition threadpool.h:154
void wait(JobHandle jobHandle)
Wait until a job associated with the jobHandle finishes executing. Cleans up any job allocations and ...
Definition threadpool.h:584
JobHandle add(TJob &&job)
Creates a threadpool job from job.
Definition threadpool.h:315
void set_workers_low_prio(uint32_t count)
Updates the number of worker threads participating at low priority workloads.
Definition threadpool.h:253
void del(JobHandle jobHandle)
Deletes a job handle jobHandle from the threadpool.
Definition threadpool.h:344
void reset(JobHandle jobHandle)
Waits for jobHandle to finish and resets it to a reusable state.
Definition threadpool.h:448
void set_workers_high_prio(uint32_t count)
Updates the number of worker threads participating at high priority workloads.
Definition threadpool.h:242
GAIA_NODISCARD uint32_t workers() const
Returns the number of worker threads.
Definition threadpool.h:159
void dep(std::span< JobHandle > jobsFirst, JobHandle jobSecond)
Makes jobSecond depend on the jobs listed in jobsFirst. This means jobSecond will not run until all j...
Definition threadpool.h:278
void set_workers_low_prio_inter(uint32_t &workerIdx, uint32_t count)
Updates the number of worker threads participating at low priority workloads.
Definition threadpool.h:225
static GAIA_NODISCARD uint32_t hw_thread_cnt()
Returns the number of HW threads available on the system. 1 is minimum.
Definition threadpool.h:638
Checks if endianess was detected correctly at compile-time.
Definition bitset.h:9
Definition utility.h:123
static Result wait(const std::atomic_uint32_t *pFutexValue, uint32_t expected, uint32_t waitMask)
Suspends the caller on the futex while its value remains expected.
Definition futex.h:69
static uint32_t wake(const std::atomic_uint32_t *pFutexValue, uint32_t wakeCount, uint32_t wakeMask=detail::WaitMaskAny)
Wakes up to wakeCount waiters whose waitMask matches wakeMask.
Definition futex.h:101
Definition jobcommon.h:39
std::function< void()> func
Function to execute when running the job.
Definition jobmanager.h:86
JobPriority prio
Job priority.
Definition jobmanager.h:80
std::atomic_uint32_t state
Current state of the job Consist of upper and bottom part. Least significant bits = special purpose....
Definition jobmanager.h:78
Definition jobhandle.h:13
Definition jobhandle.h:85
Definition jobcommon.h:44
Definition jobcommon.h:33
Definition jobcommon.h:51