feat: add new openfoodfacts.ml module (#293)
It currently contains code to run object detection and image classification models.
raphael0202 authored Dec 9, 2024
1 parent 8527443 commit 27659fe
Showing 20 changed files with 1,165 additions and 2 deletions.
170 changes: 170 additions & 0 deletions openfoodfacts/ml/image_classification.py
@@ -0,0 +1,170 @@
import logging
import math
import time
import typing
from typing import Optional

import numpy as np
from PIL import Image, ImageOps
from tritonclient.grpc import service_pb2

from openfoodfacts.ml.triton import (
    add_triton_infer_input_tensor,
    get_triton_inference_stub,
)

logger = logging.getLogger(__name__)


def classify_transforms(
    img: Image.Image,
    size: int = 224,
    mean: tuple[float, float, float] = (0.0, 0.0, 0.0),
    std: tuple[float, float, float] = (1.0, 1.0, 1.0),
    interpolation: Image.Resampling = Image.Resampling.BILINEAR,
    crop_fraction: float = 1.0,
) -> np.ndarray:
"""
Applies a series of image transformations including resizing, center
cropping, normalization, and conversion to a NumPy array.
Transformation steps is based on the one used in the Ultralytics library:
https://github.com/ultralytics/ultralytics/blob/main/ultralytics/data/augment.py#L2319
:param img: Input Pillow image.
:param size: The target size for the transformed image (shortest edge).
:param mean: Mean values for each RGB channel used in normalization.
:param std: Standard deviation values for each RGB channel used in
normalization.
:param interpolation: Interpolation method from PIL (
Image.Resampling.NEAREST, Image.Resampling.BILINEAR,
Image.Resampling.BICUBIC).
:param crop_fraction: Fraction of the image to be cropped.
:return: The transformed image as a NumPy array.
"""
    if img.mode != "RGB":
        img = img.convert("RGB")

    # Rotate the image based on the EXIF orientation if needed
    img = typing.cast(Image.Image, ImageOps.exif_transpose(img))

    # Step 1: Resize while preserving the aspect ratio
    width, height = img.size

    # Calculate scale size while preserving aspect ratio
    scale_size = math.floor(size / crop_fraction)

    aspect_ratio = width / height
    if width < height:
        new_width = scale_size
        new_height = int(new_width / aspect_ratio)
    else:
        new_height = scale_size
        new_width = int(new_height * aspect_ratio)

    img = img.resize((new_width, new_height), interpolation)

    # Step 2: Center crop
    left = (new_width - size) // 2
    top = (new_height - size) // 2
    right = left + size
    bottom = top + size
    img = img.crop((left, top, right, bottom))

    # Step 3: Convert the image to a NumPy array and scale pixel values to
    # [0, 1]
    img_array = np.array(img).astype(np.float32) / 255.0

    # Step 4: Normalize the image
    mean_np = np.array(mean, dtype=np.float32).reshape(1, 1, 3)
    std_np = np.array(std, dtype=np.float32).reshape(1, 1, 3)
    img_array = (img_array - mean_np) / std_np

    # Step 5: Change the order of dimensions from (H, W, C) to (C, H, W)
    img_array = np.transpose(img_array, (2, 0, 1))
    return img_array
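
# Usage sketch (illustrative, not part of the commit): preprocessing a local
# image with classify_transforms. The file name and the ImageNet mean/std
# values below are assumptions made for this example, not values required by
# the module.
def _example_classify_transforms() -> None:
    img = Image.open("product_front.jpg")  # hypothetical input file
    array = classify_transforms(
        img,
        size=224,
        mean=(0.485, 0.456, 0.406),  # assumed ImageNet statistics
        std=(0.229, 0.224, 0.225),
    )
    # channels-first float32 array, ready to be batched for inference
    assert array.shape == (3, 224, 224)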


class ImageClassifier:
    def __init__(self, model_name: str, label_names: list[str], image_size: int = 224):
        """An image classifier based on Yolo models.

        We support models trained with Yolov8, v9, v10 and v11.

        :param model_name: the name of the model, as registered in Triton
        :param label_names: the list of label names
        :param image_size: the size of the input image for the model
        """
        self.model_name: str = model_name
        self.label_names = label_names
        self.image_size = image_size

    def predict(
        self,
        image: Image.Image,
        triton_uri: str,
        model_version: Optional[str] = None,
    ) -> list[tuple[str, float]]:
        """Run an image classification model on an image.

        The model is expected to have been trained with the Ultralytics
        library (Yolov8).

        :param image: the input Pillow image
        :param triton_uri: URI of the Triton Inference Server
        :param model_version: the version of the model to use, defaults to
            None (latest)
        :return: the prediction results as a list of tuples (label, confidence)
        """
        image_array = self.preprocess(image)

        grpc_stub = get_triton_inference_stub(triton_uri)
        request = service_pb2.ModelInferRequest()
        request.model_name = self.model_name
        if model_version:
            request.model_version = model_version
        add_triton_infer_input_tensor(
            request, name="images", data=image_array, datatype="FP32"
        )
        start_time = time.monotonic()
        response = grpc_stub.ModelInfer(request)
        latency = time.monotonic() - start_time
        logger.debug("Inference time for %s: %s", self.model_name, latency)

        start_time = time.monotonic()
        result = self.postprocess(response)
        latency = time.monotonic() - start_time
        logger.debug("Post-processing time for %s: %s", self.model_name, latency)
        return result

    def preprocess(self, image: Image.Image) -> np.ndarray:
        """Preprocess an image for image classification.

        :param image: the input Pillow image
        :return: the preprocessed image as a NumPy array, with an extra
            batch dimension
        """
        image_array = classify_transforms(image, size=self.image_size)
        return np.expand_dims(image_array, axis=0)

    def postprocess(
        self, response: service_pb2.ModelInferResponse
    ) -> list[tuple[str, float]]:
        """Postprocess the inference result.

        :param response: the inference response
        :return: the predictions as a list of tuples (label, confidence),
            sorted by descending confidence
        """
        if len(response.outputs) != 1:
            raise ValueError(f"expected 1 output, got {len(response.outputs)}")

        if len(response.raw_output_contents) != 1:
            raise ValueError(
                f"expected 1 raw output content, got {len(response.raw_output_contents)}"
            )

        output_index = {output.name: i for i, output in enumerate(response.outputs)}
        output = np.frombuffer(
            response.raw_output_contents[output_index["output0"]],
            dtype=np.float32,
        ).reshape((1, len(self.label_names)))[0]

        score_indices = np.argsort(-output)
        return [(self.label_names[i], float(output[i])) for i in score_indices]
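
# Usage sketch (illustrative): running the classifier against a Triton
# server. The model name, label list, image path and Triton URI below are
# assumptions made for this example, not values shipped with this module.
def _example_image_classifier() -> None:
    classifier = ImageClassifier(
        model_name="my_classifier",  # hypothetical Triton model name
        label_names=["label_a", "label_b"],  # hypothetical labels
        image_size=224,
    )
    img = Image.open("product_front.jpg")  # hypothetical input file
    # predictions are (label, confidence) tuples, sorted by descending
    # confidence
    predictions = classifier.predict(img, triton_uri="localhost:8001")
    top_label, top_score = predictions[0]
    logger.info("top prediction: %s (%.2f)", top_label, top_score)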
210 changes: 210 additions & 0 deletions openfoodfacts/ml/object_detection.py
@@ -0,0 +1,210 @@
import dataclasses
import logging
import time
from typing import Optional

import numpy as np
from cv2 import dnn
from PIL import Image
from tritonclient.grpc import service_pb2

from openfoodfacts.ml.utils import convert_image_to_array
from openfoodfacts.types import JSONType

from .triton import add_triton_infer_input_tensor, get_triton_inference_stub

logger = logging.getLogger(__name__)


@dataclasses.dataclass
class ObjectDetectionRawResult:
    num_detections: int
    detection_boxes: np.ndarray
    detection_scores: np.ndarray
    detection_classes: np.ndarray
    label_names: list[str]

    def to_list(self) -> list[JSONType]:
        """Convert the detection results to a JSON serializable format."""
        results = []
        for bounding_box, score, label in zip(
            self.detection_boxes, self.detection_scores, self.detection_classes
        ):
            label_int = int(label)
            label_str = self.label_names[label_int]
            if label_str is not None:
                result = {
                    "bounding_box": tuple(bounding_box.tolist()),  # type: ignore
                    "score": float(score),
                    "label": label_str,
                }
                results.append(result)
        return results
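
# Illustrative output of to_list() (values made up for the example): each
# detection becomes a JSON-serializable dict, with the bounding box given as
# (y_min, x_min, y_max, x_max) in coordinates relative to the original image:
#
#     [
#         {
#             "bounding_box": (0.12, 0.05, 0.48, 0.61),
#             "score": 0.93,
#             "label": "nutrition-table",
#         }
#     ]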


class ObjectDetector:
    def __init__(self, model_name: str, label_names: list[str], image_size: int = 640):
        """An object detector based on Yolo models.

        We support models trained with Yolov8, v9, v10 and v11.

        :param model_name: the name of the model, as registered in Triton
        :param label_names: the list of label names
        :param image_size: the size of the input image for the model
        """
        self.model_name: str = model_name
        self.label_names = label_names
        self.image_size = image_size

    def detect_from_image(
        self,
        image: Image.Image,
        triton_uri: str,
        threshold: float = 0.5,
        model_version: Optional[str] = None,
    ) -> ObjectDetectionRawResult:
        """Run an object detection model on an image.

        The model must have been trained with the Ultralytics library.

        :param image: the input Pillow image
        :param triton_uri: URI of the Triton Inference Server
        :param threshold: the minimum score for a detection to be considered,
            defaults to 0.5
        :param model_version: the version of the model to use, defaults to
            None (latest)
        :return: the detection result
        """
        image_array, scale_x, scale_y = self.preprocess(image)
        grpc_stub = get_triton_inference_stub(triton_uri)
        request = service_pb2.ModelInferRequest()
        request.model_name = self.model_name
        if model_version:
            request.model_version = model_version
        add_triton_infer_input_tensor(
            request, name="images", data=image_array, datatype="FP32"
        )

        start_time = time.monotonic()
        response = grpc_stub.ModelInfer(request)
        latency = time.monotonic() - start_time
        logger.debug("Inference time for %s: %s", self.model_name, latency)

        start_time = time.monotonic()
        result = self.postprocess(
            response, threshold=threshold, scale_x=scale_x, scale_y=scale_y
        )
        latency = time.monotonic() - start_time
        logger.debug("Post-processing time for %s: %s", self.model_name, latency)
        return result

    def preprocess(self, image: Image.Image) -> tuple[np.ndarray, float, float]:
        # Yolo object detection models expect a specific image dimension
        width, height = image.size
        # Prepare a square image for inference
        max_size = max(height, width)
        # We paste the original image into a larger square image,
        # in the upper-left corner, on a black background.
        squared_image = Image.new("RGB", (max_size, max_size), color="black")
        squared_image.paste(image, (0, 0))
        resized_image = squared_image.resize((self.image_size, self.image_size))

        # As we don't process the original image but a modified version of it,
        # we need to compute the scale factor for the x and y axis.
        image_ratio = width / height
        scale_x: float
        scale_y: float
        if image_ratio < 1:  # portrait, height > width
            scale_x = self.image_size * image_ratio
            scale_y = self.image_size
        else:  # landscape, width > height
            scale_x = self.image_size
            scale_y = self.image_size / image_ratio

        # Preprocess the image and prepare blob for model
        image_array = (
            convert_image_to_array(resized_image)
            .transpose((2, 0, 1))
            .astype(np.float32)
        )
        image_array = image_array / 255.0
        image_array = np.expand_dims(image_array, axis=0)
        return image_array, scale_x, scale_y
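
    # Worked example (illustrative) for the scale factors computed in
    # preprocess, assuming image_size=640 and a landscape 800x600 input:
    # the image is pasted into an 800x800 black square and resized to
    # 640x640, so the original content spans the full width
    # (scale_x = 640) but only 640 * 600 / 800 = 480 pixels vertically
    # (scale_y = 640 / (800 / 600) = 480). Dividing the model's pixel
    # outputs by these factors yields coordinates relative to the
    # original image, as done in postprocess below.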

    def postprocess(
        self, response, threshold: float, scale_x: float, scale_y: float
    ) -> ObjectDetectionRawResult:
        if len(response.outputs) != 1:
            raise ValueError(f"expected 1 output, got {len(response.outputs)}")

        if len(response.raw_output_contents) != 1:
            raise ValueError(
                f"expected 1 raw output content, got {len(response.raw_output_contents)}"
            )

        output_index = {output.name: i for i, output in enumerate(response.outputs)}
        output = np.frombuffer(
            response.raw_output_contents[output_index["output0"]],
            dtype=np.float32,
        ).reshape((1, len(self.label_names) + 4, -1))[0]

        # output is of shape (num_classes + 4, num_detections)
        rows = output.shape[1]
        raw_detection_classes = np.zeros(rows, dtype=int)
        raw_detection_scores = np.zeros(rows, dtype=np.float32)
        raw_detection_boxes = np.zeros((rows, 4), dtype=np.float32)

        for i in range(rows):
            classes_scores = output[4:, i]
            max_cls_idx = np.argmax(classes_scores)
            max_score = classes_scores[max_cls_idx]
            if max_score < threshold:
                continue
            raw_detection_classes[i] = max_cls_idx
            raw_detection_scores[i] = max_score

            # The bounding box is in the format (x, y, width, height), in the
            # pixel coordinates of the model input image;
            # x and y are the coordinates of the center of the bounding box
            bbox_width = output[2, i]
            bbox_height = output[3, i]
            x_min = output[0, i] - 0.5 * bbox_width
            y_min = output[1, i] - 0.5 * bbox_height
            x_max = x_min + bbox_width
            y_max = y_min + bbox_height

            # We save the bounding box in the format
            # (y_min, x_min, y_max, x_max) in relative coordinates
            # Scale the bounding boxes back to the original image size
            raw_detection_boxes[i, 0] = max(0.0, min(1.0, y_min / scale_y))
            raw_detection_boxes[i, 1] = max(0.0, min(1.0, x_min / scale_x))
            raw_detection_boxes[i, 2] = max(0.0, min(1.0, y_max / scale_y))
            raw_detection_boxes[i, 3] = max(0.0, min(1.0, x_max / scale_x))

        # Perform NMS (Non Maximum Suppression)
        detection_box_indices = dnn.NMSBoxes(
            raw_detection_boxes,  # type: ignore
            raw_detection_scores,  # type: ignore
            score_threshold=threshold,
            # the following values are copied from Ultralytics settings
            nms_threshold=0.45,
            eta=0.5,
        )
        detection_classes = np.zeros(len(detection_box_indices), dtype=int)
        detection_scores = np.zeros(len(detection_box_indices), dtype=np.float32)
        detection_boxes = np.zeros((len(detection_box_indices), 4), dtype=np.float32)

        for i, idx in enumerate(detection_box_indices):
            detection_classes[i] = raw_detection_classes[idx]
            detection_scores[i] = raw_detection_scores[idx]
            detection_boxes[i] = raw_detection_boxes[idx]

        result = ObjectDetectionRawResult(
            num_detections=rows,
            detection_classes=detection_classes,
            detection_boxes=detection_boxes,
            detection_scores=detection_scores,
            label_names=self.label_names,
        )
        return result
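
# Usage sketch (illustrative): running detection against a Triton server.
# The model name, labels, image path and Triton URI below are assumptions
# made for this example, not values shipped with this module.
def _example_object_detector() -> None:
    detector = ObjectDetector(
        model_name="my_detector",  # hypothetical Triton model name
        label_names=["product", "price-tag"],  # hypothetical labels
        image_size=640,
    )
    img = Image.open("shelf.jpg")  # hypothetical input file
    result = detector.detect_from_image(
        img, triton_uri="localhost:8001", threshold=0.5
    )
    # each entry: {"bounding_box": (y_min, x_min, y_max, x_max),
    # "score": float, "label": str}
    for detection in result.to_list():
        logger.info("detected %s: %s", detection["label"], detection["score"])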