60ThreadPool::f_thread_ids()
69ThreadPool::f_use_tbb()
71 static bool _v = GetEnv<bool>(
"PTL_USE_TBB",
false);
78ThreadPool::f_use_cpu_affinity()
80 static bool _v = GetEnv<bool>(
"PTL_CPU_AFFINITY",
false);
87ThreadPool::f_thread_priority()
89 static int _v = GetEnv<int>(
"PTL_THREAD_PRIORITY", 0);
96ThreadPool::f_verbose()
98 static int _v = GetEnv<int>(
"PTL_VERBOSE", 0);
105ThreadPool::f_default_pool_size()
108 GetEnv<size_type>(
"PTL_NUM_THREADS", Thread::hardware_concurrency());
118 if(tp->get_verbose() > 0)
121 std::cerr <<
"[PTL::ThreadPool] Starting thread " << _idx <<
"..." << std::endl;
124 auto _thr_data = std::make_shared<ThreadData>(tp);
126 AutoLock lock(TypeMutex<ThreadPool>(), std::defer_lock);
127 if(!lock.owns_lock())
130 _idx = f_thread_ids().size();
131 f_thread_ids()[std::this_thread::get_id()] = _idx;
133 _data->emplace_back(_thr_data);
135 thread_data() = _thr_data.get();
137 tp->execute_thread(thread_data()->current_queue);
140 if(tp->get_verbose() > 0)
143 std::cerr <<
"[PTL::ThreadPool] Thread " << _idx <<
" terminating..."
161#if defined(PTL_USE_TBB)
162 f_use_tbb() = enable;
173#if defined(PTL_USE_TBB)
174 f_use_cpu_affinity() = enable;
185 return f_thread_ids();
195 AutoLock lock(TypeMutex<ThreadPool>(), std::defer_lock);
196 if(!lock.owns_lock())
198 auto itr = f_thread_ids().find(_tid);
199 if(itr == f_thread_ids().end())
201 _idx = f_thread_ids().size();
202 f_thread_ids()[_tid] = _idx;
225 AutoLock lock(TypeMutex<ThreadPool>(), std::defer_lock);
226 if(!lock.owns_lock())
228 if(f_thread_ids().find(_tid) == f_thread_ids().end())
230 auto _idx = f_thread_ids().size();
231 f_thread_ids()[_tid] = _idx;
234 return f_thread_ids().at(_tid);
245, use_affinity{ _use_affinity }
249, task_queue{ _task_queue }
250, set_affinity{
std::move(_affinity_func) }
251, initializer{
std::move(_init_func) }
252, finalizer{
std::move(_fini_func) }
258: m_use_affinity{ _cfg.use_affinity }
259, m_tbb_tp{ _cfg.use_tbb }
260, m_verbose{ _cfg.verbose }
261, m_priority{ _cfg.priority }
262, m_pool_state{
std::make_shared<
std::atomic_short>(thread_pool::
state::NONINIT) }
263, m_task_queue{ _cfg.task_queue }
264, m_init_func{ _cfg.initializer }
265, m_fini_func{ _cfg.finalizer }
269 if(master_id != 0 && m_verbose > 1)
272 std::cerr <<
"[PTL::ThreadPool] ThreadPool created on worker thread" << std::endl;
285:
ThreadPool{
Config{ true, f_use_tbb(), _use_affinity, f_verbose(), f_thread_priority(),
286 _pool_size, _task_queue,
std::move(_affinity_func),
287 std::move(_init_func),
std::move(_fini_func) } }
296 std::move(_affinity_func),
297 std::move(_init_func),
298 std::move(_fini_func) }
305 if(m_alive_flag->load())
307 std::cerr <<
"Warning! ThreadPool was not properly destroyed! Call "
308 "destroy_threadpool() before deleting the ThreadPool object to "
309 "eliminate this message."
311 m_pool_state->store(thread_pool::state::STOPPED);
313 m_task_cond->notify_all();
314 m_task_lock->unlock();
315 for(
auto& itr : m_threads)
321 if(m_delete_task_queue)
324 delete m_tbb_task_arena;
325 delete m_tbb_task_group;
333 return !(m_pool_state->load() == thread_pool::state::NONINIT);
341 ++(*m_thread_active);
349 --(*m_thread_active);
360 intmax_t _pin = m_affinity_func(i);
364 std::cerr <<
"[PTL::ThreadPool] Setting pin affinity for thread "
365 <<
get_thread_id(_thread.get_id()) <<
" to " << _pin << std::endl;
368 }
catch(std::runtime_error& e)
370 std::cerr <<
"[PTL::ThreadPool] Error setting pin affinity: " << e.what()
386 std::cerr <<
"[PTL::ThreadPool] Setting thread "
387 <<
get_thread_id(_thread.get_id()) <<
" priority to " << _prio
391 }
catch(std::runtime_error& e)
394 std::cerr <<
"[PTL::ThreadPool] Error setting thread priority: " << e.what()
406 if(proposed_size < 1)
411 if(!m_alive_flag->load())
412 m_pool_state->store(thread_pool::state::STARTED);
414#if defined(PTL_USE_TBB)
420 m_pool_size = proposed_size;
423 if(m_pool_size != proposed_size)
425 delete _global_control;
426 _global_control =
nullptr;
436 std::cerr <<
"[PTL::ThreadPool] ThreadPool [TBB] initialized with "
437 << m_pool_size <<
" threads." << std::endl;
442 if(!m_tbb_task_group)
452 m_alive_flag->store(
true);
456 if(m_pool_state->load() == thread_pool::state::STARTED)
458 if(m_pool_size > proposed_size)
465 std::cerr <<
"[PTL::ThreadPool] ThreadPool initialized with "
466 << m_pool_size <<
" threads." << std::endl;
470 m_delete_task_queue =
true;
475 m_task_queue->
resize(m_pool_size);
479 else if(m_pool_size == proposed_size)
484 std::cerr <<
"ThreadPool initialized with " << m_pool_size <<
" threads."
489 m_delete_task_queue =
true;
500 m_is_joined.reserve(proposed_size);
505 m_delete_task_queue =
true;
510 for(
size_type i = m_pool_size; i < proposed_size; ++i)
521 m_main_threads.push_back(thr.get_id());
523 m_is_joined.push_back(
false);
529 m_threads.emplace_back(std::move(thr));
530 }
catch(std::runtime_error& e)
533 std::cerr <<
"[PTL::ThreadPool] " << e.what()
536 }
catch(std::bad_alloc& e)
539 std::cerr <<
"[PTL::ThreadPool] " << e.what() << std::endl;
549 if(m_is_joined.size() != m_main_threads.size())
551 std::stringstream ss;
552 ss <<
"ThreadPool::initialize_threadpool - boolean is_joined vector "
553 <<
"is a different size than threads vector: " << m_is_joined.size() <<
" vs. "
554 << m_main_threads.size() <<
" (tid: " << std::this_thread::get_id() <<
")";
556 throw std::runtime_error(ss.str());
562 std::cerr <<
"[PTL::ThreadPool] ThreadPool initialized with " << m_pool_size
563 <<
" threads." << std::endl;
566 return m_main_threads.size();
579 m_pool_state->store(thread_pool::state::STOPPED);
583#if defined(PTL_USE_TBB)
587 auto _func = [&]() { m_tbb_task_group->
wait(); };
589 m_tbb_task_arena->
execute(_func);
592 delete m_tbb_task_group;
593 m_tbb_task_group =
nullptr;
597 delete m_tbb_task_arena;
598 m_tbb_task_arena =
nullptr;
603 delete _global_control;
604 _global_control =
nullptr;
609 std::cerr <<
"[PTL::ThreadPool] ThreadPool [TBB] destroyed" << std::endl;
614 if(!m_alive_flag->load())
620 m_task_cond->notify_all();
621 m_task_lock->unlock();
624 if(m_is_joined.size() != m_main_threads.size())
626 std::stringstream ss;
627 ss <<
" ThreadPool::destroy_thread_pool - boolean is_joined vector "
628 <<
"is a different size than threads vector: " << m_is_joined.size() <<
" vs. "
629 << m_main_threads.size() <<
" (tid: " << std::this_thread::get_id() <<
")";
631 throw std::runtime_error(ss.str());
634 for(
size_type i = 0; i < m_is_joined.size(); i++)
638 if(i < m_threads.size())
639 m_threads.at(i).join();
643 if(m_is_joined.at(i))
648 if(std::this_thread::get_id() == m_main_threads[i])
653 auto _tid = m_main_threads[i];
657 if(f_thread_ids().find(_tid) != f_thread_ids().end())
658 f_thread_ids().erase(f_thread_ids().find(_tid));
662 m_is_joined.at(i) =
true;
665 m_thread_data.clear();
667 m_main_threads.clear();
670 m_alive_flag->store(
false);
672 auto start = std::chrono::steady_clock::now();
673 auto elapsed = std::chrono::duration<double>{};
675 while(m_thread_active->load() > 0 && elapsed.count() < 30)
677 std::this_thread::sleep_for(std::chrono::milliseconds(50));
678 elapsed = std::chrono::steady_clock::now() - start;
681 auto _active = m_thread_active->load();
688 std::cerr <<
"[PTL::ThreadPool] ThreadPool destroyed" << std::endl;
693 std::cerr <<
"[PTL::ThreadPool] ThreadPool destroyed but " << _active
694 <<
" threads might still be active (and cause a termination error)"
699 if(m_delete_task_queue)
702 m_task_queue =
nullptr;
713 if(!m_alive_flag->load() || m_pool_size == 0)
716 m_pool_state->store(thread_pool::state::PARTIAL);
721 m_is_stopped.push_back(
true);
722 m_task_cond->notify_one();
723 m_task_lock->unlock();
726 while(!m_is_stopped.empty() && m_stop_threads.empty())
732 while(!m_stop_threads.empty())
734 auto tid = m_stop_threads.front();
736 m_stop_threads.pop_front();
738 for(
auto itr = m_main_threads.begin(); itr != m_main_threads.end(); ++itr)
742 m_main_threads.erase(itr);
747 m_is_joined.pop_back();
750 m_pool_state->store(thread_pool::state::STARTED);
752 m_pool_size = m_main_threads.size();
753 return m_main_threads.size();
762 _queue =
new UserTaskQueue{
static_cast<intmax_t
>(m_pool_size) };
780 ThreadId tid = ThisThread::get_id();
785 auto start = std::chrono::steady_clock::now();
786 auto elapsed = std::chrono::duration<double>{};
788 while(!_task_queue && elapsed.count() < 60)
790 elapsed = std::chrono::steady_clock::now() - start;
798 throw std::runtime_error(
"No task queue was found after 60 seconds!");
808 auto _task = _task_queue->
GetTask();
819 static thread_local auto p_task_lock = m_task_lock;
823 AutoLock _task_lock(*p_task_lock, std::defer_lock);
826 auto leave_pool = [&]() {
827 auto _state = [&]() {
return static_cast<int>(m_pool_state->load()); };
828 auto _pool_state = _state();
832 if(_pool_state == thread_pool::state::STOPPED)
834 if(_task_lock.owns_lock())
839 else if(_pool_state == thread_pool::state::PARTIAL)
841 if(!_task_lock.owns_lock())
843 if(!m_is_stopped.empty() && m_is_stopped.back())
845 m_stop_threads.push_back(tid);
846 m_is_stopped.pop_back();
847 if(_task_lock.owns_lock())
852 if(_task_lock.owns_lock())
865 while(_task_queue->
empty())
867 auto _state = [&]() {
return static_cast<int>(m_pool_state->load()); };
868 auto _size = [&]() {
return _task_queue->
true_size(); };
869 auto _empty = [&]() {
return _task_queue->
empty(); };
870 auto _wake = [&]() {
return (!_empty() || _size() > 0 || _state() > 0); };
877 if(m_thread_awake->load() > 0)
881 if(!_task_lock.owns_lock())
887 m_task_cond->wait(_task_lock, _wake);
889 if(_state() == thread_pool::state::STOPPED)
893 if(_task_lock.owns_lock())
897 if(m_thread_awake->load() < m_pool_size)
905 if(_task_lock.owns_lock())
919 while(!_task_queue->
empty())
921 auto _task = _task_queue->
GetTask();
static ThreadData *& GetInstance()
VUserTaskQueue * current_queue
std::vector< std::shared_ptr< ThreadData > > thread_data_t
std::function< intmax_t(intmax_t)> affinity_func_t
ThreadPool(const Config &)
static void start_thread(ThreadPool *, thread_data_t *, intmax_t=-1)
static uintmax_t get_this_thread_id()
const pool_state_type & state() const
void set_priority(int _prio, Thread &) const
std::map< ThreadId, uintmax_t > thread_id_map_t
task_queue_t *& get_valid_queue(task_queue_t *&) const
static uintmax_t get_thread_id(ThreadId)
void set_affinity(affinity_func_t f)
static uintmax_t add_thread_id(ThreadId=ThisThread::get_id())
static tbb_global_control_t *& tbb_global_control()
void execute_on_all_threads(FuncT &&_func)
std::function< void()> finalize_func_t
static const thread_id_map_t & get_thread_ids()
static void set_default_use_cpu_affinity(bool _v)
set the default use of cpu affinity
size_type destroy_threadpool()
size_type initialize_threadpool(size_type)
void execute_thread(VUserTaskQueue *)
std::function< void()> initialize_func_t
bool is_initialized() const
static void set_use_tbb(bool _v)
virtual task_pointer GetTask(intmax_t subq=-1, intmax_t nitr=-1)=0
virtual size_type true_size() const
virtual bool empty() const =0
virtual void resize(intmax_t)=0
@ max_allowed_parallelism
auto execute(FuncT &&_func) -> decltype(_func())
bool SetPinAffinity(int idx)
bool SetThreadPriority(int _v)
void SetThreadId(int aNewValue)
std::thread::native_handle_type NativeThread
void ConsumeParameters(Args &&...)
MutexTp & TypeMutex(const unsigned int &_n=0)
tbb::global_control tbb_global_control_t
Config(bool, bool, bool, int, int, size_type, VUserTaskQueue *, affinity_func_t, initialize_func_t, finalize_func_t)