GaudiMP.GMPBase.Coord Class Reference
Inheritance diagram for GaudiMP.GMPBase.Coord:
Collaboration diagram for GaudiMP.GMPBase.Coord:

Public Member Functions

def __init__ (self, nWorkers, config, log)
 
def getSyncEvents (self, nodeID)
 
def getQueues (self, nodeID)
 
def Go (self)
 
def Terminate (self)
 
def Stop (self)
 
def SetupQueues (self)
 
def __init__ (self, nWorkers, config, log)
 
def getSyncEvents (self, nodeID)
 
def getQueues (self, nodeID)
 
def Go (self)
 
def Terminate (self)
 
def Stop (self)
 
def SetupQueues (self)
 

Public Attributes

 log
 
 config
 
 nWorkers
 
 qs
 
 hq
 
 fq
 
 sInit
 
 sRun
 
 sFin
 
 histSyncEvent
 
 subworkers
 
 reader
 
 workers
 
 writer
 
 system
 

Detailed Description

Definition at line 1286 of file GMPBase.py.

Constructor & Destructor Documentation

def GaudiMP.GMPBase.Coord.__init__ (   self,
  nWorkers,
  config,
  log 
)

Definition at line 1287 of file GMPBase.py.

1287  def __init__( self, nWorkers, config, log ) :
1288 
1289  self.log = log
1290  self.config = config
1291  # set up Logging
1292  self.log.name = 'GaudiPython-Parallel-Logger'
1293  self.log.info( 'GaudiPython Parallel Process Co-ordinator beginning' )
1294 
1295  if nWorkers == -1 :
1296  # The user has requested all available cpus in the machine
1297  self.nWorkers = cpu_count()
1298  else :
1299  self.nWorkers = nWorkers
1300 
1301 
1302  self.qs = self.SetupQueues( ) # a dictionary of queues (for Events)
1303  self.hq = JoinableQueue( ) # for Histogram data
1304  self.fq = JoinableQueue( ) # for FileRecords data
1305 
1306  # Make a Syncer for Initalise, Run, and Finalise
1307  self.sInit = Syncer( self.nWorkers, self.log,
1308  limit=WAIT_INITIALISE,
1309  step=STEP_INITIALISE )
1310  self.sRun = Syncer( self.nWorkers, self.log,
1311  manyEvents=True,
1312  limit=WAIT_SINGLE_EVENT,
1313  step=STEP_EVENT,
1314  firstEvent=WAIT_FIRST_EVENT )
1315  self.sFin = Syncer( self.nWorkers, self.log,
1316  limit=WAIT_FINALISE,
1317  step=STEP_FINALISE )
1318  # and one final one for Histogram Transfer
1319  self.histSyncEvent = Event()
1320 
1321  # params are common to al subprocesses
1322  params = (self.nWorkers, self.histSyncEvent, self.config, self.log)
1323 
1324  self.subworkers = []
1325  # Declare SubProcesses!
1326  for i in range(1, self.nWorkers ) :
1327  sub = Subworker( i, self.getQueues(i), self.getSyncEvents(i), params, self.subworkers )
1328  self.subworkers.append( sub )
1329  self.reader= Reader(self.getQueues(-1), self.getSyncEvents(-1), params, self.subworkers)
1330  self.workers = []
1331  wk = Worker( 0, self.getQueues(0), self.getSyncEvents(0), params, self.subworkers )
1332  self.writer= Writer(self.getQueues(-2), self.getSyncEvents(-2), params, self.subworkers)
1333 
1334  self.system = []
1335  self.system.append(self.writer)
1336  self.system.append(wk)
1337  self.system.append(self.reader)
1338 
def SetupQueues(self)
Definition: GMPBase.py:1402
def __init__(self, nWorkers, config, log)
Definition: GMPBase.py:1287
def getSyncEvents(self, nodeID)
Definition: GMPBase.py:1339
def getQueues(self, nodeID)
Definition: GMPBase.py:1345
NamedRange_< CONTAINER > range(const CONTAINER &cnt, const std::string &name)
simple function to create the named range form arbitrary container
Definition: NamedRange.h:133
def GaudiMP.GMPBase.Coord.__init__ (   self,
  nWorkers,
  config,
  log 
)

Definition at line 1287 of file GMPBase.py.

1287  def __init__( self, nWorkers, config, log ) :
1288 
1289  self.log = log
1290  self.config = config
1291  # set up Logging
1292  self.log.name = 'GaudiPython-Parallel-Logger'
1293  self.log.info( 'GaudiPython Parallel Process Co-ordinator beginning' )
1294 
1295  if nWorkers == -1 :
1296  # The user has requested all available cpus in the machine
1297  self.nWorkers = cpu_count()
1298  else :
1299  self.nWorkers = nWorkers
1300 
1301 
1302  self.qs = self.SetupQueues( ) # a dictionary of queues (for Events)
1303  self.hq = JoinableQueue( ) # for Histogram data
1304  self.fq = JoinableQueue( ) # for FileRecords data
1305 
1306  # Make a Syncer for Initalise, Run, and Finalise
1307  self.sInit = Syncer( self.nWorkers, self.log,
1308  limit=WAIT_INITIALISE,
1309  step=STEP_INITIALISE )
1310  self.sRun = Syncer( self.nWorkers, self.log,
1311  manyEvents=True,
1312  limit=WAIT_SINGLE_EVENT,
1313  step=STEP_EVENT,
1314  firstEvent=WAIT_FIRST_EVENT )
1315  self.sFin = Syncer( self.nWorkers, self.log,
1316  limit=WAIT_FINALISE,
1317  step=STEP_FINALISE )
1318  # and one final one for Histogram Transfer
1319  self.histSyncEvent = Event()
1320 
1321  # params are common to al subprocesses
1322  params = (self.nWorkers, self.histSyncEvent, self.config, self.log)
1323 
1324  self.subworkers = []
1325  # Declare SubProcesses!
1326  for i in range(1, self.nWorkers ) :
1327  sub = Subworker( i, self.getQueues(i), self.getSyncEvents(i), params, self.subworkers )
1328  self.subworkers.append( sub )
1329  self.reader= Reader(self.getQueues(-1), self.getSyncEvents(-1), params, self.subworkers)
1330  self.workers = []
1331  wk = Worker( 0, self.getQueues(0), self.getSyncEvents(0), params, self.subworkers )
1332  self.writer= Writer(self.getQueues(-2), self.getSyncEvents(-2), params, self.subworkers)
1333 
1334  self.system = []
1335  self.system.append(self.writer)
1336  self.system.append(wk)
1337  self.system.append(self.reader)
1338 
def SetupQueues(self)
Definition: GMPBase.py:1402
def __init__(self, nWorkers, config, log)
Definition: GMPBase.py:1287
def getSyncEvents(self, nodeID)
Definition: GMPBase.py:1339
def getQueues(self, nodeID)
Definition: GMPBase.py:1345
NamedRange_< CONTAINER > range(const CONTAINER &cnt, const std::string &name)
simple function to create the named range form arbitrary container
Definition: NamedRange.h:133

Member Function Documentation

def GaudiMP.GMPBase.Coord.getQueues (   self,
  nodeID 
)

Definition at line 1345 of file GMPBase.py.

1345  def getQueues( self, nodeID ) :
1346  eventQ = self.qs[ nodeID ]
1347  histQ = self.hq
1348  fsrQ = self.fq
1349  return ( eventQ, histQ, fsrQ )
1350 
def getQueues(self, nodeID)
Definition: GMPBase.py:1345
def GaudiMP.GMPBase.Coord.getQueues (   self,
  nodeID 
)

Definition at line 1345 of file GMPBase.py.

1345  def getQueues( self, nodeID ) :
1346  eventQ = self.qs[ nodeID ]
1347  histQ = self.hq
1348  fsrQ = self.fq
1349  return ( eventQ, histQ, fsrQ )
1350 
def getQueues(self, nodeID)
Definition: GMPBase.py:1345
def GaudiMP.GMPBase.Coord.getSyncEvents (   self,
  nodeID 
)

Definition at line 1339 of file GMPBase.py.

1339  def getSyncEvents( self, nodeID ) :
1340  init = self.sInit.d[nodeID].event
1341  run = ( self.sRun.d[nodeID].event, self.sRun.d[nodeID].lastEvent )
1342  fin = self.sFin.d[nodeID].event
1343  return ( init, run, fin )
1344 
def getSyncEvents(self, nodeID)
Definition: GMPBase.py:1339
def GaudiMP.GMPBase.Coord.getSyncEvents (   self,
  nodeID 
)

Definition at line 1339 of file GMPBase.py.

1339  def getSyncEvents( self, nodeID ) :
1340  init = self.sInit.d[nodeID].event
1341  run = ( self.sRun.d[nodeID].event, self.sRun.d[nodeID].lastEvent )
1342  fin = self.sFin.d[nodeID].event
1343  return ( init, run, fin )
1344 
def getSyncEvents(self, nodeID)
Definition: GMPBase.py:1339
def GaudiMP.GMPBase.Coord.Go (   self)

Definition at line 1351 of file GMPBase.py.

1351  def Go( self ) :
1352 
1353  # Initialise
1354  self.log.name = 'GaudiPython-Parallel-Logger'
1355  self.log.info( 'INITIALISING SYSTEM' )
1356 
1357  # Start reader, writer and main worker
1358  for p in self.system :
1359  p.Start()
1360 
1361  sc = self.sInit.syncAll(step="Initialise")
1362  if sc == SUCCESS: pass
1363  else : self.Terminate() ; return FAILURE
1364 
1365  # Run
1366  self.log.name = 'GaudiPython-Parallel-Logger'
1367  self.log.info( 'RUNNING SYSTEM' )
1368  sc = self.sRun.syncAll(step="Run")
1369  if sc == SUCCESS: pass
1370  else : self.Terminate() ; return FAILURE
1371 
1372  # Finalise
1373  self.log.name = 'GaudiPython-Parallel-Logger'
1374  self.log.info( 'FINALISING SYSTEM' )
1375  sc = self.sFin.syncAll(step="Finalise")
1376  if sc == SUCCESS: pass
1377  else : self.Terminate() ; return FAILURE
1378 
1379  # if we've got this far, finally report SUCCESS
1380  self.log.info( "Cleanly join all Processes" )
1381  self.Stop()
1382  self.log.info( "Report Total Success to Main.py" )
1383  return SUCCESS
1384 
def Terminate(self)
Definition: GMPBase.py:1385
def GaudiMP.GMPBase.Coord.Go (   self)

Definition at line 1351 of file GMPBase.py.

1351  def Go( self ) :
1352 
1353  # Initialise
1354  self.log.name = 'GaudiPython-Parallel-Logger'
1355  self.log.info( 'INITIALISING SYSTEM' )
1356 
1357  # Start reader, writer and main worker
1358  for p in self.system :
1359  p.Start()
1360 
1361  sc = self.sInit.syncAll(step="Initialise")
1362  if sc == SUCCESS: pass
1363  else : self.Terminate() ; return FAILURE
1364 
1365  # Run
1366  self.log.name = 'GaudiPython-Parallel-Logger'
1367  self.log.info( 'RUNNING SYSTEM' )
1368  sc = self.sRun.syncAll(step="Run")
1369  if sc == SUCCESS: pass
1370  else : self.Terminate() ; return FAILURE
1371 
1372  # Finalise
1373  self.log.name = 'GaudiPython-Parallel-Logger'
1374  self.log.info( 'FINALISING SYSTEM' )
1375  sc = self.sFin.syncAll(step="Finalise")
1376  if sc == SUCCESS: pass
1377  else : self.Terminate() ; return FAILURE
1378 
1379  # if we've got this far, finally report SUCCESS
1380  self.log.info( "Cleanly join all Processes" )
1381  self.Stop()
1382  self.log.info( "Report Total Success to Main.py" )
1383  return SUCCESS
1384 
def Terminate(self)
Definition: GMPBase.py:1385
def GaudiMP.GMPBase.Coord.SetupQueues (   self)

Definition at line 1402 of file GMPBase.py.

1402  def SetupQueues( self ) :
1403  # This method will set up the network of Queues needed
1404  # N Queues = nWorkers + 1
1405  # Each Worker has a Queue in, and a Queue out
1406  # Reader has Queue out only
1407  # Writer has nWorkers Queues in
1408 
1409  # one queue from Reader-Workers
1410  rwk = JoinableQueue()
1411  # one queue from each worker to writer
1412  workersWriter = [ JoinableQueue() for i in xrange(self.nWorkers) ]
1413  d = {}
1414  d[-1] = (None, rwk) # Reader
1415  d[-2] = (workersWriter, None) # Writer
1416  for i in xrange(self.nWorkers) : d[i] = (rwk, workersWriter[i])
1417  return d
1418 
1419 # ============================= EOF ===========================================
1420 
def SetupQueues(self)
Definition: GMPBase.py:1402
def GaudiMP.GMPBase.Coord.SetupQueues (   self)

Definition at line 1402 of file GMPBase.py.

1402  def SetupQueues( self ) :
1403  # This method will set up the network of Queues needed
1404  # N Queues = nWorkers + 1
1405  # Each Worker has a Queue in, and a Queue out
1406  # Reader has Queue out only
1407  # Writer has nWorkers Queues in
1408 
1409  # one queue from Reader-Workers
1410  rwk = JoinableQueue()
1411  # one queue from each worker to writer
1412  workersWriter = [ JoinableQueue() for i in xrange(self.nWorkers) ]
1413  d = {}
1414  d[-1] = (None, rwk) # Reader
1415  d[-2] = (workersWriter, None) # Writer
1416  for i in xrange(self.nWorkers) : d[i] = (rwk, workersWriter[i])
1417  return d
1418 
1419 # ============================= EOF ===========================================
1420 
def SetupQueues(self)
Definition: GMPBase.py:1402
def GaudiMP.GMPBase.Coord.Stop (   self)

Definition at line 1395 of file GMPBase.py.

1395  def Stop( self ) :
1396  # procs should be joined in reverse order to launch
1397  self.system.reverse()
1398  for s in self.system :
1399  s.proc.join()
1400  return SUCCESS
1401 
def GaudiMP.GMPBase.Coord.Stop (   self)

Definition at line 1395 of file GMPBase.py.

1395  def Stop( self ) :
1396  # procs should be joined in reverse order to launch
1397  self.system.reverse()
1398  for s in self.system :
1399  s.proc.join()
1400  return SUCCESS
1401 
def GaudiMP.GMPBase.Coord.Terminate (   self)

Definition at line 1385 of file GMPBase.py.

1385  def Terminate( self ) :
1386  # Brutally kill sub-processes
1387  children = multiprocessing.active_children()
1388  for i in children:
1389  i.terminate()
1390 
1391  #self.writer.proc.terminate()
1392  #[ w.proc.terminate() for w in self.workers]
1393  #self.reader.proc.terminate()
1394 
def Terminate(self)
Definition: GMPBase.py:1385
def GaudiMP.GMPBase.Coord.Terminate (   self)

Definition at line 1385 of file GMPBase.py.

1385  def Terminate( self ) :
1386  # Brutally kill sub-processes
1387  children = multiprocessing.active_children()
1388  for i in children:
1389  i.terminate()
1390 
1391  #self.writer.proc.terminate()
1392  #[ w.proc.terminate() for w in self.workers]
1393  #self.reader.proc.terminate()
1394 
def Terminate(self)
Definition: GMPBase.py:1385

Member Data Documentation

GaudiMP.GMPBase.Coord.config

Definition at line 1290 of file GMPBase.py.

GaudiMP.GMPBase.Coord.fq

Definition at line 1304 of file GMPBase.py.

GaudiMP.GMPBase.Coord.histSyncEvent

Definition at line 1319 of file GMPBase.py.

GaudiMP.GMPBase.Coord.hq

Definition at line 1303 of file GMPBase.py.

GaudiMP.GMPBase.Coord.log

Definition at line 1289 of file GMPBase.py.

GaudiMP.GMPBase.Coord.nWorkers

Definition at line 1297 of file GMPBase.py.

GaudiMP.GMPBase.Coord.qs

Definition at line 1302 of file GMPBase.py.

GaudiMP.GMPBase.Coord.reader

Definition at line 1329 of file GMPBase.py.

GaudiMP.GMPBase.Coord.sFin

Definition at line 1315 of file GMPBase.py.

GaudiMP.GMPBase.Coord.sInit

Definition at line 1307 of file GMPBase.py.

GaudiMP.GMPBase.Coord.sRun

Definition at line 1310 of file GMPBase.py.

GaudiMP.GMPBase.Coord.subworkers

Definition at line 1324 of file GMPBase.py.

GaudiMP.GMPBase.Coord.system

Definition at line 1334 of file GMPBase.py.

GaudiMP.GMPBase.Coord.workers

Definition at line 1330 of file GMPBase.py.

GaudiMP.GMPBase.Coord.writer

Definition at line 1332 of file GMPBase.py.


The documentation for this class was generated from the following file:
  • InstallArea/x86_64-slc6-gcc48-opt/python/GaudiMP/GMPBase.py