import cv2 import numpy as np from typing import Optional, Tuple, Callable import platform import threading # Only import Windows-specific library if on Windows if platform.system() == "Windows": from pygrabber.dshow_graph import FilterGraph class VideoCapturer: def __init__(self, device_index: int): self.device_index = device_index self.frame_callback = None self._current_frame = None self._frame_ready = threading.Event() self.is_running = False self.cap = None # Initialize Windows-specific components if on Windows if platform.system() == "Windows": self.graph = FilterGraph() # Verify device exists devices = self.graph.get_input_devices() if self.device_index >= len(devices): raise ValueError( f"Invalid device index {device_index}. Available devices: {len(devices)}" ) def start(self, width: int = 960, height: int = 540, fps: int = 60) -> bool: """Initialize and start video capture""" try: if platform.system() == "Windows": # Windows-specific capture methods capture_methods = [ (self.device_index, cv2.CAP_DSHOW), # Try DirectShow first (self.device_index, cv2.CAP_ANY), # Then try default backend (-1, cv2.CAP_ANY), # Try -1 as fallback (0, cv2.CAP_ANY), # Finally try 0 without specific backend ] for dev_id, backend in capture_methods: try: self.cap = cv2.VideoCapture(dev_id, backend) if self.cap.isOpened(): break self.cap.release() except Exception: continue else: # Unix-like systems (Linux/Mac) capture method self.cap = cv2.VideoCapture(self.device_index) if not self.cap or not self.cap.isOpened(): raise RuntimeError("Failed to open camera") # Configure format self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, width) self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height) self.cap.set(cv2.CAP_PROP_FPS, fps) self.is_running = True return True except Exception as e: print(f"Failed to start capture: {str(e)}") if self.cap: self.cap.release() return False def read(self) -> Tuple[bool, Optional[np.ndarray]]: """Read a frame from the camera""" if not self.is_running or self.cap is None: return False, None ret, frame = self.cap.read() if ret: self._current_frame = frame if self.frame_callback: self.frame_callback(frame) return True, frame return False, None def release(self) -> None: """Stop capture and release resources""" if self.is_running and self.cap is not None: self.cap.release() self.is_running = False self.cap = None def set_frame_callback(self, callback: Callable[[np.ndarray], None]) -> None: """Set callback for frame processing""" self.frame_callback = callback