// Thread-pool data members (fragment; the enclosing type's header is outside this view).
// Upper bound on the number of workers, taken from JobState::DEP_BITS.
94 static constexpr uint32_t MaxWorkers = JobState::DEP_BITS;
// Id of the thread treated as "main"; main_thread() compares the calling thread against it.
97 std::thread::id m_mainThreadId;
// Stop flag; read with m_stop.load() and reset with m_stop.store(false) further down.
100 std::atomic_bool m_stop{};
// One shared queue per job priority (the 1024 template argument is presumably the capacity).
106 MpmcQueue<JobHandle, 1024> m_jobQueue[JobPriorityCnt];
// Number of workers per priority, indexed with (uint32_t)prio.
108 uint32_t m_workersCnt[JobPriorityCnt]{};
// Bitmask of workers (bit = 1U << workerIdx) that parked themselves on the futex while waiting;
// run() swaps it back to 0 and wakes the waiters.
113 std::atomic_uint32_t m_blockedInWorkUntil;
// Spin lock referenced wherever jobs are allocated (presumably guards m_jobManager.alloc_job).
121 GAIA_PROF_MUTEX(
SpinLock, m_jobAllocMtx);
// Default worker setup (fragment; the enclosing function header is outside this view).
// Start with every hardware thread as high priority and subtract the efficiency cores,
// but only when there are fewer of them than hardware threads, so the unsigned
// subtraction cannot wrap and at least one high-priority worker remains.
129 const auto hwThreads = hw_thread_cnt();
130 const auto hwEffThreads = hw_efficiency_cores_cnt();
131 uint32_t hiPrioWorkers = hwThreads;
132 if (hwEffThreads < hwThreads)
133 hiPrioWorkers -= hwEffThreads;
135 set_max_workers(hwThreads, hiPrioWorkers);
// Records the calling thread as the main thread (the value main_thread() compares against).
155 m_mainThreadId = std::this_thread::get_id();
// Returns the size of m_workers. That container is resized to workersCnt - 1 elsewhere,
// so the calling thread's slot is not included in this number.
160 return m_workers.size();
// Worker (re)configuration fragment (enclosing function header is outside this view).
// Clamp the requested worker count to the range [1, MaxWorkers].
169 const auto workersCnt = core::get_max(core::get_min(MaxWorkers, count), 1U);
// The high-priority request may not exceed the requested total.
// NOTE(review): the cap uses the raw `count`, not the clamped `workersCnt` - confirm this is intended.
170 if (countHighPrio > count)
171 countHighPrio = count;
// Loop over the existing worker contexts; its body is not visible here.
177 for (
auto& ctx: m_workersCtx)
// One context per worker including the calling thread, but one thread object fewer:
// context slot 0 is bound to the calling thread right below.
181 m_workersCtx.resize(workersCnt);
183 m_workers.resize(workersCnt - 1);
// Bind the calling thread to context 0 and mark it as a high-priority worker.
188 detail::tl_workerCtx = m_workersCtx.data();
189 m_workersCtx[0].tp =
this;
190 m_workersCtx[0].workerIdx = 0;
191 m_workersCtx[0].prio = JobPriority::High;
// Loop over the thread objects; its body is not visible here.
194 for (
auto& worker: m_workers)
// Spawned workers are numbered from 1 because index 0 is the calling thread.
198 uint32_t workerIdx = 1;
199 set_workers_high_prio_inter(workerIdx, countHighPrio);
// High-priority split fragment (enclosing function header is outside this view).
// Cap the high-priority request by the number of spawned threads.
207 count = gaia::core::get_min(count, m_workers.size());
// Slot 0 (high) also counts the calling thread, hence the +1; the remainder is low priority.
212 m_workersCnt[0] = count + 1;
213 m_workersCnt[1] = m_workers.size() - count;
// The calling thread is not spawned, hence the -1. workerIdx is passed by reference and
// advanced by create_worker_threads, so the low-priority threads continue the numbering.
217 create_worker_threads(workerIdx, JobPriority::High, m_workersCnt[0] - 1);
218 create_worker_threads(workerIdx, JobPriority::Low, m_workersCnt[1]);
// Low-priority split fragment (enclosing function header is outside this view).
// NOTE(review): get_max makes realCnt >= m_workers.size(), so `m_workers.size() - realCnt`
// is 0 at best and wraps around when count > m_workers.size(). The high-priority counterpart
// uses get_min for the same purpose - this looks like it should be get_min too; confirm.
226 const uint32_t realCnt = gaia::core::get_max(count, m_workers.size());
231 m_workersCnt[0] = m_workers.size() - realCnt;
// NOTE(review): the +1 is then also used as a spawn count below, so the two calls together
// request m_workers.size() + 1 threads - verify against the hidden lines of this function.
232 m_workersCnt[1] = realCnt + 1;
// workerIdx is passed by reference and advanced by create_worker_threads.
236 create_worker_threads(workerIdx, JobPriority::High, m_workersCnt[0]);
237 create_worker_threads(workerIdx, JobPriority::Low, m_workersCnt[1]);
// Two priority-setter fragments (enclosing function headers are outside this view).
// Both re-point the thread-local context at slot 0 of m_workersCtx and restart the
// worker numbering at 1 before delegating to the corresponding *_inter helper.
245 detail::tl_workerCtx = m_workersCtx.data();
247 uint32_t workerIdx = 1;
248 set_workers_high_prio_inter(workerIdx, count);
// Same sequence for the low-priority variant.
256 detail::tl_workerCtx = m_workersCtx.data();
258 uint32_t workerIdx = 1;
259 set_workers_low_prio_inter(workerIdx, count);
// Fragments of several entry points (their headers are outside this view).
// Each one asserts that it is being called from the main thread.
268 GAIA_ASSERT(main_thread());
279 GAIA_ASSERT(main_thread());
// Forwards the dependency registration to the job manager.
281 m_jobManager.
dep(jobsFirst, jobSecond);
292 GAIA_ASSERT(main_thread());
305 GAIA_ASSERT(main_thread());
// Job creation fragment, templated on the job type (the function signature is not visible).
314 template <
typename TJob>
// Main-thread only.
316 GAIA_ASSERT(main_thread());
// Resolve the priority the job will actually use (see final_prio) and store it back on the job.
318 job.priority = final_prio(job);
// m_jobAllocMtx is extracted and marked for the profiler; the lock statement itself is not visible here.
320 auto& mtx = GAIA_PROF_EXTRACT_MUTEX(m_jobAllocMtx);
322 GAIA_PROF_LOCK_MARK(m_jobAllocMtx);
// Returns whatever the job manager's alloc_job returns for the forwarded job.
324 return m_jobManager.
alloc_job(GAIA_FWD(job));
// Bulk job allocation fragment (enclosing function header is outside this view).
// Main-thread only; the caller must pass at least one handle slot.
329 GAIA_ASSERT(main_thread());
330 GAIA_ASSERT(!jobHandles.empty());
// m_jobAllocMtx is extracted and marked for the profiler; the lock statement itself is not visible here.
332 auto& mtx = GAIA_PROF_EXTRACT_MUTEX(m_jobAllocMtx);
334 GAIA_PROF_LOCK_MARK(m_jobAllocMtx);
// Fill every slot with a freshly allocated job: empty function, the given priority, default flags.
336 for (
auto& jobHandle: jobHandles)
337 jobHandle = m_jobManager.alloc_job({{}, prio, JobCreationFlags::Default});
// Fragment operating on a single job handle (enclosing function header is outside this view).
// With assertions enabled, verify the job's state is either 0 or reported as done by the job manager.
347#if GAIA_ASSERT_ENABLED
349 const auto& jobData = m_jobManager.data(jobHandle);
350 GAIA_ASSERT(jobData.state == 0 || m_jobManager.done(jobData));
// m_jobAllocMtx is extracted and marked for the profiler; the guarded operation is not visible here.
354 auto& mtx = GAIA_PROF_EXTRACT_MUTEX(m_jobAllocMtx);
356 GAIA_PROF_LOCK_MARK(m_jobAllocMtx);
// Job submission fragment (enclosing function header is outside this view).
// Empty input is checked first; the branch body is not visible here.
367 if (jobHandles.empty())
370 GAIA_PROF_SCOPE(tp::submit);
// Mark each job as submitted in the job manager and keep only the dependency bits of the
// returned state.
375 for (
auto handle: jobHandles) {
378 auto& jobData = m_jobManager.data(handle);
379 const auto state = m_jobManager.submit(jobData) & JobState::DEP_BITS_MASK;
// Handles are collected into pHandles (the condition selecting them is not visible;
// presumably jobs without pending dependencies).
385 pHandles[cnt++] = handle;
// Context of the calling thread.
388 auto* ctx = detail::tl_workerCtx;
// State reset for a range of handles (fragment; the function header is outside this view).
// Empty input is checked first; the branch body is not visible here.
402 if (jobHandles.empty())
405 GAIA_PROF_SCOPE(tp::reset);
// Reset the state of every job through the job manager.
407 for (
auto handle: jobHandles) {
408 auto& jobData = m_jobManager.data(handle);
409 m_jobManager.reset_state(jobData);
// Single-handle overload; its body is not visible here.
413 void reset_state(JobHandle jobHandle) {
// Fragment profiled as tp::reset_wait (enclosing function header is outside this view).
// Empty input is checked first; the branch body is not visible here.
420 if (jobHandles.empty())
423 GAIA_ASSERT(main_thread());
424 GAIA_PROF_SCOPE(tp::reset_wait);
// First pass over the handles; body not visible (presumably waits for each job).
427 for (
auto handle: jobHandles) {
// Second pass: inspect the state bits of every job.
433 for (
auto handle: jobHandles) {
437 auto& jobData = m_jobManager.data(handle);
438 const auto state = jobData.
state.load() & JobState::STATE_BITS_MASK;
// Released jobs get special treatment (branch body not visible); the state is reset below.
440 if (state == JobState::Released)
443 m_jobManager.reset_state(jobData);
// Fragment of another function: registers a dependency between the two handles.
471 dep(jobHandle, dependsOn);
// Parallel scheduling fragment: splits `itemsToProcess` items into group jobs
// (enclosing function header is outside this view).
484 GAIA_ASSERT(main_thread());
// Zero items is a caller error in asserting builds and is also checked at run time
// (the branch body is not visible).
487 GAIA_ASSERT(itemsToProcess != 0);
488 if (itemsToProcess == 0)
// Check of the stop flag; the branch body is not visible.
492 if GAIA_UNLIKELY (m_stop)
// Resolve the effective priority and store it back on the job.
496 const auto prio = job.priority = final_prio(job);
// groupSize == 0 means "choose automatically": items divided by the worker count of that
// priority, rounded up.
// NOTE(review): relies on cntWorkers being non-zero - confirm final_prio guarantees that.
499 if (groupSize == 0) {
500 const auto cntWorkers = m_workersCnt[(uint32_t)prio];
501 groupSize = (itemsToProcess + cntWorkers - 1) / cntWorkers;
// Shrink the automatic group size by a factor of 8.
// NOTE(review): integer division can yield 0 for small inputs and groupSize is a divisor below;
// presumably a hidden line clamps it - verify.
509 constexpr uint32_t maxUnitsOfWorkPerGroup = 8;
510 groupSize = groupSize / maxUnitsOfWorkPerGroup;
// Number of group jobs, rounded up.
515 const auto jobs = (itemsToProcess + groupSize - 1) / groupSize;
// Single-job path (the guarding condition is not visible): one job covering up to
// min(groupSize, itemsToProcess) items.
522 const uint32_t groupJobIdxEnd = groupSize < itemsToProcess ? groupSize : itemsToProcess;
523 auto groupJobFunc = [job, groupJobIdxEnd]() {
526 args.idxEnd = groupJobIdxEnd;
530 auto handle = add(
Job{groupJobFunc, prio, JobCreationFlags::Default});
// Multi-job path: allocate all handles in one go.
540 add_n(prio, handles);
// With assertions enabled, every freshly allocated job must be reported as clear.
542#if GAIA_ASSERT_ENABLED
543 for (
auto jobHandle: handles)
544 GAIA_ASSERT(m_jobManager.is_clear(jobHandle));
// Assign each job its [groupJobIdxStart, groupJobIdxEnd) slice; the end is clamped to
// itemsToProcess so the last slice may be shorter.
548 for (uint32_t jobIndex = 0; jobIndex < jobs; ++jobIndex) {
549 const uint32_t groupJobIdxStart = jobIndex * groupSize;
550 const uint32_t groupJobIdxStartPlusGroupSize = groupJobIdxStart + groupSize;
551 const uint32_t groupJobIdxEnd =
552 groupJobIdxStartPlusGroupSize < itemsToProcess ? groupJobIdxStartPlusGroupSize : itemsToProcess;
// The lambda captures the job and its slice bounds by value.
554 auto groupJobFunc = [job, groupJobIdxStart, groupJobIdxEnd]() {
556 args.idxStart = groupJobIdxStart;
557 args.idxEnd = groupJobIdxEnd;
561 auto& jobData = m_jobManager.data(pHandles[jobIndex]);
562 jobData.
func = groupJobFunc;
// The handle at index `jobs` is one past the group jobs; it is linked to all of them via dep()
// and is the handle returned to the caller (presumably a sync job that completes last).
567 auto& jobData = m_jobManager.data(pHandles[jobs]);
572 dep(handles.subspan(0, jobs), pHandles[jobs]);
577 return pHandles[jobs];
// Wait-for-job fragment, profiled as tp::wait (enclosing function header is outside this view).
585 GAIA_PROF_SCOPE(tp::wait);
587 GAIA_ASSERT(main_thread());
593 auto* ctx = detail::tl_workerCtx;
594 auto& jobData = m_jobManager.data(jobHandle);
595 auto state = jobData.
state.load();
// A job whose state is 0 must not be waited on.
598 GAIA_ASSERT(state != 0);
// Loop until the job's state bits reach JobState::Done; the state is re-read on every iteration.
601 for (; (state & JobState::STATE_BITS_MASK) < JobState::Done; state = jobData.state.load()) {
// Instead of idling, try to pick up and execute some other job first.
604 if (try_fetch_job(*ctx, otherJobHandle)) {
605 if (run(otherJobHandle, ctx))
// While the job is executing, the low bits of its state are read as the id of the worker
// running it; wait on that worker's event.
611 if ((state & JobState::STATE_BITS_MASK) == JobState::Executing) {
612 const auto workerId = (state & JobState::DEP_BITS_MASK);
613 auto* jobDoneEvent = &m_workersCtx[workerId].event;
614 jobDoneEvent->wait();
// Otherwise publish this worker's bit in m_blockedInWorkUntil, re-check the state, and
// sleep on the futex only if the state did not change in the meantime.
621 const auto workerBit = 1U << ctx->workerIdx;
622 const auto oldBlockedMask = m_blockedInWorkUntil.fetch_or(workerBit);
623 const auto newState = jobData.state.load();
624 if (newState == state)
625 Futex::wait(&m_blockedInWorkUntil, oldBlockedMask | workerBit, detail::WaitMaskAny);
// Clear this worker's bit again after waking up (or after skipping the sleep).
626 m_blockedInWorkUntil.fetch_and(~workerBit);
// Fragment of another main-thread-only function.
632 GAIA_ASSERT(main_thread());
// Hardware thread count fragment (enclosing function header is outside this view).
// The value reported by the standard library is clamped to at least 1.
639 auto hwThreads = (uint32_t)std::thread::hardware_concurrency();
640 return core::get_max(1U, hwThreads);
// Efficiency-core detection fragment (enclosing function header is outside this view).
// The count starts at 0 and is filled in by whichever platform branch is compiled in.
646 uint32_t efficiencyCores = 0;
// Apple: read the logical CPU count of performance level 1 via sysctl.
647#if GAIA_PLATFORM_APPLE
648 size_t size =
sizeof(efficiencyCores);
649 if (sysctlbyname(
"hw.perflevel1.logicalcpu", &efficiencyCores, &size,
nullptr, 0) != 0)
// FreeBSD: query "dev.cpu.<index>.coretype" per CPU (the surrounding loop is not visible).
651#elif GAIA_PLATFORM_FREEBSD
655 size_t size =
sizeof(coreType);
657 GAIA_STRFMT(oidName,
sizeof(oidName),
"dev.cpu.%d.coretype", cpuIndex);
658 if (sysctlbyname(oidName, &coreType, &size,
nullptr, 0) != 0)
// Windows: call GetLogicalProcessorInformationEx once with a null buffer to obtain the
// required length, then again with an allocated buffer.
668#elif GAIA_PLATFORM_WINDOWS
672 if (!GetLogicalProcessorInformationEx(RelationProcessorCore,
nullptr, &length))
676 auto* pBuffer = (SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX*)malloc(length);
677 if (pBuffer ==
nullptr)
681 if (!GetLogicalProcessorInformationEx(RelationProcessorCore, pBuffer, &length)) {
686 uint32_t heterogenousCnt = 0;
// The records are variable-sized, so the walk advances by each entry's own Size field.
699 for (
char* ptr = (
char*)pBuffer; ptr < (
char*)pBuffer + length;
700 ptr += ((SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX*)ptr)->Size) {
701 auto* entry = (SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX*)ptr;
// Processor-core entries with EfficiencyClass 0 are singled out (branch body not visible).
702 if (entry->Relationship == RelationProcessorCore) {
703 if (entry->Processor.EfficiencyClass == 0)
// heterogenousCnt == 0 is handled specially (branch body not visible; presumably meaning
// all cores are of the same kind).
710 if (heterogenousCnt == 0)
// Linux: enumerate the "cpu<digit>..." entries under the cpu_atom sysfs directory.
714#elif GAIA_PLATFORM_LINUX
717 DIR* dir = opendir(
"/sys/devices/cpu_atom/cpus/");
722 while ((entry = readdir(dir)) !=
nullptr) {
723 if (strncmp(entry->d_name,
"cpu", 3) == 0 && entry->d_name[3] >=
'0' && entry->d_name[3] <=
'9')
// Fallback when nothing was found that way (body not visible).
730 if (efficiencyCores == 0) {
746 return efficiencyCores;
// Entry point of a worker thread. pCtx is the worker's context; it is also the pointer
// create_thread passes as the thread argument.
750 static void* thread_func(
void* pCtx) {
// Publish the context through the thread-local pointer for the lifetime of the thread.
753 detail::tl_workerCtx = &ctx;
// Name the thread and apply its priority settings before entering the job loop.
758 ctx.tp->set_thread_name(ctx.workerIdx, ctx.prio);
761 ctx.tp->set_thread_priority(ctx.workerIdx, ctx.prio);
// Run the worker loop; execution continues below once it returns.
764 ctx.tp->worker_loop(ctx);
// Detach the thread-local pointer on the way out.
766 detail::tl_workerCtx =
nullptr;
// Creates the thread backing worker slot `workerIdx` and starts thread_func on it.
// workerIdx must be > 0: the thread object lives at m_workers[workerIdx - 1], while context
// slot 0 belongs to the calling thread.
// Failures of the pthread attribute calls are reported via GAIA_LOG_W.
774 void create_thread(uint32_t workerIdx, JobPriority prio) {
776 GAIA_ASSERT(workerIdx > 0);
778 auto& ctx = m_workersCtx[workerIdx];
780 ctx.workerIdx = workerIdx;
// Cleared up-front; set to true again further down, next to the pthread_create call.
782 ctx.threadCreated =
false;
// std::thread backend: the lambda captures the context by reference and forwards its address.
784#if GAIA_THREAD_PLATFORM == GAIA_THREAD_STD
785 m_workers[workerIdx - 1] = std::thread([&ctx]() {
786 thread_func((
void*)&ctx);
// pthread backend: build the attribute object first.
789 pthread_attr_t attr{};
790 int ret = pthread_attr_init(&attr);
792 GAIA_LOG_W(
"pthread_attr_init failed for worker thread %u. ErrCode = %d", workerIdx, ret);
// Low-priority workers: a QoS class on Apple platforms, SCHED_OTHER elsewhere.
813 if (prio == JobPriority::Low) {
814 #if GAIA_PLATFORM_APPLE
815 ret = pthread_attr_set_qos_class_np(&attr, QOS_CLASS_USER_INTERACTIVE, -9);
818 "pthread_attr_set_qos_class_np failed for worker thread %u [prio=%u]. ErrCode = %d", workerIdx,
819 (uint32_t)prio, ret);
822 ret = pthread_attr_setschedpolicy(&attr, SCHED_OTHER);
// The warning names the policy that was actually requested above (SCHED_OTHER).
825 "pthread_attr_setschedpolicy SCHED_OTHER failed for worker thread %u [prio=%u]. ErrCode = %d", workerIdx,
826 (uint32_t)prio, ret);
// Pick a priority 5 steps above the policy minimum, kept inside [prioMin, prioMax].
829 int prioMax = core::get_min(38, sched_get_priority_max(SCHED_OTHER));
830 int prioMin = core::get_min(prioMax, sched_get_priority_min(SCHED_OTHER));
831 int prioUse = core::get_min(prioMin + 5, prioMax);
832 prioUse = core::get_max(prioUse, prioMin);
834 param.sched_priority = prioUse;
836 ret = pthread_attr_setschedparam(&attr, &param);
839 "pthread_attr_setschedparam %d failed for worker thread %u [prio=%u]. ErrCode = %d",
840 param.sched_priority, workerIdx, (uint32_t)prio, ret);
// The other priority path uses SCHED_RR.
844 ret = pthread_attr_setschedpolicy(&attr, SCHED_RR);
847 "pthread_attr_setschedpolicy SCHED_RR failed for worker thread %u [prio=%u]. ErrCode = %d", workerIdx,
848 (uint32_t)prio, ret);
// Pick a priority 5 steps below the policy maximum, kept inside [prioMin, prioMax].
851 int prioMax = core::get_min(41, sched_get_priority_max(SCHED_RR));
852 int prioMin = core::get_min(prioMax, sched_get_priority_min(SCHED_RR));
853 int prioUse = core::get_max(prioMax - 5, prioMin);
854 prioUse = core::get_min(prioUse, prioMax);
856 param.sched_priority = prioUse;
858 ret = pthread_attr_setschedparam(&attr, &param);
861 "pthread_attr_setschedparam %d failed for worker thread %u [prio=%u]. ErrCode = %d",
862 param.sched_priority, workerIdx, (uint32_t)prio, ret);
// Start the thread with the prepared attributes; the context address is the thread argument.
867 ret = pthread_create(&m_workers[workerIdx - 1], &attr, thread_func, (
void*)&ctx);
869 GAIA_LOG_W(
"pthread_create failed for worker thread %u. ErrCode = %d", workerIdx, ret);
// join_thread only joins slots that have this flag set.
871 ctx.threadCreated =
true;
// The attribute object is no longer needed after the create call.
874 pthread_attr_destroy(&attr);
878 set_thread_affinity(workerIdx);
// Joins the thread backing worker slot `workerIdx` (1-based; thread objects live at
// m_workers[workerIdx - 1]).
883 void join_thread(uint32_t workerIdx) {
// Out-of-range index check; the branch body is not visible here.
884 if GAIA_UNLIKELY (workerIdx > m_workers.size())
// std::thread backend.
887#if GAIA_THREAD_PLATFORM == GAIA_THREAD_STD
888 auto& t = m_workers[workerIdx - 1];
// Other backend: only threads flagged as created are joined, and the flag is cleared afterwards.
892 auto& ctx = m_workersCtx[workerIdx];
893 if (!ctx.threadCreated)
896 auto& t = m_workers[workerIdx - 1];
897 pthread_join(t,
nullptr);
898 ctx.threadCreated =
false;
// Creates `count` worker threads with priority `prio`.
// workerIdx is an in/out parameter: each created thread consumes the current index and
// advances it, so consecutive calls keep numbering where the previous one stopped.
902 void create_worker_threads(uint32_t& workerIdx, JobPriority prio, uint32_t count) {
903 for (uint32_t i = 0; i < count; ++i)
904 create_thread(workerIdx++, prio);
// Applies priority-related OS settings to the thread of worker `workerIdx`.
// Only a Windows implementation is visible here; both parameters are marked [[maybe_unused]].
907 void set_thread_priority([[maybe_unused]] uint32_t workerIdx, [[maybe_unused]] JobPriority priority) {
908#if GAIA_PLATFORM_WINDOWS
909 HANDLE nativeHandle = (HANDLE)m_workers[workerIdx - 1].native_handle();
// Configure execution-speed power throttling for the thread.
911 THREAD_POWER_THROTTLING_STATE state{};
912 state.Version = THREAD_POWER_THROTTLING_CURRENT_VERSION;
// High priority: only ControlMask is set in the visible lines (StateMask stays zero-initialized
// unless a hidden line sets it).
913 if (priority == JobPriority::High) {
917 state.ControlMask = THREAD_POWER_THROTTLING_EXECUTION_SPEED;
// Other branch: both ControlMask and StateMask carry the execution-speed bit.
923 state.ControlMask = THREAD_POWER_THROTTLING_EXECUTION_SPEED;
924 state.StateMask = THREAD_POWER_THROTTLING_EXECUTION_SPEED;
927 BOOL ret = SetThreadInformation(nativeHandle, ThreadPowerThrottling, &state,
sizeof(state));
// Warning emitted for a failed call (the guarding condition is not visible here).
929 GAIA_LOG_W(
"SetThreadInformation failed for thread %u", workerIdx);
// Affinity setup for worker `workerIdx`; called at the end of create_thread.
// The body is not visible in this view.
937 void set_thread_affinity([[maybe_unused]] uint32_t workerIdx) {
// Gives the thread of worker `workerIdx` a name of the form "worker_HI_<idx>" or
// "worker_LO_<idx>", depending on `prio`. The name buffer is 16 characters in every branch.
977 void set_thread_name(uint32_t workerIdx, JobPriority prio) {
// Profiler-provided naming, when enabled.
978#if GAIA_PROF_USE_PROFILER_THREAD_NAME
979 char threadName[16]{};
980 GAIA_STRFMT(threadName, 16,
"worker_%s_%u", prio == JobPriority::High ?
"HI" :
"LO", workerIdx);
981 GAIA_PROF_THREAD_NAME(threadName);
// Windows: SetThreadDescription is looked up in kernel32.dll at run time rather than linked
// directly, and used only when the lookup succeeds.
982#elif GAIA_PLATFORM_WINDOWS
983 auto nativeHandle = (HANDLE)m_workers[workerIdx - 1].native_handle();
985 TOSApiFunc_SetThreadDescription pSetThreadDescFunc =
nullptr;
986 if (
auto* pModule = GetModuleHandleA(
"kernel32.dll")) {
987 auto* pFunc = GetProcAddress(pModule,
"SetThreadDescription");
988 pSetThreadDescFunc =
reinterpret_cast<TOSApiFunc_SetThreadDescription
>(
reinterpret_cast<void*
>(pFunc));
990 if (pSetThreadDescFunc !=
nullptr) {
991 wchar_t threadName[16]{};
992 swprintf_s(threadName, L
"worker_%s_%u", prio == JobPriority::High ? L
"HI" : L
"LO", workerIdx);
994 auto hr = pSetThreadDescFunc(nativeHandle, threadName);
997 "Issue setting name for worker %s thread %u!", prio == JobPriority::High ?
"HI" :
"LO", workerIdx);
// MSVC: the name is also passed to the debugger by raising exception 0x406D1388 with a
// THREADNAME_INFO payload; the exception is swallowed by the __except handler.
1000 #if defined _MSC_VER
1001 char threadName[16]{};
1002 GAIA_STRFMT(threadName, 16,
"worker_%s_%u", prio == JobPriority::High ?
"HI" :
"LO", workerIdx);
1004 THREADNAME_INFO info{};
1005 info.dwType = 0x1000;
1006 info.szName = threadName;
1007 info.dwThreadID = GetThreadId(nativeHandle);
1010 RaiseException(0x406D1388, 0,
sizeof(info) /
sizeof(ULONG_PTR), (ULONG_PTR*)&info);
1011 } __except (EXCEPTION_EXECUTE_HANDLER) {
// Apple: pthread_setname_np takes only the name, i.e. it names the calling thread.
1015#elif GAIA_PLATFORM_APPLE
1016 char threadName[16]{};
1017 GAIA_STRFMT(threadName, 16,
"worker_%s_%u", prio == JobPriority::High ?
"HI" :
"LO", workerIdx);
1018 auto ret = pthread_setname_np(threadName);
1020 GAIA_LOG_W(
"Issue setting name for worker %s thread %u!", prio == JobPriority::High ?
"HI" :
"LO", workerIdx);
// Linux / FreeBSD: pthread_setname_np takes the target thread handle explicitly.
1021#elif GAIA_PLATFORM_LINUX || GAIA_PLATFORM_FREEBSD
1022 auto nativeHandle = m_workers[workerIdx - 1];
1024 char threadName[16]{};
1025 GAIA_STRFMT(threadName, 16,
"worker_%s_%u", prio == JobPriority::High ?
"HI" :
"LO", workerIdx);
1026 GAIA_PROF_THREAD_NAME(threadName);
1027 auto ret = pthread_setname_np(nativeHandle, threadName);
1029 GAIA_LOG_W(
"Issue setting name for worker %s thread %u!", prio == JobPriority::High ?
"HI" :
"LO", workerIdx);
// Returns true when the calling thread's id equals the stored m_mainThreadId.
1035 GAIA_NODISCARD
bool main_thread()
const {
1036 return std::this_thread::get_id() == m_mainThreadId;
// Lets the calling thread take part in job execution: fetch a job through the thread-local
// worker context and run it.
// Dereferences detail::tl_workerCtx without a null check, so the context must be set up first.
1043 void main_thread_tick() {
1044 auto& ctx = *detail::tl_workerCtx;
1048 JobHandle jobHandle;
// Nothing fetched: handled by the hidden branch body.
1049 if (!try_fetch_job(ctx, jobHandle))
// The result of run() is intentionally discarded.
1052 (void)run(jobHandle, &ctx);
// Tries to obtain a job for the worker described by `ctx`, writing it to `jobHandle`.
// The sources are tried in this order: the worker's own queue, the shared queue of the
// worker's priority, and finally the queues of the other workers (stealing).
1056 bool try_fetch_job(ThreadCtx& ctx, JobHandle& jobHandle) {
1058 if (ctx.jobQueue.try_pop(jobHandle))
1062 if (m_jobQueue[(uint32_t)ctx.prio].try_pop(jobHandle))
// The loop header has no increment; `i` is advanced inside the body (not fully visible here).
1066 const auto workerCnt = m_workersCtx.size();
1067 for (uint32_t i = 0; i < workerCnt;) {
// The worker's own queue is treated separately from the steal targets.
1069 if (i == ctx.workerIdx) {
1075 const auto res = m_workersCtx[i].jobQueue.try_steal(jobHandle);
// A steal only counts when it succeeded and produced a non-null handle.
1084 if (res && jobHandle != (JobHandle)JobNull_t{})
// Body of a worker thread (called from thread_func): fetch jobs and run them, checking the
// stop flag along the way. The looping construct itself is not visible in this view.
1096 void worker_loop(ThreadCtx& ctx) {
1103 JobHandle jobHandle;
// Nothing fetched: handled by the hidden branch body (presumably the worker goes to sleep).
1104 if (!try_fetch_job(ctx, jobHandle))
// Jobs are run with the thread-local context, which thread_func set to &ctx.
1107 (void)run(jobHandle, detail::tl_workerCtx);
// Snapshot of the stop flag.
1111 const bool stop = m_stop.load();
// Shutdown fragment (enclosing function header is outside this view).
// Check for the no-worker-threads case; the branch body is not visible here.
1119 if (m_workers.empty())
// Release the semaphore once per worker thread (presumably to wake any sleeping workers).
1126 m_sem.
release((int32_t)m_workers.size());
// Use the calling thread's context; if none is installed, fall back to slot 0 and install it.
1128 auto* ctx = detail::tl_workerCtx;
1129 if (ctx ==
nullptr) {
1132 ctx = &m_workersCtx[0];
1133 detail::tl_workerCtx = ctx;
// Run every job that can still be fetched on the calling thread.
1137 JobHandle jobHandle;
1138 while (try_fetch_job(*ctx, jobHandle)) {
1139 run(jobHandle, ctx);
1142 detail::tl_workerCtx =
nullptr;
// Worker slots are 1-based, hence i + 1.
1145 GAIA_FOR(m_workers.size()) join_thread(i + 1);
// Clear the stop flag again once all threads are joined.
1148 m_stop.store(false);
// Resolves the priority a job will actually be scheduled with.
// When the requested priority has at least one worker, the first branch of the conditional
// applies (its operand is not visible here; presumably job.priority). Otherwise the next
// priority is used, wrapping around via modulo JobPriorityCnt.
1152 template <typename TJob>
1153 JobPriority final_prio(const TJob& job) {
1154 const auto cntWorkers = m_workersCnt[(uint32_t)job.priority];
1155 return cntWorkers > 0
1159 : (JobPriority)(((uint32_t)job.priority + 1U) % (uint32_t)JobPriorityCnt);
// Signals the jobs linked to `jobData` through its edges (called by run() right after the
// job's function has been executed).
1162 void signal_edges(JobContainer& jobData) {
// Number of linked jobs.
1163 const auto max = jobData.edges.depCnt;
1169 auto* ctx = detail::tl_workerCtx;
// Single-edge case: the handle is read directly from edges.dep.
1173 auto depHandle = jobData.edges.dep;
1174#if GAIA_LOG_JOB_STATES
1175 GAIA_LOG_N(
"SIGNAL %u.%u -> %u.%u", jobData.idx, jobData.gen, depHandle.id(), depHandle.gen());
1179 auto& depData = m_jobManager.data(depHandle);
// A false result from signal_edge is handled by the hidden branch body.
1180 if (!JobManager::signal_edge(depData))
// Multi-edge case: the handles are read from the edges.pDeps array, which must be non-null.
1189 GAIA_ASSERT(jobData.edges.pDeps !=
nullptr);
// Scratch array on the stack, sized for all edges.
1191 auto* pHandles = (JobHandle*)alloca(
sizeof(JobHandle) * max);
1194 auto depHandle = jobData.edges.pDeps[i];
1197 auto& depData = m_jobManager.data(depHandle);
1198 if (!JobManager::signal_edge(depData))
// Handles that get past the check above are collected for further processing.
1201 pHandles[cnt++] = depHandle;
// Job dispatch fragment: moves a set of handles into the queues (enclosing function header is
// outside this view).
// Scratch array on the stack, sized for all incoming handles.
1209 auto* pHandles = (JobHandle*)alloca(
sizeof(JobHandle) * jobHandles.size());
1210 uint32_t handlesCnt = 0;
// Mark every job as processing in the job manager.
1212 for (
auto handle: jobHandles) {
1213 auto& jobData = m_jobManager.data(handle);
1214 m_jobManager.processing(jobData);
// Jobs without a function are passed to run() immediately; the remaining handles are collected
// for queueing (the else/continue structure is not visible here).
1222 if (!jobData.func.operator
bool())
1223 (void)run(handle, ctx);
1225 pHandles[handlesCnt++] = handle;
// Keep pushing until every collected handle is either queued or executed inline.
1228 std::span handles(pHandles, handlesCnt);
1229 while (!handles.empty()) {
1231 uint32_t pushed = 0;
// With a worker context, push into that worker's own queue; the per-priority shared queues
// are used in the loop below (the else structure is not visible here).
1232 if (ctx !=
nullptr) {
1233 pushed = ctx->jobQueue.try_push(handles);
1235 for (
auto handle: handles) {
1236 if (m_jobQueue[(uint32_t)handle.prio()].try_push(handle))
// At most one count per pushed job, capped by the number of workers (the consumer of `cnt` is
// not visible here; presumably a semaphore release).
1245 const auto cntWorkers = ctx !=
nullptr ? m_workersCnt[(uint32_t)ctx->prio] : m_workers.size();
1246 const auto cnt = (int32_t)core::get_min(pushed, cntWorkers);
// Drop what was queued; if something is left, run the next job inline and retry with the rest.
// NOTE(review): ctx may be nullptr on this path while run() dereferences ctx->workerIdx - confirm.
1249 handles = handles.subspan(pushed);
1250 if (!handles.empty()) {
1252 run(handles[0], ctx);
1253 handles = handles.subspan(1);
// Executes the job behind `jobHandle` on the worker described by `ctx`.
// `ctx` is dereferenced without a null check (ctx->workerIdx below).
1258 bool run(JobHandle jobHandle, ThreadCtx* ctx) {
// Null handles are checked first; the branch body is not visible here.
1259 if (jobHandle == (JobHandle)JobNull_t{})
1262 auto& jobData = m_jobManager.data(jobHandle);
// Read the creation flags relevant to this function up-front.
1263 const bool manualDelete = (jobData.flags & JobCreationFlags::ManualDelete) != 0U;
1264 const bool canWait = (jobData.flags & JobCreationFlags::CanWait) != 0U;
// Mark the job as executing on this worker.
1266 m_jobManager.executing(jobData, ctx->workerIdx);
// If any worker registered itself in m_blockedInWorkUntil, clear the mask and wake all of them
// so they can re-check the state they are waiting on. The load avoids the exchange when the
// mask is already 0.
1268 if (m_blockedInWorkUntil.load() != 0) {
1269 const auto blockedCnt = m_blockedInWorkUntil.exchange(0);
1270 if (blockedCnt != 0)
1271 Futex::wake(&m_blockedInWorkUntil, detail::WaitMaskAll);
// Sanity check: neither the index nor the generation may be the (uint32_t)-1 sentinel.
1274 GAIA_ASSERT(jobData.idx != (uint32_t)-1 && jobData.data.gen != (uint32_t)-1);
// Execute the job's function.
1277 m_jobManager.
run(jobData);
// Signal the linked jobs, then release the edge storage.
1280 signal_edges(jobData);
1281 JobManager::free_edges(jobData);
// Address of the job's state, used as a futex word (the users are not visible here).
1286 const auto* pFutexValue = &jobData.state;