All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
AlgResourcePool.cpp
Go to the documentation of this file.
1 // Include Files
2 
3 // Framework
4 #include "AlgResourcePool.h"
5 #include "GaudiKernel/ISvcLocator.h"
6 #include "GaudiKernel/SvcFactory.h"
7 #include "GaudiKernel/ThreadGaudi.h"
8 
9 // C++
10 #include <functional>
11 #include <queue>
12 
13 // DP TODO: Manage smartifs and not pointers to algos
14 
15 // Instantiation of a static factory class used by clients to create instances of this service
17 
18 // constructor
20  base_class(name,svc), m_available_resources(0), m_EFGraph(0)
21 {
22  declareProperty("CreateLazily", m_lazyCreation = false );
23  declareProperty("TopAlg", m_topAlgNames );
24 }
25 
26 //---------------------------------------------------------------------------
27 
28 // destructor
30 
31  for (auto& algoId_algoQueue : m_algqueue_map){
32  auto* queue = algoId_algoQueue.second;
33  delete queue;
34  }
35 
36  delete m_EFGraph;
37 }
38 
39 //---------------------------------------------------------------------------
40 
41 // initialize the pool with the list of algos known to the IAlgManager
43 
45  if (!sc.isSuccess())
46  warning () << "Base class could not be started" << endmsg;
47 
48  // Try to recover the topAlgList from the ApplicationManager for backward-compatibility
49  if (m_topAlgNames.value().empty()){
50  info() << "TopAlg list empty. Recovering the one of Application Manager" << endmsg;
51  const Gaudi::Utils::TypeNameString appMgrName("ApplicationMgr/ApplicationMgr");
52  SmartIF<IProperty> appMgrProps (serviceLocator()->service(appMgrName));
53  m_topAlgNames.assign(appMgrProps->getProperty("TopAlg"));
54  }
55 
56  // XXX: Prepare empty Control Flow graph
57  const std::string& name = "ExecutionFlowGraph";
58  SmartIF<ISvcLocator> svc = serviceLocator();
60 
61  sc = decodeTopAlgs();
62  if (sc.isFailure())
63  warning() << "Algorithms could not be properly decoded." << endmsg;
64 
65  // let's assume all resources are there
67  return StatusCode::SUCCESS;
68 }
69 
70 //---------------------------------------------------------------------------
71 
73 
74  StatusCode startSc = Service::start();
75  if ( ! startSc.isSuccess() ) return startSc;
76 
77  // sys-Start the algos
78  for (auto& ialgo : m_algList){
79  startSc = ialgo->sysStart();
80  if (startSc.isFailure()){
81  error() << "Unable to start Algorithm: " << ialgo->name() << endmsg;
82  return startSc;
83  }
84  }
85  return StatusCode::SUCCESS;
86 }
87 
88 //---------------------------------------------------------------------------
89 
90 StatusCode AlgResourcePool::acquireAlgorithm(const std::string& name, IAlgorithm*& algo, bool blocking){
91 
92  std::hash<std::string> hash_function;
93  size_t algo_id = hash_function(name);
94  auto itQueueIAlgPtr = m_algqueue_map.find(algo_id);
95 
96  if (itQueueIAlgPtr == m_algqueue_map.end()) {
97  error() << "Algorithm " << name << " requested, but not recognised"
98  << endmsg;
99  algo = nullptr;
100  return StatusCode::FAILURE;
101  }
102 
103  StatusCode sc;
104  if (blocking) {
105  itQueueIAlgPtr->second->pop(algo);
106  sc = StatusCode::SUCCESS;
107  } else {
108  sc = itQueueIAlgPtr->second->try_pop(algo);
109  }
110 
111  if(sc.isFailure())
112  if (msgLevel(MSG::DEBUG))
113  debug() << "No instance of algorithm " << name << " could be retrieved in non-blocking mode" << endmsg;
114 
115  // if (m_lazyCreation ) {
116  // TODO: fill the lazyCreation part
117  //}
118  if (sc.isSuccess()){
119  algo->resetExecuted();
120  state_type requirements = m_resource_requirements[algo_id];
121  m_resource_mutex.lock();
122  if (requirements.is_subset_of(m_available_resources)) {
123  m_available_resources^=requirements;
124  } else {
125  sc = StatusCode::FAILURE;
126  error() << "Failure to allocate resources of algorithm " << name << endmsg;
127  itQueueIAlgPtr->second->push(algo);
128  }
129  m_resource_mutex.unlock();
130  }
131  return sc;
132 }
133 
134 //---------------------------------------------------------------------------
135 
137 
138  std::hash<std::string> hash_function;
139  size_t algo_id = hash_function(name);
140 
141  // release resources used by the algorithm
142  m_resource_mutex.lock();
144  m_resource_mutex.unlock();
145 
146  //release algorithm itself
147  m_algqueue_map[algo_id]->push(algo);
148  return StatusCode::SUCCESS;
149  }
150 
151 //---------------------------------------------------------------------------
152 
154  m_resource_mutex.lock();
156  m_resource_mutex.unlock();
157  return StatusCode::SUCCESS;
158 }
159 
160 //---------------------------------------------------------------------------
161 
163  m_resource_mutex.lock();
165  m_resource_mutex.unlock();
166  return StatusCode::SUCCESS;
167 }
168 
169 //---------------------------------------------------------------------------
170 
171 StatusCode AlgResourcePool::flattenSequencer(Algorithm* algo, ListAlg& alglist, const std::string& parentName, unsigned int recursionDepth){
172 
174 
175  std::vector<Algorithm*>* subAlgorithms = algo->subAlgorithms();
176  if ( //we only want to add basic algorithms -> have no subAlgs
177  // and exclude the case of empty GaudiSequencers
178  (subAlgorithms->empty() and not (algo->type() == "GaudiSequencer"))
179  // we want to add non-empty GaudiAtomicSequencers
180  or (algo->type() == "GaudiAtomicSequencer" and not subAlgorithms->empty())){
181 
182  debug() << std::string(recursionDepth, ' ') << algo->name() << " is " <<
183  (algo->type() != "GaudiAtomicSequencer" ? "not a sequencer" : "an atomic sequencer")
184  << ". Appending it" << endmsg;
185 
186  alglist.emplace_back(algo);
187  m_EFGraph->addAlgorithmNode(algo,parentName,false,false);
188  return sc;
189  }
190 
191  // Recursively unroll
192  ++recursionDepth;
193  debug() << std::string(recursionDepth, ' ') << algo->name() << " is a sequencer. Flattening it." << endmsg;
194  bool modeOR = false;
195  bool allPass = false;
196  bool isLazy = false;
197  if ("GaudiSequencer" == algo->type()) {
198  modeOR = (algo->getProperty("ModeOR").toString() == "True")? true : false;
199  allPass = (algo->getProperty("IgnoreFilterPassed").toString() == "True")? true : false;
200  isLazy = (algo->getProperty("ShortCircuit").toString() == "True")? true : false;
201  if (allPass) isLazy = false; // standard GaudiSequencer behavior on all pass is to execute everything
202  }
203  sc = m_EFGraph->addDecisionHubNode(algo,parentName,modeOR,allPass,isLazy);
204  if (sc.isFailure()) {
205  error() << "Failed to add DecisionHub " << algo->name() << " to execution flow graph" << endmsg;
206  return sc;
207  }
208 
209  for (Algorithm* subalgo : *subAlgorithms ) {
210  sc = flattenSequencer(subalgo,alglist,algo->name(),recursionDepth);
211  if (sc.isFailure()) {
212  error() << "Algorithm " << subalgo->name() << " could not be flattened" << endmsg;
213  return sc;
214  }
215  }
216  return sc;
217 }
218 
219 //---------------------------------------------------------------------------
220 
222 
223  SmartIF<IAlgManager> algMan ( serviceLocator() );
224  if (!algMan.isValid()){
225  error() << "Algorithm manager could not be properly fetched." << endmsg;
226  return StatusCode::FAILURE;
227  }
228 
229  // Useful lambda not to repeat ourselves --------------------------
230  auto createAlg = [&algMan,this] (const std::string& item_type,
231  const std::string& item_name,
232  IAlgorithm*& algo){
233  StatusCode createAlgSc = algMan->createAlgorithm(item_type,
234  item_name,
235  algo,
236  true,
237  false);
238  if (createAlgSc.isFailure())
239  this->warning() << "Algorithm " << item_type << "/" << item_name
240  << " could not be created." << endmsg;
241  };
242  // End of lambda --------------------------------------------------
243 
245 
246  // Fill the top alg list ----
247  const std::vector<std::string>& topAlgNames = m_topAlgNames.value();
248  for (auto& name : topAlgNames) {
249  IAlgorithm* algo(nullptr);
250 
252  const std::string& item_name = item.name();
253  const std::string& item_type = item.type();
254  SmartIF<IAlgorithm> algoSmartIF (algMan->algorithm(item_name,false));
255 
256  if (!algoSmartIF.isValid()){
257  createAlg(item_type,item_name,algo);
258  algoSmartIF = algo;
259  }
260  // Init and start
261  algoSmartIF->sysInitialize();
262  m_topAlgList.push_back(algoSmartIF);
263  }
264  // Top Alg list filled ----
265 
266  // start forming the control flow graph by adding the head node
267  m_EFGraph->addHeadNode("EVENT LOOP",true,true,false);
268 
269  // Now we unroll it ----
270  for (auto& algoSmartIF : m_topAlgList){
271  Algorithm* algorithm = dynamic_cast<Algorithm*> (algoSmartIF.get());
272  if (!algorithm) fatal() << "Conversion from IAlgorithm to Algorithm failed" << endmsg;
273  sc = flattenSequencer(algorithm, m_flatUniqueAlgList, "EVENT LOOP");
274  }
275  if (outputLevel() <= MSG::DEBUG){
276  debug() << "List of algorithms is: " << endmsg;
277  for (auto& algo : m_flatUniqueAlgList)
278  debug() << " o " << algo->type() << "/" << algo->name() << " @ " << algo << endmsg;
279  }
280 
281  // Unrolled ---
282 
283  // Now let's manage the clones
284  unsigned int resource_counter(0);
285  std::hash<std::string> hash_function;
286  for (auto& ialgoSmartIF : m_flatUniqueAlgList) {
287 
288  const std::string& item_name = ialgoSmartIF->name();
289 
290  verbose() << "Treating resource management and clones of " << item_name << endmsg;
291 
292  Algorithm* algo = dynamic_cast<Algorithm*> ( ialgoSmartIF.get() );
293  if (!algo) fatal() << "Conversion from IAlgorithm to Algorithm failed" << endmsg;
294  const std::string& item_type = algo->type();
295 
296  size_t algo_id = hash_function(item_name);
298  m_algqueue_map[algo_id] = queue;
299 
300  // DP TODO Do it properly with SmartIFs, also in the queues
301  IAlgorithm* ialgo(ialgoSmartIF.get());
302 
303  queue->push(ialgo);
304  m_algList.push_back(ialgo);
305  m_n_of_allowed_instances[algo_id] = ialgo->cardinality();
306  m_n_of_created_instances[algo_id] = 1;
307 
308  state_type requirements(0);
309 
310  for (auto& resource_name : ialgo->neededResources()){
311  auto ret = m_resource_indices.insert(std::pair<std::string, unsigned int>(resource_name,resource_counter));
312  // insert successful means == wasn't known before. So increment counter
313  if (ret.second==true) {
314  ++resource_counter;
315  }
316  // Resize for every algo according to the found resources
317  requirements.resize(resource_counter);
318  // in any case the return value holds the proper product index
319  requirements[ret.first->second] = true;
320 
321  }
322 
323  m_resource_requirements[algo_id] = requirements;
324 
325  // potentially create clones; if not lazy creation we have to do it now
326  if (!m_lazyCreation) {
327  for (unsigned int i =1, end =ialgo->cardinality();i<end; ++i){
328  debug() << "type/name to create clone of: " << item_type << "/" << item_name << endmsg;
329  IAlgorithm* ialgoClone(nullptr);
330  createAlg(item_type,item_name,ialgoClone);
331  ialgoClone->sysInitialize();
332  queue->push(ialgoClone);
333  m_n_of_created_instances[algo_id]+=1;
334  }
335  }
336 
338 
339  }
340 
341  // Now resize all the requirement bitsets to the same size
342  for (auto& kv : m_resource_requirements) {
343  kv.second.resize(resource_counter);
344  }
345 
346  // Set all resources to be available
347  m_available_resources.resize(resource_counter);
348  m_available_resources.set();
349 
350  return sc;
351 }
352 
353 //---------------------------------------------------------------------------
354 
355 std::list<IAlgorithm*> AlgResourcePool::getFlatAlgList(){
356  m_flatUniqueAlgPtrList.clear();
357  for (auto algoSmartIF :m_flatUniqueAlgList )
358  m_flatUniqueAlgPtrList.push_back(const_cast<IAlgorithm*>(algoSmartIF.get()));
359  return m_flatUniqueAlgPtrList;
360 }
361 
362 //---------------------------------------------------------------------------
363 
364 std::list<IAlgorithm*> AlgResourcePool::getTopAlgList(){
365  m_topAlgPtrList.clear();
366  for (auto algoSmartIF :m_topAlgList )
367  m_topAlgPtrList.push_back(const_cast<IAlgorithm*>(algoSmartIF.get()));
368  return m_topAlgPtrList;
369 }
370 
371 //---------------------------------------------------------------------------
372 
374  auto algBeginRun = [&] (SmartIF<IAlgorithm>& algoSmartIF) -> StatusCode {
375  StatusCode sc = algoSmartIF->sysBeginRun();
376  if (!sc.isSuccess()) {
377  warning() << "beginRun() of algorithm " << algoSmartIF->name() << " failed" << endmsg;
378  return StatusCode::FAILURE;
379  }
380  return StatusCode::SUCCESS;
381  };
382  // Call the beginRun() method of all algorithms
383  for (auto& algoSmartIF : m_flatUniqueAlgList ) {
384  if (algBeginRun(algoSmartIF).isFailure())
385  return StatusCode::FAILURE;
386  }
387 
388  return StatusCode::SUCCESS;
389 }
390 
391 //---------------------------------------------------------------------------
392 
394 
395  auto algEndRun = [&] (SmartIF<IAlgorithm>& algoSmartIF) -> StatusCode {
396  StatusCode sc = algoSmartIF->sysEndRun();
397  if (!sc.isSuccess()) {
398  warning() << "endRun() of algorithm " << algoSmartIF->name() << " failed" << endmsg;
399  return StatusCode::FAILURE;
400  }
401  return StatusCode::SUCCESS;
402  };
403  // Call the endRun() method of all flat and top algorithms
404  for (auto& algoSmartIF : m_flatUniqueAlgList ) {
405  if (algEndRun(algoSmartIF).isFailure())
406  return StatusCode::FAILURE;
407  }
408  for (auto& algoSmartIF : m_topAlgList ) {
409  if (algEndRun(algoSmartIF).isFailure())
410  return StatusCode::FAILURE;
411  }
412  return StatusCode::SUCCESS;
413 }
414 
415 //---------------------------------------------------------------------------
416 
418 
419  StatusCode stopSc = Service::stop();
420  if ( ! stopSc.isSuccess() ) return stopSc;
421 
422  // sys-Stop the algos
423  for (auto& ialgo : m_algList){
424  stopSc = ialgo->sysStop();
425  if (stopSc.isFailure()){
426  error() << "Unable to stop Algorithm: " << ialgo->name() << endmsg;
427  return stopSc;
428  }
429  }
430  return StatusCode::SUCCESS;
431 }
ListAlg m_flatUniqueAlgList
The flat list of algorithms w/o clones.
virtual SmartIF< IAlgorithm > & algorithm(const Gaudi::Utils::TypeNameString &typeName, const bool createIf=true)=0
Returns a smart pointer to a service.
virtual StatusCode releaseResource(const std::string &name)
Release a certain resource.
StringArrayProperty m_topAlgNames
The names of the algorithms to be passed to the algorithm manager.
StatusCode initialize() override
Definition: Service.cpp:63
The ISvcLocator is the interface implemented by the Service Factory in the Application Manager to loc...
Definition: ISvcLocator.h:25
virtual std::list< IAlgorithm * > getTopAlgList()
virtual StatusCode getProperty(Property *p) const =0
Get the property by property.
std::map< size_t, concurrentQueueIAlgPtr * > m_algqueue_map
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
StatusCode addDecisionHubNode(Algorithm *daughterAlgo, const std::string &parentName, bool modeOR, bool allPass, bool isLazy)
Add a node, which aggregates decisions of direct daughter nodes.
StatusCode start() override
Definition: Service.cpp:147
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:76
std::mutex m_resource_mutex
virtual StatusCode createAlgorithm(const std::string &algtype, const std::string &algname, IAlgorithm *&alg, bool managed=false, bool checkIfExists=true)=0
Create an instance of an algorithm type that has been declared beforehand and assigns to it a name...
STL namespace.
std::list< SmartIF< IAlgorithm > > ListAlg
virtual StatusCode initialize()
const std::string & type() const override
The type of the algorithm object.
Definition: Algorithm.h:167
virtual void resetExecuted()=0
Reset the Algorithm executed state for the current event.
The AlgResourcePool is a concrete implementation of the IAlgResourcePool interface.
bool isFailure() const
Test for a status code of FAILURE.
Definition: StatusCode.h:86
state_type m_available_resources
virtual StatusCode sysInitialize()=0
Initialization method invoked by the framework.
const std::string & name() const override
The identifying name of the algorithm object.
Definition: Algorithm.cpp:919
boost::dynamic_bitset state_type
Helper class to parse a string of format "type/name".
Definition: TypeNameString.h:9
std::map< size_t, size_t > m_n_of_allowed_instances
StatusCode getProperty(Property *p) const override
Implementation of IProperty::getProperty.
Definition: Algorithm.cpp:1104
tbb::concurrent_bounded_queue< IAlgorithm * > concurrentQueueIAlgPtr
std::list< IAlgorithm * > m_flatUniqueAlgPtrList
The flat list of algorithms w/o clones which is returned.
auto end(reverse_wrapper< T > &w)
Definition: reverse.h:47
virtual StatusCode acquireResource(const std::string &name)
Acquire a certain resource.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
concurrency::ExecutionFlowGraph * m_EFGraph
OMG yet another hack.
std::list< IAlgorithm * > m_topAlgPtrList
The top list of algorithms.
#define DECLARE_SERVICE_FACTORY(x)
Definition: Service.h:354
StatusCode stop() override
Definition: Service.cpp:141
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:23
virtual StatusCode start()
const TYPE & value() const
explicit conversion
Definition: Property.h:341
virtual StatusCode endRun()
const std::vector< Algorithm * > * subAlgorithms() const
List of sub-algorithms. Returns a pointer to a vector of (sub) Algorithms.
Definition: Algorithm.cpp:956
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:77
bool assign(const Property &source) override
get the value from another property
Definition: Property.h:269
StatusCode addAlgorithmNode(Algorithm *daughterAlgo, const std::string &parentName, bool inverted, bool allPass)
Add algorithm node.
StatusCode decodeTopAlgs()
Decode the top alg list.
bool isValid() const
Allow for check if smart pointer is valid.
Definition: SmartIF.h:62
Base class used to extend a class implementing other interfaces.
Definition: extends.h:10
virtual std::list< IAlgorithm * > getFlatAlgList()
const std::string & type() const
StatusCode flattenSequencer(Algorithm *sequencer, ListAlg &alglist, const std::string &parentName, unsigned int recursionDepth=0)
Recursively flatten an algList.
std::map< size_t, state_type > m_resource_requirements
tuple item
print s1,s2
Definition: ana.py:146
void addHeadNode(const std::string &headName, bool modeOR, bool allPass, bool isLazy)
Add a node, which has no parents.
virtual StatusCode releaseAlgorithm(const std::string &name, IAlgorithm *&algo)
Release a certain algorithm.
const std::string & name() const
ListAlg m_topAlgList
The list of top algorithms.
virtual StatusCode stop()
std::map< std::string, unsigned int > m_resource_indices
void attachAlgorithmsToNodes(const std::string &algo_name, const T &container)
Attach pointers to real Algorithms (and their clones) to Algorithm nodes of the graph.
list i
Definition: ana.py:128
virtual StatusCode beginRun()
std::map< size_t, unsigned int > m_n_of_created_instances
ListAlg m_algList
The list of all algorithms created within the Pool which are not top.
virtual StatusCode acquireAlgorithm(const std::string &name, IAlgorithm *&algo, bool blocking=false)
Acquire a certain algorithm using its name.