Skip to content

Commit

Permalink
Merge pull request #4 from EMBL-EBI-TSI/testing
Browse files Browse the repository at this point in the history
Merge testing into master for 0.2 release
  • Loading branch information
erikvdbergh authored Mar 5, 2018
2 parents dbdf45e + ee659e5 commit a99c586
Show file tree
Hide file tree
Showing 9 changed files with 437 additions and 235 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
*.swp
*.pyc
6 changes: 6 additions & 0 deletions cloudbuild_testing.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Google Cloud Build configuration for the 'testing' branch:
# builds the taskmaster and filer container images from their Dockerfiles
# and tags both with ':testing'.
steps:
- name: 'gcr.io/cloud-builders/docker'
  args: ['build', '-t', 'eu.gcr.io/tes-wes/taskmaster:testing', '-f', 'dockerfiles/taskmaster/Dockerfile', '.']
- name: 'gcr.io/cloud-builders/docker'
  args: ['build', '-t', 'eu.gcr.io/tes-wes/filer:testing', '-f', 'dockerfiles/filer/Dockerfile', '.']
# Images listed here are pushed to the registry after all steps succeed.
images: ['eu.gcr.io/tes-wes/taskmaster:testing', 'eu.gcr.io/tes-wes/filer:testing']
12 changes: 9 additions & 3 deletions dockerfiles/taskmaster/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,14 @@ FROM gliderlabs/alpine
RUN apk add --no-cache python py-pip curl openssl
RUN pip install kubernetes

WORKDIR /root
RUN adduser -S taskmaster

USER taskmaster

WORKDIR /home/taskmaster
COPY scripts/taskmaster.py .
COPY scripts/job.py .
COPY scripts/pvc.py .
COPY scripts/filer_class.py .

ENTRYPOINT ["/root/taskmaster.py"]
#CMD /root/taskmaster.py
ENTRYPOINT ["/home/taskmaster/taskmaster.py"]
107 changes: 107 additions & 0 deletions examples/success/inputoutput_taskmaster.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
{
"outputs": [
{
"url": "ftp://ftp-private.ebi.ac.uk/upload/mouselist.txt",
"path": "/tmp/vol1/mouselist.txt",
"type": "FILE"
},
{
"url": "ftp://ftp-private.ebi.ac.uk/upload/mouse_out",
"path": "/tmp/vol2/mouse",
"type": "DIRECTORY"
}
],
"inputs": [
{
"url": "ftp://ftp-private.ebi.ac.uk/upload/input.txt",
"path": "/tmp/vol1/in.txt",
"type": "FILE"
},
{
"url": "ftp://ftp-private.ebi.ac.uk/upload/mouse",
"path": "/tmp/vol2/mouse",
"type": "DIRECTORY"
}
],
"volumes": [
"/tmp/vol1",
"/tmp/vol2"
],
"executors": [
{
"apiVersion": "batch/v1",
"kind": "Job",
"metadata": {
"annotations": {
"tes-task-name": "Hello Input"
},
"labels": {
"job-type": "executor",
"taskmaster-name": "task-7d5c53f4",
"executor-no": "0"
},
"name": "task-7d5c53f4-ex-00"
},
"spec": {
"template": {
"metadata": {
"name": "task-7d5c53f4-ex-00"
},
"spec": {
"containers": [
{
"command": [
"cat",
"/tmp/vol1/in.txt"
],
"image": "alpine",
"name": "task-7d5c53f4-ex-00",
"resources": {}
}
],
"restartPolicy": "Never"
}
}
}
},
{
"apiVersion": "batch/v1",
"kind": "Job",
"metadata": {
"annotations": {
"tes-task-name": "Hello Input"
},
"labels": {
"job-type": "executor",
"taskmaster-name": "task-7d5c53f4",
"executor-no": "1"
},
"name": "task-7d5c53f4-ex-01"
},
"spec": {
"template": {
"metadata": {
"name": "task-7d5c53f4-ex-01"
},
"spec": {
"containers": [
{
"command": [
"sh", "-c",
"find /tmp/vol2 > /tmp/vol1/mouselist.txt"
],
"image": "alpine",
"name": "task-7d5c53f4-ex-01",
"resources": {}
}
],
"restartPolicy": "Never"
}
}
}
}
],
"resources": {
"disk_gb": 0.1
}
}
66 changes: 51 additions & 15 deletions scripts/filer.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,15 @@
import re
import os
import distutils.dir_util
import requests
import time
import logging
import traceback


debug = True

def download_ftp_file(source, target, ftp):
logging.debug('downloading ftp file: '+source+' target: '+target)
basedir = os.path.dirname(target)
distutils.dir_util.mkpath(basedir)

Expand All @@ -22,20 +26,36 @@ def download_ftp_file(source, target, ftp):

def process_upload_dir(source, target, ftp):
basename = os.path.basename(source)
logging.debug('processing upload dir src: '+source+' target: '+target)
logging.debug('dir basename: '+basename)
wd = ftp.pwd()
# does the parent dir exist?
try:
print('trying to create dir: ' + '/'+target+'/'+basename, file=sys.stderr)
ftp.cwd('/'+target)
except:
logging.error('Cannot stat parent dir: /'+target)
return 1

ftp.cwd(wd)

try:
logging.debug('trying to create dir: ' + '/'+target+'/'+basename)
ftp.mkd('/'+target+'/'+basename)
except ftplib.error_perm:
print('Directory exists, overwriting')
logging.debug('Directory exists, overwriting')

for f in os.listdir(source):
if os.path.isdir(source+'/'+f):
process_upload_dir(source+'/'+f, target+'/'+basename+'/', ftp)
elif os.path.isfile(source+'/'+f):
ftp.storbinary("STOR "+target+'/'+basename+'/'+f, open(source+'/'+f, 'r'))
path = source+'/'+f
if os.path.isdir(path):
process_upload_dir(path, target+'/'+basename+'/', ftp)
elif os.path.isfile(path):
logging.debug('Trying to upload file: '+path+' to dest: '+target+'/'+basename+'/'+f)
ftp.storbinary("STOR "+target+'/'+basename+'/'+f, open(path, 'r'))
return 0

def process_ftp_dir(source, target, ftp):
logging.debug('processing ftp dir: '+source+' target: '+target)
pwd = ftp.pwd()
ftp.cwd('/'+source)

ls = []
Expand All @@ -53,11 +73,14 @@ def process_ftp_dir(source, target, ftp):
else:
download_ftp_file(name, target+'/'+name, ftp)

ftp.cwd(pwd)

def process_ftp_file(ftype, afile):
p = re.compile('[a-z]+://([-a-z.]+)/(.*)')
ftp_baseurl = p.match(afile['url']).group(1)
ftp_path = p.match(afile['url']).group(2)

logging.debug('Connecting to FTP: '+ftp_baseurl)
ftp = FTP(ftp_baseurl)
if os.environ.get('TESK_FTP_USERNAME') is not None:
try:
Expand All @@ -84,22 +107,32 @@ def process_ftp_file(ftype, afile):
elif afile['type'] == 'DIRECTORY':
return process_upload_dir(afile['path'], ftp_path, ftp)
else:
print('Unknown file type: '+afile['type'])
logging.error('Unknown file type: '+afile['type'])
return 1
else:
print('Unknown file action: ' + ftype)
logging.error('Unknown file action: ' + ftype)
return 1

def process_http_file(ftype, afile):
if ftype == 'inputs':
r = requests.get(afile['url'])

if r.status_code != 200:
logging.error('Got status code: '+str(r.status_code))
return 1

fp = open(afile['path'], 'wb')
fp.write(r.content)
fp.close
return 0
elif ftype == 'outputs':
fp = open(afile['path'], 'r')
r = requests.put(afile['url'], data=fp.read())

if r.status_code != 200 or r.status_code != 201:
logging.error('Got status code: '+str(r.status_code))
return 1

fp.close
return 0
else:
Expand All @@ -109,7 +142,7 @@ def process_http_file(ftype, afile):
def filefromcontent(afile):
content = afile.get('content')
if content is None:
print('Incorrect file spec format, no content or url specified', file=sys.stderr)
logging.error('Incorrect file spec format, no content or url specified')
return 1

fh = open(afile['path'], 'w')
Expand All @@ -125,7 +158,7 @@ def process_file(ftype, afile):

p = re.compile('([a-z]+)://')
protocol = p.match(url).group(1)
debug('protocol is: '+protocol)
logging.debug('protocol is: '+protocol)

if protocol == 'ftp':
return process_ftp_file(ftype, afile)
Expand All @@ -140,6 +173,8 @@ def debug(msg):
print(msg, file=sys.stderr)

def main(argv):
logging.basicConfig(format='%(asctime)s %(levelname)s: %(message)s', datefmt='%m/%d/%Y %I:%M:%S', level=logging.DEBUG)
logging.debug('Starting filer...')
parser = argparse.ArgumentParser(description='Filer script for down- and uploading files')
parser.add_argument('filetype', help='filetype to handle, either \'inputs\' or \'outputs\' ')
parser.add_argument('data', help='file description data, see docs for structure')
Expand All @@ -148,14 +183,15 @@ def main(argv):
data = json.loads(args.data)

for afile in data[args.filetype]:
debug('processing file: '+afile['path'])
logging.debug('processing file: '+afile['path'])
if process_file(args.filetype, afile):
print('something went wrong', file=sys.stderr)
logging.error('something went wrong')
return 1
# TODO a bit more detailed reporting
else:
debug('Processed file: ' + afile['path'])
logging.debug('Processed file: ' + afile['path'])

return 0

if __name__ == "__main__":
main(sys.argv)
sys.exit(main(sys.argv))
50 changes: 50 additions & 0 deletions scripts/filer_class.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import json

class Filer:
    """Builds the Kubernetes Job manifest that runs a filer container.

    The filer container stages task inputs in, or task outputs out; the
    file-description data is handed to it through the JSON_INPUT
    environment variable and expanded into the container args by
    get_spec().
    """

    def __init__(self, name, data, filer_version='v0.5', debug=False):
        self.name = name
        filer_container = {
            "name": "filer",
            "image": "eu.gcr.io/tes-wes/filer:" + filer_version,
            "args": [],
            # JSON_INPUT carries the serialized file-description data.
            "env": [{"name": "JSON_INPUT", "value": json.dumps(data)}],
            "volumeMounts": [],
            # In debug mode always re-pull so a freshly pushed test image is used.
            "imagePullPolicy": "Always" if debug else "IfNotPresent",
        }
        self.spec = {
            "kind": "Job",
            "apiVersion": "batch/v1",
            "metadata": {"name": name},
            "spec": {
                "template": {
                    "metadata": {"name": "tesk-filer"},
                    "spec": {
                        "containers": [filer_container],
                        "volumes": [],
                        "restartPolicy": "Never",
                    },
                },
            },
        }

    def set_ftp(self, user, pw):
        """Expose FTP credentials to the filer container via env variables."""
        container = self.spec['spec']['template']['spec']['containers'][0]
        container['env'].append({"name": "TESK_FTP_USERNAME", "value": user})
        container['env'].append({"name": "TESK_FTP_PASSWORD", "value": pw})

    def set_volume_mounts(self, pvc):
        """Mount the task's persistent volume claim into the filer container.

        `pvc` is expected to provide `volume_mounts` and `name` attributes.
        """
        pod_spec = self.spec['spec']['template']['spec']
        pod_spec['containers'][0]['volumeMounts'] = pvc.volume_mounts
        pod_spec['volumes'] = [
            {"name": "task-volume",
             "persistentVolumeClaim": {"claimName": pvc.name}},
        ]

    def get_spec(self, mode):
        """Finalize and return the Job spec for `mode` ('inputs' or 'outputs')."""
        container = self.spec['spec']['template']['spec']['containers'][0]
        # $(JSON_INPUT) is expanded by Kubernetes from the env variable set above.
        container['args'] = [mode, "$(JSON_INPUT)"]
        self.spec['spec']['template']['metadata']['name'] = self.name
        return self.spec
44 changes: 44 additions & 0 deletions scripts/job.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from kubernetes import client, config
import logging
import time

class Job:
    """Thin wrapper around a Kubernetes batch/v1 Job: create it, poll it
    to completion, and delete it on cancellation.
    """

    def __init__(self, body, name='task-job', namespace='default'):
        """Store the Job manifest and prepare a BatchV1Api client.

        `body` is a Job manifest dict; its metadata.name is overwritten
        with `name` so the manifest and this wrapper always agree.
        """
        self.name = name
        self.namespace = namespace
        # Lifecycle states used by this class: Initialized, Running,
        # Complete, Failed, Error, Cancelled.
        self.status = 'Initialized'
        self.bv1 = client.BatchV1Api()
        self.body = body
        self.body['metadata']['name'] = self.name

    def run_to_completion(self, poll_interval, check_cancelled):
        """Create the Job and block until it leaves the 'Running' state.

        Polls every `poll_interval` seconds; `check_cancelled` is a
        zero-argument callable checked each iteration — when it returns
        true the Job is deleted and 'Cancelled' is returned. Otherwise
        returns the terminal status from get_status()
        ('Complete', 'Failed' or 'Error').
        """
        logging.debug(self.body)
        self.bv1.create_namespaced_job(self.namespace, self.body)
        status = self.get_status()
        while status == 'Running':
            if check_cancelled():
                self.delete()
                return 'Cancelled'

            time.sleep(poll_interval)

            status = self.get_status()

        return status

    def get_status(self):
        """Read the Job from the API server and map its first condition
        onto this wrapper's status string; returns the new status.

        NOTE(review): in the Kubernetes API a condition's `status` field
        is the string 'True'/'False'/'Unknown', so the bare truthiness
        checks below are presumably always true for a non-empty string —
        a condition with status 'False' would still be classified by its
        type. Verify against the kubernetes client and compare to 'True'
        explicitly if so.
        """
        job = self.bv1.read_namespaced_job(self.name, self.namespace)
        try:
            if job.status.conditions[0].type == 'Complete' and job.status.conditions[0].status:
                self.status = 'Complete'
            elif job.status.conditions[0].type == 'Failed' and job.status.conditions[0].status:
                self.status = 'Failed'
            else:
                # A condition exists but is neither Complete nor Failed.
                self.status = 'Error'
        except TypeError: # The condition is not initialized, so it is not complete yet, wait for it
            # conditions is None while the Job is still running; indexing
            # None raises TypeError, which we treat as 'still running'.
            self.status = 'Running'

        return self.status

    def delete(self):
        """Delete the Job from the cluster."""
        self.bv1.delete_namespaced_job(self.name, self.namespace, client.V1DeleteOptions())
Loading

0 comments on commit a99c586

Please sign in to comment.