The Gaudi Framework  v33r0 (d5ea422b)
IOBoundAlgSchedulerSvc.cpp
Go to the documentation of this file.
1 /***********************************************************************************\
2 * (c) Copyright 1998-2019 CERN for the benefit of the LHCb and ATLAS collaborations *
3 * *
4 * This software is distributed under the terms of the Apache version 2 licence, *
5 * copied verbatim in the file "LICENSE". *
6 * *
7 * In applying this licence, CERN does not waive the privileges and immunities *
8 * granted to it by virtue of its status as an Intergovernmental Organization *
9 * or submit itself to any jurisdiction. *
10 \***********************************************************************************/
11 #include <algorithm>
12 #include <map>
13 #include <queue>
14 #include <sstream>
15 #include <thread>
16 #include <unordered_set>
17 
18 // Framework includes
20 #include "GaudiKernel/IAlgorithm.h"
22 #include <Gaudi/Algorithm.h> // will be IAlgorithm if context getter promoted to interface
23 
24 // Local
25 #include "IOBoundAlgSchedulerSvc.h"
26 
27 // Instantiation of a static factory class used by clients to create instances of this service
29 
30 
35 
36  // Initialise mother class (read properties, ...)
38  if ( !sc.isSuccess() ) warning() << "Base class could not be initialized" << endmsg;
39 
40  // Activate the scheduler in another thread.
41  info() << "Activating scheduler in a separate thread" << endmsg;
43 
44  return sc;
45 }
46 //---------------------------------------------------------------------------
47 
52 
54  if ( !sc.isSuccess() ) warning() << "Base class could not be finalized" << endmsg;
55 
56  sc = deactivate();
57  if ( !sc.isSuccess() ) warning() << "Scheduler could not be deactivated" << endmsg;
58 
59  info() << "Joining preemptive scheduler's thread" << endmsg;
60  m_thread.join();
61 
62  return sc;
63 }
64 //---------------------------------------------------------------------------
72 
73  // Now it's running
74  m_isActive = true;
75 
76  // Wait for actions pushed into the queue by finishing tasks.
77  action thisAction;
79 
80  // Continue to wait if the scheduler is running or there is something to do
81  info() << "Start checking the queue of I/O-bound algorithm tasks.." << endmsg;
82  while ( m_isActive or m_actionsQueue.size() > 0 ) {
83  m_actionsQueue.pop( thisAction );
84  std::thread th( thisAction );
85  th.detach();
86  }
87 }
88 
89 //---------------------------------------------------------------------------
90 
98 
99  if ( m_isActive ) {
100  // Drain the scheduler
101  // m_actionsQueue.push(std::bind(&IOBoundSchedulerSvc::m_drain,
102  // this));
103  // We set the flag to false in this thread, rather than in the last action, to avoid a stall:
104  // tasks are executed asynchronously in detached threads, so it is possible that
105  // a task has not completed by the time the last while-iteration is entered.
106  m_isActive = false;
107  // This would be the last (empty) action, just to trigger one last while-iteration
108  m_actionsQueue.push( []() -> StatusCode { return StatusCode::SUCCESS; } );
109  }
110 
111  return StatusCode::SUCCESS;
112 }
113 
115 
116  // The temporary lambda should be moved into the queue here
117  auto actionn = [&]() {
118  debug() << " .. launching I/O-bound algo-closure .. " << endmsg;
119  return task.execute();
120  };
121 
122  m_actionsQueue.push( actionn );
123 
124  // Always report success, until the queue to the accelerator becomes limited
125  return StatusCode::SUCCESS;
126 }
127 
128 //===========================================================================
StatusCode initialize() override
Definition: Service.cpp:70
StatusCode finalize() override
Definition: Service.cpp:174
MsgStream & warning() const
shortcut for the method msgStream(MSG::WARNING)
bool m_isActive
Flag to track if the scheduler is active or not.
Header file for class GaudiAlgorithm.
std::thread m_thread
The thread in which the activate function runs.
constexpr static const auto SUCCESS
Definition: StatusCode.h:96
virtual StatusCode execute()=0
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
#define DECLARE_COMPONENT(type)
General interface for a wrapper around Gaudi algorithm.
Definition: IAlgTask.h:25
T join(T... args)
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:61
StatusCode deactivate()
Deactivate scheduler.
T bind(T... args)
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
bool isSuccess() const
Definition: StatusCode.h:361
void activate()
Activate scheduler.
StatusCode push(IAlgTask &task) override
Add an algorithm to local queue to run on accelerator.
T detach(T... args)
StatusCode finalize() override
Finalise.
Please refer to the full documentation of the methods for more details.
tbb::concurrent_bounded_queue< action > m_actionsQueue
This is done since the copy of the lambda storage is too expensive.
STL class.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:202