GaudiMP.GMPBase.Coord Class Reference
Inheritance diagram for GaudiMP.GMPBase.Coord:
Collaboration diagram for GaudiMP.GMPBase.Coord:

Public Member Functions

def __init__ (self, nWorkers, config, log)
 
def getSyncEvents (self, nodeID)
 
def getQueues (self, nodeID)
 
def Go (self)
 
def Terminate (self)
 
def Stop (self)
 
def SetupQueues (self)
 
def __init__ (self, nWorkers, config, log)
 
def getSyncEvents (self, nodeID)
 
def getQueues (self, nodeID)
 
def Go (self)
 
def Terminate (self)
 
def Stop (self)
 
def SetupQueues (self)
 

Public Attributes

 log
 
 config
 
 nWorkers
 
 qs
 
 hq
 
 fq
 
 sInit
 
 sRun
 
 sFin
 
 histSyncEvent
 
 subworkers
 
 reader
 
 workers
 
 writer
 
 system
 

Detailed Description

Definition at line 1285 of file GMPBase.py.

Constructor & Destructor Documentation

def GaudiMP.GMPBase.Coord.__init__ (   self,
  nWorkers,
  config,
  log 
)

Definition at line 1286 of file GMPBase.py.

1286  def __init__( self, nWorkers, config, log ) :
1287 
1288  self.log = log
1289  self.config = config
1290  # set up Logging
1291  self.log.name = 'GaudiPython-Parallel-Logger'
1292  self.log.info( 'GaudiPython Parallel Process Co-ordinator beginning' )
1293 
1294  if nWorkers == -1 :
1295  # The user has requested all available cpus in the machine
1296  self.nWorkers = cpu_count()
1297  else :
1298  self.nWorkers = nWorkers
1299 
1300 
1301  self.qs = self.SetupQueues( ) # a dictionary of queues (for Events)
1302  self.hq = JoinableQueue( ) # for Histogram data
1303  self.fq = JoinableQueue( ) # for FileRecords data
1304 
1305  # Make a Syncer for Initialise, Run, and Finalise
1306  self.sInit = Syncer( self.nWorkers, self.log,
1307  limit=WAIT_INITIALISE,
1308  step=STEP_INITIALISE )
1309  self.sRun = Syncer( self.nWorkers, self.log,
1310  manyEvents=True,
1311  limit=WAIT_SINGLE_EVENT,
1312  step=STEP_EVENT,
1313  firstEvent=WAIT_FIRST_EVENT )
1314  self.sFin = Syncer( self.nWorkers, self.log,
1315  limit=WAIT_FINALISE,
1316  step=STEP_FINALISE )
1317  # and one final one for Histogram Transfer
1318  self.histSyncEvent = Event()
1319 
1320  # params are common to all subprocesses
1321  params = (self.nWorkers, self.histSyncEvent, self.config, self.log)
1322 
1323  self.subworkers = []
1324  # Declare SubProcesses!
1325  for i in range(1, self.nWorkers ) :
1326  sub = Subworker( i, self.getQueues(i), self.getSyncEvents(i), params, self.subworkers )
1327  self.subworkers.append( sub )
1328  self.reader= Reader(self.getQueues(-1), self.getSyncEvents(-1), params, self.subworkers)
1329  self.workers = []
1330  wk = Worker( 0, self.getQueues(0), self.getSyncEvents(0), params, self.subworkers )
1331  self.writer= Writer(self.getQueues(-2), self.getSyncEvents(-2), params, self.subworkers)
1332 
1333  self.system = []
1334  self.system.append(self.writer)
1335  self.system.append(wk)
1336  self.system.append(self.reader)
1337 
def SetupQueues(self)
Definition: GMPBase.py:1401
NamedRange_< CONTAINER > range(const CONTAINER &cnt, std::string name)
simple function to create the named range from an arbitrary container
Definition: NamedRange.h:130
def __init__(self, nWorkers, config, log)
Definition: GMPBase.py:1286
def getSyncEvents(self, nodeID)
Definition: GMPBase.py:1338
def getQueues(self, nodeID)
Definition: GMPBase.py:1344
def GaudiMP.GMPBase.Coord.__init__ (   self,
  nWorkers,
  config,
  log 
)

Definition at line 1286 of file GMPBase.py.

1286  def __init__( self, nWorkers, config, log ) :
1287 
1288  self.log = log
1289  self.config = config
1290  # set up Logging
1291  self.log.name = 'GaudiPython-Parallel-Logger'
1292  self.log.info( 'GaudiPython Parallel Process Co-ordinator beginning' )
1293 
1294  if nWorkers == -1 :
1295  # The user has requested all available cpus in the machine
1296  self.nWorkers = cpu_count()
1297  else :
1298  self.nWorkers = nWorkers
1299 
1300 
1301  self.qs = self.SetupQueues( ) # a dictionary of queues (for Events)
1302  self.hq = JoinableQueue( ) # for Histogram data
1303  self.fq = JoinableQueue( ) # for FileRecords data
1304 
1305  # Make a Syncer for Initialise, Run, and Finalise
1306  self.sInit = Syncer( self.nWorkers, self.log,
1307  limit=WAIT_INITIALISE,
1308  step=STEP_INITIALISE )
1309  self.sRun = Syncer( self.nWorkers, self.log,
1310  manyEvents=True,
1311  limit=WAIT_SINGLE_EVENT,
1312  step=STEP_EVENT,
1313  firstEvent=WAIT_FIRST_EVENT )
1314  self.sFin = Syncer( self.nWorkers, self.log,
1315  limit=WAIT_FINALISE,
1316  step=STEP_FINALISE )
1317  # and one final one for Histogram Transfer
1318  self.histSyncEvent = Event()
1319 
1320  # params are common to all subprocesses
1321  params = (self.nWorkers, self.histSyncEvent, self.config, self.log)
1322 
1323  self.subworkers = []
1324  # Declare SubProcesses!
1325  for i in range(1, self.nWorkers ) :
1326  sub = Subworker( i, self.getQueues(i), self.getSyncEvents(i), params, self.subworkers )
1327  self.subworkers.append( sub )
1328  self.reader= Reader(self.getQueues(-1), self.getSyncEvents(-1), params, self.subworkers)
1329  self.workers = []
1330  wk = Worker( 0, self.getQueues(0), self.getSyncEvents(0), params, self.subworkers )
1331  self.writer= Writer(self.getQueues(-2), self.getSyncEvents(-2), params, self.subworkers)
1332 
1333  self.system = []
1334  self.system.append(self.writer)
1335  self.system.append(wk)
1336  self.system.append(self.reader)
1337 
def SetupQueues(self)
Definition: GMPBase.py:1401
NamedRange_< CONTAINER > range(const CONTAINER &cnt, std::string name)
simple function to create the named range from an arbitrary container
Definition: NamedRange.h:130
def __init__(self, nWorkers, config, log)
Definition: GMPBase.py:1286
def getSyncEvents(self, nodeID)
Definition: GMPBase.py:1338
def getQueues(self, nodeID)
Definition: GMPBase.py:1344

Member Function Documentation

def GaudiMP.GMPBase.Coord.getQueues (   self,
  nodeID 
)

Definition at line 1344 of file GMPBase.py.

1344  def getQueues( self, nodeID ) :
1345  eventQ = self.qs[ nodeID ]
1346  histQ = self.hq
1347  fsrQ = self.fq
1348  return ( eventQ, histQ, fsrQ )
1349 
def getQueues(self, nodeID)
Definition: GMPBase.py:1344
def GaudiMP.GMPBase.Coord.getQueues (   self,
  nodeID 
)

Definition at line 1344 of file GMPBase.py.

1344  def getQueues( self, nodeID ) :
1345  eventQ = self.qs[ nodeID ]
1346  histQ = self.hq
1347  fsrQ = self.fq
1348  return ( eventQ, histQ, fsrQ )
1349 
def getQueues(self, nodeID)
Definition: GMPBase.py:1344
def GaudiMP.GMPBase.Coord.getSyncEvents (   self,
  nodeID 
)

Definition at line 1338 of file GMPBase.py.

1338  def getSyncEvents( self, nodeID ) :
1339  init = self.sInit.d[nodeID].event
1340  run = ( self.sRun.d[nodeID].event, self.sRun.d[nodeID].lastEvent )
1341  fin = self.sFin.d[nodeID].event
1342  return ( init, run, fin )
1343 
def getSyncEvents(self, nodeID)
Definition: GMPBase.py:1338
def GaudiMP.GMPBase.Coord.getSyncEvents (   self,
  nodeID 
)

Definition at line 1338 of file GMPBase.py.

1338  def getSyncEvents( self, nodeID ) :
1339  init = self.sInit.d[nodeID].event
1340  run = ( self.sRun.d[nodeID].event, self.sRun.d[nodeID].lastEvent )
1341  fin = self.sFin.d[nodeID].event
1342  return ( init, run, fin )
1343 
def getSyncEvents(self, nodeID)
Definition: GMPBase.py:1338
def GaudiMP.GMPBase.Coord.Go (   self)

Definition at line 1350 of file GMPBase.py.

1350  def Go( self ) :
1351 
1352  # Initialise
1353  self.log.name = 'GaudiPython-Parallel-Logger'
1354  self.log.info( 'INITIALISING SYSTEM' )
1355 
1356  # Start reader, writer and main worker
1357  for p in self.system :
1358  p.Start()
1359 
1360  sc = self.sInit.syncAll(step="Initialise")
1361  if sc == SUCCESS: pass
1362  else : self.Terminate() ; return FAILURE
1363 
1364  # Run
1365  self.log.name = 'GaudiPython-Parallel-Logger'
1366  self.log.info( 'RUNNING SYSTEM' )
1367  sc = self.sRun.syncAll(step="Run")
1368  if sc == SUCCESS: pass
1369  else : self.Terminate() ; return FAILURE
1370 
1371  # Finalise
1372  self.log.name = 'GaudiPython-Parallel-Logger'
1373  self.log.info( 'FINALISING SYSTEM' )
1374  sc = self.sFin.syncAll(step="Finalise")
1375  if sc == SUCCESS: pass
1376  else : self.Terminate() ; return FAILURE
1377 
1378  # if we've got this far, finally report SUCCESS
1379  self.log.info( "Cleanly join all Processes" )
1380  self.Stop()
1381  self.log.info( "Report Total Success to Main.py" )
1382  return SUCCESS
1383 
def Terminate(self)
Definition: GMPBase.py:1384
def GaudiMP.GMPBase.Coord.Go (   self)

Definition at line 1350 of file GMPBase.py.

1350  def Go( self ) :
1351 
1352  # Initialise
1353  self.log.name = 'GaudiPython-Parallel-Logger'
1354  self.log.info( 'INITIALISING SYSTEM' )
1355 
1356  # Start reader, writer and main worker
1357  for p in self.system :
1358  p.Start()
1359 
1360  sc = self.sInit.syncAll(step="Initialise")
1361  if sc == SUCCESS: pass
1362  else : self.Terminate() ; return FAILURE
1363 
1364  # Run
1365  self.log.name = 'GaudiPython-Parallel-Logger'
1366  self.log.info( 'RUNNING SYSTEM' )
1367  sc = self.sRun.syncAll(step="Run")
1368  if sc == SUCCESS: pass
1369  else : self.Terminate() ; return FAILURE
1370 
1371  # Finalise
1372  self.log.name = 'GaudiPython-Parallel-Logger'
1373  self.log.info( 'FINALISING SYSTEM' )
1374  sc = self.sFin.syncAll(step="Finalise")
1375  if sc == SUCCESS: pass
1376  else : self.Terminate() ; return FAILURE
1377 
1378  # if we've got this far, finally report SUCCESS
1379  self.log.info( "Cleanly join all Processes" )
1380  self.Stop()
1381  self.log.info( "Report Total Success to Main.py" )
1382  return SUCCESS
1383 
def Terminate(self)
Definition: GMPBase.py:1384
def GaudiMP.GMPBase.Coord.SetupQueues (   self)

Definition at line 1401 of file GMPBase.py.

1401  def SetupQueues( self ) :
1402  # This method will set up the network of Queues needed
1403  # N Queues = nWorkers + 1
1404  # Each Worker has a Queue in, and a Queue out
1405  # Reader has Queue out only
1406  # Writer has nWorkers Queues in
1407 
1408  # one queue from Reader-Workers
1409  rwk = JoinableQueue()
1410  # one queue from each worker to writer
1411  workersWriter = [ JoinableQueue() for i in xrange(self.nWorkers) ]
1412  d = {}
1413  d[-1] = (None, rwk) # Reader
1414  d[-2] = (workersWriter, None) # Writer
1415  for i in xrange(self.nWorkers) : d[i] = (rwk, workersWriter[i])
1416  return d
1417 
1418 # ============================= EOF ===========================================
1419 
def SetupQueues(self)
Definition: GMPBase.py:1401
def GaudiMP.GMPBase.Coord.SetupQueues (   self)

Definition at line 1401 of file GMPBase.py.

1401  def SetupQueues( self ) :
1402  # This method will set up the network of Queues needed
1403  # N Queues = nWorkers + 1
1404  # Each Worker has a Queue in, and a Queue out
1405  # Reader has Queue out only
1406  # Writer has nWorkers Queues in
1407 
1408  # one queue from Reader-Workers
1409  rwk = JoinableQueue()
1410  # one queue from each worker to writer
1411  workersWriter = [ JoinableQueue() for i in xrange(self.nWorkers) ]
1412  d = {}
1413  d[-1] = (None, rwk) # Reader
1414  d[-2] = (workersWriter, None) # Writer
1415  for i in xrange(self.nWorkers) : d[i] = (rwk, workersWriter[i])
1416  return d
1417 
1418 # ============================= EOF ===========================================
1419 
def SetupQueues(self)
Definition: GMPBase.py:1401
def GaudiMP.GMPBase.Coord.Stop (   self)

Definition at line 1394 of file GMPBase.py.

1394  def Stop( self ) :
1395  # procs should be joined in the reverse of their launch order
1396  self.system.reverse()
1397  for s in self.system :
1398  s.proc.join()
1399  return SUCCESS
1400 
def GaudiMP.GMPBase.Coord.Stop (   self)

Definition at line 1394 of file GMPBase.py.

1394  def Stop( self ) :
1395  # procs should be joined in the reverse of their launch order
1396  self.system.reverse()
1397  for s in self.system :
1398  s.proc.join()
1399  return SUCCESS
1400 
def GaudiMP.GMPBase.Coord.Terminate (   self)

Definition at line 1384 of file GMPBase.py.

1384  def Terminate( self ) :
1385  # Brutally kill sub-processes
1386  children = multiprocessing.active_children()
1387  for i in children:
1388  i.terminate()
1389 
1390  #self.writer.proc.terminate()
1391  #[ w.proc.terminate() for w in self.workers]
1392  #self.reader.proc.terminate()
1393 
def Terminate(self)
Definition: GMPBase.py:1384
def GaudiMP.GMPBase.Coord.Terminate (   self)

Definition at line 1384 of file GMPBase.py.

1384  def Terminate( self ) :
1385  # Brutally kill sub-processes
1386  children = multiprocessing.active_children()
1387  for i in children:
1388  i.terminate()
1389 
1390  #self.writer.proc.terminate()
1391  #[ w.proc.terminate() for w in self.workers]
1392  #self.reader.proc.terminate()
1393 
def Terminate(self)
Definition: GMPBase.py:1384

Member Data Documentation

GaudiMP.GMPBase.Coord.config

Definition at line 1289 of file GMPBase.py.

GaudiMP.GMPBase.Coord.fq

Definition at line 1303 of file GMPBase.py.

GaudiMP.GMPBase.Coord.histSyncEvent

Definition at line 1318 of file GMPBase.py.

GaudiMP.GMPBase.Coord.hq

Definition at line 1302 of file GMPBase.py.

GaudiMP.GMPBase.Coord.log

Definition at line 1288 of file GMPBase.py.

GaudiMP.GMPBase.Coord.nWorkers

Definition at line 1296 of file GMPBase.py.

GaudiMP.GMPBase.Coord.qs

Definition at line 1301 of file GMPBase.py.

GaudiMP.GMPBase.Coord.reader

Definition at line 1328 of file GMPBase.py.

GaudiMP.GMPBase.Coord.sFin

Definition at line 1314 of file GMPBase.py.

GaudiMP.GMPBase.Coord.sInit

Definition at line 1306 of file GMPBase.py.

GaudiMP.GMPBase.Coord.sRun

Definition at line 1309 of file GMPBase.py.

GaudiMP.GMPBase.Coord.subworkers

Definition at line 1323 of file GMPBase.py.

GaudiMP.GMPBase.Coord.system

Definition at line 1333 of file GMPBase.py.

GaudiMP.GMPBase.Coord.workers

Definition at line 1329 of file GMPBase.py.

GaudiMP.GMPBase.Coord.writer

Definition at line 1331 of file GMPBase.py.


The documentation for this class was generated from the following file:
  • InstallArea/x86_64-slc6-gcc48-opt/python/GaudiMP/GMPBase.py