94 static constexpr uint32_t MaxWorkers = JobState::DEP_BITS;
97 std::thread::id m_mainThreadId;
100 std::atomic_bool m_stop{};
106 MpmcQueue<JobHandle, 1024> m_jobQueue[JobPriorityCnt];
108 MpmcQueue<JobHandle, 1024> m_jobQueueBackground;
110 uint32_t m_frameWorkersCnt = 0;
112 uint32_t m_backgroundWorkersCnt = 0;
114 uint32_t m_workersCnt[JobPriorityCnt]{};
119 uint32_t m_workerThreadsCnt[JobPriorityCnt]{};
126 std::atomic_uint32_t m_blockedInWorkUntil;
135 GAIA_PROF_MUTEX(
SpinLock, m_jobAllocMtx);
143 const auto hwThreads = hw_thread_cnt();
144 const auto hwEffThreads = hw_efficiency_cores_cnt();
145 uint32_t hiPrioWorkers = hwThreads;
146 if (hwEffThreads < hwThreads)
147 hiPrioWorkers -= hwEffThreads;
149 set_max_workers(hwThreads, hiPrioWorkers);
169 m_mainThreadId = std::this_thread::get_id();
174 return m_frameWorkersCnt;
179 return m_backgroundWorkersCnt;
189 const auto maxFrameWorkers = MaxWorkers - m_backgroundWorkersCnt;
190 const auto workersCnt = core::get_max(core::get_min(maxFrameWorkers, count), 1U);
191 countHighPrio = core::get_min(countHighPrio, workersCnt);
197 for (
auto& ctx: m_workersCtx)
200 m_frameWorkersCnt = workersCnt - 1;
204 m_workersCtx.resize(workersCnt + m_backgroundWorkersCnt);
206 m_workers.resize(m_frameWorkersCnt + m_backgroundWorkersCnt);
211 detail::tl_workerCtx = m_workersCtx.data();
212 m_workersCtx[0].tp =
this;
213 m_workersCtx[0].workerIdx = 0;
214 m_workersCtx[0].prio = JobPriority::High;
217 for (
auto& worker: m_workers)
221 uint32_t workerIdx = 1;
222 set_workers_high_prio_inter(workerIdx, countHighPrio);
223 create_background_worker_threads(workerIdx);
231 count = gaia::core::get_min(count, m_frameWorkersCnt);
232 m_workerThreadsCnt[0] = count;
233 m_workerThreadsCnt[1] = m_frameWorkersCnt - count;
234 m_workersCnt[0] = count + 1;
235 m_workersCnt[1] = m_workerThreadsCnt[1];
238 create_worker_threads(workerIdx, JobPriority::High, m_workerThreadsCnt[0]);
239 create_worker_threads(workerIdx, JobPriority::Low, m_workerThreadsCnt[1]);
247 const uint32_t realCnt = gaia::core::get_min(count, m_frameWorkersCnt);
248 m_workerThreadsCnt[0] = m_frameWorkersCnt - realCnt;
249 m_workerThreadsCnt[1] = realCnt;
250 m_workersCnt[0] = m_workerThreadsCnt[0] + 1;
251 m_workersCnt[1] = m_workerThreadsCnt[1];
254 create_worker_threads(workerIdx, JobPriority::High, m_workerThreadsCnt[0]);
255 create_worker_threads(workerIdx, JobPriority::Low, m_workerThreadsCnt[1]);
263 detail::tl_workerCtx = m_workersCtx.data();
265 uint32_t workerIdx = 1;
266 set_workers_high_prio_inter(workerIdx, count);
267 create_background_worker_threads(workerIdx);
275 detail::tl_workerCtx = m_workersCtx.data();
277 uint32_t workerIdx = 1;
278 set_workers_low_prio_inter(workerIdx, count);
279 create_background_worker_threads(workerIdx);
288 const auto maxBackgroundWorkers = MaxWorkers - 1;
289 count = core::get_min(maxBackgroundWorkers, count);
291 const auto frameWorkersCntOld = m_frameWorkersCnt;
292 const auto highWorkersCntOld = m_workerThreadsCnt[0];
297 m_backgroundWorkersCnt = count;
299 const auto maxFrameWorkers = MaxWorkers - m_backgroundWorkersCnt - 1;
300 m_frameWorkersCnt = core::get_min(frameWorkersCntOld, maxFrameWorkers);
302 for (
auto& ctx: m_workersCtx)
305 m_workersCtx.resize(m_frameWorkersCnt + 1 + m_backgroundWorkersCnt);
306 m_workers.resize(m_frameWorkersCnt + m_backgroundWorkersCnt);
308 detail::tl_workerCtx = m_workersCtx.data();
309 m_workersCtx[0].tp =
this;
310 m_workersCtx[0].workerIdx = 0;
311 m_workersCtx[0].prio = JobPriority::High;
313 for (
auto& worker: m_workers)
316 uint32_t workerIdx = 1;
317 set_workers_high_prio_inter(workerIdx, highWorkersCntOld);
318 create_background_worker_threads(workerIdx);
327 GAIA_ASSERT(main_thread());
338 GAIA_ASSERT(main_thread());
340 m_jobManager.
dep(jobsFirst, jobSecond);
351 GAIA_ASSERT(main_thread());
364 GAIA_ASSERT(main_thread());
374 template <
typename TJob>
376 GAIA_ASSERT(main_thread());
378 job.priority = final_prio(job);
380 auto& mtx = GAIA_PROF_EXTRACT_MUTEX(m_jobAllocMtx);
382 GAIA_PROF_LOCK_MARK(m_jobAllocMtx);
384 return m_jobManager.
alloc_job(GAIA_MOV(job));
389 GAIA_ASSERT(main_thread());
390 GAIA_ASSERT(!jobHandles.empty());
392 auto& mtx = GAIA_PROF_EXTRACT_MUTEX(m_jobAllocMtx);
394 GAIA_PROF_LOCK_MARK(m_jobAllocMtx);
396 for (
auto& jobHandle: jobHandles)
397 jobHandle = m_jobManager.alloc_job({{}, prio, JobCreationFlags::Default});
// Adds a callback that is shared by the group jobs of a parallel schedule and
// returns a ParallelCallbackHandle for it.
// The part of the body visible here only acquires m_jobAllocMtx through a
// core::lock_scope; the allocation itself is on lines not shown in this excerpt.
// callback: the function the group jobs are meant to invoke.
// refs: presumably the initial reference count - the parallel scheduling code in
//       this file passes the number of group jobs. TODO confirm.
400 GAIA_NODISCARD ParallelCallbackHandle add_parallel_callback(JobArgsFunc callback, uint32_t refs) {
// Same mutex that guards job allocation elsewhere in this class.
401 auto& mtx = GAIA_PROF_EXTRACT_MUTEX(m_jobAllocMtx);
402 core::lock_scope lock(mtx);
403 GAIA_PROF_LOCK_MARK(m_jobAllocMtx);
// Counterpart of add_parallel_callback for the given handle. The group-job
// lambda in this file calls it once per group job after filling its arguments.
// Only the locking is visible here: m_jobAllocMtx is held via core::lock_scope.
// NOTE(review): lines between the signature and the lock are not shown; they may
// contain an early-out - confirm before relying on the lock always being taken.
408 void release_parallel_callback(ParallelCallbackHandle handle) {
412 auto& mtx = GAIA_PROF_EXTRACT_MUTEX(m_jobAllocMtx);
413 core::lock_scope lock(mtx);
414 GAIA_PROF_LOCK_MARK(m_jobAllocMtx);
426#if GAIA_ASSERT_ENABLED
428 const auto& jobData = m_jobManager.
data(jobHandle);
429 GAIA_ASSERT(jobData.state == 0 || m_jobManager.
done(jobData));
433 auto& mtx = GAIA_PROF_EXTRACT_MUTEX(m_jobAllocMtx);
435 GAIA_PROF_LOCK_MARK(m_jobAllocMtx);
446 if (jobHandles.empty())
449 GAIA_PROF_SCOPE(tp::submit);
454 for (
auto handle: jobHandles) {
457 auto& jobData = m_jobManager.
data(handle);
458 const auto state = m_jobManager.
submit(jobData) & JobState::DEP_BITS_MASK;
464 pHandles[cnt++] = handle;
467 auto* ctx = detail::tl_workerCtx;
481 if (jobHandles.empty())
484 GAIA_PROF_SCOPE(tp::reset);
486 for (
auto handle: jobHandles) {
487 auto& jobData = m_jobManager.
data(handle);
492 void reset_state(JobHandle jobHandle) {
499 if (jobHandles.empty())
502 GAIA_ASSERT(main_thread());
503 GAIA_PROF_SCOPE(tp::reset_wait);
506 for (
auto handle: jobHandles) {
512 for (
auto handle: jobHandles) {
516 auto& jobData = m_jobManager.
data(handle);
517 const auto state = jobData.
state.load() & JobState::STATE_BITS_MASK;
519 if (state == JobState::Released)
537 JobHandle jobHandle = add(GAIA_MOV(job));
551 job.flags = (JobCreationFlags)((uint8_t)job.flags | (uint8_t)JobCreationFlags::Background);
552 JobHandle jobHandle = add(GAIA_MOV(job));
564 JobHandle jobHandle = add(GAIA_MOV(job));
565 dep(jobHandle, dependsOn);
578 GAIA_ASSERT(main_thread());
581 GAIA_ASSERT(itemsToProcess != 0);
582 if (itemsToProcess == 0)
586 if GAIA_UNLIKELY (m_stop)
590 const auto prio = job.priority = final_prio(job);
593 if (groupSize == 0) {
594 const auto cntWorkers = m_workersCnt[(uint32_t)prio];
595 groupSize = (itemsToProcess + cntWorkers - 1) / cntWorkers;
603 constexpr uint32_t maxUnitsOfWorkPerGroup = 8;
604 groupSize = groupSize / maxUnitsOfWorkPerGroup;
609 const auto jobs = (itemsToProcess + groupSize - 1) / groupSize;
616 const uint32_t groupJobIdxEnd = groupSize < itemsToProcess ? groupSize : itemsToProcess;
617 auto groupFunc = GAIA_MOV(job.func);
618 auto groupJobFunc = [func = GAIA_MOV(groupFunc), groupJobIdxEnd]()
mutable {
621 args.idxEnd = groupJobIdxEnd;
625 auto handle = add(
Job{GAIA_MOV(groupJobFunc), prio, JobCreationFlags::Default});
632 auto callbackHandle = add_parallel_callback(GAIA_MOV(job.func), jobs);
637 add_n(prio, handles);
639#if GAIA_ASSERT_ENABLED
640 for (
auto jobHandle: handles)
641 GAIA_ASSERT(m_jobManager.
is_clear(jobHandle));
645 for (uint32_t jobIndex = 0; jobIndex < jobs; ++jobIndex) {
646 const uint32_t groupJobIdxStart = jobIndex * groupSize;
647 const uint32_t groupJobIdxStartPlusGroupSize = groupJobIdxStart + groupSize;
648 const uint32_t groupJobIdxEnd =
649 groupJobIdxStartPlusGroupSize < itemsToProcess ? groupJobIdxStartPlusGroupSize : itemsToProcess;
651 auto groupJobFunc = [
this, callbackHandle, groupJobIdxStart, groupJobIdxEnd]() {
653 args.idxStart = groupJobIdxStart;
654 args.idxEnd = groupJobIdxEnd;
656 release_parallel_callback(callbackHandle);
659 auto& jobData = m_jobManager.
data(pHandles[jobIndex]);
665 auto& jobData = m_jobManager.
data(pHandles[jobs]);
670 dep(handles.subspan(0, jobs), pHandles[jobs]);
675 return pHandles[jobs];
686 GAIA_ASSERT(main_thread());
687 GAIA_ASSERT(job.pCtx !=
nullptr);
688 GAIA_ASSERT(job.invoke !=
nullptr);
690 GAIA_ASSERT(itemsToProcess != 0);
691 if (itemsToProcess == 0)
694 if GAIA_UNLIKELY (m_stop)
697 const auto prio = job.priority = final_prio(job);
699 if (groupSize == 0) {
700 const auto cntWorkers = m_workersCnt[(uint32_t)prio];
701 groupSize = (itemsToProcess + cntWorkers - 1) / cntWorkers;
703 constexpr uint32_t maxUnitsOfWorkPerGroup = 8;
704 groupSize = groupSize / maxUnitsOfWorkPerGroup;
709 const auto jobs = (itemsToProcess + groupSize - 1) / groupSize;
712 const uint32_t groupJobIdxEnd = groupSize < itemsToProcess ? groupSize : itemsToProcess;
713 auto* pCtx = job.pCtx;
714 auto invoke = job.invoke;
715 auto groupJobFunc = [pCtx, invoke, groupJobIdxEnd]() {
718 args.idxEnd = groupJobIdxEnd;
722 auto handle = add(
Job{GAIA_MOV(groupJobFunc), prio, JobCreationFlags::Default});
730 add_n(prio, handles);
732#if GAIA_ASSERT_ENABLED
733 for (
auto jobHandle: handles)
734 GAIA_ASSERT(m_jobManager.
is_clear(jobHandle));
737 for (uint32_t jobIndex = 0; jobIndex < jobs; ++jobIndex) {
738 const uint32_t groupJobIdxStart = jobIndex * groupSize;
739 const uint32_t groupJobIdxStartPlusGroupSize = groupJobIdxStart + groupSize;
740 const uint32_t groupJobIdxEnd =
741 groupJobIdxStartPlusGroupSize < itemsToProcess ? groupJobIdxStartPlusGroupSize : itemsToProcess;
743 auto* pCtx = job.pCtx;
744 auto invoke = job.invoke;
745 auto groupJobFunc = [pCtx, invoke, groupJobIdxStart, groupJobIdxEnd]() {
747 args.idxStart = groupJobIdxStart;
748 args.idxEnd = groupJobIdxEnd;
752 auto& jobData = m_jobManager.
data(pHandles[jobIndex]);
757 auto& jobData = m_jobManager.
data(pHandles[jobs]);
761 dep(handles.subspan(0, jobs), pHandles[jobs]);
763 return pHandles[jobs];
773 GAIA_PROF_SCOPE(tp::wait);
775 GAIA_ASSERT(main_thread());
781 auto* ctx = detail::tl_workerCtx;
782 auto& jobData = m_jobManager.
data(jobHandle);
783 const bool waitBackground = is_background(jobData);
784 auto state = jobData.state.load();
787 GAIA_ASSERT(state != 0);
790 for (; (state & JobState::STATE_BITS_MASK) < JobState::Done; state = jobData.state.load()) {
793 const bool canHelpBackground = waitBackground && m_backgroundWorkersCnt == 0;
794 const bool hasBackgroundJob = canHelpBackground && try_fetch_background_job(otherJobHandle);
795 const bool hasJob = hasBackgroundJob || try_fetch_job(*ctx, otherJobHandle);
797 if (run(otherJobHandle, ctx))
803 if ((state & JobState::STATE_BITS_MASK) == JobState::Executing) {
804 const auto workerId = (state & JobState::DEP_BITS_MASK);
805 auto* jobDoneEvent = &m_workersCtx[workerId].event;
806 jobDoneEvent->wait();
813 const auto workerBit = 1U << ctx->workerIdx;
814 const auto oldBlockedMask = m_blockedInWorkUntil.fetch_or(workerBit);
815 const auto newState = jobData.state.load();
816 if (newState == state)
817 Futex::wait(&m_blockedInWorkUntil, oldBlockedMask | workerBit, detail::WaitMaskAny);
818 m_blockedInWorkUntil.fetch_and(~workerBit);
825 GAIA_ASSERT(main_thread());
832 auto hwThreads = (uint32_t)std::thread::hardware_concurrency();
833 return core::get_max(1U, hwThreads);
839 uint32_t efficiencyCores = 0;
840#if GAIA_PLATFORM_APPLE
841 size_t size =
sizeof(efficiencyCores);
842 if (sysctlbyname(
"hw.perflevel1.logicalcpu", &efficiencyCores, &size,
nullptr, 0) != 0)
844#elif GAIA_PLATFORM_FREEBSD
848 size_t size =
sizeof(coreType);
850 GAIA_STRFMT(oidName,
sizeof(oidName),
"dev.cpu.%d.coretype", cpuIndex);
851 if (sysctlbyname(oidName, &coreType, &size,
nullptr, 0) != 0)
861#elif GAIA_PLATFORM_WINDOWS
865 if (!GetLogicalProcessorInformationEx(RelationProcessorCore,
nullptr, &length))
869 auto* pBuffer = (SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX*)malloc(length);
870 if (pBuffer ==
nullptr)
874 if (!GetLogicalProcessorInformationEx(RelationProcessorCore, pBuffer, &length)) {
879 uint32_t heterogenousCnt = 0;
892 for (
char* ptr = (
char*)pBuffer; ptr < (
char*)pBuffer + length;
893 ptr += ((SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX*)ptr)->Size) {
894 auto* entry = (SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX*)ptr;
895 if (entry->Relationship == RelationProcessorCore) {
896 if (entry->Processor.EfficiencyClass == 0)
903 if (heterogenousCnt == 0)
907#elif GAIA_PLATFORM_LINUX
910 DIR* dir = opendir(
"/sys/devices/cpu_atom/cpus/");
915 while ((entry = readdir(dir)) !=
nullptr) {
916 if (strncmp(entry->d_name,
"cpu", 3) == 0 && entry->d_name[3] >=
'0' && entry->d_name[3] <=
'9')
923 if (efficiencyCores == 0) {
939 return efficiencyCores;
// Entry point of a worker thread.
// pCtx: opaque pointer handed over at thread creation; create_thread passes the
//       address of the worker's context. How `ctx` is derived from pCtx is on a
//       line not shown in this excerpt.
// Visible sequence: publish the context in detail::tl_workerCtx, name the thread,
// set its priority, run worker_loop, then clear detail::tl_workerCtx again.
943 static void* thread_func(
void* pCtx) {
// Make the context reachable from code running on this thread.
946 detail::tl_workerCtx = &ctx;
951 ctx.tp->set_thread_name(ctx.workerIdx, ctx.prio);
954 ctx.tp->set_thread_priority(ctx.workerIdx, ctx.prio);
// Runs the worker's job loop; the lines below execute once it returns.
957 ctx.tp->worker_loop(ctx);
959 detail::tl_workerCtx =
nullptr;
// Creates the OS thread that backs worker `workerIdx`.
// workerIdx:  1-based worker index (asserted > 0). The context lives at
//             m_workersCtx[workerIdx], the thread object at m_workers[workerIdx - 1].
// prio:       priority of the worker; on the pthread path it selects the
//             scheduling attributes configured below.
// background: stored in the context's `background` flag.
// Two implementations are visible: std::thread (GAIA_THREAD_PLATFORM ==
// GAIA_THREAD_STD) and pthreads (presumably the #else arm; the directive itself is
// on a line not shown). Failures on the pthread path are logged with GAIA_LOG_W.
968 void create_thread(uint32_t workerIdx, JobPriority prio,
bool background) {
970 GAIA_ASSERT(workerIdx > 0);
972 auto& ctx = m_workersCtx[workerIdx];
974 ctx.workerIdx = workerIdx;
976 ctx.background = background;
977 ctx.threadCreated =
false;
979#if GAIA_THREAD_PLATFORM == GAIA_THREAD_STD
// std::thread path: the new thread runs thread_func with this worker's context.
980 m_workers[workerIdx - 1] = std::thread([&ctx]() {
981 thread_func((
void*)&ctx);
// pthread path: build the attribute object first.
984 pthread_attr_t attr{};
985 int ret = pthread_attr_init(&attr);
987 GAIA_LOG_W(
"pthread_attr_init failed for worker thread %u. ErrCode = %d", workerIdx, ret);
// Low-priority workers: a QoS class on Apple, SCHED_OTHER with a priority near
// the bottom of the allowed range otherwise.
1008 if (prio == JobPriority::Low) {
1009 #if GAIA_PLATFORM_APPLE
1010 ret = pthread_attr_set_qos_class_np(&attr, QOS_CLASS_USER_INTERACTIVE, -9);
1013 "pthread_attr_set_qos_class_np failed for worker thread %u [prio=%u]. ErrCode = %d", workerIdx,
1014 (uint32_t)prio, ret);
// NOTE(review): the policy set here is SCHED_OTHER, but the failure message that
// follows says "SCHED_RR" - the message looks copy-pasted from the other branch.
1017 ret = pthread_attr_setschedpolicy(&attr, SCHED_OTHER);
1020 "pthread_attr_setschedpolicy SCHED_RR failed for worker thread %u [prio=%u]. ErrCode = %d", workerIdx,
1021 (uint32_t)prio, ret);
// prioMax is capped at 38, prioMin never exceeds prioMax; the value used is
// prioMin + 5 clamped into [prioMin, prioMax].
1024 int prioMax = core::get_min(38, sched_get_priority_max(SCHED_OTHER));
1025 int prioMin = core::get_min(prioMax, sched_get_priority_min(SCHED_OTHER));
1026 int prioUse = core::get_min(prioMin + 5, prioMax);
1027 prioUse = core::get_max(prioUse, prioMin);
1028 sched_param param{};
1029 param.sched_priority = prioUse;
// NOTE(review): the second argument below looks like a mis-encoded `&param`
// (address of the sched_param declared above) - confirm against the real file.
1031 ret = pthread_attr_setschedparam(&attr, ¶m);
1034 "pthread_attr_setschedparam %d failed for worker thread %u [prio=%u]. ErrCode = %d",
1035 param.sched_priority, workerIdx, (uint32_t)prio, ret);
// Presumably the branch for non-Low workers (the `else` is on a line not shown):
// SCHED_RR with a priority near the top of the allowed range.
1039 ret = pthread_attr_setschedpolicy(&attr, SCHED_RR);
1042 "pthread_attr_setschedpolicy SCHED_RR failed for worker thread %u [prio=%u]. ErrCode = %d", workerIdx,
1043 (uint32_t)prio, ret);
// prioMax is capped at 41; the value used is prioMax - 5 clamped into
// [prioMin, prioMax].
1046 int prioMax = core::get_min(41, sched_get_priority_max(SCHED_RR));
1047 int prioMin = core::get_min(prioMax, sched_get_priority_min(SCHED_RR));
1048 int prioUse = core::get_max(prioMax - 5, prioMin);
1049 prioUse = core::get_min(prioUse, prioMax);
1050 sched_param param{};
1051 param.sched_priority = prioUse;
// NOTE(review): same mis-encoded `&param` as in the Low branch above.
1053 ret = pthread_attr_setschedparam(&attr, ¶m);
1056 "pthread_attr_setschedparam %d failed for worker thread %u [prio=%u]. ErrCode = %d",
1057 param.sched_priority, workerIdx, (uint32_t)prio, ret);
// Start the thread; the pthread_t is written straight into m_workers.
1062 ret = pthread_create(&m_workers[workerIdx - 1], &attr, thread_func, (
void*)&ctx);
1064 GAIA_LOG_W(
"pthread_create failed for worker thread %u. ErrCode = %d", workerIdx, ret);
// join_thread checks this flag before calling pthread_join.
1066 ctx.threadCreated =
true;
1069 pthread_attr_destroy(&attr);
1073 set_thread_affinity(workerIdx);
// Joins the thread of worker `workerIdx`.
// workerIdx: 1-based worker index; the thread object is m_workers[workerIdx - 1].
// Indices above m_workers.size() are filtered by the guard below (what happens
// then is on a line not shown). On the pthread path the join is skipped when the
// context's threadCreated flag is not set, and the flag is cleared after joining.
// NOTE(review): workerIdx == 0 is not rejected by the visible guard, and
// `workerIdx - 1` would wrap for it; the caller in this file passes i + 1.
1078 void join_thread(uint32_t workerIdx) {
1079 if GAIA_UNLIKELY (workerIdx > m_workers.size())
1082#if GAIA_THREAD_PLATFORM == GAIA_THREAD_STD
1083 auto& t = m_workers[workerIdx - 1];
1087 auto& ctx = m_workersCtx[workerIdx];
1088 if (!ctx.threadCreated)
1091 auto& t = m_workers[workerIdx - 1];
1092 pthread_join(t,
nullptr);
1093 ctx.threadCreated =
false;
// Creates `count` non-background worker threads of priority `prio`.
// workerIdx: in/out - index of the first worker to create; incremented once per
//            created thread so the caller can continue with the next free index.
1097 void create_worker_threads(uint32_t& workerIdx, JobPriority prio, uint32_t count) {
1098 for (uint32_t i = 0; i < count; ++i)
// `false`: these workers are not flagged as background workers.
1099 create_thread(workerIdx++, prio,
false);
// Creates m_backgroundWorkersCnt background worker threads.
// workerIdx: in/out - index of the first worker to create; incremented once per
//            created thread.
1102 void create_background_worker_threads(uint32_t& workerIdx) {
1103 for (uint32_t i = 0; i < m_backgroundWorkersCnt; ++i)
// Background workers are always created with JobPriority::Low and background = true.
1104 create_thread(workerIdx++, JobPriority::Low,
true);
// Applies a platform-specific priority hint to the thread of worker `workerIdx`.
// Only a Windows implementation is visible in this excerpt; both parameters are
// [[maybe_unused]], so other platforms presumably do nothing here - TODO confirm.
// workerIdx: 1-based worker index; the thread is m_workers[workerIdx - 1].
// priority:  High sets ControlMask = EXECUTION_SPEED with StateMask = 0, anything
//            else sets both masks to EXECUTION_SPEED. Per the Windows power
//            throttling documentation this should disable / enable execution-speed
//            throttling respectively - verify against SetThreadInformation docs.
// A failing SetThreadInformation call is reported with GAIA_LOG_W.
1107 void set_thread_priority([[maybe_unused]] uint32_t workerIdx, [[maybe_unused]] JobPriority priority) {
1108#if GAIA_PLATFORM_WINDOWS
1109 HANDLE nativeHandle = (HANDLE)m_workers[workerIdx - 1].native_handle();
1111 THREAD_POWER_THROTTLING_STATE state{};
1112 state.Version = THREAD_POWER_THROTTLING_CURRENT_VERSION;
1113 if (priority == JobPriority::High) {
1117 state.ControlMask = THREAD_POWER_THROTTLING_EXECUTION_SPEED;
1118 state.StateMask = 0;
// Presumably the else-arm (the `else` itself is on a line not shown).
1123 state.ControlMask = THREAD_POWER_THROTTLING_EXECUTION_SPEED;
1124 state.StateMask = THREAD_POWER_THROTTLING_EXECUTION_SPEED;
1127 BOOL ret = SetThreadInformation(nativeHandle, ThreadPowerThrottling, &state,
sizeof(state));
1129 GAIA_LOG_W(
"SetThreadInformation failed for thread %u", workerIdx);
1137 void set_thread_affinity([[maybe_unused]] uint32_t workerIdx) {
// Gives the thread of worker `workerIdx` a name of the form
// "worker_<kind>_<idx>", where <kind> is "BG" for background workers, "HI" for
// JobPriority::High and "LO" otherwise. The name buffer is 16 characters in
// every branch.
// workerIdx: 1-based worker index (m_workers[workerIdx - 1] is read in the
//            Windows and Linux/FreeBSD branches).
// prio:      priority used to pick the "HI"/"LO" tag.
// NOTE(review): thread_func calls this on the freshly started worker thread, and
// several branches read m_workers[workerIdx - 1], which create_thread is writing
// when the thread starts - confirm there is synchronisation on lines not shown.
1177 void set_thread_name(uint32_t workerIdx, JobPriority prio) {
1178 const bool background = m_workersCtx[workerIdx].background;
1179 const char* workerKind = background ?
"BG" : prio == JobPriority::High ?
"HI" :
"LO";
1180#if GAIA_PROF_USE_PROFILER_THREAD_NAME
// Profiler build: only the profiler is told the name.
1181 char threadName[16]{};
1182 GAIA_STRFMT(threadName, 16,
"worker_%s_%u", workerKind, workerIdx);
1183 GAIA_PROF_THREAD_NAME(threadName);
1184#elif GAIA_PLATFORM_WINDOWS
1185 auto nativeHandle = (HANDLE)m_workers[workerIdx - 1].native_handle();
1186 const wchar_t* workerKindW = background ? L
"BG" : prio == JobPriority::High ? L
"HI" : L
"LO";
// SetThreadDescription is looked up at run time from kernel32.dll instead of
// being linked directly; the pointer stays null when the lookup fails.
1188 TOSApiFunc_SetThreadDescription pSetThreadDescFunc =
nullptr;
1189 if (
auto* pModule = GetModuleHandleA(
"kernel32.dll")) {
1190 auto* pFunc = GetProcAddress(pModule,
"SetThreadDescription");
1191 pSetThreadDescFunc =
reinterpret_cast<TOSApiFunc_SetThreadDescription
>(
reinterpret_cast<void*
>(pFunc));
1193 if (pSetThreadDescFunc !=
nullptr) {
1194 wchar_t threadName[16]{};
1195 swprintf_s(threadName, L
"worker_%s_%u", workerKindW, workerIdx);
1197 auto hr = pSetThreadDescFunc(nativeHandle, threadName);
1199 GAIA_LOG_W(
"Issue setting name for worker %s thread %u!", workerKind, workerIdx);
// MSVC-only path: passes a THREADNAME_INFO via RaiseException(0x406D1388), which
// looks like the debugger thread-naming convention; presumably the fallback when
// SetThreadDescription is unavailable - the branching is on lines not shown.
1202 #if defined _MSC_VER
1203 char threadName[16]{};
1204 GAIA_STRFMT(threadName, 16,
"worker_%s_%u", workerKind, workerIdx);
1206 THREADNAME_INFO info{};
1207 info.dwType = 0x1000;
1208 info.szName = threadName;
1209 info.dwThreadID = GetThreadId(nativeHandle);
1212 RaiseException(0x406D1388, 0,
sizeof(info) /
sizeof(ULONG_PTR), (ULONG_PTR*)&info);
1213 } __except (EXCEPTION_EXECUTE_HANDLER) {
1217#elif GAIA_PLATFORM_APPLE
// Apple: pthread_setname_np takes only the name, no thread handle.
1218 char threadName[16]{};
1219 GAIA_STRFMT(threadName, 16,
"worker_%s_%u", workerKind, workerIdx);
1220 auto ret = pthread_setname_np(threadName);
1222 GAIA_LOG_W(
"Issue setting name for worker %s thread %u!", workerKind, workerIdx);
1223#elif GAIA_PLATFORM_LINUX || GAIA_PLATFORM_FREEBSD
// Linux/FreeBSD: the name is set on the pthread handle stored in m_workers and
// also forwarded to the profiler.
1224 auto nativeHandle = m_workers[workerIdx - 1];
1226 char threadName[16]{};
1227 GAIA_STRFMT(threadName, 16,
"worker_%s_%u", workerKind, workerIdx);
1228 GAIA_PROF_THREAD_NAME(threadName);
1229 auto ret = pthread_setname_np(nativeHandle, threadName);
1231 GAIA_LOG_W(
"Issue setting name for worker %s thread %u!", workerKind, workerIdx);
// Returns true when the calling thread's id equals m_mainThreadId, i.e. the id
// that was captured with std::this_thread::get_id() elsewhere in this class.
// Used by the GAIA_ASSERT(main_thread()) checks throughout the class.
1237 GAIA_NODISCARD
bool main_thread()
const {
1238 return std::this_thread::get_id() == m_mainThreadId;
// Lets the calling thread pick up and execute queued work using the context
// stored in detail::tl_workerCtx.
// Visible steps: try_fetch_job for that context, then run() on the fetched
// handle. What happens when nothing could be fetched, and whether this repeats,
// is on lines not shown in this excerpt.
// NOTE(review): detail::tl_workerCtx is dereferenced without a visible null check.
1243 void main_thread_tick() {
1244 auto& ctx = *detail::tl_workerCtx;
1248 JobHandle jobHandle;
1249 if (!try_fetch_job(ctx, jobHandle))
// The result of run() is intentionally discarded.
1252 (void)run(jobHandle, &ctx);
// Tries to take a job from the local queue of another worker.
// ctx:       context of the calling worker; its own queue is skipped.
// prio:      only workers whose context has this priority are considered.
// jobHandle: out - receives the handle written by try_steal.
// Walks every entry of m_workersCtx and skips the caller, background workers
// and workers of a different priority. The loop header has no increment; `i` is
// advanced on lines not shown, as are the return statements.
1261 GAIA_NODISCARD
bool try_steal_job(ThreadCtx& ctx, JobPriority prio, JobHandle& jobHandle) {
1262 const auto workerCnt = m_workersCtx.size();
1263 for (uint32_t i = 0; i < workerCnt;) {
1265 if (i == ctx.workerIdx || m_workersCtx[i].background || m_workersCtx[i].prio != prio) {
1270 const auto res = m_workersCtx[i].jobQueue.try_steal(jobHandle);
// A null handle is distinguished from a real one here; the consequence of the
// check is not visible in this excerpt.
1278 if (jobHandle != (JobHandle)JobNull_t{})
// Fetches a job of priority `prio` for the worker described by `ctx`.
// First pops from the shared queue m_jobQueue[prio]; the final statement falls
// back to try_steal_job and returns its result.
// jobHandle: out - receives the fetched handle.
// What is returned right after a successful pop is on a line not shown
// (presumably true).
1292 GAIA_NODISCARD
bool try_fetch_prio(ThreadCtx& ctx, JobPriority prio, JobHandle& jobHandle) {
1293 if (m_jobQueue[(uint32_t)prio].try_pop(jobHandle))
1296 return try_steal_job(ctx, prio, jobHandle);
// Pops one handle from the background queue m_jobQueueBackground.
// jobHandle: out - receives the popped handle.
// Returns whatever try_pop returns.
1302 GAIA_NODISCARD
bool try_fetch_background_job(JobHandle& jobHandle) {
1303 return m_jobQueueBackground.try_pop(jobHandle);
// Fetches the next job for the worker described by `ctx`.
// jobHandle: out - receives the fetched handle.
// Visible order of sources:
//   1. the background queue only (the guarding condition is on a line not
//      shown; presumably it applies to background workers),
//   2. the worker's own ctx.jobQueue,
//   3. for worker index 0: High priority first, then Low,
//   4. for every other worker: only its own priority ctx.prio.
1310 GAIA_NODISCARD
bool try_fetch_job(ThreadCtx& ctx, JobHandle& jobHandle) {
1312 return try_fetch_background_job(jobHandle);
1315 if (ctx.jobQueue.try_pop(jobHandle))
// Worker index 0 is the context this class assigns JobPriority::High and
// installs as tl_workerCtx on the configuring thread.
1319 if (ctx.workerIdx == 0) {
1320 if (try_fetch_prio(ctx, JobPriority::High, jobHandle))
1323 return try_fetch_prio(ctx, JobPriority::Low, jobHandle);
1326 return try_fetch_prio(ctx, ctx.prio, jobHandle);
// Decides whether the job described by `jobData` may be executed directly on the
// thread described by `ctx` instead of being queued.
// ctx:     calling thread's context; may be null (it is null-checked below).
// jobData: the job in question; its Background flag and prio are inspected.
// Visible rules (some branch results are on lines not shown):
//   - first return: true if ctx is a background worker or no background workers
//     exist (presumably only reached for background jobs),
//   - a null ctx or worker index 0 is handled by its own branch,
//   - a non-background worker with the job's priority is handled by its own branch,
//   - otherwise: true only if no worker threads exist for the job's priority.
1333 GAIA_NODISCARD
bool can_run_inline(
const ThreadCtx* ctx,
const JobContainer& jobData)
const {
1334 const bool background = is_background(jobData);
1336 return (ctx !=
nullptr && ctx->background) || m_backgroundWorkersCnt == 0;
1339 if (ctx ==
nullptr || ctx->workerIdx == 0)
1343 if (!ctx->background && ctx->prio == jobData.prio)
1348 return m_workerThreadsCnt[(uint32_t)jobData.prio] == 0;
// Called when a job could not be pushed to its queue; the caller in this file
// invokes it when the job cannot be run inline either.
// ctx:     context of the calling worker.
// jobData: the job that is waiting for space; its Background flag and prio are read.
// Visible behaviour: checks whether workers exist that serve the job's queue
// (background workers, or worker threads of the job's priority), tries to fetch
// and run some other job of the caller's own kind, and yields the thread. How
// these steps are combined is decided on lines not shown in this excerpt.
1354 void wait_for_queue_space(ThreadCtx& ctx,
const JobContainer& jobData) {
1355 const bool background = is_background(jobData);
1360 if (m_backgroundWorkersCnt != 0)
1363 const auto prioIdx = (uint32_t)jobData.prio;
1364 if (m_workerThreadsCnt[prioIdx] != 0)
1369 JobHandle otherJobHandle;
// Background workers fetch from the background queue, others from their own priority.
1370 const bool hasWork =
1371 ctx.background ? try_fetch_background_job(otherJobHandle) : try_fetch_prio(ctx, ctx.prio, otherJobHandle);
1373 (void)run(otherJobHandle, &ctx);
1377 std::this_thread::yield();
// Body executed by a worker thread; thread_func calls it with the worker's context.
// Visible steps: wait on a semaphore (m_semBackground or m_sem[ctx.prio] - the
// selecting condition is on a line not shown, presumably ctx.background), try to
// fetch a job, run it, and read m_stop. The looping and exit logic around these
// steps is not visible in this excerpt.
1383 void worker_loop(ThreadCtx& ctx) {
1387 m_semBackground.
wait();
1389 m_sem[(uint32_t)ctx.prio].
wait();
1393 JobHandle jobHandle;
1394 if (!try_fetch_job(ctx, jobHandle))
// Passes detail::tl_workerCtx, which thread_func set to this worker's context.
1397 (void)run(jobHandle, detail::tl_workerCtx);
1401 const bool stop = m_stop.load();
1409 if (m_workers.empty())
1416 GAIA_FOR(JobPriorityCnt) {
1417 if (m_workerThreadsCnt[i] != 0)
1418 m_sem[i].
release((int32_t)m_workerThreadsCnt[i]);
1420 if (m_backgroundWorkersCnt != 0)
1421 m_semBackground.
release((int32_t)m_backgroundWorkersCnt);
1423 auto* ctx = detail::tl_workerCtx;
1424 if (ctx ==
nullptr) {
1427 ctx = &m_workersCtx[0];
1428 detail::tl_workerCtx = ctx;
1432 JobHandle jobHandle;
1433 while (try_fetch_job(*ctx, jobHandle)) {
1434 run(jobHandle, ctx);
1436 while (try_fetch_background_job(jobHandle)) {
1437 run(jobHandle, ctx);
1440 detail::tl_workerCtx =
nullptr;
1443 GAIA_FOR(m_workers.size()) join_thread(i + 1);
1446 m_stop.store(false);
// Maps a requested priority to one that has workers available.
// priority: the requested priority.
// Returns: when m_workersCnt[priority] is zero, the next priority modulo
// JobPriorityCnt. The value returned when workers exist is on a line not shown
// (presumably `priority` itself).
1450 JobPriority final_frame_prio(JobPriority priority) {
1451 const auto cntWorkers = m_workersCnt[(uint32_t)priority];
1452 return cntWorkers > 0
1456 : (JobPriority)(((uint32_t)priority + 1U) % (uint32_t)JobPriorityCnt);
// Returns the priority a Job will be scheduled with.
// Jobs carrying JobCreationFlags::Background keep job.priority unchanged; all
// other jobs are remapped through final_frame_prio.
1460 JobPriority final_prio(
const Job& job) {
1461 if ((job.flags & JobCreationFlags::Background) != 0U)
1462 return job.priority;
1464 return final_frame_prio(job.priority);
// Generic overload for any job type exposing a `priority` member.
// Unlike the Job overload it does not look at creation flags; the priority is
// always remapped through final_frame_prio.
1468 template <
typename TJob>
1469 JobPriority final_prio(
const TJob& job) {
1470 return final_frame_prio(job.priority);
// Returns true when the job container has JobCreationFlags::Background set in
// its flags.
1476 GAIA_NODISCARD
static bool is_background(
const JobContainer& jobData) {
1477 return (jobData.flags & JobCreationFlags::Background) != 0U;
// Processes the dependency edges stored in `jobData` (run() calls this after
// executing the job).
// jobData: the job whose edges are signalled; edges.depCnt, edges.dep and
//          edges.pDeps are read.
// Two layouts are visible: a single handle stored directly in edges.dep, and an
// array edges.pDeps (asserted non-null) whose selected entries are collected in
// a stack buffer of depCnt handles. Which layout applies, and what is done with
// each dependent job's data, is decided on lines not shown in this excerpt.
1480 void signal_edges(JobContainer& jobData) {
1481 const auto max = jobData.edges.depCnt;
1487 auto* ctx = detail::tl_workerCtx;
// Single-dependency layout.
1491 auto depHandle = jobData.edges.dep;
1492#if GAIA_LOG_JOB_STATES
1493 GAIA_LOG_N(
"SIGNAL %u.%u -> %u.%u", jobData.idx, jobData.gen, depHandle.id(), depHandle.gen());
1497 auto& depData = m_jobManager.
data(depHandle);
// Array layout.
1507 GAIA_ASSERT(jobData.edges.pDeps !=
nullptr);
// alloca: the buffer lives on this function's stack frame and is sized by depCnt.
1509 auto* pHandles = (JobHandle*)alloca(
sizeof(JobHandle) * max);
1512 auto depHandle = jobData.edges.pDeps[i];
1515 auto& depData = m_jobManager.
data(depHandle);
1519 pHandles[cnt++] = depHandle;
1530 auto* pHandles = (JobHandle*)alloca(
sizeof(JobHandle) * jobHandles.size());
1531 uint32_t handlesCnt = 0;
1533 for (
auto handle: jobHandles) {
1534 auto& jobData = m_jobManager.
data(handle);
1543 if (!jobData.func.operator
bool())
1544 (void)run(handle, ctx);
1546 pHandles[handlesCnt++] = handle;
1549 std::span handles(pHandles, handlesCnt);
1550 while (!handles.empty()) {
1552 uint32_t pushed = 0;
1553 uint32_t released[JobPriorityCnt]{};
1554 uint32_t backgroundReleased = 0;
1555 for (; pushed < handles.size(); ++pushed) {
1556 const auto handle = handles[pushed];
1557 const auto& jobData = m_jobManager.
data(handle);
1558 if (is_background(jobData)) {
1559 if (!m_jobQueueBackground.try_push(handle))
1562 ++backgroundReleased;
1566 const auto prio = jobData.
prio;
1570 const bool useLocalQueue = ctx !=
nullptr && !ctx->background && ctx->workerIdx != 0 && ctx->prio == prio;
1572 useLocalQueue ? ctx->jobQueue.try_push(handle) : m_jobQueue[(uint32_t)prio].try_push(handle);
1576 released[(uint32_t)prio]++;
1579 GAIA_FOR(JobPriorityCnt) {
1582 const auto cnt = core::get_min(released[i], m_workerThreadsCnt[i]);
1584 m_sem[i].
release((int32_t)cnt);
1586 const auto backgroundCnt = core::get_min(backgroundReleased, m_backgroundWorkersCnt);
1587 if (backgroundCnt != 0)
1588 m_semBackground.
release((int32_t)backgroundCnt);
1590 handles = handles.subspan(pushed);
1591 if (!handles.empty()) {
1592 const auto handle = handles[0];
1593 const auto& jobData = m_jobManager.
data(handle);
1595 if (can_run_inline(ctx, jobData)) {
1599 handles = handles.subspan(1);
1601 GAIA_ASSERT(ctx !=
nullptr);
1602 wait_for_queue_space(*ctx, jobData);
1608 bool run(JobHandle jobHandle, ThreadCtx* ctx) {
1609 if (jobHandle == (JobHandle)JobNull_t{})
1612 auto& jobData = m_jobManager.
data(jobHandle);
1613 const bool manualDelete = (jobData.flags & JobCreationFlags::ManualDelete) != 0U;
1614 const bool canWait = (jobData.flags & JobCreationFlags::CanWait) != 0U;
1616 m_jobManager.
executing(jobData, ctx->workerIdx);
1618 if (m_blockedInWorkUntil.load() != 0) {
1619 const auto blockedCnt = m_blockedInWorkUntil.exchange(0);
1620 if (blockedCnt != 0)
1621 Futex::wake(&m_blockedInWorkUntil, detail::WaitMaskAll);
1624 GAIA_ASSERT(jobData.idx != (uint32_t)-1 && jobData.data.gen != (uint32_t)-1);
1627 m_jobManager.
run(jobData);
1630 signal_edges(jobData);
1636 const auto* pFutexValue = &jobData.state;