The Gaudi Framework  v30r4 (9b837755)
HiveDataBroker.cpp
Go to the documentation of this file.
1 #include "HiveDataBroker.h"
6 #include "GaudiKernel/System.h"
7 #include "boost/lexical_cast.hpp"
8 #include "boost/tokenizer.hpp"
9 #include "range/v3/algorithm/for_each.hpp"
10 #include "range/v3/view/remove_if.hpp"
11 #include "range/v3/view/reverse.hpp"
12 #include "range/v3/view/transform.hpp"
13 #include <algorithm>
14 
16 
17 namespace
18 {
19  struct AlgorithmRepr {
20  const Algorithm& parent;
21 
22  friend std::ostream& operator<<( std::ostream& s, const AlgorithmRepr& a )
23  {
24  std::string typ = System::typeinfoName( typeid( a.parent ) );
25  s << typ;
26  if ( a.parent.name() != typ ) s << "/" << a.parent.name();
27  return s;
28  }
29  };
30 
31  struct DataObjIDSorter {
32  bool operator()( const DataObjID* a, const DataObjID* b ) { return a->fullKey() < b->fullKey(); }
33  };
34 
35  // Sort a DataObjIDColl in a well-defined, reproducible manner.
36  // Used for making debugging dumps.
37  std::vector<const DataObjID*> sortedDataObjIDColl( const DataObjIDColl& coll )
38  {
40  v.reserve( coll.size() );
41  for ( const DataObjID& id : coll ) v.push_back( &id );
42  std::sort( v.begin(), v.end(), DataObjIDSorter() );
43  return v;
44  }
45 
46  SmartIF<IAlgorithm> createAlgorithm( IAlgManager& am, const std::string& type, const std::string& name )
47  {
48  // Maybe modify the AppMgr interface to return Algorithm* ??
49  IAlgorithm* tmp;
50  StatusCode sc = am.createAlgorithm( type, name, tmp );
51  return {sc.isSuccess() ? dynamic_cast<Algorithm*>( tmp ) : nullptr};
52  }
53 
54  using AccessMode = Gaudi::v2::DataHandle::AccessMode;
55 }
56 
58 {
59  auto sc = Service::initialize();
60  if ( sc.isFailure() ) return sc;
61  // populate m_algorithms
62  m_algorithms = instantiateAndInitializeAlgorithms( m_producers );
63  if ( sc.isFailure() ) return sc;
64 
65  // warn about non-reentrant algorithms
66  ranges::for_each( m_algorithms | ranges::view::transform( []( const auto& entry ) { return entry.alg; } ) |
67  ranges::view::remove_if( []( const auto* alg ) { return alg->cardinality() == 0; } ),
68  [&]( const Algorithm* alg ) {
69  this->warning() << "non-reentrant algorithm: " << AlgorithmRepr{*alg} << endmsg;
70  } );
71  //== Print the list of the created algorithms
72  if ( msgLevel( MSG::DEBUG ) ) {
73  MsgStream& msg = debug();
74  msg << "Available DataProducers: ";
75  GaudiUtils::details::ostream_joiner( msg, m_algorithms, ", ", []( auto& os, const AlgEntry& e ) -> decltype(
76  auto ) { return os << AlgorithmRepr{*e.alg}; } );
77  msg << endmsg;
78  }
79 
80  // populate m_dependencies
81  m_dependencies = mapProducers( m_algorithms );
82 
83  return sc;
84 }
85 
87 {
88 
90  if ( !ss.isSuccess() ) return ss;
91 
92  // sysStart for m_algorithms
93  for ( AlgEntry& algEntry : m_algorithms ) {
94  ss = algEntry.alg->sysStart();
95  if ( ss.isFailure() ) {
96  error() << "Unable to start Algorithm: " << algEntry.alg->name() << endmsg;
97  return ss;
98  }
99  }
100  // sysStart for m_cfnodes
101  for ( AlgEntry& algEntry : m_cfnodes ) {
102  ss = algEntry.alg->sysStart();
103  if ( ss.isFailure() ) {
104  error() << "Unable to start Algorithm: " << algEntry.alg->name() << endmsg;
105  return ss;
106  }
107  }
108  return ss;
109 }
110 
112 {
113  StatusCode ss = Service::stop();
114  if ( !ss.isSuccess() ) return ss;
115 
116  // sysStart for m_algorithms
117  for ( AlgEntry& algEntry : m_algorithms ) {
118  ss = algEntry.alg->sysStop();
119  if ( ss.isFailure() ) {
120  error() << "Unable to stop Algorithm: " << algEntry.alg->name() << endmsg;
121  return ss;
122  }
123  }
124  // sysStart for m_cfnodes
125  for ( AlgEntry& algEntry : m_cfnodes ) {
126  ss = algEntry.alg->sysStop();
127  if ( ss.isFailure() ) {
128  error() << "Unable to stop Algorithm: " << algEntry.alg->name() << endmsg;
129  return ss;
130  }
131  }
132  return ss;
133 }
134 
136 {
137  ranges::for_each( m_algorithms | ranges::view::transform( &AlgEntry::alg ),
138  []( Algorithm* alg ) { alg->sysFinalize(); } );
139 
140  ranges::for_each(
141  m_algorithms | ranges::view::remove_if( []( const auto& entry ) { return entry.requestCount != 0; } ) |
142  ranges::view::transform( &AlgEntry::alg ),
143  [&]( Algorithm* alg ) { this->warning() << "Unused algorithm: " << AlgorithmRepr{*alg} << endmsg; } );
144  m_algorithms.clear();
145  return Service::finalize();
146 }
147 
148 // populate m_algorithms
151 {
152  std::vector<AlgEntry> algorithms;
153 
154  //= Get the Application manager, to see if algorithm exist
155  auto appMgr = service<IAlgManager>( "ApplicationMgr" );
156  for ( const Gaudi::Utils::TypeNameString item : names ) {
157  const std::string& theName = item.name();
158  const std::string& theType = item.type();
159 
160  //== Check wether the specified algorithm already exists. If not, create it
161  SmartIF<IAlgorithm> myIAlg = appMgr->algorithm( item, false ); // do not create it now
162  if ( !myIAlg ) {
163  myIAlg = createAlgorithm( *appMgr, theType, theName );
164  } else {
165  // when the algorithm is not created, the ref count is short by one, so we
166  // have to fix it.
167  myIAlg->addRef();
168  }
169 
170  if ( !myIAlg ) {
171  throw GaudiException{"Failed to create " + boost::lexical_cast<std::string>( item ), __func__,
173  }
174 
175  // propagate the sub-algorithm into own state.
176  StatusCode sc = myIAlg->sysInitialize();
177  if ( sc.isFailure() ) {
178  throw GaudiException{"Failed to initialize " + boost::lexical_cast<std::string>( item ), __func__,
180  }
181 
182  algorithms.emplace_back( std::move( myIAlg ) );
183  }
184 
185  return algorithms;
186 }
187 
190 {
191  // figure out all outputs
193  for ( AlgEntry& alg : algorithms ) {
194  const auto& output = alg.alg->dataDependencies( AccessMode::Write );
195  if ( output.empty() ) {
196  error() << AlgorithmRepr{*algorithms.back().alg} << " does not produce any data -- should not appear in list of "
197  "producers. Ignoring."
198  << endmsg;
199  continue;
200  }
201  for ( auto id : output ) {
202  if ( id.key().find( ":" ) != std::string::npos ) {
203  error() << " in Alg " << AlgorithmRepr{*alg.alg} << " alternatives are NOT allowed for outputs! id: " << id
204  << endmsg;
205  }
206 
207  auto r = producers.emplace( id, &alg );
208  if ( !r.second ) {
209  if ( output.size() == 1 ) {
210  warning() << "multiple algorithms declare " << id << " as output! -- IGNORING " << AlgorithmRepr{*alg.alg}
211  << endmsg;
212  } else {
213  error() << "multiple algorithms declate " << id << " as output; given that " << AlgorithmRepr{*alg.alg}
214  << " produces multiple outputs ";
215  //<< output <<
216  error() << " this could lead to clashes in case any of the other "
217  "items is ever requested"
218  << endmsg;
219  }
220  }
221  }
222  }
223 
224  // resolve dependencies
225  for ( auto& algEntry : algorithms ) {
226  auto input = sortedDataObjIDColl( algEntry.alg->dataDependencies( AccessMode::Read ) );
227  for ( const DataObjID* idp : input ) {
228  DataObjID id = *idp;
229  if ( id.key().find( ":" ) != std::string::npos ) {
230  warning() << " contains alternatives which require resolution...\n";
231  auto tokens = boost::tokenizer<boost::char_separator<char>>{id.key(), boost::char_separator<char>{":"}};
232  auto itok = std::find_if( tokens.begin(), tokens.end(),
233  [&]( DataObjID t ) { return producers.find( t ) != producers.end(); } );
234  if ( itok != tokens.end() ) {
235  warning() << "found matching output for " << *itok << " -- updating info\n";
236  id.updateKey( *itok );
237  warning() << "Please update input to not require alternatives, and "
238  "instead properly configure the dataloader"
239  << endmsg;
240  } else {
241  error() << "failed to find alternate in global output list"
242  << " for id: " << id << " in Alg " << algEntry.alg << endmsg;
243  }
244  }
245  auto iproducer = producers.find( id );
246  if ( iproducer != producers.end() ) {
247  algEntry.dependsOn.insert( iproducer->second );
248  } else {
249  // TODO: assign to dataloader!
250  // algEntry.dependsOn.insert(dataloader.alg);
251  // dataloader.data.emplace( id ); // TODO: we may ask to much of the
252  // dataloader this way...
253  }
254  }
255  }
256  return producers;
257 }
258 
260  const std::vector<std::string>& stoppers ) const
261 {
263 
265  deps.reserve( requested.size() );
266 
267  // start with seeding from the initial request
268  for ( const auto& req : requested ) {
269  DataObjID id = req;
270  if ( id.key().find( ":" ) != std::string::npos ) {
271  warning() << req.key() << " contains alternatives which require resolution...\n";
272  auto tokens = boost::tokenizer<boost::char_separator<char>>{id.key(), boost::char_separator<char>{":"}};
273  auto itok = std::find_if( tokens.begin(), tokens.end(),
274  [&]( DataObjID t ) { return m_dependencies.find( t ) != m_dependencies.end(); } );
275  if ( itok != tokens.end() ) {
276  warning() << "found matching output for " << *itok << " -- updating info\n";
277  id.updateKey( *itok );
278  warning() << "Please update input to not require alternatives, and "
279  "instead properly configure the dataloader"
280  << endmsg;
281  } else {
282  error() << "failed to find alternate in global output list"
283  << " for id: " << id << endmsg;
284  }
285  }
286  auto i = m_dependencies.find( id );
287  if ( i == m_dependencies.end() )
288  throw GaudiException( "unknown requested input: " + id.key(), __func__, StatusCode::FAILURE );
289  deps.push_back( i->second );
290  }
291  // insert the (direct) dependencies of 'current' right after 'current', and
292  // interate until done...
293  for ( auto current = deps.begin(); current != deps.end(); ++current ) {
294  if ( std::any_of( std::begin( stoppers ), std::end( stoppers ),
295  [current]( auto& stopper ) { return ( *current )->alg->name() == stopper; } ) ) {
296  continue;
297  }
298  for ( auto* entry : ( *current )->dependsOn ) {
299  if ( std::find( std::next( current ), deps.end(), entry ) != deps.end() ) continue; // already there downstream...
300 
301  auto dup = std::find( deps.begin(), current, entry );
302  // if present upstream, move it downstream. Otherwise, insert
303  // downstream...
304  current = std::prev( dup != current ? std::rotate( dup, std::next( dup ), std::next( current ) )
305  : deps.insert( std::next( current ), entry ) );
306  }
307  }
308  auto range = ( deps | ranges::view::transform( []( auto& i ) { return i->alg; } ) | ranges::view::reverse );
309  return {begin( range ), end( range )};
310 }
311 
313  const std::vector<std::string>& stoppers ) const
314 {
316 
317  auto alg = std::find_if( begin( m_cfnodes ), end( m_cfnodes ),
318  [&]( const AlgEntry& ae ) { return ae.alg->name() == requested.name(); } );
319 
320  if ( alg != end( m_cfnodes ) && alg->alg->type() != requested.type() ) {
321  error() << "requested " << requested << " but have matching name with different type: " << alg->alg->type()
322  << endmsg;
323  }
324  if ( alg == end( m_cfnodes ) ) {
325  auto av = instantiateAndInitializeAlgorithms( {requested.type() + '/' + requested.name()} );
326  assert( av.size() == 1 );
327  m_cfnodes.push_back( std::move( av.front() ) );
328  alg = std::next( m_cfnodes.rbegin() ).base();
329  }
330  assert( alg != end( m_cfnodes ) );
331  assert( alg->alg != nullptr );
332  if ( std::find_if( std::begin( stoppers ), std::end( stoppers ),
333  [&requested]( auto& stopper ) { return requested.name() == stopper; } ) == std::end( stoppers ) ) {
334  result = algorithmsRequiredFor( alg->alg->dataDependencies( AccessMode::Read ), stoppers );
335  }
336  result.push_back( alg->alg );
337  if ( msgLevel( MSG::DEBUG ) ) {
338  debug() << std::endl << "requested " << requested << " returning " << std::endl << " ";
339  GaudiUtils::details::ostream_joiner( debug(), result, ",\n ", []( auto& os, const Algorithm* a ) -> decltype(
340  auto ) { return os << AlgorithmRepr{*a}; } );
341  debug() << std::endl << endmsg;
342  }
343  return result;
344 }
constexpr static const auto FAILURE
Definition: StatusCode.h:88
Definition of the MsgStream class used to transmit messages.
Definition: MsgStream.h:24
StatusCode initialize() override
Definition: Service.cpp:63
Define general base for Gaudi exception.
std::map< DataObjID, AlgEntry * > mapProducers(std::vector< AlgEntry > &algorithms) const
const std::string & name() const override
The identifying name of the algorithm object.
Definition: Algorithm.cpp:747
StatusCode finalize() override
Definition: Service.cpp:173
GAUDI_API const std::string typeinfoName(const std::type_info &)
Get platform independent information about the class type.
Definition: System.cpp:332
StatusCode start() override
Definition: Service.cpp:136
IDataHandleMetadata::AccessMode AccessMode
Definition: DataHandle.h:115
bool isSuccess() const
Definition: StatusCode.h:287
::details::reverse_wrapper< T > reverse(T &&iterable)
Definition: reverse.h:52
T endl(T...args)
The IAlgManager is the interface implemented by the Algorithm Factory in the Application Manager to s...
Definition: IAlgManager.h:27
virtual StatusCode createAlgorithm(const std::string &algtype, const std::string &algname, IAlgorithm *&alg, bool managed=false, bool checkIfExists=true)=0
Create an instance of a algorithm type that has been declared beforehand and assigns to it a name...
StatusCode finalize() override
StatusCode start() override
T end(T...args)
Algorithm * alg
bool isFailure() const
Definition: StatusCode.h:139
virtual StatusCode sysInitialize()=0
Initialization method invoked by the framework.
T prev(T...args)
STL class.
StatusCode stop() override
#define DECLARE_COMPONENT(type)
T push_back(T...args)
Helper class to parse a string of format "type/name".
T next(T...args)
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:51
std::vector< AlgEntry > instantiateAndInitializeAlgorithms(const std::vector< std::string > &names) const
const std::string & key() const
Definition: DataObjID.h:49
StatusCode sysFinalize() override
System finalization.
Definition: Algorithm.cpp:659
decltype(auto) range(Args &&...args)
Zips multiple containers together to form a single range.
StatusCode stop() override
Definition: Service.cpp:129
std::vector< Algorithm * > algorithmsRequiredFor(const DataObjIDColl &requested, const std::vector< std::string > &stoppers={}) const override
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:28
T move(T...args)
Stream & ostream_joiner(Stream &os, Iterator first, Iterator last, Separator sep, OutputElement output=OutputElement{})
Definition: SerializeSTL.h:40
T insert(T...args)
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:79
T find_if(T...args)
T size(T...args)
STL class.
virtual Out operator()(const vector_of_const_< In > &inputs) const =0
T begin(T...args)
const std::string & type() const
T any_of(T...args)
T emplace(T...args)
T rotate(T...args)
appMgr
Definition: IOTest.py:94
string s
Definition: gaudirun.py:253
T sort(T...args)
StatusCode initialize() override
AttribStringParser::Iterator begin(const AttribStringParser &parser)
const std::string & name() const
std::string fullKey() const
Definition: DataObjID.cpp:99
STL class.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:209
T reserve(T...args)
std::ostream & operator<<(std::ostream &str, const GaudiAlg::ID &id)
Operator overloading for ostream.
Definition: GaudiHistoID.h:136
T emplace_back(T...args)