diff --git a/seqr/management/commands/check_for_new_samples_from_pipeline.py b/seqr/management/commands/check_for_new_samples_from_pipeline.py
index fe68b47c25..883498c309 100644
--- a/seqr/management/commands/check_for_new_samples_from_pipeline.py
+++ b/seqr/management/commands/check_for_new_samples_from_pipeline.py
@@ -3,8 +3,6 @@
from django.contrib.postgres.aggregates import ArrayAgg
from django.core.management.base import BaseCommand, CommandError
-from django.db.models import Q
-from django.db.models.functions import JSONObject
import json
import logging
import re
@@ -186,7 +184,7 @@ def _load_new_samples(cls, metadata_path, genome_version, dataset_type, run_vers
sample_type = metadata['sample_type']
logger.info(f'Loading {len(sample_project_tuples)} {sample_type} {dataset_type} samples in {len(samples_by_project)} projects')
- updated_samples, inactivated_sample_guids, *args = match_and_update_search_samples(
+ updated_samples, new_samples, *args = match_and_update_search_samples(
projects=samples_by_project.keys(),
sample_project_tuples=sample_project_tuples,
sample_data={'data_source': run_version, 'elasticsearch_index': ';'.join(metadata['callsets'])},
@@ -196,9 +194,9 @@ def _load_new_samples(cls, metadata_path, genome_version, dataset_type, run_vers
)
# Send loading notifications and update Airtable PDOs
- update_sample_data_by_project = {
- s['individual__family__project']: s for s in updated_samples.values('individual__family__project').annotate(
- samples=ArrayAgg(JSONObject(sample_id='sample_id', individual_id='individual_id')),
+ new_sample_data_by_project = {
+ s['individual__family__project']: s for s in updated_samples.filter(id__in=new_samples).values('individual__family__project').annotate(
+ samples=ArrayAgg('sample_id', distinct=True),
family_guids=ArrayAgg('individual__family__guid', distinct=True),
)
}
@@ -207,15 +205,16 @@ def _load_new_samples(cls, metadata_path, genome_version, dataset_type, run_vers
split_project_pdos = {}
session = AirtableSession(user=None, no_auth=True)
for project, sample_ids in samples_by_project.items():
- project_sample_data = update_sample_data_by_project[project.id]
+        project_sample_data = new_sample_data_by_project.get(project.id, {'samples': [], 'family_guids': []})
is_internal = not project_has_anvil(project) or is_internal_anvil_project(project)
notify_search_data_loaded(
- project, is_internal, dataset_type, sample_type, inactivated_sample_guids,
- updated_samples=project_sample_data['samples'], num_samples=len(sample_ids),
+ project, is_internal, dataset_type, sample_type, project_sample_data['samples'],
+ num_samples=len(sample_ids),
)
project_families = project_sample_data['family_guids']
- updated_families.update(project_families)
- updated_project_families.append((project.id, project.name, project.genome_version, project_families))
+ if project_families:
+ updated_families.update(project_families)
+ updated_project_families.append((project.id, project.name, project.genome_version, project_families))
if is_internal and dataset_type == Sample.DATASET_TYPE_VARIANT_CALLS:
split_project_pdos[project.name] = cls._update_pdos(session, project.guid, sample_ids)
diff --git a/seqr/management/tests/check_for_new_samples_from_pipeline_tests.py b/seqr/management/tests/check_for_new_samples_from_pipeline_tests.py
index 4b162f9d27..6a88c51760 100644
--- a/seqr/management/tests/check_for_new_samples_from_pipeline_tests.py
+++ b/seqr/management/tests/check_for_new_samples_from_pipeline_tests.py
@@ -561,8 +561,8 @@ def test_command(self, mock_email, mock_airtable_utils, mock_open_write_file, mo
self.mock_utils_logger.error.assert_called_with('Error in project Test Reprocessed Project: Bad Request')
self.mock_utils_logger.info.assert_has_calls([
- mock.call('Updated 0 variants for project Test Reprocessed Project'),
- mock.call('Updated 1 variants for project Non-Analyst Project'),
+ mock.call('Updated 0 variants in 1 families for project Test Reprocessed Project'),
+ mock.call('Updated 1 variants in 1 families for project Non-Analyst Project'),
mock.call('Reload Summary: '),
mock.call(' Non-Analyst Project: Updated 1 variants'),
mock.call('Reloading saved variants in 2 projects'),
diff --git a/seqr/management/tests/reload_saved_variant_json_tests.py b/seqr/management/tests/reload_saved_variant_json_tests.py
index 4ceb4314b6..104db53f40 100644
--- a/seqr/management/tests/reload_saved_variant_json_tests.py
+++ b/seqr/management/tests/reload_saved_variant_json_tests.py
@@ -30,7 +30,7 @@ def test_with_param_command(self, mock_get_variants, mock_logger):
[family_1], ['1-46859832-G-A','21-3343353-GAGA-G'], user=None, user_email='manage_command')
logger_info_calls = [
- mock.call('Updated 2 variants for project 1kg project n\xe5me with uni\xe7\xf8de'),
+ mock.call('Updated 2 variants in 1 families for project 1kg project n\xe5me with uni\xe7\xf8de'),
mock.call('Reload Summary: '),
mock.call(' 1kg project n\xe5me with uni\xe7\xf8de: Updated 2 variants')
]
diff --git a/seqr/utils/search/add_data_utils.py b/seqr/utils/search/add_data_utils.py
index 805f176239..d910cb23bf 100644
--- a/seqr/utils/search/add_data_utils.py
+++ b/seqr/utils/search/add_data_utils.py
@@ -39,7 +39,7 @@ def add_new_es_search_samples(request_json, project, user, notify=False, expecte
request_json['mappingFilePath'], user) if request_json.get('mappingFilePath') else {}
ignore_extra_samples = request_json.get('ignoreExtraSamplesInCallset')
sample_project_tuples = [(sample_id, project.name) for sample_id in sample_ids]
- updated_samples, inactivated_sample_guids, num_skipped, updated_family_guids = match_and_update_search_samples(
+ updated_samples, new_samples, inactivated_sample_guids, num_skipped, updated_family_guids = match_and_update_search_samples(
projects=[project],
user=user,
sample_project_tuples=sample_project_tuples,
@@ -52,45 +52,41 @@ def add_new_es_search_samples(request_json, project, user, notify=False, expecte
)
if notify:
- updated_sample_data = updated_samples.values('sample_id', 'individual_id')
- _basic_notify_search_data_loaded(project, dataset_type, sample_type, inactivated_sample_guids, updated_sample_data)
+ _basic_notify_search_data_loaded(project, dataset_type, sample_type, new_samples.values())
return inactivated_sample_guids, updated_family_guids, updated_samples
-def _basic_notify_search_data_loaded(project, dataset_type, sample_type, inactivated_sample_guids, updated_samples, format_email=None, slack_channel=None, include_slack_detail=False):
- previous_loaded_individuals = set(Sample.objects.filter(guid__in=inactivated_sample_guids).values_list('individual_id', flat=True))
- new_sample_ids = [sample['sample_id'] for sample in updated_samples if sample['individual_id'] not in previous_loaded_individuals]
-
+def _basic_notify_search_data_loaded(project, dataset_type, sample_type, new_samples, email_template=None, slack_channel=None, include_slack_detail=False):
msg_dataset_type = '' if dataset_type == Sample.DATASET_TYPE_VARIANT_CALLS else f' {dataset_type}'
- num_new_samples = len(new_sample_ids)
+ num_new_samples = len(new_samples)
sample_summary = f'{num_new_samples} new {sample_type}{msg_dataset_type} samples'
return send_project_notification(
project,
notification=sample_summary,
- email_template=format_email(num_new_samples) if format_email else None,
+ email_template=email_template,
subject='New data available in seqr',
slack_channel=slack_channel,
- slack_detail=', '.join(sorted(new_sample_ids)) if include_slack_detail else None,
+ slack_detail=', '.join(sorted(new_samples)) if include_slack_detail else None,
)
-def notify_search_data_loaded(project, is_internal, dataset_type, sample_type, inactivated_sample_guids, updated_samples, num_samples):
+def notify_search_data_loaded(project, is_internal, dataset_type, sample_type, new_samples, num_samples):
if is_internal:
- format_email = None
+ email_template = None
else:
workspace_name = f'{project.workspace_namespace}/{project.workspace_name}'
- def format_email(num_new_samples):
- reload_summary = f' and {num_samples - num_new_samples} re-loaded samples' if num_samples > num_new_samples else ''
- return '\n'.join([
- f'We are following up on the request to load data from AnVIL on {project.created_date.date().strftime("%B %d, %Y")}.',
- f'We have loaded {{notification}}{reload_summary} from the AnVIL workspace {workspace_name} to the corresponding seqr project {{project_link}}.',
- 'Let us know if you have any questions.',
- ])
+ num_new_samples = len(new_samples)
+ reload_summary = f' and {num_samples - num_new_samples} re-loaded samples' if num_samples > num_new_samples else ''
+ email_template = '\n'.join([
+ f'We are following up on the request to load data from AnVIL on {project.created_date.date().strftime("%B %d, %Y")}.',
+ f'We have loaded {{notification}}{reload_summary} from the AnVIL workspace {workspace_name} to the corresponding seqr project {{project_link}}.',
+ 'Let us know if you have any questions.',
+ ])
url = _basic_notify_search_data_loaded(
- project, dataset_type, sample_type, inactivated_sample_guids, updated_samples, format_email=format_email,
+ project, dataset_type, sample_type, new_samples, email_template=email_template,
slack_channel=SEQR_SLACK_DATA_ALERTS_NOTIFICATION_CHANNEL if is_internal else SEQR_SLACK_ANVIL_DATA_LOADING_CHANNEL,
include_slack_detail=is_internal,
)
diff --git a/seqr/views/utils/dataset_utils.py b/seqr/views/utils/dataset_utils.py
index f75e93eb00..42882f5dd4 100644
--- a/seqr/views/utils/dataset_utils.py
+++ b/seqr/views/utils/dataset_utils.py
@@ -222,7 +222,10 @@ def match_and_update_search_samples(
Family.bulk_update(
user, {'analysis_status': Family.ANALYSIS_STATUS_ANALYSIS_IN_PROGRESS}, guid__in=family_guids_to_update)
- return updated_samples, inactivated_sample_guids, len(remaining_sample_keys), family_guids_to_update
+ previous_loaded_individuals = set(Sample.objects.filter(guid__in=inactivated_sample_guids).values_list('individual_id', flat=True))
+ new_samples = dict(updated_samples.exclude(individual_id__in=previous_loaded_individuals).values_list('id', 'sample_id'))
+
+ return updated_samples, new_samples, inactivated_sample_guids, len(remaining_sample_keys), family_guids_to_update
def _parse_tsv_row(row):
diff --git a/seqr/views/utils/variant_utils.py b/seqr/views/utils/variant_utils.py
index aaf65a129b..8407052685 100644
--- a/seqr/views/utils/variant_utils.py
+++ b/seqr/views/utils/variant_utils.py
@@ -45,7 +45,8 @@ def update_projects_saved_variant_json(projects, user_email, **kwargs):
skipped[project_name] = True
else:
success[project_name] = len(updated_saved_variants)
- logger.info(f'Updated {len(updated_saved_variants)} variants for project {project_name}')
+ family_summary = f' in {len(family_guids)} families' if family_guids else ''
+ logger.info(f'Updated {len(updated_saved_variants)} variants{family_summary} for project {project_name}')
updated_variants_by_id.update({v.variant_id: v.saved_variant_json for v in updated_saved_variants.values()})
except Exception as e:
traceback_message = traceback.format_exc()