The Gaudi Framework  v30r3 (a5ef0a68)
AlgResourcePool.cpp
Go to the documentation of this file.
1 #include "AlgResourcePool.h"
4 
5 // C++
6 #include <functional>
7 #include <queue>
8 
9 // DP TODO: Manage smartifs and not pointers to algos
10 
11 // Instantiation of a static factory class used by clients to create instances of this service
13 
14 #define ON_DEBUG if ( msgLevel( MSG::DEBUG ) )
15 #define DEBUG_MSG ON_DEBUG debug()
16 
17 //---------------------------------------------------------------------------
18 
19 // destructor
21 {
22 
23  for ( auto& algoId_algoQueue : m_algqueue_map ) {
24  auto* queue = algoId_algoQueue.second;
25  delete queue;
26  }
27 }
28 
29 //---------------------------------------------------------------------------
30 
31 // initialize the pool with the list of algos known to the IAlgManager
33 {
34 
36  if ( !sc.isSuccess() ) warning() << "Base class could not be started" << endmsg;
37 
38  // Try to recover the topAlgList from the ApplicationManager for backward-compatibility
39  if ( m_topAlgNames.value().empty() ) {
40  info() << "TopAlg list empty. Recovering the one of Application Manager" << endmsg;
41  const Gaudi::Utils::TypeNameString appMgrName( "ApplicationMgr/ApplicationMgr" );
42  SmartIF<IProperty> appMgrProps( serviceLocator()->service( appMgrName ) );
43  m_topAlgNames.assign( appMgrProps->getProperty( "TopAlg" ) );
44  }
45 
46  sc = decodeTopAlgs();
47  if ( sc.isFailure() ) warning() << "Algorithms could not be properly decoded." << endmsg;
48 
49  // let's assume all resources are there
51  return StatusCode::SUCCESS;
52 }
53 
54 //---------------------------------------------------------------------------
55 
57 {
58 
59  StatusCode startSc = Service::start();
60  if ( !startSc.isSuccess() ) return startSc;
61 
62  // sys-Start the algos
63  for ( auto& ialgo : m_algList ) {
64  startSc = ialgo->sysStart();
65  if ( startSc.isFailure() ) {
66  error() << "Unable to start Algorithm: " << ialgo->name() << endmsg;
67  return startSc;
68  }
69  }
70  return StatusCode::SUCCESS;
71 }
72 
73 //---------------------------------------------------------------------------
74 
76 {
77 
78  std::hash<std::string> hash_function;
79  size_t algo_id = hash_function( name );
80  auto itQueueIAlgPtr = m_algqueue_map.find( algo_id );
81 
82  if ( itQueueIAlgPtr == m_algqueue_map.end() ) {
83  error() << "Algorithm " << name << " requested, but not recognised" << endmsg;
84  algo = nullptr;
85  return StatusCode::FAILURE;
86  }
87 
88  StatusCode sc;
89  if ( blocking ) {
90  itQueueIAlgPtr->second->pop( algo );
91  } else {
92  if ( !itQueueIAlgPtr->second->try_pop( algo ) ) {
94  }
95  }
96 
97  // Note that reentrant algos are not consumed so we put them
98  // back immediately in the queue at the end of this function.
99  // Now we may still be called again in between and get this
100  // error. In such a case, the Scheduler will retry later.
101  // This is of course not optimal, but should only happen very
102 // seldom and thus won't affect the global efficiency
103  if ( sc.isFailure() )
104  DEBUG_MSG << "No instance of algorithm " << name << " could be retrieved in non-blocking mode" << endmsg;
105 
106  // if (m_lazyCreation ) {
107  // TODO: fill the lazyCreation part
108  // }
109  if ( sc.isSuccess() ) {
110  state_type requirements = m_resource_requirements[algo_id];
112  if ( requirements.is_subset_of( m_available_resources ) ) {
113  m_available_resources ^= requirements;
114  } else {
115  sc = StatusCode::FAILURE;
116  error() << "Failure to allocate resources of algorithm " << name << endmsg;
117  // in case of not reentrant, push it back. Reentrant ones are pushed back
118  // in all cases further down
119  if ( 0 != algo->cardinality() ) {
120  itQueueIAlgPtr->second->push( algo );
121  }
122  }
124  if ( 0 == algo->cardinality() ) {
125  // push back reentrant algos immediately as it can be reused
126  itQueueIAlgPtr->second->push( algo );
127  }
128  }
129  return sc;
130 }
131 
132 //---------------------------------------------------------------------------
133 
135 {
136 
137  std::hash<std::string> hash_function;
138  size_t algo_id = hash_function( name );
139 
140  // release resources used by the algorithm
144 
145  // release algorithm itself if not reentrant
146  if ( 0 != algo->cardinality() ) {
147  m_algqueue_map[algo_id]->push( algo );
148  }
149  return StatusCode::SUCCESS;
150 }
151 
152 //---------------------------------------------------------------------------
153 
155 {
159  return StatusCode::SUCCESS;
160 }
161 
162 //---------------------------------------------------------------------------
163 
165 {
169  return StatusCode::SUCCESS;
170 }
171 
172 //---------------------------------------------------------------------------
173 
// Recursively flatten `algo` into `alglist`.
//
// An algorithm that has no sub-algorithms and was not flagged as a sequencer
// below is appended to `alglist` as-is. Otherwise every entry returned by
// algo->subAlgorithms() is flattened in order through a recursive call, so
// that only leaf algorithms end up in `alglist`.
//
// algo            algorithm (or sequencer) to unroll; dereferenced without a
//                 null check  -- NOTE(review): confirm callers never pass null
// alglist         output list, appended to (never cleared here)
// recursionDepth  incremented before descending and forwarded to the
//                 recursive calls; not otherwise read in this function
//
// Returns `sc`; if a recursive call fails, an error is logged and that
// failing status is returned immediately.
174 StatusCode AlgResourcePool::flattenSequencer( Algorithm* algo, ListAlg& alglist, unsigned int recursionDepth )
175 {
176 
 // NOTE(review): the declaration of `sc` is not visible in this listing (the
 // numbering jumps from 176 to 178) -- presumably it is initialised to a
 // success status here; confirm against the real source file.
178 
179  bool isGaudiSequencer( false );
180  bool isAthSequencer( false );
181 
 // A sequence is classified by which property it exposes: "ShortCircuit" sets
 // the Gaudi flag, otherwise "StopOverride" sets the Ath flag. A sequence
 // exposing neither property leaves both flags false.
182  if ( algo->isSequence() ) {
183  if ( algo->hasProperty( "ShortCircuit" ) )
184  isGaudiSequencer = true;
185  else if ( algo->hasProperty( "StopOverride" ) )
186  isAthSequencer = true;
187  }
188 
 // NOTE(review): the returned pointer is dereferenced below without a null
 // check -- presumably subAlgorithms() never returns null; verify.
189  std::vector<Algorithm*>* subAlgorithms = algo->subAlgorithms();
190  if ( // we only want to add basic algorithms -> have no subAlgs
191  // and exclude the case of empty sequencers
192  ( subAlgorithms->empty() && !( isGaudiSequencer || isAthSequencer ) ) ) {
193 
194  alglist.emplace_back( algo );
195  return sc;
196  }
197 
 // Reached for anything with sub-algorithms and for flagged sequencers; a
 // flagged sequencer with no sub-algorithms adds nothing to `alglist`.
198  // Recursively unroll
199  ++recursionDepth;
200 
201  for ( Algorithm* subalgo : *subAlgorithms ) {
202  sc = flattenSequencer( subalgo, alglist, recursionDepth );
 // Stop at the first sub-algorithm that fails; entries appended so far stay
 // in `alglist`.
203  if ( sc.isFailure() ) {
204  error() << "Algorithm " << subalgo->name() << " could not be flattened" << endmsg;
205  return sc;
206  }
207  }
208  return sc;
209 }
210 
211 //---------------------------------------------------------------------------
212 
214 {
215 
217  if ( !algMan.isValid() ) {
218  error() << "Algorithm manager could not be properly fetched." << endmsg;
219  return StatusCode::FAILURE;
220  }
221 
222  // Useful lambda not to repeat ourselves --------------------------
223  auto createAlg = [&algMan, this]( const std::string& item_type, const std::string& item_name, IAlgorithm*& algo ) {
224  StatusCode createAlgSc = algMan->createAlgorithm( item_type, item_name, algo, true, false );
225  if ( createAlgSc.isFailure() )
226  this->warning() << "Algorithm " << item_type << "/" << item_name << " could not be created." << endmsg;
227  };
228  // End of lambda --------------------------------------------------
229 
231 
232  // Fill the top alg list ----
233  const std::vector<std::string>& topAlgNames = m_topAlgNames.value();
234  for ( auto& name : topAlgNames ) {
235  IAlgorithm* algo( nullptr );
236 
238  const std::string& item_name = item.name();
239  const std::string& item_type = item.type();
240  SmartIF<IAlgorithm> algoSmartIF( algMan->algorithm( item_name, false ) );
241 
242  if ( !algoSmartIF.isValid() ) {
243  createAlg( item_type, item_name, algo );
244  algoSmartIF = algo;
245  }
246  // Init and start
247  algoSmartIF->sysInitialize().ignore();
248  m_topAlgList.push_back( algoSmartIF );
249  }
250  // Top Alg list filled ----
251 
252  // Now we unroll it ----
253  for ( auto& algoSmartIF : m_topAlgList ) {
254  Algorithm* algorithm = dynamic_cast<Algorithm*>( algoSmartIF.get() );
255  if ( !algorithm ) fatal() << "Conversion from IAlgorithm to Algorithm failed" << endmsg;
256  sc = flattenSequencer( algorithm, m_flatUniqueAlgList );
257  }
258  // stupid O(N^2) unique-ification..
259  for ( auto i = begin( m_flatUniqueAlgList ); i != end( m_flatUniqueAlgList ); ++i ) {
260  auto n = next( i );
261  while ( n != end( m_flatUniqueAlgList ) ) {
262  if ( *n == *i )
264  else
265  ++n;
266  }
267  }
268  if ( msgLevel( MSG::DEBUG ) ) {
269  debug() << "List of algorithms is: " << endmsg;
270  for ( auto& algo : m_flatUniqueAlgList )
271  debug() << " o " << algo->type() << "/" << algo->name() << " @ " << algo << endmsg;
272  }
273 
274  // Unrolled ---
275 
276  // Now let's manage the clones
277  unsigned int resource_counter( 0 );
278  std::hash<std::string> hash_function;
279  for ( auto& ialgoSmartIF : m_flatUniqueAlgList ) {
280 
281  const std::string& item_name = ialgoSmartIF->name();
282 
283  verbose() << "Treating resource management and clones of " << item_name << endmsg;
284 
285  Algorithm* algo = dynamic_cast<Algorithm*>( ialgoSmartIF.get() );
286  if ( !algo ) fatal() << "Conversion from IAlgorithm to Algorithm failed" << endmsg;
287  const std::string& item_type = algo->type();
288 
289  size_t algo_id = hash_function( item_name );
291  m_algqueue_map[algo_id] = queue;
292 
293  // DP TODO Do it properly with SmartIFs, also in the queues
294  IAlgorithm* ialgo( ialgoSmartIF.get() );
295 
296  queue->push( ialgo );
297  m_algList.push_back( ialgo );
298  if ( ialgo->isClonable() ) {
299  m_n_of_allowed_instances[algo_id] = ialgo->cardinality();
300  } else {
301  if ( ialgo->cardinality() == 1 ) {
302  m_n_of_allowed_instances[algo_id] = 1;
303  } else {
304  if ( !m_overrideUnClonable ) {
305  info() << "Algorithm " << ialgo->name() << " is un-Clonable but Cardinality was set to "
306  << ialgo->cardinality() << ". Only creating 1 instance" << endmsg;
307  m_n_of_allowed_instances[algo_id] = 1;
308  } else {
309  warning() << "Overriding UnClonability of Algorithm " << ialgo->name() << ". Setting Cardinality to "
310  << ialgo->cardinality() << endmsg;
311  m_n_of_allowed_instances[algo_id] = ialgo->cardinality();
312  }
313  }
314  }
315  m_n_of_created_instances[algo_id] = 1;
316 
317  state_type requirements( 0 );
318 
319  for ( auto& resource_name : ialgo->neededResources() ) {
320  auto ret = m_resource_indices.emplace( resource_name, resource_counter );
321  // insert successful means == wasn't known before. So increment counter
322  if ( ret.second ) ++resource_counter;
323  // Resize for every algo according to the found resources
324  requirements.resize( resource_counter );
325  // in any case the return value holds the proper product index
326  requirements[ret.first->second] = true;
327  }
328 
329  m_resource_requirements[algo_id] = requirements;
330 
331  // potentially create clones; if not lazy creation we have to do it now
332  if ( !m_lazyCreation ) {
333  for ( unsigned int i = 1, end = m_n_of_allowed_instances[algo_id]; i < end; ++i ) {
334  debug() << "type/name to create clone of: " << item_type << "/" << item_name << endmsg;
335  IAlgorithm* ialgoClone( nullptr );
336  createAlg( item_type, item_name, ialgoClone );
337  ialgoClone->setIndex( i );
338  if ( ialgoClone->sysInitialize().isFailure() ) {
339  error() << "unable to initialize Algorithm clone " << ialgoClone->name() << endmsg;
340  sc = StatusCode::FAILURE;
341  // FIXME: should we delete this failed clone?
342  } else {
343  queue->push( ialgoClone );
344  m_n_of_created_instances[algo_id] += 1;
345  }
346  }
347  }
348  }
349 
350  // Now resize all the requirement bitsets to the same size
351  for ( auto& kv : m_resource_requirements ) {
352  kv.second.resize( resource_counter );
353  }
354 
355  // Set all resources to be available
356  m_available_resources.resize( resource_counter );
357  m_available_resources.set();
358 
359  return sc;
360 }
361 
362 //---------------------------------------------------------------------------
363 
365 {
367  for ( auto algoSmartIF : m_flatUniqueAlgList )
368  m_flatUniqueAlgPtrList.push_back( const_cast<IAlgorithm*>( algoSmartIF.get() ) );
369  return m_flatUniqueAlgPtrList;
370 }
371 
372 //---------------------------------------------------------------------------
373 
375 {
377  for ( auto algoSmartIF : m_topAlgList ) m_topAlgPtrList.push_back( const_cast<IAlgorithm*>( algoSmartIF.get() ) );
378  return m_topAlgPtrList;
379 }
380 
381 //---------------------------------------------------------------------------
382 
384 {
385 
386  StatusCode stopSc = Service::stop();
387  if ( !stopSc.isSuccess() ) return stopSc;
388 
389  // sys-Stop the algos
390  for ( auto& ialgo : m_algList ) {
391  stopSc = ialgo->sysStop();
392  if ( stopSc.isFailure() ) {
393  error() << "Unable to stop Algorithm: " << ialgo->name() << endmsg;
394  return stopSc;
395  }
396  }
397  return StatusCode::SUCCESS;
398 }
ListAlg m_flatUniqueAlgList
The flat list of algorithms w/o clones.
virtual SmartIF< IAlgorithm > & algorithm(const Gaudi::Utils::TypeNameString &typeName, const bool createIf=true)=0
Returns a smart pointer to an algorithm.
constexpr static const auto FAILURE
Definition: StatusCode.h:88
StatusCode initialize() override
Definition: Service.cpp:63
T unlock(T...args)
T empty(T...args)
const std::string & name() const override
Retrieve name of the service.
Definition: Service.cpp:288
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
StatusCode start() override
Definition: Service.cpp:136
std::map< std::string, unsigned int > m_resource_indices
bool isSuccess() const
Definition: StatusCode.h:287
bool hasProperty(const std::string &name) const override
Return true if we have a property with the given name.
StatusCode flattenSequencer(Algorithm *sequencer, ListAlg &alglist, unsigned int recursionDepth=0)
Recursively flatten an algList.
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
std::mutex m_resource_mutex
virtual StatusCode createAlgorithm(const std::string &algtype, const std::string &algname, IAlgorithm *&alg, bool managed=false, bool checkIfExists=true)=0
Create an instance of an algorithm type that has been declared beforehand and assigns to it a name...
StatusCode stop() override
T end(T...args)
virtual StatusCode getProperty(Gaudi::Details::PropertyBase *p) const =0
Get the property by property.
The AlgResourcePool is a concrete implementation of the IAlgResourcePool interface.
bool isFailure() const
Definition: StatusCode.h:139
state_type m_available_resources
Gaudi::Property< bool > m_overrideUnClonable
virtual StatusCode sysInitialize()=0
Initialization method invoked by the framework.
STL class.
#define DECLARE_COMPONENT(type)
boost::dynamic_bitset state_type
T push_back(T...args)
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
Helper class to parse a string of format "type/name".
MsgStream & warning() const
shortcut for the method msgStream(MSG::WARNING)
tbb::concurrent_bounded_queue< IAlgorithm * > concurrentQueueIAlgPtr
std::list< IAlgorithm * > m_flatUniqueAlgPtrList
The flat list of algorithms w/o clones which is returned.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:51
std::map< size_t, state_type > m_resource_requirements
T erase(T...args)
StatusCode start() override
T lock(T...args)
~AlgResourcePool() override
std::list< IAlgorithm * > m_topAlgPtrList
The top list of algorithms.
virtual unsigned int cardinality() const =0
Cardinality (maximum number of clones that can exist); the special value 0 means that the algorithm is reentrant.
virtual void setIndex(const unsigned int &idx)=0
Set instantiation index of Alg.
std::list< IAlgorithm * > getFlatAlgList() override
StatusCode releaseResource(const std::string &name) override
Release a certain resource.
Gaudi::Property< std::vector< std::string > > m_topAlgNames
std::list< IAlgorithm * > getTopAlgList() override
StatusCode stop() override
Definition: Service.cpp:129
T clear(T...args)
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:28
constexpr static const auto SUCCESS
Definition: StatusCode.h:87
StatusCode acquireResource(const std::string &name) override
Acquire a certain resource.
std::map< size_t, concurrentQueueIAlgPtr * > m_algqueue_map
const std::vector< Algorithm * > * subAlgorithms() const
List of sub-algorithms. Returns a pointer to a vector of (sub) Algorithms.
Definition: Algorithm.cpp:801
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:78
T find(T...args)
StatusCode decodeTopAlgs()
Decode the top alg list.
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
bool isValid() const
Allow for check if smart pointer is valid.
Definition: SmartIF.h:68
bool isSequence() const override
Are we a Sequence?
Definition: Algorithm.h:218
const std::string & type() const
const StatusCode & ignore() const
Ignore/check StatusCode.
Definition: StatusCode.h:165
T emplace(T...args)
StatusCode acquireAlgorithm(const std::string &name, IAlgorithm *&algo, bool blocking=false) override
Acquire a certain algorithm using its name.
StatusCode service(const std::string &name, const T *&psvc, bool createIf=true) const
Access a service by name, creating it if it doesn't already exist.
Definition: Service.h:84
std::map< size_t, unsigned int > m_n_of_created_instances
#define DEBUG_MSG
AttribStringParser::Iterator begin(const AttribStringParser &parser)
const std::string & name() const
StatusCode initialize() override
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
ListAlg m_topAlgList
The list of top algorithms.
const std::string & type() const override
The type of the algorithm object.
Definition: Algorithm.h:166
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
Definition: Service.cpp:291
std::map< size_t, size_t > m_n_of_allowed_instances
ListAlg m_algList
The list of all algorithms created within the Pool which are not top.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:209
StatusCode releaseAlgorithm(const std::string &name, IAlgorithm *&algo) override
Release a certain algorithm.
Gaudi::Property< bool > m_lazyCreation
MSG::Level msgLevel() const
get the cached level (originally extracted from the embedded MsgStream)
T emplace_back(T...args)