Skip to content

Commit

Permalink
improving latest version check errors, comments
Browse files Browse the repository at this point in the history
  • Loading branch information
EvanDietzMorris committed Dec 19, 2024
1 parent 2fa5d8c commit e5b33ab
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 38 deletions.
57 changes: 29 additions & 28 deletions Common/build_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@
from xxhash import xxh64_hexdigest
from collections import defaultdict
from Common.biolink_utils import BiolinkInformationResources, INFORES_STATUS_INVALID, INFORES_STATUS_DEPRECATED
from Common.utils import LoggingUtil, quick_jsonl_file_iterator
from Common.utils import LoggingUtil, quick_jsonl_file_iterator, GetDataPullError
from Common.data_sources import get_available_data_sources
from Common.exceptions import DataVersionError
from Common.load_manager import SourceDataManager
from Common.kgx_file_merger import KGXFileMerger
from Common.neo4j_tools import create_neo4j_dump
Expand All @@ -25,12 +26,6 @@
REDUNDANT_EDGES_FILENAME = 'redundant_edges.jsonl'


class GraphSpecError(Exception):
def __init__(self, error_message: str, actual_error: Exception = None):
self.error_message = error_message
self.actual_error = actual_error


class GraphBuilder:

def __init__(self):
Expand All @@ -39,14 +34,15 @@ def __init__(self):
line_format='medium',
log_file_path=os.environ['ORION_LOGS'])

# This dictionary determines the graph versions of already parsed graphs.
# This is more temperamental than it seems because the only way to get
# the version name for many subgraphs is to download it.
# This dictionary holds the versions of graphs from the graph spec.
# This is more temperamental than it seems because the only way to get the current version for many sources
# is to retrieve them online. Graph versions are generated from underlying data source versions, so if versions
# are not explicitly specified in the graph spec, they may need to be retrieved.
self.graph_id_to_version = {}

self.graphs_dir = self.init_graphs_dir() # path to the graphs output directory
self.source_data_manager = SourceDataManager() # access to the data sources and their metadata
self.graph_specs = self.load_graph_specs() # list of graphs to build (GraphSpec objects)
self.graph_specs = self.load_graph_specs() # list of potential graphs to build (GraphSpec objects)
self.build_results = {}

def build_graph(self, graph_id: str):
Expand Down Expand Up @@ -147,11 +143,16 @@ def build_graph(self, graph_id: str):
def get_graph_version(self, graph_id: str) -> str:
if graph_id not in self.graph_id_to_version:
graph_spec = self.get_graph_spec(graph_id)
if not graph_spec:
raise GraphSpecError(error_message=f'Tried to dynamically determine the version of a '
f'graph that was not found in the Graph Spec.')
graph_version = self.generate_graph_version(graph_spec)
self.graph_id_to_version[graph_id] = graph_version
if graph_spec is not None:
if graph_spec.graph_version is None:
try:
graph_spec.graph_version = self.generate_graph_version(graph_spec)
except (GetDataPullError, DataVersionError) as e:
raise GraphSpecError(error_message=e.error_message)
self.graph_id_to_version[graph_id] = graph_spec.graph_version
else:
raise GraphSpecError(error_message=f'Tried to determine the version for a '
f'graph that was not found in the Graph Spec: {graph_id}.')
return self.graph_id_to_version[graph_id]

def build_dependencies(self, graph_spec: GraphSpec):
Expand All @@ -160,7 +161,7 @@ def build_dependencies(self, graph_spec: GraphSpec):
subgraph_version = subgraph_source.version
# Get the subgraph version from the subgraph source spec,
# which will either be one specified in the graph spec or None.
if not subgraph_version:
if subgraph_version is None:
try:
# if one was not specified, retrieve or generate it like we would any graph version
subgraph_version = self.get_graph_version(subgraph_id)
Expand Down Expand Up @@ -190,7 +191,7 @@ def build_dependencies(self, graph_spec: GraphSpec):

graph_metadata = self.get_graph_metadata(subgraph_id, subgraph_version)
if graph_metadata.get_build_status() == Metadata.STABLE:
# we found the sub graph and it's stable - update the GraphSource in preparation for building the graph
# we found the subgraph and it's stable - update the GraphSource in preparation for building the graph
subgraph_dir = self.get_graph_dir_path(subgraph_id, subgraph_version)
subgraph_nodes_path = self.get_graph_nodes_file_path(subgraph_dir)
subgraph_edges_path = self.get_graph_edges_file_path(subgraph_dir)
Expand Down Expand Up @@ -438,16 +439,6 @@ def parse_data_source_spec(self, source_yml):
f'Valid sources are: {", ".join(get_available_data_sources())}')
raise Exception(error_message)

# The DataSource() will get initialized with either a specific source version, if specified,
# or a callable function which can determine the latest source version. This is for a lazy initialization
# technique, so that we don't call get_latest_source_version until we need to, if at all.
if 'source_version' not in source_yml or source_yml['source_version'] == 'latest':
get_source_version = self.source_data_manager.get_latest_source_version
source_version = None
else:
source_version = str(source_yml['source_version'])
get_source_version = None

parsing_version = source_yml['parsing_version'] if 'parsing_version' in source_yml \
else self.source_data_manager.get_latest_parsing_version(source_id)
merge_strategy = source_yml['merge_strategy'] if 'merge_strategy' in source_yml else 'default'
Expand All @@ -469,6 +460,16 @@ def parse_data_source_spec(self, source_yml):
strict=strict_normalization,
conflation=conflation)
supplementation_version = SequenceVariantSupplementation.SUPPLEMENTATION_VERSION

# The DataSource() will get initialized with either a specific source version, if specified,
# or a callable function which can determine the latest source version. This is for a lazy initialization
# technique, so that we don't call get_latest_source_version until we need to, if at all.
if 'source_version' not in source_yml or source_yml['source_version'] == 'latest':
get_source_version = self.source_data_manager.get_latest_source_version
source_version = None
else:
source_version = str(source_yml['source_version'])
get_source_version = None
data_source = DataSource(id=source_id,
source_version=source_version,
get_source_version=get_source_version,
Expand Down
17 changes: 17 additions & 0 deletions Common/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@


class GraphSpecError(Exception):
def __init__(self, error_message: str, actual_error: Exception = None):
self.error_message = error_message
self.actual_error = actual_error

def __str__(self):
return self.error_message


class DataVersionError(Exception):
def __init__(self, error_message: str):
self.error_message = error_message

def __str__(self):
return self.error_message
21 changes: 11 additions & 10 deletions Common/load_manager.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import os
import argparse
import datetime
import time
from collections import defaultdict

from Common.data_sources import SourceDataLoaderClassFactory, RESOURCE_HOGS, get_available_data_sources
from Common.exceptions import DataVersionError
from Common.utils import LoggingUtil, GetDataPullError
from Common.kgx_file_normalizer import KGXFileNormalizer
from Common.normalization import NormalizationScheme, NodeNormalizer, EdgeNormalizer, NormalizationFailedError
Expand Down Expand Up @@ -124,7 +126,7 @@ def run_fetch_stage(self, source_id: str, source_version: str):
self.logger.info(f"Fetching source data for {source_id} (version: {source_version})...")
return self.fetch_source(source_id, source_version=source_version)

def get_latest_source_version(self, source_id: str, retries: int=0):
def get_latest_source_version(self, source_id: str, retries: int = 0):
if source_id in self.latest_source_version_lookup:
return self.latest_source_version_lookup[source_id]

Expand All @@ -136,19 +138,18 @@ def get_latest_source_version(self, source_id: str, retries: int=0):
self.latest_source_version_lookup[source_id] = latest_source_version
return latest_source_version
except GetDataPullError as failed_error:
self.logger.error(
f"Error while checking for latest source version for {source_id}: {failed_error.error_message}")
error_message = f"Error while checking for latest source version for {source_id}: " \
f"{failed_error.error_message}"
self.logger.error(error_message)
if retries < 2:
time.sleep(3)
return self.get_latest_source_version(source_id, retries=retries+1)
else:
# TODO what should we do here?
# no great place to write an error in metadata because metadata is specific to source versions
# source_metadata.set_version_checking_error(failed_error.error_message)
return None
raise DataVersionError(error_message=error_message)
except Exception as e:
self.logger.error(
f"Error while checking for latest source version for {source_id}: {repr(e)}-{str(e)}")
return None
error_message = f"Error while checking for latest source version for {source_id}: {repr(e)}-{str(e)}"
self.logger.error(error_message)
raise DataVersionError(error_message=error_message)

def fetch_source(self, source_id: str, source_version: str='latest', retries: int=0):

Expand Down

0 comments on commit e5b33ab

Please sign in to comment.