10 #include <task_manager.hpp>
14 static bool singleton_initialized;
22 typedef BOOL (WINAPI *LPFN_GLPI)(
23 PSYSTEM_LOGICAL_PROCESSOR_INFORMATION,
28 DWORD CountSetBits(ULONG_PTR bitMask) {
29 DWORD LSHIFT =
sizeof(ULONG_PTR) * 8 - 1;
30 DWORD bitSetCount = 0;
31 ULONG_PTR bitTest = (ULONG_PTR) 1 << LSHIFT;
34 for (i = 0; i <= LSHIFT; ++i) {
35 bitSetCount += ((bitMask & bitTest) ? 1 : 0);
46 int get_cpu_core_count() {
49 PSYSTEM_LOGICAL_PROCESSOR_INFORMATION buffer =
nullptr;
50 PSYSTEM_LOGICAL_PROCESSOR_INFORMATION ptr =
nullptr;
51 DWORD returnLength = 0;
52 DWORD logicalProcessorCount = 0;
53 DWORD numaNodeCount = 0;
54 DWORD processorCoreCount = 0;
55 DWORD processorL1CacheCount = 0;
56 DWORD processorL2CacheCount = 0;
57 DWORD processorL3CacheCount = 0;
58 DWORD processorPackageCount = 0;
60 PCACHE_DESCRIPTOR Cache;
62 glpi = (LPFN_GLPI) GetProcAddress(
63 GetModuleHandle(TEXT(
"kernel32")),
64 "GetLogicalProcessorInformation");
65 if (
nullptr == glpi) {
70 auto rc =
static_cast<DWORD
>(glpi(buffer, &returnLength));
73 if (GetLastError() == ERROR_INSUFFICIENT_BUFFER) {
77 buffer = (PSYSTEM_LOGICAL_PROCESSOR_INFORMATION) malloc(
80 if (
nullptr == buffer) {
93 while (byteOffset +
sizeof(SYSTEM_LOGICAL_PROCESSOR_INFORMATION) <= returnLength) {
94 switch (ptr->Relationship) {
95 case RelationNumaNode:
100 case RelationProcessorCore:
101 processorCoreCount++;
104 logicalProcessorCount += CountSetBits(ptr->ProcessorMask);
110 if (Cache->Level == 1) {
111 processorL1CacheCount++;
112 }
else if (Cache->Level == 2) {
113 processorL2CacheCount++;
114 }
else if (Cache->Level == 3) {
115 processorL3CacheCount++;
119 case RelationProcessorPackage:
121 processorPackageCount++;
127 byteOffset +=
sizeof(SYSTEM_LOGICAL_PROCESSOR_INFORMATION);
131 return static_cast<int>(processorCoreCount);
137 int get_cpu_core_count() {
140 file.open(
"/proc/cpuinfo");
146 if(temp.find(
"cpu cores") != std::string::npos){
147 std::stringstream extract;
150 while(!extract.fail()){
152 if(std::isdigit(word.at(0))){
153 std::stringstream convert;
175 while (self->run_threads) {
176 self->work_sem->wait();
177 if (self->global_task_queue.try_dequeue(work)) {
178 task_manager::do_work(work);
189 void task_manager::do_work(
ST::task *work) {
190 if (work->dependency !=
nullptr) {
191 work->dependency->wait();
192 delete work->dependency;
194 work->task_func(work->data);
195 if (work->lock !=
nullptr) {
196 work->lock->notify();
207 if (singleton_initialized) {
208 throw std::runtime_error(
"The task manager cannot be initialized more than once!");
210 singleton_initialized =
true;
214 thread_num =
static_cast<uint8_t
>(get_cpu_core_count());
219 uint16_t task_thread_count = thread_num - 1;
220 if (task_thread_count == 0) {
221 task_thread_count = 1;
223 fprintf(stdout,
"This system has %d physical cores\nStarting %d task threads\n", thread_num, task_thread_count);
226 if (thread_num == 0 || thread_num == 1) {
230 for (uint16_t i = 0; i < thread_num - 1; i++) {
231 task_threads.emplace_back(std::thread(task_thread,
this));
242 if (singleton_initialized) {
243 throw std::runtime_error(
"The task manager cannot be initialized more than once!");
245 singleton_initialized =
true;
251 this->thread_num = thread_num;
253 auto total_threads =
static_cast<uint8_t
>(get_cpu_core_count());
254 fprintf(stdout,
"This system has %d physical cores\nStarting %d task threads\n", total_threads, thread_num);
256 if (thread_num == 0 || thread_num == 1) {
257 this->thread_num = 2;
260 for (uint16_t i = 0; i < this->thread_num - 1; i++) {
261 task_threads.emplace_back(std::thread(task_thread,
this));
270 void task_manager::start_thread(
int (*thread_func)(
void *),
void *data) {
271 std::thread(thread_func, data);
279 singleton_initialized =
false;
282 for (
int i = 0; i < thread_num - 1; i++) {
285 for (
int i = 0; i < thread_num - 1; i++) {
286 task_threads[i].join();
291 while (global_task_queue.try_dequeue(new_task)) {
292 delete new_task->lock;
307 global_task_queue.enqueue(arg);
317 global_task_queue.enqueue(arg);
327 while (!id->try_wait()) {
329 if (run_threads && global_task_queue.try_dequeue(work)) {
An object representing a task to be run by the task manager.
The Task Manager handles all things multi-threaded in the engine.
void start_task_lockfree(ST::task *arg)
void wait_for_task(task_id id)
void work_wait_for_task(task_id id)
task_id start_task(ST::task *arg)