From ec84d6e8192f685ebfadd94b4be0480652d73326 Mon Sep 17 00:00:00 2001 From: Jinzhe Zeng Date: Mon, 13 Nov 2023 22:24:25 -0500 Subject: [PATCH] add cli to handle terminated submission (#413) Signed-off-by: Jinzhe Zeng --------- Signed-off-by: Jinzhe Zeng --- .github/workflows/test.yml | 6 +- dpdispatcher/__main__.py | 8 +++ dpdispatcher/dpdisp.py | 41 ++++++++++++- dpdispatcher/entrypoints/submission.py | 83 ++++++++++++++++++++++++++ dpdispatcher/submission.py | 3 +- tests/context.py | 1 + tests/test_cli.py | 5 +- tests/test_run_submission.py | 8 +++ 8 files changed, 151 insertions(+), 4 deletions(-) create mode 100644 dpdispatcher/__main__.py create mode 100644 dpdispatcher/entrypoints/submission.py diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 7a7bd169..92ca05ab 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -27,7 +27,11 @@ jobs: python-version: ${{ matrix.python-version }} - run: pip install .[test] coverage - name: Test - run: coverage run --source=./dpdispatcher -m unittest -v && coverage report + run: | + coverage run -p --source=./dpdispatcher -m unittest -v + coverage run -p --source=./dpdispatcher -m dpdispatcher -h + coverage combine + coverage report - uses: codecov/codecov-action@v3 pass: needs: [test] diff --git a/dpdispatcher/__main__.py b/dpdispatcher/__main__.py new file mode 100644 index 00000000..719be2d5 --- /dev/null +++ b/dpdispatcher/__main__.py @@ -0,0 +1,8 @@ +"""Package dp entry point.""" + +from dpdispatcher.dpdisp import ( + main, +) + +if __name__ == "__main__": + main() diff --git a/dpdispatcher/dpdisp.py b/dpdispatcher/dpdisp.py index d092df3b..c0be0a15 100644 --- a/dpdispatcher/dpdisp.py +++ b/dpdispatcher/dpdisp.py @@ -3,6 +3,7 @@ from typing import List, Optional from dpdispatcher.entrypoints.gui import start_dpgui +from dpdispatcher.entrypoints.submission import handle_submission def main_parser() -> argparse.ArgumentParser: @@ -23,6 +24,37 @@ def main_parser() -> 
argparse.ArgumentParser: ) subparsers = parser.add_subparsers(title="Valid subcommands", dest="command") ########################################## + # submission + parser_submission = subparsers.add_parser( + "submission", + help="Handle terminated submission.", + formatter_class=argparse.ArgumentDefaultsHelpFormatter, + ) + parser_submission.add_argument( + "SUBMISSION_HASH", + type=str, + help="Submission hash to download.", + ) + parser_submission_action = parser_submission.add_argument_group( + "Actions", + description="One or more actions to take on submission.", + ) + parser_submission_action.add_argument( + "--download-terminated-log", + action="store_true", + help="Download log files of terminated tasks.", + ) + parser_submission_action.add_argument( + "--download-finished-task", + action="store_true", + help="Download finished tasks.", + ) + parser_submission_action.add_argument( + "--clean", + action="store_true", + help="Clean submission.", + ) + ########################################## # gui parser_gui = subparsers.add_parser( "gui", @@ -67,7 +99,14 @@ def parse_args(args: Optional[List[str]] = None): def main(): args = parse_args() - if args.command == "gui": + if args.command == "submission": + handle_submission( + submission_hash=args.SUBMISSION_HASH, + download_terminated_log=args.download_terminated_log, + download_finished_task=args.download_finished_task, + clean=args.clean, + ) + elif args.command == "gui": start_dpgui( port=args.port, bind_all=args.bind_all, diff --git a/dpdispatcher/entrypoints/submission.py b/dpdispatcher/entrypoints/submission.py new file mode 100644 index 00000000..0019d41e --- /dev/null +++ b/dpdispatcher/entrypoints/submission.py @@ -0,0 +1,83 @@ +from pathlib import Path + +from dpdispatcher.dlog import dlog +from dpdispatcher.submission import Submission +from dpdispatcher.utils.job_status import JobStatus +from dpdispatcher.utils.record import record + + +def handle_submission( + *, + submission_hash: str, +
download_terminated_log: bool = False, + download_finished_task: bool = False, + clean: bool = False, +): + """Handle terminated submission. + + Parameters + ---------- + submission_hash : str + Submission hash to download. + download_terminated_log : bool, optional + Download log files of terminated tasks. + download_finished_task : bool, optional + Download finished tasks. + clean : bool, optional + Clean submission. + + Raises + ------ + ValueError + At least one action should be specified. + """ + if int(download_terminated_log) + int(download_finished_task) + int(clean) == 0: + raise ValueError("At least one action should be specified.") + + submission_file = record.get_submission(submission_hash) + submission = Submission.submission_from_json(str(submission_file)) + submission.belonging_tasks = [ + task for job in submission.belonging_jobs for task in job.job_task_list + ] + # TODO: for unclear reason, the submission_hash may be changed + submission.submission_hash = submission_hash + submission.machine.context.bind_submission(submission) + submission.update_submission_state() + + terminated_tasks = [] + finished_tasks = [] + for task in submission.belonging_tasks: + task.get_task_state(submission.machine.context) + if task.task_state == JobStatus.terminated: + terminated_tasks.append(task) + elif task.task_state == JobStatus.finished: + finished_tasks.append(task) + submission.belonging_tasks = [] + + if download_terminated_log: + for task in terminated_tasks: + task.backward_files = [task.outlog, task.errlog] + submission.belonging_tasks += terminated_tasks + if download_finished_task: + submission.belonging_tasks += finished_tasks + + submission.download_jobs() + + if download_terminated_log: + terminated_log_files = [] + for task in terminated_tasks: + assert submission.local_root is not None + terminated_log_files.append( + Path(submission.local_root) / task.task_work_path / task.outlog + ) + terminated_log_files.append( + Path(submission.local_root) / 
task.task_work_path / task.errlog + ) + + dlog.info( + "Terminated logs are downloaded into:\n " + + "\n ".join([str(f) for f in terminated_log_files]) + ) + + if clean: + submission.clean_jobs() diff --git a/dpdispatcher/submission.py b/dpdispatcher/submission.py index 7de78c00..970cca1c 100644 --- a/dpdispatcher/submission.py +++ b/dpdispatcher/submission.py @@ -364,7 +364,8 @@ def handle_unexpected_submission_state(self): f"Debug information: remote_root=={self.machine.context.remote_root}.\n" f"Debug information: submission_hash=={self.submission_hash}.\n" f"Please check error messages above and in remote_root. " - f"The submission information is saved in {str(record_path)}." + f"The submission information is saved in {str(record_path)}.\n" + f"For further actions, run the following command with proper flags: dpdisp submission {self.submission_hash}" ) from e def check_ratio_unfinished(self, ratio_unfinished: float) -> bool: diff --git a/tests/context.py b/tests/context.py index 73809f9a..ca56c2ab 100644 --- a/tests/context.py +++ b/tests/context.py @@ -11,6 +11,7 @@ from dpdispatcher.contexts.lazy_local_context import LazyLocalContext # noqa: F401 from dpdispatcher.contexts.local_context import LocalContext # noqa: F401 from dpdispatcher.contexts.ssh_context import SSHContext, SSHSession # noqa: F401 +from dpdispatcher.entrypoints.submission import handle_submission # noqa: F401 from dpdispatcher.machine import Machine # noqa: F401 from dpdispatcher.machines.distributed_shell import DistributedShell # noqa: F401 from dpdispatcher.machines.dp_cloud_server import Lebesgue # noqa: F401 diff --git a/tests/test_cli.py b/tests/test_cli.py index 30c00626..26edc73d 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -5,5 +5,8 @@ class TestCLI(unittest.TestCase): def test_cli(self): sp.check_output(["dpdisp", "-h"]) - for subcommand in ( + "submission", + "gui", + ): sp.check_output(["dpdisp", subcommand, "-h"]) diff --git
a/tests/test_run_submission.py b/tests/test_run_submission.py index 6e1af0df..9a933dbf 100644 --- a/tests/test_run_submission.py +++ b/tests/test_run_submission.py @@ -14,6 +14,7 @@ Resources, Submission, Task, + handle_submission, record, setUpModule, # noqa: F401 ) @@ -134,6 +135,13 @@ def test_failed_submission(self): if sys.platform == "linux": self.assertTrue(err_msg in traceback.format_exc()) self.assertTrue(record.get_submission(submission.submission_hash).is_file()) + # post processing + handle_submission( + submission_hash=submission.submission_hash, + download_finished_task=True, + download_terminated_log=True, + clean=True, + ) def test_async_run_submission(self): machine = Machine.load_from_dict(self.machine_dict)