Revert "passing file execution id from backend to tools (#1065)"
This reverts commit 650c328.
gaya3-zipstack authored Jan 16, 2025
1 parent 6b020e6 commit f14d31d
Showing 14 changed files with 44 additions and 73 deletions.
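The shape of the change is easiest to see in the add_file_to_volume signature that this commit switches back. Both signatures below are copied from the source.py hunks that follow, shown out of context with bodies elided:

    # Reverted (added by #1065): the backend passed the WorkflowFileExecution model
    def add_file_to_volume(
        self, input_file_path: str, workflow_file_execution: WorkflowFileExecution
    ) -> str: ...

    # Restored by this revert: callers pass a FileHash value instead
    def add_file_to_volume(self, input_file_path: str, file_hash: FileHash) -> str: ...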
16 changes: 5 additions & 11 deletions backend/workflow_manager/endpoint_v2/source.py

@@ -34,7 +34,6 @@
     SourceConnectorNotConfigured,
 )
 from workflow_manager.endpoint_v2.models import WorkflowEndpoint
-from workflow_manager.file_execution.models import WorkflowFileExecution
 from workflow_manager.workflow_v2.execution import WorkflowExecutionServiceHelper
 from workflow_manager.workflow_v2.file_history_helper import FileHistoryHelper
 from workflow_manager.workflow_v2.models.workflow import Workflow
@@ -597,14 +596,11 @@ def _copy_file_to_destination(
             # Update the seek position
             seek_position += len(chunk)

-    def add_file_to_volume(
-        self, input_file_path: str, workflow_file_execution: WorkflowFileExecution
-    ) -> str:
+    def add_file_to_volume(self, input_file_path: str, file_hash: FileHash) -> str:
         """Add input file to execution directory.

         Args:
             input_file_path (str): source file
-            workflow_file_execution: WorkflowFileExecution model

         Raises:
             InvalidSource: _description_
@@ -618,20 +614,18 @@ def add_file_to_volume
             file_content_hash = self.add_input_from_connector_to_volume(
                 input_file_path=input_file_path,
             )
-            if file_content_hash != workflow_file_execution.file_hash:
+            if file_content_hash != file_hash.file_hash:
                 raise FileHashMismatched()
         elif connection_type == WorkflowEndpoint.ConnectionType.API:
             self.add_input_from_api_storage_to_volume(input_file_path=input_file_path)
-            if file_name != workflow_file_execution.file_name:
+            if file_name != file_hash.file_name:
                 raise FileHashNotFound()
-            file_content_hash = workflow_file_execution.file_hash
+            file_content_hash = file_hash.file_hash
         else:
             raise InvalidSourceConnectionType()

         self.add_metadata_to_volume(
-            input_file_path=input_file_path,
-            file_execution_id=workflow_file_execution.id,
-            source_hash=file_content_hash,
+            input_file_path=input_file_path, source_hash=file_content_hash
         )
         return file_name
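For orientation, the hunk above reads exactly two attributes off the restored parameter: file_hash.file_hash and file_hash.file_name. A minimal sketch of that shape, inferred from this diff alone and not the backend's actual FileHash definition:

    from dataclasses import dataclass


    @dataclass
    class FileHash:  # hypothetical minimal shape, for illustration only
        file_name: str  # checked against the staged file name for API sources
        file_hash: str  # checked against the content hash computed while copying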
10 changes: 5 additions & 5 deletions backend/workflow_manager/workflow_v2/workflow_helper.py

@@ -231,14 +231,14 @@ def _process_file(
         workflow_file_execution: WorkflowFileExecution,
     ) -> Optional[str]:
         error: Optional[str] = None
-        # Multiple run_ids are linked to an execution_id
-        # Each run_id corresponds to workflow runs for a single file
-        # It should e uuid of workflow_file_execution
-        file_execution_id = str(workflow_file_execution.id)
         file_name = source.add_file_to_volume(
-            input_file_path=input_file, workflow_file_execution=workflow_file_execution
+            input_file_path=input_file, file_hash=file_hash
         )
         try:
+            # Multiple run_ids are linked to an execution_id
+            # Each run_id corresponds to workflow runs for a single file
+            # It should e uuid of workflow_file_execution
+            file_execution_id = str(workflow_file_execution.id)
             execution_service.file_execution_id = file_execution_id
             execution_service.initiate_tool_execution(
                 current_file_idx, total_files, file_name, single_step
2 changes: 1 addition & 1 deletion tools/classifier/requirements.txt

@@ -1,6 +1,6 @@
 # Add your dependencies here

 # Required for all unstract tools
-unstract-sdk~=v0.55.0rc2
+unstract-sdk~=0.54.0rc12
 # Required for remote storage support
 s3fs[boto3]==2024.6.0
2 changes: 1 addition & 1 deletion tools/classifier/src/config/properties.json

@@ -2,7 +2,7 @@
   "schemaVersion": "0.0.1",
   "displayName": "File Classifier",
   "functionName": "classify",
-  "toolVersion": "0.0.45",
+  "toolVersion": "0.0.44",
   "description": "Classifies a file into a bin based on its contents",
   "input": {
     "description": "File to be classified"
10 changes: 2 additions & 8 deletions tools/classifier/src/helper.py

@@ -4,7 +4,7 @@
 from typing import Any, Optional

 from unstract.sdk.cache import ToolCache
-from unstract.sdk.constants import LogLevel, MetadataKey, ToolEnv, UsageKwargs
+from unstract.sdk.constants import LogLevel, MetadataKey, ToolEnv
 from unstract.sdk.llm import LLM
 from unstract.sdk.tool.base import BaseTool
 from unstract.sdk.utils import ToolUtils
@@ -139,13 +139,7 @@ def _extract_from_adapter(self, file: str, adapter_id: str) -> Optional[str]:
         self.tool.stream_log(
             f"Creating text extraction adapter using adapter_id: {adapter_id}"
         )
-        usage_kwargs: dict[Any, Any] = dict()
-        usage_kwargs[UsageKwargs.FILE_NAME] = self.tool.source_file_name
-        usage_kwargs[UsageKwargs.RUN_ID] = self.tool.file_execution_id
-
-        x2text = X2Text(
-            tool=self.tool, adapter_instance_id=adapter_id, usage_kwargs=usage_kwargs
-        )
+        x2text = X2Text(tool=self.tool, adapter_instance_id=adapter_id)

         self.tool.stream_log("Text extraction adapter has been created successfully.")
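With this revert the classifier's extraction adapter loses per-file usage attribution. A side-by-side sketch of the two constructions the hunk above swaps, using only calls that appear in this diff (the X2Text import path is visible in the text_extractor diff further down):

    # Reverted (#1065): usage tagged with the file name and file execution id
    usage_kwargs = {
        UsageKwargs.FILE_NAME: self.tool.source_file_name,
        UsageKwargs.RUN_ID: self.tool.file_execution_id,
    }
    x2text = X2Text(
        tool=self.tool, adapter_instance_id=adapter_id, usage_kwargs=usage_kwargs
    )

    # Restored: no usage_kwargs, so extraction usage is not attributed per file
    x2text = X2Text(tool=self.tool, adapter_instance_id=adapter_id)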
14 changes: 3 additions & 11 deletions tools/classifier/src/main.py

@@ -3,13 +3,7 @@

 from helper import ClassifierHelper  # type: ignore
 from helper import ReservedBins
-from unstract.sdk.constants import (
-    LogLevel,
-    LogState,
-    MetadataKey,
-    ToolSettingsKey,
-    UsageKwargs,
-)
+from unstract.sdk.constants import LogLevel, LogState, MetadataKey, ToolSettingsKey
 from unstract.sdk.exceptions import SdkError
 from unstract.sdk.llm import LLM
 from unstract.sdk.tool.base import BaseTool
@@ -86,10 +80,8 @@ def run(
         bins_with_quotes = [f"'{b}'" for b in bins]

         usage_kwargs: dict[Any, Any] = dict()
-        usage_kwargs[UsageKwargs.WORKFLOW_ID] = self.workflow_id
-        usage_kwargs[UsageKwargs.EXECUTION_ID] = self.execution_id
-        usage_kwargs[UsageKwargs.FILE_NAME] = self.source_file_name
-        usage_kwargs[UsageKwargs.RUN_ID] = self.file_execution_id
+        usage_kwargs["workflow_id"] = self.workflow_id
+        usage_kwargs["execution_id"] = self.execution_id

         try:
             llm = LLM(
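The classifier's LLM usage tags shrink the same way: the typed UsageKwargs constants disappear (presumably unavailable in the older SDK the requirements now pin) and the per-file entries go with them, leaving only the bare string keys shown above:

    usage_kwargs: dict[Any, Any] = dict()
    usage_kwargs["workflow_id"] = self.workflow_id
    usage_kwargs["execution_id"] = self.execution_id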
2 changes: 1 addition & 1 deletion tools/structure/requirements.txt

@@ -1,6 +1,6 @@
 # Add your dependencies here

 # Required for all unstract tools
-unstract-sdk~=v0.55.0rc2
+unstract-sdk~=0.54.0rc12
 # Required for remote storage support
 s3fs[boto3]==2024.6.0
2 changes: 1 addition & 1 deletion tools/structure/src/config/properties.json

@@ -2,7 +2,7 @@
   "schemaVersion": "0.0.1",
   "displayName": "Structure Tool",
   "functionName": "structure_tool",
-  "toolVersion": "0.0.56",
+  "toolVersion": "0.0.55",
   "description": "This is a template tool which can answer set of input prompts designed in the Prompt Studio",
   "input": {
     "description": "File that needs to be indexed and parsed for answers"
27 changes: 15 additions & 12 deletions tools/structure/src/main.py

@@ -6,12 +6,13 @@
 from typing import Any, Callable, Optional

 from constants import SettingsKeys  # type: ignore [attr-defined]
-from unstract.sdk.constants import LogLevel, LogState, MetadataKey, ToolEnv, UsageKwargs
+from unstract.sdk.constants import LogLevel, LogState, MetadataKey, ToolEnv
 from unstract.sdk.index import Index
 from unstract.sdk.prompt import PromptTool
 from unstract.sdk.tool.base import BaseTool
 from unstract.sdk.tool.entrypoint import ToolEntrypoint
 from unstract.sdk.utils import ToolUtils
+from unstract.sdk.utils.common_utils import CommonUtils
 from utils import json_to_markdown

 logger = logging.getLogger(__name__)
@@ -45,6 +46,7 @@ def run(
         )
         challenge_llm: str = settings.get(SettingsKeys.CHALLENGE_LLM_ADAPTER_ID, "")
         enable_highlight: bool = settings.get(SettingsKeys.ENABLE_HIGHLIGHT, False)
+        source_file_name = self.get_exec_metadata.get(MetadataKey.SOURCE_NAME)
         responder: PromptTool = PromptTool(
             tool=self,
             prompt_port=self.get_env_or_die(SettingsKeys.PROMPT_PORT),
@@ -72,8 +74,8 @@ def run(
             f"## Loaded '{ps_project_name}'\n{json_to_markdown(tool_metadata)}\n"
         )
         output_log = (
-            f"## Processing '{self.source_file_name}'\nThis might take a while and "
-            "involve...\n- Extracting text\n- Indexing\n- Retrieving answers "
+            f"## Processing '{source_file_name}'\nThis might take a while and involve"
+            "...\n- Extracting text\n- Indexing\n- Retrieving answers "
             f"for possibly '{total_prompt_count}' prompts"
         )
         self.stream_update(input_log, state=LogState.INPUT_UPDATE)
@@ -101,17 +103,17 @@ def run(
         execution_run_data_folder = Path(
             self.get_env_or_die(SettingsKeys.EXECUTION_RUN_DATA_FOLDER)
         )
-
+        run_id = CommonUtils.generate_uuid()
         index = Index(
             tool=self,
-            run_id=self.file_execution_id,
+            run_id=run_id,
             capture_metrics=True,
         )
         index_metrics = {}
         extracted_input_file = str(execution_run_data_folder / SettingsKeys.EXTRACT)
         # TODO : Resolve and pass log events ID
         payload = {
-            SettingsKeys.RUN_ID: self.file_execution_id,
+            SettingsKeys.RUN_ID: run_id,
             SettingsKeys.TOOL_SETTINGS: tool_settings,
             SettingsKeys.OUTPUTS: outputs,
             SettingsKeys.TOOL_ID: tool_id,
@@ -122,10 +124,10 @@ def run(
         }
         # TODO: Need to split extraction and indexing
         # to avoid unwanted indexing
-        self.stream_log(f"Indexing document '{self.source_file_name}'")
+        self.stream_log(f"Indexing document '{source_file_name}'")
         usage_kwargs: dict[Any, Any] = dict()
-        usage_kwargs[UsageKwargs.RUN_ID] = self.file_execution_id
-        usage_kwargs[UsageKwargs.FILE_NAME] = self.source_file_name
+        usage_kwargs[SettingsKeys.RUN_ID] = run_id
+        usage_kwargs[SettingsKeys.FILE_NAME] = source_file_name

         process_text: Optional[Callable[[str], str]] = None
         try:
@@ -248,7 +250,7 @@ def run(
             if SettingsKeys.METADATA in structured_output_dict:
                 structured_output_dict[SettingsKeys.METADATA][
                     SettingsKeys.FILE_NAME
-                ] = self.source_file_name
+                ] = source_file_name

             if not summarize_as_source:
                 metadata = structured_output_dict[SettingsKeys.METADATA]
@@ -283,7 +285,8 @@ def run(
         # Write the translated text to output file
         try:
             self.stream_log("Writing parsed output...")
-            output_path = Path(output_dir) / f"{Path(self.source_file_name).stem}.json"
+            source_name = self.get_exec_metadata.get(MetadataKey.SOURCE_NAME)
+            output_path = Path(output_dir) / f"{Path(source_name).stem}.json"
             if self.workflow_filestorage:
                 self.workflow_filestorage.json_dump(
                     path=output_path, data=structured_output_dict
@@ -326,7 +329,7 @@ def _summarize_and_index(
         vector_db_instance_id: str = tool_settings[SettingsKeys.VECTOR_DB]
         x2text_instance_id: str = tool_settings[SettingsKeys.X2TEXT_ADAPTER]
         summarize_prompt: str = tool_settings[SettingsKeys.SUMMARIZE_PROMPT]
-        run_id: str = usage_kwargs.get(UsageKwargs.RUN_ID)
+        run_id: str = usage_kwargs.get(SettingsKeys.RUN_ID)
         extract_file_path = tool_data_dir / SettingsKeys.EXTRACT
         summarize_file_path = tool_data_dir / SettingsKeys.SUMMARIZE
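Net effect in the structure tool: instead of reusing a backend-issued file_execution_id, the tool once again mints its own run id per invocation and threads it through indexing, the prompt-service payload, and usage kwargs. A condensed sketch assembled from the hunks above, with all names as they appear in the diff:

    run_id = CommonUtils.generate_uuid()  # restored: tool-local run id

    index = Index(tool=self, run_id=run_id, capture_metrics=True)

    payload = {
        SettingsKeys.RUN_ID: run_id,
        # ...remaining keys as in the hunk above...
    }

    usage_kwargs: dict[Any, Any] = dict()
    usage_kwargs[SettingsKeys.RUN_ID] = run_id
    usage_kwargs[SettingsKeys.FILE_NAME] = source_file_name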
2 changes: 1 addition & 1 deletion tools/text_extractor/requirements.txt

@@ -1,6 +1,6 @@
 # Add your dependencies here

 # Required for all unstract tools
-unstract-sdk~=v0.55.0rc2
+unstract-sdk~=0.54.0rc12
 # Required for remote storage support
 s3fs[boto3]==2024.6.0
2 changes: 1 addition & 1 deletion tools/text_extractor/src/config/properties.json

@@ -2,7 +2,7 @@
   "schemaVersion": "0.0.1",
   "displayName": "Text Extractor",
   "functionName": "text_extractor",
-  "toolVersion": "0.0.43",
+  "toolVersion": "0.0.42",
   "description": "The Text Extractor is a powerful tool designed to convert documents to its text form or Extract texts from documents",
   "input": {
     "description": "Document"
21 changes: 7 additions & 14 deletions tools/text_extractor/src/main.py

@@ -3,7 +3,7 @@
 from pathlib import Path
 from typing import Any

-from unstract.sdk.constants import LogState, UsageKwargs
+from unstract.sdk.constants import LogState, MetadataKey
 from unstract.sdk.tool.base import BaseTool
 from unstract.sdk.tool.entrypoint import ToolEntrypoint
 from unstract.sdk.x2txt import TextExtractionResult, X2Text
@@ -44,23 +44,18 @@ def run(
             None
         """
         text_extraction_adapter_id = settings["extractorId"]
+        source_name = self.get_exec_metadata.get(MetadataKey.SOURCE_NAME)

         self.stream_log(
             f"Extractor ID: {text_extraction_adapter_id} "
             "has been retrieved from settings."
         )

-        input_log = f"Processing file: \n\n`{self.source_file_name}`"
+        input_log = f"Processing file: \n\n`{source_name}`"
         self.stream_update(input_log, state=LogState.INPUT_UPDATE)

-        usage_kwargs: dict[Any, Any] = dict()
-        usage_kwargs[UsageKwargs.RUN_ID] = self.file_execution_id
-        usage_kwargs[UsageKwargs.FILE_NAME] = self.source_file_name
-
         text_extraction_adapter = X2Text(
-            tool=self,
-            adapter_instance_id=text_extraction_adapter_id,
-            usage_kwargs=usage_kwargs,
+            tool=self, adapter_instance_id=text_extraction_adapter_id
         )
         self.stream_log("Text extraction adapter has been created successfully.")
         if self.workflow_filestorage:
@@ -81,10 +76,8 @@ def run(

         try:
             self.stream_log("Preparing to write the extracted text.")
-            if self.source_file_name:
-                output_path = (
-                    Path(output_dir) / f"{Path(self.source_file_name).stem}.txt"
-                )
+            if source_name:
+                output_path = Path(output_dir) / f"{Path(source_name).stem}.txt"
                 if self.workflow_filestorage:
                     self.workflow_filestorage.write(
                         path=output_path, mode="w", data=extracted_text
@@ -96,7 +89,7 @@ def run(
                 self.stream_log("Tool output written successfully.")
             else:
                 self.stream_error_and_exit(
-                    "Error creating/writing output file: source_file_name not found"
+                    "Error creating/writing output file: source_name not found"
                 )
         except Exception as e:
             self.stream_error_and_exit(f"Error creating/writing output file: {e}")
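The text extractor follows the same pattern: the input's name now comes from execution metadata instead of a backend-populated source_file_name attribute. A minimal sketch of the restored lookup and output naming, built only from calls visible in this diff:

    source_name = self.get_exec_metadata.get(MetadataKey.SOURCE_NAME)
    if source_name:
        # e.g. "invoice.pdf" becomes <output_dir>/invoice.txt
        output_path = Path(output_dir) / f"{Path(source_name).stem}.txt"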
@@ -40,7 +40,6 @@ class MetaDataKey:
     SOURCE_HASH = "source_hash"
     WORKFLOW_ID = "workflow_id"
     EXECUTION_ID = "execution_id"
-    FILE_EXECUTION_ID = "file_execution_id"
     ORGANIZATION_ID = "organization_id"
     TOOL_METADATA = "tool_metadata"
@@ -103,17 +103,14 @@ def get_last_tool_metadata(self, metadata: dict[str, Any]) -> dict[str, Any]:
             raise ToolMetadataNotFound()
         return tool_metadata[-1]

-    def add_metadata_to_volume(
-        self, input_file_path: str, file_execution_id: str, source_hash: str
-    ) -> None:
+    def add_metadata_to_volume(self, input_file_path: str, source_hash: str) -> None:
         """Creating metadata for workflow. This method is responsible for
         creating metadata for the workflow. It takes the input file path and
         the source hash as parameters. The metadata is stored in a JSON file in
         the execution directory.

         Parameters:
             input_file_path (str): The path of the input file.
-            file_execution_id (str): Unique execution id for the file.
             source_hash (str): The hash value of the source/input file.

         Returns:
@@ -130,7 +127,6 @@ def add_metadata_to_volume(
             MetaDataKey.ORGANIZATION_ID: str(self.organization_id),
             MetaDataKey.WORKFLOW_ID: str(self.workflow_id),
             MetaDataKey.EXECUTION_ID: str(self.execution_id),
-            MetaDataKey.FILE_EXECUTION_ID: str(file_execution_id),
         }
         if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
             file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION)
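With FILE_EXECUTION_ID dropped from MetaDataKey and from add_metadata_to_volume, the metadata JSON written to the execution volume keeps only workflow-level identity. A sketch based on the keys visible in the final hunk (keys outside the hunk's context are omitted):

    metadata = {
        MetaDataKey.ORGANIZATION_ID: str(self.organization_id),
        MetaDataKey.WORKFLOW_ID: str(self.workflow_id),
        MetaDataKey.EXECUTION_ID: str(self.execution_id),
        # MetaDataKey.FILE_EXECUTION_ID is no longer recorded
    }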
