The Gaudi Framework  v36r16 (ea80daf8)
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
HiveDataBroker.cpp
Go to the documentation of this file.
1 /***********************************************************************************\
2 * (c) Copyright 1998-2019 CERN for the benefit of the LHCb and ATLAS collaborations *
3 * *
4 * This software is distributed under the terms of the Apache version 2 licence, *
5 * copied verbatim in the file "LICENSE". *
6 * *
7 * In applying this licence, CERN does not waive the privileges and immunities *
8 * granted to it by virtue of its status as an Intergovernmental Organization *
9 * or submit itself to any jurisdiction. *
10 \***********************************************************************************/
11 #include "HiveDataBroker.h"
14 #include "GaudiKernel/System.h"
15 #include "boost/lexical_cast.hpp"
16 #include "boost/tokenizer.hpp"
17 #include <Gaudi/Algorithm.h>
18 #include <algorithm>
19 #include <iomanip>
20 #ifdef __cpp_lib_ranges
21 # include <ranges>
22 namespace ranges = std::ranges;
23 #else
24 # include "range/v3/algorithm/for_each.hpp"
25 # include "range/v3/view/filter.hpp"
26 # include "range/v3/view/reverse.hpp"
27 # include "range/v3/view/transform.hpp"
28 // upstream has renamed namespace ranges::view to ranges::views
29 # if RANGE_V3_VERSION < 900
30 namespace ranges::views {
31  using namespace ranges::view;
32 }
33 # endif
34 #endif
35 
37 
38 namespace {
39  struct AlgorithmRepr {
40  const Gaudi::Algorithm& parent;
41 
42  friend std::ostream& operator<<( std::ostream& s, const AlgorithmRepr& a ) {
43  std::string typ = System::typeinfoName( typeid( a.parent ) );
44  s << typ;
45  if ( a.parent.name() != typ ) s << "/" << a.parent.name();
46  return s;
47  }
48  };
49 
50  struct DataObjIDSorter {
51  bool operator()( const DataObjID* a, const DataObjID* b ) { return a->fullKey() < b->fullKey(); }
52  };
53 
54  // Sort a DataObjIDColl in a well-defined, reproducible manner.
55  // Used for making debugging dumps.
56  std::vector<const DataObjID*> sortedDataObjIDColl( const DataObjIDColl& coll ) {
58  v.reserve( coll.size() );
59  for ( const DataObjID& id : coll ) v.push_back( &id );
60  std::sort( v.begin(), v.end(), DataObjIDSorter() );
61  return v;
62  }
63 
64  SmartIF<IAlgorithm> createAlgorithm( IAlgManager& am, const std::string& type, const std::string& name ) {
65  // Maybe modify the AppMgr interface to return Algorithm* ??
66  IAlgorithm* tmp = nullptr;
67  StatusCode sc = am.createAlgorithm( type, name, tmp );
68  return sc.isSuccess() ? dynamic_cast<Gaudi::Algorithm*>( tmp ) : nullptr;
69  }
70 } // namespace
71 
// Body of HiveDataBrokerSvc::initialize() (its signature line is not part of this listing).
// The lambda handed to andThen() runs only if Service::initialize() reported success.
73  return Service::initialize().andThen( [&] {
74  // populate m_algorithms: instantiate and initialize every algorithm named in m_producers
75  m_algorithms = instantiateAndInitializeAlgorithms( m_producers );
76 
77  // warn about non-reentrant algorithms (here: every algorithm whose cardinality() is > 0)
78  ranges::for_each( m_algorithms | ranges::views::transform( []( const auto& entry ) { return entry.alg; } ) |
79  ranges::views::filter( []( const auto* alg ) { return alg->cardinality() > 0; } ),
80  [&]( const Gaudi::Algorithm* alg ) {
81  this->warning() << "non-reentrant algorithm: " << AlgorithmRepr{ *alg } << endmsg;
82  } );
83  //== Print the list of the created algorithms (only when the DEBUG level is enabled)
84  if ( msgLevel( MSG::DEBUG ) ) {
85  MsgStream& msg = debug();
86  msg << "Available DataProducers: ";
// NOTE(review): the head of the call that consumes the following arguments (listing line 87) is
// missing from this listing — presumably GaudiUtils::details::ostream_joiner; confirm.
88  msg, m_algorithms, ", ",
89  []( auto& os, const AlgEntry& e ) -> decltype( auto ) { return os << AlgorithmRepr{ *e.alg }; } );
90  msg << endmsg;
91  }
92 
93  // populate m_dependencies with the producer map built from the algorithms' declared outputs
94  m_dependencies = mapProducers( m_algorithms );
95  } );
96 }
97 
99 
// Body of HiveDataBrokerSvc::start() (its signature line is not part of this listing).
// Starts the base Service, then every entry of m_algorithms and m_cfnodes in order;
// the first failing StatusCode is returned immediately and later entries are not started.
100  StatusCode ss = Service::start();
101  if ( !ss.isSuccess() ) return ss;
102 
103  // sysStart for m_algorithms
104  for ( AlgEntry& algEntry : m_algorithms ) {
105  ss = algEntry.alg->sysStart();
106  if ( ss.isFailure() ) {
107  error() << "Unable to start Algorithm: " << algEntry.alg->name() << endmsg;
108  return ss;
109  }
110  }
111  // sysStart for m_cfnodes
112  for ( AlgEntry& algEntry : m_cfnodes ) {
113  ss = algEntry.alg->sysStart();
114  if ( ss.isFailure() ) {
115  error() << "Unable to start Algorithm: " << algEntry.alg->name() << endmsg;
116  return ss;
117  }
118  }
// ss holds the status of the last sysStart() call, or of Service::start() if both lists are empty
119  return ss;
120 }
121 
// Body of HiveDataBrokerSvc::stop() (its signature line is not part of this listing).
// Stops the base Service, then every entry of m_algorithms and m_cfnodes in order;
// the first failing StatusCode is returned immediately and later entries are not stopped.
123  StatusCode ss = Service::stop();
124  if ( !ss.isSuccess() ) return ss;
125 
126  // sysStop for m_algorithms
127  for ( AlgEntry& algEntry : m_algorithms ) {
128  ss = algEntry.alg->sysStop();
129  if ( ss.isFailure() ) {
130  error() << "Unable to stop Algorithm: " << algEntry.alg->name() << endmsg;
131  return ss;
132  }
133  }
134  // sysStop for m_cfnodes
135  for ( AlgEntry& algEntry : m_cfnodes ) {
136  ss = algEntry.alg->sysStop();
137  if ( ss.isFailure() ) {
138  error() << "Unable to stop Algorithm: " << algEntry.alg->name() << endmsg;
139  return ss;
140  }
141  }
// ss holds the status of the last sysStop() call, or of Service::stop() if both lists are empty
142  return ss;
143 }
144 
// Body of HiveDataBrokerSvc::finalize() (its signature line is not part of this listing).
// Calls sysFinalize() on every algorithm in m_algorithms, deliberately ignoring the returned
// status, empties m_algorithms, and finally returns the result of Service::finalize().
// NOTE(review): m_cfnodes is started and stopped by start()/stop() but is neither finalized
// nor cleared here — confirm whether that is intentional.
146  ranges::for_each( m_algorithms | ranges::views::transform( &AlgEntry::alg ), []( Gaudi::Algorithm* alg ) {
147  alg->sysFinalize().ignore( /* AUTOMATICALLY ADDED FOR gaudi/Gaudi!763 */ );
148  } );
149  m_algorithms.clear();
150  return Service::finalize();
151 }
152 
153 // populate m_algorithms
// HiveDataBrokerSvc::instantiateAndInitializeAlgorithms( names ): for every "type/name" entry of
// `names`, look the algorithm up in the ApplicationMgr (creating it if it is not known yet),
// run sysInitialize() on it and append it to the returned list.
// Throws GaudiException when an algorithm can be neither found nor created, or fails to initialize.
// (The signature and the declaration of `algorithms` are not part of this listing.)
157 
158  //= Get the Application manager, to see if the algorithm exists
// NOTE(review): appMgr is dereferenced below without a validity check — confirm that
// service<IAlgManager>() cannot hand back an empty handle here.
159  auto appMgr = service<IAlgManager>( "ApplicationMgr" );
160  for ( const Gaudi::Utils::TypeNameString item : names ) {
161  const std::string& theName = item.name();
162  const std::string& theType = item.type();
163 
164  //== Check whether the specified algorithm already exists. If not, create it
165  SmartIF<IAlgorithm> myIAlg = appMgr->algorithm( item, false ); // do not create it now
166  if ( !myIAlg ) {
167  myIAlg = createAlgorithm( *appMgr, theType, theName );
168  } else {
169  // when the algorithm is not created, the ref count is short by one, so we
170  // have to fix it.
171  myIAlg->addRef();
172  }
173 
// still empty: the algorithm was not known and could not be created either
174  if ( !myIAlg ) {
175  throw GaudiException{ "Failed to create " + boost::lexical_cast<std::string>( item ), __func__,
177  }
178 
179  // initialize the algorithm; a failure is reported by throwing
180  StatusCode sc = myIAlg->sysInitialize();
181  if ( sc.isFailure() ) {
182  throw GaudiException{ "Failed to initialize " + boost::lexical_cast<std::string>( item ), __func__,
184  }
185 
// hand the handle over to the new entry of the result list
186  algorithms.emplace_back( std::move( myIAlg ) );
187  }
188 
189  return algorithms;
190 }
191 
// Body of HiveDataBrokerSvc::mapProducers( algorithms ) (signature and the declaration of
// `producers` are not part of this listing).
// 1. At DEBUG level, dump the inputs and outputs of every algorithm.
// 2. Register each declared output DataObjID with the entry that produces it; a second
//    producer for the same id raises a GaudiException.
// 3. For each input of each algorithm, record the producing entry in that algorithm's
//    dependsOn set; an input without a producer raises a GaudiException.
// Returns the id -> producer map built in step 2.
194  if ( msgLevel( MSG::DEBUG ) ) {
195  debug() << "Data Dependencies for Algorithms:";
// NOTE(review): this dump walks the member m_algorithms, not the `algorithms` argument used
// by the rest of the function — confirm the two are always the same collection.
196  for ( const auto& entry : m_algorithms ) {
197  debug() << "\n " << entry.alg->name() << " :";
198  for ( const auto* id : sortedDataObjIDColl( entry.alg->inputDataObjs() ) ) {
199  debug() << "\n o INPUT " << id->key();
200  }
201  for ( const auto* id : sortedDataObjIDColl( entry.alg->outputDataObjs() ) ) {
202  debug() << "\n o OUTPUT " << id->key();
203  }
204  }
205  debug() << endmsg;
206  }
207 
208  // figure out all outputs: every output id may be produced by exactly one algorithm
210  for ( AlgEntry& alg : algorithms ) {
211  const auto& output = alg.alg->outputDataObjs();
212  if ( output.empty() ) { continue; }
213  for ( auto id : output ) {
// a ':' in the key marks a list of alternatives; for outputs this is only reported,
// the id is still registered below
214  if ( id.key().find( ":" ) != std::string::npos ) {
215  error() << " in Alg " << AlgorithmRepr{ *alg.alg } << " alternatives are NOT allowed for outputs! id: " << id
216  << endmsg;
217  }
218 
// emplace() leaves an existing entry untouched; r.second == false means the id already has a producer
219  auto r = producers.emplace( id, &alg );
220  if ( !r.second ) {
221  throw GaudiException( "multiple algorithms declare " + id.key() + " as output (" + alg.alg->name() + " and " +
222  producers[id]->alg->name() + " at least). This is not allowed",
223  __func__, StatusCode::FAILURE );
224  }
225  }
226  }
227 
228  // resolve dependencies (inputs are visited in sorted order)
229  for ( auto& algEntry : algorithms ) {
230  auto input = sortedDataObjIDColl( algEntry.alg->inputDataObjs() );
231  for ( const DataObjID* idp : input ) {
// work on a copy: the key may be rewritten below when an alternative is picked
232  DataObjID id = *idp;
233  if ( id.key().find( ":" ) != std::string::npos ) {
234  warning() << AlgorithmRepr{ *( algEntry.alg ) } << " contains alternatives which require resolution...\n";
// split the key at ':' and take the first alternative that has a registered producer
235  auto tokens = boost::tokenizer<boost::char_separator<char>>{ id.key(), boost::char_separator<char>{ ":" } };
236  auto itok = std::find_if( tokens.begin(), tokens.end(),
237  [&]( DataObjID t ) { return producers.find( t ) != producers.end(); } );
238  if ( itok != tokens.end() ) {
239  warning() << "found matching output for " << *itok << " -- updating info\n";
240  id.updateKey( *itok );
241  warning() << "Please update input to not require alternatives, and "
242  "instead properly configure the dataloader"
243  << endmsg;
244  } else {
// NOTE(review): algEntry.alg is a Gaudi::Algorithm*, so this streams the raw pointer rather
// than the AlgorithmRepr{ *algEntry.alg } used by the other messages here — likely unintended.
245  error() << "failed to find alternate in global output list"
246  << " for id: " << id << " in Alg " << algEntry.alg << endmsg;
247  }
248  }
249  auto iproducer = producers.find( id );
250  if ( iproducer != producers.end() ) {
251  algEntry.dependsOn.insert( iproducer->second );
252  } else {
253  std::ostringstream error_message;
254  error_message << "\nUnknown requested input by " << AlgorithmRepr{ *( algEntry.alg ) } << " : "
255  << std::quoted( id.key(), '\'' ) << ".\n";
256  error_message << "You can set the OutputLevel of HiveDataBrokerSvc to DEBUG to get a list of inputs and "
257  "outputs of every registered algorithm.\n";
258  throw GaudiException( error_message.str(), __func__, StatusCode::FAILURE );
259  // TODO: assign to dataloader!
260  // algEntry.dependsOn.insert(dataloader.alg);
261  // dataloader.data.emplace( id ); // TODO: we may ask too much of the
262  // dataloader this way...
263  }
264  }
265  }
266  return producers;
267 }
268 
271  const std::vector<std::string>& stoppers ) const {
// HiveDataBrokerSvc::algorithmsRequiredFor( requested, stoppers ) — DataObjIDColl overload
// (the first signature line and the local declarations are not part of this listing).
// Collects the producers of the requested ids plus, transitively, everything they depend on,
// and returns the algorithms ordered so that dependencies come before their consumers.
// Dependencies of algorithms whose name appears in `stoppers` are not followed.
// Throws GaudiException when a requested id has no known producer.
273 
275  deps.reserve( requested.size() );
276 
277  // start with seeding from the initial request
278  for ( const auto& req : requested ) {
// work on a copy: the key may be rewritten below when an alternative is picked
279  DataObjID id = req;
280  if ( id.key().find( ":" ) != std::string::npos ) {
281  warning() << req.key() << " contains alternatives which require resolution...\n";
// split the key at ':' and take the first alternative that has a known producer
282  auto tokens = boost::tokenizer<boost::char_separator<char>>{ id.key(), boost::char_separator<char>{ ":" } };
283  auto itok = std::find_if( tokens.begin(), tokens.end(),
284  [&]( DataObjID t ) { return m_dependencies.find( t ) != m_dependencies.end(); } );
285  if ( itok != tokens.end() ) {
286  warning() << "found matching output for " << *itok << " -- updating info\n";
287  id.updateKey( *itok );
288  warning() << "Please update input to not require alternatives, and "
289  "instead properly configure the dataloader"
290  << endmsg;
291  } else {
292  error() << "failed to find alternate in global output list"
293  << " for id: " << id << endmsg;
294  }
295  }
296  auto i = m_dependencies.find( id );
297  if ( i == m_dependencies.end() )
298  throw GaudiException( "unknown requested input: " + id.key(), __func__, StatusCode::FAILURE );
299  deps.push_back( i->second );
300  }
301  // producers may be responsible for multiple requested DataObjID -- make sure they are only mentioned once
// (the sort orders the entries by pointer value, which is all std::unique needs here)
302  std::sort( deps.begin(), deps.end() );
303  deps.erase( std::unique( deps.begin(), deps.end() ), deps.end() );
304 
305  // insert the (direct) dependencies of 'current' right after 'current', and
306  // iterate until done...
307  for ( auto current = deps.begin(); current != deps.end(); ++current ) {
// a stopper is kept in the list, but its own dependencies are not expanded
308  if ( std::any_of( std::begin( stoppers ), std::end( stoppers ),
309  [current]( auto& stopper ) { return ( *current )->alg->name() == stopper; } ) ) {
310  continue;
311  }
312  for ( auto* entry : ( *current )->dependsOn ) {
313  if ( std::find( std::next( current ), deps.end(), entry ) != deps.end() ) continue; // already there downstream...
314 
315  auto dup = std::find( deps.begin(), current, entry );
316  // if present upstream, move it downstream. Otherwise, insert
317  // downstream...
// Both std::rotate and insert return an iterator to `entry`'s new slot, which sits directly
// after the element being processed; std::prev() therefore re-seats `current` on that element
// (needed because the rotate shifts it and insert may reallocate the vector).
318  current = std::prev( dup != current ? std::rotate( dup, std::next( dup ), std::next( current ) )
319  : deps.insert( std::next( current ), entry ) );
320  }
321  }
// deps lists consumers before their dependencies; reverse it so dependencies come first
322  auto range = ( deps | ranges::views::transform( []( auto& i ) { return i->alg; } ) | ranges::views::reverse );
323  return { begin( range ), end( range ) };
324 }
325 
328  const std::vector<std::string>& stoppers ) const {
// HiveDataBrokerSvc::algorithmsRequiredFor( requested, stoppers ) — overload taking a "type/name"
// request (the first signature line and the declaration of `result` are not part of this listing).
// Finds the requested algorithm among m_cfnodes by name, instantiating and initializing it
// (and appending it to m_cfnodes) when absent, and returns the producers needed for its
// inputs followed by the algorithm itself.
330 
331  auto alg = std::find_if( begin( m_cfnodes ), end( m_cfnodes ),
332  [&]( const AlgEntry& ae ) { return ae.alg->name() == requested.name(); } );
333 
// same name but a different type: only reported, the existing instance is still used
334  if ( alg != end( m_cfnodes ) && alg->alg->type() != requested.type() ) {
335  error() << "requested " << requested << " but have matching name with different type: " << alg->alg->type()
336  << endmsg;
337  }
338  if ( alg == end( m_cfnodes ) ) {
339  auto av = instantiateAndInitializeAlgorithms( { requested.type() + '/' + requested.name() } );
340  assert( av.size() == 1 );
341  m_cfnodes.push_back( std::move( av.front() ) );
// forward iterator to the element just appended (the last one)
342  alg = std::next( m_cfnodes.rbegin() ).base();
343  }
344  assert( alg != end( m_cfnodes ) );
345  assert( alg->alg != nullptr );
// unless the requested algorithm is itself a stopper, first collect the producers of its inputs
346  if ( std::find_if( std::begin( stoppers ), std::end( stoppers ),
347  [&requested]( auto& stopper ) { return requested.name() == stopper; } ) == std::end( stoppers ) ) {
348  result = algorithmsRequiredFor( alg->alg->inputDataObjs(), stoppers );
349  }
// the requested algorithm itself always comes last
350  result.push_back( alg->alg );
351  if ( msgLevel( MSG::DEBUG ) ) {
352  debug() << std::endl << "requested " << requested << " returning " << std::endl << " ";
// NOTE(review): the head of the call that consumes the following arguments (listing line 353) is
// missing from this listing — presumably GaudiUtils::details::ostream_joiner; confirm.
354  debug(), result, ",\n ",
355  []( auto& os, const Gaudi::Algorithm* a ) -> decltype( auto ) { return os << AlgorithmRepr{ *a }; } );
356  debug() << std::endl << endmsg;
357  }
358  return result;
359 }
MSG::DEBUG
@ DEBUG
Definition: IMessageSvc.h:25
Histograms_with_global.algorithms
algorithms
Definition: Histograms_with_global.py:19
HiveDataBrokerSvc::initialize
StatusCode initialize() override
Definition: HiveDataBroker.cpp:72
IAlgManager.h
Service::initialize
StatusCode initialize() override
Definition: Service.cpp:118
std::string
STL class.
Gaudi::Utils::TypeNameString::name
const std::string & name() const
Definition: TypeNameString.h:49
StatusCode::andThen
StatusCode andThen(F &&f, ARGS &&... args) const
Chain code blocks making the execution conditional a success result.
Definition: StatusCode.h:163
std::move
T move(T... args)
Gaudi::Algorithm::name
const std::string & name() const override
The identifying name of the algorithm object.
Definition: Algorithm.cpp:528
StatusCode::isSuccess
bool isSuccess() const
Definition: StatusCode.h:314
Service::start
StatusCode start() override
Definition: Service.cpp:187
std::unordered_set< DataObjID, DataObjID_Hasher >
System.h
std::vector::reserve
T reserve(T... args)
GaudiException.h
reverse
::details::reverse_wrapper< T > reverse(T &&iterable)
Definition: reverse.h:59
gaudirun.s
string s
Definition: gaudirun.py:348
std::vector
STL class.
std::find_if
T find_if(T... args)
std::unordered_set::size
T size(T... args)
GaudiException
Definition: GaudiException.h:31
GaudiMP.FdsRegistry.msg
msg
Definition: FdsRegistry.py:19
std::map::emplace
T emplace(T... args)
ranges
Definition: FunctionalDetails.h:34
std::rotate
T rotate(T... args)
std::any_of
T any_of(T... args)
ranges::views
Definition: FunctionalDetails.h:34
System::typeinfoName
GAUDI_API const std::string typeinfoName(const std::type_info &)
Get platform independent information about the class type.
Definition: System.cpp:313
gaudirun.output
output
Definition: gaudirun.py:523
HiveDataBrokerSvc::stop
StatusCode stop() override
Definition: HiveDataBroker.cpp:122
std::sort
T sort(T... args)
Service::finalize
StatusCode finalize() override
Definition: Service.cpp:222
basic.alg
alg
Definition: basic.py:15
DataObjID::fullKey
std::string fullKey() const
combination of the key and the ClassName, mostly for debugging
Definition: DataObjID.cpp:99
std::vector::push_back
T push_back(T... args)
bug_34121.t
t
Definition: bug_34121.py:30
IAlgManager
Definition: IAlgManager.h:37
GaudiAlg::operator<<
std::ostream & operator<<(std::ostream &str, const GaudiAlg::ID &id)
Operator overloading for ostream.
Definition: GaudiHistoID.h:141
Gaudi::Utils::TypeNameString
Helper class to parse a string of format "type/name".
Definition: TypeNameString.h:20
TimingHistograms.name
name
Definition: TimingHistograms.py:25
StatusCode
Definition: StatusCode.h:65
HistoDumpEx.r
r
Definition: HistoDumpEx.py:20
IAlgorithm
Definition: IAlgorithm.h:38
HiveDataBrokerSvc::instantiateAndInitializeAlgorithms
std::vector< AlgEntry > instantiateAndInitializeAlgorithms(const std::vector< std::string > &names) const
Definition: HiveDataBroker.cpp:155
std::ostream
STL class.
CLHEP::begin
double * begin(CLHEP::HepVector &v)
Definition: TupleAlg.cpp:45
HiveDataBrokerSvc::start
StatusCode start() override
Definition: HiveDataBroker.cpp:98
HiveDataBrokerSvc::algorithmsRequiredFor
std::vector< Gaudi::Algorithm * > algorithmsRequiredFor(const DataObjIDColl &requested, const std::vector< std::string > &stoppers={}) const override
Definition: HiveDataBroker.cpp:270
Gaudi::Algorithm
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:90
std::vector::erase
T erase(T... args)
Algorithm.h
SmartIF< IAlgorithm >
endmsg
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:203
std::map< DataObjID, HiveDataBrokerSvc::AlgEntry * >
MsgStream
Definition: MsgStream.h:34
IAlgManager::createAlgorithm
virtual StatusCode createAlgorithm(std::string algtype, std::string algname, IAlgorithm *&alg, bool managed=false, bool checkIfExists=true)=0
Create an instance of a algorithm type that has been declared beforehand and assigns to it a name.
Gaudi::Utils::TypeNameString::type
const std::string & type() const
Definition: TypeNameString.h:48
DataObjID
Definition: DataObjID.h:47
HiveDataBrokerSvc::mapProducers
std::map< DataObjID, AlgEntry * > mapProducers(std::vector< AlgEntry > &algorithms) const
Definition: HiveDataBroker.cpp:193
HistoDumpEx.v
v
Definition: HistoDumpEx.py:27
std::ostringstream
STL class.
StatusCode::isFailure
bool isFailure() const
Definition: StatusCode.h:129
gaudirun.type
type
Definition: gaudirun.py:162
std::endl
T endl(T... args)
Service::stop
StatusCode stop() override
Definition: Service.cpp:181
HiveDataBrokerSvc
Definition: HiveDataBroker.h:17
std::vector::begin
T begin(T... args)
DECLARE_COMPONENT
#define DECLARE_COMPONENT(type)
Definition: PluginServiceV1.h:46
std::vector::insert
T insert(T... args)
std::unique
T unique(T... args)
DataObjID::key
const std::string & key() const
only return the last part of the key
Definition: DataObjID.h:60
std::ostringstream::str
T str(T... args)
std::map::end
T end(T... args)
IOTest.end
end
Definition: IOTest.py:123
HiveDataBrokerSvc::AlgEntry
Definition: HiveDataBroker.h:37
HiveDataBrokerSvc::AlgEntry::alg
Gaudi::Algorithm * alg
Definition: HiveDataBroker.h:39
StatusCode::FAILURE
constexpr static const auto FAILURE
Definition: StatusCode.h:101
std::prev
T prev(T... args)
HiveDataBrokerSvc::finalize
StatusCode finalize() override
Definition: HiveDataBroker.cpp:145
HiveDataBroker.h
ProduceConsume.key
key
Definition: ProduceConsume.py:81
IOTest.appMgr
appMgr
Definition: IOTest.py:103
Gaudi::Functional::details::zip::range
decltype(auto) range(Args &&... args)
Zips multiple containers together to form a single range.
Definition: FunctionalDetails.h:102
GaudiUtils::details::ostream_joiner
Stream & ostream_joiner(Stream &os, Iterator first, Iterator last, Separator sep, OutputElement output=OutputElement{})
Definition: SerializeSTL.h:73
std::next
T next(T... args)