ST_engine  0.3-ALPHA
task_manager.cpp
1 /* This file is part of the "ST" project.
2  * You may use, distribute or modify this code under the terms
3  * of the GNU General Public License version 2.
4  * See LICENCE.txt in the root directory of the project.
5  *
6  * Author: Maxim Atanasov
7  * E-mail: maxim.atanasov@protonmail.com
8  */
9 
#include <task_manager.hpp>

#include <cstdio>
#include <cstdlib>
#include <fstream>
#include <istream>
#include <memory>
#include <set>
#include <sstream>
#include <stdexcept>
#include <string>
#include <thread>
13 
14 static bool singleton_initialized;
15 
16 #ifdef _MSC_VER
17 
18 #include <Windows.h>
19 #include <malloc.h>
20 #include <cstdio>
21 
22 typedef BOOL (WINAPI *LPFN_GLPI)(
23  PSYSTEM_LOGICAL_PROCESSOR_INFORMATION,
24  PDWORD);
25 
26 
27 // Helper function to count set bits in the processor mask.
28 DWORD CountSetBits(ULONG_PTR bitMask) {
29  DWORD LSHIFT = sizeof(ULONG_PTR) * 8 - 1;
30  DWORD bitSetCount = 0;
31  ULONG_PTR bitTest = (ULONG_PTR) 1 << LSHIFT;
32  DWORD i;
33 
34  for (i = 0; i <= LSHIFT; ++i) {
35  bitSetCount += ((bitMask & bitTest) ? 1 : 0);
36  bitTest /= 2;
37  }
38 
39  return bitSetCount;
40 }
41 
46 int get_cpu_core_count() {
47  LPFN_GLPI glpi;
48  BOOL done = FALSE;
49  PSYSTEM_LOGICAL_PROCESSOR_INFORMATION buffer = nullptr;
50  PSYSTEM_LOGICAL_PROCESSOR_INFORMATION ptr = nullptr;
51  DWORD returnLength = 0;
52  DWORD logicalProcessorCount = 0;
53  DWORD numaNodeCount = 0;
54  DWORD processorCoreCount = 0;
55  DWORD processorL1CacheCount = 0;
56  DWORD processorL2CacheCount = 0;
57  DWORD processorL3CacheCount = 0;
58  DWORD processorPackageCount = 0;
59  DWORD byteOffset = 0;
60  PCACHE_DESCRIPTOR Cache;
61 
62  glpi = (LPFN_GLPI) GetProcAddress(
63  GetModuleHandle(TEXT("kernel32")),
64  "GetLogicalProcessorInformation");
65  if (nullptr == glpi) {
66  return (1);
67  }
68 
69  while (!done) {
70  auto rc = static_cast<DWORD>(glpi(buffer, &returnLength));
71 
72  if (FALSE == rc) {
73  if (GetLastError() == ERROR_INSUFFICIENT_BUFFER) {
74  if (buffer)
75  free(buffer);
76 
77  buffer = (PSYSTEM_LOGICAL_PROCESSOR_INFORMATION) malloc(
78  returnLength);
79 
80  if (nullptr == buffer) {
81  return (2);
82  }
83  } else {
84  return (3);
85  }
86  } else {
87  done = TRUE;
88  }
89  }
90 
91  ptr = buffer;
92 
93  while (byteOffset + sizeof(SYSTEM_LOGICAL_PROCESSOR_INFORMATION) <= returnLength) {
94  switch (ptr->Relationship) {
95  case RelationNumaNode:
96  // Non-NUMA systems report a single record of this type.
97  numaNodeCount++;
98  break;
99 
100  case RelationProcessorCore:
101  processorCoreCount++;
102 
103  // A hyperthreaded core supplies more than one logical processor.
104  logicalProcessorCount += CountSetBits(ptr->ProcessorMask);
105  break;
106 
107  case RelationCache:
108  // Cache data is in ptr->Cache, one CACHE_DESCRIPTOR structure for each cache.
109  Cache = &ptr->Cache;
110  if (Cache->Level == 1) {
111  processorL1CacheCount++;
112  } else if (Cache->Level == 2) {
113  processorL2CacheCount++;
114  } else if (Cache->Level == 3) {
115  processorL3CacheCount++;
116  }
117  break;
118 
119  case RelationProcessorPackage:
120  // Logical processors share a physical package.
121  processorPackageCount++;
122  break;
123 
124  default:
125  break;
126  }
127  byteOffset += sizeof(SYSTEM_LOGICAL_PROCESSOR_INFORMATION);
128  ptr++;
129  }
130  free(buffer);
131  return static_cast<int>(processorCoreCount);
132 }
133 
134 #else
135 #include <unistd.h>
136 
/**
 * Extract the integer value of a "key : value" line from /proc/cpuinfo.
 *
 * @param line a full line such as "cpu cores\t: 4"
 * @return the parsed value, or -1 if the line has no ':' or no integer after it
 */
static int cpuinfo_int_value(const std::string &line) {
    const std::string::size_type colon = line.find(':');
    if (colon == std::string::npos) {
        return -1;
    }
    std::istringstream value_stream(line.substr(colon + 1));
    int value = -1;
    if (!(value_stream >> value)) {
        return -1;
    }
    return value;
}

/**
 * Count the physical cores described by /proc/cpuinfo-formatted text.
 *
 * "cpu cores" is reported per package ("physical id"), so the value is summed
 * once for each distinct package; reading only the first entry (as before)
 * under-counts multi-socket machines. A stanza that has no "physical id" line
 * is attributed to package 0.
 *
 * @param cpuinfo stream positioned at the start of the cpuinfo text
 * @return the total number of physical cores, or 0 if no usable "cpu cores"
 *         entry was found
 */
static int count_physical_cores(std::istream &cpuinfo) {
    std::set<int> counted_packages;
    int package = 0;
    int cores = 0;
    std::string line;
    while (std::getline(cpuinfo, line)) {
        if (line.compare(0, 9, "processor") == 0) {
            package = 0;  // new logical-processor stanza; reset the package id
        } else if (line.compare(0, 11, "physical id") == 0) {
            const int id = cpuinfo_int_value(line);
            if (id >= 0) {
                package = id;
            }
        } else if (line.compare(0, 9, "cpu cores") == 0) {
            const int package_cores = cpuinfo_int_value(line);
            // insert().second is true only the first time this package is seen.
            if (package_cores > 0 && counted_packages.insert(package).second) {
                cores += package_cores;
            }
        }
    }
    return cores;
}

/**
 * Count the physical processor cores on this (non-Windows) system by parsing
 * /proc/cpuinfo.
 *
 * @return the number of physical cores, or 0 if /proc/cpuinfo cannot be opened
 *         or reports no "cpu cores" entries (0 means "unknown" to callers)
 */
int get_cpu_core_count() {
    std::ifstream cpuinfo("/proc/cpuinfo");
    if (!cpuinfo.is_open()) {
        return 0;
    }
    return count_physical_cores(cpuinfo);
}
166 #endif
167 
173 int task_manager::task_thread(task_manager *self) {
174  ST::task *work;
175  while (self->run_threads) {
176  self->work_sem->wait();
177  if (self->global_task_queue.try_dequeue(work)) { //get a function pointer and data
178  task_manager::do_work(work);
179  }
180  }
181  return 0;
182 }
183 
189 void task_manager::do_work(ST::task *work) {
190  if (work->dependency != nullptr) { //wait for dependency to finish
191  work->dependency->wait();
192  delete work->dependency;
193  }
194  work->task_func(work->data); // call it
195  if (work->lock != nullptr) {
196  work->lock->notify(); //increment the semaphore
197  }
198  delete work;
199 }
200 
206 
207  if (singleton_initialized) {
208  throw std::runtime_error("The task manager cannot be initialized more than once!");
209  } else {
210  singleton_initialized = true;
211  }
212 
213  //check how many threads we have
214  thread_num = static_cast<uint8_t>(get_cpu_core_count());
215 
216  //initialize semaphore for worker threads
217  work_sem = new semaphore;
218 
219  uint16_t task_thread_count = thread_num - 1;
220  if (task_thread_count == 0) {
221  task_thread_count = 1;
222  }
223  fprintf(stdout, "This system has %d physical cores\nStarting %d task threads\n", thread_num, task_thread_count);
224 
225  //if we can't tell or there is only one core, then start one worker thread
226  if (thread_num == 0 || thread_num == 1) {
227  thread_num = 2;
228  }
229 
230  for (uint16_t i = 0; i < thread_num - 1; i++) {
231  task_threads.emplace_back(std::thread(task_thread, this));
232  }
233 }
234 
240 task_manager::task_manager(uint8_t thread_num) {
241 
242  if (singleton_initialized) {
243  throw std::runtime_error("The task manager cannot be initialized more than once!");
244  } else {
245  singleton_initialized = true;
246  }
247 
248  //initialize semaphore for worker threads
249  work_sem = new semaphore;
250 
251  this->thread_num = thread_num;
252 
253  auto total_threads = static_cast<uint8_t>(get_cpu_core_count());
254  fprintf(stdout, "This system has %d physical cores\nStarting %d task threads\n", total_threads, thread_num);
255 
256  if (thread_num == 0 || thread_num == 1) {
257  this->thread_num = 2;
258  }
259 
260  for (uint16_t i = 0; i < this->thread_num - 1; i++) {
261  task_threads.emplace_back(std::thread(task_thread, this));
262  }
263 }
264 
270 void task_manager::start_thread(int (*thread_func)(void *), void *data) {
271  std::thread(thread_func, data);
272 }
273 
279  singleton_initialized = false;
280 
281  run_threads = false;
282  for (int i = 0; i < thread_num - 1; i++) {
283  work_sem->notify();
284  }
285  for (int i = 0; i < thread_num - 1; i++) {
286  task_threads[i].join();
287  }
288 
289  //finish running any remaining tasks
290  ST::task *new_task;
291  while (global_task_queue.try_dequeue(new_task)) { //get a function pointer and data
292  delete new_task->lock;
293  delete new_task;
294  }
295  delete work_sem;
296 }
297 
304  //Apparently a bit cheaper to create a new semaphore instead of reusing old ones
305  //This may depend on the platform (OS implementation of Semaphores)
306  arg->lock = new semaphore;
307  global_task_queue.enqueue(arg);
308  work_sem->notify();
309  return arg->lock;
310 }
311 
317  global_task_queue.enqueue(arg);
318  work_sem->notify();
319 }
320 
326  if (id != nullptr) {
327  while (!id->try_wait()) {
328  ST::task *work;
329  if (run_threads && global_task_queue.try_dequeue(work)) { //get a function pointer and data
330  do_work(work);
331  }
332  }
333  delete id;
334  }
335 }
336 
342  if (id != nullptr) {
343  id->wait();
344  delete id;
345  }
346 }
An object representing a task to be run by the task manager.
Definition: task.hpp:24
The Task Manager handles all things multi-threaded in the engine.
void start_task_lockfree(ST::task *arg)
void wait_for_task(task_id id)
void work_wait_for_task(task_id id)
task_id start_task(ST::task *arg)