Skip to content

Commit

Permalink
Merge pull request #880 from roboflow/feature/add-delta-filter-block
Browse files Browse the repository at this point in the history
Add delta filter block, controlling workflow flow based on value delta
  • Loading branch information
grzegorz-roboflow authored Dec 13, 2024
2 parents 129c319 + 177b3b3 commit cbbf7f3
Show file tree
Hide file tree
Showing 4 changed files with 297 additions and 0 deletions.
Empty file.
100 changes: 100 additions & 0 deletions inference/core/workflows/core_steps/flow_control/delta_filter/v1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
from typing import Any, Dict, List, Literal, Optional, Type

from pydantic import ConfigDict, Field

from inference.core.workflows.execution_engine.entities.base import (
OutputDefinition,
WorkflowImageData,
)
from inference.core.workflows.execution_engine.entities.types import (
WILDCARD_KIND,
Selector,
StepSelector,
WorkflowImageSelector,
)
from inference.core.workflows.execution_engine.v1.entities import FlowControl
from inference.core.workflows.prototypes.block import (
BlockResult,
WorkflowBlock,
WorkflowBlockManifest,
)

SHORT_DESCRIPTION: str = (
"Allow the execution of workflow to proceed if the input value has changed."
)
LONG_DESCRIPTION: str = """
The Delta Filter is a flow control block that triggers workflow steps only when an input value changes.
It avoids redundant processing and optimizes system efficiency.
+----------------+ (value changes) +----------------+
| Previous Value | -----------------------> | Next Steps |
+----------------+ +----------------+
Key Features:
Change Detection: Tracks input values and only proceeds when a change is detected.
Dynamic Value Support: Handles various input types (e.g., numbers, strings).
Context-Aware Caching: Tracks changes on a per-video basis using video_identifier.
Usage Instructions:
Input Configuration: Set "Input Value" to reference the value to monitor (e.g., counter).
Next Steps Setup: Define steps to execute on value change.
Example Use Case:
A video analysis workflow counts people in the zone. When the count changes, Delta Filter triggers downstream steps (e.g., setting variable in OPC), minimizing redundant processing.
"""


class DeltaFilterManifest(WorkflowBlockManifest):
type: Literal["roboflow_core/delta_filter@v1"]
model_config = ConfigDict(
json_schema_extra={
"name": "Delta Filter",
"version": "v1",
"short_description": SHORT_DESCRIPTION,
"long_description": LONG_DESCRIPTION,
"license": "Apache-2.0",
"block_type": "flow_control",
}
)
image: WorkflowImageSelector
value: Selector(kind=[WILDCARD_KIND]) = Field(
title="Input Value",
description="The input value for this step. Flow will be allowed to continue only if this value changes between frames.",
examples=["$steps.line_counter.count_in"],
)
next_steps: List[StepSelector] = Field(
description="Reference to steps which shall be executed when value changes.",
examples=["$steps.write_to_csv", "$steps.write_to_opc"],
)

@classmethod
def describe_outputs(cls) -> List[OutputDefinition]:
return []

@classmethod
def get_execution_engine_compatibility(cls) -> Optional[str]:
return ">=1.4.0,<2.0.0"


class DeltaFilterBlockV1(WorkflowBlock):
def __init__(self):
self.cache: Dict[str, Any] = {}

@classmethod
def get_manifest(cls) -> Type[DeltaFilterManifest]:
return DeltaFilterManifest

def run(
self,
image: WorkflowImageData,
value: Any,
next_steps: List[StepSelector],
) -> BlockResult:
metadata = image.video_metadata
video_identifier = metadata.video_identifier
if self.cache.get(video_identifier) != value:
self.cache[video_identifier] = value
return FlowControl(mode="select_step", context=next_steps)
return FlowControl(mode="terminate_branch")
4 changes: 4 additions & 0 deletions inference/core/workflows/core_steps/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@
from inference.core.workflows.core_steps.flow_control.continue_if.v1 import (
ContinueIfBlockV1,
)
from inference.core.workflows.core_steps.flow_control.delta_filter.v1 import (
DeltaFilterBlockV1,
)
from inference.core.workflows.core_steps.flow_control.rate_limiter.v1 import (
RateLimiterBlockV1,
)
Expand Down Expand Up @@ -467,6 +470,7 @@ def load_blocks() -> List[Type[WorkflowBlock]]:
ContinueIfBlockV1,
RateLimiterBlockV1,
PerspectiveCorrectionBlockV1,
DeltaFilterBlockV1,
DynamicZonesBlockV1,
SizeMeasurementBlockV1,
DetectionsClassesReplacementBlockV1,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
import os.path
from glob import glob

import numpy as np
import pandas as pd

from inference.core.env import WORKFLOWS_MAX_CONCURRENT_STEPS
from inference.core.managers.base import ModelManager
from inference.core.workflows.core_steps.common.entities import StepExecutionMode
from inference.core.workflows.execution_engine.core import ExecutionEngine
from tests.workflows.integration_tests.execution.workflows_gallery_collector.decorators import (
add_to_workflows_gallery,
)

WORKFLOW_WITH_DELTA_FILTER = {
"version": "1.0",
"inputs": [
{"type": "WorkflowImage", "name": "image"},
{"type": "WorkflowParameter", "name": "target_directory"},
{
"type": "WorkflowParameter",
"name": "model_id",
"default_value": "yolov8n-640",
},
],
"steps": [
{
"type": "roboflow_core/roboflow_object_detection_model@v2",
"name": "model",
"images": "$inputs.image",
"model_id": "$inputs.model_id",
},
{
"type": "roboflow_core/detections_filter@v1",
"name": "detections_filter",
"predictions": "$steps.model.predictions",
"operations": [
{
"type": "DetectionsFilter",
"filter_operation": {
"type": "StatementGroup",
"operator": "and",
"statements": [
{
"type": "BinaryStatement",
"left_operand": {
"type": "DynamicOperand",
"operand_name": "_",
"operations": [
{
"type": "ExtractDetectionProperty",
"property_name": "center",
}
],
},
"comparator": {"type": "(Detection) in zone"},
"right_operand": {
"type": "StaticOperand",
"value": [
[0, 0],
[0, 1000],
[1000, 1000],
[1000, 0],
],
},
"negate": False,
}
],
},
}
],
"operations_parameters": {},
},
{
"type": "roboflow_core/property_definition@v1",
"name": "property_definition",
"data": "$steps.detections_filter.predictions",
"operations": [{"type": "SequenceLength"}],
},
{
"type": "roboflow_core/delta_filter@v1",
"name": "delta_filter",
"value": "$steps.property_definition.output",
"image": "$inputs.image",
"next_steps": [
"$steps.csv_formatter",
],
},
{
"type": "roboflow_core/csv_formatter@v1",
"name": "csv_formatter",
"columns_data": {"Class Name": "$steps.detections_filter.predictions"},
"columns_operations": {
"Class Name": [
{"type": "DetectionsPropertyExtract", "property_name": "class_name"}
]
},
},
{
"type": "roboflow_core/local_file_sink@v1",
"name": "reports_sink",
"content": "$steps.csv_formatter.csv_content",
"file_type": "csv",
"output_mode": "append_log",
"target_directory": "$inputs.target_directory",
"file_name_prefix": "csv_containing_changes",
},
],
"outputs": [
{
"type": "JsonField",
"name": "csv",
"coordinates_system": "own",
"selector": "$steps.csv_formatter.csv_content",
}
],
}


@add_to_workflows_gallery(
category="Filtering resulting data based on value delta change",
use_case_title="Saving Workflow results into file, but only if value changes between frames",
use_case_description="""
This Workflow was created to achieve few ends:
* getting predictions from object detection model
* filtering out predictions found outside of zone
* counting detections in zone
* if count of detection in zone changes save results to csv file
!!! warning "Run on video to produce *meaningful* results"
This workflow will not work using the docs preview. You must run it on video file.
Copy the template into your Roboflow app, start `inference` server and use video preview
to get the results.
""",
workflow_definition=WORKFLOW_WITH_DELTA_FILTER,
workflow_name_in_app="file-sink-for-data-aggregation",
)
def test_workflow_with_delta_filter(
model_manager: ModelManager,
dogs_image: np.ndarray,
crowd_image: np.ndarray,
empty_directory: str,
) -> None:
# given
workflow_init_parameters = {
"workflows_core.model_manager": model_manager,
"workflows_core.api_key": None,
"workflows_core.step_execution_mode": StepExecutionMode.LOCAL,
}
execution_engine = ExecutionEngine.init(
workflow_definition=WORKFLOW_WITH_DELTA_FILTER,
init_parameters=workflow_init_parameters,
max_concurrent_steps=WORKFLOWS_MAX_CONCURRENT_STEPS,
)

# when
execution_engine.run(
runtime_parameters={
"image": [crowd_image],
"target_directory": empty_directory,
}
)
execution_engine.run(
runtime_parameters={
"image": [dogs_image],
"target_directory": empty_directory,
}
)
execution_engine.run(
runtime_parameters={
"image": [crowd_image],
"target_directory": empty_directory,
}
)
execution_engine.run(
runtime_parameters={
"image": [crowd_image],
"target_directory": empty_directory,
}
)
# trigger aggregated file flush
del execution_engine

# then
reports_files = glob(os.path.join(empty_directory, "csv_containing_changes_*.csv"))
assert len(reports_files) == 1, "Expected one report file"
report = pd.read_csv(reports_files[0])
assert len(report) == 3, "Expected 3 rows in report"

0 comments on commit cbbf7f3

Please sign in to comment.