// Upper bound on the number of workers; tied to JobState::DEP_BITS.
94 static constexpr uint32_t MaxWorkers = JobState::DEP_BITS;
// Id of the thread treated as "main"; main_thread() compares the calling thread against it.
97 std::thread::id m_mainThreadId;
// Stop flag, value-initialized; the worker loop reads it with load().
100 std::atomic_bool m_stop{};
// One shared job queue per priority level (indexed with (uint32_t)priority).
106 MpmcQueue<JobHandle, 1024> m_jobQueue[JobPriorityCnt];
// Number of workers per priority level. Index 0 is used together with JobPriority::High,
// index 1 together with JobPriority::Low when worker threads are created.
108 uint32_t m_workersCnt[JobPriorityCnt]{};
// Bitmask with one bit per worker index (1U << workerIdx). A waiting thread sets its bit with
// fetch_or before a Futex::wait; run() clears the whole mask with exchange(0) before Futex::wake.
113 std::atomic_uint32_t m_blockedInWorkUntil;
// Lock declared through the profiler-aware mutex macro; it is extracted right before the
// m_jobManager.alloc_job calls.
121 GAIA_PROF_MUTEX(
SpinLock, m_jobAllocMtx);
129 const auto hwThreads = hw_thread_cnt();
130 const auto hwEffThreads = hw_efficiency_cores_cnt();
131 uint32_t hiPrioWorkers = hwThreads;
132 if (hwEffThreads < hwThreads)
133 hiPrioWorkers -= hwEffThreads;
135 set_max_workers(hwThreads, hiPrioWorkers);
155 m_mainThreadId = std::this_thread::get_id();
160 return m_workers.size();
169 const auto workersCnt = core::get_max(core::get_min(MaxWorkers, count), 1U);
170 if (countHighPrio > count)
171 countHighPrio = count;
177 for (
auto& ctx: m_workersCtx)
181 m_workersCtx.resize(workersCnt);
183 m_workers.resize(workersCnt - 1);
188 detail::tl_workerCtx = m_workersCtx.data();
189 m_workersCtx[0].tp =
this;
190 m_workersCtx[0].workerIdx = 0;
191 m_workersCtx[0].prio = JobPriority::High;
194 for (
auto& worker: m_workers)
198 uint32_t workerIdx = 1;
199 set_workers_high_prio_inter(workerIdx, countHighPrio);
207 count = gaia::core::get_min(count, m_workers.size());
212 m_workersCnt[0] = count + 1;
213 m_workersCnt[1] = m_workers.size() - count;
217 create_worker_threads(workerIdx, JobPriority::High, m_workersCnt[0] - 1);
218 create_worker_threads(workerIdx, JobPriority::Low, m_workersCnt[1]);
226 const uint32_t realCnt = gaia::core::get_max(count, m_workers.size());
231 m_workersCnt[0] = m_workers.size() - realCnt;
232 m_workersCnt[1] = realCnt + 1;
236 create_worker_threads(workerIdx, JobPriority::High, m_workersCnt[0]);
237 create_worker_threads(workerIdx, JobPriority::Low, m_workersCnt[1]);
246 uint32_t workerIdx = 1;
247 set_workers_high_prio_inter(workerIdx, count);
256 uint32_t workerIdx = 1;
257 set_workers_low_prio_inter(workerIdx, count);
266 GAIA_ASSERT(main_thread());
277 GAIA_ASSERT(main_thread());
279 m_jobManager.
dep(jobsFirst, jobSecond);
290 GAIA_ASSERT(main_thread());
303 GAIA_ASSERT(main_thread());
312 template <
typename TJob>
314 GAIA_ASSERT(main_thread());
316 job.priority = final_prio(job);
318 auto& mtx = GAIA_PROF_EXTRACT_MUTEX(m_jobAllocMtx);
320 GAIA_PROF_LOCK_MARK(m_jobAllocMtx);
322 return m_jobManager.
alloc_job(GAIA_FWD(job));
327 GAIA_ASSERT(main_thread());
328 GAIA_ASSERT(!jobHandles.empty());
330 auto& mtx = GAIA_PROF_EXTRACT_MUTEX(m_jobAllocMtx);
332 GAIA_PROF_LOCK_MARK(m_jobAllocMtx);
334 for (
auto& jobHandle: jobHandles)
335 jobHandle = m_jobManager.alloc_job({{}, prio, JobCreationFlags::Default});
345#if GAIA_ASSERT_ENABLED
347 const auto& jobData = m_jobManager.data(jobHandle);
348 GAIA_ASSERT(jobData.state == 0 || m_jobManager.done(jobData));
352 auto& mtx = GAIA_PROF_EXTRACT_MUTEX(m_jobAllocMtx);
354 GAIA_PROF_LOCK_MARK(m_jobAllocMtx);
365 if (jobHandles.empty())
368 GAIA_PROF_SCOPE(tp::submit);
373 for (
auto handle: jobHandles) {
376 auto& jobData = m_jobManager.data(handle);
377 const auto state = m_jobManager.submit(jobData) & JobState::DEP_BITS_MASK;
383 pHandles[cnt++] = handle;
386 auto* ctx = detail::tl_workerCtx;
400 if (jobHandles.empty())
403 GAIA_PROF_SCOPE(tp::reset);
405 for (
auto handle: jobHandles) {
406 auto& jobData = m_jobManager.data(handle);
407 m_jobManager.reset_state(jobData);
411 void reset_state(JobHandle jobHandle) {
434 dep(jobHandle, dependsOn);
447 GAIA_ASSERT(main_thread());
450 GAIA_ASSERT(itemsToProcess != 0);
451 if (itemsToProcess == 0)
455 if GAIA_UNLIKELY (m_stop)
459 const auto prio = job.priority = final_prio(job);
462 if (groupSize == 0) {
463 const auto cntWorkers = m_workersCnt[(uint32_t)prio];
464 groupSize = (itemsToProcess + cntWorkers - 1) / cntWorkers;
472 constexpr uint32_t maxUnitsOfWorkPerGroup = 8;
473 groupSize = groupSize / maxUnitsOfWorkPerGroup;
478 const auto jobs = (itemsToProcess + groupSize - 1) / groupSize;
485 const uint32_t groupJobIdxEnd = groupSize < itemsToProcess ? groupSize : itemsToProcess;
486 auto groupJobFunc = [job, groupJobIdxEnd]() {
489 args.idxEnd = groupJobIdxEnd;
493 auto handle = add(
Job{groupJobFunc, prio, JobCreationFlags::Default});
503 add_n(prio, handles);
505#if GAIA_ASSERT_ENABLED
506 for (
auto jobHandle: handles)
507 GAIA_ASSERT(m_jobManager.is_clear(jobHandle));
511 for (uint32_t jobIndex = 0; jobIndex < jobs; ++jobIndex) {
512 const uint32_t groupJobIdxStart = jobIndex * groupSize;
513 const uint32_t groupJobIdxStartPlusGroupSize = groupJobIdxStart + groupSize;
514 const uint32_t groupJobIdxEnd =
515 groupJobIdxStartPlusGroupSize < itemsToProcess ? groupJobIdxStartPlusGroupSize : itemsToProcess;
517 auto groupJobFunc = [job, groupJobIdxStart, groupJobIdxEnd]() {
519 args.idxStart = groupJobIdxStart;
520 args.idxEnd = groupJobIdxEnd;
524 auto& jobData = m_jobManager.data(pHandles[jobIndex]);
525 jobData.
func = groupJobFunc;
530 auto& jobData = m_jobManager.data(pHandles[jobs]);
535 dep(handles.subspan(0, jobs), pHandles[jobs]);
540 return pHandles[jobs];
547 GAIA_PROF_SCOPE(tp::wait);
549 GAIA_ASSERT(main_thread());
551 auto* ctx = detail::tl_workerCtx;
552 auto& jobData = m_jobManager.data(jobHandle);
553 auto state = jobData.
state.load();
556 GAIA_ASSERT(state != 0);
559 for (; (state & JobState::STATE_BITS_MASK) < JobState::Done; state = jobData.state.load()) {
562 if (try_fetch_job(*ctx, otherJobHandle)) {
563 if (run(otherJobHandle, ctx))
569 if ((state & JobState::STATE_BITS_MASK) == JobState::Executing) {
570 const auto workerId = (state & JobState::DEP_BITS_MASK);
571 auto* jobDoneEvent = &m_workersCtx[workerId].event;
572 jobDoneEvent->wait();
579 const auto workerBit = 1U << ctx->workerIdx;
580 const auto oldBlockedMask = m_blockedInWorkUntil.fetch_or(workerBit);
581 const auto newState = jobData.state.load();
582 if (newState == state)
583 Futex::wait(&m_blockedInWorkUntil, oldBlockedMask | workerBit, detail::WaitMaskAny);
584 m_blockedInWorkUntil.fetch_and(~workerBit);
590 GAIA_ASSERT(main_thread());
597 auto hwThreads = (uint32_t)std::thread::hardware_concurrency();
598 return core::get_max(1U, hwThreads);
604 uint32_t efficiencyCores = 0;
605#if GAIA_PLATFORM_APPLE
606 size_t size =
sizeof(efficiencyCores);
607 if (sysctlbyname(
"hw.perflevel1.logicalcpu", &efficiencyCores, &size,
nullptr, 0) != 0)
609#elif GAIA_PLATFORM_FREEBSD
613 size_t size =
sizeof(coreType);
615 snprintf(oidName,
sizeof(oidName),
"dev.cpu.%d.coretype", cpuIndex);
616 if (sysctlbyname(oidName, &coreType, &size,
nullptr, 0) != 0)
626#elif GAIA_PLATFORM_WINDOWS
630 if (!GetLogicalProcessorInformationEx(RelationProcessorCore,
nullptr, &length))
634 auto* pBuffer = (SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX*)malloc(length);
635 if (pBuffer ==
nullptr)
639 if (!GetLogicalProcessorInformationEx(RelationProcessorCore, pBuffer, &length)) {
644 uint32_t heterogenousCnt = 0;
657 for (
char* ptr = (
char*)pBuffer; ptr < (
char*)pBuffer + length;
658 ptr += ((SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX*)ptr)->Size) {
659 auto* entry = (SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX*)ptr;
660 if (entry->Relationship == RelationProcessorCore) {
661 if (entry->Processor.EfficiencyClass == 0)
668 if (heterogenousCnt == 0)
672#elif GAIA_PLATFORM_LINUX
675 DIR* dir = opendir(
"/sys/devices/cpu_atom/cpus/");
680 while ((entry = readdir(dir)) !=
nullptr) {
681 if (strncmp(entry->d_name,
"cpu", 3) == 0 && entry->d_name[3] >=
'0' && entry->d_name[3] <=
'9')
688 if (efficiencyCores == 0) {
704 return efficiencyCores;
// Entry point executed by every worker thread (passed to std::thread / pthread_create by
// create_thread).
// pCtx: opaque pointer; create_thread passes the address of the worker's m_workersCtx entry.
// Publishes the context in detail::tl_workerCtx, applies the thread name and priority, runs the
// pool's worker_loop and clears detail::tl_workerCtx once the loop returns.
// NOTE(review): the statement binding `ctx` from pCtx and the return statement are not part of
// this excerpt.
708 static void* thread_func(
void* pCtx) {
// Make the context reachable from pool code running on this thread.
711 detail::tl_workerCtx = &ctx;
716 ctx.tp->set_thread_name(ctx.workerIdx, ctx.prio);
719 ctx.tp->set_thread_priority(ctx.workerIdx, ctx.prio);
// Does not return until the worker loop exits.
722 ctx.tp->worker_loop(ctx);
724 detail::tl_workerCtx =
nullptr;
// Creates the native thread for worker `workerIdx` and starts it in thread_func.
// workerIdx: index into m_workersCtx; must be > 0 because index 0 has no entry in m_workers
//            (threads are stored at m_workers[workerIdx - 1]).
// prio:      selects the scheduling setup used on the pthread path.
// NOTE(review): this excerpt omits several lines (error checks, else branches, closing braces),
// so the notes below describe only the statements that are visible.
732 void create_thread(uint32_t workerIdx, JobPriority prio) {
734 GAIA_ASSERT(workerIdx > 0);
736 auto& ctx = m_workersCtx[workerIdx];
738 ctx.workerIdx = workerIdx;
741#if GAIA_THREAD_PLATFORM == GAIA_THREAD_STD
// std::thread path: the lambda captures `ctx` by reference, so the m_workersCtx element must
// stay alive and in place while the thread runs.
742 m_workers[workerIdx - 1] = std::thread([&ctx]() {
743 thread_func((
void*)&ctx);
// pthread path: scheduling is configured on the attribute object before the thread is created.
746 pthread_attr_t attr{};
747 int ret = pthread_attr_init(&attr);
749 GAIA_LOG_W(
"pthread_attr_init failed for worker thread %u. ErrCode = %d", workerIdx, ret);
770 if (prio == JobPriority::Low) {
771 #if GAIA_PLATFORM_APPLE
// NOTE(review): the low-priority branch requests QOS_CLASS_USER_INTERACTIVE with a relative
// priority of -9 — confirm that this QoS class is intended for low-priority workers.
772 ret = pthread_attr_set_qos_class_np(&attr, QOS_CLASS_USER_INTERACTIVE, -9);
775 "pthread_attr_set_qos_class_np failed for worker thread %u [prio=%u]. ErrCode = %d", workerIdx,
776 (uint32_t)prio, ret);
779 ret = pthread_attr_setschedpolicy(&attr, SCHED_OTHER);
// NOTE(review): the message below says SCHED_RR although the call above sets SCHED_OTHER.
782 "pthread_attr_setschedpolicy SCHED_RR failed for worker thread %u [prio=%u]. ErrCode = %d", workerIdx,
783 (uint32_t)prio, ret);
// Pick a priority 5 above the policy minimum, clamped into [prioMin, prioMax]; prioMax is
// itself capped at 38.
786 int prioMax = core::get_min(38, sched_get_priority_max(SCHED_OTHER));
787 int prioMin = core::get_min(prioMax, sched_get_priority_min(SCHED_OTHER));
788 int prioUse = core::get_min(prioMin + 5, prioMax);
789 prioUse = core::get_max(prioUse, prioMin);
791 param.sched_priority = prioUse;
// NOTE(review): `¶m` here and below looks like a mis-encoded `&param` in this listing.
793 ret = pthread_attr_setschedparam(&attr, ¶m);
796 "pthread_attr_setschedparam %d failed for worker thread %u [prio=%u]. ErrCode = %d",
797 param.sched_priority, workerIdx, (uint32_t)prio, ret);
// Presumably the non-Low branch (the `else` line is not part of this excerpt): SCHED_RR with a
// priority 5 below the policy maximum, clamped into [prioMin, prioMax]; prioMax is capped at 41.
801 ret = pthread_attr_setschedpolicy(&attr, SCHED_RR);
804 "pthread_attr_setschedpolicy SCHED_RR failed for worker thread %u [prio=%u]. ErrCode = %d", workerIdx,
805 (uint32_t)prio, ret);
808 int prioMax = core::get_min(41, sched_get_priority_max(SCHED_RR));
809 int prioMin = core::get_min(prioMax, sched_get_priority_min(SCHED_RR));
810 int prioUse = core::get_max(prioMax - 5, prioMin);
811 prioUse = core::get_min(prioUse, prioMax);
813 param.sched_priority = prioUse;
815 ret = pthread_attr_setschedparam(&attr, ¶m);
818 "pthread_attr_setschedparam %d failed for worker thread %u [prio=%u]. ErrCode = %d",
819 param.sched_priority, workerIdx, (uint32_t)prio, ret);
// Start the thread with the configured attributes; the context address is the thread argument.
824 ret = pthread_create(&m_workers[workerIdx - 1], &attr, thread_func, (
void*)&ctx);
826 GAIA_LOG_W(
"pthread_create failed for worker thread %u. ErrCode = %d", workerIdx, ret);
// The attribute object is released after the create call.
829 pthread_attr_destroy(&attr);
833 set_thread_affinity(workerIdx);
// Joins the native thread of worker `workerIdx` (stored at m_workers[workerIdx - 1]).
// Indices greater than m_workers.size() are caught by the guard below; the guarded statement
// is not part of this excerpt.
// NOTE(review): workerIdx == 0 passes the guard and would index m_workers[workerIdx - 1] with a
// wrapped-around value; the visible caller passes i + 1, so it never hits this — confirm for others.
838 void join_thread(uint32_t workerIdx) {
839 if GAIA_UNLIKELY (workerIdx > m_workers.size())
842#if GAIA_THREAD_PLATFORM == GAIA_THREAD_STD
843 auto& t = m_workers[workerIdx - 1];
847 auto& t = m_workers[workerIdx - 1];
// pthread path: the thread's return value is ignored (nullptr result pointer).
848 pthread_join(t,
nullptr);
// Creates `count` worker threads of priority `prio`.
// workerIdx: in/out; index of the first worker to create, advanced by one per created thread so
//            that consecutive calls continue where the previous one stopped.
852 void create_worker_threads(uint32_t& workerIdx, JobPriority prio, uint32_t count) {
853 for (uint32_t i = 0; i < count; ++i)
854 create_thread(workerIdx++, prio);
// Applies a power-throttling setting to the native thread of worker `workerIdx`.
// Only a Windows implementation is visible in this excerpt; both parameters are marked
// [[maybe_unused]].
// NOTE(review): the `else` line and closing braces are not part of this excerpt.
857 void set_thread_priority([[maybe_unused]] uint32_t workerIdx, [[maybe_unused]] JobPriority priority) {
858#if GAIA_PLATFORM_WINDOWS
859 HANDLE nativeHandle = (HANDLE)m_workers[workerIdx - 1].native_handle();
// Value-initialized, so every field starts at zero.
861 THREAD_POWER_THROTTLING_STATE state{};
862 state.Version = THREAD_POWER_THROTTLING_CURRENT_VERSION;
863 if (priority == JobPriority::High) {
// Only ControlMask is assigned here; no StateMask assignment is visible for this branch, so it
// keeps the zero from `state{}` unless a line outside this excerpt sets it.
867 state.ControlMask = THREAD_POWER_THROTTLING_EXECUTION_SPEED;
// Presumably the non-High branch: both ControlMask and StateMask carry the EXECUTION_SPEED bit.
873 state.ControlMask = THREAD_POWER_THROTTLING_EXECUTION_SPEED;
874 state.StateMask = THREAD_POWER_THROTTLING_EXECUTION_SPEED;
877 BOOL ret = SetThreadInformation(nativeHandle, ThreadPowerThrottling, &state,
sizeof(state));
879 GAIA_LOG_W(
"SetThreadInformation failed for thread %u", workerIdx);
887 void set_thread_affinity([[maybe_unused]] uint32_t workerIdx) {
// Gives the thread of worker `workerIdx` a name of the form "worker_HI_<idx>" or
// "worker_LO_<idx>", depending on `prio`.
// The name is formatted into a 16-element buffer on every platform path.
// Which mechanism is used is selected at compile time: profiler naming, Windows, Apple, or
// Linux/FreeBSD.
// NOTE(review): several lines (log macro openers, __try, closing braces) are missing from this
// excerpt.
927 void set_thread_name(uint32_t workerIdx, JobPriority prio) {
928#if GAIA_PROF_USE_PROFILER_THREAD_NAME
// Profiler path: the name is only handed to the profiler macro.
929 char threadName[16]{};
930 snprintf(threadName, 16,
"worker_%s_%u", prio == JobPriority::High ?
"HI" :
"LO", workerIdx);
931 GAIA_PROF_THREAD_NAME(threadName);
932#elif GAIA_PLATFORM_WINDOWS
933 auto nativeHandle = (HANDLE)m_workers[workerIdx - 1].native_handle();
// SetThreadDescription is looked up at run time in kernel32.dll instead of being linked directly.
935 TOSApiFunc_SetThreadDescription pSetThreadDescFunc =
nullptr;
936 if (
auto* pModule = GetModuleHandleA(
"kernel32.dll")) {
937 auto* pFunc = GetProcAddress(pModule,
"SetThreadDescription");
938 pSetThreadDescFunc =
reinterpret_cast<TOSApiFunc_SetThreadDescription
>(
reinterpret_cast<void*
>(pFunc));
// Used when the lookup succeeded: wide-character name passed to SetThreadDescription.
940 if (pSetThreadDescFunc !=
nullptr) {
941 wchar_t threadName[16]{};
942 swprintf_s(threadName, L
"worker_%s_%u", prio == JobPriority::High ? L
"HI" : L
"LO", workerIdx);
944 auto hr = pSetThreadDescFunc(nativeHandle, threadName);
947 "Issue setting name for worker %s thread %u!", prio == JobPriority::High ?
"HI" :
"LO", workerIdx);
// Presumably the fallback when SetThreadDescription is unavailable: a THREADNAME_INFO record is
// raised as exception 0x406D1388 and swallowed by the __except handler.
951 char threadName[16]{};
952 snprintf(threadName, 16,
"worker_%s_%u", prio == JobPriority::High ?
"HI" :
"LO", workerIdx);
954 THREADNAME_INFO info{};
955 info.dwType = 0x1000;
956 info.szName = threadName;
957 info.dwThreadID = GetThreadId(nativeHandle);
960 RaiseException(0x406D1388, 0,
sizeof(info) /
sizeof(ULONG_PTR), (ULONG_PTR*)&info);
961 } __except (EXCEPTION_EXECUTE_HANDLER) {
965#elif GAIA_PLATFORM_APPLE
// Apple path: pthread_setname_np takes only the name, i.e. it names the calling thread.
966 char threadName[16]{};
967 snprintf(threadName, 16,
"worker_%s_%u", prio == JobPriority::High ?
"HI" :
"LO", workerIdx);
968 auto ret = pthread_setname_np(threadName);
970 GAIA_LOG_W(
"Issue setting name for worker %s thread %u!", prio == JobPriority::High ?
"HI" :
"LO", workerIdx);
971#elif GAIA_PLATFORM_LINUX || GAIA_PLATFORM_FREEBSD
// Linux/FreeBSD path: the name goes to the profiler macro and to the thread's native handle.
972 auto nativeHandle = m_workers[workerIdx - 1];
974 char threadName[16]{};
975 snprintf(threadName, 16,
"worker_%s_%u", prio == JobPriority::High ?
"HI" :
"LO", workerIdx);
976 GAIA_PROF_THREAD_NAME(threadName);
977 auto ret = pthread_setname_np(nativeHandle, threadName);
979 GAIA_LOG_W(
"Issue setting name for worker %s thread %u!", prio == JobPriority::High ?
"HI" :
"LO", workerIdx);
// Returns true when the calling thread is the one whose id is stored in m_mainThreadId.
// Used by the GAIA_ASSERT(main_thread()) checks in this class.
985 GAIA_NODISCARD
bool main_thread()
const {
986 return std::this_thread::get_id() == m_mainThreadId;
// Fetches a job through the calling thread's worker context and runs it.
// Dereferences detail::tl_workerCtx without a null check, so the calling thread must have a
// context installed.
// NOTE(review): the declaration of `jobHandle`, any surrounding loop and the statements guarded
// by the two conditions are not part of this excerpt.
993 void main_thread_tick() {
994 auto& ctx = *detail::tl_workerCtx;
999 if (!try_fetch_job(ctx, jobHandle))
1002 if (run(jobHandle, &ctx))
// Tries to obtain a job for the worker described by `ctx`, writing it to `jobHandle`.
// Sources are consulted in this order:
//   1. the worker's own queue (ctx.jobQueue),
//   2. the shared queue of the worker's priority (m_jobQueue[ctx.prio]),
//   3. the queues of all workers, via try_steal.
// NOTE(review): the return statements and the loop's index increment are not part of this
// excerpt.
1007 bool try_fetch_job(ThreadCtx& ctx, JobHandle& jobHandle) {
1009 if (ctx.jobQueue.try_pop(jobHandle))
1013 if (m_jobQueue[(uint32_t)ctx.prio].try_pop(jobHandle))
1017 const auto workerCnt = m_workersCtx.size();
// The loop header has no increment expression; `i` must be advanced inside the body.
1018 for (uint32_t i = 0; i < workerCnt;) {
// The worker's own slot gets special handling (body not visible) — its queue was tried above.
1020 if (i == ctx.workerIdx) {
1026 const auto res = m_workersCtx[i].jobQueue.try_steal(jobHandle);
// A steal counts only when it reported success AND produced a non-null handle.
1035 if (res && jobHandle != (JobHandle)JobNull_t{})
// Body executed by a worker thread: fetches jobs for `ctx`, runs them and reads the stop flag.
// NOTE(review): the loop construct, the handling of a failed fetch and the use of `stop` are not
// part of this excerpt.
1047 void worker_loop(ThreadCtx& ctx) {
1054 JobHandle jobHandle;
1055 if (!try_fetch_job(ctx, jobHandle))
// The job runs with detail::tl_workerCtx, which thread_func sets to this worker's context before
// calling worker_loop. The result of run() is deliberately discarded.
1058 (void)run(jobHandle, detail::tl_workerCtx);
1062 const bool stop = m_stop.load();
1070 if (m_workers.empty())
1077 m_sem.
release((int32_t)m_workers.size());
1079 auto* ctx = detail::tl_workerCtx;
1082 JobHandle jobHandle;
1083 while (try_fetch_job(*ctx, jobHandle)) {
1084 run(jobHandle, ctx);
1087 detail::tl_workerCtx =
nullptr;
1090 GAIA_FOR(m_workers.size()) join_thread(i + 1);
1093 m_stop.store(false);
// Decides which priority a job is really scheduled with.
// Looks up how many workers exist for job.priority. When there are none, the result is the
// next priority in cyclic order ((priority + 1) % JobPriorityCnt).
// NOTE(review): the operand returned when cntWorkers > 0 is not part of this excerpt.
1097 template <typename TJob>
1098 JobPriority final_prio(const TJob& job) {
1099 const auto cntWorkers = m_workersCnt[(uint32_t)job.priority];
1100 return cntWorkers > 0
1104 : (JobPriority)(((uint32_t)job.priority + 1U) % (uint32_t)JobPriorityCnt);
// Signals the jobs recorded in jobData.edges by calling JobManager::signal_edge on each one.
// Two storage forms are handled: a single handle (edges.dep) and an array (edges.pDeps);
// presumably the choice depends on edges.depCnt (the branching lines are not in this excerpt).
// NOTE(review): loop headers, the statements guarded by the two `if`s and what happens to the
// collected handles are not part of this excerpt.
1107 void signal_edges(JobContainer& jobData) {
1108 const auto max = jobData.edges.depCnt;
1114 auto* ctx = detail::tl_workerCtx;
// Single-handle form.
1118 auto depHandle = jobData.edges.dep;
1119#if GAIA_LOG_JOB_STATES
// NOTE(review): run() reads the generation as jobData.data.gen while this logging branch uses
// jobData.gen — confirm this compiles when GAIA_LOG_JOB_STATES is enabled.
1120 GAIA_LOG_N(
"SIGNAL %u.%u -> %u.%u", jobData.idx, jobData.gen, depHandle.id(), depHandle.gen());
1124 auto& depData = m_jobManager.data(depHandle);
1125 if (!JobManager::signal_edge(depData))
// Array form: pDeps must be set.
1134 GAIA_ASSERT(jobData.edges.pDeps !=
nullptr);
// Scratch array for up to `max` handles, taken from the stack via alloca.
1136 auto* pHandles = (JobHandle*)alloca(
sizeof(JobHandle) * max);
1139 auto depHandle = jobData.edges.pDeps[i];
1142 auto& depData = m_jobManager.data(depHandle);
1143 if (!JobManager::signal_edge(depData))
// Handles that reach this line are appended to the scratch array.
1146 pHandles[cnt++] = depHandle;
1154 auto* pHandles = (JobHandle*)alloca(
sizeof(JobHandle) * jobHandles.size());
1155 uint32_t handlesCnt = 0;
1157 for (
auto handle: jobHandles) {
1158 auto& jobData = m_jobManager.data(handle);
1159 m_jobManager.processing(jobData);
1167 if (!jobData.func.operator
bool())
1168 (void)run(handle, ctx);
1170 pHandles[handlesCnt++] = handle;
1173 std::span handles(pHandles, handlesCnt);
1174 while (!handles.empty()) {
1176 uint32_t pushed = 0;
1177 if (ctx !=
nullptr) {
1178 pushed = ctx->jobQueue.try_push(handles);
1180 for (
auto handle: handles) {
1181 if (m_jobQueue[(uint32_t)handle.prio()].try_push(handle))
1190 const auto cntWorkers = m_workersCnt[(uint32_t)ctx->prio];
1191 const auto cnt = (int32_t)core::get_min(pushed, cntWorkers);
1194 handles = handles.subspan(pushed);
1195 if (!handles.empty()) {
1197 run(handles[0], ctx);
1198 handles = handles.subspan(1);
1203 bool run(JobHandle jobHandle, ThreadCtx* ctx) {
1204 if (jobHandle == (JobHandle)JobNull_t{})
1207 auto& jobData = m_jobManager.data(jobHandle);
1208 const bool manualDelete = (jobData.flags & JobCreationFlags::ManualDelete) != 0U;
1209 const bool canWait = (jobData.flags & JobCreationFlags::CanWait) != 0U;
1211 m_jobManager.executing(jobData, ctx->workerIdx);
1213 if (m_blockedInWorkUntil.load() != 0) {
1214 const auto blockedCnt = m_blockedInWorkUntil.exchange(0);
1215 if (blockedCnt != 0)
1216 Futex::wake(&m_blockedInWorkUntil, detail::WaitMaskAll);
1219 GAIA_ASSERT(jobData.idx != (uint32_t)-1 && jobData.data.gen != (uint32_t)-1);
1222 m_jobManager.
run(jobData);
1225 signal_edges(jobData);
1226 JobManager::free_edges(jobData);
1231 const auto* pFutexValue = &jobData.state;