40#include <system_error>
50, m_is_clone((parent) != nullptr)
51, m_thread_bin((parent) ? (
ThreadPool::get_this_thread_id() % (nworkers + 1)) : 0)
52, m_insert_bin((parent) ? (
ThreadPool::get_this_thread_id() % (nworkers + 1)) : 0)
53, m_hold((parent) ? parent->m_hold : new
std::atomic_bool(false))
54, m_ntasks((parent) ? parent->m_ntasks : new
std::atomic_uintmax_t(0))
55, m_mutex((parent) ? parent->m_mutex : new
Mutex{})
61 for(intmax_t i = 0; i < nworkers + 1; ++i)
66 if(GetEnv<int>(
"PTL_VERBOSE", 0) > 3)
71 << __FUNCTION__ <<
":" << __LINE__ <<
"] "
72 <<
"this = " <<
this <<
", "
73 <<
"clone = " << std::boolalpha << m_is_clone <<
", "
74 <<
"thread = " << m_thread_bin <<
", "
75 <<
"insert = " << m_insert_bin <<
", "
76 <<
"hold = " << m_hold->load() <<
" @ " << m_hold <<
", "
77 <<
"tasks = " << m_ntasks->load() <<
" @ " << m_ntasks <<
", "
78 <<
"subqueue = " << m_subqueues <<
", "
81 std::cout << ss.str() << std::endl;
92 for(
auto& itr : *m_subqueues)
111 throw std::runtime_error(
"nullptr to mutex");
125 delete m_subqueues->back();
126 m_subqueues->pop_back();
145 static thread_local intmax_t tl_bin =
155 return (++m_insert_bin % (
m_workers + 1));
168 auto get_task = [&]() {
172 _task = task_subq->
PopTask(
true);
179 return (_task !=
nullptr);
184 while(!task_subq->
empty())
203 intmax_t n = (subq < 0) ? tbin : subq;
207 if(m_hold->load(std::memory_order_relaxed))
214 auto get_task = [&](intmax_t _n) {
221 _task = task_subq->
PopTask(n == tbin);
228 return (_task !=
nullptr);
236 for(intmax_t i = 0; i < nitr; ++i, ++n)
258 bool spin = m_hold->load(std::memory_order_relaxed);
274 auto insert_task = [&](intmax_t _n) {
285 task_subq->
PushTask(std::move(task));
301 while(!insert_task(n))
323 using thread_execute_map_t = std::map<int64_t, bool>;
331 task_group_type tg{ [](
int& ref,
int i) {
return (ref += i); }, tp };
335 while(tp->get_active_threads_count() > 0)
336 ThisThread::sleep_for(std::chrono::milliseconds(10));
338 thread_execute_map_t thread_execute_map{};
339 std::vector<std::shared_ptr<VTask>> _tasks{};
349 auto thread_specific_func = [&]() {
369 int nexecuted = tg.join();
372 std::stringstream msg;
373 msg <<
"Failure executing routine on all threads! Only " << nexecuted
374 <<
" threads executed function out of " <<
m_workers <<
" workers";
375 std::cerr << msg.str() << std::endl;
387 using thread_execute_map_t = std::map<int64_t, bool>;
389 task_group_type tg{ [](
int& ref,
int i) {
return (ref += i); }, tp };
393 while(tp->get_active_threads_count() > 0)
394 ThisThread::sleep_for(std::chrono::milliseconds(10));
402 thread_execute_map_t thread_execute_map{};
407 auto thread_specific_func = [&]() {
413 if(!_executed && tid_set.count(ThisThread::get_id()) > 0)
423 if(tid_set.count(ThisThread::get_id()) > 0)
435 decltype(tid_set.size()) nexecuted = tg.join();
436 if(nexecuted != tid_set.size())
438 std::stringstream msg;
439 msg <<
"Failure executing routine on specific threads! Only " << nexecuted
440 <<
" threads executed function out of " << tid_set.size() <<
" workers";
441 std::cerr << msg.str() << std::endl;
449UserTaskQueue::AcquireHold()
452 while(!(_hold = m_hold->load(std::memory_order_relaxed)))
454 m_hold->compare_exchange_strong(_hold,
true, std::memory_order_release,
455 std::memory_order_relaxed);
462UserTaskQueue::ReleaseHold()
465 while((_hold = m_hold->load(std::memory_order_relaxed)))
467 m_hold->compare_exchange_strong(_hold,
false, std::memory_order_release,
468 std::memory_order_relaxed);
void PushTask(task_pointer &&) PTL_NO_SANITIZE_THREAD
task_pointer PopTask(bool front=true) PTL_NO_SANITIZE_THREAD
static ThreadData *& GetInstance()
static uintmax_t get_this_thread_id()
VUserTaskQueue * clone() override
size_type true_size() const override
void resize(intmax_t) override
task_pointer GetTask(intmax_t subq=-1, intmax_t nitr=-1) override
intmax_t GetThreadBin() const override
void ExecuteOnSpecificThreads(ThreadIdSet tid_set, ThreadPool *tp, function_type f) override
intmax_t GetInsertBin() const
std::vector< TaskSubQueue * > TaskSubQueueContainer
UserTaskQueue(intmax_t nworkers=-1, UserTaskQueue *=nullptr)
void ExecuteOnAllThreads(ThreadPool *tp, function_type f) override
intmax_t InsertTask(task_pointer &&, ThreadData *=nullptr, intmax_t subq=-1) override PTL_NO_SANITIZE_THREAD
bool true_empty() const override
std::shared_ptr< VTask > task_pointer
~UserTaskQueue() override
task_pointer GetThreadBinTask()
std::set< ThreadId > ThreadIdSet
std::function< void()> function_type
MutexTp & TypeMutex(const unsigned int &_n=0)
std::recursive_mutex RecursiveMutex