Geant4 11.1.1
Toolkit for the simulation of the passage of particles through matter
UserTaskQueue.cc
//
// MIT License
// Copyright (c) 2020 Jonathan R. Madsen
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED
// "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
// LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
// PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
// HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
// ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
//
// ---------------------------------------------------------------
// Tasking class implementation
// Class Description:
// ---------------------------------------------------------------
// Author: Jonathan Madsen
// ---------------------------------------------------------------

#include "PTL/UserTaskQueue.hh"

#include "PTL/AutoLock.hh"
#include "PTL/TaskGroup.hh"
#include "PTL/ThreadData.hh"
#include "PTL/ThreadPool.hh"
#include "PTL/Utility.hh"

#include <cassert>
#include <chrono>
#include <functional>
#include <iostream>
#include <map>
#include <sstream>  // std::stringstream is used in the diagnostics below
#include <stdexcept>
#include <system_error>
#include <thread>
#include <utility>

using namespace PTL;

//======================================================================================//
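
// A UserTaskQueue constructed with a non-null parent is a thread-local "clone": it
// shares the parent's hold flag, task counter, mutex, and sub-queue container, so
// every clone operates on the same nworkers + 1 bins while keeping its own preferred
// thread/insert bin indices.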
UserTaskQueue::UserTaskQueue(intmax_t nworkers, UserTaskQueue* parent)
: VUserTaskQueue(nworkers)
, m_is_clone((parent) != nullptr)
, m_thread_bin((parent) ? (ThreadPool::get_this_thread_id() % (nworkers + 1)) : 0)
, m_insert_bin((parent) ? (ThreadPool::get_this_thread_id() % (nworkers + 1)) : 0)
, m_hold((parent) ? parent->m_hold : new std::atomic_bool(false))
, m_ntasks((parent) ? parent->m_ntasks : new std::atomic_uintmax_t(0))
, m_mutex((parent) ? parent->m_mutex : new Mutex{})
, m_subqueues((parent) ? parent->m_subqueues : new TaskSubQueueContainer())
{
    // create nthreads + 1 subqueues so there is always a subqueue available
    if(!parent)
    {
        for(intmax_t i = 0; i < nworkers + 1; ++i)
            m_subqueues->emplace_back(new TaskSubQueue(m_ntasks));
    }

#if defined(DEBUG)
    if(GetEnv<int>("PTL_VERBOSE", 0) > 3)
    {
        RecursiveAutoLock l(TypeMutex<decltype(std::cout), RecursiveMutex>());
        std::stringstream ss;
        ss << ThreadPool::get_this_thread_id() << "> " << ThisThread::get_id() << " ["
           << __FUNCTION__ << ":" << __LINE__ << "] "
           << "this = " << this << ", "
           << "clone = " << std::boolalpha << m_is_clone << ", "
           << "thread = " << m_thread_bin << ", "
           << "insert = " << m_insert_bin << ", "
           << "hold = " << m_hold->load() << " @ " << m_hold << ", "
           << "tasks = " << m_ntasks->load() << " @ " << m_ntasks << ", "
           << "subqueue = " << m_subqueues << ", "
           << "size = " << true_size() << ", "
           << "empty = " << true_empty();
        std::cout << ss.str() << std::endl;
    }
#endif
}

//======================================================================================//

UserTaskQueue::~UserTaskQueue()
{
    if(!m_is_clone)
    {
        for(auto& itr : *m_subqueues)
        {
            assert(itr->empty());
            delete itr;
        }
        m_subqueues->clear();
        delete m_hold;
        delete m_ntasks;
        delete m_mutex;
        delete m_subqueues;
    }
}

//======================================================================================//

void
UserTaskQueue::resize(intmax_t n)
{
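    // grow or shrink the set of sub-queues (bins) to match the requested worker count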
    if(!m_mutex)
        throw std::runtime_error("nullptr to mutex");
    AutoLock lk(m_mutex);
    if(m_workers < n)
    {
        while(m_workers < n)
        {
            m_subqueues->emplace_back(new TaskSubQueue(m_ntasks));
            ++m_workers;
        }
    }
    else if(m_workers > n)
    {
        while(m_workers > n)
        {
            delete m_subqueues->back();
            m_subqueues->pop_back();
            --m_workers;
        }
    }
}

//======================================================================================//
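
// clone() hands back a new per-thread handle onto the same shared queue state (see the
// constructor above), so each worker can be given its own view without duplicating the
// bins.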
VUserTaskQueue*
UserTaskQueue::clone()
{
    return new UserTaskQueue(workers(), this);
}
//======================================================================================//
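
// GetThreadBin() computes the calling thread's preferred bin once and caches it in a
// thread_local, so repeated lookups are cheap and stable for the lifetime of the thread.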
intmax_t
UserTaskQueue::GetThreadBin() const
{
    // get a thread id number
    static thread_local intmax_t tl_bin =
        (m_thread_bin + ThreadPool::get_this_thread_id()) % (m_workers + 1);
    return tl_bin;
}

//======================================================================================//

intmax_t
UserTaskQueue::GetInsertBin() const
{
    return (++m_insert_bin % (m_workers + 1));
}

//======================================================================================//
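
// GetThreadBinTask() drains only the calling thread's own bin; GetTask() falls back to
// it while the shared "hold" flag is set (see AcquireHold()/ReleaseHold() below).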
UserTaskQueue::task_pointer
UserTaskQueue::GetThreadBinTask()
{
    intmax_t tbin = GetThreadBin();
    TaskSubQueue* task_subq = (*m_subqueues)[tbin % (m_workers + 1)];
    task_pointer _task = nullptr;

    //------------------------------------------------------------------------//
    auto get_task = [&]() {
        if(task_subq->AcquireClaim())
        {
            // run task
            _task = task_subq->PopTask(true);
            // release the claim on the bin
            task_subq->ReleaseClaim();
        }
        if(_task)
            --(*m_ntasks);
        // return success if valid pointer
        return (_task != nullptr);
    };
    //------------------------------------------------------------------------//

    // while not empty
    while(!task_subq->empty())
    {
        if(get_task())
            break;
    }
    return _task;
}

//======================================================================================//
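
// GetTask() first looks in the requested bin (defaulting to the calling thread's own
// bin), then walks the remaining bins looking for work to steal; in hold mode it only
// drains the thread's own bin via GetThreadBinTask().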
UserTaskQueue::task_pointer
UserTaskQueue::GetTask(intmax_t subq, intmax_t nitr)
{
    // exit if empty
    if(this->true_empty())
        return nullptr;

    // ensure the thread has a bin assignment
    intmax_t tbin = GetThreadBin();
    intmax_t n = (subq < 0) ? tbin : subq;
    if(nitr < 1)
        nitr = (m_workers + 1);  // * m_ntasks->load(std::memory_order_relaxed);

    if(m_hold->load(std::memory_order_relaxed))
    {
        return GetThreadBinTask();
    }

    task_pointer _task = nullptr;
    //------------------------------------------------------------------------//
    auto get_task = [&](intmax_t _n) {
        TaskSubQueue* task_subq = (*m_subqueues)[_n % (m_workers + 1)];
        // try to acquire a claim for the bin
        // if acquired, no other threads will access bin until claim is released
        if(!task_subq->empty() && task_subq->AcquireClaim())
        {
            // pop task out of bin
            _task = task_subq->PopTask(n == tbin);
            // release the claim on the bin
            task_subq->ReleaseClaim();
        }
        if(_task)
            --(*m_ntasks);
        // return success if valid pointer
        return (_task != nullptr);
    };
    //------------------------------------------------------------------------//

    // there are num_workers+1 bins so there is always a bin that is open
    // execute num_workers+2 iterations so the thread checks its bin twice
    // while(!empty())
    {
        for(intmax_t i = 0; i < nitr; ++i, ++n)
        {
            if(get_task(n % (m_workers + 1)))
                return _task;
        }
    }

    // only reached if looped over all bins (and looked in own bin twice)
    // and found no work so return an empty task and the thread will be put to
    // sleep if there is still no work by the time it reaches its
    // condition variable
    return _task;
}

//======================================================================================//
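
// InsertTask() pushes a task into one of the bins: the target rotates round-robin via
// GetInsertBin() unless a bin was requested (or the caller is itself inside a task, in
// which case the task stays in the caller's own bin); in hold mode the task must land
// in the selected bin, otherwise any open bin will do.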
intmax_t
UserTaskQueue::InsertTask(task_pointer&& task, ThreadData* data, intmax_t subq)
{
    // increment number of tasks
    ++(*m_ntasks);

    bool spin = m_hold->load(std::memory_order_relaxed);
    intmax_t tbin = GetThreadBin();

    if(data && data->within_task)
    {
        subq = tbin;
        // spin = true;
    }

    // subq is -1 unless the caller specified a bin; when unspecified, GetInsertBin()
    // increments a counter and returns counter % (num_workers + 1) so that tasks are
    // distributed evenly among the bins
    intmax_t n = (subq < 0) ? GetInsertBin() : subq;

    //------------------------------------------------------------------------//
    auto insert_task = [&](intmax_t _n) {
        TaskSubQueue* task_subq = (*m_subqueues)[_n];
        // TaskSubQueue* next_subq = (*m_subqueues)[(_n + 1) % (m_workers + 1)];
        // if not threads bin and size difference, insert into smaller
        // if(n != tbin && next_subq->size() < task_subq->size())
        //     task_subq = next_subq;
        // try to acquire a claim for the bin
        // if acquired, no other threads will access bin until claim is released
        if(task_subq->AcquireClaim())
        {
            // push the task into the bin
            task_subq->PushTask(std::move(task));
            // release the claim on the bin
            task_subq->ReleaseClaim();
            // return success
            return true;
        }
        return false;
    };
    //------------------------------------------------------------------------//

    // in "hold/spin mode" the thread only inserts tasks into the specified bin;
    // otherwise it may move on to the next bin when a claim cannot be acquired
    if(spin)
    {
        n = n % (m_workers + 1);
        while(!insert_task(n))
            ;
        return n;
    }

    // there are num_workers+1 bins so there is always a bin that is open
    // execute num_workers+2 iterations so the thread checks its bin twice
    while(true)
    {
        auto _n = (n++) % (m_workers + 1);
        if(insert_task(_n))
            return _n;
    }
    return GetThreadBin();
}

//======================================================================================//
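
// ExecuteOnAllThreads() runs func once on every worker thread: it raises the shared
// hold flag so each wrapped task stays in its assigned bin, queues one task per bin
// (skipping the caller's own bin), wakes the pool, and joins before releasing the hold.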
void
UserTaskQueue::ExecuteOnAllThreads(ThreadPool* tp, function_type func)
{
    using task_group_type = TaskGroup<int, int>;
    using thread_execute_map_t = std::map<int64_t, bool>;

    if(!tp->is_alive())
    {
        func();
        return;
    }

    task_group_type tg{ [](int& ref, int i) { return (ref += i); }, tp };

    // wait for all threads to finish any work
    // NOTE: will cause deadlock if called from a task
    while(tp->get_active_threads_count() > 0)
        ThisThread::sleep_for(std::chrono::milliseconds(10));

    thread_execute_map_t thread_execute_map{};
    std::vector<std::shared_ptr<VTask>> _tasks{};
    _tasks.reserve(m_workers + 1);

    AcquireHold();
    for(int i = 0; i < (m_workers + 1); ++i)
    {
        if(i == GetThreadBin())
            continue;

        //--------------------------------------------------------------------//
        auto thread_specific_func = [&]() {
            ScopeDestructor _dtor = tg.get_scope_destructor();
            static Mutex _mtx;
            _mtx.lock();
            bool& _executed = thread_execute_map[GetThreadBin()];
            _mtx.unlock();
            if(!_executed)
            {
                func();
                _executed = true;
                return 1;
            }
            return 0;
        };
        //--------------------------------------------------------------------//

        InsertTask(tg.wrap(thread_specific_func), ThreadData::GetInstance(), i);
    }

    tp->notify_all();
    int nexecuted = tg.join();
    if(nexecuted != m_workers)
    {
        std::stringstream msg;
        msg << "Failure executing routine on all threads! Only " << nexecuted
            << " threads executed function out of " << m_workers << " workers";
        std::cerr << msg.str() << std::endl;
    }
    ReleaseHold();
}

//======================================================================================//
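
// ExecuteOnSpecificThreads() uses the same hold-based scheme as ExecuteOnAllThreads(),
// but the wrapped function only fires on threads whose id is in tid_set; the calling
// thread runs func directly when its own id is in the set.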
void
UserTaskQueue::ExecuteOnSpecificThreads(ThreadIdSet tid_set, ThreadPool* tp,
                                        function_type func)
{
    using task_group_type = TaskGroup<int, int>;
    using thread_execute_map_t = std::map<int64_t, bool>;

    task_group_type tg{ [](int& ref, int i) { return (ref += i); }, tp };

    // wait for all threads to finish any work
    // NOTE: will cause deadlock if called from a task
    while(tp->get_active_threads_count() > 0)
        ThisThread::sleep_for(std::chrono::milliseconds(10));

    if(!tp->is_alive())
    {
        func();
        return;
    }

    thread_execute_map_t thread_execute_map{};

    //========================================================================//
    // wrap the function so that it will only be executed if the thread
    // has an ID in the set
    auto thread_specific_func = [&]() {
        ScopeDestructor _dtor = tg.get_scope_destructor();
        static Mutex _mtx;
        _mtx.lock();
        bool& _executed = thread_execute_map[GetThreadBin()];
        _mtx.unlock();
        if(!_executed && tid_set.count(ThisThread::get_id()) > 0)
        {
            func();
            _executed = true;
            return 1;
        }
        return 0;
    };
    //========================================================================//

    if(tid_set.count(ThisThread::get_id()) > 0)
        func();

    AcquireHold();
    for(int i = 0; i < (m_workers + 1); ++i)
    {
        if(i == GetThreadBin())
            continue;

        InsertTask(tg.wrap(thread_specific_func), ThreadData::GetInstance(), i);
    }
    tp->notify_all();
    decltype(tid_set.size()) nexecuted = tg.join();
    if(nexecuted != tid_set.size())
    {
        std::stringstream msg;
        msg << "Failure executing routine on specific threads! Only " << nexecuted
            << " threads executed function out of " << tid_set.size() << " workers";
        std::cerr << msg.str() << std::endl;
    }
    ReleaseHold();
}

//======================================================================================//
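
// AcquireHold()/ReleaseHold() toggle the shared atomic hold flag with a compare-exchange
// loop; while the flag is set, GetTask() only drains the calling thread's own bin and
// InsertTask() pins each task to its requested bin.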
void
UserTaskQueue::AcquireHold()
{
    bool _hold;
    while(!(_hold = m_hold->load(std::memory_order_relaxed)))
    {
        m_hold->compare_exchange_strong(_hold, true, std::memory_order_release,
                                        std::memory_order_relaxed);
    }
}

//======================================================================================//

void
UserTaskQueue::ReleaseHold()
{
    bool _hold;
    while((_hold = m_hold->load(std::memory_order_relaxed)))
    {
        m_hold->compare_exchange_strong(_hold, false, std::memory_order_release,
                                        std::memory_order_relaxed);
    }
}

//======================================================================================//