Tools & software:
I am trying to run the code below on the board, but instead of an OpenCV GUI window I want to display the output frames directly through a GStreamer pipeline with kmssink (see the sketch after the listing). I am following this example: https://github.com/TexasInstruments-Sandbox/edgeai-gst-apps-people-tracking
import numpy as np
import tflite_runtime.interpreter as tflite # Use tflite_runtime for TIDL delegate support
import faiss
import cv2
from PIL import Image
import os
MODEL_PATH = '/opt/edgeai-tidl-artifacts/cl-tfl-fomo-01/trained.tflite'
CAPTURED_IMAGES_DIR = './captured_frames' # Directory to save captured frames
FAISS_INDEX_PATH = 'faiss_index.bin'
ANOMALY_THRESHOLD = 0.5
# Preprocess frame to match model input
def process_frame(frame, interpreter):
    img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    input_details = interpreter.get_input_details()
    output_details = interpreter.get_output_details()
    input_shape = input_details[0]['shape']
    # PIL's resize() expects (width, height); input_shape is [1, H, W, C]
    img_resized = img.resize((input_shape[2], input_shape[1]))
    np_frame = np.array(img_resized, dtype=np.float32) / 255.0
    np_frame = np.expand_dims(np_frame, axis=0)  # Add batch dimension
    interpreter.set_tensor(input_details[0]['index'], np_frame)
    interpreter.invoke()
    output_data = interpreter.get_tensor(output_details[0]['index'])
    return output_data
# Build the FAISS index using reference ("good") examples
def build_faiss_index(captured_images, interpreter, output_resolution=(12, 12)):
    first_example_response = process_frame(cv2.imread(captured_images[0]), interpreter)
    feature_dim = first_example_response.shape[-1]
    faiss_index = faiss.IndexFlatL2(feature_dim)
    for image_path in captured_images:
        response = process_frame(cv2.imread(image_path), interpreter)
        for i in range(output_resolution[0]):
            for j in range(output_resolution[1]):
                patch_features = response[0, i, j, :].reshape(1, -1)
                faiss_index.add(patch_features.astype(np.float32))
    return faiss_index
# Save FAISS index to disk
def save_faiss_index(faiss_index, index_path):
    faiss.write_index(faiss_index, index_path)

# Load FAISS index from disk
def load_faiss_index(index_path):
    return faiss.read_index(index_path)
# Compute per-patch anomaly scores: L2 distance to the nearest reference patch
def compute_anomaly_faiss(image_features, faiss_index, output_resolution):
    distances = np.zeros(output_resolution)
    for i in range(output_resolution[0]):
        for j in range(output_resolution[1]):
            f_vec = image_features[0, i, j, :].reshape(1, -1)
            distance, _ = faiss_index.search(f_vec, 1)
            distances[i, j] = distance[0][0]
    return distances
# GStreamer capture pipeline for the TI AM62A board (IMX219 sensor via tiovxisp)
def gstreamer_pipeline():
    return (
        'v4l2src device=/dev/video3 io-mode=dmabuf-import ! '
        'video/x-bayer, width=640, height=480, framerate=15/1, format=rggb10 ! '
        'tiovxisp sink_0::device=/dev/v4l-subdev2 sensor-name="SENSOR_SONY_IMX219_RPI" '
        'dcc-isp-file=/opt/imaging/imx219/linear/dcc_viss_10b_640x480.bin '
        'sink_0::dcc-2a-file=/opt/imaging/imx219/linear/dcc_2a_10b_640x480.bin format-msb=9 ! '
        'video/x-raw, format=NV12, width=640, height=480, framerate=15/1 ! '
        'videoconvert ! video/x-raw, format=BGR ! appsink'
    )
# Camera stream with training (frame capture) and inference functionality
def evaluate_fomo_stream(interpreter):
    cap = cv2.VideoCapture(gstreamer_pipeline(), cv2.CAP_GSTREAMER)
    captured_images = []
    faiss_index = None  # No FAISS index initially

    def capture_frame(event, x, y, flags, param):
        # Reads the current `frame` from the enclosing scope when clicked
        nonlocal captured_images
        if event == cv2.EVENT_LBUTTONDOWN:
            print("Capturing frame for training...")
            img_path = os.path.join(CAPTURED_IMAGES_DIR, f'captured_{len(captured_images)}.png')
            cv2.imwrite(img_path, frame)  # Save the frame without heatmap
            captured_images.append(img_path)
            print(f"Captured and saved: {img_path}")

    cv2.namedWindow("Anomaly Detection")
    cv2.setMouseCallback("Anomaly Detection", capture_frame)
    while True:
        ret, frame = cap.read()
        if not ret:
            print("Failed to grab frame")
            break
        # Once enough images are captured for training (here, 10), build the FAISS index
        if len(captured_images) >= 10 and faiss_index is None:
            print("Building FAISS index with captured images...")
            faiss_index = build_faiss_index(captured_images, interpreter)
            save_faiss_index(faiss_index, FAISS_INDEX_PATH)
            print("FAISS index saved and inference mode is enabled.")
        if faiss_index is not None:
            image_feat = process_frame(frame, interpreter)
            anomaly_score = compute_anomaly_faiss(image_feat, faiss_index, [12, 12])
            global_anomaly_score = np.mean(anomaly_score)
            classification = 'Anomaly' if global_anomaly_score > ANOMALY_THRESHOLD else 'No Anomaly'
            anomaly_heatmap = cv2.resize(anomaly_score, (frame.shape[1], frame.shape[0]), interpolation=cv2.INTER_NEAREST)
            anomaly_heatmap_normalized = cv2.normalize(anomaly_heatmap, None, 0, 255, cv2.NORM_MINMAX)
            anomaly_heatmap_colored = cv2.applyColorMap(np.uint8(anomaly_heatmap_normalized), cv2.COLORMAP_JET)
            overlay_frame = cv2.addWeighted(frame, 0.6, anomaly_heatmap_colored, 0.4, 0)
            cv2.putText(overlay_frame, f'Anomaly Score: {global_anomaly_score:.2f}', (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)
            cv2.putText(overlay_frame, f'Classification: {classification}', (10, 70),
                        cv2.FONT_HERSHEY_SIMPLEX, 1,
                        (0, 0, 255) if classification == 'Anomaly' else (0, 255, 0), 2)
            cv2.imshow("Anomaly Detection", overlay_frame)
        else:
            cv2.imshow("Anomaly Detection", frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    cap.release()
    cv2.destroyAllWindows()
# Create a directory to save captured frames
if not os.path.exists(CAPTURED_IMAGES_DIR):
    os.makedirs(CAPTURED_IMAGES_DIR)

# Load the TFLite model with the TIDL delegate
tidl_delegate = [tflite.load_delegate('libtidl_tfl_delegate.so',
                                      {'artifacts_folder': '/opt/edgeai-tidl-artifacts/cl-tfl-fomo-01'})]
interpreter = tflite.Interpreter(model_path=MODEL_PATH, experimental_delegates=tidl_delegate)
interpreter.allocate_tensors()

# Start real-time anomaly detection with live training
evaluate_fomo_stream(interpreter)
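To replace cv2.imshow() on the board, my plan is to push the overlaid frames out through cv2.VideoWriter into a kmssink pipeline, roughly like this (a minimal sketch that assumes OpenCV on the target was built with GStreamer support, i.e. cv2.CAP_GSTREAMER is available):

# Sketch: display via kmssink instead of an OpenCV window
out_pipeline = (
    'appsrc ! video/x-raw, format=BGR ! '
    'videoconvert ! video/x-raw, format=NV12 ! kmssink sync=false'
)
# fourcc=0 for raw frames; size must match the frames being written
writer = cv2.VideoWriter(out_pipeline, cv2.CAP_GSTREAMER, 0, 15.0, (640, 480), True)

# Inside the while loop, instead of cv2.imshow("Anomaly Detection", overlay_frame):
#     writer.write(overlay_frame)
# and next to cap.release():
#     writer.release()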
But I'm somewhat confused about how to break this code apart. Which parts do I need as pre- or post-processing, and in particular, how can I implement the mouse-click feature to capture 10 images and then use them to build and save the FAISS index? How should I create the GStreamer pipeline (do I even have to build it by hand, or is it generated automatically as in the example above?), and which parts of the pipeline are already handled by the scripts in the example? I want to use OptiFlow so that the pipeline doesn't slow down. It would be great if you could help me break down which classes/files/pipelines I need.
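For the mouse-click part, my current thinking is that with kmssink there is no OpenCV window for cv2.setMouseCallback to attach to, so I would trigger the captures from the terminal instead, something like the sketch below (a non-blocking stdin poll of my own; a GPIO button would work the same way). Is that the right approach, or does the app framework provide a better hook?

import select
import sys

def stdin_capture_requested():
    # Non-blocking poll of stdin: returns True when the user pressed Enter
    ready, _, _ = select.select([sys.stdin], [], [], 0)
    if ready:
        sys.stdin.readline()  # consume the pending input line
        return True
    return False

# In the main loop, replacing the mouse callback:
#     if len(captured_images) < 10 and stdin_capture_requested():
#         img_path = os.path.join(CAPTURED_IMAGES_DIR, f'captured_{len(captured_images)}.png')
#         cv2.imwrite(img_path, frame)
#         captured_images.append(img_path)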
Here is a link to the config YAML file I created. Could you tell me whether it can be used for this purpose? drive.google.com/.../view
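For context, I modeled it on the config structure used in edgeai-gst-apps, roughly like this (a simplified sketch with placeholder values for my IMX219/640x480 setup, not the actual file; please correct anything that doesn't match what the repo expects):

title: "FOMO anomaly detection"
inputs:
    input0:
        source: /dev/video3
        width: 640
        height: 480
        framerate: 15
models:
    model0:
        model_path: /opt/edgeai-tidl-artifacts/cl-tfl-fomo-01
outputs:
    output0:
        sink: kmssink
        width: 640
        height: 480
flows:
    flow0: [input0, model0, output0]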
Thank you!