diff --git a/lib/gear/metadata.py b/lib/gear/metadata.py
index 339bd733..3bf97395 100755
--- a/lib/gear/metadata.py
+++ b/lib/gear/metadata.py
@@ -256,7 +256,10 @@ def get_field_value(self, field=None):
         """
         Accessor for field attributes in the metadata dataframe.
         """
-        fv = self.metadata.loc[field, 'value']
+        if not field:
+            fv = self.metadata.loc[:, 'value']
+        else:
+            fv = self.metadata.loc[field, 'value']
         if isinstance(fv, dict):
             if 'value' in fv:
                 fv = fv['value']
@@ -317,7 +320,8 @@ def save_to_mysql(self, status=None, is_public=0):
             pubmed_id = pubmed_ids.pop()
 
             if len(pubmed_ids):
-                ldesc += "\nAdditional Pubmed IDS: {0}".format(', '.join(pubmed_ids))
+                pubmed_ids_string = ', '.join(pubmed_ids)
+                ldesc = f'{ldesc}\nAdditional Pubmed IDS: {pubmed_ids_string}'
 
         platform_id = get_value_from_df(df, 'platform_id')
         instrument_model = get_value_from_df(df, 'instrument_model')
diff --git a/www/api/resources/submission.py b/www/api/resources/submission.py
index 142f251a..44205f91 100644
--- a/www/api/resources/submission.py
+++ b/www/api/resources/submission.py
@@ -196,15 +196,14 @@ async def import_dataset(s_dataset):
         print(str(e), file=sys.stderr)
         return result
 
-    result = import_result # should have "success" = True in here
+    result.update(import_result) # should have "success" = True in here
-
     # Let's save the display to the submission layout while we are at it
-    result = add_display_to_layout.add_display_to_layout(session_id, result['share_id'], result['display_id'], 12, 1)
+    result.update(add_display_to_layout.add_display_to_layout(session_id, result['share_id'], result['display_id'], 12, 1))
     if not result["success"]:
         raise Exception("Write H5AD step failed")
 
     result["filetype"] = result["dataset"]["filetype"]
-    result["dataset_id"] = dataset_id
 
     return result
diff --git a/www/api/resources/submission_dataset.py b/www/api/resources/submission_dataset.py
index 241080ca..d1a27ba0 100644
--- a/www/api/resources/submission_dataset.py
+++ b/www/api/resources/submission_dataset.py
@@ -80,7 +80,7 @@ def get_submission_dataset(dataset_id) -> geardb.SubmissionDataset:
         abort(404, message=f"Submission dataset id {dataset_id} does not exist.")
     return s_dataset
 
-def save_submission_dataset(dataset_id, identifier, is_restricted):
+def save_submission_dataset(dataset_id, identifier, is_restricted) -> geardb.SubmissionDataset:
     # Dataset is a foreign key in SubmissionDataset so we need to ensure we do not duplicate
     dataset = geardb.get_dataset_by_id(dataset_id)
     if not dataset:
@@ -131,7 +131,7 @@ def submission_dataset_callback(dataset_id, metadata, session_id, url_path, acti
 
     if action == "make_display":
         try:
-            result = make_display.make_default_display(dataset_id, session_id, category, gene)
+            result.update(make_display.make_default_display(dataset_id, session_id, category, gene))
             result["self"] = url_path
             if not result["success"]:
                 raise Exception("Make UMAP step failed")
@@ -147,33 +147,30 @@ def submission_dataset_callback(dataset_id, metadata, session_id, url_path, acti
 
     #NOTE: Each of the CGI scripts will control loading/failed/complete status of their process
     try:
-        result = pull_nemoarchive_metadata(s_dataset, metadata["identifier"])
+        result.update(pull_nemoarchive_metadata(s_dataset, metadata["identifier"]))
         if not result["success"]:
             raise Exception("Could not pull metadata from NeMO Archive API")
 
         dataset_mdata = result["metadata"].get("dataset")
 
         db_step = "pulled_to_vm_status" # step name in database
         if should_step_run(s_dataset, db_step):
-            # Component = file format type
-            component_files = dataset_mdata["component_fields"]
-            for component in component_files:
-                bucket_path = dataset_mdata[component]
-                result = pull_from_gcp.pull_gcp_files_to_vm(bucket_path, dataset_id)
-                if not result["success"]:
-                    raise Exception("Pull GCP Files step failed")
+            bucket_path = dataset_mdata["bucket_path"]
+            result.update(pull_from_gcp.pull_gcp_files_to_vm(bucket_path, dataset_id))
+            if not result["success"]:
+                raise Exception("Pull GCP Files step failed")
 
         ###
         db_step = "convert_to_h5ad_status"
         if should_step_run(s_dataset, db_step):
             filetype = dataset_mdata["filetype"]
-            result = write_h5ad.run_write_h5ad(dataset_id, filetype)
+            result.update(write_h5ad.run_write_h5ad(dataset_id, filetype))
             if not result["success"]:
                 raise Exception("Write H5AD step failed")
 
         ###
         db_step = "make_umap_status"
         if should_step_run(s_dataset, db_step):
-            result = make_display.make_default_display(dataset_id, session_id, category, gene)
+            result.update(make_display.make_default_display(dataset_id, session_id, category, gene))
             if not result["success"]:
                 raise Exception("Make UMAP step failed")
@@ -221,6 +218,10 @@ def pull_nemoarchive_metadata(s_dataset, nemo_id) -> dict:
         s_dataset.save_change(attribute="log_message", value=api_file_result["error"])
         return result
 
+    if not "access" in api_file_result or api_file_result["access"] != "open":
+        s_dataset.save_change(attribute="log_message", value="File is not open access. Cannot import file at this time.")
+        return result
+
     sample_identifier = api_file_result["sample"]
     if not sample_identifier:
         s_dataset.save_change(attribute="log_message", value="No sample identifier found in NeMO Archive API. Cannot get sample metadata.")
@@ -403,6 +404,16 @@ def process_nemo_assets_api_file_result(api_result):
     dataset_metadata["reference_annot_id"] = None
     #dataset_metadata["reference_annot_id"] = get_reference_annot_id(connection, nemo_id)
 
+    # get GCP bucket path
+    manifest_file_urls = api_result["manifest_file_urls"]
+    if manifest_file_urls:
+        # find the entry where protocol is "gcp"
+        gcp_manifest = next((entry for entry in manifest_file_urls if entry["protocol"] == "gcp"), None)
+        if gcp_manifest:
+            dataset_metadata["bucket_path"] = gcp_manifest["file_location"]
+        else:
+            dataset_metadata["bucket_path"] = None
+
     return dataset_metadata
 
 def process_nemo_assets_api_sample_result(api_result):
@@ -578,9 +589,9 @@ def _on_response(channel, method_frame, properties, body):
         while not task_finished:
             pass
         print("[x] sending payload response for submission_dataset {} back to client".format(dataset_id), file=sys.stderr)
-        result = response
+        result.update(response)
     else:
-        result = submission_dataset_callback(dataset_id, metadata, session_id, url_path, action, category, gene)
+        result.update(submission_dataset_callback(dataset_id, metadata, session_id, url_path, action, category, gene))
 
     if not result["success"]:
         print(result.get("message", "Something went wrong."), file=sys.stderr)
diff --git a/www/cgi/nemoarchive_pull_gcp_files_to_vm.cgi b/www/cgi/nemoarchive_pull_gcp_files_to_vm.cgi
index 50bd3bee..20adafea 100755
--- a/www/cgi/nemoarchive_pull_gcp_files_to_vm.cgi
+++ b/www/cgi/nemoarchive_pull_gcp_files_to_vm.cgi
@@ -1,6 +1,7 @@
 #!/opt/bin/python3
 
-# nemoarchive_pull_gcp_files_to_vm.cgi - Run gsutils to pull datasets from a NeMO Archive GCP bucket into the NeMO Analytics VM
+# nemoarchive_pull_gcp_files_to_vm.cgi - Run gsutils to pull an archived dataset from a NeMO Archive GCP bucket into the NeMO Analytics VM
+# After the files are pulled, the filenames are extracted and returned to the client
 
 import cgi
 import json
@@ -53,14 +54,26 @@ def download_blob(bucket_name, source_blob_name, destination_file_name):
         # using `Bucket.blob` is preferred here.
         blob = bucket.blob(source_blob_name)
         blob.download_to_filename(destination_file_name)
-        #success_dict["message"] = "Downloaded storage object {} from bucket {} to local file {}.".format(
-        #    source_blob_name, bucket_name, destination_file_name
-        #)
         return destination_file_name
     except Exception as e:
         print(str(e), file=sys.stderr)
         raise
 
+def extract_filenames(filename, dest_dir):
+    # untar the file and save the filenames to result
+    if filename.endswith(".tar.gz"):
+        import tarfile
+        with tarfile.open(filename, "r:gz") as tar:
+            tar.extractall(path=dest_dir)
+            return tar.getnames()
+    elif filename.endswith(".zip"):
+        import zipfile
+        with zipfile.ZipFile(filename, 'r') as zip_ref:
+            zip_ref.extractall(dest_dir)
+            return zip_ref.namelist()
+    else:
+        return [filename]
+
 def pull_gcp_files_to_vm(bucket_path, dataset_id):
     s_dataset = geardb.get_submission_dataset_by_dataset_id(dataset_id)
     if not s_dataset:
@@ -72,9 +85,10 @@
     dest_dir = Path(UPLOAD_BASE_DIR).joinpath(dataset_id)
     dest_dir.mkdir(exist_ok=True)
     dest_filename = Path(source_blob_name).name
-    result = {"success": False, "filename":""}
+    result = {"success": False, "filenames":[]}
     try:
-        result["filename"] = download_blob(BUCKET_NAME, source_blob_name, str(dest_dir.joinpath(dest_filename)))
+        filename = download_blob(BUCKET_NAME, source_blob_name, str(dest_dir.joinpath(dest_filename)))
+        result["filenames"] = extract_filenames(filename, dest_dir)
         result["success"] = True
         # Update status in dataset
         s_dataset.save_change(attribute=DB_STEP, value="completed")
diff --git a/www/cgi/nemoarchive_validate_metadata.cgi b/www/cgi/nemoarchive_validate_metadata.cgi
index 75c8e75c..3e4b47f1 100755
--- a/www/cgi/nemoarchive_validate_metadata.cgi
+++ b/www/cgi/nemoarchive_validate_metadata.cgi
@@ -2,9 +2,10 @@
 
 # nemoarchive_validate_metadata.cgi - Write metadata to JSON file
 
+import shutil, gzip
 import json
 import logging
-import os, subprocess, sys
+import sys
 from pathlib import Path
 
 gear_root = Path(__file__).resolve().parents[2] # web-root dir
@@ -46,6 +47,14 @@ def setup_logger():
 
 logger = setup_logger()
 
+def extract_gz_file(gz_file: str) -> str:
+    """Extracts a .gz file and returns the path to the extracted file."""
+    extracted_file = gz_file.replace(".gz", "")
+    with gzip.open(gz_file, 'rb') as f_in:
+        with open(extracted_file, 'wb') as f_out:
+            shutil.copyfileobj(f_in, f_out)
+    return extracted_file
+
 def get_ensembl_release(gene_file, organism_id):
     """Given the list of genes and the organism, determine best ensemble release
     to use for this dataset."""
@@ -56,34 +65,40 @@
     return find_best_ensembl_release_match(gene_file, organism_id, silent=True)
 
-def get_genes_file_path(base_dir:Path, file_format):
+def get_genes_file_path(base_dir: Path, file_format: str) -> Path:
     """Grab list of genes depending on file format. Returns filepath.
     Accepts Ensembl ID as first column too."""
-
-    # This is working under the assumption that only one of the files in the base_dir matches
-    if file_format.lower() == "mex":
-        # I'm bad with glob patterns
-        mex_features_file = list(base_dir.glob(r"features.tsv*"))
-        mex_genes_file = list(base_dir.glob(r"genes.tsv*"))
-        genes_file = str([*mex_features_file, *mex_genes_file][0]) # One of these should match
-        if genes_file.endswith(".gz"):
-            import shutil, gzip
-            gunzip_file = genes_file.replace(".gz", "")
-            with gzip.open(genes_file, 'rb') as f_in:
-                with open(gunzip_file, 'wb') as f_out:
-                    shutil.copyfileobj(f_in, f_out)
-            genes_file = gunzip_file
-        return base_dir.joinpath(genes_file)
-    if file_format.lower() == "tabcounts":
-        genes_file = list(base_dir.glob(r"*(genes|ROWmeta).tab"))[0]
-        return base_dir.joinpath(genes_file)
-    if file_format.lower() == "h5ad":
-        h5ad_file = list(base_dir.glob(r"*\.h5ad"))[0]
-        import anndata
-        adata = anndata.read(base_dir.joinpath(h5ad_file))
-        genes_file = base_dir.joinpath("h5ad_genes.tsv")
-        adata.var.to_csv(genes_file, sep="\t")
-        return genes_file
-    raise Exception("File format {} not supported".format(file_format))
+    file_format = file_format.lower()
+    if file_format == "mex":
+        return get_mex_genes_file(base_dir)
+    elif file_format == "tabcounts":
+        return get_tabcounts_genes_file(base_dir)
+    elif file_format == "h5ad":
+        return get_h5ad_genes_file(base_dir)
+    else:
+        raise Exception(f"File format {file_format} not supported")
+
+def get_mex_genes_file(base_dir: Path) -> Path:
+    """Finds and returns the MEX genes file path."""
+    mex_features_file = list(base_dir.glob(r"features.tsv*"))
+    mex_genes_file = list(base_dir.glob(r"genes.tsv*"))
+    genes_file = str([*mex_features_file, *mex_genes_file][0]) # One of these should match
+    if genes_file.endswith(".gz"):
+        genes_file = extract_gz_file(genes_file)
+    return base_dir.joinpath(genes_file)
+
+def get_tabcounts_genes_file(base_dir: Path) -> Path:
+    """Finds and returns the TabCounts genes file path."""
+    genes_file = list(base_dir.glob(r"*(genes|ROWmeta).tab"))[0]
+    return base_dir.joinpath(genes_file)
+
+def get_h5ad_genes_file(base_dir: Path) -> Path:
+    """Finds and returns the H5AD genes file path."""
+    import anndata
+    h5ad_file = list(base_dir.glob(r"*.h5ad"))[0]
+    adata = anndata.read(base_dir.joinpath(h5ad_file))
+    genes_file = base_dir.joinpath("h5ad_genes.tsv")
+    adata.var.to_csv(genes_file, sep="\t")
+    return genes_file
 
 def organism_to_taxon_id(org):
     # Returns a gear-related mapping, or None if not encountered
@@ -215,7 +230,7 @@
 
     # Dataset type
     json_attributes["field"].append("dataset_type")
-    tissue_type = attributes["sample"]["tissue_type"]
+    tissue_type = attributes["dataset"]["tissue_type"]
     dataset_type = tissue_type_to_dataset_type(tissue_type)
     # ATAC-Seq has it's own metadata datatype
     if "ATAC-seq".lower() in attributes["dataset"]["technique"].lower():
@@ -228,7 +243,7 @@
 
     # Organism
     json_attributes["field"].append("sample_taxid")
-    organism = attributes["sample"]["sample_organism"]
+    organism = attributes["dataset"]["organism"]
     taxon_id = organism_to_taxon_id(organism)
     if not taxon_id:
         err_msg = "Could not find taxon ID for organism {}".format(organism)