Example: blobs/compile_model.py

# Copyright (c) 2024 Justin Davis (davisjustin302@gmail.com)
#
# MIT License
"""Example showcasing how to use the compile_model function."""

from __future__ import annotations

from pathlib import Path

import cv2
import depthai as dai
import kornia
import torch
from oakutils.blobs import compile_model
from oakutils.blobs.definitions import AbstractModel, InputType, ModelType
from oakutils.nodes import (
    create_color_camera,
    create_neural_network,
    create_xout,
    get_nn_bgr_frame,
)
from typing_extensions import Self


class Custom(AbstractModel):
    """nn.Module wrapper for a custom operation."""

    def __init__(self: Self) -> None:
        """Create a new Custom model."""
        super().__init__()

    @classmethod
    def model_type(cls: Custom) -> ModelType:
        """Type of input this model takes."""
        return ModelType.KERNEL

    @classmethod
    def input_names(cls: Custom) -> list[tuple[str, InputType]]:
        """Names of the input tensors."""
        return [("input", InputType.FP16)]

    @classmethod
    def output_names(cls: Custom) -> list[str]:
        """Names of the output tensors."""
        return ["output"]

    def forward(self: Self, image: torch.Tensor) -> torch.Tensor:
        """Forward pass of the model."""
        return kornia.filters.laplacian(
            kornia.filters.gaussian_blur2d(
                image,
                (5, 5),
                (1.5, 1.5),
            ),
            7,
        )


model_path = compile_model(
    Custom,  # simply put the class definition here, but not a created version!
    # Sobel(),  # this is wrong!
    {},  # this model doesn't take any arguments, simply put an empty dict
    # If the model took arguments, you would put them here
    # For example, if the model took a kernel size and sigma, you would put:
    # {"kernel_size": 3, "sigma": 0.5}
    cache=False,  # this will cache the model in ~/site-packages/oakutils/blobs/_cache
    # if the model is compiled again, it will instead look in the cache for an
    # already compiled version. Set to True to check for the model, False recompile always
    # cache=True,
    shaves=6,  # this is the number of shaves to use for the model
    # shaves are the computational units onboard the OAK cameras
    # NOTE: adding more shaves does not always mean better performance!
    # Luxonis recommends using 6 shaves for most models (actual models, not CV functions)
    # CV functions can often use 1 or 2 shaves
    shape_mapping={
        InputType.FP16: (300, 300, 3),  # this is the shape of the input tensor
        # you can change this to match whatever shapes you want your model to take as input
        # output size is determined by the model itself
        # MAKE SURE YOU DEFINE A SHAPE FOR ALL InputTypes!
        # Defaults are:
        # InputType.FP16: (640, 480, 3)
        # InputType.XYZ: (640, 400, 3)
        # InputType.U8: (640, 400, 1)
    },
    # to use default provide nothing
    # shape_mapping=None,
    creation_func=torch.ones,  # this is the function used to create the "dummy" tensor
    # the dummy tensor is the data used by torch's tracer to generate the model graph
    # such that we can export it to onnx
    # the default is torch.rand, which creates a random tensor
    # you can change this to whatever you want, as long as it returns a torch.Tensor
    # Example: torch.zeros, torch.ones, torch.rand, torch.randn, torch.randperm, etc.
)
# model_path is the path to the compiled model
print(model_path)

# verify that the path exists
if not Path.exists(model_path):
    err_msg = f"Model path {model_path} does not exist!"
    raise FileNotFoundError(err_msg)

# verify that the path is a file
if not Path.is_file(model_path):
    err_msg = f"Model path {model_path} is not a file!"
    raise FileNotFoundError(err_msg)

# now lets use the new model on the camera
pipeline = dai.Pipeline()

# create the rgb cam to get some data
cam = create_color_camera(
    pipeline,
    preview_size=(
        300,
        300,
    ),  # use the preview size to get an image that matches the model
    # this is important since the resize will be done on hardware onboard the camera
    # and the normal resolution has set dimensions which do not match the models
)
# add the sobel model to the pipeline
custom_network = create_neural_network(
    pipeline,
    cam.preview,  # use the preview stream as the input
    model_path,  # our compiled model path from compile_model
)

# create an output stream
streamname = "network"
xout_nn = create_xout(pipeline, custom_network.out, streamname)

with dai.Device(pipeline) as device:
    queue: dai.DataOutputQueue = device.getOutputQueue(streamname)

    while True:
        data = queue.get()

        # use the get_nn_bgr_frame helper to get a frame from the nn data
        # if your network doesnt output an image define a custom helper
        frame = get_nn_bgr_frame(
            data,  # the raw data packet, this will be a dai.NNData
            (300, 300),  # make sure to match the size
            normalization=255.0,  # this is how to multiply the data to get the correct values
            # by default the outputs are normalized to [0-1] by OpenVINO (the actual compiler)
        )

        cv2.imshow(streamname, frame)
        if cv2.waitKey(1) == ord("q"):
            break