diff --git a/Common/normalization.py b/Common/normalization.py
index b0e6bf40..2b932052 100644
--- a/Common/normalization.py
+++ b/Common/normalization.py
@@ -130,15 +130,19 @@ def normalize_node_data(self, node_list: list, batch_size: int = 1000) -> list:
         """
         This method calls the NodeNormalization web service and normalizes a list of nodes.
 
-        :param node_list: A list of nodes to normalize
+        :param node_list: A list of unique nodes to normalize
         :param batch_size: the number of curies to be sent to NodeNormalization at once
         :return:
         """
 
-        # init the cache - this accumulates all the results from the node norm service
-        cached_node_norms: dict = {}
 
-        # make a list of the node ids
+        # look up all valid biolink node types if needed
+        # this is used when strict normalization is off to ensure only valid types go into the graph as NODE_TYPES
+        if not self.strict_normalization and not self.biolink_compliant_node_types:
+            biolink_lookup = EdgeNormalizer(edge_normalization_version=self.biolink_version)
+            self.biolink_compliant_node_types = biolink_lookup.get_valid_node_types()
+
+        # make a list of the node ids, we used to deduplicate here, but now we expect the list to be unique ids
         to_normalize: list = [node['id'] for node in node_list]
 
         # use indexes and slice to grab batch_size sized chunks of ids from the list
@@ -164,14 +168,15 @@ def normalize_node_data(self, node_list: list, batch_size: int = 1000) -> list:
         # hit the node norm api with the chunks of curies in parallel
         # we could try to optimize the number of max_workers for ThreadPoolExecutor more specifically,
         # by default python attempts to find a reasonable # based on os.cpu_count()
+        node_normalization_results: dict = {}
         with ThreadPoolExecutor() as executor:
             normalization_results = list(executor.map(self.hit_node_norm_service, chunks_of_ids))
             for normalization_json, ids in zip(normalization_results, chunks_of_ids):
                 if not normalization_json:
-                    self.logger.error(f'Normalization json results missing for ids:{ids}')
+                    self.logger.error(f'!!! Normalization json results missing for ids: {ids}')
                 else:
                     # merge the normalization results into one dictionary
-                    cached_node_norms.update(**normalization_json)
+                    node_normalization_results.update(**normalization_json)
 
         # reset the node index
         node_idx = 0
@@ -179,11 +184,6 @@ def normalize_node_data(self, node_list: list, batch_size: int = 1000) -> list:
         # node ids that failed to normalize
         failed_to_normalize: list = []
 
-        # look up valid node types if needed
-        if not self.strict_normalization and not self.biolink_compliant_node_types:
-            biolink_lookup = EdgeNormalizer(edge_normalization_version=self.biolink_version)
-            self.biolink_compliant_node_types = biolink_lookup.get_valid_node_types()
-
         # for each node update the node with normalized information
         # store the normalized IDs in self.node_normalization_lookup for later look up
         while node_idx < len(node_list):
@@ -228,7 +228,7 @@ def normalize_node_data(self, node_list: list, batch_size: int = 1000) -> list:
                 current_node[NODE_TYPES] = list(set(current_node[NODE_TYPES]))
 
             # did we get a response from the normalizer
-            current_node_normalization = cached_node_norms[current_node_id]
+            current_node_normalization = node_normalization_results[current_node_id]
             if current_node_normalization is not None:
 
                 current_node_id_section = current_node_normalization['id']
@@ -354,10 +354,10 @@ def get_current_node_norm_version(self):
 
     @staticmethod
     def get_normalization_requests_session():
-        pool_maxsize = min(os.cpu_count(), 10)
+        pool_maxsize = max(os.cpu_count(), 10)
         s = requests.Session()
-        retries = Retry(total=10,
-                        backoff_factor=.5,
+        retries = Retry(total=8,
+                        backoff_factor=1,
                         status_forcelist=[502, 503, 504, 403, 429])
         s.mount('https://', HTTPAdapter(max_retries=retries, pool_maxsize=pool_maxsize))
         s.mount('http://', HTTPAdapter(max_retries=retries, pool_maxsize=pool_maxsize))
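
For context, the chunk-and-parallelize pattern the second hunk builds on looks roughly like the sketch below. This is a minimal standalone version, not the real ORION implementation: hit_node_norm_service, normalize_ids, and the NODE_NORM_URL endpoint/payload shape are simplified stand-ins.

from concurrent.futures import ThreadPoolExecutor

import requests

# assumed public NodeNormalization endpoint and payload shape
NODE_NORM_URL = 'https://nodenormalization-sri.renci.org/get_normalized_nodes'


def hit_node_norm_service(curies: list) -> dict:
    # POST one batch of curies, return the curie -> normalization result mapping
    response = requests.post(NODE_NORM_URL, json={'curies': curies})
    response.raise_for_status()
    return response.json()


def normalize_ids(node_ids: list, batch_size: int = 1000) -> dict:
    # slice the (already unique) ids into batch_size sized chunks
    chunks_of_ids = [node_ids[i: i + batch_size] for i in range(0, len(node_ids), batch_size)]
    node_normalization_results: dict = {}
    # ThreadPoolExecutor picks a reasonable max_workers from os.cpu_count() by default
    with ThreadPoolExecutor() as executor:
        for ids, normalization_json in zip(chunks_of_ids,
                                           executor.map(hit_node_norm_service, chunks_of_ids)):
            if not normalization_json:
                print(f'Normalization json results missing for ids: {ids}')
            else:
                # merge each batch's results into one dictionary
                node_normalization_results.update(normalization_json)
    return node_normalization_results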
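
The last hunk tunes a standard requests/urllib3 retry session: Retry(total=8) caps the retry count, backoff_factor=1 makes urllib3 sleep roughly backoff_factor * 2**(retry - 1) seconds between attempts, status_forcelist retries on the listed HTTP statuses, and pool_maxsize sizes the per-host connection pool so the parallel normalization threads don't contend for connections. A self-contained version with the new values (plus an "or 0" guard, added here because os.cpu_count() can return None on some platforms):

import os

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry


def get_normalization_requests_session() -> requests.Session:
    # at least 10 pooled connections per host, more on machines with many cores;
    # os.cpu_count() may return None, which the "or 0" maps to the floor of 10
    pool_maxsize = max(os.cpu_count() or 0, 10)
    s = requests.Session()
    # up to 8 retries with exponential backoff on the listed HTTP status codes
    retries = Retry(total=8,
                    backoff_factor=1,
                    status_forcelist=[502, 503, 504, 403, 429])
    s.mount('https://', HTTPAdapter(max_retries=retries, pool_maxsize=pool_maxsize))
    s.mount('http://', HTTPAdapter(max_retries=retries, pool_maxsize=pool_maxsize))
    return s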