Skip to content

Commit

Permalink
Modified the DataSource class in kgxmodel.py to make source_version g…
Browse files Browse the repository at this point in the history
…enerated when needed instead of at objection creation time. Modified build_manager.py to get strings which describe graph_version as needed for building the graph and dependant subgraphs and not unnecessarily.
  • Loading branch information
Daniel Korn committed Jul 23, 2024
1 parent e44cae6 commit 7671b8b
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 31 deletions.
72 changes: 43 additions & 29 deletions Common/build_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,11 @@ def __init__(self):
line_format='medium',
log_file_path=os.environ['ORION_LOGS'])

self.current_graph_versions = {}
#This dictionary determines the graph versions of already parsed graphs.
#This is more tempermental than it seems because the only way to get
#the version name for many subgraphs is to download it.
self.graph_id_to_version = {}

self.graphs_dir = self.init_graphs_dir() # path to the graphs output directory
self.source_data_manager = SourceDataManager() # access to the data sources and their metadata
self.graph_specs = self.load_graph_specs() # list of graphs to build (GraphSpec objects)
Expand All @@ -43,14 +47,15 @@ def build_graph(self, graph_id: str):

self.logger.info(f'Building graph {graph_id}. Checking dependencies...')
graph_spec = self.get_graph_spec(graph_id)
graph_version = self.get_graph_version(graph_id)

if self.build_dependencies(graph_spec):
self.logger.info(f'Building graph {graph_id}. Dependencies are ready...')
else:
self.logger.warning(f'Aborting graph {graph_spec.graph_id}, building dependencies failed.')
return

# check the status for previous builds of this version
graph_version = graph_spec.graph_version
graph_metadata = self.get_graph_metadata(graph_id, graph_version)
build_status = graph_metadata.get_build_status()
if build_status == Metadata.IN_PROGRESS:
Expand Down Expand Up @@ -133,18 +138,27 @@ def build_graph(self, graph_id: str):
redundant_filepath = edges_filepath.replace(EDGES_FILENAME, REDUNDANT_EDGES_FILENAME)
generate_redundant_kg(edges_filepath, redundant_filepath)

def get_graph_version(self, graph_id:str) -> str:
if(graph_id not in self.graph_id_to_version):
graph_spec = self.get_graph_spec(graph_id)
graph_version = self.generate_graph_version(graph_spec)
self.graph_id_to_version[graph_id] = graph_version
return self.graph_id_to_version[graph_id]


def build_dependencies(self, graph_spec: GraphSpec):
for subgraph_source in graph_spec.subgraphs:
subgraph_id = subgraph_source.id
subgraph_version = subgraph_source.version
subgraph_version = self.get_graph_version(subgraph_id)
if self.check_for_existing_graph_dir(subgraph_id, subgraph_version):
# load previous metadata
graph_metadata = self.get_graph_metadata(subgraph_id, subgraph_version)
subgraph_source.graph_metadata = graph_metadata.metadata
elif self.current_graph_versions[subgraph_id] == subgraph_version:
elif self.graphid_to_version_name[subgraph_id] == subgraph_version:
self.logger.warning(f'For graph {graph_spec.graph_id} subgraph dependency '
f'{subgraph_id} version {subgraph_version} is not ready. Building now...')
self.build_graph(subgraph_id)
#With changes here, this line should never run...
else:
self.logger.warning(f'Building graph {graph_spec.graph_id} failed, '
f'subgraph {subgraph_id} had version {subgraph_version} specified, '
Expand All @@ -166,14 +180,15 @@ def build_dependencies(self, graph_spec: GraphSpec):

for data_source in graph_spec.sources:
source_id = data_source.id
source_version = data_source.source_version
if source_id not in get_available_data_sources():
self.logger.warning(
f'Attempting to build graph {graph_spec.graph_id} failed: '
f'{source_id} is not a valid data source id. ')
return False

source_metadata: SourceMetadata = self.source_data_manager.get_source_metadata(source_id,
data_source.source_version)
source_version)
release_version = source_metadata.get_release_version(parsing_version=data_source.parsing_version,
normalization_version=data_source.normalization_scheme.get_composite_normalization_version(),
supplementation_version=data_source.supplementation_version)
Expand All @@ -182,7 +197,7 @@ def build_dependencies(self, graph_spec: GraphSpec):
f'Attempting to build graph {graph_spec.graph_id}, '
f'dependency {source_id} is not ready. Building now...')
release_version = self.source_data_manager.run_pipeline(source_id,
source_version=data_source.source_version,
source_version=source_version,
parsing_version=data_source.parsing_version,
normalization_scheme=data_source.normalization_scheme,
supplementation_version=data_source.supplementation_version)
Expand All @@ -194,7 +209,7 @@ def build_dependencies(self, graph_spec: GraphSpec):
data_source.version = release_version
data_source.release_info = source_metadata.get_release_info(release_version)
data_source.file_paths = self.source_data_manager.get_final_file_paths(source_id,
data_source.source_version,
source_version,
data_source.parsing_version,
data_source.normalization_scheme.get_composite_normalization_version(),
data_source.supplementation_version)
Expand Down Expand Up @@ -369,33 +384,33 @@ def parse_graph_spec(self, graph_spec_yaml):
data_source.normalization_scheme.normalization_code_version = graph_wide_normalization_code_version

graph_output_format = graph_yaml['output_format'] if 'output_format' in graph_yaml else ""
current_graph_spec = GraphSpec(graph_id=graph_id,
graph_spec = GraphSpec(graph_id=graph_id,
graph_name=graph_name,
graph_description=graph_description,
graph_url=graph_url,
graph_version=None, # this will get populated later
graph_output_format=graph_output_format,
subgraphs=subgraph_sources,
sources=data_sources)
graph_version = self.generate_graph_version(current_graph_spec)
current_graph_spec.graph_version = graph_version
self.current_graph_versions[graph_id] = graph_version
graph_specs.append(current_graph_spec)
graph_specs.append(graph_spec)
except Exception as e:
self.logger.error(f'Error parsing Graph Spec ({graph_id}), formatting error or missing information: {repr(e)}')
raise e
return graph_specs

def parse_subgraph_spec(self, subgraph_yml):
subgraph_id = subgraph_yml['graph_id']
subgraph_version = subgraph_yml['graph_version'] if 'graph_version' in subgraph_yml else 'current'
if subgraph_version == 'current':
if subgraph_id in self.current_graph_versions:
subgraph_version = self.current_graph_versions[subgraph_id]
else:
raise Exception(f'Graph Spec Error - Could not determine version of subgraph {subgraph_id}. '
f'Either specify an existing version, already built in your graphs directory, '
f'or the subgraph must be defined previously in the same Graph Spec.')

if 'graph_version' in subgraph_yml: subgraph_version = subgraph_yml['graph_version']
else: subgraph_version= 'latest'

# if subgraph_version == 'current':
# if subgraph_id in self.current_graph_versions:
# subgraph_version = self.current_graph_versions[subgraph_id]
# else:
# raise Exception(f'Graph Spec Error - Could not determine version of subgraph {subgraph_id}. '
# f'Either specify an existing version, already built in your graphs directory, '
# f'or the subgraph must be defined previously in the same Graph Spec.')
merge_strategy = subgraph_yml['merge_strategy'] if 'merge_strategy' in subgraph_yml else 'default'
subgraph_source = SubGraphSource(id=subgraph_id,
version=subgraph_version,
Expand All @@ -410,12 +425,10 @@ def parse_data_source_spec(self, source_yml):
f'Valid sources are: {", ".join(get_available_data_sources())}')
raise Exception(error_message)

source_version = source_yml['source_version'] if 'source_version' in source_yml \
else self.source_data_manager.get_latest_source_version(source_id)
if source_version is None:
# TODO it would be great if we could default to the last stable version already built somehow
error_message = f'Data source {source_id} could not determine the latest version. The service may be down.'
raise Exception(error_message)
if 'source_version' not in source_yml or source_yml['source_version']=='latest':
get_source_version = self.source_data_manager.get_latest_source_version
else:
get_source_version = lambda source_id=None : str(source_yml['source_version'])

parsing_version = source_yml['parsing_version'] if 'parsing_version' in source_yml \
else self.source_data_manager.get_latest_parsing_version(source_id)
Expand All @@ -438,14 +451,15 @@ def parse_data_source_spec(self, source_yml):
strict=strict_normalization,
conflation=conflation)
supplementation_version = SequenceVariantSupplementation.SUPPLEMENTATION_VERSION
graph_source = DataSource(id=source_id,
data_source = DataSource(id=source_id,
version=None, # this will get populated later in build_dependencies
source_version=source_version,
get_source_version=get_source_version,
# source_version=source_version,
merge_strategy=merge_strategy,
normalization_scheme=normalization_scheme,
parsing_version=parsing_version,
supplementation_version=supplementation_version)
return graph_source
return data_source

def get_graph_spec(self, graph_id: str):
for graph_spec in self.graph_specs:
Expand Down
31 changes: 29 additions & 2 deletions Common/kgxmodel.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,18 +99,45 @@ def get_metadata_representation(self):
'merge_strategy:': self.merge_strategy,
'graph_metadata': self.graph_metadata}

from typing import Callable

@dataclass
class DataSource(GraphSource):
normalization_scheme: NormalizationScheme = None
source_version: str = None
#This function serves as an update to a static variable which used to \
# be used to represent "source_version". This is because there are two
# cases. One is the source version has been set by the graph spec yaml file,
# in which case we have this as a lambda that ignores the paramater and returns that string.
# The other case is when it needs to be dynamically generated based on the information in
# a data set. In that case it will be invoked when needed.
get_source_version: Callable[[str],str] = None
__source_version : str = None
parsing_version: str = None
supplementation_version: str = None
release_info: dict = None


# def __init__(self, normalization_scheme, source_version, parsing_version=None, supplementation_version=None, release_info=None, **kwargs):
# self.normalization_scheme = normalization_scheme
# self.parsing_version = parsing_version
# self.supplementation_version = supplementation_version
# self.release_info = release_info
#def __init__(source_version, **kwargs):
#print(self.source_version)
#breakpoint()
#print(kwargs)
# super().__init__(**kwargs)

def __getattr__(self, name):
if(name=="source_version"):
if(self.__source_version==None):
self.__source_version = self.get_source_version(self.id)
return self.__source_version
else: return object.__getattribute__(self,name)

def get_metadata_representation(self):
metadata = {'source_id': self.id,
'source_version': self.source_version,
'source_version': self.get_source_version(self.id),
'release_version': self.version,
'parsing_version': self.parsing_version,
'supplementation_version': self.supplementation_version,
Expand Down

0 comments on commit 7671b8b

Please sign in to comment.