diff --git a/backend/workflow_manager/endpoint_v2/source.py b/backend/workflow_manager/endpoint_v2/source.py index a030d7c70..273962787 100644 --- a/backend/workflow_manager/endpoint_v2/source.py +++ b/backend/workflow_manager/endpoint_v2/source.py @@ -34,7 +34,6 @@ SourceConnectorNotConfigured, ) from workflow_manager.endpoint_v2.models import WorkflowEndpoint -from workflow_manager.file_execution.models import WorkflowFileExecution from workflow_manager.workflow_v2.execution import WorkflowExecutionServiceHelper from workflow_manager.workflow_v2.file_history_helper import FileHistoryHelper from workflow_manager.workflow_v2.models.workflow import Workflow @@ -597,14 +596,11 @@ def _copy_file_to_destination( # Update the seek position seek_position += len(chunk) - def add_file_to_volume( - self, input_file_path: str, workflow_file_execution: WorkflowFileExecution - ) -> str: + def add_file_to_volume(self, input_file_path: str, file_hash: FileHash) -> str: """Add input file to execution directory. 
Args: input_file_path (str): source file - workflow_file_execution: WorkflowFileExecution model Raises: InvalidSource: _description_ @@ -618,20 +614,18 @@ def add_file_to_volume( file_content_hash = self.add_input_from_connector_to_volume( input_file_path=input_file_path, ) - if file_content_hash != workflow_file_execution.file_hash: + if file_content_hash != file_hash.file_hash: raise FileHashMismatched() elif connection_type == WorkflowEndpoint.ConnectionType.API: self.add_input_from_api_storage_to_volume(input_file_path=input_file_path) - if file_name != workflow_file_execution.file_name: + if file_name != file_hash.file_name: raise FileHashNotFound() - file_content_hash = workflow_file_execution.file_hash + file_content_hash = file_hash.file_hash else: raise InvalidSourceConnectionType() self.add_metadata_to_volume( - input_file_path=input_file_path, - file_execution_id=workflow_file_execution.id, - source_hash=file_content_hash, + input_file_path=input_file_path, source_hash=file_content_hash ) return file_name diff --git a/backend/workflow_manager/workflow_v2/workflow_helper.py b/backend/workflow_manager/workflow_v2/workflow_helper.py index 1afba1c03..ab72eab64 100644 --- a/backend/workflow_manager/workflow_v2/workflow_helper.py +++ b/backend/workflow_manager/workflow_v2/workflow_helper.py @@ -231,14 +231,14 @@ def _process_file( workflow_file_execution: WorkflowFileExecution, ) -> Optional[str]: error: Optional[str] = None - # Multiple run_ids are linked to an execution_id - # Each run_id corresponds to workflow runs for a single file - # It should e uuid of workflow_file_execution - file_execution_id = str(workflow_file_execution.id) file_name = source.add_file_to_volume( - input_file_path=input_file, workflow_file_execution=workflow_file_execution + input_file_path=input_file, file_hash=file_hash ) try: + # Multiple run_ids are linked to an execution_id + # Each run_id corresponds to workflow runs for a single file + # It should be uuid of 
workflow_file_execution + file_execution_id = str(workflow_file_execution.id) execution_service.file_execution_id = file_execution_id execution_service.initiate_tool_execution( current_file_idx, total_files, file_name, single_step diff --git a/tools/classifier/requirements.txt b/tools/classifier/requirements.txt index c65022c39..625789cc3 100644 --- a/tools/classifier/requirements.txt +++ b/tools/classifier/requirements.txt @@ -1,6 +1,6 @@ # Add your dependencies here # Required for all unstract tools -unstract-sdk~=v0.55.0rc2 +unstract-sdk~=0.54.0rc12 # Required for remote storage support s3fs[boto3]==2024.6.0 diff --git a/tools/classifier/src/config/properties.json b/tools/classifier/src/config/properties.json index 90a89371d..358cd044a 100644 --- a/tools/classifier/src/config/properties.json +++ b/tools/classifier/src/config/properties.json @@ -2,7 +2,7 @@ "schemaVersion": "0.0.1", "displayName": "File Classifier", "functionName": "classify", - "toolVersion": "0.0.45", + "toolVersion": "0.0.44", "description": "Classifies a file into a bin based on its contents", "input": { "description": "File to be classified" diff --git a/tools/classifier/src/helper.py b/tools/classifier/src/helper.py index 139017937..52a034344 100644 --- a/tools/classifier/src/helper.py +++ b/tools/classifier/src/helper.py @@ -4,7 +4,7 @@ from typing import Any, Optional from unstract.sdk.cache import ToolCache -from unstract.sdk.constants import LogLevel, MetadataKey, ToolEnv, UsageKwargs +from unstract.sdk.constants import LogLevel, MetadataKey, ToolEnv from unstract.sdk.llm import LLM from unstract.sdk.tool.base import BaseTool from unstract.sdk.utils import ToolUtils @@ -139,13 +139,7 @@ def _extract_from_adapter(self, file: str, adapter_id: str) -> Optional[str]: self.tool.stream_log( f"Creating text extraction adapter using adapter_id: {adapter_id}" ) - usage_kwargs: dict[Any, Any] = dict() - usage_kwargs[UsageKwargs.FILE_NAME] = self.tool.source_file_name - 
usage_kwargs[UsageKwargs.RUN_ID] = self.tool.file_execution_id - - x2text = X2Text( - tool=self.tool, adapter_instance_id=adapter_id, usage_kwargs=usage_kwargs - ) + x2text = X2Text(tool=self.tool, adapter_instance_id=adapter_id) self.tool.stream_log("Text extraction adapter has been created successfully.") diff --git a/tools/classifier/src/main.py b/tools/classifier/src/main.py index e3176420d..5d12a379e 100644 --- a/tools/classifier/src/main.py +++ b/tools/classifier/src/main.py @@ -3,13 +3,7 @@ from helper import ClassifierHelper # type: ignore from helper import ReservedBins -from unstract.sdk.constants import ( - LogLevel, - LogState, - MetadataKey, - ToolSettingsKey, - UsageKwargs, -) +from unstract.sdk.constants import LogLevel, LogState, MetadataKey, ToolSettingsKey from unstract.sdk.exceptions import SdkError from unstract.sdk.llm import LLM from unstract.sdk.tool.base import BaseTool @@ -86,10 +80,8 @@ def run( bins_with_quotes = [f"'{b}'" for b in bins] usage_kwargs: dict[Any, Any] = dict() - usage_kwargs[UsageKwargs.WORKFLOW_ID] = self.workflow_id - usage_kwargs[UsageKwargs.EXECUTION_ID] = self.execution_id - usage_kwargs[UsageKwargs.FILE_NAME] = self.source_file_name - usage_kwargs[UsageKwargs.RUN_ID] = self.file_execution_id + usage_kwargs["workflow_id"] = self.workflow_id + usage_kwargs["execution_id"] = self.execution_id try: llm = LLM( diff --git a/tools/structure/requirements.txt b/tools/structure/requirements.txt index c65022c39..625789cc3 100644 --- a/tools/structure/requirements.txt +++ b/tools/structure/requirements.txt @@ -1,6 +1,6 @@ # Add your dependencies here # Required for all unstract tools -unstract-sdk~=v0.55.0rc2 +unstract-sdk~=0.54.0rc12 # Required for remote storage support s3fs[boto3]==2024.6.0 diff --git a/tools/structure/src/config/properties.json b/tools/structure/src/config/properties.json index fcabcede2..4186df487 100644 --- a/tools/structure/src/config/properties.json +++ b/tools/structure/src/config/properties.json @@ -2,7 
+2,7 @@ "schemaVersion": "0.0.1", "displayName": "Structure Tool", "functionName": "structure_tool", - "toolVersion": "0.0.56", + "toolVersion": "0.0.55", "description": "This is a template tool which can answer set of input prompts designed in the Prompt Studio", "input": { "description": "File that needs to be indexed and parsed for answers" diff --git a/tools/structure/src/main.py b/tools/structure/src/main.py index a420486a1..8918365f4 100644 --- a/tools/structure/src/main.py +++ b/tools/structure/src/main.py @@ -6,12 +6,13 @@ from typing import Any, Callable, Optional from constants import SettingsKeys # type: ignore [attr-defined] -from unstract.sdk.constants import LogLevel, LogState, MetadataKey, ToolEnv, UsageKwargs +from unstract.sdk.constants import LogLevel, LogState, MetadataKey, ToolEnv from unstract.sdk.index import Index from unstract.sdk.prompt import PromptTool from unstract.sdk.tool.base import BaseTool from unstract.sdk.tool.entrypoint import ToolEntrypoint from unstract.sdk.utils import ToolUtils +from unstract.sdk.utils.common_utils import CommonUtils from utils import json_to_markdown logger = logging.getLogger(__name__) @@ -45,6 +46,7 @@ def run( ) challenge_llm: str = settings.get(SettingsKeys.CHALLENGE_LLM_ADAPTER_ID, "") enable_highlight: bool = settings.get(SettingsKeys.ENABLE_HIGHLIGHT, False) + source_file_name = self.get_exec_metadata.get(MetadataKey.SOURCE_NAME) responder: PromptTool = PromptTool( tool=self, prompt_port=self.get_env_or_die(SettingsKeys.PROMPT_PORT), @@ -72,8 +74,8 @@ def run( f"## Loaded '{ps_project_name}'\n{json_to_markdown(tool_metadata)}\n" ) output_log = ( - f"## Processing '{self.source_file_name}'\nThis might take a while and " - "involve...\n- Extracting text\n- Indexing\n- Retrieving answers " + f"## Processing '{source_file_name}'\nThis might take a while and involve" + "...\n- Extracting text\n- Indexing\n- Retrieving answers " f"for possibly '{total_prompt_count}' prompts" ) self.stream_update(input_log, 
state=LogState.INPUT_UPDATE) @@ -101,17 +103,17 @@ def run( execution_run_data_folder = Path( self.get_env_or_die(SettingsKeys.EXECUTION_RUN_DATA_FOLDER) ) - + run_id = CommonUtils.generate_uuid() index = Index( tool=self, - run_id=self.file_execution_id, + run_id=run_id, capture_metrics=True, ) index_metrics = {} extracted_input_file = str(execution_run_data_folder / SettingsKeys.EXTRACT) # TODO : Resolve and pass log events ID payload = { - SettingsKeys.RUN_ID: self.file_execution_id, + SettingsKeys.RUN_ID: run_id, SettingsKeys.TOOL_SETTINGS: tool_settings, SettingsKeys.OUTPUTS: outputs, SettingsKeys.TOOL_ID: tool_id, @@ -122,10 +124,10 @@ def run( } # TODO: Need to split extraction and indexing # to avoid unwanted indexing - self.stream_log(f"Indexing document '{self.source_file_name}'") + self.stream_log(f"Indexing document '{source_file_name}'") usage_kwargs: dict[Any, Any] = dict() - usage_kwargs[UsageKwargs.RUN_ID] = self.file_execution_id - usage_kwargs[UsageKwargs.FILE_NAME] = self.source_file_name + usage_kwargs[SettingsKeys.RUN_ID] = run_id + usage_kwargs[SettingsKeys.FILE_NAME] = source_file_name process_text: Optional[Callable[[str], str]] = None try: @@ -248,7 +250,7 @@ def run( if SettingsKeys.METADATA in structured_output_dict: structured_output_dict[SettingsKeys.METADATA][ SettingsKeys.FILE_NAME - ] = self.source_file_name + ] = source_file_name if not summarize_as_source: metadata = structured_output_dict[SettingsKeys.METADATA] @@ -283,7 +285,8 @@ def run( # Write the translated text to output file try: self.stream_log("Writing parsed output...") - output_path = Path(output_dir) / f"{Path(self.source_file_name).stem}.json" + source_name = self.get_exec_metadata.get(MetadataKey.SOURCE_NAME) + output_path = Path(output_dir) / f"{Path(source_name).stem}.json" if self.workflow_filestorage: self.workflow_filestorage.json_dump( path=output_path, data=structured_output_dict @@ -326,7 +329,7 @@ def _summarize_and_index( vector_db_instance_id: str = 
tool_settings[SettingsKeys.VECTOR_DB] x2text_instance_id: str = tool_settings[SettingsKeys.X2TEXT_ADAPTER] summarize_prompt: str = tool_settings[SettingsKeys.SUMMARIZE_PROMPT] - run_id: str = usage_kwargs.get(UsageKwargs.RUN_ID) + run_id: str = usage_kwargs.get(SettingsKeys.RUN_ID) extract_file_path = tool_data_dir / SettingsKeys.EXTRACT summarize_file_path = tool_data_dir / SettingsKeys.SUMMARIZE diff --git a/tools/text_extractor/requirements.txt b/tools/text_extractor/requirements.txt index c65022c39..625789cc3 100644 --- a/tools/text_extractor/requirements.txt +++ b/tools/text_extractor/requirements.txt @@ -1,6 +1,6 @@ # Add your dependencies here # Required for all unstract tools -unstract-sdk~=v0.55.0rc2 +unstract-sdk~=0.54.0rc12 # Required for remote storage support s3fs[boto3]==2024.6.0 diff --git a/tools/text_extractor/src/config/properties.json b/tools/text_extractor/src/config/properties.json index 32293c88b..09718a7ef 100644 --- a/tools/text_extractor/src/config/properties.json +++ b/tools/text_extractor/src/config/properties.json @@ -2,7 +2,7 @@ "schemaVersion": "0.0.1", "displayName": "Text Extractor", "functionName": "text_extractor", - "toolVersion": "0.0.43", + "toolVersion": "0.0.42", "description": "The Text Extractor is a powerful tool designed to convert documents to its text form or Extract texts from documents", "input": { "description": "Document" diff --git a/tools/text_extractor/src/main.py b/tools/text_extractor/src/main.py index ab51a2198..fe43a38ff 100644 --- a/tools/text_extractor/src/main.py +++ b/tools/text_extractor/src/main.py @@ -3,7 +3,7 @@ from pathlib import Path from typing import Any -from unstract.sdk.constants import LogState, UsageKwargs +from unstract.sdk.constants import LogState, MetadataKey from unstract.sdk.tool.base import BaseTool from unstract.sdk.tool.entrypoint import ToolEntrypoint from unstract.sdk.x2txt import TextExtractionResult, X2Text @@ -44,23 +44,18 @@ def run( None """ text_extraction_adapter_id = 
settings["extractorId"] + source_name = self.get_exec_metadata.get(MetadataKey.SOURCE_NAME) self.stream_log( f"Extractor ID: {text_extraction_adapter_id} " "has been retrieved from settings." ) - input_log = f"Processing file: \n\n`{self.source_file_name}`" + input_log = f"Processing file: \n\n`{source_name}`" self.stream_update(input_log, state=LogState.INPUT_UPDATE) - usage_kwargs: dict[Any, Any] = dict() - usage_kwargs[UsageKwargs.RUN_ID] = self.file_execution_id - usage_kwargs[UsageKwargs.FILE_NAME] = self.source_file_name - text_extraction_adapter = X2Text( - tool=self, - adapter_instance_id=text_extraction_adapter_id, - usage_kwargs=usage_kwargs, + tool=self, adapter_instance_id=text_extraction_adapter_id ) self.stream_log("Text extraction adapter has been created successfully.") if self.workflow_filestorage: @@ -81,10 +76,8 @@ def run( try: self.stream_log("Preparing to write the extracted text.") - if self.source_file_name: - output_path = ( - Path(output_dir) / f"{Path(self.source_file_name).stem}.txt" - ) + if source_name: + output_path = Path(output_dir) / f"{Path(source_name).stem}.txt" if self.workflow_filestorage: self.workflow_filestorage.write( path=output_path, mode="w", data=extracted_text @@ -96,7 +89,7 @@ def run( self.stream_log("Tool output written successfully.") else: self.stream_error_and_exit( - "Error creating/writing output file: source_file_name not found" + "Error creating/writing output file: source_name not found" ) except Exception as e: self.stream_error_and_exit(f"Error creating/writing output file: {e}") diff --git a/unstract/workflow-execution/src/unstract/workflow_execution/constants.py b/unstract/workflow-execution/src/unstract/workflow_execution/constants.py index af767f356..e570a3f00 100644 --- a/unstract/workflow-execution/src/unstract/workflow_execution/constants.py +++ b/unstract/workflow-execution/src/unstract/workflow_execution/constants.py @@ -40,7 +40,6 @@ class MetaDataKey: SOURCE_HASH = "source_hash" WORKFLOW_ID = 
"workflow_id" EXECUTION_ID = "execution_id" - FILE_EXECUTION_ID = "file_execution_id" ORGANIZATION_ID = "organization_id" TOOL_METADATA = "tool_metadata" diff --git a/unstract/workflow-execution/src/unstract/workflow_execution/execution_file_handler.py b/unstract/workflow-execution/src/unstract/workflow_execution/execution_file_handler.py index e9d565cf1..29ae9662b 100644 --- a/unstract/workflow-execution/src/unstract/workflow_execution/execution_file_handler.py +++ b/unstract/workflow-execution/src/unstract/workflow_execution/execution_file_handler.py @@ -103,9 +103,7 @@ def get_last_tool_metadata(self, metadata: dict[str, Any]) -> dict[str, Any]: raise ToolMetadataNotFound() return tool_metadata[-1] - def add_metadata_to_volume( - self, input_file_path: str, file_execution_id: str, source_hash: str - ) -> None: + def add_metadata_to_volume(self, input_file_path: str, source_hash: str) -> None: """Creating metadata for workflow. This method is responsible for creating metadata for the workflow. It takes the input file path and the source hash as parameters. The metadata is stored in a JSON file in @@ -113,7 +111,6 @@ def add_metadata_to_volume( Parameters: input_file_path (str): The path of the input file. - file_execution_id (str): Unique execution id for the file. source_hash (str): The hash value of the source/input file. Returns: @@ -130,7 +127,6 @@ def add_metadata_to_volume( MetaDataKey.ORGANIZATION_ID: str(self.organization_id), MetaDataKey.WORKFLOW_ID: str(self.workflow_id), MetaDataKey.EXECUTION_ID: str(self.execution_id), - MetaDataKey.FILE_EXECUTION_ID: str(file_execution_id), } if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION)