Gaia-ECS v0.9.3
A simple and powerful entity component system
Loading...
Searching...
No Matches
threadpool.h
1#pragma once
2
3#include "gaia/config/config.h"
4#include "gaia/config/profiler.h"
5
6#if GAIA_PLATFORM_WINDOWS
7 #include <cstdio>
8 #include <windows.h>
9#endif
10
11#define GAIA_THREAD_OFF 0
12#define GAIA_THREAD_STD 1
13#define GAIA_THREAD_PTHREAD 2
14
15#if GAIA_PLATFORM_WINDOWS || GAIA_PLATFORM_WASM
16 #include <thread>
17 // Emscripten supports std::thread if compiled with -sUSE_PTHREADS=1.
18 // Otherwise, std::thread calls are no-ops that compile but do not run concurrently.
19 #define GAIA_THREAD std::thread
20 #define GAIA_THREAD_PLATFORM GAIA_THREAD_STD
21#elif GAIA_PLATFORM_APPLE
22 #include <pthread.h>
23 #include <pthread/sched.h>
24 #include <sys/qos.h>
25 #include <sys/sysctl.h>
26 #define GAIA_THREAD pthread_t
27 #define GAIA_THREAD_PLATFORM GAIA_THREAD_PTHREAD
28#elif GAIA_PLATFORM_LINUX
29 #include <dirent.h>
30 #include <fcntl.h>
31 #include <pthread.h>
32 #include <unistd.h>
33 #define GAIA_THREAD pthread_t
34 #define GAIA_THREAD_PLATFORM GAIA_THREAD_PTHREAD
35#elif GAIA_PLATFORM_FREEBSD
36 #include <pthread.h>
37 #include <sys/sysctl.h>
38 #define GAIA_THREAD pthread_t
39 #define GAIA_THREAD_PLATFORM GAIA_THREAD_PTHREAD
40#endif
41
42#if GAIA_PLATFORM_WINDOWS
43 #include <malloc.h>
44#else
45 #include <alloca.h>
46#endif
47#include <atomic>
48#include <thread>
49
50#include "gaia/cnt/sarray_ext.h"
51#include "gaia/core/span.h"
52#include "gaia/core/utility.h"
53#include "gaia/util/logging.h"
54
55#include "gaia/mt/event.h"
56#include "gaia/mt/futex.h"
57#include "gaia/mt/jobcommon.h"
58#include "gaia/mt/jobhandle.h"
59#include "gaia/mt/jobmanager.h"
60#include "gaia/mt/jobqueue.h"
61#include "gaia/mt/semaphore_fast.h"
62#include "gaia/mt/spinlock.h"
63
64namespace gaia {
65 namespace mt {
// Windows-only declarations used for naming threads.
66#if GAIA_PLATFORM_WINDOWS
// Function pointer type taking (HANDLE, PCWSTR) and returning HRESULT.
// NOTE(review): presumably matches the SetThreadDescription OS API so it can be resolved at run time - confirm.
67 extern "C" typedef HRESULT(WINAPI* TOSApiFunc_SetThreadDescription)(HANDLE, PCWSTR);
68
// Thread-name payload. The layout is forced to 8-byte packing.
// NOTE(review): presumably the structure handed to the debugger by the legacy
// thread-naming mechanism - confirm against the code that fills it.
69 #pragma pack(push, 8)
70 typedef struct tagTHREADNAME_INFO {
71 DWORD dwType; // Must be 0x1000.
72 LPCSTR szName; // Pointer to name (in user addr space).
73 DWORD dwThreadID; // Thread ID (-1=caller thread).
74 DWORD dwFlags; // Reserved for future use, must be zero.
75 } THREADNAME_INFO;
76 #pragma pack(pop)
77#endif
78
79 namespace detail {
81 inline thread_local ThreadCtx* tl_workerCtx;
82 } // namespace detail
83
84 GAIA_MSVC_WARNING_PUSH()
85 GAIA_MSVC_WARNING_DISABLE(4324)
86
87 class GAIA_API ThreadPool final {
88 friend class JobManager;
89
94 static constexpr uint32_t MaxWorkers = JobState::DEP_BITS;
95
97 std::thread::id m_mainThreadId;
98
100 std::atomic_bool m_stop{};
104 GAIA_ALIGNAS(128) cnt::sarray_ext<ThreadCtx, MaxWorkers> m_workersCtx;
106 MpmcQueue<JobHandle, 1024> m_jobQueue[JobPriorityCnt];
108 MpmcQueue<JobHandle, 1024> m_jobQueueBackground;
110 uint32_t m_frameWorkersCnt = 0;
112 uint32_t m_backgroundWorkersCnt = 0;
114 uint32_t m_workersCnt[JobPriorityCnt]{};
119 uint32_t m_workerThreadsCnt[JobPriorityCnt]{};
121 SemaphoreFast m_sem[JobPriorityCnt];
123 SemaphoreFast m_semBackground;
124
126 std::atomic_uint32_t m_blockedInWorkUntil;
127
129 JobManager m_jobManager;
135 GAIA_PROF_MUTEX(SpinLock, m_jobAllocMtx);
136
137 private:
138 ThreadPool() {
139 m_stop.store(false);
140
141 make_main_thread();
142
143 const auto hwThreads = hw_thread_cnt();
144 const auto hwEffThreads = hw_efficiency_cores_cnt();
145 uint32_t hiPrioWorkers = hwThreads;
146 if (hwEffThreads < hwThreads)
147 hiPrioWorkers -= hwEffThreads;
148
149 set_max_workers(hwThreads, hiPrioWorkers);
150 }
151
152 ThreadPool(ThreadPool&&) = delete;
153 ThreadPool(const ThreadPool&) = delete;
154 ThreadPool& operator=(ThreadPool&&) = delete;
155 ThreadPool& operator=(const ThreadPool&) = delete;
156
157 public:
158 static ThreadPool& get() {
159 static ThreadPool threadPool;
160 return threadPool;
161 }
162
//! Destroys the pool after calling reset().
//! NOTE(review): reset() is defined outside this view; other call sites describe it as stopping all threads.
163 ~ThreadPool() {
164 reset();
165 }
166
169 m_mainThreadId = std::this_thread::get_id();
170 }
171
//! Returns the number of frame worker threads. The main thread is not included.
173 GAIA_NODISCARD uint32_t workers() const {
174 return m_frameWorkersCnt;
175 }
176
//! Returns the number of background worker threads.
178 GAIA_NODISCARD uint32_t background_workers() const {
179 return m_backgroundWorkersCnt;
180 }
181
188 void set_max_workers(uint32_t count, uint32_t countHighPrio) {
189 const auto maxFrameWorkers = MaxWorkers - m_backgroundWorkersCnt;
190 const auto workersCnt = core::get_max(core::get_min(maxFrameWorkers, count), 1U);
191 countHighPrio = core::get_min(countHighPrio, workersCnt);
192
193 // Stop all threads first
194 reset();
195
196 // Reset previous worker contexts
197 for (auto& ctx: m_workersCtx)
198 ctx.reset();
199
200 m_frameWorkersCnt = workersCnt - 1;
201
202 // The main thread uses context 0. Frame workers follow, and
203 // background workers are appended after them.
204 m_workersCtx.resize(workersCnt + m_backgroundWorkersCnt);
205 // We also have the main thread so there's always one less worker spawned
206 m_workers.resize(m_frameWorkersCnt + m_backgroundWorkersCnt);
207
208 // First worker is considered the main thread.
209 // It is also assigned high priority but it doesn't really matter.
210 // The main thread can steal any jobs, both low and high priority.
211 detail::tl_workerCtx = m_workersCtx.data();
212 m_workersCtx[0].tp = this;
213 m_workersCtx[0].workerIdx = 0;
214 m_workersCtx[0].prio = JobPriority::High;
215
216 // Reset the workers
217 for (auto& worker: m_workers)
218 worker = {};
219
220 // Create a new set of high and low priority threads (if any)
221 uint32_t workerIdx = 1;
222 set_workers_high_prio_inter(workerIdx, countHighPrio);
223 create_background_worker_threads(workerIdx);
224 }
225
230 void set_workers_high_prio_inter(uint32_t& workerIdx, uint32_t count) {
231 count = gaia::core::get_min(count, m_frameWorkersCnt);
232 m_workerThreadsCnt[0] = count;
233 m_workerThreadsCnt[1] = m_frameWorkersCnt - count;
234 m_workersCnt[0] = count + 1; // Main thread is always a priority worker
235 m_workersCnt[1] = m_workerThreadsCnt[1];
236
237 // Create a new set of high and low priority threads (if any)
238 create_worker_threads(workerIdx, JobPriority::High, m_workerThreadsCnt[0]);
239 create_worker_threads(workerIdx, JobPriority::Low, m_workerThreadsCnt[1]);
240 }
241
246 void set_workers_low_prio_inter(uint32_t& workerIdx, uint32_t count) {
247 const uint32_t realCnt = gaia::core::get_min(count, m_frameWorkersCnt);
248 m_workerThreadsCnt[0] = m_frameWorkersCnt - realCnt;
249 m_workerThreadsCnt[1] = realCnt;
250 m_workersCnt[0] = m_workerThreadsCnt[0] + 1; // Main thread is always a priority worker
251 m_workersCnt[1] = m_workerThreadsCnt[1];
252
253 // Create a new set of high and low priority threads (if any)
254 create_worker_threads(workerIdx, JobPriority::High, m_workerThreadsCnt[0]);
255 create_worker_threads(workerIdx, JobPriority::Low, m_workerThreadsCnt[1]);
256 }
257
260 void set_workers_high_prio(uint32_t count) {
261 // Stop all threads first
262 reset();
263 detail::tl_workerCtx = m_workersCtx.data();
264
265 uint32_t workerIdx = 1;
266 set_workers_high_prio_inter(workerIdx, count);
267 create_background_worker_threads(workerIdx);
268 }
269
272 void set_workers_low_prio(uint32_t count) {
273 // Stop all threads first
274 reset();
275 detail::tl_workerCtx = m_workersCtx.data();
276
277 uint32_t workerIdx = 1;
278 set_workers_low_prio_inter(workerIdx, count);
279 create_background_worker_threads(workerIdx);
280 }
281
287 void set_background_workers(uint32_t count) {
288 const auto maxBackgroundWorkers = MaxWorkers - 1;
289 count = core::get_min(maxBackgroundWorkers, count);
290
291 const auto frameWorkersCntOld = m_frameWorkersCnt;
292 const auto highWorkersCntOld = m_workerThreadsCnt[0];
293
294 // Stop all threads first
295 reset();
296
297 m_backgroundWorkersCnt = count;
298
299 const auto maxFrameWorkers = MaxWorkers - m_backgroundWorkersCnt - 1;
300 m_frameWorkersCnt = core::get_min(frameWorkersCntOld, maxFrameWorkers);
301
302 for (auto& ctx: m_workersCtx)
303 ctx.reset();
304
305 m_workersCtx.resize(m_frameWorkersCnt + 1 + m_backgroundWorkersCnt);
306 m_workers.resize(m_frameWorkersCnt + m_backgroundWorkersCnt);
307
308 detail::tl_workerCtx = m_workersCtx.data();
309 m_workersCtx[0].tp = this;
310 m_workersCtx[0].workerIdx = 0;
311 m_workersCtx[0].prio = JobPriority::High;
312
313 for (auto& worker: m_workers)
314 worker = {};
315
316 uint32_t workerIdx = 1;
317 set_workers_high_prio_inter(workerIdx, highWorkersCntOld);
318 create_background_worker_threads(workerIdx);
319 }
320
326 void dep(JobHandle jobFirst, JobHandle jobSecond) {
327 GAIA_ASSERT(main_thread());
328
329 m_jobManager.dep(std::span(&jobFirst, 1), jobSecond);
330 }
331
//! Registers a dependency between a group of jobs and \param jobSecond by forwarding
//! both to JobManager::dep. Must be called from the main thread (asserted).
//! \param jobsFirst Jobs forming the first side of the dependency
//! \param jobSecond Job forming the second side of the dependency
337 void dep(std::span<JobHandle> jobsFirst, JobHandle jobSecond) {
338 GAIA_ASSERT(main_thread());
339
340 m_jobManager.dep(jobsFirst, jobSecond);
341 }
342
350 void dep_refresh(JobHandle jobFirst, JobHandle jobSecond) {
351 GAIA_ASSERT(main_thread());
352
353 m_jobManager.dep_refresh(std::span(&jobFirst, 1), jobSecond);
354 }
355
//! Forwards a group of jobs and \param jobSecond to JobManager::dep_refresh.
//! Must be called from the main thread (asserted).
//! \param jobsFirst Jobs forming the first side of the dependency
//! \param jobSecond Job forming the second side of the dependency
363 void dep_refresh(std::span<JobHandle> jobsFirst, JobHandle jobSecond) {
364 GAIA_ASSERT(main_thread());
365
366 m_jobManager.dep_refresh(jobsFirst, jobSecond);
367 }
368
374 template <typename TJob>
375 JobHandle add(TJob job) {
376 GAIA_ASSERT(main_thread());
377
378 job.priority = final_prio(job);
379
380 auto& mtx = GAIA_PROF_EXTRACT_MUTEX(m_jobAllocMtx);
381 core::lock_scope lock(mtx);
382 GAIA_PROF_LOCK_MARK(m_jobAllocMtx);
383
384 return m_jobManager.alloc_job(GAIA_MOV(job));
385 }
386
387 private:
388 void add_n(JobPriority prio, std::span<JobHandle> jobHandles) {
389 GAIA_ASSERT(main_thread());
390 GAIA_ASSERT(!jobHandles.empty());
391
392 auto& mtx = GAIA_PROF_EXTRACT_MUTEX(m_jobAllocMtx);
393 core::lock_scope lock(mtx);
394 GAIA_PROF_LOCK_MARK(m_jobAllocMtx);
395
396 for (auto& jobHandle: jobHandles)
397 jobHandle = m_jobManager.alloc_job({{}, prio, JobCreationFlags::Default});
398 }
399
400 GAIA_NODISCARD ParallelCallbackHandle add_parallel_callback(JobArgsFunc callback, uint32_t refs) {
401 auto& mtx = GAIA_PROF_EXTRACT_MUTEX(m_jobAllocMtx);
402 core::lock_scope lock(mtx);
403 GAIA_PROF_LOCK_MARK(m_jobAllocMtx);
404
405 return m_jobManager.alloc_parallel_callback(GAIA_MOV(callback), refs);
406 }
407
408 void release_parallel_callback(ParallelCallbackHandle handle) {
409 if (!m_jobManager.release_parallel_callback_ref(handle))
410 return;
411
412 auto& mtx = GAIA_PROF_EXTRACT_MUTEX(m_jobAllocMtx);
413 core::lock_scope lock(mtx);
414 GAIA_PROF_LOCK_MARK(m_jobAllocMtx);
415
416 m_jobManager.free_parallel_callback(handle);
417 }
418
419 public:
423 void del([[maybe_unused]] JobHandle jobHandle) {
424 GAIA_ASSERT(jobHandle != (JobHandle)JobNull_t{});
425
426#if GAIA_ASSERT_ENABLED
427 {
428 const auto& jobData = m_jobManager.data(jobHandle);
429 GAIA_ASSERT(jobData.state == 0 || m_jobManager.done(jobData));
430 }
431#endif
432
433 auto& mtx = GAIA_PROF_EXTRACT_MUTEX(m_jobAllocMtx);
434 core::lock_scope lock(mtx);
435 GAIA_PROF_LOCK_MARK(m_jobAllocMtx);
436
437 m_jobManager.free_job(jobHandle);
438 }
439
445 void submit(std::span<JobHandle> jobHandles) {
446 if (jobHandles.empty())
447 return;
448
449 GAIA_PROF_SCOPE(tp::submit);
450
451 auto* pHandles = (JobHandle*)alloca(sizeof(JobHandle) * jobHandles.size());
452
453 uint32_t cnt = 0;
454 for (auto handle: jobHandles) {
455 GAIA_ASSERT(handle != (JobHandle)JobNull_t{});
456
457 auto& jobData = m_jobManager.data(handle);
458 const auto state = m_jobManager.submit(jobData) & JobState::DEP_BITS_MASK;
459 // Jobs that were already submitted won't be submitted again.
460 // We can only accept the job if it has no pending dependencies.
461 if (state != 0)
462 continue;
463
464 pHandles[cnt++] = handle;
465 }
466
467 auto* ctx = detail::tl_workerCtx;
468 process(std::span(pHandles, cnt), ctx);
469 }
470
//! Submits a single job. Wraps the handle in a one-element span and calls the span overload.
//! \param jobHandle Job to submit
476 void submit(JobHandle jobHandle) {
477 submit(std::span(&jobHandle, 1));
478 }
479
480 void reset_state(std::span<JobHandle> jobHandles) {
481 if (jobHandles.empty())
482 return;
483
484 GAIA_PROF_SCOPE(tp::reset);
485
486 for (auto handle: jobHandles) {
487 auto& jobData = m_jobManager.data(handle);
488 m_jobManager.reset_state(jobData);
489 }
490 }
491
//! Resets the state of a single job. Wraps the handle in a one-element span and calls the span overload.
//! \param jobHandle Job whose state is reset
492 void reset_state(JobHandle jobHandle) {
493 reset_state(std::span(&jobHandle, 1));
494 }
495
498 void reset(std::span<JobHandle> jobHandles) {
499 if (jobHandles.empty())
500 return;
501
502 GAIA_ASSERT(main_thread());
503 GAIA_PROF_SCOPE(tp::reset_wait);
504
505 // Wait first to avoid resetting one handle while another one still depends on it.
506 for (auto handle: jobHandles) {
507 if (handle == (JobHandle)JobNull_t{})
508 continue;
509 wait(handle);
510 }
511
512 for (auto handle: jobHandles) {
513 if (handle == (JobHandle)JobNull_t{})
514 continue;
515
516 auto& jobData = m_jobManager.data(handle);
517 const auto state = jobData.state.load() & JobState::STATE_BITS_MASK;
518 // Auto-deleted jobs are released and cannot be reused through reset_state().
519 if (state == JobState::Released)
520 continue;
521
522 m_jobManager.reset_state(jobData);
523 }
524 }
525
//! Waits for a single job and resets it. Wraps the handle in a one-element span and calls the span overload.
//! \param jobHandle Job to wait for and reset
527 void reset(JobHandle jobHandle) {
528 reset(std::span(&jobHandle, 1));
529 }
530
537 JobHandle jobHandle = add(GAIA_MOV(job));
538 submit(jobHandle);
539 return jobHandle;
540 }
541
551 job.flags = (JobCreationFlags)((uint8_t)job.flags | (uint8_t)JobCreationFlags::Background);
552 JobHandle jobHandle = add(GAIA_MOV(job));
553 submit(jobHandle);
554 return jobHandle;
555 }
556
563 JobHandle sched(Job job, JobHandle dependsOn) {
564 JobHandle jobHandle = add(GAIA_MOV(job));
565 dep(jobHandle, dependsOn);
566 submit(jobHandle);
567 return jobHandle;
568 }
569
577 JobHandle sched_par(JobParallel job, uint32_t itemsToProcess, uint32_t groupSize) {
578 GAIA_ASSERT(main_thread());
579
580 // Empty data set are considered wrong inputs
581 GAIA_ASSERT(itemsToProcess != 0);
582 if (itemsToProcess == 0)
583 return JobNull;
584
585 // Don't add new jobs once stop was requested
586 if GAIA_UNLIKELY (m_stop)
587 return JobNull;
588
589 // Make sure the right priority is selected
590 const auto prio = job.priority = final_prio(job);
591
592 // No group size was given, make a guess based on the set size
593 if (groupSize == 0) {
594 const auto cntWorkers = m_workersCnt[(uint32_t)prio];
595 groupSize = (itemsToProcess + cntWorkers - 1) / cntWorkers;
596
597 // If there are too many items we split them into multiple jobs.
598 // This way, if we wait for the result and some workers finish
599 // with our task faster, the finished worker can pick up a new
600 // job faster.
601 // On the other hand, too little items probably don't deserve
602 // multiple jobs.
603 constexpr uint32_t maxUnitsOfWorkPerGroup = 8;
604 groupSize = groupSize / maxUnitsOfWorkPerGroup;
605 if (groupSize <= 0)
606 groupSize = 1;
607 }
608
609 const auto jobs = (itemsToProcess + groupSize - 1) / groupSize;
610
611 // Only one job is created, use the job directly.
612 // Generally, this is the case we would want to avoid because it means this particular case
613 // is not worth of being scheduled via sched_par. However, we can never know for sure what
614 // the reason for that is so let's stay silent.
615 if (jobs == 1) {
616 const uint32_t groupJobIdxEnd = groupSize < itemsToProcess ? groupSize : itemsToProcess;
617 auto groupFunc = GAIA_MOV(job.func);
618 auto groupJobFunc = [func = GAIA_MOV(groupFunc), groupJobIdxEnd]() mutable {
619 JobArgs args;
620 args.idxStart = 0;
621 args.idxEnd = groupJobIdxEnd;
622 func(args);
623 };
624
625 auto handle = add(Job{GAIA_MOV(groupJobFunc), prio, JobCreationFlags::Default});
626 submit(handle);
627 return handle;
628 }
629
630 // Multiple jobs need to be parallelized.
631 // Create a sync job and assign it as their dependency.
632 auto callbackHandle = add_parallel_callback(GAIA_MOV(job.func), jobs);
633
634 auto* pHandles = (JobHandle*)alloca(sizeof(JobHandle) * (jobs + 1));
635 std::span<JobHandle> handles(pHandles, jobs + 1);
636
637 add_n(prio, handles);
638
639#if GAIA_ASSERT_ENABLED
640 for (auto jobHandle: handles)
641 GAIA_ASSERT(m_jobManager.is_clear(jobHandle));
642#endif
643
644 // Work jobs
645 for (uint32_t jobIndex = 0; jobIndex < jobs; ++jobIndex) {
646 const uint32_t groupJobIdxStart = jobIndex * groupSize;
647 const uint32_t groupJobIdxStartPlusGroupSize = groupJobIdxStart + groupSize;
648 const uint32_t groupJobIdxEnd =
649 groupJobIdxStartPlusGroupSize < itemsToProcess ? groupJobIdxStartPlusGroupSize : itemsToProcess;
650
651 auto groupJobFunc = [this, callbackHandle, groupJobIdxStart, groupJobIdxEnd]() {
652 JobArgs args;
653 args.idxStart = groupJobIdxStart;
654 args.idxEnd = groupJobIdxEnd;
655 m_jobManager.invoke_parallel_callback(callbackHandle, args);
656 release_parallel_callback(callbackHandle);
657 };
658
659 auto& jobData = m_jobManager.data(pHandles[jobIndex]);
660 jobData.func = util::SmallFunc::create(GAIA_MOV(groupJobFunc));
661 jobData.prio = prio;
662 }
663 // Sync job
664 {
665 auto& jobData = m_jobManager.data(pHandles[jobs]);
666 jobData.prio = prio;
667 }
668
669 // Assign the sync jobs as a dependency for work jobs
670 dep(handles.subspan(0, jobs), pHandles[jobs]);
671
672 // Sumbit the jobs to the threadpool.
673 // This is a point of no return. After this point no more changes to jobs are possible.
674 submit(handles);
675 return pHandles[jobs];
676 }
677
685 JobHandle sched_par(JobParallelRef job, uint32_t itemsToProcess, uint32_t groupSize) {
686 GAIA_ASSERT(main_thread());
687 GAIA_ASSERT(job.pCtx != nullptr);
688 GAIA_ASSERT(job.invoke != nullptr);
689
690 GAIA_ASSERT(itemsToProcess != 0);
691 if (itemsToProcess == 0)
692 return JobNull;
693
694 if GAIA_UNLIKELY (m_stop)
695 return JobNull;
696
697 const auto prio = job.priority = final_prio(job);
698
699 if (groupSize == 0) {
700 const auto cntWorkers = m_workersCnt[(uint32_t)prio];
701 groupSize = (itemsToProcess + cntWorkers - 1) / cntWorkers;
702
703 constexpr uint32_t maxUnitsOfWorkPerGroup = 8;
704 groupSize = groupSize / maxUnitsOfWorkPerGroup;
705 if (groupSize <= 0)
706 groupSize = 1;
707 }
708
709 const auto jobs = (itemsToProcess + groupSize - 1) / groupSize;
710
711 if (jobs == 1) {
712 const uint32_t groupJobIdxEnd = groupSize < itemsToProcess ? groupSize : itemsToProcess;
713 auto* pCtx = job.pCtx;
714 auto invoke = job.invoke;
715 auto groupJobFunc = [pCtx, invoke, groupJobIdxEnd]() {
716 JobArgs args;
717 args.idxStart = 0;
718 args.idxEnd = groupJobIdxEnd;
719 invoke(pCtx, args);
720 };
721
722 auto handle = add(Job{GAIA_MOV(groupJobFunc), prio, JobCreationFlags::Default});
723 submit(handle);
724 return handle;
725 }
726
727 auto* pHandles = (JobHandle*)alloca(sizeof(JobHandle) * (jobs + 1));
728 std::span<JobHandle> handles(pHandles, jobs + 1);
729
730 add_n(prio, handles);
731
732#if GAIA_ASSERT_ENABLED
733 for (auto jobHandle: handles)
734 GAIA_ASSERT(m_jobManager.is_clear(jobHandle));
735#endif
736
737 for (uint32_t jobIndex = 0; jobIndex < jobs; ++jobIndex) {
738 const uint32_t groupJobIdxStart = jobIndex * groupSize;
739 const uint32_t groupJobIdxStartPlusGroupSize = groupJobIdxStart + groupSize;
740 const uint32_t groupJobIdxEnd =
741 groupJobIdxStartPlusGroupSize < itemsToProcess ? groupJobIdxStartPlusGroupSize : itemsToProcess;
742
743 auto* pCtx = job.pCtx;
744 auto invoke = job.invoke;
745 auto groupJobFunc = [pCtx, invoke, groupJobIdxStart, groupJobIdxEnd]() {
746 JobArgs args;
747 args.idxStart = groupJobIdxStart;
748 args.idxEnd = groupJobIdxEnd;
749 invoke(pCtx, args);
750 };
751
752 auto& jobData = m_jobManager.data(pHandles[jobIndex]);
753 jobData.func = util::SmallFunc::create(GAIA_MOV(groupJobFunc));
754 jobData.prio = prio;
755 }
756 {
757 auto& jobData = m_jobManager.data(pHandles[jobs]);
758 jobData.prio = prio;
759 }
760
761 dep(handles.subspan(0, jobs), pHandles[jobs]);
762 submit(handles);
763 return pHandles[jobs];
764 }
765
//! Blocks the calling (main) thread until the given job reaches the Done state.
//! While waiting, the caller tries to run other queued jobs instead of idling.
//! \param jobHandle Job to wait for. A null handle returns immediately.
//! \warning Must be called from the main thread (asserted). The job must have been
//!          initialized, i.e. its state must not be 0 (asserted).
772 void wait(JobHandle jobHandle) {
773 GAIA_PROF_SCOPE(tp::wait);
774
775 GAIA_ASSERT(main_thread());
776
777 // Skip waiting for unset job handles.
778 if (jobHandle == (JobHandle)JobNull_t{})
779 return;
780
781 auto* ctx = detail::tl_workerCtx;
782 auto& jobData = m_jobManager.data(jobHandle);
783 const bool waitBackground = is_background(jobData);
784 auto state = jobData.state.load();
785
786 // Waiting for a job that has not been initialized is nonsense.
787 GAIA_ASSERT(state != 0);
788
789 // Wait until done
// The state is re-read at the end of every iteration, including those left via `continue`.
790 for (; (state & JobState::STATE_BITS_MASK) < JobState::Done; state = jobData.state.load()) {
791 // The job we are waiting for is not finished yet, try running some other job in the meantime
792 JobHandle otherJobHandle;
// Background jobs are only picked up here when the awaited job is itself a background job
// and the pool has no background workers to run them.
793 const bool canHelpBackground = waitBackground && m_backgroundWorkersCnt == 0;
794 const bool hasBackgroundJob = canHelpBackground && try_fetch_background_job(otherJobHandle);
795 const bool hasJob = hasBackgroundJob || try_fetch_job(*ctx, otherJobHandle);
796 if (hasJob) {
// A successfully run job means progress was made: go re-check the awaited job right away.
797 if (run(otherJobHandle, ctx))
798 continue;
799 }
800
801 // The job we are waiting for is already running.
802 // Wait until it signals it's finished.
803 if ((state & JobState::STATE_BITS_MASK) == JobState::Executing) {
// While Executing, the DEP_BITS_MASK portion of the state is used as the index
// of the worker context whose event is waited on.
804 const auto workerId = (state & JobState::DEP_BITS_MASK);
805 auto* jobDoneEvent = &m_workersCtx[workerId].event;
806 jobDoneEvent->wait();
807 continue;
808 }
809
810 // The worst case scenario.
811 // We have nothing to do and the job we are waiting for is not executing still.
812 // Let's wait for any job to start executing.
// Publish our bit in the blocked mask first, then re-read the job state. The futex wait
// is entered only when the state did not change in between.
813 const auto workerBit = 1U << ctx->workerIdx;
814 const auto oldBlockedMask = m_blockedInWorkUntil.fetch_or(workerBit);
815 const auto newState = jobData.state.load();
816 if (newState == state) // still not JobState::Done?
817 Futex::wait(&m_blockedInWorkUntil, oldBlockedMask | workerBit, detail::WaitMaskAny);
// Always clear our bit again, whether or not we slept.
818 m_blockedInWorkUntil.fetch_and(~workerBit);
819 }
820 }
821
//! Runs the main thread's periodic work by calling main_thread_tick().
//! Must be called from the main thread (asserted).
824 void update() {
825 GAIA_ASSERT(main_thread());
826 main_thread_tick();
827 }
828
831 GAIA_NODISCARD static uint32_t hw_thread_cnt() {
832 auto hwThreads = (uint32_t)std::thread::hardware_concurrency();
833 return core::get_max(1U, hwThreads);
834 }
835
838 GAIA_NODISCARD static uint32_t hw_efficiency_cores_cnt() {
839 uint32_t efficiencyCores = 0;
840#if GAIA_PLATFORM_APPLE
841 size_t size = sizeof(efficiencyCores);
842 if (sysctlbyname("hw.perflevel1.logicalcpu", &efficiencyCores, &size, nullptr, 0) != 0)
843 return 0;
844#elif GAIA_PLATFORM_FREEBSD
845 int cpuIndex = 0;
846 char oidName[32];
847 int coreType;
848 size_t size = sizeof(coreType);
849 while (true) {
850 GAIA_STRFMT(oidName, sizeof(oidName), "dev.cpu.%d.coretype", cpuIndex);
851 if (sysctlbyname(oidName, &coreType, &size, nullptr, 0) != 0)
852 break; // Stop on the last CPU index
853
854 // 0 = performance core
855 // 1 = efficiency core
856 if (coreType == 1)
857 ++efficiencyCores;
858
859 ++cpuIndex;
860 }
861#elif GAIA_PLATFORM_WINDOWS
862 DWORD length = 0;
863
864 // First, determine required buffer size
865 if (!GetLogicalProcessorInformationEx(RelationProcessorCore, nullptr, &length))
866 return 0;
867
868 // Allocate enough memory
869 auto* pBuffer = (SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX*)malloc(length);
870 if (pBuffer == nullptr)
871 return 0;
872
873 // Retrieve the data
874 if (!GetLogicalProcessorInformationEx(RelationProcessorCore, pBuffer, &length)) {
875 free(pBuffer);
876 return 0;
877 }
878
879 uint32_t heterogenousCnt = 0;
880
881 // Iterate over processor core entries.
882 // On Windows we can't directly tell whether a core is an efficiency core or a performance core.
883 // Instead:
884 // - lower EfficiencyClass values correspond to more efficient cores
885 // - higher EfficiencyClass values correspond to higher performance cores
886 // - EfficiencyClass is zero for homogeneous CPU architectures
887 // Therefore, to count efficiency cores on Windows, we will count cores where EfficiencyClass == 0.
888 // On heterogeneous this should gives us the correct results.
889 // On homogenous architectures, the value is always 0 so rather than calculating the number of efficiency
890 // cores, we would calculate the number of performance cores. For the sake of correctness, if all cores return
891 // 0, we use 0 for the number of efficiency cores.
892 for (char* ptr = (char*)pBuffer; ptr < (char*)pBuffer + length;
893 ptr += ((SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX*)ptr)->Size) {
894 auto* entry = (SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX*)ptr;
895 if (entry->Relationship == RelationProcessorCore) {
896 if (entry->Processor.EfficiencyClass == 0)
897 ++efficiencyCores;
898 else
899 ++heterogenousCnt;
900 }
901 }
902
903 if (heterogenousCnt == 0)
904 efficiencyCores = 0;
905
906 free(pBuffer);
907#elif GAIA_PLATFORM_LINUX
908 {
909 // Intel has /sys/devices/cpu_core/cpus, /sys/devices/cpu_atom/cpus on some systems
910 DIR* dir = opendir("/sys/devices/cpu_atom/cpus/");
911 if (dir == nullptr)
912 return 0;
913
914 dirent* entry;
915 while ((entry = readdir(dir)) != nullptr) {
916 if (strncmp(entry->d_name, "cpu", 3) == 0 && entry->d_name[3] >= '0' && entry->d_name[3] <= '9')
917 ++efficiencyCores;
918 }
919
920 closedir(dir);
921 }
922
923 if (efficiencyCores == 0) {
924 // TODO: Go through all CPUs packages and CPUs and determine the differences between them.
925 // There are many metrics.
926 // 1) We will assume all CPUs to be of the same architecture.
927 // 2) Same CPU architecture but different cache sizes. Smaller ones are "efficiency" cores.
928 // This is the AMD way. Still, these are about the same things so maybe we would just treat
929 // all such cores as performance cores.
930 // 3) Different max frequencies on different cores. This might be indicative enough.
931 // There is also an optional parameter present on ARM CPUs:
932 // https://www.kernel.org/doc/Documentation/devicetree/bindings/arm/cpu-capacity.txt
933 // In this case, we'd treat CPUs with the highest capacity-dmips-mhz as performance cores,
934 // and consider the rest as efficiency cores.
935
936 // ...
937 }
938#endif
939 return efficiencyCores;
940 }
941
942 private:
943 static void* thread_func(void* pCtx) {
944 auto& ctx = *(ThreadCtx*)pCtx;
945
946 detail::tl_workerCtx = &ctx;
947
948 // Set the worker thread name.
949 // Needs to be called from inside the thread because some platforms
950 // can change the name only when run from the specific thread.
951 ctx.tp->set_thread_name(ctx.workerIdx, ctx.prio);
952
953 // Set the worker thread priority
954 ctx.tp->set_thread_priority(ctx.workerIdx, ctx.prio);
955
956 // Process jobs
957 ctx.tp->worker_loop(ctx);
958
959 detail::tl_workerCtx = nullptr;
960
961 return nullptr;
962 }
963
968 void create_thread(uint32_t workerIdx, JobPriority prio, bool background) {
969 // Idx 0 is reserved for the main thread
970 GAIA_ASSERT(workerIdx > 0);
971
972 auto& ctx = m_workersCtx[workerIdx];
973 ctx.tp = this;
974 ctx.workerIdx = workerIdx;
975 ctx.prio = prio;
976 ctx.background = background;
977 ctx.threadCreated = false;
978
979#if GAIA_THREAD_PLATFORM == GAIA_THREAD_STD
980 m_workers[workerIdx - 1] = std::thread([&ctx]() {
981 thread_func((void*)&ctx);
982 });
983#else
984 pthread_attr_t attr{};
985 int ret = pthread_attr_init(&attr);
986 if (ret != 0) {
987 GAIA_LOG_W("pthread_attr_init failed for worker thread %u. ErrCode = %d", workerIdx, ret);
988 return;
989 }
990
992 // Apple's recommendation for Apple Silicon for games / high-perf software
993 // ========================================================================
994 // Per frame | Scheduling policy | QoS class / Priority
995 // ========================================================================
996 // Main thread | SCHED_OTHER | QOS_CLASS_USER_INTERACTIVE (47)
997 // Render/Audio thread | SCHED_RR | 45
998 // Workers High Prio | SCHED_RR | 39-41
999 // Workers Low Prio | SCHED_OTHER | QOS_CLASS_USER_INTERACTIVE (38)
1000 // ========================================================================
1001 // Multiple-frames | |
1002 // ========================================================================
1003 // Async Workers High Prio| SCHED_OTHER | QOS_CLASS_USER_INITIATED (37)
1004 // Async Workers Low Prio | SCHED_OTHER | QOS_CLASS_DEFAULT (31)
1005 // Prefetching/Streaming | SCHED_OTHER | QOS_CLASS_UTILITY (20)
1006 // ========================================================================
1007
1008 if (prio == JobPriority::Low) {
1009 #if GAIA_PLATFORM_APPLE
1010 ret = pthread_attr_set_qos_class_np(&attr, QOS_CLASS_USER_INTERACTIVE, -9); // 47-9=38
1011 if (ret != 0) {
1012 GAIA_LOG_W(
1013 "pthread_attr_set_qos_class_np failed for worker thread %u [prio=%u]. ErrCode = %d", workerIdx,
1014 (uint32_t)prio, ret);
1015 }
1016 #else
1017 ret = pthread_attr_setschedpolicy(&attr, SCHED_OTHER);
1018 if (ret != 0) {
1019 GAIA_LOG_W(
1020 "pthread_attr_setschedpolicy SCHED_RR failed for worker thread %u [prio=%u]. ErrCode = %d", workerIdx,
1021 (uint32_t)prio, ret);
1022 }
1023
1024 int prioMax = core::get_min(38, sched_get_priority_max(SCHED_OTHER));
1025 int prioMin = core::get_min(prioMax, sched_get_priority_min(SCHED_OTHER));
1026 int prioUse = core::get_min(prioMin + 5, prioMax);
1027 prioUse = core::get_max(prioUse, prioMin);
1028 sched_param param{};
1029 param.sched_priority = prioUse;
1030
1031 ret = pthread_attr_setschedparam(&attr, &param);
1032 if (ret != 0) {
1033 GAIA_LOG_W(
1034 "pthread_attr_setschedparam %d failed for worker thread %u [prio=%u]. ErrCode = %d",
1035 param.sched_priority, workerIdx, (uint32_t)prio, ret);
1036 }
1037 #endif
1038 } else {
1039 ret = pthread_attr_setschedpolicy(&attr, SCHED_RR);
1040 if (ret != 0) {
1041 GAIA_LOG_W(
1042 "pthread_attr_setschedpolicy SCHED_RR failed for worker thread %u [prio=%u]. ErrCode = %d", workerIdx,
1043 (uint32_t)prio, ret);
1044 }
1045
1046 int prioMax = core::get_min(41, sched_get_priority_max(SCHED_RR));
1047 int prioMin = core::get_min(prioMax, sched_get_priority_min(SCHED_RR));
1048 int prioUse = core::get_max(prioMax - 5, prioMin);
1049 prioUse = core::get_min(prioUse, prioMax);
1050 sched_param param{};
1051 param.sched_priority = prioUse;
1052
1053 ret = pthread_attr_setschedparam(&attr, &param);
1054 if (ret != 0) {
1055 GAIA_LOG_W(
1056 "pthread_attr_setschedparam %d failed for worker thread %u [prio=%u]. ErrCode = %d",
1057 param.sched_priority, workerIdx, (uint32_t)prio, ret);
1058 }
1059 }
1060
1061 // Create the thread with given attributes
1062 ret = pthread_create(&m_workers[workerIdx - 1], &attr, thread_func, (void*)&ctx);
1063 if (ret != 0) {
1064 GAIA_LOG_W("pthread_create failed for worker thread %u. ErrCode = %d", workerIdx, ret);
1065 } else {
1066 ctx.threadCreated = true;
1067 }
1068
1069 pthread_attr_destroy(&attr);
1070#endif
1071
1072 // Stick each thread to a specific CPU core if possible
1073 set_thread_affinity(workerIdx);
1074 }
1075
1078 void join_thread(uint32_t workerIdx) {
1079 if GAIA_UNLIKELY (workerIdx > m_workers.size())
1080 return;
1081
1082#if GAIA_THREAD_PLATFORM == GAIA_THREAD_STD
1083 auto& t = m_workers[workerIdx - 1];
1084 if (t.joinable())
1085 t.join();
1086#else
1087 auto& ctx = m_workersCtx[workerIdx];
1088 if (!ctx.threadCreated)
1089 return;
1090
1091 auto& t = m_workers[workerIdx - 1];
1092 pthread_join(t, nullptr);
1093 ctx.threadCreated = false;
1094#endif
1095 }
1096
1097 void create_worker_threads(uint32_t& workerIdx, JobPriority prio, uint32_t count) {
1098 for (uint32_t i = 0; i < count; ++i)
1099 create_thread(workerIdx++, prio, false);
1100 }
1101
1102 void create_background_worker_threads(uint32_t& workerIdx) {
1103 for (uint32_t i = 0; i < m_backgroundWorkersCnt; ++i)
1104 create_thread(workerIdx++, JobPriority::Low, true);
1105 }
1106
1107 void set_thread_priority([[maybe_unused]] uint32_t workerIdx, [[maybe_unused]] JobPriority priority) {
1108#if GAIA_PLATFORM_WINDOWS
1109 HANDLE nativeHandle = (HANDLE)m_workers[workerIdx - 1].native_handle();
1110
1111 THREAD_POWER_THROTTLING_STATE state{};
1112 state.Version = THREAD_POWER_THROTTLING_CURRENT_VERSION;
1113 if (priority == JobPriority::High) {
1114 // HighQoS
1115 // Turn EXECUTION_SPEED throttling off.
1116 // ControlMask selects the mechanism and StateMask is set to zero as mechanisms should be turned off.
1117 state.ControlMask = THREAD_POWER_THROTTLING_EXECUTION_SPEED;
1118 state.StateMask = 0;
1119 } else {
1120 // EcoQoS
1121 // Turn EXECUTION_SPEED throttling on.
1122 // ControlMask selects the mechanism and StateMask declares which mechanism should be on or off.
1123 state.ControlMask = THREAD_POWER_THROTTLING_EXECUTION_SPEED;
1124 state.StateMask = THREAD_POWER_THROTTLING_EXECUTION_SPEED;
1125 }
1126
1127 BOOL ret = SetThreadInformation(nativeHandle, ThreadPowerThrottling, &state, sizeof(state));
1128 if (ret != TRUE) {
1129 GAIA_LOG_W("SetThreadInformation failed for thread %u", workerIdx);
1130 return;
1131 }
1132#else
1133 // Done when the thread is created
1134#endif
1135 }
1136
//! Intentionally does nothing. Thread placement is left to the OS.
//! \param workerIdx Index of the worker thread (unused).
void set_thread_affinity([[maybe_unused]] uint32_t workerIdx) {
  // NOTE:
  // A core might expose multiple logical threads, there might be more than
  // one socket, and some cores might even be physically different from
  // others (performance vs efficiency cores).
  // Because of that, affinity is not handled here and the OS figures it out.
  // All threads created by the pool set thread priorities to make
  // that easier for the OS.
  //
  // Disabled reference implementation:
  //
  // #if GAIA_PLATFORM_WINDOWS
  //   HANDLE nativeHandle = (HANDLE)m_workers[workerIdx-1].native_handle();
  //
  //   auto mask = SetThreadAffinityMask(nativeHandle, 1ULL << workerIdx);
  //   if (mask <= 0)
  //     GAIA_LOG_W("Issue setting thread affinity for worker thread %u!", workerIdx);
  // #elif GAIA_PLATFORM_APPLE
  //   // Do not do affinity for MacOS. It is not supported for Apple Silicon and
  //   // Intel Macs are deprecated anyway.
  //   // TODO: Consider supporting this at least for Intel Macs as there are still
  //   //       quite a few of them out there.
  // #elif GAIA_PLATFORM_LINUX || GAIA_PLATFORM_FREEBSD
  //   pthread_t nativeHandle = (pthread_t)m_workers[workerIdx-1].native_handle();
  //
  //   cpu_set_t cpuSet;
  //   CPU_ZERO(&cpuSet);
  //   CPU_SET(workerIdx, &cpuSet);
  //
  //   auto ret = pthread_setaffinity_np(nativeHandle, sizeof(cpuSet), &cpuSet);
  //   if (ret != 0)
  //     GAIA_LOG_W("Issue setting thread affinity for worker thread %u!", workerIdx);
  //
  //   ret = pthread_getaffinity_np(nativeHandle, sizeof(cpuSet), &cpuSet);
  //   if (ret != 0)
  //     GAIA_LOG_W("Thread affinity could not be set for worker thread %u!", workerIdx);
  // #endif
}
1173
1177 void set_thread_name(uint32_t workerIdx, JobPriority prio) {
1178 const bool background = m_workersCtx[workerIdx].background;
1179 const char* workerKind = background ? "BG" : prio == JobPriority::High ? "HI" : "LO";
1180#if GAIA_PROF_USE_PROFILER_THREAD_NAME
1181 char threadName[16]{};
1182 GAIA_STRFMT(threadName, 16, "worker_%s_%u", workerKind, workerIdx);
1183 GAIA_PROF_THREAD_NAME(threadName);
1184#elif GAIA_PLATFORM_WINDOWS
1185 auto nativeHandle = (HANDLE)m_workers[workerIdx - 1].native_handle();
1186 const wchar_t* workerKindW = background ? L"BG" : prio == JobPriority::High ? L"HI" : L"LO";
1187
1188 TOSApiFunc_SetThreadDescription pSetThreadDescFunc = nullptr;
1189 if (auto* pModule = GetModuleHandleA("kernel32.dll")) {
1190 auto* pFunc = GetProcAddress(pModule, "SetThreadDescription");
1191 pSetThreadDescFunc = reinterpret_cast<TOSApiFunc_SetThreadDescription>(reinterpret_cast<void*>(pFunc));
1192 }
1193 if (pSetThreadDescFunc != nullptr) {
1194 wchar_t threadName[16]{};
1195 swprintf_s(threadName, L"worker_%s_%u", workerKindW, workerIdx);
1196
1197 auto hr = pSetThreadDescFunc(nativeHandle, threadName);
1198 if (FAILED(hr)) {
1199 GAIA_LOG_W("Issue setting name for worker %s thread %u!", workerKind, workerIdx);
1200 }
1201 } else {
1202 #if defined _MSC_VER
1203 char threadName[16]{};
1204 GAIA_STRFMT(threadName, 16, "worker_%s_%u", workerKind, workerIdx);
1205
1206 THREADNAME_INFO info{};
1207 info.dwType = 0x1000;
1208 info.szName = threadName;
1209 info.dwThreadID = GetThreadId(nativeHandle);
1210
1211 __try {
1212 RaiseException(0x406D1388, 0, sizeof(info) / sizeof(ULONG_PTR), (ULONG_PTR*)&info);
1213 } __except (EXCEPTION_EXECUTE_HANDLER) {
1214 }
1215#endif
1216 }
1217#elif GAIA_PLATFORM_APPLE
1218 char threadName[16]{};
1219 GAIA_STRFMT(threadName, 16, "worker_%s_%u", workerKind, workerIdx);
1220 auto ret = pthread_setname_np(threadName);
1221 if (ret != 0)
1222 GAIA_LOG_W("Issue setting name for worker %s thread %u!", workerKind, workerIdx);
1223#elif GAIA_PLATFORM_LINUX || GAIA_PLATFORM_FREEBSD
1224 auto nativeHandle = m_workers[workerIdx - 1];
1225
1226 char threadName[16]{};
1227 GAIA_STRFMT(threadName, 16, "worker_%s_%u", workerKind, workerIdx);
1228 GAIA_PROF_THREAD_NAME(threadName);
1229 auto ret = pthread_setname_np(nativeHandle, threadName);
1230 if (ret != 0)
1231 GAIA_LOG_W("Issue setting name for worker %s thread %u!", workerKind, workerIdx);
1232#endif
1233 }
1234
1237 GAIA_NODISCARD bool main_thread() const {
1238 return std::this_thread::get_id() == m_mainThreadId;
1239 }
1240
1243 void main_thread_tick() {
1244 auto& ctx = *detail::tl_workerCtx;
1245
1246 // Keep executing while there is work
1247 while (true) {
1248 JobHandle jobHandle;
1249 if (!try_fetch_job(ctx, jobHandle))
1250 break;
1251
1252 (void)run(jobHandle, &ctx);
1253 }
1254 }
1255
1261 GAIA_NODISCARD bool try_steal_job(ThreadCtx& ctx, JobPriority prio, JobHandle& jobHandle) {
1262 const auto workerCnt = m_workersCtx.size();
1263 for (uint32_t i = 0; i < workerCnt;) {
1264 // Keep stealing within the same priority class and skip our own queue
1265 if (i == ctx.workerIdx || m_workersCtx[i].background || m_workersCtx[i].prio != prio) {
1266 ++i;
1267 continue;
1268 }
1269
1270 const auto res = m_workersCtx[i].jobQueue.try_steal(jobHandle);
1271 // Race condition, try again from the same context
1272 if (!res)
1273 continue;
1274
1275 // Stealing can return true if the queue is empty.
1276 // We return right away only if we receive a valid handle which means
1277 // when there was an idle job in the queue.
1278 if (jobHandle != (JobHandle)JobNull_t{})
1279 return true;
1280
1281 ++i;
1282 }
1283
1284 return false;
1285 }
1286
1292 GAIA_NODISCARD bool try_fetch_prio(ThreadCtx& ctx, JobPriority prio, JobHandle& jobHandle) {
1293 if (m_jobQueue[(uint32_t)prio].try_pop(jobHandle))
1294 return true;
1295
1296 return try_steal_job(ctx, prio, jobHandle);
1297 }
1298
1302 GAIA_NODISCARD bool try_fetch_background_job(JobHandle& jobHandle) {
1303 return m_jobQueueBackground.try_pop(jobHandle);
1304 }
1305
1310 GAIA_NODISCARD bool try_fetch_job(ThreadCtx& ctx, JobHandle& jobHandle) {
1311 if (ctx.background)
1312 return try_fetch_background_job(jobHandle);
1313
1314 // Try getting a job from the local queue
1315 if (ctx.jobQueue.try_pop(jobHandle))
1316 return true;
1317
1318 // The main thread may help with both queues while waiting or updating
1319 if (ctx.workerIdx == 0) {
1320 if (try_fetch_prio(ctx, JobPriority::High, jobHandle))
1321 return true;
1322
1323 return try_fetch_prio(ctx, JobPriority::Low, jobHandle);
1324 }
1325
1326 return try_fetch_prio(ctx, ctx.prio, jobHandle);
1327 }
1328
1333 GAIA_NODISCARD bool can_run_inline(const ThreadCtx* ctx, const JobContainer& jobData) const {
1334 const bool background = is_background(jobData);
1335 if (background)
1336 return (ctx != nullptr && ctx->background) || m_backgroundWorkersCnt == 0;
1337
1338 // The main thread is allowed to help with both priority classes.
1339 if (ctx == nullptr || ctx->workerIdx == 0)
1340 return true;
1341
1342 // Matching worker classes may execute their own overflow inline.
1343 if (!ctx->background && ctx->prio == jobData.prio)
1344 return true;
1345
1346 // If there are no spawned workers for the target priority we need to preserve
1347 // the forward-progress guarantee and allow inline fallback.
1348 return m_workerThreadsCnt[(uint32_t)jobData.prio] == 0;
1349 }
1350
1354 void wait_for_queue_space(ThreadCtx& ctx, const JobContainer& jobData) {
1355 const bool background = is_background(jobData);
1356
1357 // Wake one worker from the target class in case all of them are asleep while
1358 // the producer is waiting for queue space to become available.
1359 if (background) {
1360 if (m_backgroundWorkersCnt != 0)
1361 m_semBackground.release(1);
1362 } else {
1363 const auto prioIdx = (uint32_t)jobData.prio;
1364 if (m_workerThreadsCnt[prioIdx] != 0)
1365 m_sem[prioIdx].release(1);
1366 }
1367
1368 // Keep the current worker productive without violating the priority boundary.
1369 JobHandle otherJobHandle;
1370 const bool hasWork =
1371 ctx.background ? try_fetch_background_job(otherJobHandle) : try_fetch_prio(ctx, ctx.prio, otherJobHandle);
1372 if (hasWork) {
1373 (void)run(otherJobHandle, &ctx);
1374 return;
1375 }
1376
1377 std::this_thread::yield();
1378 }
1379
1383 void worker_loop(ThreadCtx& ctx) {
1384 while (true) {
1385 // Wait for work
1386 if (ctx.background)
1387 m_semBackground.wait();
1388 else
1389 m_sem[(uint32_t)ctx.prio].wait();
1390
1391 // Keep executing while there is work
1392 while (true) {
1393 JobHandle jobHandle;
1394 if (!try_fetch_job(ctx, jobHandle))
1395 break;
1396
1397 (void)run(jobHandle, detail::tl_workerCtx);
1398 }
1399
1400 // Check if the worker can keep running
1401 const bool stop = m_stop.load();
1402 if (stop)
1403 break;
1404 }
1405 }
1406
1408 void reset() {
1409 if (m_workers.empty())
1410 return;
1411
1412 // Request stopping
1413 m_stop.store(true);
1414
1415 // Signal all threads
1416 GAIA_FOR(JobPriorityCnt) {
1417 if (m_workerThreadsCnt[i] != 0)
1418 m_sem[i].release((int32_t)m_workerThreadsCnt[i]);
1419 }
1420 if (m_backgroundWorkersCnt != 0)
1421 m_semBackground.release((int32_t)m_backgroundWorkersCnt);
1422
1423 auto* ctx = detail::tl_workerCtx;
1424 if (ctx == nullptr) {
1425 // reset() can be reached during static teardown from a thread that never
1426 // entered the pool and therefore has no TLS worker context bound.
1427 ctx = &m_workersCtx[0];
1428 detail::tl_workerCtx = ctx;
1429 }
1430
1431 // Finish remaining jobs
1432 JobHandle jobHandle;
1433 while (try_fetch_job(*ctx, jobHandle)) {
1434 run(jobHandle, ctx);
1435 }
1436 while (try_fetch_background_job(jobHandle)) {
1437 run(jobHandle, ctx);
1438 }
1439
1440 detail::tl_workerCtx = nullptr;
1441
1442 // Join threads with the main one
1443 GAIA_FOR(m_workers.size()) join_thread(i + 1);
1444
1445 // All threads have been stopped. Allow new threads to run if necessary.
1446 m_stop.store(false);
1447 }
1448
1450 JobPriority final_frame_prio(JobPriority priority) {
1451 const auto cntWorkers = m_workersCnt[(uint32_t)priority];
1452 return cntWorkers > 0
1453 // If there is enough workers, keep the priority
1454 ? priority
1455 // Not enough workers, use the other priority that has workers
1456 : (JobPriority)(((uint32_t)priority + 1U) % (uint32_t)JobPriorityCnt);
1457 }
1458
1460 JobPriority final_prio(const Job& job) {
1461 if ((job.flags & JobCreationFlags::Background) != 0U)
1462 return job.priority;
1463
1464 return final_frame_prio(job.priority);
1465 }
1466
1468 template <typename TJob>
1469 JobPriority final_prio(const TJob& job) {
1470 return final_frame_prio(job.priority);
1471 }
1472
1476 GAIA_NODISCARD static bool is_background(const JobContainer& jobData) {
1477 return (jobData.flags & JobCreationFlags::Background) != 0U;
1478 }
1479
1480 void signal_edges(JobContainer& jobData) {
1481 const auto max = jobData.edges.depCnt;
1482
1483 // Nothing to do if there are no dependencies
1484 if (max == 0)
1485 return;
1486
1487 auto* ctx = detail::tl_workerCtx;
1488
1489 // One dependency
1490 if (max == 1) {
1491 auto depHandle = jobData.edges.dep;
1492#if GAIA_LOG_JOB_STATES
1493 GAIA_LOG_N("SIGNAL %u.%u -> %u.%u", jobData.idx, jobData.gen, depHandle.id(), depHandle.gen());
1494#endif
1495
1496 // See the conditions can't be satisfied for us to submit the job we skip
1497 auto& depData = m_jobManager.data(depHandle);
1498 if (!JobManager::signal_edge(depData))
1499 return;
1500
1501 // Submit all jobs that can are ready
1502 process(std::span(&depHandle, 1), ctx);
1503 return;
1504 }
1505
1506 // Multiple dependencies. The array has to be set
1507 GAIA_ASSERT(jobData.edges.pDeps != nullptr);
1508
1509 auto* pHandles = (JobHandle*)alloca(sizeof(JobHandle) * max);
1510 uint32_t cnt = 0;
1511 GAIA_FOR(max) {
1512 auto depHandle = jobData.edges.pDeps[i];
1513
1514 // See if all conditions were satisfied for us to submit the job
1515 auto& depData = m_jobManager.data(depHandle);
1516 if (!JobManager::signal_edge(depData))
1517 continue;
1518
1519 pHandles[cnt++] = depHandle;
1520 }
1521
1522 // Submit all jobs that can are ready
1523 process(std::span(pHandles, cnt), ctx);
1524 }
1525
1529 void process(std::span<JobHandle> jobHandles, ThreadCtx* ctx) {
1530 auto* pHandles = (JobHandle*)alloca(sizeof(JobHandle) * jobHandles.size());
1531 uint32_t handlesCnt = 0;
1532
1533 for (auto handle: jobHandles) {
1534 auto& jobData = m_jobManager.data(handle);
1535 m_jobManager.processing(jobData);
1536
1537 // Jobs that have no functor assigned don't need to be enqueued.
1538 // We can "run" them right away. The only time where it makes
1539 // sense to create such a job is to create a sync job. E.g. when you
1540 // need to wait for N jobs, rather than waiting for each of them
1541 // separately you make them a dependency of a dummy/sync job and
1542 // wait just for that one.
1543 if (!jobData.func.operator bool())
1544 (void)run(handle, ctx);
1545 else
1546 pHandles[handlesCnt++] = handle;
1547 }
1548
1549 std::span handles(pHandles, handlesCnt);
1550 while (!handles.empty()) {
1551 // Try pushing all jobs while preserving their priority queue ownership.
1552 uint32_t pushed = 0;
1553 uint32_t released[JobPriorityCnt]{};
1554 uint32_t backgroundReleased = 0;
1555 for (; pushed < handles.size(); ++pushed) {
1556 const auto handle = handles[pushed];
1557 const auto& jobData = m_jobManager.data(handle);
1558 if (is_background(jobData)) {
1559 if (!m_jobQueueBackground.try_push(handle))
1560 break;
1561
1562 ++backgroundReleased;
1563 continue;
1564 }
1565
1566 const auto prio = jobData.prio;
1567 // Worker-local queues are reserved for work that matches the worker's own
1568 // priority class. Cross-priority releases must go through the matching
1569 // global queue so the right workers can pick them up.
1570 const bool useLocalQueue = ctx != nullptr && !ctx->background && ctx->workerIdx != 0 && ctx->prio == prio;
1571 const bool res =
1572 useLocalQueue ? ctx->jobQueue.try_push(handle) : m_jobQueue[(uint32_t)prio].try_push(handle);
1573 if (!res)
1574 break;
1575
1576 released[(uint32_t)prio]++;
1577 }
1578
1579 GAIA_FOR(JobPriorityCnt) {
1580 // Only spawned worker threads block on semaphores. The main thread helps by
1581 // draining queues opportunistically from wait() and update().
1582 const auto cnt = core::get_min(released[i], m_workerThreadsCnt[i]);
1583 if (cnt != 0)
1584 m_sem[i].release((int32_t)cnt);
1585 }
1586 const auto backgroundCnt = core::get_min(backgroundReleased, m_backgroundWorkersCnt);
1587 if (backgroundCnt != 0)
1588 m_semBackground.release((int32_t)backgroundCnt);
1589
1590 handles = handles.subspan(pushed);
1591 if (!handles.empty()) {
1592 const auto handle = handles[0];
1593 const auto& jobData = m_jobManager.data(handle);
1594
1595 if (can_run_inline(ctx, jobData)) {
1596 // The queue was full. Execute the job right away only when the
1597 // current execution context is allowed to run this priority class.
1598 run(handle, ctx);
1599 handles = handles.subspan(1);
1600 } else {
1601 GAIA_ASSERT(ctx != nullptr);
1602 wait_for_queue_space(*ctx, jobData);
1603 }
1604 }
1605 }
1606 }
1607
1608 bool run(JobHandle jobHandle, ThreadCtx* ctx) {
1609 if (jobHandle == (JobHandle)JobNull_t{})
1610 return false;
1611
1612 auto& jobData = m_jobManager.data(jobHandle);
1613 const bool manualDelete = (jobData.flags & JobCreationFlags::ManualDelete) != 0U;
1614 const bool canWait = (jobData.flags & JobCreationFlags::CanWait) != 0U;
1615
1616 m_jobManager.executing(jobData, ctx->workerIdx);
1617
1618 if (m_blockedInWorkUntil.load() != 0) {
1619 const auto blockedCnt = m_blockedInWorkUntil.exchange(0);
1620 if (blockedCnt != 0)
1621 Futex::wake(&m_blockedInWorkUntil, detail::WaitMaskAll);
1622 }
1623
1624 GAIA_ASSERT(jobData.idx != (uint32_t)-1 && jobData.data.gen != (uint32_t)-1);
1625
1626 // Run the functor associated with the job
1627 m_jobManager.run(jobData);
1628
1629 // Signal the edges and release memory allocated for them if possible
1630 signal_edges(jobData);
1631 JobManager::free_edges(jobData);
1632
1633 // Signal we finished
1634 ctx->event.set();
1635 if (canWait) {
1636 const auto* pFutexValue = &jobData.state;
1637 Futex::wake(pFutexValue, detail::WaitMaskAll);
1638 }
1639
1640 if (!manualDelete)
1641 del(jobHandle);
1642
1643 return true;
1644 }
1645 };
1646
1647 GAIA_MSVC_WARNING_POP()
1648 } // namespace mt
1649} // namespace gaia
Array with variable size of elements of type.
Definition darray_impl.h:25
Definition span_impl.h:99
Storage and lifecycle manager for internal job and parallel callback records.
Definition jobmanager.h:233
static void free_edges(JobContainer &jobData)
Releases heap storage used for the dependency list of jobData.
Definition jobmanager.h:340
static void reset_state(JobContainer &jobData)
Resets a completed or never-submitted job back to the clear state.
Definition jobmanager.h:465
static GAIA_NODISCARD bool done(const JobContainer &jobData)
Checks whether jobData has finished executing.
Definition jobmanager.h:519
static void processing(JobContainer &jobData)
Marks a job as queued for worker processing.
Definition jobmanager.h:435
GAIA_NODISCARD bool is_clear(JobHandle jobHandle) const
Checks whether the job referenced by jobHandle is in the clear state.
Definition jobmanager.h:478
static void run(JobContainer &jobData)
Execute the functor associated with the job container.
Definition jobmanager.h:314
GAIA_NODISCARD ParallelCallbackHandle alloc_parallel_callback(JobArgsFunc callback, uint32_t refs)
Allocates a shared callback record used by parallel jobs.
Definition jobmanager.h:279
static void executing(JobContainer &jobData, uint32_t workerIdx)
Marks a job as executing on the worker identified by workerIdx.
Definition jobmanager.h:446
void free_parallel_callback(ParallelCallbackHandle handle)
Releases a shared callback record.
Definition jobmanager.h:299
void free_job(JobHandle jobHandle)
Invalidates jobHandle by resetting its index in the job pool. Every time a job is deallocated its gen...
Definition jobmanager.h:290
JobContainer & data(JobHandle jobHandle)
Returns mutable internal storage for jobHandle.
Definition jobmanager.h:246
void invoke_parallel_callback(ParallelCallbackHandle handle, const JobArgs &args)
Invokes the shared callback referenced by handle.
Definition jobmanager.h:527
void dep(JobHandle jobFirst, JobHandle jobSecond)
Makes jobSecond depend on jobFirst. This means jobSecond will not run until jobFirst finishes.
Definition jobmanager.h:355
GAIA_NODISCARD bool release_parallel_callback_ref(ParallelCallbackHandle handle)
Releases one reference to the callback referenced by handle.
Definition jobmanager.h:536
void dep_refresh(std::span< JobHandle > jobsFirst, JobHandle jobSecond)
Makes jobSecond depend on the jobs listed in jobsFirst. This means jobSecond will not run until all j...
Definition jobmanager.h:395
GAIA_NODISCARD JobHandle alloc_job(Job job)
Allocates a new job container identified by a unique JobHandle.
Definition jobmanager.h:258
static uint32_t submit(JobContainer &jobData)
Marks a job as submitted.
Definition jobmanager.h:423
static bool signal_edge(JobContainer &jobData)
Signals that one dependency edge of jobData has completed.
Definition jobmanager.h:324
An optimized version of Semaphore that avoids expensive system calls when the counter is greater than...
Definition semaphore_fast.h:12
bool wait()
Decrements semaphore count by 1. If the count is already 0, it waits indefinitely until semaphore cou...
Definition semaphore_fast.h:39
void release(int32_t count=1)
Increments semaphore count by the specified amount.
Definition semaphore_fast.h:26
Definition spinlock.h:8
Definition threadpool.h:87
void set_workers_high_prio_inter(uint32_t &workerIdx, uint32_t count)
Updates the number of worker threads participating at high priority workloads.
Definition threadpool.h:230
JobHandle add(TJob job)
Creates a threadpool job from job.
Definition threadpool.h:375
GAIA_NODISCARD uint32_t background_workers() const
Returns the number of background worker threads.
Definition threadpool.h:178
JobHandle sched_par(JobParallelRef job, uint32_t itemsToProcess, uint32_t groupSize)
Schedules a non-owning parallel job descriptor on worker threads.
Definition threadpool.h:685
void update()
Uses the main thread to help with frame job processing. Background jobs are intentionally excluded fr...
Definition threadpool.h:824
void reset(std::span< JobHandle > jobHandles)
Waits for jobHandles to finish and resets them to a reusable state.
Definition threadpool.h:498
JobHandle sched_par(JobParallel job, uint32_t itemsToProcess, uint32_t groupSize)
Schedules a job to run on worker threads in parallel.
Definition threadpool.h:577
void dep(JobHandle jobFirst, JobHandle jobSecond)
Makes jobSecond depend on jobFirst. This means jobSecond will not run until jobFirst finishes.
Definition threadpool.h:326
void set_max_workers(uint32_t count, uint32_t countHighPrio)
Set the maximum number of frame execution contexts for this system.
Definition threadpool.h:188
void submit(JobHandle jobHandle)
Pushes jobHandle into the internal queue so worker threads can pick it up and execute it....
Definition threadpool.h:476
void set_background_workers(uint32_t count)
Updates the number of worker threads dedicated to background jobs. Background workers run jobs submit...
Definition threadpool.h:287
void dep_refresh(std::span< JobHandle > jobsFirst, JobHandle jobSecond)
Makes jobSecond depend on the jobs listed in jobsFirst. This means jobSecond will not run until all j...
Definition threadpool.h:363
void dep_refresh(JobHandle jobFirst, JobHandle jobSecond)
Makes jobSecond depend on jobFirst. This means jobSecond will not run until jobFirst finishes.
Definition threadpool.h:350
JobHandle sched(Job job)
Schedules a job to run on a worker thread.
Definition threadpool.h:536
void submit(std::span< JobHandle > jobHandles)
Pushes jobHandles into the internal queue so worker threads can pick them up and execute them....
Definition threadpool.h:445
JobHandle sched(Job job, JobHandle dependsOn)
Schedules a job to run on a worker thread.
Definition threadpool.h:563
static GAIA_NODISCARD uint32_t hw_efficiency_cores_cnt()
Returns the number of efficiency cores of the system.
Definition threadpool.h:838
void make_main_thread()
Make the calling thread the effective main thread from the thread pool perspective.
Definition threadpool.h:168
void wait(JobHandle jobHandle)
Wait until a job associated with the jobHandle finishes executing. Cleans up any job allocations and ...
Definition threadpool.h:772
void set_workers_low_prio(uint32_t count)
Updates the number of worker threads participating at low priority workloads.
Definition threadpool.h:272
void del(JobHandle jobHandle)
Deletes a job handle jobHandle from the threadpool.
Definition threadpool.h:423
void reset(JobHandle jobHandle)
Waits for jobHandle to finish and resets it to a reusable state.
Definition threadpool.h:527
void set_workers_high_prio(uint32_t count)
Updates the number of worker threads participating at high priority workloads.
Definition threadpool.h:260
GAIA_NODISCARD uint32_t workers() const
Returns the number of frame worker threads.
Definition threadpool.h:173
void dep(std::span< JobHandle > jobsFirst, JobHandle jobSecond)
Makes jobSecond depend on the jobs listed in jobsFirst. This means jobSecond will not run until all j...
Definition threadpool.h:337
void set_workers_low_prio_inter(uint32_t &workerIdx, uint32_t count)
Updates the number of worker threads participating at low priority workloads.
Definition threadpool.h:246
JobHandle sched_background(Job job)
Schedules a job to run on background workers. Background jobs are not drained by update() and may spa...
Definition threadpool.h:550
static GAIA_NODISCARD uint32_t hw_thread_cnt()
Returns the number of HW threads available on the system. 1 is minimum.
Definition threadpool.h:831
static SmallFunc create(F &&f)
Creates a wrapper from a callable compatible with void().
Definition small_func.h:188
Checks if endianness was detected correctly at compile-time.
Definition bitset.h:9
RAII helper that calls lock() on construction and unlock() on destruction.
Definition utility.h:186
static Result wait(const std::atomic_uint32_t *pFutexValue, uint32_t expected, uint32_t waitMask)
Suspends the caller on the futex while its value remains expected.
Definition futex.h:69
static uint32_t wake(const std::atomic_uint32_t *pFutexValue, uint32_t wakeCount, uint32_t wakeMask=detail::WaitMaskAny)
Wakes up to wakeCount waiters whose waitMask matches wakeMask.
Definition futex.h:101
Definition jobcommon.h:45
util::SmallFunc func
Function to execute when running the job.
Definition jobmanager.h:87
JobPriority prio
Job priority.
Definition jobmanager.h:81
std::atomic_uint32_t state
Current state of the job Consist of upper and bottom part. Least significant bits = special purpose....
Definition jobmanager.h:79
Definition jobhandle.h:13
Definition jobhandle.h:85
Non-owning callback descriptor for parallel jobs.
Definition jobcommon.h:201
Definition jobcommon.h:194
Definition jobcommon.h:39
Definition jobcommon.h:209