Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add changes required to effectively index content of batch processing #942

Merged
merged 4 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion inference/core/version.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = "0.33.0"
__version__ = "0.34.0rc1"


if __name__ == "__main__":
Expand Down
91 changes: 83 additions & 8 deletions inference_cli/lib/workflows/common.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import json
import os.path
import re
from collections import defaultdict
from copy import copy
from datetime import datetime
from functools import lru_cache
from threading import Lock
Expand All @@ -14,7 +16,13 @@
from rich.progress import track

from inference_cli.lib.utils import dump_json, dump_jsonl, read_json
from inference_cli.lib.workflows.entities import OutputFileType
from inference_cli.lib.workflows.entities import (
ImagePath,
ImageResultsIndexEntry,
OutputFileType,
WorkflowExecutionMetadataResultPath,
WorkflowOutputField,
)

BASE64_DATA_TYPE_PATTERN = re.compile(r"^data:image\/[a-z]+;base64,")

Expand Down Expand Up @@ -74,10 +82,10 @@ def get_progress_log_path(output_directory: str) -> str:

def dump_image_processing_results(
result: Dict[str, Any],
image_path: str,
image_path: ImagePath,
output_directory: str,
save_image_outputs: bool,
) -> None:
) -> ImageResultsIndexEntry:
images_in_result = []
if save_image_outputs:
images_in_result = extract_images_from_result(result=result)
Expand All @@ -92,21 +100,33 @@ def dump_image_processing_results(
path=structured_results_path,
content=structured_content,
)
dump_images_outputs(
image_outputs = dump_images_outputs(
image_results_dir=image_results_dir,
images_in_result=images_in_result,
)
return ImageResultsIndexEntry(
metadata_output_path=structured_results_path,
image_outputs=image_outputs,
)


def dump_images_outputs(
image_results_dir: str,
images_in_result: List[Tuple[str, np.ndarray]],
) -> None:
) -> Dict[WorkflowOutputField, List[ImagePath]]:
result = defaultdict(list)
for image_key, image in images_in_result:
target_path = os.path.join(image_results_dir, f"{image_key}.jpg")
target_path_dir = os.path.dirname(target_path)
os.makedirs(target_path_dir, exist_ok=True)
cv2.imwrite(target_path, image)
workflow_field = _extract_workflow_field_from_image_key(image_key=image_key)
result[workflow_field].append(target_path)
return result


def _extract_workflow_field_from_image_key(image_key: str) -> WorkflowOutputField:
    """Return the workflow output field encoded as the first '/'-separated segment of *image_key*."""
    field_name, _, _ = image_key.partition("/")
    return field_name


def construct_image_output_dir_path(image_path: str, output_directory: str) -> str:
Expand Down Expand Up @@ -203,7 +223,7 @@ def _is_file_system_case_sensitive() -> bool:

def report_failed_files(
failed_files: List[Tuple[str, str]], output_directory: str
) -> None:
) -> Optional[str]:
if not failed_files:
return None
os.makedirs(output_directory, exist_ok=True)
Expand All @@ -216,12 +236,13 @@ def report_failed_files(
print(
f"Detected {len(failed_files)} processing failures. Details saved under: {failed_files_path}"
)
return failed_files_path


def aggregate_batch_processing_results(
output_directory: str,
aggregation_format: OutputFileType,
) -> None:
) -> str:
file_descriptor, all_processed_files = open_progress_log(
output_directory=output_directory
)
Expand All @@ -247,7 +268,7 @@ def aggregate_batch_processing_results(
decoded_content, description="Dumping aggregated results to JSONL..."
),
)
return None
return aggregated_results_path
dumped_results = []
for decoded_result in track(
decoded_content, description="Dumping aggregated results to CSV..."
Expand All @@ -258,6 +279,7 @@ def aggregate_batch_processing_results(
data_frame = pd.DataFrame(dumped_results)
aggregated_results_path = os.path.join(output_directory, "aggregated_results.csv")
data_frame.to_csv(aggregated_results_path, index=False)
return aggregated_results_path


def dump_objects_to_json(value: Any) -> Any:
Expand All @@ -266,3 +288,56 @@ def dump_objects_to_json(value: Any) -> Any:
if isinstance(value, list) or isinstance(value, dict) or isinstance(value, set):
return json.dumps(value)
return value


class WorkflowsImagesProcessingIndex:
    """In-memory index of per-image workflow processing results.

    For every processed input image it records an ``ImageResultsIndexEntry``
    (path of the structured-results file plus the image files dumped for each
    workflow output field), and keeps track of which output fields produced
    images at least once.
    """

    @classmethod
    def init(cls) -> "WorkflowsImagesProcessingIndex":
        """Create an empty index."""
        return cls(index_content={}, registered_output_images=set())

    def __init__(
        self,
        index_content: Dict[ImagePath, ImageResultsIndexEntry],
        registered_output_images: Set[WorkflowOutputField],
    ):
        self._index_content = index_content
        self._registered_output_images = registered_output_images

    @property
    def registered_output_images(self) -> Set[WorkflowOutputField]:
        # Shallow copy so callers cannot mutate internal bookkeeping.
        return copy(self._registered_output_images)

    def collect_entry(
        self, image_path: ImagePath, entry: ImageResultsIndexEntry
    ) -> None:
        """Record the results produced for a single input image."""
        self._index_content[image_path] = entry
        # Remember every output field that has produced at least one image.
        self._registered_output_images.update(entry.image_outputs.keys())

    def export_metadata(
        self,
    ) -> List[Tuple[ImagePath, WorkflowExecutionMetadataResultPath]]:
        """Pair each input image path with the path of its structured-results file."""
        return [
            (source_image, entry.metadata_output_path)
            for source_image, entry in self._index_content.items()
        ]

    def export_images(
        self,
    ) -> Dict[WorkflowOutputField, List[Tuple[ImagePath, List[ImagePath]]]]:
        """Group exported image paths by workflow output field."""
        return {
            field_name: self.export_images_for_field(field_name=field_name)
            for field_name in self._registered_output_images
        }

    def export_images_for_field(
        self, field_name: WorkflowOutputField
    ) -> List[Tuple[ImagePath, List[ImagePath]]]:
        """List (input image, images produced for *field_name*) pairs.

        Input images whose entry has no record for *field_name* are omitted.
        """
        collected = []
        for source_image, entry in self._index_content.items():
            field_images = entry.image_outputs.get(field_name)
            if field_images is None:
                continue
            collected.append((source_image, field_images))
        return collected
4 changes: 2 additions & 2 deletions inference_cli/lib/workflows/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ def process_images_directory_with_workflow(
process_image_directory_with_workflow_using_inference_package,
)

process_image_directory_with_workflow_using_inference_package(
_ = process_image_directory_with_workflow_using_inference_package(
input_directory=input_directory,
output_directory=output_directory,
workflow_specification=workflow_specification,
Expand All @@ -138,7 +138,7 @@ def process_images_directory_with_workflow(
debug_mode=debug_mode,
)
return None
process_image_directory_with_workflow_using_api(
_ = process_image_directory_with_workflow_using_api(
input_directory=input_directory,
output_directory=output_directory,
workflow_specification=workflow_specification,
Expand Down
25 changes: 25 additions & 0 deletions inference_cli/lib/workflows/entities.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, List, Optional, Tuple

# Semantic aliases for plain strings used throughout the workflows CLI code.
ImagePath = str  # filesystem path of an input or output image
WorkflowExecutionMetadataResultPath = str  # path of a structured (JSON) results file
WorkflowOutputField = str  # name of an output field declared by a workflow


class OutputFileType(str, Enum):
Expand All @@ -9,3 +15,22 @@ class OutputFileType(str, Enum):
class ProcessingTarget(str, Enum):
    """Backend used for batch processing: a remote API or the local inference package."""
    API = "api"
    INFERENCE_PACKAGE = "inference_package"


@dataclass(frozen=True)
class ImageResultsIndexEntry:
    """Where the results of processing a single input image were written."""
    # Path of the file holding the structured workflow results for the image.
    metadata_output_path: WorkflowExecutionMetadataResultPath
    # Maps each workflow output field to the image files dumped for it.
    image_outputs: Dict[WorkflowOutputField, List[ImagePath]]


@dataclass(frozen=True)
class ImagesDirectoryProcessingDetails:
    """Summary of a batch run over a directory of images.

    Bundles output locations, success/failure counts and per-image result
    paths so callers can index the content produced by batch processing.
    """
    output_directory: str  # root directory the run wrote results into
    processed_images: int  # number of images processed successfully
    failures: int  # number of images that failed processing
    # For each input image: path of its structured (metadata) results file.
    result_metadata_paths: List[Tuple[ImagePath, WorkflowExecutionMetadataResultPath]]
    # Per workflow output field: (input image, images produced for it) pairs.
    result_images_paths: Dict[
        WorkflowOutputField, List[Tuple[ImagePath, List[ImagePath]]]
    ]
    # Path of the aggregated results file, when aggregation was performed.
    aggregated_results_path: Optional[str] = field(default=None)
    # Path of the failures report, when some files failed processing.
    failures_report_path: Optional[str] = field(default=None)
Loading
Loading