From c72582506d5ae55d24f62223a1d9e079b186345c Mon Sep 17 00:00:00 2001 From: KRSHH <136873090+KRSHH@users.noreply.github.com> Date: Fri, 13 Dec 2024 19:49:11 +0530 Subject: [PATCH] Adding Pygrabber as Cam manager --- modules/globals.py | 2 +- modules/metadata.py | 2 +- modules/processors/frame/face_swapper.py | 7 +- modules/ui.py | 114 +++++++++++++++-------- modules/utilities.py | 106 +++++++++++++++++---- modules/video_capture.py | 84 +++++++++++++++++ 6 files changed, 254 insertions(+), 61 deletions(-) create mode 100644 modules/video_capture.py diff --git a/modules/globals.py b/modules/globals.py index cac2302..693084d 100644 --- a/modules/globals.py +++ b/modules/globals.py @@ -26,7 +26,7 @@ nsfw_filter = False video_encoder = None video_quality = None live_mirror = False -live_resizable = False +live_resizable = True max_memory = None execution_providers: List[str] = [] execution_threads = None diff --git a/modules/metadata.py b/modules/metadata.py index ec5b868..02639c4 100644 --- a/modules/metadata.py +++ b/modules/metadata.py @@ -1,3 +1,3 @@ name = 'Deep Live Cam' -version = '1.7.0' +version = '1.7.5' edition = 'Portable' diff --git a/modules/processors/frame/face_swapper.py b/modules/processors/frame/face_swapper.py index cfe2147..c188393 100644 --- a/modules/processors/frame/face_swapper.py +++ b/modules/processors/frame/face_swapper.py @@ -21,7 +21,10 @@ THREAD_LOCK = threading.Lock() NAME = "DLC.FACE-SWAPPER" abs_dir = os.path.dirname(os.path.abspath(__file__)) -models_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(abs_dir))), 'models') +models_dir = os.path.join( + os.path.dirname(os.path.dirname(os.path.dirname(abs_dir))), "models" +) + def pre_check() -> bool: download_directory_path = abs_dir @@ -56,7 +59,7 @@ def get_face_swapper() -> Any: with THREAD_LOCK: if FACE_SWAPPER is None: - model_path = os.path.join(models_dir, 'inswapper_128_fp16.onnx') + model_path = os.path.join(models_dir, "inswapper_128_fp16.onnx") FACE_SWAPPER = insightface.model_zoo.get_model( model_path, providers=modules.globals.execution_providers ) diff --git a/modules/ui.py b/modules/ui.py index f9095e7..a3fd725 100644 --- a/modules/ui.py +++ b/modules/ui.py @@ -7,7 +7,7 @@ from cv2_enumerate_cameras import enumerate_cameras # Add this import from PIL import Image, ImageOps import time import json - +from pygrabber.dshow_graph import FilterGraph import modules.globals import modules.metadata from modules.face_analyser import ( @@ -26,6 +26,7 @@ from modules.utilities import ( resolve_relative_path, has_image_extension, ) +from modules.video_capture import VideoCapturer ROOT = None POPUP = None @@ -96,7 +97,7 @@ def save_switch_states(): "fp_ui": modules.globals.fp_ui, "show_fps": modules.globals.show_fps, "mouth_mask": modules.globals.mouth_mask, - "show_mouth_mask_box": modules.globals.show_mouth_mask_box + "show_mouth_mask_box": modules.globals.show_mouth_mask_box, } with open("switch_states.json", "w") as f: json.dump(switch_states, f) @@ -118,7 +119,9 @@ def load_switch_states(): modules.globals.fp_ui = switch_states.get("fp_ui", {"face_enhancer": False}) modules.globals.show_fps = switch_states.get("show_fps", False) modules.globals.mouth_mask = switch_states.get("mouth_mask", False) - modules.globals.show_mouth_mask_box = switch_states.get("show_mouth_mask_box", False) + modules.globals.show_mouth_mask_box = switch_states.get( + "show_mouth_mask_box", False + ) except FileNotFoundError: # If the file doesn't exist, use default values pass @@ -315,18 +318,22 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C camera_label.place(relx=0.1, rely=0.86, relwidth=0.2, relheight=0.05) available_cameras = get_available_cameras() - # Convert camera indices to strings for CTkOptionMenu - available_camera_indices, available_camera_strings = available_cameras - camera_variable = ctk.StringVar( - value=( - available_camera_strings[0] - if available_camera_strings - else "No cameras found" + camera_indices, camera_names = available_cameras + + if not camera_names or camera_names[0] == "No cameras found": + camera_variable = ctk.StringVar(value="No cameras found") + camera_optionmenu = ctk.CTkOptionMenu( + root, + variable=camera_variable, + values=["No cameras found"], + state="disabled", ) - ) - camera_optionmenu = ctk.CTkOptionMenu( - root, variable=camera_variable, values=available_camera_strings - ) + else: + camera_variable = ctk.StringVar(value=camera_names[0]) + camera_optionmenu = ctk.CTkOptionMenu( + root, variable=camera_variable, values=camera_names + ) + camera_optionmenu.place(relx=0.35, rely=0.86, relwidth=0.25, relheight=0.05) live_button = ctk.CTkButton( @@ -335,9 +342,16 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C cursor="hand2", command=lambda: webcam_preview( root, - available_camera_indices[ - available_camera_strings.index(camera_variable.get()) - ], + ( + camera_indices[camera_names.index(camera_variable.get())] + if camera_names and camera_names[0] != "No cameras found" + else None + ), + ), + state=( + "normal" + if camera_names and camera_names[0] != "No cameras found" + else "disabled" ), ) live_button.place(relx=0.65, rely=0.86, relwidth=0.2, relheight=0.05) @@ -745,7 +759,7 @@ def update_preview(frame_number: int = 0) -> None: def webcam_preview(root: ctk.CTk, camera_index: int): if not modules.globals.map_faces: if modules.globals.source_path is None: - # No image selected + update_status("Please select a source image first") return create_webcam_preview(camera_index) else: @@ -757,40 +771,60 @@ def webcam_preview(root: ctk.CTk, camera_index: int): def get_available_cameras(): """Returns a list of available camera names and indices.""" - camera_indices = [] - camera_names = [] + try: + graph = FilterGraph() + devices = graph.get_input_devices() - for camera in enumerate_cameras(): - cap = cv2.VideoCapture(camera.index) - if cap.isOpened(): - camera_indices.append(camera.index) - camera_names.append(camera.name) - cap.release() - return (camera_indices, camera_names) + # Create list of indices and names + camera_indices = list(range(len(devices))) + camera_names = devices + + # If no cameras found through DirectShow, try OpenCV fallback + if not camera_names: + # Try to open camera with index -1 and 0 + test_indices = [-1, 0] + working_cameras = [] + + for idx in test_indices: + cap = cv2.VideoCapture(idx) + if cap.isOpened(): + working_cameras.append(f"Camera {idx}") + cap.release() + + if working_cameras: + return test_indices[: len(working_cameras)], working_cameras + + # If still no cameras found, return empty lists + if not camera_names: + return [], ["No cameras found"] + + return camera_indices, camera_names + + except Exception as e: + print(f"Error detecting cameras: {str(e)}") + return [], ["No cameras found"] def create_webcam_preview(camera_index: int): global preview_label, PREVIEW - camera = cv2.VideoCapture(camera_index) - camera.set(cv2.CAP_PROP_FRAME_WIDTH, PREVIEW_DEFAULT_WIDTH) - camera.set(cv2.CAP_PROP_FRAME_HEIGHT, PREVIEW_DEFAULT_HEIGHT) - camera.set(cv2.CAP_PROP_FPS, 60) + cap = VideoCapturer(camera_index) + if not cap.start(PREVIEW_DEFAULT_WIDTH, PREVIEW_DEFAULT_HEIGHT, 60): + update_status("Failed to start camera") + return preview_label.configure(width=PREVIEW_DEFAULT_WIDTH, height=PREVIEW_DEFAULT_HEIGHT) - PREVIEW.deiconify() frame_processors = get_frame_processors_modules(modules.globals.frame_processors) - source_image = None prev_time = time.time() - fps_update_interval = 0.5 # Update FPS every 0.5 seconds + fps_update_interval = 0.5 frame_count = 0 fps = 0 - while camera: - ret, frame = camera.read() + while True: + ret, frame = cap.read() if not ret: break @@ -804,6 +838,11 @@ def create_webcam_preview(camera_index: int): temp_frame, PREVIEW.winfo_width(), PREVIEW.winfo_height() ) + else: + temp_frame = fit_image_to_size( + temp_frame, PREVIEW.winfo_width(), PREVIEW.winfo_height() + ) + if not modules.globals.map_faces: if source_image is None and modules.globals.source_path: source_image = get_one_face(cv2.imread(modules.globals.source_path)) @@ -816,7 +855,6 @@ def create_webcam_preview(camera_index: int): temp_frame = frame_processor.process_frame(source_image, temp_frame) else: modules.globals.target_path = None - for frame_processor in frame_processors: if frame_processor.NAME == "DLC.FACE-ENHANCER": if modules.globals.fp_ui["face_enhancer"]: @@ -855,7 +893,7 @@ def create_webcam_preview(camera_index: int): if PREVIEW.state() == "withdrawn": break - camera.release() + cap.release() PREVIEW.withdraw() diff --git a/modules/utilities.py b/modules/utilities.py index 782395f..fe17997 100644 --- a/modules/utilities.py +++ b/modules/utilities.py @@ -12,16 +12,23 @@ from tqdm import tqdm import modules.globals -TEMP_FILE = 'temp.mp4' -TEMP_DIRECTORY = 'temp' +TEMP_FILE = "temp.mp4" +TEMP_DIRECTORY = "temp" # monkey patch ssl for mac -if platform.system().lower() == 'darwin': +if platform.system().lower() == "darwin": ssl._create_default_https_context = ssl._create_unverified_context def run_ffmpeg(args: List[str]) -> bool: - commands = ['ffmpeg', '-hide_banner', '-hwaccel', 'auto', '-loglevel', modules.globals.log_level] + commands = [ + "ffmpeg", + "-hide_banner", + "-hwaccel", + "auto", + "-loglevel", + modules.globals.log_level, + ] commands.extend(args) try: subprocess.check_output(commands, stderr=subprocess.STDOUT) @@ -32,8 +39,19 @@ def run_ffmpeg(args: List[str]) -> bool: def detect_fps(target_path: str) -> float: - command = ['ffprobe', '-v', 'error', '-select_streams', 'v:0', '-show_entries', 'stream=r_frame_rate', '-of', 'default=noprint_wrappers=1:nokey=1', target_path] - output = subprocess.check_output(command).decode().strip().split('/') + command = [ + "ffprobe", + "-v", + "error", + "-select_streams", + "v:0", + "-show_entries", + "stream=r_frame_rate", + "-of", + "default=noprint_wrappers=1:nokey=1", + target_path, + ] + output = subprocess.check_output(command).decode().strip().split("/") try: numerator, denominator = map(int, output) return numerator / denominator @@ -44,25 +62,65 @@ def detect_fps(target_path: str) -> float: def extract_frames(target_path: str) -> None: temp_directory_path = get_temp_directory_path(target_path) - run_ffmpeg(['-i', target_path, '-pix_fmt', 'rgb24', os.path.join(temp_directory_path, '%04d.png')]) + run_ffmpeg( + [ + "-i", + target_path, + "-pix_fmt", + "rgb24", + os.path.join(temp_directory_path, "%04d.png"), + ] + ) def create_video(target_path: str, fps: float = 30.0) -> None: temp_output_path = get_temp_output_path(target_path) temp_directory_path = get_temp_directory_path(target_path) - run_ffmpeg(['-r', str(fps), '-i', os.path.join(temp_directory_path, '%04d.png'), '-c:v', modules.globals.video_encoder, '-crf', str(modules.globals.video_quality), '-pix_fmt', 'yuv420p', '-vf', 'colorspace=bt709:iall=bt601-6-625:fast=1', '-y', temp_output_path]) + run_ffmpeg( + [ + "-r", + str(fps), + "-i", + os.path.join(temp_directory_path, "%04d.png"), + "-c:v", + modules.globals.video_encoder, + "-crf", + str(modules.globals.video_quality), + "-pix_fmt", + "yuv420p", + "-vf", + "colorspace=bt709:iall=bt601-6-625:fast=1", + "-y", + temp_output_path, + ] + ) def restore_audio(target_path: str, output_path: str) -> None: temp_output_path = get_temp_output_path(target_path) - done = run_ffmpeg(['-i', temp_output_path, '-i', target_path, '-c:v', 'copy', '-map', '0:v:0', '-map', '1:a:0', '-y', output_path]) + done = run_ffmpeg( + [ + "-i", + temp_output_path, + "-i", + target_path, + "-c:v", + "copy", + "-map", + "0:v:0", + "-map", + "1:a:0", + "-y", + output_path, + ] + ) if not done: move_temp(target_path, output_path) def get_temp_frame_paths(target_path: str) -> List[str]: temp_directory_path = get_temp_directory_path(target_path) - return glob.glob((os.path.join(glob.escape(temp_directory_path), '*.png'))) + return glob.glob((os.path.join(glob.escape(temp_directory_path), "*.png"))) def get_temp_directory_path(target_path: str) -> str: @@ -81,7 +139,9 @@ def normalize_output_path(source_path: str, target_path: str, output_path: str) source_name, _ = os.path.splitext(os.path.basename(source_path)) target_name, target_extension = os.path.splitext(os.path.basename(target_path)) if os.path.isdir(output_path): - return os.path.join(output_path, source_name + '-' + target_name + target_extension) + return os.path.join( + output_path, source_name + "-" + target_name + target_extension + ) return output_path @@ -108,20 +168,20 @@ def clean_temp(target_path: str) -> None: def has_image_extension(image_path: str) -> bool: - return image_path.lower().endswith(('png', 'jpg', 'jpeg')) + return image_path.lower().endswith(("png", "jpg", "jpeg")) def is_image(image_path: str) -> bool: if image_path and os.path.isfile(image_path): mimetype, _ = mimetypes.guess_type(image_path) - return bool(mimetype and mimetype.startswith('image/')) + return bool(mimetype and mimetype.startswith("image/")) return False def is_video(video_path: str) -> bool: if video_path and os.path.isfile(video_path): mimetype, _ = mimetypes.guess_type(video_path) - return bool(mimetype and mimetype.startswith('video/')) + return bool(mimetype and mimetype.startswith("video/")) return False @@ -129,12 +189,20 @@ def conditional_download(download_directory_path: str, urls: List[str]) -> None: if not os.path.exists(download_directory_path): os.makedirs(download_directory_path) for url in urls: - download_file_path = os.path.join(download_directory_path, os.path.basename(url)) + download_file_path = os.path.join( + download_directory_path, os.path.basename(url) + ) if not os.path.exists(download_file_path): - request = urllib.request.urlopen(url) # type: ignore[attr-defined] - total = int(request.headers.get('Content-Length', 0)) - with tqdm(total=total, desc='Downloading', unit='B', unit_scale=True, unit_divisor=1024) as progress: - urllib.request.urlretrieve(url, download_file_path, reporthook=lambda count, block_size, total_size: progress.update(block_size)) # type: ignore[attr-defined] + request = urllib.request.urlopen(url) # type: ignore[attr-defined] + total = int(request.headers.get("Content-Length", 0)) + with tqdm( + total=total, + desc="Downloading", + unit="B", + unit_scale=True, + unit_divisor=1024, + ) as progress: + urllib.request.urlretrieve(url, download_file_path, reporthook=lambda count, block_size, total_size: progress.update(block_size)) # type: ignore[attr-defined] def resolve_relative_path(path: str) -> str: diff --git a/modules/video_capture.py b/modules/video_capture.py new file mode 100644 index 0000000..dd55672 --- /dev/null +++ b/modules/video_capture.py @@ -0,0 +1,84 @@ +import cv2 +import numpy as np +from pygrabber.dshow_graph import FilterGraph +import threading +from typing import Optional, Tuple, Callable + + +class VideoCapturer: + def __init__(self, device_index: int): + self.graph = FilterGraph() + self.device_index = device_index + self.frame_callback = None + self._current_frame = None + self._frame_ready = threading.Event() + self.is_running = False + self.cap = None + + # Verify device exists + devices = self.graph.get_input_devices() + if self.device_index >= len(devices): + raise ValueError( + f"Invalid device index {device_index}. Available devices: {len(devices)}" + ) + + def start(self, width: int = 960, height: int = 540, fps: int = 60) -> bool: + """Initialize and start video capture""" + try: + # Try different capture methods in order + capture_methods = [ + (self.device_index, cv2.CAP_DSHOW), # Try DirectShow first + (self.device_index, cv2.CAP_ANY), # Then try default backend + (-1, cv2.CAP_ANY), # Try -1 as fallback + (0, cv2.CAP_ANY), # Finally try 0 without specific backend + ] + + for dev_id, backend in capture_methods: + try: + self.cap = cv2.VideoCapture(dev_id, backend) + if self.cap.isOpened(): + break + self.cap.release() + except Exception: + continue + + if not self.cap or not self.cap.isOpened(): + raise RuntimeError("Failed to open camera with all available methods") + + # Configure format + self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, width) + self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height) + self.cap.set(cv2.CAP_PROP_FPS, fps) + + self.is_running = True + return True + + except Exception as e: + print(f"Failed to start capture: {str(e)}") + if self.cap: + self.cap.release() + return False + + def read(self) -> Tuple[bool, Optional[np.ndarray]]: + """Read a frame from the camera""" + if not self.is_running or self.cap is None: + return False, None + + ret, frame = self.cap.read() + if ret: + self._current_frame = frame + if self.frame_callback: + self.frame_callback(frame) + return True, frame + return False, None + + def release(self) -> None: + """Stop capture and release resources""" + if self.is_running and self.cap is not None: + self.cap.release() + self.is_running = False + self.cap = None + + def set_frame_callback(self, callback: Callable[[np.ndarray], None]) -> None: + """Set callback for frame processing""" + self.frame_callback = callback