22 #ifndef THREADPOOL_POOL_CORE_HPP_INCLUDED 23 #define THREADPOOL_POOL_CORE_HPP_INCLUDED 31 #include "../task_adaptors.hpp" 33 #include <boost/thread.hpp> 34 #include <boost/thread/exceptions.hpp> 35 #include <boost/thread/mutex.hpp> 36 #include <boost/thread/condition.hpp> 37 #include <boost/smart_ptr.hpp> 38 #include <boost/bind.hpp> 39 #include <boost/static_assert.hpp> 40 #include <boost/type_traits.hpp> 46 namespace boost {
namespace threadpool {
namespace detail 70 template <
typename>
class SchedulingPolicy,
71 template <
typename>
class SizePolicy,
72 template <
typename>
class SizePolicyController,
73 template <
typename>
class ShutdownPolicy
76 :
public enable_shared_from_this< pool_core<Task, SchedulingPolicy, SizePolicy, SizePolicyController, ShutdownPolicy > >
108 #if defined(__SUNPRO_CC) && (__SUNPRO_CC <= 0x580) // Tested with CC: Sun C++ 5.8 Patch 121018-08 2006/12/06 109 friend class SizePolicy;
110 friend class ShutdownPolicy;
112 friend class SizePolicy<pool_type>;
113 friend class ShutdownPolicy<pool_type>;
139 , m_target_worker_count(0)
140 , m_active_worker_count(0)
141 , m_terminate_all_workers(false)
143 pool_type
volatile & self_ref = *
this;
175 ShutdownPolicy<pool_type>::shutdown(*
this);
241 void wait(
size_t const task_threshold = 0)
const volatile 243 const pool_type*
self =
const_cast<const pool_type*
>(
this);
244 recursive_mutex::scoped_lock lock(self->m_monitor);
246 if(0 == task_threshold)
248 while(0 != self->m_active_worker_count || !self->m_scheduler.empty())
250 self->m_worker_idle_or_terminated_event.wait(lock);
255 while(task_threshold < self->m_active_worker_count + self->m_scheduler.size())
257 self->m_worker_idle_or_terminated_event.wait(lock);
269 bool wait(xtime
const & timestamp,
size_t const task_threshold = 0)
const volatile 271 const pool_type*
self =
const_cast<const pool_type*
>(
this);
272 recursive_mutex::scoped_lock lock(self->m_monitor);
274 if(0 == task_threshold)
276 while(0 != self->m_active_worker_count || !self->m_scheduler.empty())
278 if(!self->m_worker_idle_or_terminated_event.timed_wait(lock, timestamp))
return false;
283 while(task_threshold < self->m_active_worker_count + self->m_scheduler.size())
285 if(!self->m_worker_idle_or_terminated_event.timed_wait(lock, timestamp))
return false;
298 pool_type*
self =
const_cast<pool_type*
>(
this);
299 recursive_mutex::scoped_lock lock(self->m_monitor);
301 self->m_terminate_all_workers =
true;
303 m_target_worker_count = 0;
304 self->m_task_or_terminate_workers_event.notify_all();
308 while(m_active_worker_count > 0)
310 self->m_worker_idle_or_terminated_event.wait(lock);
313 for(
typename std::vector<shared_ptr<worker_type> >::iterator it = self->m_terminated_workers.
begin();
314 it !=
self->m_terminated_workers.end();
319 self->m_terminated_workers.clear();
329 bool resize(
size_t const worker_count)
volatile 333 if(!m_terminate_all_workers)
335 m_target_worker_count = worker_count;
343 if(m_worker_count <= m_target_worker_count)
345 while(m_worker_count < m_target_worker_count)
351 m_active_worker_count++;
353 catch(thread_resource_error)
374 m_active_worker_count--;
377 if(m_terminate_all_workers)
383 lockedThis->
m_size_policy->worker_died_unexpectedly(m_worker_count);
391 m_active_worker_count--;
394 if(m_terminate_all_workers)
403 function0<void> task;
406 pool_type* lockedThis =
const_cast<pool_type*
>(
this);
407 recursive_mutex::scoped_lock lock(lockedThis->
m_monitor);
410 if(m_worker_count > m_target_worker_count)
420 if(m_worker_count > m_target_worker_count)
426 m_active_worker_count--;
429 m_active_worker_count++;
453 #endif // THREADPOOL_POOL_CORE_HPP_INCLUDED
volatile size_t m_worker_count
ShutdownPolicy< pool_type > shutdown_policy_type
Indicates the shutdown policy's type.
friend class ShutdownPolicy< pool_type >
The namespace threadpool contains a thread pool and related utility classes.
Smart pointer with a scoped locking mechanism.
condition m_task_or_terminate_workers_event
volatile size_t m_target_worker_count
Task task_type
Indicates the task's type.
friend class SizePolicy< pool_type >
size_t active() const volatile
Returns the number of tasks which are currently executed.
size_t pending() const volatile
Returns the number of tasks which are ready for execution.
bool execute_task() volatile
SizePolicy< pool_type > size_policy_type
Indicates the sizer's type.
recursive_mutex m_monitor
condition m_worker_idle_or_terminated_event
pool_core< Task, SchedulingPolicy, SizePolicy, SizePolicyController, ShutdownPolicy > pool_type
Indicates the thread pool's type.
bool schedule(task_type const &task) volatile
Schedules a task for asynchronous execution.
size_controller_type size_controller()
Gets the size controller which manages the number of threads in the pool.
bool wait(xtime const ×tamp, size_t const task_threshold=0) const volatile
The current thread of execution is blocked until the timestamp is met or the sum of all active and pe...
TupleObj.h GaudiAlg/TupleObj.h namespace with few technical implementations.
static void create_and_attach(shared_ptr< pool_type > const &pool)
Constructs a new worker thread and attaches it to the pool.
scoped_ptr< size_policy_type > m_size_policy
scheduler_type m_scheduler
bool empty() const volatile
Indicates that there are no tasks pending.
void worker_died_unexpectedly(shared_ptr< worker_type > worker) volatile
SizePolicyController< pool_type > size_controller_type
BOOST_STATIC_ASSERT(function_traits< task_type()>::arity==0)
void terminate_all_workers(bool const wait) volatile
std::vector< shared_ptr< worker_type > > m_terminated_workers
SchedulingPolicy< task_type > scheduler_type
Indicates the scheduler's type.
void wait(size_t const task_threshold=0) const volatile
The current thread of execution is blocked until the sum of all active and pending tasks is equal or ...
worker_thread< pool_type > worker_type
void worker_destructed(shared_ptr< worker_type > worker) volatile
void clear() volatile
Removes all pending tasks from the pool's scheduler.
size_t size() const volatile
Gets the number of threads in the pool.
volatile size_t m_active_worker_count
bool m_terminate_all_workers
The locking_ptr is smart pointer with a scoped locking mechanism.
bool resize(size_t const worker_count) volatile
Changes the number of worker threads in the pool.