Support for -pe (parallel environment) option (#71)
* add pe option

* default pe value

* renamed pe to par_env

* documentation
jackkamm authored and dan-blanchard committed Apr 14, 2016
1 parent 7d82de3 · commit b9ddf47
Showing 2 changed files with 16 additions and 6 deletions.
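
As context for the diff below: grid_map and Job gain a par_env keyword (defaulting to the new DEFAULT_PAR_ENV setting) that replaces the previously hard-coded smp parallel environment in the qsub native specification. A minimal sketch of the new keyword on the Job constructor, assuming a cluster whose scheduler has an 'mpi' parallel environment configured; the function double and the PE name are placeholders:

from gridmap.job import Job

def double(x):
    return 2 * x

# Request 4 slots in the 'mpi' parallel environment; the job's native
# specification will now contain "-pe mpi 4" instead of "-pe smp 4".
job = Job(double, [21], num_slots=4, par_env='mpi')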
gridmap/conf.py (5 additions, 0 deletions)
@@ -62,6 +62,8 @@
:var HEARTBEAT_FREQUENCY: How many seconds pass before jobs on the cluster send
back heart beats to the submission host.
(Default: 10)
+:var DEFAULT_PAR_ENV: Default parallel environment to use
+(Default: ``smp``)
"""
from __future__ import (absolute_import, division, print_function,
unicode_literals)
@@ -130,3 +132,6 @@

# Where shall we use as temporary_directory
DEFAULT_TEMP_DIR = os.getenv('DEFAULT_TEMP_DIR', '/scratch/')
+
+# Which parallel environment to use by default
+DEFAULT_PAR_ENV = os.getenv('DEFAULT_PAR_ENV', "smp")
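
Because conf.py reads the default from the environment, the new setting can be overridden without editing the code. A minimal sketch, assuming the variable is set before gridmap.conf is first imported; the PE name 'mpi' is a placeholder:

import os

# Must run before importing gridmap, because gridmap.conf evaluates
# os.getenv('DEFAULT_PAR_ENV', 'smp') at import time.
os.environ['DEFAULT_PAR_ENV'] = 'mpi'

import gridmap  # jobs created from here on default to par_env='mpi'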
gridmap/job.py (11 additions, 6 deletions)
@@ -63,7 +63,7 @@
IDLE_THRESHOLD, MAX_IDLE_HEARTBEATS,
MAX_TIME_BETWEEN_HEARTBEATS, NUM_RESUBMITS,
SEND_ERROR_MAIL, SMTP_SERVER, USE_MEM_FREE,
-DEFAULT_TEMP_DIR)
+DEFAULT_TEMP_DIR, DEFAULT_PAR_ENV)
from gridmap.data import zdumps, zloads
from gridmap.runner import _heart_beat

@@ -116,11 +116,11 @@ class Job(object):
'cause_of_death', 'num_resubmits', 'home_address',
'log_stderr_fn', 'log_stdout_fn', 'timestamp', 'host_name',
'heart_beat', 'track_mem', 'track_cpu', 'interpreting_shell',
-'copy_env')
+'copy_env','par_env')

def __init__(self, f, args, kwlist=None, cleanup=True, mem_free="1G",
name='gridmap_job', num_slots=1, queue=DEFAULT_QUEUE,
-interpreting_shell=None, copy_env=True, add_env=None):
+interpreting_shell=None, copy_env=True, add_env=None, par_env=DEFAULT_PAR_ENV):
"""
Initializes a new Job.
@@ -149,6 +149,8 @@ def __init__(self, f, args, kwlist=None, cleanup=True, mem_free="1G",
Overwrites variables which already exist due to
``copy_env=True``.
:type add_env: dict
+:param par_env: parallel environment to use.
+:type par_env: str
"""
self.track_mem = []
self.track_cpu = []
@@ -195,6 +197,7 @@ def _add_env(env_vars):
if add_env is not None:
_add_env(add_env)
self.working_dir = os.getcwd()
+self.par_env = par_env

@property
def function(self):
@@ -263,7 +266,7 @@ def native_specification(self):
if self.mem_free and USE_MEM_FREE:
ret += " -l mem_free={}".format(self.mem_free)
if self.num_slots and self.num_slots > 1:
-ret += " -pe smp {}".format(self.num_slots)
+ret += " -pe {} {}".format(self.par_env, self.num_slots)
if self.white_list:
ret += " -l h={}".format('|'.join(self.white_list))
if self.queue:
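
The effect of the one-line -pe change above, shown in isolation with placeholder values (par_env 'mpi', 4 slots):

par_env, num_slots = 'mpi', 4  # placeholder values
# Old behaviour: the parallel environment was hard-coded to 'smp'
old = " -pe smp {}".format(num_slots)          # ' -pe smp 4'
# New behaviour: the parallel environment comes from the job's par_env
new = " -pe {} {}".format(par_env, num_slots)  # ' -pe mpi 4'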
@@ -929,7 +932,7 @@ def grid_map(f, args_list, cleanup=True, mem_free="1G", name='gridmap_job',
num_slots=1, temp_dir=DEFAULT_TEMP_DIR, white_list=None,
queue=DEFAULT_QUEUE, quiet=True, local=False, max_processes=1,
interpreting_shell=None, copy_env=True, add_env=None,
-completion_mail=False, require_cluster=False):
+completion_mail=False, require_cluster=False, par_env=DEFAULT_PAR_ENV):
"""
Maps a function onto the cluster.
@@ -978,6 +981,8 @@ def grid_map(f, args_list, cleanup=True, mem_free="1G", name='gridmap_job',
Overwrites variables which already exist due to
``copy_env=True``.
:type add_env: dict
+:param par_env: parallel environment to use.
+:type par_env: str
:param completion_mail: whether to send an e-mail upon completion of all
jobs
:type completion_mail: boolean
@@ -993,7 +998,7 @@ def grid_map(f, args_list, cleanup=True, mem_free="1G", name='gridmap_job',
cleanup=cleanup, mem_free=mem_free,
name='{}{}'.format(name, job_num), num_slots=num_slots,
queue=queue, interpreting_shell=interpreting_shell,
-copy_env=copy_env, add_env=add_env)
+copy_env=copy_env, add_env=add_env, par_env=par_env)
for job_num, args in enumerate(args_list)]

# process jobs
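End to end, the option is now exposed through grid_map as well. A minimal usage sketch, assuming a DRMAA-capable cluster whose scheduler defines an 'mpi' parallel environment; the function and PE name are placeholders:

from gridmap.job import grid_map

def double(x):
    return 2 * x

# Each generated Job carries par_env='mpi', so every submission requests
# "-pe mpi 2" rather than the previous hard-coded "-pe smp 2".
results = grid_map(double, [1, 2, 3], num_slots=2, par_env='mpi')
print(results)  # expected: [2, 4, 6]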
