Skip to content

Commit

Permalink
fix the parallel evaluation strategy to avoid dangling processes
Browse files Browse the repository at this point in the history
  • Loading branch information
pcarruscag committed Nov 13, 2020
1 parent 3b74cc8 commit c106102
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 32 deletions.
12 changes: 6 additions & 6 deletions drivers/base_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,12 @@ def _handleVariableChange(self, x):
self._setCurrent(x)
self._x[()] = x

# trigger evaluations
self._funReady = False
self._jacReady = False
self._resetAllValueEvaluations()
self._resetAllGradientEvaluations()

# manage working directories
os.chdir(self._userDir)
if os.path.isdir(self._workDir):
Expand All @@ -314,12 +320,6 @@ def _handleVariableChange(self, x):
#end
os.mkdir(self._workDir)

# trigger evaluations
self._funReady = False
self._jacReady = False
self._resetAllValueEvaluations()
self._resetAllGradientEvaluations()

return True
#end
#end
Expand Down
32 changes: 23 additions & 9 deletions drivers/parallel_eval_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def _addDependencies(flist,funGraph,jacGraph):
evals = obj.function.getValueEvalChain()
for i in range(1,len(evals)):
funGraph[evals[i]].add(evals[i-1])

evals = obj.function.getGradientEvalChain()
for i in range(1,len(evals)):
jacGraph[evals[i]].add(evals[i-1])
Expand All @@ -95,6 +95,10 @@ def _addDependencies(flist,funGraph,jacGraph):

# run the active evaluations of a dependency graph
def _evalInParallel(self,dependGraph,active):
# to avoid exiting with dangling evaluations we need to catch
# all exceptions and throw when the infinite loop finishes
error = False
completed = lambda evl: evl.isRun() or evl.isError()
while True:
allRun = True
for evl,depList in dependGraph.items():
Expand All @@ -105,24 +109,34 @@ def _evalInParallel(self,dependGraph,active):
active[dep] = True

# either running or finished, move on
if evl.isIni() or evl.isRun():
evl.poll() # (starts or updates internal state)
allRun &= evl.isRun()
if evl.isIni() or completed(evl):
try:
evl.poll() # (starts or updates internal state)
allRun &= completed(evl)
except:
error = True
#end
continue
#end
allRun &= evl.isRun()
allRun &= completed(evl)

# if dependencies are met, start evaluation
# if dependencies are met start evaluation, error is considered
# as "met" otherwise the outer loop would never exit
for dep in depList:
if not dep.isRun(): break
if not completed(dep): break
else:
evl.initialize()
evl.poll()
try:
evl.initialize()
evl.poll()
except:
error = True
#end
#end
#end
if allRun: break
time.sleep(self._waitTime)
#end
if error: raise RuntimeError("Evaluations failed.")
#end

# run evaluations extracting maximum parallelism
Expand Down
48 changes: 31 additions & 17 deletions evaluation.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,26 +113,33 @@ def initialize(self):
"""
Initialize the run, create the subdirectory, copy/symlink the data and
configuration files, and write the parameters and variables to the latter.
Creates the process object, starting it in detached mode.
"""
if self._isIni: return

os.mkdir(self._workDir)
for file in self._dataFiles:
target = os.path.join(self._workDir,os.path.basename(file))
(shutil.copy,os.symlink)[self._symLinks](os.path.abspath(file),target)

for file in self._confFiles:
target = os.path.join(self._workDir,os.path.basename(file))
shutil.copy(file,target)
for par in self._parameters:
par.writeToFile(target)
for var in self._variables:
var.writeToFile(target)

self._createProcess()
self._isIni = True
self._isRun = False
self._numTries = 0
try:
os.mkdir(self._workDir)
for file in self._dataFiles:
target = os.path.join(self._workDir,os.path.basename(file))
(shutil.copy,os.symlink)[self._symLinks](os.path.abspath(file),target)

for file in self._confFiles:
target = os.path.join(self._workDir,os.path.basename(file))
shutil.copy(file,target)
for par in self._parameters:
par.writeToFile(target)
for var in self._variables:
var.writeToFile(target)

self._createProcess()
self._isIni = True
self._isRun = False
self._isError = False
self._numTries = 0
except:
self._isError = True
raise
#end
#end

def _createProcess(self):
Expand All @@ -154,8 +161,10 @@ def poll(self):
# Common implementation of "run" and "poll"
def _exec(self,wait,timeout):
if not self._isIni:
self._isError = True
raise RuntimeError("Run was not initialized.")
if self._numTries == self._maxTries:
self._isError = True
raise RuntimeError("Run failed.")
if self._isRun:
return self._retcode
Expand Down Expand Up @@ -195,6 +204,10 @@ def isRun(self):
"""Return True if the run has finished."""
return self._isRun

def isError(self):
"""Return True if the run has failed."""
return self._isError

def finalize(self):
"""Reset "lazy" flags, close the stdout and stderr of the process."""
try:
Expand All @@ -204,6 +217,7 @@ def finalize(self):
pass
self._isIni = False
self._isRun = False
self._isError = False
self._retcode = -100
#end

Expand Down
11 changes: 11 additions & 0 deletions function.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,13 @@ def addGradientEvalStep(self,evaluation):
"""Add a required step to compute the function gradient."""
self._gradEval.append(evaluation)

# check if any evaluation is in error state
def _checkError(self,evals):
for evl in evals:
if evl.isError(): raise RuntimeError("Evaluations failed.")
#end
#end

def getValue(self):
"""
Get the function value, i.e. apply the parser to the output file.
Expand All @@ -136,6 +143,8 @@ def getValue(self):
is set via the Variable objects.
"""
# check if we can retrive the value
self._checkError(self._funEval)

for evl in self._funEval:
if not evl.isRun():
self._sequentialEval(self._funEval)
Expand All @@ -157,6 +166,8 @@ def getGradient(self,mask=None):
getGradient({x : 0, z : 3}) -> [0, 0, 0, 2, 2]
"""
# check if we can retrive the gradient
self._checkError(self._gradEval)

for evl in self._gradEval:
if not evl.isRun():
self._sequentialEval(self._gradEval)
Expand Down

0 comments on commit c106102

Please sign in to comment.