standardize pipeline options in Load Data UI #4603

Open · wants to merge 3 commits into base: dev
@@ -86,7 +86,7 @@ def assert_airflow_delete_families_calls(self):
        call_count_per_dag = 5
        for i, dataset_type in enumerate(['MITO', 'SNV_INDEL', 'SV']):
            offset = i * call_count_per_dag
-            self._assert_airflow_calls(self._get_dag_variables(dataset_type), call_count_per_dag, offset)
+            self._assert_airflow_calls(self._get_dag_variables(dataset_type), call_count_per_dag, {}, offset)

    def _assert_update_check_airflow_calls(self, call_count, offset, update_check_path):
        variables_update_check_path = f'{self.MOCK_AIRFLOW_URL}/api/v1/variables/{self.DAG_NAME}'
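(Note: the delete-families flow triggers its DAGs with no config overrides, so these assertions pass an empty dict for the new `config_params` argument; the argument itself is added to `_assert_airflow_calls` in `seqr/views/utils/test_utils.py` below.)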
17 changes: 11 additions & 6 deletions seqr/utils/search/add_data_utils.py
@@ -117,21 +117,26 @@ def format_loading_pipeline_variables(
    variables['sample_type'] = sample_type
    return variables

-def prepare_data_loading_request(projects: list[Project], sample_type: str, dataset_type: str, genome_version: str,
-                                 data_path: str, user: User, pedigree_dir: str, raise_pedigree_error: bool = False,
-                                 individual_ids: list[int] = None, skip_validation: bool = False):
+def prepare_data_loading_request(
+    projects: list[Project], sample_type: str, dataset_type: str, genome_version: str,
+    data_path: str, user: User, pedigree_dir: str, raise_pedigree_error: bool = False, individual_ids: list[int] = None,
+    skip_validation: bool = False, skip_check_sex_and_relatedness: bool = False, ignore_missing_samples_when_remapping: bool = False,
+):
    variables = format_loading_pipeline_variables(
        projects,
        genome_version,
        dataset_type,
        sample_type,
        callset_path=data_path,
    )
-    if skip_validation:
-        variables['skip_validation'] = True
+    config_params = {
+        'skip_validation': skip_validation,
+        'skip_check_sex_and_relatedness': skip_check_sex_and_relatedness,
+        'ignore_missing_samples_when_remapping': ignore_missing_samples_when_remapping,
+    }
    file_path = _get_pedigree_path(pedigree_dir, genome_version, sample_type, dataset_type)
    _upload_data_loading_files(projects, user, file_path, individual_ids, raise_pedigree_error)
-    return variables, file_path
+    return variables, file_path, config_params


def _dag_dataset_type(sample_type: str, dataset_type: str):
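A minimal sketch of how a caller consumes the new three-value return; the `projects` and `user` values are hypothetical stand-ins for real model instances, and only the keyword flags shown in this diff are assumed:

```python
# Hypothetical caller of the updated helper; `projects` and `user` are assumed
# to be existing Project/User instances from the surrounding view code.
variables, pedigree_path, config_params = prepare_data_loading_request(
    projects, 'WES', 'SNV_INDEL', 'GRCh38', 'gs://test_bucket/callset.vcf',
    user=user, pedigree_dir='gs://seqr-loading-temp/v3.1',
    skip_check_sex_and_relatedness=True,
)
# config_params now always carries all three flags, rather than a
# conditionally present 'skip_validation' key:
# {'skip_validation': False, 'skip_check_sex_and_relatedness': True,
#  'ignore_missing_samples_when_remapping': False}
```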
7 changes: 5 additions & 2 deletions seqr/views/apis/anvil_workspace_api_tests.py
@@ -767,13 +767,16 @@ def _assert_valid_operation(self, project, test_add_data=True):
        }}]})
        self.assert_expected_airtable_headers(-1)

-        dag_json = {
+        dag_args = {
            'projects_to_run': [project.guid],
            'dataset_type': 'SNV_INDEL',
            'reference_genome': genome_version,
            'callset_path': 'gs://test_bucket/test_path.vcf',
            'sample_type': 'WES',
            'sample_source': 'AnVIL',
+            'skip_validation': False,
+            'skip_check_sex_and_relatedness': False,
+            'ignore_missing_samples_when_remapping': False,
        }
        sample_summary = '3 new'
        if test_add_data:
@@ -788,7 +791,7 @@ def _assert_valid_operation(self, project, test_add_data=True):
```{dag}```
""".format(guid=project.guid, version=genome_version, workspace_name=project.workspace_name,
           project_name=project.name, sample_summary=sample_summary,
-           dag=json.dumps(dag_json, indent=4),
+           dag=json.dumps(dag_args, indent=4),
        )
        self.mock_slack.assert_called_with(
            SEQR_SLACK_ANVIL_DATA_LOADING_CHANNEL, slack_message,
11 changes: 8 additions & 3 deletions seqr/views/apis/data_manager_api.py
@@ -525,7 +525,6 @@ def load_data(request):
    request_json = json.loads(request.body)
    sample_type = request_json['sampleType']
    dataset_type = request_json.get('datasetType', Sample.DATASET_TYPE_VARIANT_CALLS)
-    skip_validation = request_json.get('skipValidation', False)
    projects = [json.loads(project) for project in request_json['projects']]
    project_samples = {p['projectGuid']: p.get('sampleIds') for p in projects}

@@ -537,7 +536,12 @@ def load_data(request):
    loading_args = (
        project_models, sample_type, dataset_type, request_json['genomeVersion'], _callset_path(request_json),
    )
-    loading_kwargs = {'user': request.user, 'skip_validation': skip_validation}
+    loading_kwargs = {
+        'user': request.user,
+        'skip_validation': request_json.get('skipValidation', False),
+        'skip_check_sex_and_relatedness': request_json.get('skipSRChecks', False),
+        'ignore_missing_samples_when_remapping': request_json.get('ignoreMissingRemapSamples', False),
+    }
    if AirtableSession.is_airtable_enabled():
        individual_ids = _get_valid_project_samples(project_samples, dataset_type, sample_type, request.user)
        success_message = f'*{request.user.email}* triggered loading internal {sample_type} {dataset_type} data for {len(projects)} projects'
@@ -547,9 +551,10 @@ def load_data(request):
            success_slack_channel=SEQR_SLACK_LOADING_NOTIFICATION_CHANNEL, is_internal=True, individual_ids=individual_ids,
        )
    else:
-        request_json, _ = prepare_data_loading_request(
+        request_json, _, config_params = prepare_data_loading_request(
            *loading_args, **loading_kwargs, pedigree_dir=LOADING_DATASETS_DIR, raise_pedigree_error=True,
        )
+        request_json.update(config_params)
        response = requests.post(f'{PIPELINE_RUNNER_SERVER}/loading_pipeline_enqueue', json=request_json, timeout=60)
        if response.status_code == 409:
            raise ErrorsWarningsException(['Loading pipeline is already running. Wait for it to complete and resubmit'])
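With `request_json.update(config_params)`, the non-Airtable branch now always sends all three flags to the pipeline runner. An illustrative request body (the guid and callset path are placeholders, not values from this PR):

```python
# Illustrative body for POST {PIPELINE_RUNNER_SERVER}/loading_pipeline_enqueue;
# the project guid and callset path below are placeholders.
enqueue_body = {
    'projects_to_run': ['R0001_example_project'],
    'dataset_type': 'SNV_INDEL',
    'reference_genome': 'GRCh38',
    'callset_path': 'callset.vcf',
    'sample_type': 'WES',
    'skip_validation': False,
    'skip_check_sex_and_relatedness': False,
    'ignore_missing_samples_when_remapping': False,
}
```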
21 changes: 12 additions & 9 deletions seqr/views/apis/data_manager_api_tests.py
@@ -1561,9 +1561,13 @@ def test_load_data(self, mock_temp_dir, mock_open, mock_mkdir):
            'reference_genome': 'GRCh38',
            'callset_path': f'{self.TRIGGER_CALLSET_DIR}/mito_callset.mt',
            'sample_type': 'WES',
+        }
+        config_params = {
            'skip_validation': True,
+            'skip_check_sex_and_relatedness': False,
+            'ignore_missing_samples_when_remapping': False,
        }
-        self._assert_success_notification(dag_json)
+        self._assert_success_notification(dag_json, config_params)

        # Test loading trigger error
        self._set_loading_trigger_error()
@@ -1573,7 +1577,6 @@ def test_load_data(self, mock_temp_dir, mock_open, mock_mkdir):
        self.reset_logs()

        del body['skipValidation']
-        del dag_json['skip_validation']
        body.update({'datasetType': 'SV', 'filePath': f'{self.CALLSET_DIR}/sv_callset.vcf'})
        self._trigger_error(url, body, dag_json, mock_open, mock_mkdir)

@@ -1695,9 +1698,10 @@ def _assert_expected_load_data_requests(self, dataset_type='MITO', sample_type='
            'sample_type': sample_type,
            'dataset_type': dataset_type,
            'reference_genome': 'GRCh38',
+            'skip_validation': skip_validation,
+            'skip_check_sex_and_relatedness': False,
+            'ignore_missing_samples_when_remapping': False,
        }
-        if skip_validation:
-            body['skip_validation'] = True
        self.assertDictEqual(json.loads(responses.calls[0].request.body), body)

    @staticmethod
@@ -1708,9 +1712,9 @@ def _has_expected_ped_files(self, mock_open, mock_mkdir, dataset_type, *args, sa
        super()._has_expected_ped_files(mock_open, mock_mkdir, dataset_type, *args, sample_type, **kwargs)
        mock_mkdir.assert_called_once_with(self._local_pedigree_path(dataset_type, sample_type), exist_ok=True)

-    def _assert_success_notification(self, dag_json):
+    def _assert_success_notification(self, dag_json, config_params):
        self.maxDiff = None
-        self.assert_json_logs(self.pm_user, [('Triggered loading pipeline', {'detail': dag_json})])
+        self.assert_json_logs(self.pm_user, [('Triggered loading pipeline', {'detail': {**dag_json, **config_params}})])

    def _set_loading_trigger_error(self):
        responses.add(responses.POST, PIPELINE_RUNNER_URL, status=400)
@@ -1836,7 +1840,6 @@ def _get_dag_variable_overrides(*args, **kwargs):
            'sample_source': 'Broad_Internal',
            'sample_type': 'WES',
            'dataset_type': 'MITO',
-            'skip_validation': True,
        }

    def _assert_expected_load_data_requests(self, dataset_type='MITO', **kwargs):
@@ -1855,15 +1858,15 @@ def _set_loading_trigger_error(self):
        self.set_dag_trigger_error_response(status=400)
        self.mock_authorized_session.reset_mock()

-    def _assert_success_notification(self, dag_json):
+    def _assert_success_notification(self, dag_json, config_params):
        dag_json['sample_source'] = 'Broad_Internal'

        message = f"""*[email protected]* triggered loading internal WES MITO data for 2 projects

Pedigree files have been uploaded to gs://seqr-loading-temp/v3.1/GRCh38/MITO/pedigrees/WES

DAG LOADING_PIPELINE is triggered with following:
-```{json.dumps(dag_json, indent=4)}```
+```{json.dumps({**dag_json, **config_params}, indent=4)}```
"""
        self.mock_slack.assert_called_once_with(SEQR_SLACK_LOADING_NOTIFICATION_CHANNEL, message)
        self.mock_slack.reset_mock()
10 changes: 5 additions & 5 deletions seqr/views/utils/airflow_utils.py
@@ -24,7 +24,7 @@ class DagRunningException(Exception):
def trigger_airflow_data_loading(*args, user: User, individual_ids: list[int], success_message: str, success_slack_channel: str,
                                 error_message: str, is_internal: bool = False, **kwargs):
    success = True
-    updated_variables, gs_path = prepare_data_loading_request(
+    updated_variables, gs_path, config_params = prepare_data_loading_request(
        *args, user, individual_ids=individual_ids, pedigree_dir=SEQR_V3_PEDIGREE_GS_PATH, **kwargs,
    )
    updated_variables['sample_source'] = 'Broad_Internal' if is_internal else 'AnVIL'
@@ -34,15 +34,15 @@ def trigger_airflow_data_loading(*args, user: User, individual_ids: list[int], s
        _check_dag_running_state(LOADING_PIPELINE_DAG_NAME)
        _update_variables(updated_variables, LOADING_PIPELINE_DAG_NAME)
        _wait_for_dag_variable_update_via_tasks(updated_variables['projects_to_run'], LOADING_PIPELINE_DAG_NAME)
-        _trigger_dag(LOADING_PIPELINE_DAG_NAME)
+        _trigger_dag(LOADING_PIPELINE_DAG_NAME, config_params)
    except Exception as e:
        logger_call = logger.warning if isinstance(e, DagRunningException) else logger.error
        logger_call(str(e), user)
        _send_slack_msg_on_failure_trigger(e, updated_variables, error_message)
        success = False

    if success or success_slack_channel != SEQR_SLACK_LOADING_NOTIFICATION_CHANNEL:
-        _send_load_data_slack_msg([success_message] + upload_info, success_slack_channel, updated_variables)
+        _send_load_data_slack_msg([success_message] + upload_info, success_slack_channel, {**updated_variables, **config_params})
    return success


@@ -124,9 +124,9 @@ def _get_variables(dag_name: str):
    return json.loads(airflow_response['value'])


-def _trigger_dag(dag_name: str):
+def _trigger_dag(dag_name: str, config_params: dict = None):
    endpoint = f'dags/{dag_name}/dagRuns'
-    _make_airflow_api_request(endpoint, method='POST', json={})
+    _make_airflow_api_request(endpoint, method='POST', json={'conf': config_params or {}})


def _make_airflow_api_request(endpoint, method='GET', timeout=90, **kwargs):
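Passing the flags as `conf` uses the Airflow stable REST API's per-run configuration rather than DAG-level variables, so the options no longer persist between runs. A sketch of the request `_trigger_dag` now issues, with authentication elided (`AIRFLOW_BASE_URL` is a stand-in, not a name from this codebase):

```python
import requests

AIRFLOW_BASE_URL = 'http://airflow-webserver'  # stand-in for the configured URL

def trigger_dag_sketch(dag_name: str, config_params: dict = None) -> None:
    # POST /api/v1/dags/{dag_id}/dagRuns accepts a 'conf' dict that Airflow
    # exposes to tasks for that single run only.
    response = requests.post(
        f'{AIRFLOW_BASE_URL}/api/v1/dags/{dag_name}/dagRuns',
        json={'conf': config_params or {}},
        timeout=90,
    )
    response.raise_for_status()
```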
17 changes: 10 additions & 7 deletions seqr/views/utils/test_utils.py
@@ -650,7 +650,7 @@ def set_dag_trigger_error_response(self, status=200):
            'state': 'running'}
        ]})

-    def assert_airflow_loading_calls(self, trigger_error=False, additional_tasks_check=False, dataset_type=None, offset=0, **kwargs):
+    def assert_airflow_loading_calls(self, trigger_error=False, additional_tasks_check=False, dataset_type=None, offset=0, skip_validation=False, **kwargs):
        call_count = 5
        if additional_tasks_check:
            call_count = 6
@@ -665,18 +665,21 @@ def assert_airflow_loading_calls(self, trigger_error=False, additional_tasks_che
            'reference_genome': dag_variable_overrides.get('reference_genome', 'GRCh38'),
            'callset_path': f'gs://test_bucket/{dag_variable_overrides["callset_path"]}',
            'sample_type': dag_variable_overrides['sample_type'],
-            'sample_source': dag_variable_overrides['sample_source']
        }
-        if dag_variable_overrides.get('skip_validation'):
-            dag_variables['skip_validation'] = True
-        self._assert_airflow_calls(dag_variables, call_count, offset=offset)
+        dag_variables['sample_source'] = dag_variable_overrides['sample_source']
+        config_params = {
+            'ignore_missing_samples_when_remapping': False,
+            'skip_check_sex_and_relatedness': False,
+            'skip_validation': skip_validation
+        }
+        self._assert_airflow_calls(dag_variables, call_count, config_params, offset=offset)

    def _assert_call_counts(self, call_count):
        self.mock_airflow_logger.info.assert_not_called()
        self.assertEqual(len(responses.calls), call_count + self.ADDITIONAL_REQUEST_COUNT)
        self.assertEqual(self.mock_authorized_session.call_count, call_count)

-    def _assert_airflow_calls(self, dag_variables, call_count, offset=0):
+    def _assert_airflow_calls(self, dag_variables, call_count, config_params, offset=0):
        self._assert_dag_running_state_calls(offset)

        if call_count < 2:
@@ -689,7 +692,7 @@ def _assert_airflow_calls(self, dag_variables, call_count, config_params, offset
        # trigger dag
        self.assertEqual(responses.calls[offset+call_cnt].request.url, f'{self._dag_url}/dagRuns')
        self.assertEqual(responses.calls[offset+call_cnt].request.method, 'POST')
-        self.assertDictEqual(json.loads(responses.calls[offset+call_cnt].request.body), {})
+        self.assertDictEqual(json.loads(responses.calls[offset+call_cnt].request.body), {'conf': config_params})

        self.mock_airflow_logger.warning.assert_not_called()
        self.mock_airflow_logger.error.assert_not_called()
12 changes: 12 additions & 0 deletions ui/pages/DataManagement/components/LoadData.jsx
@@ -60,6 +60,18 @@ const CALLSET_PAGE_FIELDS = [
    component: InlineToggle,
    asFormInput: true,
  },
+  {
+    name: 'skipSRChecks',
+    label: 'Skip Sex and Relatedness Checks',
+    component: InlineToggle,
+    asFormInput: true,
+  },
+  {
+    name: 'ignoreMissingRemapSamples',
+    label: 'Ignore Missing Samples When Remapping',
+    component: InlineToggle,
+    asFormInput: true,
+  },
  {
    ...GENOME_VERSION_FIELD,
    component: ButtonRadioGroup,
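For reference, the camelCase names submitted by these toggles map onto the snake_case pipeline flags read in `load_data` (a summary of this diff, shown as Python to match the backend):

```python
# UI form field name -> loading pipeline config flag (as read in load_data)
UI_TO_PIPELINE_FLAGS = {
    'skipValidation': 'skip_validation',
    'skipSRChecks': 'skip_check_sex_and_relatedness',
    'ignoreMissingRemapSamples': 'ignore_missing_samples_when_remapping',
}
```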