
Commit

fixing pool size and making backoff factor slower for node norm
EvanDietzMorris committed Jul 24, 2024
1 parent 3302b5a commit 1613f62
Showing 1 changed file with 15 additions and 15 deletions.
30 changes: 15 additions & 15 deletions Common/normalization.py
@@ -130,15 +130,19 @@ def normalize_node_data(self, node_list: list, batch_size: int = 1000) -> list:
"""
This method calls the NodeNormalization web service and normalizes a list of nodes.
:param node_list: A list of nodes to normalize
:param node_list: A list of unique nodes to normalize
:param batch_size: the number of curies to be sent to NodeNormalization at once
:return:
"""
# init the cache - this accumulates all the results from the node norm service
cached_node_norms: dict = {}

# make a list of the node ids
# look up all valid biolink node types if needed
# this is used when strict normalization is off to ensure only valid types go into the graph as NODE_TYPES
if not self.strict_normalization and not self.biolink_compliant_node_types:
biolink_lookup = EdgeNormalizer(edge_normalization_version=self.biolink_version)
self.biolink_compliant_node_types = biolink_lookup.get_valid_node_types()

# make a list of the node ids, we used to deduplicate here, but now we expect the list to be unique ids
to_normalize: list = [node['id'] for node in node_list]

# use indexes and slice to grab batch_size sized chunks of ids from the list
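For context, a minimal sketch of the slicing the comment above describes, with made-up ids (the variable names mirror the diff; everything else is illustrative):

# illustrative only: grab batch_size sized chunks of ids from the list
to_normalize = [f"CHEBI:{i}" for i in range(2500)]  # made-up ids
batch_size = 1000
chunks_of_ids = [to_normalize[i: i + batch_size]
                 for i in range(0, len(to_normalize), batch_size)]
# -> three chunks of 1000, 1000 and 500 ids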
@@ -164,26 +168,22 @@ def normalize_node_data(self, node_list: list, batch_size: int = 1000) -> list:
# hit the node norm api with the chunks of curies in parallel
# we could try to optimize the number of max_workers for ThreadPoolExecutor more specifically,
# by default python attempts to find a reasonable # based on os.cpu_count()
node_normalization_results: dict = {}
with ThreadPoolExecutor() as executor:
normalization_results = list(executor.map(self.hit_node_norm_service, chunks_of_ids))
for normalization_json, ids in zip(normalization_results, chunks_of_ids):
if not normalization_json:
self.logger.error(f'Normalization json results missing for ids:{ids}')
self.logger.error(f'!!! Normalization json results missing for ids: {ids}')
else:
# merge the normalization results into one dictionary
cached_node_norms.update(**normalization_json)
node_normalization_results.update(**normalization_json)

# reset the node index
node_idx = 0

# node ids that failed to normalize
failed_to_normalize: list = []

# look up valid node types if needed
if not self.strict_normalization and not self.biolink_compliant_node_types:
biolink_lookup = EdgeNormalizer(edge_normalization_version=self.biolink_version)
self.biolink_compliant_node_types = biolink_lookup.get_valid_node_types()

# for each node update the node with normalized information
# store the normalized IDs in self.node_normalization_lookup for later look up
while node_idx < len(node_list):
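The hunk above swaps the accumulator from cached_node_norms to a fresh node_normalization_results dict. A self-contained sketch of the parallel-fetch pattern it uses, assuming a standalone hit_node_norm_service and the public SRI endpoint (the repo's actual method and configured URL may differ):

from concurrent.futures import ThreadPoolExecutor

import requests

# hypothetical stand-in for the class method shown in the diff
def hit_node_norm_service(ids):
    response = requests.post('https://nodenormalization-sri.renci.org/get_normalized_nodes',
                             json={'curies': ids})
    return response.json() if response.status_code == 200 else None

node_normalization_results = {}
with ThreadPoolExecutor() as executor:  # max_workers defaults based on os.cpu_count()
    normalization_results = list(executor.map(hit_node_norm_service, chunks_of_ids))
# executor.map preserves input order, so each response pairs with its chunk of ids
for normalization_json, ids in zip(normalization_results, chunks_of_ids):
    if normalization_json:
        node_normalization_results.update(normalization_json)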
@@ -228,7 +228,7 @@ def normalize_node_data(self, node_list: list, batch_size: int = 1000) -> list:
current_node[NODE_TYPES] = list(set(current_node[NODE_TYPES]))

# did we get a response from the normalizer
current_node_normalization = cached_node_norms[current_node_id]
current_node_normalization = node_normalization_results[current_node_id]
if current_node_normalization is not None:

current_node_id_section = current_node_normalization['id']
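For reference, a hedged illustration of the response shape this code is navigating (based on the public SRI Node Normalizer; exact fields may vary by version). The service maps each submitted curie to either a result object or null, which is why the lookup is checked against None before its 'id' section is used:

# illustrative response shape, not taken from the repo
example_response = {
    "MESH:D014867": {
        "id": {"identifier": "CHEBI:15377", "label": "water"},
        "equivalent_identifiers": [{"identifier": "CHEBI:15377", "label": "water"}],
        "type": ["biolink:SmallMolecule", "biolink:ChemicalEntity"]
    },
    "NOT:a_real_curie": None  # curies the service cannot normalize come back as null
}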
@@ -354,10 +354,10 @@ def get_current_node_norm_version(self):

@staticmethod
def get_normalization_requests_session():
pool_maxsize = min(os.cpu_count(), 10)
pool_maxsize = max(os.cpu_count(), 10)
s = requests.Session()
retries = Retry(total=10,
backoff_factor=.5,
retries = Retry(total=8,
backoff_factor=1,
status_forcelist=[502, 503, 504, 403, 429])
s.mount('https://', HTTPAdapter(max_retries=retries, pool_maxsize=pool_maxsize))
s.mount('http://', HTTPAdapter(max_retries=retries, pool_maxsize=pool_maxsize))
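This last hunk is the substance of the commit message. pool_maxsize flips from min(os.cpu_count(), 10) to max(os.cpu_count(), 10), so the HTTPAdapter's connection pool is never smaller than 10 (min() had instead capped it at 10, presumably starving the parallel workers on many-core machines), and the retry policy trades two attempts (10 -> 8) for a doubled backoff factor (.5 -> 1). A rough sketch of the resulting wait schedules, using urllib3's documented formula (whether the very first retry sleeps at all varies by urllib3 version):

# urllib3 sleeps about backoff_factor * (2 ** (retry_number - 1)) seconds
# between attempts, capped at 120 seconds by default
def backoff_schedule(total_retries, backoff_factor, cap=120):
    return [min(backoff_factor * 2 ** (n - 1), cap) for n in range(1, total_retries + 1)]

print(backoff_schedule(10, .5))  # old: 0.5, 1, 2, 4, 8, 16, 32, 64, 120, 120
print(backoff_schedule(8, 1))    # new: 1, 2, 4, 8, 16, 32, 64, 120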
