The Gaudi Framework  v39r0 (5b8b5eda)
AsynchronousAlgorithm.h
Go to the documentation of this file.
1 
2 /***********************************************************************************\
3 * (c) Copyright 2023-2024 CERN for the benefit of the LHCb and ATLAS collaborations *
4 * *
5 * This software is distributed under the terms of the Apache version 2 licence, *
6 * copied verbatim in the file "LICENSE". *
7 * *
8 * In applying this licence, CERN does not waive the privileges and immunities *
9 * granted to it by virtue of its status as an Intergovernmental Organization *
10 * or submit itself to any jurisdiction. *
11 \***********************************************************************************/
12 #pragma once
13 // ============================================================================
14 // Include files
15 // ============================================================================
16 // Gaudi
17 #include <Gaudi/Algorithm.h>
19 // Gaudi CUDA
20 #ifdef GAUDI_USE_CUDA
21 # include <Gaudi/CUDAAsynchronousAlgHelper.cuh>
22 #endif
23 // Others
24 #include <atomic>
25 #include <boost/fiber/all.hpp>
26 #include <boost/unordered/unordered_flat_set.hpp>
27 #include <chrono>
28 #include <fmt/format.h>
29 
30 namespace Gaudi {
41 #define ACCALG_AWAIT( stmt ) \
45  stmt; \
46  if ( restoreAfterSuspend().isFailure() ) return StatusCode::FAILURE;
47 
49  protected:
51  boost::fibers::fiber_specific_ptr<std::size_t> s_currentSlot{};
52 
54  virtual StatusCode restoreAfterSuspend() const {
55  if ( !whiteboard()->selectStore( *s_currentSlot ).isSuccess() ) {
56  msg() << MSG::ERROR << "Resetting slot from fiber_specific_ptr failed" << endmsg;
57  return StatusCode::FAILURE;
58  }
59  return StatusCode::SUCCESS;
60  }
61 
62  public:
64  setAsynchronous( true );
65  msg() << MSG::DEBUG << "Starting sysInitialize for AsynchronousAlgorithm" << endmsg;
67  }
68 
/// Per-event entry point invoked by the framework.
/// Binds the current fiber to ctx.slot() through the fiber-specific s_currentSlot,
/// so that restoreAfterSuspend() can re-select the right store after a suspension.
/// Returns StatusCode::FAILURE (after an error message) when this fiber is already
/// bound to a different slot.
/// NOTE(review): the debug message below has no space between the slot number and
/// "with s_currentSlot" - consider changing the literal to " with s_currentSlot = ".
69  StatusCode sysExecute( const EventContext& ctx ) override {
70  msg() << MSG::DEBUG << "Starting sysExecute for AsynchronousAlgorithm on slot " << ctx.slot()
71  << "with s_currentSlot = " << fmt::to_string( fmt::ptr( s_currentSlot.get() ) ) << endmsg;
72  if ( s_currentSlot.get() == nullptr ) {
// No slot recorded for this fiber yet: remember this event's slot (ownership passes to the fiber_specific_ptr).
73  s_currentSlot.reset( new std::size_t( ctx.slot() ) );
74  } else if ( *s_currentSlot != ctx.slot() ) {
// This fiber already carries a different slot; refuse rather than touch the wrong event store.
75  error() << "Current slot is " << ctx.slot() << " but s_currentSlot exists and is " << *s_currentSlot << endmsg;
76  return StatusCode::FAILURE;
77  }
79  }
80 
82  StatusCode yield() const {
83  boost::this_fiber::yield();
84  return restoreAfterSuspend();
85  }
86 
88  template <typename Clock, typename Duration>
90  boost::this_fiber::sleep_until( sleep_time );
91  return restoreAfterSuspend();
92  }
93 
95  template <typename Rep, typename Period>
97  boost::this_fiber::sleep_for( dur );
98  return restoreAfterSuspend();
99  }
100 
101 #ifdef GAUDI_USE_CUDA
/// Wait, via Gaudi::CUDA::cuda_stream_await, for the work queued on @p cudaStream,
/// then re-select this fiber's event slot (the wait may suspend the fiber, hence the
/// restoreAfterSuspend() call afterwards).
/// NOTE(review): CUDA_CHECK comes from CUDAAsynchronousAlgHelper.cuh; presumably it
/// reports a failing call through print_cuda_error - confirm there.
102  StatusCode cuda_stream_await( cudaStream_t cudaStream ) const {
104  CUDA_CHECK( Gaudi::CUDA::cuda_stream_await( cudaStream ) );
105  return restoreAfterSuspend();
106  }
107 
109  void print_cuda_error( std::string msg_ ) const {
110  msg() << MSG::ERROR << msg_ << endmsg;
111  throw GaudiException( msg_, "CUDA_EXCEPTION", StatusCode::FAILURE );
112  }
113 #endif
114  };
115 
116 #ifdef GAUDI_USE_CUDA
117  namespace CUDA {
118  using namespace std::chrono_literals;
119 
120  class CUDAStream {
121  private:
122  cudaStream_t stream;
123  const Gaudi::AsynchronousAlgorithm* parent;
124  int nth_stream = 0;
125  boost::unordered_flat_set<void*> allocations{};
126 
127  public:
128  CUDAStream( const Gaudi::AsynchronousAlgorithm* parent, std::string file = __FILE__, int line = __LINE__ );
129 
130  operator cudaStream_t() { return stream; }
131 
132  template <typename T>
133  T* malloc( std::size_t len ) {
134  void* devPtr = nullptr;
135  cudaError_t err = cudaSuccess;
136  if constexpr ( !std::is_same_v<T, void> ) { len *= sizeof( T ); }
137  const auto starttime = std::chrono::steady_clock::now();
138  do {
139  err = cudaMallocAsync( &devPtr, len, stream );
140  if ( err == cudaErrorMemoryAllocation ) {
141  StatusCode sc = parent->sleep_for( 10ms );
142  if ( sc.isFailure() ) { parent->print_cuda_error( "Yield error" ); }
143  }
144  } while ( err == cudaErrorMemoryAllocation );
145  const double waittime =
146  std::chrono::duration_cast<std::chrono::microseconds>( std::chrono::steady_clock::now() - starttime )
147  .count() /
148  1e6;
149  if ( waittime >= 0.01 ) {
150  fmt::print( "Waited {} to allocate {} of GPU memory\n", SI( waittime, "s" ), SI( len, "B" ) );
151  }
152  allocations.insert( devPtr );
153  return static_cast<T*>( devPtr );
154  }
155 
156  template <typename T>
157  void free( T* d_ptr ) {
158  auto iter = allocations.find( d_ptr );
159  if ( iter == allocations.end() ) {
160  parent->print_cuda_error( "Called stream.free on an allocation not created by this stream" );
161  }
162  cudaFreeAsync( static_cast<void*>( d_ptr ), stream );
163  allocations.erase( iter );
164  }
165 
166  ~CUDAStream();
167  };
168  } // namespace CUDA
169 #endif
170 } // namespace Gaudi
MSG::DEBUG
@ DEBUG
Definition: IMessageSvc.h:25
Gaudi::Algorithm::sysExecute
StatusCode sysExecute(const EventContext &ctx) override
The actions to be performed by the algorithm on an event.
Definition: Algorithm.cpp:327
AtlasMCRecoFullPrecedenceDump.whiteboard
whiteboard
Definition: AtlasMCRecoFullPrecedenceDump.py:41
Write.stream
stream
Definition: Write.py:32
std::string
STL class.
GaudiException
Definition: GaudiException.h:31
std::chrono::duration
e6
HepRndm::Engine< HepJamesRandom > e6
Definition: HepRndmEngines.cpp:211
GaudiMP.FdsRegistry.msg
msg
Definition: FdsRegistry.py:19
Gaudi::AsynchronousAlgorithm::sleep_until
StatusCode sleep_until(std::chrono::time_point< Clock, Duration > const &sleep_time) const
Forwards to boost::this_fiber::sleep_until.
Definition: AsynchronousAlgorithm.h:89
Gaudi::AsynchronousAlgorithm::restoreAfterSuspend
virtual StatusCode restoreAfterSuspend() const
Restore after suspend.
Definition: AsynchronousAlgorithm.h:54
Gaudi::Units::ms
constexpr double ms
Definition: SystemOfUnits.h:154
GaudiPython.Pythonizations.ctx
ctx
Definition: Pythonizations.py:578
StatusCode
Definition: StatusCode.h:65
Gaudi::Algorithm
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:90
Gaudi::Algorithm::sysInitialize
StatusCode sysInitialize() override
Initialization method invoked by the framework.
Definition: Algorithm.cpp:58
std::chrono::time_point
Gaudi::AsynchronousAlgorithm
Definition: AsynchronousAlgorithm.h:48
Algorithm.h
IHiveWhiteBoard.h
endmsg
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:203
Gaudi
This file provides a Grammar for the type Gaudi::Accumulators::Axis It allows to use that type from p...
Definition: __init__.py:1
StatusCode::isFailure
bool isFailure() const
Definition: StatusCode.h:129
StatusCode::SUCCESS
constexpr static const auto SUCCESS
Definition: StatusCode.h:100
MSG::ERROR
@ ERROR
Definition: IMessageSvc.h:25
Gaudi::AsynchronousAlgorithm::sysExecute
StatusCode sysExecute(const EventContext &ctx) override
Definition: AsynchronousAlgorithm.h:69
EventContext
Definition: EventContext.h:34
std::free
T free(T... args)
plotSpeedupsPyRoot.line
line
Definition: plotSpeedupsPyRoot.py:198
Gaudi::AsynchronousAlgorithm::sysInitialize
StatusCode sysInitialize() override
Definition: AsynchronousAlgorithm.h:63
std::size_t
std::malloc
T malloc(T... args)
StatusCode::FAILURE
constexpr static const auto FAILURE
Definition: StatusCode.h:101
Gaudi::AsynchronousAlgorithm::yield
StatusCode yield() const
Forwards to boost::this_fiber::yield.
Definition: AsynchronousAlgorithm.h:82
Gaudi::AsynchronousAlgorithm::sleep_for
StatusCode sleep_for(std::chrono::duration< Rep, Period > const &dur) const
Forwards to boost::this_fiber::sleep_for.
Definition: AsynchronousAlgorithm.h:96
GAUDI_API
#define GAUDI_API
Definition: Kernel.h:81
std::chrono::steady_clock::now
T now(T... args)