The Gaudi Framework  v29r0 (ff2e7097)
pool_core.hpp
Go to the documentation of this file.
1 
22 #ifndef THREADPOOL_POOL_CORE_HPP_INCLUDED
23 #define THREADPOOL_POOL_CORE_HPP_INCLUDED
24 
25 
26 
27 
28 #include "locking_ptr.hpp"
29 #include "worker_thread.hpp"
30 
31 #include "../task_adaptors.hpp"
32 
33 #include <boost/thread.hpp>
34 #include <boost/thread/exceptions.hpp>
35 #include <boost/thread/mutex.hpp>
36 #include <boost/thread/condition.hpp>
37 #include <boost/smart_ptr.hpp>
38 #include <boost/bind.hpp>
39 #include <boost/static_assert.hpp>
40 #include <boost/type_traits.hpp>
41 
42 #include <vector>
43 
44 
46 namespace boost { namespace threadpool { namespace detail
47 {
48 
67  template <
68  typename Task,
69 
70  template <typename> class SchedulingPolicy,
71  template <typename> class SizePolicy,
72  template <typename> class SizePolicyController,
73  template <typename> class ShutdownPolicy
74  >
75  class pool_core
76  : public enable_shared_from_this< pool_core<Task, SchedulingPolicy, SizePolicy, SizePolicyController, ShutdownPolicy > >
77  , private noncopyable
78  {
79 
80  public: // Type definitions
81  typedef Task task_type;
82  typedef SchedulingPolicy<task_type> scheduler_type;
83  typedef pool_core<Task,
84  SchedulingPolicy,
85  SizePolicy,
86  SizePolicyController,
87  ShutdownPolicy > pool_type;
89  //typedef typename size_policy_type::size_controller size_controller_type;
90 
91  typedef SizePolicyController<pool_type> size_controller_type;
92 
93 // typedef SizePolicy<pool_type>::size_controller size_controller_type;
95 
97 
98  // The task is required to be a nullary function.
99  BOOST_STATIC_ASSERT(function_traits<task_type()>::arity == 0);
100 
101  // The task function's result type is required to be void.
102  BOOST_STATIC_ASSERT(is_void<typename result_of<task_type()>::type >::value);
103 
104 
105  private: // Friends
106  friend class worker_thread<pool_type>;
107 
108 #if defined(__SUNPRO_CC) && (__SUNPRO_CC <= 0x580) // Tested with CC: Sun C++ 5.8 Patch 121018-08 2006/12/06
109  friend class SizePolicy;
110  friend class ShutdownPolicy;
111 #else
112  friend class SizePolicy<pool_type>;
113  friend class ShutdownPolicy<pool_type>;
114 #endif
115 
116  private: // The following members may be accessed by _multiple_ threads at the same time:
117  volatile size_t m_worker_count;
118  volatile size_t m_target_worker_count;
119  volatile size_t m_active_worker_count;
120 
121 
122 
123  private: // The following members are accessed only by _one_ thread at the same time:
124  scheduler_type m_scheduler;
125  scoped_ptr<size_policy_type> m_size_policy; // is never null
126 
127  bool m_terminate_all_workers; // Indicates if termination of all workers was triggered.
128  std::vector<shared_ptr<worker_type> > m_terminated_workers; // List of workers which are terminated but not fully destructed.
129 
130  private: // The following members are implemented thread-safe:
131  mutable recursive_mutex m_monitor;
132  mutable condition m_worker_idle_or_terminated_event; // A worker is idle or was terminated.
133  mutable condition m_task_or_terminate_workers_event; // Task is available OR total worker count should be reduced.
134 
135  public:
138  : m_worker_count(0)
139  , m_target_worker_count(0)
140  , m_active_worker_count(0)
141  , m_terminate_all_workers(false)
142  {
143  pool_type volatile & self_ref = *this;
144  m_size_policy.reset(new size_policy_type(self_ref));
145 
146  m_scheduler.clear();
147  }
148 
149 
152  {
153  }
154 
159  size_controller_type size_controller()
160  {
161  return size_controller_type(*m_size_policy, this->shared_from_this());
162  }
163 
167  size_t size() const volatile
168  {
169  return m_worker_count;
170  }
171 
172 // TODO is only called once
173  void shutdown()
174  {
175  ShutdownPolicy<pool_type>::shutdown(*this);
176  }
177 
182  bool schedule(task_type const & task) volatile
183  {
184  locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);
185 
186  if(lockedThis->m_scheduler.push(task))
187  {
188  lockedThis->m_task_or_terminate_workers_event.notify_one();
189  return true;
190  }
191  else
192  {
193  return false;
194  }
195  }
196 
197 
201  size_t active() const volatile
202  {
203  return m_active_worker_count;
204  }
205 
206 
210  size_t pending() const volatile
211  {
212  locking_ptr<const pool_type, recursive_mutex> lockedThis(*this, m_monitor);
213  return lockedThis->m_scheduler.size();
214  }
215 
216 
219  void clear() volatile
220  {
221  locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);
222  lockedThis->m_scheduler.clear();
223  }
224 
225 
230  bool empty() const volatile
231  {
232  locking_ptr<const pool_type, recursive_mutex> lockedThis(*this, m_monitor);
233  return lockedThis->m_scheduler.empty();
234  }
235 
236 
241  void wait(size_t const task_threshold = 0) const volatile
242  {
243  const pool_type* self = const_cast<const pool_type*>(this);
244  recursive_mutex::scoped_lock lock(self->m_monitor);
245 
246  if(0 == task_threshold)
247  {
248  while(0 != self->m_active_worker_count || !self->m_scheduler.empty())
249  {
250  self->m_worker_idle_or_terminated_event.wait(lock);
251  }
252  }
253  else
254  {
255  while(task_threshold < self->m_active_worker_count + self->m_scheduler.size())
256  {
257  self->m_worker_idle_or_terminated_event.wait(lock);
258  }
259  }
260  }
261 
269  bool wait(xtime const & timestamp, size_t const task_threshold = 0) const volatile
270  {
271  const pool_type* self = const_cast<const pool_type*>(this);
272  recursive_mutex::scoped_lock lock(self->m_monitor);
273 
274  if(0 == task_threshold)
275  {
276  while(0 != self->m_active_worker_count || !self->m_scheduler.empty())
277  {
278  if(!self->m_worker_idle_or_terminated_event.timed_wait(lock, timestamp)) return false;
279  }
280  }
281  else
282  {
283  while(task_threshold < self->m_active_worker_count + self->m_scheduler.size())
284  {
285  if(!self->m_worker_idle_or_terminated_event.timed_wait(lock, timestamp)) return false;
286  }
287  }
288 
289  return true;
290  }
291 
292 
293  private:
294 
295 
296  void terminate_all_workers(bool const wait) volatile
297  {
298  pool_type* self = const_cast<pool_type*>(this);
299  recursive_mutex::scoped_lock lock(self->m_monitor);
300 
301  self->m_terminate_all_workers = true;
302 
303  m_target_worker_count = 0;
304  self->m_task_or_terminate_workers_event.notify_all();
305 
306  if(wait)
307  {
308  while(m_active_worker_count > 0)
309  {
310  self->m_worker_idle_or_terminated_event.wait(lock);
311  }
312 
313  for(typename std::vector<shared_ptr<worker_type> >::iterator it = self->m_terminated_workers.begin();
314  it != self->m_terminated_workers.end();
315  ++it)
316  {
317  (*it)->join();
318  }
319  self->m_terminated_workers.clear();
320  }
321  }
322 
323 
329  bool resize(size_t const worker_count) volatile
330  {
331  locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);
332 
333  if(!m_terminate_all_workers)
334  {
335  m_target_worker_count = worker_count;
336  }
337  else
338  {
339  return false;
340  }
341 
342 
343  if(m_worker_count <= m_target_worker_count)
344  { // increase worker count
345  while(m_worker_count < m_target_worker_count)
346  {
347  try
348  {
349  worker_thread<pool_type>::create_and_attach(lockedThis->shared_from_this());
350  m_worker_count++;
351  m_active_worker_count++;
352  }
353  catch(thread_resource_error)
354  {
355  return false;
356  }
357  }
358  }
359  else
360  { // decrease worker count
361  lockedThis->m_task_or_terminate_workers_event.notify_all(); // TODO: Optimize number of notified workers
362  }
363 
364  return true;
365  }
366 
367 
368  // worker died with unhandled exception
369  void worker_died_unexpectedly(shared_ptr<worker_type> worker) volatile
370  {
371  locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);
372 
373  m_worker_count--;
374  m_active_worker_count--;
375  lockedThis->m_worker_idle_or_terminated_event.notify_all();
376 
377  if(m_terminate_all_workers)
378  {
379  lockedThis->m_terminated_workers.push_back(worker);
380  }
381  else
382  {
383  lockedThis->m_size_policy->worker_died_unexpectedly(m_worker_count);
384  }
385  }
386 
387  void worker_destructed(shared_ptr<worker_type> worker) volatile
388  {
389  locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);
390  m_worker_count--;
391  m_active_worker_count--;
392  lockedThis->m_worker_idle_or_terminated_event.notify_all();
393 
394  if(m_terminate_all_workers)
395  {
396  lockedThis->m_terminated_workers.push_back(worker);
397  }
398  }
399 
400 
401  bool execute_task() volatile
402  {
403  function0<void> task;
404 
405  { // fetch task
406  pool_type* lockedThis = const_cast<pool_type*>(this);
407  recursive_mutex::scoped_lock lock(lockedThis->m_monitor);
408 
409  // decrease number of threads if necessary
410  if(m_worker_count > m_target_worker_count)
411  {
412  return false; // terminate worker
413  }
414 
415 
416  // wait for tasks
417  while(lockedThis->m_scheduler.empty())
418  {
419  // decrease number of workers if necessary
420  if(m_worker_count > m_target_worker_count)
421  {
422  return false; // terminate worker
423  }
424  else
425  {
426  m_active_worker_count--;
427  lockedThis->m_worker_idle_or_terminated_event.notify_all();
428  lockedThis->m_task_or_terminate_workers_event.wait(lock);
429  m_active_worker_count++;
430  }
431  }
432 
433  task = lockedThis->m_scheduler.top();
434  lockedThis->m_scheduler.pop();
435  }
436 
437  // call task function
438  if(task)
439  {
440  task();
441  }
442 
443  //guard->disable();
444  return true;
445  }
446  };
447 
448 
449 
450 
451 } } } // namespace boost::threadpool::detail
452 
453 #endif // THREADPOOL_POOL_CORE_HPP_INCLUDED
ShutdownPolicy< pool_type > shutdown_policy_type
Indicates the shutdown policy's type.
Definition: pool_core.hpp:94
The namespace threadpool contains a thread pool and related utility classes.
Definition: iter_pos.hpp:13
Smart pointer with a scoped locking mechanism.
Definition: locking_ptr.hpp:37
Task task_type
Indicates the task's type.
Definition: pool_core.hpp:81
size_t active() const volatile
Returns the number of tasks which are currently executed.
Definition: pool_core.hpp:201
Thread pool worker.
size_t pending() const volatile
Returns the number of tasks which are ready for execution.
Definition: pool_core.hpp:210
SizePolicy< pool_type > size_policy_type
Indicates the sizer's type.
Definition: pool_core.hpp:88
pool_core< Task, SchedulingPolicy, SizePolicy, SizePolicyController, ShutdownPolicy > pool_type
Indicates the thread pool's type.
Definition: pool_core.hpp:87
bool schedule(task_type const &task) volatile
Schedules a task for asynchronous execution.
Definition: pool_core.hpp:182
size_controller_type size_controller()
Gets the size controller which manages the number of threads in the pool.
Definition: pool_core.hpp:159
bool wait(xtime const &timestamp, size_t const task_threshold=0) const volatile
The current thread of execution is blocked until the timestamp is met or the sum of all active and pending tasks is equal or less than a given threshold.
Definition: pool_core.hpp:269
TupleObj.h GaudiAlg/TupleObj.h namespace with few technical implementations.
static void create_and_attach(shared_ptr< pool_type > const &pool)
Constructs a new worker thread and attaches it to the pool.
scoped_ptr< size_policy_type > m_size_policy
Definition: pool_core.hpp:125
bool empty() const volatile
Indicates that there are no tasks pending.
Definition: pool_core.hpp:230
void worker_died_unexpectedly(shared_ptr< worker_type > worker) volatile
Definition: pool_core.hpp:369
SizePolicyController< pool_type > size_controller_type
Definition: pool_core.hpp:91
BOOST_STATIC_ASSERT(function_traits< task_type()>::arity==0)
void terminate_all_workers(bool const wait) volatile
Definition: pool_core.hpp:296
std::vector< shared_ptr< worker_type > > m_terminated_workers
Definition: pool_core.hpp:128
SchedulingPolicy< task_type > scheduler_type
Indicates the scheduler's type.
Definition: pool_core.hpp:82
STL class.
T begin(T...args)
void wait(size_t const task_threshold=0) const volatile
The current thread of execution is blocked until the sum of all active and pending tasks is equal or less than a given threshold.
Definition: pool_core.hpp:241
worker_thread< pool_type > worker_type
Definition: pool_core.hpp:96
void worker_destructed(shared_ptr< worker_type > worker) volatile
Definition: pool_core.hpp:387
void clear() volatile
Removes all pending tasks from the pool's scheduler.
Definition: pool_core.hpp:219
size_t size() const volatile
Gets the number of threads in the pool.
Definition: pool_core.hpp:167
The locking_ptr is smart pointer with a scoped locking mechanism.
bool resize(size_t const worker_count) volatile
Changes the number of worker threads in the pool.
Definition: pool_core.hpp:329