The Gaudi Framework  master (82fdf313)
Loading...
Searching...
No Matches
OutputStream.cpp
Go to the documentation of this file.
1/***********************************************************************************\
2* (c) Copyright 1998-2025 CERN for the benefit of the LHCb and ATLAS collaborations *
3* *
4* This software is distributed under the terms of the Apache version 2 licence, *
5* copied verbatim in the file "LICENSE". *
6* *
7* In applying this licence, CERN does not waive the privileges and immunities *
8* granted to it by virtue of its status as an Intergovernmental Organization *
9* or submit itself to any jurisdiction. *
10\***********************************************************************************/
11// Framework include files
22
28
29#include "OutputStream.h"
30
31#include <set>
32#include <strings.h>
33
34// Define the algorithm factory for the standard output data writer
36
37#define ON_DEBUG if ( msgLevel( MSG::DEBUG ) )
38
39namespace {
40 bool passed( const Gaudi::Algorithm* alg ) {
41 const auto& algState = alg->execState( Gaudi::Hive::currentContext() );
42 return algState.state() == AlgExecState::Done && algState.filterPassed();
43 }
44} // namespace
45
47 return Algorithm::start().andThen( [this]() {
48 // Open the output file now to ensure it is always written even if empty
49 return m_pConversionSvc->connectOutput( m_outputName, m_outputType );
50 } );
51}
52
53// initialize data writer
55 auto status = Algorithm::initialize();
56 if ( status.isFailure() ) return status;
57
58 if ( m_output.empty() ) {
59 fatal() << "property Output not set. OutputStream instances require an output" << endmsg;
61 }
62
63 // Reset the number of events written
64 m_events = 0;
65 // Get access to the DataManagerSvc
67 if ( !m_pDataManager ) {
68 fatal() << "Unable to locate IDataManagerSvc interface" << endmsg;
70 }
71 // Get access to the IncidentService
72 m_incidentSvc = serviceLocator()->service( "IncidentSvc" );
73 if ( !m_incidentSvc ) {
74 warning() << "Error retrieving IncidentSvc." << endmsg;
76 }
77 // Get access to the assigned data service
79 if ( !m_pDataProvider ) {
80 fatal() << "Unable to locate IDataProviderSvc interface of " << m_storeName << endmsg;
82 }
83 if ( hasInput() ) {
85 if ( !status.isSuccess() ) {
86 fatal() << "Unable to connect to conversion service." << endmsg;
87 if ( m_outputName != "" && m_fireIncidents )
88 m_incidentSvc->fireIncident( Incident( m_outputName, IncidentType::FailOutputFile ) );
89 return status;
90 }
91 }
92
93 // Clear the list with optional items
95 // Clear the item list
97
98 // Take the new item list from the properties.
99 ON_DEBUG debug() << "ItemList : " << m_itemNames.value() << endmsg;
100 for ( const auto& i : m_itemNames ) addItem( m_itemList, i );
101
102 // Take the new item list from the properties.
103 ON_DEBUG debug() << "OptItemList : " << m_optItemNames.value() << endmsg;
104 for ( const auto& i : m_optItemNames ) addItem( m_optItemList, i );
105
106 // prepare the algorithm selected dependent locations
107 ON_DEBUG debug() << "AlgDependentItemList : " << m_algDependentItemList.value() << endmsg;
108 for ( const auto& a : m_algDependentItemList ) {
109 // Get the algorithm pointer
110 auto theAlgorithm = decodeAlgorithm( a.first );
111 if ( theAlgorithm ) {
112 // Get the item list for this alg
113 auto& items = m_algDependentItems[theAlgorithm];
114 // Clear the list for this alg
115 clearItems( items );
116 // fill the list again
117 for ( const auto& i : a.second ) addItem( items, i );
118 }
119 }
120
121 // Take the item list to the data service preload list.
122 if ( m_doPreLoad ) {
123 for ( auto& j : m_itemList ) m_pDataProvider->addPreLoadItem( *j ).ignore();
124 // Not working: bad reference counting! pdataSvc->release();
125 }
126
127 if ( m_doPreLoadOpt ) {
128 for ( auto& j : m_optItemList ) m_pDataProvider->addPreLoadItem( *j ).ignore();
129 }
130 info() << "Data source: " << m_storeName.value() << " output: " << m_output.value() << endmsg;
131
132 // Decode the accept, required and veto Algorithms. The logic is the following:
133 // a. The event is accepted if all lists are empty.
134 // b. The event is provisionally accepted if any Algorithm in the accept list
135 // has been executed and has indicated that its filter is passed. This
136 // provisional acceptance can be overridden by the other lists.
137 // c. The event is rejected unless all Algorithms in the required list have
138 // been executed and have indicated that their filter passed.
139 // d. The event is rejected if any Algorithm in the veto list has been
140 // executed and has indicated that its filter has passed.
141 m_acceptNames.useUpdateHandler();
142 m_requireNames.useUpdateHandler();
143 m_vetoNames.useUpdateHandler();
144
145 return StatusCode::SUCCESS;
146}
147
148// terminate data writer
150 info() << "Events output: " << m_events << endmsg;
151 if ( m_fireIncidents ) m_incidentSvc->fireIncident( Incident( m_outputName, IncidentType::EndOutputFile ) );
152 m_incidentSvc.reset();
153 m_pDataProvider.reset();
154 m_pDataManager.reset();
155 m_pConversionSvc.reset();
158 return StatusCode::SUCCESS;
159}
160
161// Work entry point
163 // Clear any previously existing item list
165 // Test whether this event should be output
166 if ( isEventAccepted() ) {
167 const StatusCode sc = writeObjects();
169 ++m_events;
170 if ( sc.isSuccess() && m_fireIncidents ) {
171 m_incidentSvc->fireIncident( Incident( m_outputName, IncidentType::WroteToOutputFile ) );
172 } else if ( m_fireIncidents ) {
173 m_incidentSvc->fireIncident( Incident( m_outputName, IncidentType::FailOutputFile ) );
174 }
175 return sc;
176 }
177 return StatusCode::SUCCESS;
178}
179
180// Select the different objects and write them to file
182 // Connect the output file to the service
183 StatusCode status = collectObjects();
184 if ( status.isSuccess() ) {
186 if ( sel->begin() != sel->end() ) {
187 status = m_pConversionSvc->connectOutput( m_outputName, m_outputType );
188 if ( status.isSuccess() ) {
189 // Now pass the collection to the persistency service
190 IOpaqueAddress* pAddress = nullptr;
191 for ( auto& j : *sel ) {
192 try {
193 const StatusCode iret = m_pConversionSvc->createRep( j, pAddress );
194 if ( !iret.isSuccess() ) {
195 status = iret;
196 continue;
197 }
198 IRegistry* pReg = j->registry();
199 pReg->setAddress( pAddress );
200 } catch ( const std::exception& excpt ) {
201 const std::string loc = ( j->registry() ? j->registry()->identifier() : "UnRegistered" );
202 fatal() << "std::exception during createRep for '" << loc << "' " << System::typeinfoName( typeid( *j ) )
203 << endmsg;
204 fatal() << excpt.what() << endmsg;
205 throw;
206 }
207 }
208 for ( auto& j : *sel ) {
209 try {
210 IRegistry* pReg = j->registry();
211 const StatusCode iret = m_pConversionSvc->fillRepRefs( pReg->address(), j );
212 if ( !iret.isSuccess() ) status = iret;
213 } catch ( const std::exception& excpt ) {
214 const std::string loc = ( j->registry() ? j->registry()->identifier() : "UnRegistered" );
215 fatal() << "std::exception during fillRepRefs for '" << loc << "'" << System::typeinfoName( typeid( *j ) )
216 << endmsg;
217 fatal() << excpt.what() << endmsg;
218 throw;
219 }
220 }
221 // Commit the data if there was no error; otherwise possibly discard
222 if ( status.isSuccess() ) {
223 status = m_pConversionSvc->commitOutput( m_outputName, true );
224 } else {
225 m_pConversionSvc->commitOutput( m_outputName, false ).ignore();
226 }
227 }
228 }
229 }
230 return status;
231}
232
233// Place holder to create configurable data store agent
234bool OutputStream::collect( IRegistry* dir, int level ) {
235 if ( level < m_currentItem->depth() ) {
236 if ( dir->object() ) {
237 /*
238 std::cout << "Analysing ("
239 << dir->name()
240 << ") Object:"
241 << ((dir->object()==0) ? "UNLOADED" : "LOADED")
242 << std::endl;
243 */
244 m_objects.push_back( dir->object() );
245 return true;
246 }
247 }
248 return false;
249}
250
254
255 // Traverse the tree and collect the requested objects
256 for ( auto& i : m_itemList ) {
257 DataObject* obj = nullptr;
258 m_currentItem = i;
259 StatusCode iret = m_pDataProvider->retrieveObject( m_currentItem->path(), obj );
260 if ( iret.isSuccess() ) {
261 iret = collectFromSubTree( obj );
262 if ( !iret.isSuccess() ) status = iret;
263 } else {
264 error() << "Cannot write mandatory object(s) (Not found) " << m_currentItem->path() << endmsg;
265 status = iret;
266 }
267 }
268
269 // Traverse the tree and collect the requested objects (tolerate missing items here)
270 for ( auto& i : m_optItemList ) {
271 DataObject* obj = nullptr;
272 m_currentItem = i;
273 StatusCode iret = m_pDataProvider->retrieveObject( m_currentItem->path(), obj );
274 if ( iret.isSuccess() ) iret = collectFromSubTree( obj );
275 if ( !iret.isSuccess() ) {
277 debug() << "Ignore request to write non-mandatory object(s) " << m_currentItem->path() << endmsg;
278 }
279 }
280
281 // Collect objects dependent on particular algorithms
282 for ( const auto& iAlgItems : m_algDependentItems ) {
283 auto alg = iAlgItems.first;
284 const Items& items = iAlgItems.second;
285 if ( passed( alg ) ) {
287 debug() << "Algorithm '" << alg->name() << "' fired. Adding " << items << endmsg;
288 for ( const auto& i : items ) {
289 DataObject* obj = nullptr;
290 m_currentItem = i;
291 StatusCode iret = m_pDataProvider->retrieveObject( m_currentItem->path(), obj );
292 if ( iret.isSuccess() ) {
293 iret = collectFromSubTree( obj );
294 if ( !iret.isSuccess() ) status = iret;
295 } else {
296 error() << "Cannot write mandatory (algorithm dependent) object(s) (Not found) " << m_currentItem->path()
297 << endmsg;
298 status = iret;
299 }
300 }
301 }
302 }
303
304 if ( status.isSuccess() ) {
305 // Remove duplicates from the list of objects, preserving the order in the list
306 std::set<DataObject*> unique;
307 std::vector<DataObject*> tmp; // temporary vector with the reduced list
308 tmp.reserve( m_objects.size() );
309 for ( auto& o : m_objects ) {
310 if ( !unique.count( o ) ) {
311 // if the pointer is not in the set, add it to both the set and the temporary vector
312 unique.insert( o );
313 tmp.push_back( o );
314 }
315 }
316 m_objects.swap( tmp ); // swap the data of the two vectors
317 }
318
319 return status;
320}
321
322// Clear collected object list
324
325// Remove all items from the output streamer list;
327 for ( auto& i : itms ) delete i;
328 itms.clear();
329}
330
331// Find single item identified by its path (exact match)
332DataStoreItem* OutputStream::findItem( const std::string& path ) {
333 auto matchPath = [&]( const DataStoreItem* i ) { return i->path() == path; };
334 auto i = std::find_if( m_itemList.begin(), m_itemList.end(), matchPath );
335 if ( i == m_itemList.end() ) {
336 i = std::find_if( m_optItemList.begin(), m_optItemList.end(), matchPath );
337 if ( i == m_optItemList.end() ) return nullptr;
338 }
339 return *i;
340}
341
342// Add item to output streamer list
343void OutputStream::addItem( Items& itms, const std::string& descriptor ) {
344 int level = 0;
345 auto sep = descriptor.rfind( "#" );
346 std::string obj_path = descriptor.substr( 0, sep );
347 if ( sep != std::string::npos ) {
348 std::string slevel = descriptor.substr( sep + 1 );
349 level = ( slevel == "*" ) ? 9999999 : std::stoi( slevel );
350 }
351 if ( m_verifyItems ) {
352 size_t idx = obj_path.find( "/", 1 );
353 while ( idx != std::string::npos ) {
354 std::string sub_item = obj_path.substr( 0, idx );
355 if ( !findItem( sub_item ) ) addItem( itms, sub_item + "#1" );
356 idx = obj_path.find( "/", idx + 1 );
357 }
358 }
359 itms.push_back( new DataStoreItem( obj_path, level ) );
360 const auto& item = itms.back();
362 debug() << "Adding OutputStream item " << item->path() << " with " << item->depth() << " level(s)." << endmsg;
363}
364
365// Connect to proper conversion service
368 // Get output file from input
369 std::string dbType, svc, shr;
370 for ( auto attrib : Gaudi::Utils::AttribStringParser( m_output ) ) {
371 const std::string& tag = attrib.tag;
372 const std::string& val = attrib.value;
373 switch ( ::toupper( tag[0] ) ) {
374 case 'D':
375 m_outputName = val;
376 break;
377 case 'T':
378 dbType = val;
379 break;
380 case 'S':
381 switch ( ::toupper( tag[1] ) ) {
382 case 'V':
383 svc = val;
384 break;
385 case 'H':
386 shr = "YES";
387 break;
388 }
389 break;
390 case 'O': // OPT='<NEW<CREATE,WRITE,RECREATE>, UPDATE>'
391 switch ( ::toupper( val[0] ) ) {
392 case 'R':
393 if ( ::strncasecmp( val.c_str(), "RECREATE", 3 ) == 0 )
394 m_outputType = "RECREATE";
395 else if ( ::strncasecmp( val.c_str(), "READ", 3 ) == 0 )
396 m_outputType = "READ";
397 break;
398 case 'C':
399 case 'N':
400 case 'W':
401 m_outputType = "NEW";
402 break;
403 case 'U':
404 m_outputType = "UPDATE";
405 break;
406 default:
407 m_outputType = "???";
408 break;
409 }
410 break;
411 default:
412 break;
413 }
414 }
415 if ( !shr.empty() ) m_outputType += "|SHARED";
416 // Get access to the default Persistency service
417 // The default service is the same for input as for output.
418 // If this is not desired, then a specialized OutputStream must overwrite
419 // this value.
420 if ( !dbType.empty() || !svc.empty() ) {
421 std::string typ = !dbType.empty() ? dbType : svc;
423 if ( !ipers ) {
424 fatal() << "Unable to locate IPersistencySvc interface of " << m_persName << endmsg;
425 return StatusCode::FAILURE;
426 }
427 IConversionSvc* cnvSvc = nullptr;
428 status = ipers->getService( typ, cnvSvc );
429 if ( !status.isSuccess() ) {
430 fatal() << "Unable to locate IConversionSvc interface of database type " << typ << endmsg;
431 return status;
432 }
433 // Increase reference count and keep service.
434 m_pConversionSvc = cnvSvc;
435 } else {
436 fatal() << "Unable to locate IConversionSvc interface (Unknown technology) " << endmsg
437 << "You either have to specify a technology name or a service name!" << endmsg
438 << "Please correct the job option \"" << name() << ".Output\" !" << endmsg;
439 return StatusCode::FAILURE;
440 }
441 return StatusCode::SUCCESS;
442}
443
444Gaudi::Algorithm* OutputStream::decodeAlgorithm( const std::string& theName ) {
445 Gaudi::Algorithm* theAlgorithm = nullptr;
446
447 auto theAlgMgr = serviceLocator()->as<IAlgManager>();
448 if ( theAlgMgr ) {
449 // Check whether the supplied name corresponds to an existing
450 // Algorithm object.
451 SmartIF<IAlgorithm>& theIAlg = theAlgMgr->algorithm( theName );
452 if ( theIAlg ) {
453 try {
454 theAlgorithm = dynamic_cast<Gaudi::Algorithm*>( theIAlg.get() );
455 } catch ( ... ) {
456 // do nothing
457 }
458 }
459 } else {
460 fatal() << "Can't locate ApplicationMgr!!!" << endmsg;
461 }
462
463 if ( !theAlgorithm ) { warning() << "Failed to decode Algorithm name " << theName << endmsg; }
464
465 return theAlgorithm;
466}
467
468void OutputStream::decodeAlgorithms( Gaudi::Property<std::vector<std::string>>& theNames,
469 std::vector<Gaudi::Algorithm*>& theAlgs ) {
470 // Reset the list of Algorithms
471 theAlgs.clear();
472
473 // Build the list of Algorithms from the names list
474 for ( const auto& it : theNames.value() ) {
475
476 auto theAlgorithm = decodeAlgorithm( it );
477 if ( theAlgorithm ) {
478 // Check that the specified algorithm doesn't already exist in the list
479 if ( std::find( std::begin( theAlgs ), std::end( theAlgs ), theAlgorithm ) == std::end( theAlgs ) ) {
480 theAlgorithm->addRef();
481 theAlgs.push_back( theAlgorithm );
482 }
483 } else {
484 info() << it << " doesn't exist - ignored" << endmsg;
485 }
486 }
487}
488
490 // Loop over all Algorithms in the accept list to see
491 // whether any have been executed and have their filter
492 // passed flag set. Any match causes the event to be
493 // provisionally accepted.
494 bool result = m_acceptAlgs.empty() || std::any_of( std::begin( m_acceptAlgs ), std::end( m_acceptAlgs ), passed );
495
496 // Loop over all Algorithms in the required list to see
497 // whether all have been executed and have their filter
498 // passed flag set. Any mismatch causes the event to be
499 // rejected.
500 if ( result && !m_requireAlgs.empty() ) {
501 result = std::all_of( std::begin( m_requireAlgs ), std::end( m_requireAlgs ), passed );
502 }
503
504 // Loop over all Algorithms in the veto list to see
505 // whether any have been executed and have their filter
506 // passed flag set. Any match causes the event to be
507 // rejected.
508 if ( result && !m_vetoAlgs.empty() ) {
509 result = std::none_of( std::begin( m_vetoAlgs ), std::end( m_vetoAlgs ), passed );
510 }
511 return result;
512}
513
515 return !( m_itemNames.empty() && m_optItemNames.empty() && m_algDependentItemList.empty() );
516}
517
519 return m_pDataManager->traverseSubTree( pObj,
520 [this]( IRegistry* r, int level ) { return this->collect( r, level ); } );
521}
void toupper(std::string &s)
std::vector< DataObject * > IDataSelector
This is only a placeholder to allow me compiling until the responsible guy does his work!
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition MsgStream.h:198
#define ON_DEBUG
#define DECLARE_COMPONENT(type)
Provide serialization function (output only) for some common STL classes (vectors,...
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
MsgStream & warning() const
shortcut for the method msgStream(MSG::WARNING)
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
A DataObject is the base class of any identifiable object on any data store.
Definition DataObject.h:37
Description of the DataStoreItem class.
Base class from which all concrete algorithm classes should be derived.
Definition Algorithm.h:87
StatusCode initialize() override
the default (empty) implementation of IStateful::initialize() method
Definition Algorithm.h:175
SmartIF< ISvcLocator > & serviceLocator() const override
The standard service locator.
const std::string & name() const override
The identifying name of the algorithm object.
StatusCode start() override
the default (empty) implementation of IStateful::start() method
Definition Algorithm.h:177
Implementation of property with value of concrete type.
Definition PropertyFwd.h:27
Parse attribute strings allowing iteration over the various attributes.
The IAlgManager is the interface implemented by the Algorithm Factory in the Application Manager to s...
Definition IAlgManager.h:34
Opaque address interface definition.
Data persistency service interface.
The IRegistry represents the entry door to the environment any data object residing in a transient da...
Definition IRegistry.h:29
virtual DataObject * object() const =0
Retrieve object behind the link.
virtual IOpaqueAddress * address() const =0
Retrieve opaque storage address.
virtual void setAddress(IOpaqueAddress *pAddress)=0
Set/Update Opaque storage address.
virtual SmartIF< IService > & service(const Gaudi::Utils::TypeNameString &typeName, const bool createIf=true)=0
Returns a smart pointer to a service.
SmartIF< IFace > as()
Definition ISvcLocator.h:64
Base class for all Incidents (computing events).
Definition Incident.h:24
A small to stream Data I/O.
Gaudi::Property< std::vector< std::string > > m_requireNames
Items m_optItemList
Vector of optional items to be saved to this stream.
StatusCode execute() override
Working entry point.
Gaudi::Property< bool > m_doPreLoadOpt
Items m_itemList
Vector of items to be saved to this stream.
void clearSelection()
Clear list of selected objects.
DataStoreItem * findItem(const std::string &path)
Find single item identified by its path (exact match)
StatusCode collectFromSubTree(DataObject *)
SmartIF< IConversionSvc > m_pConversionSvc
Keep reference to the data conversion service.
std::vector< DataStoreItem * > Items
int m_events
Number of events written to this output stream.
std::vector< Gaudi::Algorithm * > m_vetoAlgs
Vector of Algorithms that this stream is vetoed by.
std::vector< Gaudi::Algorithm * > m_acceptAlgs
Vector of Algorithms that this stream accepts.
SmartIF< IDataProviderSvc > m_pDataProvider
Keep reference to the data provider service.
StatusCode finalize() override
Terminate OutputStream.
Gaudi::Property< ItemNames > m_optItemNames
Gaudi::Property< std::string > m_outputName
virtual StatusCode collectObjects()
Collect all objects to be written to the output stream.
std::vector< Gaudi::Algorithm * > m_requireAlgs
Vector of Algorithms that this stream requires.
virtual StatusCode connectConversionSvc()
AlgDependentItems m_algDependentItems
Items to be saved for specific algorithms.
DataStoreItem * m_currentItem
Keep track of the current item.
Gaudi::Property< ItemNames > m_itemNames
Gaudi::Property< std::vector< std::string > > m_acceptNames
IDataSelector m_objects
Collection of objects being selected.
SmartIF< IDataManagerSvc > m_pDataManager
Keep reference to the data manager service.
IDataSelector * selectedObjects()
Return the list of selected objects.
bool m_fireIncidents
should I fire incidents for writing opening/closing etc?
virtual StatusCode writeObjects()
Select the different objects and write them to file.
bool isEventAccepted() const
Test whether this event should be output.
StatusCode initialize() override
Initialize OutputStream.
void decodeAlgorithms(Gaudi::Property< std::vector< std::string > > &theNames, std::vector< Gaudi::Algorithm * > &theAlgs)
Decode specified list of Algorithms.
void clearItems(Items &itms)
Clear item list.
SmartIF< IIncidentSvc > m_incidentSvc
Reference to the incident service.
Gaudi::Property< bool > m_doPreLoad
Gaudi::Property< std::string > m_persName
Gaudi::Property< bool > m_verifyItems
virtual bool collect(IRegistry *dir, int level)
Store agent's classback.
Gaudi::Property< std::string > m_output
Gaudi::Algorithm * decodeAlgorithm(const std::string &theName)
Decode a single algorithm name.
Gaudi::Property< AlgDependentItemNames > m_algDependentItemList
virtual bool hasInput() const
Tell if the instance has been configured with input items or not.
void addItem(Items &itms, const std::string &descriptor)
Add item to output streamer list.
Gaudi::Property< std::string > m_storeName
Gaudi::Property< std::vector< std::string > > m_vetoNames
std::string m_outputType
Output type: NEW(NEW,CREATE,WRITE,RECREATE), UPDATE)
StatusCode start() override
Start OutputStream.
Small smart pointer class with automatic reference counting for IInterface.
Definition SmartIF.h:28
TYPE * get() const
Get interface pointer.
Definition SmartIF.h:82
This class is used for returning status codes from appropriate routines.
Definition StatusCode.h:64
StatusCode andThen(F &&f, ARGS &&... args) const
Chain code blocks making the execution conditional a success result.
Definition StatusCode.h:163
bool isSuccess() const
Definition StatusCode.h:314
constexpr static const auto SUCCESS
Definition StatusCode.h:99
constexpr static const auto FAILURE
Definition StatusCode.h:100
GAUDI_API const EventContext & currentContext()
GAUDI_API const std::string typeinfoName(const std::type_info &)
Get platform independent information about the class type.
Definition System.cpp:260