The Gaudi Framework  master (9e5914fb)
Loading...
Searching...
No Matches
HiveDataBroker.cpp
Go to the documentation of this file.
1/***********************************************************************************\
2* (c) Copyright 1998-2026 CERN for the benefit of the LHCb and ATLAS collaborations *
3* *
4* This software is distributed under the terms of the Apache version 2 licence, *
5* copied verbatim in the file "LICENSE". *
6* *
7* In applying this licence, CERN does not waive the privileges and immunities *
8* granted to it by virtue of its status as an Intergovernmental Organization *
9* or submit itself to any jurisdiction. *
10\***********************************************************************************/
11#include "GraphDumper.h"
12
13#include <Gaudi/Algorithm.h>
17#include <GaudiKernel/Service.h>
18#include <GaudiKernel/System.h>
19#include <algorithm>
20#include <boost/lexical_cast.hpp>
21#include <boost/tokenizer.hpp>
22#include <iomanip>
23#include <ranges>
24#include <stdexcept>
25
26namespace {
27 struct AlgEntry {
28 size_t index;
29 SmartIF<IAlgorithm> ialg;
30 Gaudi::Algorithm* alg;
31 std::set<AlgEntry const*> dependsOn;
32 std::vector<DataObjID const*> inputs;
33 std::vector<DataObjID const*> outputs;
34
35 friend bool operator<( AlgEntry const& lhs, AlgEntry const& rhs ) { return lhs.index < rhs.index; }
36
37 friend bool operator==( AlgEntry const& lhs, AlgEntry const& rhs ) { return lhs.index == rhs.index; }
38
39 AlgEntry( size_t i, SmartIF<IAlgorithm>&& p )
40 : index{ i }, ialg{ std::move( p ) }, alg{ dynamic_cast<Gaudi::Algorithm*>( ialg.get() ) } {
41 if ( !alg ) throw std::runtime_error( "algorithm pointer == nullptr???" );
42 // gather _all_ the inputs and outputs in a well-defined, reproducible manner,
43 // removing any duplication (sometimes, extra{Out,In}putDeps entries will already appear in {Out,In}putDataObjs)
44 constexpr auto gather = []( auto& c, auto const& in1, auto const& in2 ) {
45 for ( const DataObjID& id : in1 ) c.push_back( &id );
46 for ( const DataObjID& id : in2 ) c.push_back( &id );
47 constexpr auto by_key = []( const DataObjID* id ) { return id->fullKey(); };
48 std::ranges::sort( c, std::less{}, by_key );
49 auto od = std::ranges::unique( c, std::equal_to{}, by_key );
50 c.erase( od.begin(), od.end() );
51 };
52 gather( outputs, alg->outputDataObjs(), alg->extraOutputDeps() );
53 gather( inputs, alg->inputDataObjs(), alg->extraInputDeps() );
54 }
55 };
56
57 template <std::ranges::range R>
58 requires std::common_reference_with<std::ranges::range_reference_t<R>, const AlgEntry&>
59 void dumpDataDepsGraphFile( std::string const& fname, R const& algorithms ) {
60 Gaudi::Hive::Graph g{ fname };
61
62 std::size_t algoIndex = 0ul;
63
64 // define algs and objects
65 std::set<std::size_t> definedObjects;
66
67 // Remember algorithm indices by their name
68 std::map<std::string, std::string> indexByName;
69
70 // loop over all algorithms to generate indexes
71 std::unordered_map<std::string, std::string> output2Idx;
72 for ( const AlgEntry& entry : algorithms ) {
73 std::string algIndex = "Alg_" + std::to_string( algoIndex );
74 indexByName[entry.alg->name()] = algIndex;
75 ++algoIndex;
76 }
77
78 // loop over all algorithms to create nodes and edges
79 for ( const AlgEntry& entry : algorithms ) {
80 // Create algorithm node
81 g.addNode( indexByName[entry.alg->name()], entry.alg->name() );
82
83 // inputs
84 for ( const auto* id : entry.inputs ) {
85 const auto [itr, inserted] = definedObjects.insert( id->hash() );
86 std::string objIndex = "obj_" + std::to_string( id->hash() );
87 if ( inserted ) g.addNode( objIndex, id->key() );
88
89 // the object is the source, the algorithm taking it as an input is the target
90 g.addEdge( objIndex, indexByName[entry.alg->name()] );
91 }
92
93 // outputs
94 for ( const auto* id : entry.outputs ) {
95 const auto [itr, inserted] = definedObjects.insert( id->hash() );
96 std::string objIndex = "obj_" + std::to_string( id->hash() );
97 if ( inserted ) g.addNode( objIndex, id->key() );
98
99 // the algorithm that generated the object is the source, the object is the target
100 g.addEdge( indexByName[entry.alg->name()], objIndex );
101 }
102 }
103 }
104
105 struct AlgorithmRepr {
106 const Gaudi::Algorithm& parent;
107
108 friend std::ostream& operator<<( std::ostream& s, const AlgorithmRepr& a ) {
109 std::string typ = System::typeinfoName( typeid( a.parent ) );
110 s << typ;
111 if ( a.parent.name() != typ ) s << "/" << a.parent.name();
112 return s;
113 }
114 };
115
// Copy the pointers held in `s` into a vector ordered by the pointed-to values (operator<) instead
// of by address. visit() walks AlgEntry::dependsOn through this helper, so the traversal order
// does not depend on where the entries happen to live in memory.
template <typename T>
std::vector<const T*> sorted_( const std::set<T*>& s ) {
  std::vector<const T*> ordered;
  ordered.reserve( s.size() );
  for ( T* element : s ) { ordered.push_back( element ); }
  const auto pointeeLess = []( const T* a, const T* b ) { return *a < *b; };
  std::sort( ordered.begin(), ordered.end(), pointeeLess );
  return ordered;
}
123
124 SmartIF<IAlgorithm> createAlgorithm( IAlgManager& am, const std::string& type, const std::string& name ) {
125 // Maybe modify the AppMgr interface to return Algorithm* ??
126 IAlgorithm* tmp = nullptr;
127 StatusCode sc = am.createAlgorithm( type, name, tmp );
128 return sc.isSuccess() ? dynamic_cast<Gaudi::Algorithm*>( tmp ) : nullptr;
129 }
130} // namespace
131
// Data broker service. It owns the algorithms listed in the "DataProducers" property and, on
// request, returns the subset of them needed to produce some DataObjIDs (or to run one given
// producer). The result is ordered so that every algorithm comes after the producers of its
// inputs (see visit()).
132class HiveDataBrokerSvc final : public extends<Service, IDataBroker> {
133public:
134 using extends::extends;
135
// Algorithms needed to produce every ID in `requested`, in execution order. Dependencies of
// algorithms named in `stoppers` are not followed. Throws GaudiException if an ID has no producer.
136 std::vector<Gaudi::Algorithm*> algorithmsRequiredFor( const DataObjIDColl& requested,
137 const std::vector<std::string>& stoppers = {} ) const override;
// Algorithms needed to run the DataProducers entry named by `alg` (that algorithm itself comes
// last), in execution order. Throws GaudiException if no producer has that name.
138 std::vector<Gaudi::Algorithm*> algorithmsRequiredFor( const Gaudi::Utils::TypeNameString& alg,
139 const std::vector<std::string>& stoppers = {} ) const override;
140
141 StatusCode initialize() override;
142 StatusCode start() override;
143 StatusCode stop() override;
144 StatusCode finalize() override;
145
146private:
// NOTE(review): the DataLoader property below does not appear to be read anywhere in this file
// (see the "assign to dataloader" TODO in mapProducers) -- confirm whether it is still needed.
148 "Attribute any unmet input dependencies to this Algorithm" };
150 this, "DataProducers", {}, "List of algorithms to be used to resolve data dependencies" };
151
152 Gaudi::Property<std::string> m_dataDepsGraphFile{ this, "DataDepsGraphFile", "",
153 "Name of the output file (.dot, .md or .graphml extensions "
154 "allowed) containing the data dependency graph. "
155 "If empty, no graph is dumped" };
156
157 std::map<std::string, AlgEntry>
158 instantiateAndInitializeAlgorithms( const std::vector<std::string>& names ) const; // algorithms must be fully
159 // initialized first, as
160 // doing so may create
161 // additional data
162 // dependencies...
163
// DataProducers keyed by algorithm name; populated in initialize(), cleared in finalize().
164 std::map<std::string, AlgEntry> m_algorithms;
165
// Returns DataObjID -> producing entry for `algorithms` and fills each entry's dependsOn.
166 std::map<DataObjID, AlgEntry const*> mapProducers( std::map<std::string, AlgEntry>& algorithms ) const;
167
// DataObjID -> producing entry; populated in initialize() (presumably from mapProducers(m_algorithms)).
168 std::map<DataObjID, AlgEntry const*> m_dependencies;
169
// Depth-first topological sort helper used by both algorithmsRequiredFor overloads.
170 void visit( AlgEntry const& alg, std::vector<std::string> const& stoppers, std::vector<Gaudi::Algorithm*>& sorted,
171 std::vector<bool>& visited, std::vector<bool>& visiting ) const;
172};
173
175
// Initialize the Service base. Only if that succeeds (andThen): populate m_algorithms, warn about
// non-reentrant algorithms, list the DataProducers at DEBUG level, and populate m_dependencies.
177 return Service::initialize().andThen( [&] {
178 // populate m_algorithms
180
181 // warn about non-reentrant algorithms
// (an algorithm is reported here when its cardinality() is greater than zero)
182 std::ranges::for_each( m_algorithms | std::ranges::views::transform( []( const auto& entry ) -> decltype( auto ) {
183 return entry.second.alg;
184 } ) |
185 std::ranges::views::filter( []( const auto* alg ) { return alg->cardinality() > 0; } ),
186 [&]( const Gaudi::Algorithm* alg ) {
187 this->warning() << "non-reentrant algorithm: " << AlgorithmRepr{ *alg } << endmsg;
188 } );
189 //== Print the list of the created algorithms
// each line shows the algorithm, its declared outputs and its extra output dependencies
190 if ( msgLevel( MSG::DEBUG ) ) {
191 debug() << "Available DataProducers:\n";
192 std::ranges::for_each( m_algorithms | std::ranges::views::transform( []( const auto& entry ) -> decltype( auto ) {
193 return entry.second.alg;
194 } ),
195 [&]( const Gaudi::Algorithm* alg ) {
196 this->debug() << " " << AlgorithmRepr{ *alg } << " " << alg->outputDataObjs() << " "
197 << alg->extraOutputDeps() << endmsg;
198 } );
199 }
200
201 // populate m_dependencies and set AlgEntry::dependsOn
203 } );
204}
205
207
// Give up if the Service base did not start; otherwise sysStart every DataProducer (in
// m_algorithms order, i.e. by name) and return the first failure, if any.
209 if ( !ss.isSuccess() ) return ss;
210
211 // sysStart for m_algorithms
212 for ( auto& [name, algEntry] : m_algorithms ) {
213 ss = algEntry.alg->sysStart();
214 if ( ss.isFailure() ) {
215 error() << "Unable to start Algorithm: " << name << endmsg;
216 return ss;
217 }
218 }
219 return ss;
220}
221
// Give up if the Service base did not stop; otherwise sysStop every DataProducer (in
// m_algorithms order, i.e. by name) and return the first failure, if any.
224 if ( !ss.isSuccess() ) return ss;
225
226 // sysStop for m_algorithms
227 for ( auto& [name, algEntry] : m_algorithms ) {
228 ss = algEntry.alg->sysStop();
229 if ( ss.isFailure() ) {
230 error() << "Unable to stop Algorithm: " << name << endmsg;
231 return ss;
232 }
233 }
234 return ss;
235}
236
// Finalize every DataProducer (failures are deliberately ignored), drop our reference-counted
// handles on them, then finalize the Service base.
// NOTE(review): m_dependencies presumably holds pointers into m_algorithms (mapProducers stores
// addresses of the map's entries) and is not cleared here, so those pointers dangle after the
// clear() below -- consider clearing m_dependencies as well.
238 for ( auto& [name, algEntry] : m_algorithms ) {
239 algEntry.alg->sysFinalize().ignore( /* AUTOMATICALLY ADDED FOR gaudi/Gaudi!763 */ );
240 }
241 m_algorithms.clear();
242 return Service::finalize();
243}
244
// For every "type/name" entry of `names`, reuse the algorithm already known to the ApplicationMgr
// or create it, sysInitialize it, and wrap it in an AlgEntry. Indices are consecutive, in the
// order of `names`. Returns the entries keyed by algorithm name.
// Throws GaudiException if an algorithm cannot be created or fails to initialize.
245std::map<std::string, AlgEntry>
246HiveDataBrokerSvc::instantiateAndInitializeAlgorithms( const std::vector<std::string>& names ) const {
247 std::map<std::string, AlgEntry> algorithms;
248
249 //= Get the Application manager, to see if the algorithms already exist
250 auto appMgr = service<IAlgManager>( "ApplicationMgr" );
251 size_t index = 0;
252 for ( const std::string& item : names ) {
253 const Gaudi::Utils::TypeNameString tn( item );
254
255 //== Check whether the specified algorithm already exists. If not, create it
256 SmartIF<IAlgorithm> myIAlg = appMgr->algorithm( item, false ); // do not create it now
257 if ( !myIAlg ) {
258 myIAlg = createAlgorithm( *appMgr, tn.type(), tn.name() );
259 } else {
260 // when the algorithm is not created, the ref count is short by one, so we
261 // have to fix it.
262 myIAlg->addRef();
263 }
264
265 if ( !myIAlg ) {
266 throw GaudiException{ "Failed to create " + boost::lexical_cast<std::string>( item ), __func__,
268 }
269
270 // propagate the sub-algorithm into own state.
// (initializing before building the AlgEntry matters: initialization may declare additional
// data dependencies, which the AlgEntry constructor collects)
271 StatusCode sc = myIAlg->sysInitialize();
272 if ( sc.isFailure() ) {
273 throw GaudiException{ "Failed to initialize " + boost::lexical_cast<std::string>( item ), __func__,
275 }
276
// NOTE(review): if `names` contains the same algorithm name twice, emplace keeps the first entry
// but `index` is still incremented, so indices can exceed algorithms.size(). visit() sizes its
// bookkeeping vectors with m_algorithms.size() and indexes them with AlgEntry::index -- confirm
// duplicates cannot occur, or reject them here.
277 algorithms.emplace( tn.name(), AlgEntry{ index++, std::move( myIAlg ) } );
278 }
279
280 return algorithms;
281}
282
283std::map<DataObjID, AlgEntry const*>
284HiveDataBrokerSvc::mapProducers( std::map<std::string, AlgEntry>& algorithms ) const {
285 if ( msgLevel( MSG::DEBUG ) ) {
286 debug() << "Data Dependencies for Algorithms:";
287 for ( const auto& [name, entry] : m_algorithms ) {
288 debug() << "\n " << name << " :";
289 for ( const auto* id : entry.inputs ) { debug() << "\n o INPUT " << id->key(); }
290 for ( const auto* id : entry.outputs ) { debug() << "\n o OUTPUT " << id->key(); }
291 }
292 debug() << endmsg;
293 }
294
295 // If requested, dump a graph of the data dependencies in a .dot, .md or .graphml file
296 if ( !m_dataDepsGraphFile.empty() ) {
297 info() << "Dumping data dependencies graph to file: " << m_dataDepsGraphFile.value() << endmsg;
298 dumpDataDepsGraphFile( m_dataDepsGraphFile, m_algorithms | std::views::values );
299 }
300
301 // figure out all outputs
302 std::map<DataObjID, const AlgEntry*> producers;
303 for ( auto& [name, alg] : algorithms ) {
304 for ( auto id : alg.outputs ) {
305 auto r = producers.emplace( *id, &alg );
306 if ( !r.second ) {
307 throw GaudiException( "multiple algorithms declare " + id->key() + " as output (" + name + " and " +
308 producers[*id]->alg->name() + " at least). This is not allowed",
309 __func__, StatusCode::FAILURE );
310 }
311 }
312 }
313
314 // resolve dependencies
315 for ( auto& [name, algEntry] : algorithms ) {
316 for ( const DataObjID* idp : algEntry.inputs ) {
317 DataObjID id = *idp;
318 auto iproducer = producers.find( id );
319 if ( iproducer != producers.end() ) {
320 algEntry.dependsOn.insert( iproducer->second );
321 } else {
322 std::ostringstream error_message;
323 error_message << "\nUnknown requested input by " << AlgorithmRepr{ *( algEntry.alg ) } << " : "
324 << std::quoted( id.key(), '\'' ) << ".\n";
325 error_message << "You can set the OutputLevel of HiveDataBrokerSvc to DEBUG to get a list of inputs and "
326 "outputs of every registered algorithm.\n";
327 throw GaudiException( error_message.str(), __func__, StatusCode::FAILURE );
328 // TODO: assign to dataloader!
329 // algEntry.dependsOn.insert(dataloader.alg);
330 // dataloader.data.emplace( id ); // TODO: we may ask to much of the
331 // dataloader this way...
332 }
333 }
334 }
335 return producers;
336}
337
339void HiveDataBrokerSvc::visit( AlgEntry const& alg, std::vector<std::string> const& stoppers,
340 std::vector<Gaudi::Algorithm*>& sorted, std::vector<bool>& visited,
341 std::vector<bool>& visiting ) const {
342 assert( visited.size() == m_algorithms.size() );
343 assert( visiting.size() == m_algorithms.size() );
344 if ( visited[alg.index] ) { return; }
345 if ( visiting[alg.index] ) { throw GaudiException( "Cycle detected ", __func__, StatusCode::FAILURE ); }
346
347 if ( std::none_of( std::begin( stoppers ), std::end( stoppers ),
348 [alg]( auto& stopper ) { return alg.alg->name() == stopper; } ) ) {
349 visiting[alg.index] = true;
350 for ( auto* dep : sorted_( alg.dependsOn ) ) { visit( *dep, stoppers, sorted, visited, visiting ); }
351 visiting[alg.index] = false;
352 }
353
354 visited[alg.index] = true;
355 sorted.push_back( alg.alg );
356}
357
// Return the algorithms required to produce every ID in `requested`, ordered so that each
// algorithm follows the producers of its inputs. Dependencies of algorithms named in `stoppers`
// are not followed. Throws GaudiException if some requested ID has no known producer.
358std::vector<Gaudi::Algorithm*>
360 const std::vector<std::string>& stoppers ) const {
361 std::vector<Gaudi::Algorithm*> result;
362
363 std::vector<const AlgEntry*> deps;
364 deps.reserve( requested.size() );
365
366 // start with seeding from the initial request
367 for ( const auto& id : requested ) {
368 auto i = m_dependencies.find( id );
369 if ( i == m_dependencies.end() )
370 throw GaudiException( "unknown requested input: " + id.key(), __func__, StatusCode::FAILURE );
371 deps.push_back( i->second );
372 }
373 // producers may be responsible for multiple requested DataObjID -- make sure they are only mentioned once
// (sorting by AlgEntry::index also makes the seed order independent of the iteration order of
// `requested`, which is an unordered collection)
374 std::sort( deps.begin(), deps.end(), []( auto const* lhs, auto const* rhs ) { return *lhs < *rhs; } );
375 deps.erase( std::unique( deps.begin(), deps.end(), []( auto const& lhs, auto const& rhs ) { return *lhs == *rhs; } ),
376 deps.end() );
377
// the bookkeeping is shared by all seeds, so every algorithm appears at most once in `result`
378 std::vector<bool> visited( m_algorithms.size() );
379 std::vector<bool> visiting( m_algorithms.size() );
380 for ( auto* alg : deps ) { visit( *alg, stoppers, result, visited, visiting ); }
381 return result;
382}
383
// Return the algorithms required to run the DataProducers entry whose name matches `requested`,
// in execution order; that algorithm itself is the last element. Dependencies of algorithms named
// in `stoppers` are not followed. Throws GaudiException if no producer has that name. A type
// mismatch is only reported as an error message; the lookup still proceeds.
384std::vector<Gaudi::Algorithm*>
386 const std::vector<std::string>& stoppers ) const {
387 std::vector<Gaudi::Algorithm*> result;
388
389 auto it = m_algorithms.find( requested.name() );
390 if ( it == end( m_algorithms ) ) {
391 throw GaudiException{ "No algorithm with name " + requested.name() + " in DataProducers. Type is " +
392 ( requested.haveType() ? requested.type() : "not specified" ),
393 __func__, StatusCode::FAILURE };
394 }
395 auto const& alg = it->second;
396 if ( requested.haveType() && alg.alg->type() != requested.type() ) {
397 error() << "requested " << requested << " but have matching name with different type: " << alg.alg->type()
398 << endmsg;
399 }
// NOTE(review): alg.alg has already been dereferenced above; the AlgEntry constructor throws on a
// null pointer, so this assert can never fire here -- consider removing or moving it.
400 assert( alg.alg != nullptr );
401
402 std::vector<bool> visited( m_algorithms.size() );
403 std::vector<bool> visiting( m_algorithms.size() );
404 visit( alg, stoppers, result, visited, visiting );
405
406 if ( msgLevel( MSG::DEBUG ) ) {
407 debug() << std::endl << "requested " << requested << " returning " << std::endl << " ";
409 debug(), result, ",\n ",
410 []( auto& os, const Gaudi::Algorithm* a ) -> decltype( auto ) { return os << AlgorithmRepr{ *a }; } );
411 debug() << std::endl << endmsg;
412 }
413 return result;
414}
std::ostream & operator<<(std::ostream &s, AlgsExecutionStates::State x)
Streaming of State values.
bool operator==(const GaudiUtils::Allocator< T1 > &, const GaudiUtils::Allocator< T2 > &)
Definition Allocator.h:224
std::unordered_set< DataObjID, DataObjID_Hasher > DataObjIDColl
Definition DataObjID.h:121
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition MsgStream.h:198
#define DECLARE_COMPONENT(type)
bool operator<(backwards_compatibility_hack_time_timespan, backwards_compatibility_hack_time_timespan)
Definition Time.h:518
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
MsgStream & warning() const
shortcut for the method msgStream(MSG::WARNING)
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
Base class from which all concrete algorithm classes should be derived.
Definition Algorithm.h:87
const std::string & name() const override
The identifying name of the algorithm object.
utilities to dump graphs in different formats
Definition GraphDumper.h:30
Implementation of property with value of concrete type.
Definition PropertyFwd.h:27
Helper class to parse a string of format "type/name".
const std::string & type() const
const std::string & name() const
Define general base for Gaudi exception.
StatusCode initialize() override
std::map< std::string, AlgEntry > instantiateAndInitializeAlgorithms(const std::vector< std::string > &names) const
Gaudi::Property< std::vector< std::string > > m_producers
std::map< std::string, AlgEntry > m_algorithms
StatusCode stop() override
Gaudi::Property< std::string > m_dataDepsGraphFile
StatusCode finalize() override
Gaudi::Property< std::string > m_dataLoader
std::map< DataObjID, AlgEntry const * > mapProducers(std::map< std::string, AlgEntry > &algorithms) const
void visit(AlgEntry const &alg, std::vector< std::string > const &stoppers, std::vector< Gaudi::Algorithm * > &sorted, std::vector< bool > &visited, std::vector< bool > &visiting) const
Implements DFS topological sorting.
std::vector< Gaudi::Algorithm * > algorithmsRequiredFor(const DataObjIDColl &requested, const std::vector< std::string > &stoppers={}) const override
StatusCode start() override
std::map< DataObjID, AlgEntry const * > m_dependencies
The IAlgManager is the interface implemented by the Algorithm Factory in the Application Manager to s...
Definition IAlgManager.h:34
virtual StatusCode createAlgorithm(const std::string &algtype, const std::string &algname, IAlgorithm *&alg, bool managed=false, bool checkIfExists=true)=0
Create an instance of a algorithm type that has been declared beforehand and assigns to it a name.
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition IAlgorithm.h:36
StatusCode finalize() override
Definition Service.cpp:223
const std::string & name() const override
Retrieve name of the service.
Definition Service.cpp:333
StatusCode stop() override
Definition Service.cpp:181
SmartIF< IFace > service(const std::string &name, bool createIf=true) const
Definition Service.h:79
StatusCode start() override
Definition Service.cpp:187
StatusCode initialize() override
Definition Service.cpp:118
Small smart pointer class with automatic reference counting for IInterface.
Definition SmartIF.h:28
This class is used for returning status codes from appropriate routines.
Definition StatusCode.h:64
bool isFailure() const
Definition StatusCode.h:129
StatusCode andThen(F &&f, ARGS &&... args) const
Chain code blocks making the execution conditional a success result.
Definition StatusCode.h:163
bool isSuccess() const
Definition StatusCode.h:314
constexpr static const auto FAILURE
Definition StatusCode.h:100
Base class used to extend a class implementing other interfaces.
Definition extends.h:19
get
decorate the vector of properties
Definition decorators.py:94
Stream & ostream_joiner(Stream &os, Iterator first, Iterator last, Separator sep, OutputElement output=OutputElement{})
@ DEBUG
Definition IMessageSvc.h:22
GAUDI_API const std::string typeinfoName(const std::type_info &)
Get platform independent information about the class type.
Definition System.cpp:260
dict g
Definition gaudirun.py:582