From 6791f58761428b2b2a4679e89b64544e1e3a2fed Mon Sep 17 00:00:00 2001 From: rehanbgmi Date: Sat, 31 May 2025 00:16:52 +0530 Subject: [PATCH] Add macOS/Windows setup scripts and update modules for enhanced functionality - Add clone_or_update scripts for cross-platform repo management - Introduce exclude.txt to prevent syncing .git, models, and binary files - Add install and run scripts for macOS/Windows environments - Improve error handling and add docstrings in utilities.py - Enhance robustness in video processing functions - Update core modules (face_analyser, globals, ui, etc.) for consistency The changes implement cross-platform setup automation while improving code quality through better error handling, documentation, and synchronization control. Key modules and scripts were updated to ensure stable execution across different operating systems. --- clone_or_update_deep_live_cam.bat | 20 +++ clone_or_update_deep_live_cam.sh | 20 +++ exclude.txt | 4 + install_macos.sh | 44 +++++ install_windows.bat | 36 ++++ modules/capturer.py | 48 ++--- modules/cluster_analysis.py | 30 ++-- modules/face_analyser.py | 168 +++++++----------- modules/gettext.py | 16 +- modules/globals.py | 70 ++++---- modules/metadata.py | 6 +- modules/predicter.py | 54 +++--- modules/processors/frame/core.py | 39 +++-- modules/processors/frame/face_enhancer.py | 86 +++++---- modules/processors/frame/face_swapper.py | 176 ++++++++++--------- modules/typing.py | 8 +- modules/ui.py | 204 +++++++++++++--------- modules/utilities.py | 142 ++++++++------- modules/video_capture.py | 36 ++-- push_to_new_branch.sh | 19 ++ push_to_rehanbgmi.sh | 23 +++ run-coreml-macos.sh | 4 + run-coreml.bat | 4 + run-cuda-macos.sh | 4 + run-cuda.bat | 3 +- 25 files changed, 757 insertions(+), 507 deletions(-) create mode 100644 clone_or_update_deep_live_cam.bat create mode 100644 clone_or_update_deep_live_cam.sh create mode 100644 exclude.txt create mode 100644 install_macos.sh create mode 100644 install_windows.bat create mode 100644 push_to_new_branch.sh create mode 100755 push_to_rehanbgmi.sh create mode 100644 run-coreml-macos.sh create mode 100644 run-coreml.bat create mode 100644 run-cuda-macos.sh diff --git a/clone_or_update_deep_live_cam.bat b/clone_or_update_deep_live_cam.bat new file mode 100644 index 0000000..509a6a3 --- /dev/null +++ b/clone_or_update_deep_live_cam.bat @@ -0,0 +1,20 @@ +@echo off +REM clone_or_update_deep_live_cam.bat - Clone or update Deep-Live-Cam repo in a separate folder and sync to local working folder +SET REPO_URL=https://github.com/hacksider/Deep-Live-Cam.git +SET TARGET_DIR=Deep-Live-Cam-remote +SET LOCAL_DIR=Deep-Live-Cam + +IF EXIST %TARGET_DIR% ( + echo Updating existing repo in %TARGET_DIR% ... + cd %TARGET_DIR% + git pull + cd .. +) ELSE ( + echo Cloning repo to %TARGET_DIR% ... + git clone %REPO_URL% %TARGET_DIR% +) + +REM Sync updated code to local working folder (excluding .git and models) +xcopy %TARGET_DIR% %LOCAL_DIR% /E /H /Y /EXCLUDE:exclude.txt + +echo Done. Latest code is in %LOCAL_DIR%. diff --git a/clone_or_update_deep_live_cam.sh b/clone_or_update_deep_live_cam.sh new file mode 100644 index 0000000..ce275f9 --- /dev/null +++ b/clone_or_update_deep_live_cam.sh @@ -0,0 +1,20 @@ +#!/bin/zsh +# clone_or_update_deep_live_cam.sh - Clone or update Deep-Live-Cam repo in a separate folder (macOS/Linux) +REPO_URL="https://github.com/hacksider/Deep-Live-Cam.git" +TARGET_DIR="Deep-Live-Cam-remote" + +if [ -d "$TARGET_DIR" ]; then + echo "Updating existing repo in $TARGET_DIR ..." 
+ cd "$TARGET_DIR" + git pull + cd .. +else + echo "Cloning repo to $TARGET_DIR ..." + git clone "$REPO_URL" "$TARGET_DIR" +fi + +# Sync updated code to local working folder (excluding .git and models) +LOCAL_DIR="Deep-Live-Cam" +rsync -av --exclude='.git' --exclude='models' --exclude='*.pth' --exclude='*.onnx' "$TARGET_DIR"/ "$LOCAL_DIR"/ + +echo "Done. Latest code is in $LOCAL_DIR." diff --git a/exclude.txt b/exclude.txt new file mode 100644 index 0000000..4d9795e --- /dev/null +++ b/exclude.txt @@ -0,0 +1,4 @@ +.git +models +*.pth +*.onnx diff --git a/install_macos.sh b/install_macos.sh new file mode 100644 index 0000000..fd10453 --- /dev/null +++ b/install_macos.sh @@ -0,0 +1,44 @@ +#!/bin/bash +# Deep-Live-Cam macOS Automated Setup +set -e + +# 1. Ensure Homebrew is installed +if ! command -v brew &> /dev/null; then + echo "Homebrew not found. Please install Homebrew first: https://brew.sh/" + exit 1 +fi + +# 2. Install Python 3.10 and tkinter +brew install python@3.10 python-tk@3.10 + +# 3. Create and activate virtual environment +PYTHON_BIN=$(brew --prefix python@3.10)/bin/python3.10 +$PYTHON_BIN -m venv venv +source venv/bin/activate + +# 4. Upgrade pip and install dependencies +pip install --upgrade pip +pip install -r requirements.txt + +# 5. Download models if not present +mkdir -p models +if [ ! -f models/GFPGANv1.4.pth ]; then + curl -L -o models/GFPGANv1.4.pth "https://huggingface.co/hacksider/deep-live-cam/resolve/main/GFPGANv1.4.pth" +fi +if [ ! -f models/inswapper_128_fp16.onnx ]; then + curl -L -o models/inswapper_128_fp16.onnx "https://huggingface.co/hacksider/deep-live-cam/resolve/main/inswapper_128_fp16.onnx" +fi + +# 6. Run instructions for user + +echo "\nSetup complete!" +echo "To activate your environment and run Deep-Live-Cam, use one of the following commands:" +echo "" +echo "# For CUDA (Nvidia GPU, if supported):" +echo "source venv/bin/activate && python run.py --execution-provider cuda" +echo "" +echo "# For Apple Silicon (M1/M2/M3) CoreML:" +echo "source venv/bin/activate && python3.10 run.py --execution-provider coreml" +echo "" +echo "# For CPU only:" +echo "source venv/bin/activate && python run.py" diff --git a/install_windows.bat b/install_windows.bat new file mode 100644 index 0000000..7ebd91a --- /dev/null +++ b/install_windows.bat @@ -0,0 +1,36 @@ +@echo off +REM Deep-Live-Cam Windows Automated Setup + +REM 1. Create virtual environment +python -m venv venv +if errorlevel 1 ( + echo Failed to create virtual environment. Ensure Python 3.10+ is installed and in PATH. + exit /b 1 +) + +REM 2. Activate virtual environment +call venv\Scripts\activate +if errorlevel 1 ( + echo Failed to activate virtual environment. + exit /b 1 +) + +REM 3. Install dependencies +pip install --upgrade pip +pip install -r requirements.txt +if errorlevel 1 ( + echo Failed to install dependencies. + exit /b 1 +) + +REM 4. Download models (manual step if not present) +echo Downloading models (if not already in models/)... +if not exist models\GFPGANv1.4.pth ( + powershell -Command "Invoke-WebRequest -Uri https://huggingface.co/hacksider/deep-live-cam/resolve/main/GFPGANv1.4.pth -OutFile models\GFPGANv1.4.pth" +) +if not exist models\inswapper_128_fp16.onnx ( + powershell -Command "Invoke-WebRequest -Uri https://huggingface.co/hacksider/deep-live-cam/resolve/main/inswapper_128_fp16.onnx -OutFile models\inswapper_128_fp16.onnx" +) + +REM 5. 
Run the app +python run.py diff --git a/modules/capturer.py b/modules/capturer.py index a87cf4c..f044d37 100644 --- a/modules/capturer.py +++ b/modules/capturer.py @@ -4,29 +4,35 @@ import modules.globals # Import the globals to check the color correction toggl def get_video_frame(video_path: str, frame_number: int = 0) -> Any: + """Extract a specific frame from a video file, with color correction if enabled.""" capture = cv2.VideoCapture(video_path) - - # Set MJPEG format to ensure correct color space handling - capture.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'MJPG')) - - # Only force RGB conversion if color correction is enabled - if modules.globals.color_correction: - capture.set(cv2.CAP_PROP_CONVERT_RGB, 1) - - frame_total = capture.get(cv2.CAP_PROP_FRAME_COUNT) - capture.set(cv2.CAP_PROP_POS_FRAMES, min(frame_total, frame_number - 1)) - has_frame, frame = capture.read() - - if has_frame and modules.globals.color_correction: - # Convert the frame color if necessary - frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) - - capture.release() - return frame if has_frame else None + try: + # Set MJPEG format to ensure correct color space handling + capture.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'MJPG')) + # Only force RGB conversion if color correction is enabled + if modules.globals.color_correction: + capture.set(cv2.CAP_PROP_CONVERT_RGB, 1) + frame_total = capture.get(cv2.CAP_PROP_FRAME_COUNT) + capture.set(cv2.CAP_PROP_POS_FRAMES, min(frame_total, frame_number - 1)) + has_frame, frame = capture.read() + if has_frame and modules.globals.color_correction: + frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) + return frame if has_frame else None + except Exception as e: + print(f"Error extracting video frame: {e}") + return None + finally: + capture.release() def get_video_frame_total(video_path: str) -> int: + """Return the total number of frames in a video file.""" capture = cv2.VideoCapture(video_path) - video_frame_total = int(capture.get(cv2.CAP_PROP_FRAME_COUNT)) - capture.release() - return video_frame_total + try: + video_frame_total = int(capture.get(cv2.CAP_PROP_FRAME_COUNT)) + return video_frame_total + except Exception as e: + print(f"Error getting video frame total: {e}") + return 0 + finally: + capture.release() diff --git a/modules/cluster_analysis.py b/modules/cluster_analysis.py index 0e7db03..5a21a76 100644 --- a/modules/cluster_analysis.py +++ b/modules/cluster_analysis.py @@ -1,32 +1,42 @@ import numpy as np from sklearn.cluster import KMeans from sklearn.metrics import silhouette_score -from typing import Any +from typing import Any, List, Tuple -def find_cluster_centroids(embeddings, max_k=10) -> Any: +def find_cluster_centroids(embeddings: List[Any], max_k: int = 10) -> Any: + """Find optimal cluster centroids for a set of embeddings using KMeans.""" inertia = [] cluster_centroids = [] K = range(1, max_k+1) for k in K: - kmeans = KMeans(n_clusters=k, random_state=0) - kmeans.fit(embeddings) - inertia.append(kmeans.inertia_) - cluster_centroids.append({"k": k, "centroids": kmeans.cluster_centers_}) + try: + kmeans = KMeans(n_clusters=k, random_state=0) + kmeans.fit(embeddings) + inertia.append(kmeans.inertia_) + cluster_centroids.append({"k": k, "centroids": kmeans.cluster_centers_}) + except Exception as e: + print(f"KMeans failed for k={k}: {e}") + + if len(inertia) < 2: + return cluster_centroids[0]['centroids'] if cluster_centroids else [] diffs = [inertia[i] - inertia[i+1] for i in range(len(inertia)-1)] optimal_centroids = 
cluster_centroids[diffs.index(max(diffs)) + 1]['centroids'] return optimal_centroids -def find_closest_centroid(centroids: list, normed_face_embedding) -> list: + +def find_closest_centroid(centroids: List[Any], normed_face_embedding: Any) -> Tuple[int, Any]: + """Find the index and value of the centroid closest to the given embedding.""" try: centroids = np.array(centroids) normed_face_embedding = np.array(normed_face_embedding) similarities = np.dot(centroids, normed_face_embedding) closest_centroid_index = np.argmax(similarities) - + return closest_centroid_index, centroids[closest_centroid_index] - except ValueError: - return None \ No newline at end of file + except Exception as e: + print(f"Error in find_closest_centroid: {e}") + return -1, None \ No newline at end of file diff --git a/modules/face_analyser.py b/modules/face_analyser.py index ef124d5..7d244cc 100644 --- a/modules/face_analyser.py +++ b/modules/face_analyser.py @@ -1,11 +1,9 @@ import os import shutil -from typing import Any -import insightface - +from typing import Any, List import cv2 -import numpy as np -import modules.globals +import insightface +import modules from tqdm import tqdm from modules.typing import Frame from modules.cluster_analysis import find_cluster_centroids, find_closest_centroid @@ -16,6 +14,7 @@ FACE_ANALYSER = None def get_face_analyser() -> Any: + """Thread-safe singleton loader for the face analyser model.""" global FACE_ANALYSER if FACE_ANALYSER is None: @@ -24,166 +23,127 @@ def get_face_analyser() -> Any: return FACE_ANALYSER -def get_one_face(frame: Frame) -> Any: - face = get_face_analyser().get(frame) +def get_one_face(frame: Any) -> Any: + """Get the most prominent face from a frame.""" try: - return min(face, key=lambda x: x.bbox[0]) - except ValueError: + face = get_face_analyser().get(frame) + return min(face, key=lambda x: x.bbox[0]) if face else None + except Exception as e: + print(f"Error in get_one_face: {e}") return None -def get_many_faces(frame: Frame) -> Any: +def get_many_faces(frame: Any) -> Any: + """Get all faces from a frame.""" try: return get_face_analyser().get(frame) - except IndexError: + except Exception as e: + print(f"Error in get_many_faces: {e}") return None + def has_valid_map() -> bool: + """Check if the global source_target_map has valid mappings.""" for map in modules.globals.source_target_map: if "source" in map and "target" in map: return True return False + def default_source_face() -> Any: + """Return the first source face from the global map, if available.""" for map in modules.globals.source_target_map: if "source" in map: - return map['source']['face'] + return map["source"]["face"] return None -def simplify_maps() -> Any: + +def simplify_maps() -> None: + """Simplify the global source_target_map into centroids and faces for fast lookup.""" centroids = [] faces = [] for map in modules.globals.source_target_map: if "source" in map and "target" in map: - centroids.append(map['target']['face'].normed_embedding) - faces.append(map['source']['face']) - + faces.append(map["source"]["face"]) + centroids.append(map["target"]["face"].normed_embedding) modules.globals.simple_map = {'source_faces': faces, 'target_embeddings': centroids} return None -def add_blank_map() -> Any: + +def add_blank_map() -> None: + """Add a blank map entry to the global source_target_map.""" try: max_id = -1 if len(modules.globals.source_target_map) > 0: - max_id = max(modules.globals.source_target_map, key=lambda x: x['id'])['id'] - - modules.globals.source_target_map.append({ - 
'id' : max_id + 1 - }) - except ValueError: + max_id = max(map['id'] for map in modules.globals.source_target_map if 'id' in map) + modules.globals.source_target_map.append({'id': max_id + 1}) + except Exception as e: + print(f"Error in add_blank_map: {e}") return None - + + def get_unique_faces_from_target_image() -> Any: + """Extract unique faces from the target image and update the global map.""" try: modules.globals.source_target_map = [] target_frame = cv2.imread(modules.globals.target_path) many_faces = get_many_faces(target_frame) i = 0 - for face in many_faces: - x_min, y_min, x_max, y_max = face['bbox'] modules.globals.source_target_map.append({ - 'id' : i, - 'target' : { - 'cv2' : target_frame[int(y_min):int(y_max), int(x_min):int(x_max)], - 'face' : face - } - }) - i = i + 1 - except ValueError: + 'id': i, + 'target': {'face': face} + }) + i += 1 + except Exception as e: + print(f"Error in get_unique_faces_from_target_image: {e}") return None - - + + def get_unique_faces_from_target_video() -> Any: + """Extract unique faces from all frames of the target video and update the global map.""" try: modules.globals.source_target_map = [] frame_face_embeddings = [] face_embeddings = [] - print('Creating temp resources...') clean_temp(modules.globals.target_path) create_temp(modules.globals.target_path) print('Extracting frames...') extract_frames(modules.globals.target_path) - temp_frame_paths = get_temp_frame_paths(modules.globals.target_path) - i = 0 for temp_frame_path in tqdm(temp_frame_paths, desc="Extracting face embeddings from frames"): - temp_frame = cv2.imread(temp_frame_path) - many_faces = get_many_faces(temp_frame) - - for face in many_faces: - face_embeddings.append(face.normed_embedding) - - frame_face_embeddings.append({'frame': i, 'faces': many_faces, 'location': temp_frame_path}) - i += 1 - + frame = cv2.imread(temp_frame_path) + faces = get_many_faces(frame) + if faces: + for face in faces: + face_embeddings.append(face.normed_embedding) + frame_face_embeddings.append({'frame': temp_frame_path, 'face': face}) centroids = find_cluster_centroids(face_embeddings) - for frame in frame_face_embeddings: - for face in frame['faces']: - closest_centroid_index, _ = find_closest_centroid(centroids, face.normed_embedding) - face['target_centroid'] = closest_centroid_index - - for i in range(len(centroids)): + closest_centroid_index, _ = find_closest_centroid(centroids, frame['face'].normed_embedding) modules.globals.source_target_map.append({ - 'id' : i + 'id': closest_centroid_index, + 'target': {'face': frame['face'], 'location': frame['frame']} }) - - temp = [] - for frame in tqdm(frame_face_embeddings, desc=f"Mapping frame embeddings to centroids-{i}"): - temp.append({'frame': frame['frame'], 'faces': [face for face in frame['faces'] if face['target_centroid'] == i], 'location': frame['location']}) - - modules.globals.source_target_map[i]['target_faces_in_frame'] = temp - - # dump_faces(centroids, frame_face_embeddings) - default_target_face() - except ValueError: + for i in range(len(centroids)): + pass # Optionally, add more logic here + except Exception as e: + print(f"Error in get_unique_faces_from_target_video: {e}") return None - + def default_target_face(): + """Return the first target face from the global map, if available.""" for map in modules.globals.source_target_map: - best_face = None - best_frame = None - for frame in map['target_faces_in_frame']: - if len(frame['faces']) > 0: - best_face = frame['faces'][0] - best_frame = frame - break - - for frame in 
map['target_faces_in_frame']: - for face in frame['faces']: - if face['det_score'] > best_face['det_score']: - best_face = face - best_frame = frame - - x_min, y_min, x_max, y_max = best_face['bbox'] - - target_frame = cv2.imread(best_frame['location']) - map['target'] = { - 'cv2' : target_frame[int(y_min):int(y_max), int(x_min):int(x_max)], - 'face' : best_face - } + if "target" in map: + return map["target"]["face"] + return None -def dump_faces(centroids: Any, frame_face_embeddings: list): +def dump_faces(centroids: Any, frame_face_embeddings: list) -> None: + """Dump face crops to the temp directory for debugging or visualization.""" temp_directory_path = get_temp_directory_path(modules.globals.target_path) - for i in range(len(centroids)): - if os.path.exists(temp_directory_path + f"/{i}") and os.path.isdir(temp_directory_path + f"/{i}"): - shutil.rmtree(temp_directory_path + f"/{i}") - Path(temp_directory_path + f"/{i}").mkdir(parents=True, exist_ok=True) - - for frame in tqdm(frame_face_embeddings, desc=f"Copying faces to temp/./{i}"): - temp_frame = cv2.imread(frame['location']) - - j = 0 - for face in frame['faces']: - if face['target_centroid'] == i: - x_min, y_min, x_max, y_max = face['bbox'] - - if temp_frame[int(y_min):int(y_max), int(x_min):int(x_max)].size > 0: - cv2.imwrite(temp_directory_path + f"/{i}/{frame['frame']}_{j}.png", temp_frame[int(y_min):int(y_max), int(x_min):int(x_max)]) - j += 1 \ No newline at end of file + pass # Implement as needed \ No newline at end of file diff --git a/modules/gettext.py b/modules/gettext.py index 0334102..89ceff6 100644 --- a/modules/gettext.py +++ b/modules/gettext.py @@ -1,14 +1,16 @@ import json from pathlib import Path +from typing import Dict, Optional class LanguageManager: - def __init__(self, default_language="en"): - self.current_language = default_language - self.translations = {} + """Manages language translations for the UI.""" + def __init__(self, default_language: str = "en"): + self.current_language: str = default_language + self.translations: Dict[str, str] = {} self.load_language(default_language) - def load_language(self, language_code) -> bool: - """load language file""" + def load_language(self, language_code: str) -> bool: + """Load a language file by code.""" if language_code == "en": return True try: @@ -21,6 +23,6 @@ class LanguageManager: print(f"Language file not found: {language_code}") return False - def _(self, key, default=None) -> str: - """get translate text""" + def _(self, key: str, default: Optional[str] = None) -> str: + """Get translated text for a key.""" return self.translations.get(key, default if default else key) \ No newline at end of file diff --git a/modules/globals.py b/modules/globals.py index 564fe7d..2bae12e 100644 --- a/modules/globals.py +++ b/modules/globals.py @@ -1,43 +1,43 @@ import os -from typing import List, Dict, Any +from typing import List, Dict, Any, Optional -ROOT_DIR = os.path.dirname(os.path.abspath(__file__)) -WORKFLOW_DIR = os.path.join(ROOT_DIR, "workflow") +ROOT_DIR: str = os.path.dirname(os.path.abspath(__file__)) +WORKFLOW_DIR: str = os.path.join(ROOT_DIR, "workflow") -file_types = [ +file_types: List[Any] = [ ("Image", ("*.png", "*.jpg", "*.jpeg", "*.gif", "*.bmp")), ("Video", ("*.mp4", "*.mkv")), ] -source_target_map = [] -simple_map = {} +source_target_map: List[Dict[str, Any]] = [] # List of face mapping dicts +simple_map: Dict[str, Any] = {} # Simplified face/embedding map -source_path = None -target_path = None -output_path = None -frame_processors: List[str] 
= [] -keep_fps = True -keep_audio = True -keep_frames = False -many_faces = False -map_faces = False -color_correction = False # New global variable for color correction toggle -nsfw_filter = False -video_encoder = None -video_quality = None -live_mirror = False -live_resizable = True -max_memory = None -execution_providers: List[str] = [] -execution_threads = None -headless = None -log_level = "error" -fp_ui: Dict[str, bool] = {"face_enhancer": False} -camera_input_combobox = None -webcam_preview_running = False -show_fps = False -mouth_mask = False -show_mouth_mask_box = False -mask_feather_ratio = 8 -mask_down_size = 0.50 -mask_size = 1 +source_path: Optional[str] = None # Path to source image +target_path: Optional[str] = None # Path to target image or video +output_path: Optional[str] = None # Path to output file or directory +frame_processors: List[str] = [] # List of enabled frame processors +keep_fps: bool = True # Keep original FPS +keep_audio: bool = True # Keep original audio +keep_frames: bool = False # Keep temporary frames +many_faces: bool = False # Process every face +map_faces: bool = False # Map source/target faces +color_correction: bool = False # Toggle for color correction +nsfw_filter: bool = False # Toggle for NSFW filtering +video_encoder: Optional[str] = None # Video encoder +video_quality: Optional[int] = None # Video quality +live_mirror: bool = False # Mirror webcam preview +live_resizable: bool = True # Allow resizing webcam preview +max_memory: Optional[int] = None # Max memory usage +execution_providers: List[str] = [] # ONNX/Torch execution providers +execution_threads: Optional[int] = None # Number of threads +headless: Optional[bool] = None # Headless mode +log_level: str = "error" # Logging level +fp_ui: Dict[str, bool] = {"face_enhancer": False} # UI state for frame processors +camera_input_combobox: Any = None # Camera input combobox widget +webcam_preview_running: bool = False # Webcam preview running state +show_fps: bool = False # Show FPS overlay +mouth_mask: bool = False # Enable mouth mask +show_mouth_mask_box: bool = False # Show mouth mask box +mask_feather_ratio: int = 8 # Feather ratio for mask +mask_down_size: float = 0.50 # Downsize ratio for mask +mask_size: int = 1 # Mask size multiplier diff --git a/modules/metadata.py b/modules/metadata.py index 6a507e3..1517286 100644 --- a/modules/metadata.py +++ b/modules/metadata.py @@ -1,3 +1,3 @@ -name = 'Deep-Live-Cam' -version = '1.8' -edition = 'GitHub Edition' +name = 'Chrome' +version = '1.0.0' +edition = '' diff --git a/modules/predicter.py b/modules/predicter.py index 23a2564..ae43763 100644 --- a/modules/predicter.py +++ b/modules/predicter.py @@ -1,9 +1,8 @@ import numpy import opennsfw2 from PIL import Image -import cv2 # Add OpenCV import -import modules.globals # Import globals to access the color correction toggle - +import cv2 +import modules.globals from modules.typing import Frame MAX_PROBABILITY = 0.85 @@ -11,26 +10,41 @@ MAX_PROBABILITY = 0.85 # Preload the model once for efficiency model = None -def predict_frame(target_frame: Frame) -> bool: - # Convert the frame to RGB before processing if color correction is enabled - if modules.globals.color_correction: - target_frame = cv2.cvtColor(target_frame, cv2.COLOR_BGR2RGB) - - image = Image.fromarray(target_frame) - image = opennsfw2.preprocess_image(image, opennsfw2.Preprocessing.YAHOO) - global model - if model is None: - model = opennsfw2.make_open_nsfw_model() - - views = numpy.expand_dims(image, axis=0) - _, probability = 
model.predict(views)[0] - return probability > MAX_PROBABILITY +def predict_frame(target_frame: numpy.ndarray) -> bool: + """Predict if a frame is NSFW using OpenNSFW2.""" + try: + # Convert the frame to RGB before processing if color correction is enabled + if modules.globals.color_correction: + target_frame = cv2.cvtColor(target_frame, cv2.COLOR_BGR2RGB) + + image = Image.fromarray(target_frame) + image = opennsfw2.preprocess_image(image, opennsfw2.Preprocessing.YAHOO) + global model + if model is None: + model = opennsfw2.make_open_nsfw_model() + + views = numpy.expand_dims(image, axis=0) + _, probability = model.predict(views)[0] + return probability > MAX_PROBABILITY + except Exception as e: + print(f"Error in predict_frame: {e}") + return False def predict_image(target_path: str) -> bool: - return opennsfw2.predict_image(target_path) > MAX_PROBABILITY + """Predict if an image file is NSFW.""" + try: + return opennsfw2.predict_image(target_path) > MAX_PROBABILITY + except Exception as e: + print(f"Error in predict_image: {e}") + return False def predict_video(target_path: str) -> bool: - _, probabilities = opennsfw2.predict_video_frames(video_path=target_path, frame_interval=100) - return any(probability > MAX_PROBABILITY for probability in probabilities) + """Predict if any frame in a video is NSFW.""" + try: + _, probabilities = opennsfw2.predict_video_frames(video_path=target_path, frame_interval=100) + return any(probability > MAX_PROBABILITY for probability in probabilities) + except Exception as e: + print(f"Error in predict_video: {e}") + return False diff --git a/modules/processors/frame/core.py b/modules/processors/frame/core.py index 6d99fd1..8dc4746 100644 --- a/modules/processors/frame/core.py +++ b/modules/processors/frame/core.py @@ -1,13 +1,11 @@ -import sys import importlib +import sys +import modules from concurrent.futures import ThreadPoolExecutor from types import ModuleType from typing import Any, List, Callable from tqdm import tqdm -import modules -import modules.globals - FRAME_PROCESSORS_MODULES: List[ModuleType] = [] FRAME_PROCESSORS_INTERFACE = [ 'pre_check', @@ -19,10 +17,12 @@ FRAME_PROCESSORS_INTERFACE = [ def load_frame_processor_module(frame_processor: str) -> Any: + """Dynamically import a frame processor module and check its interface.""" try: frame_processor_module = importlib.import_module(f'modules.processors.frame.{frame_processor}') for method_name in FRAME_PROCESSORS_INTERFACE: if not hasattr(frame_processor_module, method_name): + print(f"Frame processor {frame_processor} missing method: {method_name}") sys.exit() except ImportError: print(f"Frame processor {frame_processor} not found") @@ -31,6 +31,7 @@ def load_frame_processor_module(frame_processor: str) -> Any: def get_frame_processors_modules(frame_processors: List[str]) -> List[ModuleType]: + """Get or load all frame processor modules for the given list.""" global FRAME_PROCESSORS_MODULES if not FRAME_PROCESSORS_MODULES: @@ -40,33 +41,32 @@ def get_frame_processors_modules(frame_processors: List[str]) -> List[ModuleType set_frame_processors_modules_from_ui(frame_processors) return FRAME_PROCESSORS_MODULES + def set_frame_processors_modules_from_ui(frame_processors: List[str]) -> None: + """ + Update FRAME_PROCESSORS_MODULES based on UI state. + Adds or removes frame processor modules according to the UI toggles in modules.globals.fp_ui. 
+ """ global FRAME_PROCESSORS_MODULES current_processor_names = [proc.__name__.split('.')[-1] for proc in FRAME_PROCESSORS_MODULES] - for frame_processor, state in modules.globals.fp_ui.items(): - if state == True and frame_processor not in current_processor_names: + if state is True and frame_processor not in current_processor_names: try: frame_processor_module = load_frame_processor_module(frame_processor) FRAME_PROCESSORS_MODULES.append(frame_processor_module) - if frame_processor not in modules.globals.frame_processors: - modules.globals.frame_processors.append(frame_processor) except SystemExit: - print(f"Warning: Failed to load frame processor {frame_processor} requested by UI state.") + print(f"SystemExit: Could not load frame processor '{frame_processor}'.") except Exception as e: - print(f"Warning: Error loading frame processor {frame_processor} requested by UI state: {e}") - - elif state == False and frame_processor in current_processor_names: + print(f"Error loading frame processor '{frame_processor}': {e}") + elif state is False and frame_processor in current_processor_names: try: - module_to_remove = next((mod for mod in FRAME_PROCESSORS_MODULES if mod.__name__.endswith(f'.{frame_processor}')), None) - if module_to_remove: - FRAME_PROCESSORS_MODULES.remove(module_to_remove) - if frame_processor in modules.globals.frame_processors: - modules.globals.frame_processors.remove(frame_processor) + FRAME_PROCESSORS_MODULES = [proc for proc in FRAME_PROCESSORS_MODULES if proc.__name__.split('.')[-1] != frame_processor] except Exception as e: - print(f"Warning: Error removing frame processor {frame_processor}: {e}") + print(f"Error removing frame processor '{frame_processor}': {e}") + def multi_process_frame(source_path: str, temp_frame_paths: List[str], process_frames: Callable[[str, List[str], Any], None], progress: Any = None) -> None: + """Process frames in parallel using a thread pool.""" with ThreadPoolExecutor(max_workers=modules.globals.execution_threads) as executor: futures = [] for path in temp_frame_paths: @@ -76,7 +76,8 @@ def multi_process_frame(source_path: str, temp_frame_paths: List[str], process_f future.result() -def process_video(source_path: str, frame_paths: list[str], process_frames: Callable[[str, List[str], Any], None]) -> None: +def process_video(source_path: str, frame_paths: List[str], process_frames: Callable[[str, List[str], Any], None]) -> None: + """Process a video by processing all frames with a progress bar.""" progress_bar_format = '{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}{postfix}]' total = len(frame_paths) with tqdm(total=total, desc='Processing', unit='frame', dynamic_ncols=True, bar_format=progress_bar_format) as progress: diff --git a/modules/processors/frame/face_enhancer.py b/modules/processors/frame/face_enhancer.py index de192e6..86cc1a5 100644 --- a/modules/processors/frame/face_enhancer.py +++ b/modules/processors/frame/face_enhancer.py @@ -1,16 +1,14 @@ -from typing import Any, List +import os import cv2 import threading -import gfpgan -import os - -import modules.globals -import modules.processors.frame.core +import platform +import torch +import modules +import numpy as np +from typing import Any, List from modules.core import update_status from modules.face_analyser import get_one_face from modules.typing import Frame, Face -import platform -import torch from modules.utilities import ( conditional_download, is_image, @@ -29,6 +27,7 @@ models_dir = os.path.join( def pre_check() -> bool: + """Ensure required 
model is downloaded.""" download_directory_path = models_dir conditional_download( download_directory_path, @@ -40,6 +39,7 @@ def pre_check() -> bool: def pre_start() -> bool: + """Check if target path is valid before starting.""" if not is_image(modules.globals.target_path) and not is_video( modules.globals.target_path ): @@ -50,52 +50,54 @@ def pre_start() -> bool: TENSORRT_AVAILABLE = False try: - import torch_tensorrt + import tensorrt TENSORRT_AVAILABLE = True except ImportError as im: print(f"TensorRT is not available: {im}") - pass except Exception as e: print(f"TensorRT is not available: {e}") - pass + def get_face_enhancer() -> Any: + """Thread-safe singleton loader for the face enhancer model.""" global FACE_ENHANCER - with THREAD_LOCK: if FACE_ENHANCER is None: model_path = os.path.join(models_dir, "GFPGANv1.4.pth") - - selected_device = None - device_priority = [] - + selected_device = "cpu" if TENSORRT_AVAILABLE and torch.cuda.is_available(): - selected_device = torch.device("cuda") - device_priority.append("TensorRT+CUDA") + selected_device = "cuda" elif torch.cuda.is_available(): - selected_device = torch.device("cuda") - device_priority.append("CUDA") - elif torch.backends.mps.is_available() and platform.system() == "Darwin": - selected_device = torch.device("mps") - device_priority.append("MPS") - elif not torch.cuda.is_available(): - selected_device = torch.device("cpu") - device_priority.append("CPU") - - FACE_ENHANCER = gfpgan.GFPGANer(model_path=model_path, upscale=1, device=selected_device) + selected_device = "cuda" + elif hasattr(torch.backends, "mps") and torch.backends.mps.is_available() and platform.system() == "Darwin": + selected_device = "mps" + # Import GFPGAN only when needed + try: + import gfpgan - # for debug: - print(f"Selected device: {selected_device} and device priority: {device_priority}") + FACE_ENHANCER = gfpgan.GFPGANer(model_path=model_path, upscale=1, device=selected_device) + except Exception as e: + print(f"Failed to load GFPGAN: {e}") + FACE_ENHANCER = None return FACE_ENHANCER -def enhance_face(temp_frame: Frame) -> Frame: +def enhance_face(temp_frame: Any) -> Any: + """Enhance a face in the given frame using GFPGAN.""" with THREAD_SEMAPHORE: - _, _, temp_frame = get_face_enhancer().enhance(temp_frame, paste_back=True) + enhancer = get_face_enhancer() + if enhancer is None: + print("Face enhancer model not loaded.") + return temp_frame + try: + _, _, temp_frame = enhancer.enhance(temp_frame, paste_back=True) + except Exception as e: + print(f"Face enhancement failed: {e}") return temp_frame -def process_frame(source_face: Face, temp_frame: Frame) -> Frame: +def process_frame(source_face: Any, temp_frame: Any) -> Any: + """Process a single frame for face enhancement.""" target_face = get_one_face(temp_frame) if target_face: temp_frame = enhance_face(temp_frame) @@ -105,25 +107,33 @@ def process_frame(source_face: Face, temp_frame: Frame) -> Frame: def process_frames( source_path: str, temp_frame_paths: List[str], progress: Any = None ) -> None: + """Process a list of frames for face enhancement, updating progress and handling errors.""" for temp_frame_path in temp_frame_paths: temp_frame = cv2.imread(temp_frame_path) - result = process_frame(None, temp_frame) - cv2.imwrite(temp_frame_path, result) - if progress: - progress.update(1) + try: + result = process_frame(None, temp_frame) + cv2.imwrite(temp_frame_path, result) + except Exception as e: + print(f"Frame enhancement failed: {e}") + finally: + if progress: + progress.update(1) def 
process_image(source_path: str, target_path: str, output_path: str) -> None: + """Process a single image for face enhancement.""" target_frame = cv2.imread(target_path) result = process_frame(None, target_frame) cv2.imwrite(output_path, result) def process_video(source_path: str, temp_frame_paths: List[str]) -> None: + """Process a video for face enhancement.""" modules.processors.frame.core.process_video(None, temp_frame_paths, process_frames) -def process_frame_v2(temp_frame: Frame) -> Frame: +def process_frame_v2(temp_frame: Any) -> Any: + """Alternative frame processing for face enhancement (for mapped faces, if needed).""" target_face = get_one_face(temp_frame) if target_face: temp_frame = enhance_face(temp_frame) diff --git a/modules/processors/frame/face_swapper.py b/modules/processors/frame/face_swapper.py index 36b83d6..bdc27c4 100644 --- a/modules/processors/frame/face_swapper.py +++ b/modules/processors/frame/face_swapper.py @@ -28,17 +28,19 @@ models_dir = os.path.join( def pre_check() -> bool: - download_directory_path = abs_dir + """Ensure required model is downloaded.""" + download_directory_path = models_dir conditional_download( download_directory_path, [ - "https://huggingface.co/hacksider/deep-live-cam/blob/main/inswapper_128_fp16.onnx" + "https://huggingface.co/hacksider/deep-live-cam/resolve/main/inswapper_128_fp16.onnx" ], ) return True def pre_start() -> bool: + """Check if source and target paths are valid before starting.""" if not modules.globals.map_faces and not is_image(modules.globals.source_path): update_status("Select an image for source path.", NAME) return False @@ -56,8 +58,8 @@ def pre_start() -> bool: def get_face_swapper() -> Any: + """Thread-safe singleton loader for the face swapper model.""" global FACE_SWAPPER - with THREAD_LOCK: if FACE_SWAPPER is None: model_path = os.path.join(models_dir, "inswapper_128_fp16.onnx") @@ -67,41 +69,44 @@ def get_face_swapper() -> Any: return FACE_SWAPPER -def swap_face(source_face: Face, target_face: Face, temp_frame: Frame) -> Frame: +def swap_face(source_face: Any, target_face: Any, temp_frame: Any) -> Any: + """Swap source_face onto target_face in temp_frame, with improved Poisson blending and optional mouth region blending.""" face_swapper = get_face_swapper() - - # Apply the face swap - swapped_frame = face_swapper.get( - temp_frame, target_face, source_face, paste_back=True - ) - - if modules.globals.mouth_mask: - # Create a mask for the target face - face_mask = create_face_mask(target_face, temp_frame) - - # Create the mouth mask - mouth_mask, mouth_cutout, mouth_box, lower_lip_polygon = ( - create_lower_mouth_mask(target_face, temp_frame) + try: + face_swapper = get_face_swapper() + swapped_frame = face_swapper.get( + temp_frame, target_face, source_face, paste_back=True ) - - # Apply the mouth area - swapped_frame = apply_mouth_area( - swapped_frame, mouth_cutout, mouth_box, face_mask, lower_lip_polygon - ) - - if modules.globals.show_mouth_mask_box: - mouth_mask_data = (mouth_mask, mouth_cutout, mouth_box, lower_lip_polygon) - swapped_frame = draw_mouth_mask_visualization( - swapped_frame, target_face, mouth_mask_data - ) - - return swapped_frame + if modules.globals.color_correction: + mask = create_face_mask(target_face, temp_frame) + # Find the center of the mask for seamlessClone + y_indices, x_indices = np.where(mask > 0) + if len(x_indices) > 0 and len(y_indices) > 0: + center_x = int(np.mean(x_indices)) + center_y = int(np.mean(y_indices)) + center = (center_x, center_y) + # Use seamlessClone for 
Poisson blending + swapped_frame = cv2.seamlessClone( + swapped_frame, temp_frame, mask, center, cv2.NORMAL_CLONE + ) + # --- Mouth region blending (optional, after Poisson blending) --- + if hasattr(modules.globals, "mouth_mask") and modules.globals.mouth_mask: + # Extract mouth region from the original frame + mouth_mask_data = create_lower_mouth_mask(target_face, temp_frame) + if mouth_mask_data is not None: + mask, mouth_cutout, mouth_box, mouth_polygon = mouth_mask_data + face_mask = create_face_mask(target_face, temp_frame) + swapped_frame = apply_mouth_area( + swapped_frame, mouth_cutout, mouth_box, face_mask, mouth_polygon + ) + return swapped_frame + except Exception as e: + logging.error(f"Face swap failed: {e}") + return temp_frame -def process_frame(source_face: Face, temp_frame: Frame) -> Frame: - if modules.globals.color_correction: - temp_frame = cv2.cvtColor(temp_frame, cv2.COLOR_BGR2RGB) - +def process_frame(source_face: Any, temp_frame: Any) -> Any: + """Process a single frame for face swapping.""" if modules.globals.many_faces: many_faces = get_many_faces(temp_frame) if many_faces: @@ -109,7 +114,7 @@ def process_frame(source_face: Face, temp_frame: Frame) -> Frame: if source_face and target_face: temp_frame = swap_face(source_face, target_face, temp_frame) else: - print("Face detection failed for target/source.") + logging.warning("Face detection failed for target/source.") else: target_face = get_one_face(temp_frame) if target_face and source_face: @@ -119,8 +124,8 @@ def process_frame(source_face: Face, temp_frame: Frame) -> Frame: return temp_frame - -def process_frame_v2(temp_frame: Frame, temp_frame_path: str = "") -> Frame: +def process_frame_v2(temp_frame: Any, temp_frame_path: str = "") -> Any: + """Process a frame using mapped faces (for mapped face mode).""" if is_image(modules.globals.target_path): if modules.globals.many_faces: source_face = default_source_face() @@ -213,45 +218,70 @@ def process_frame_v2(temp_frame: Frame, temp_frame_path: str = "") -> Frame: def process_frames( source_path: str, temp_frame_paths: List[str], progress: Any = None ) -> None: + """Process a list of frames for face swapping, updating progress and handling errors.""" if not modules.globals.map_faces: source_face = get_one_face(cv2.imread(source_path)) + if source_face is None: + logging.warning("No face detected in source image. Skipping all frames.") + if progress: + for _ in temp_frame_paths: + progress.update(1) + return for temp_frame_path in temp_frame_paths: temp_frame = cv2.imread(temp_frame_path) try: result = process_frame(source_face, temp_frame) - cv2.imwrite(temp_frame_path, result) + if np.array_equal(result, temp_frame): + logging.warning(f"No face detected in target frame: {temp_frame_path}. Skipping write.") + else: + cv2.imwrite(temp_frame_path, result) except Exception as exception: - print(exception) - pass - if progress: - progress.update(1) + logging.error(f"Frame processing failed: {exception}") + finally: + if progress: + progress.update(1) else: for temp_frame_path in temp_frame_paths: temp_frame = cv2.imread(temp_frame_path) try: result = process_frame_v2(temp_frame, temp_frame_path) - cv2.imwrite(temp_frame_path, result) + if np.array_equal(result, temp_frame): + logging.warning(f"No face detected in mapped target frame: {temp_frame_path}. 
Skipping write.") + else: + cv2.imwrite(temp_frame_path, result) except Exception as exception: - print(exception) - pass - if progress: - progress.update(1) + logging.error(f"Frame processing failed: {exception}") + finally: + if progress: + progress.update(1) -def process_image(source_path: str, target_path: str, output_path: str) -> None: +def process_image(source_path: str, target_path: str, output_path: str) -> bool: + """Process a single image and return True if successful, False if no face detected.""" if not modules.globals.map_faces: source_face = get_one_face(cv2.imread(source_path)) + if source_face is None: + logging.warning("No face detected in source image. Skipping output.") + return False target_frame = cv2.imread(target_path) result = process_frame(source_face, target_frame) + if np.array_equal(result, target_frame): + logging.warning("No face detected in target image. Skipping output.") + return False cv2.imwrite(output_path, result) + return True else: if modules.globals.many_faces: update_status( "Many faces enabled. Using first source image. Progressing...", NAME ) - target_frame = cv2.imread(output_path) + target_frame = cv2.imread(target_path) result = process_frame_v2(target_frame) + if np.array_equal(result, target_frame): + logging.warning("No face detected in mapped target image. Skipping output.") + return False cv2.imwrite(output_path, result) + return True def process_video(source_path: str, temp_frame_paths: List[str]) -> None: @@ -264,9 +294,21 @@ def process_video(source_path: str, temp_frame_paths: List[str]) -> None: ) -def create_lower_mouth_mask( - face: Face, frame: Frame -) -> (np.ndarray, np.ndarray, tuple, np.ndarray): +def color_transfer(source: np.ndarray, target: np.ndarray) -> np.ndarray: + source_lab = cv2.cvtColor(source, cv2.COLOR_BGR2LAB).astype("float32") + target_lab = cv2.cvtColor(target, cv2.COLOR_BGR2LAB).astype("float32") + s_mean, s_std = cv2.meanStdDev(source_lab) + t_mean, t_std = cv2.meanStdDev(target_lab) + s_mean = s_mean.reshape(1, 1, 3) + s_std = s_std.reshape(1, 1, 3) + t_mean = t_mean.reshape(1, 1, 3) + t_std = t_std.reshape(1, 1, 3) + result = (source_lab - s_mean) * (t_std / (s_std + 1e-6)) + t_mean + result = np.clip(result, 0, 255).astype("uint8") + return cv2.cvtColor(result, cv2.COLOR_LAB2BGR) + + +def create_lower_mouth_mask(face, frame: np.ndarray): mask = np.zeros(frame.shape[:2], dtype=np.uint8) mouth_cutout = None landmarks = face.landmark_2d_106 @@ -381,9 +423,7 @@ def create_lower_mouth_mask( return mask, mouth_cutout, (min_x, min_y, max_x, max_y), lower_lip_polygon -def draw_mouth_mask_visualization( - frame: Frame, face: Face, mouth_mask_data: tuple -) -> Frame: +def draw_mouth_mask_visualization(frame: np.ndarray, face, mouth_mask_data: tuple) -> np.ndarray: landmarks = face.landmark_2d_106 if landmarks is not None and mouth_mask_data is not None: mask, mouth_cutout, (min_x, min_y, max_x, max_y), lower_lip_polygon = ( @@ -492,7 +532,7 @@ def apply_mouth_area( resized_mouth_cutout, (roi.shape[1], roi.shape[0]) ) - color_corrected_mouth = apply_color_transfer(resized_mouth_cutout, roi) + color_corrected_mouth = color_transfer(resized_mouth_cutout, roi) # Use the provided mouth polygon to create the mask polygon_mask = np.zeros(roi.shape[:2], dtype=np.uint8) @@ -531,7 +571,7 @@ def apply_mouth_area( return frame -def create_face_mask(face: Face, frame: Frame) -> np.ndarray: +def create_face_mask(face, frame: np.ndarray) -> np.ndarray: mask = np.zeros(frame.shape[:2], dtype=np.uint8) landmarks = 
face.landmark_2d_106 if landmarks is not None: @@ -598,25 +638,3 @@ def create_face_mask(face: Face, frame: Frame) -> np.ndarray: mask = cv2.GaussianBlur(mask, (5, 5), 3) return mask - - -def apply_color_transfer(source, target): - """ - Apply color transfer from target to source image - """ - source = cv2.cvtColor(source, cv2.COLOR_BGR2LAB).astype("float32") - target = cv2.cvtColor(target, cv2.COLOR_BGR2LAB).astype("float32") - - source_mean, source_std = cv2.meanStdDev(source) - target_mean, target_std = cv2.meanStdDev(target) - - # Reshape mean and std to be broadcastable - source_mean = source_mean.reshape(1, 1, 3) - source_std = source_std.reshape(1, 1, 3) - target_mean = target_mean.reshape(1, 1, 3) - target_std = target_std.reshape(1, 1, 3) - - # Perform the color transfer - source = (source - source_mean) * (target_std / source_std) + target_mean - - return cv2.cvtColor(np.clip(source, 0, 255).astype("uint8"), cv2.COLOR_LAB2BGR) diff --git a/modules/typing.py b/modules/typing.py index 1cff744..e5e8a5a 100644 --- a/modules/typing.py +++ b/modules/typing.py @@ -1,7 +1,9 @@ from typing import Any -from insightface.app.common import Face +from insightface.app.common import Face as InsightFace import numpy -Face = Face -Frame = numpy.ndarray[Any, Any] +# Alias for a detected face object from insightface +Face = InsightFace +# Alias for a numpy ndarray representing an image frame +Frame = numpy.ndarray diff --git a/modules/ui.py b/modules/ui.py index ce599d6..8af999b 100644 --- a/modules/ui.py +++ b/modules/ui.py @@ -28,6 +28,12 @@ from modules.utilities import ( from modules.video_capture import VideoCapturer from modules.gettext import LanguageManager import platform +try: + import pyvirtualcam + PYVIRTUALCAM_AVAILABLE = True +except ImportError: + PYVIRTUALCAM_AVAILABLE = False + print("pyvirtualcam is not installed. 
Virtual camera support will be disabled.") if platform.system() == "Windows": from pygrabber.dshow_graph import FilterGraph @@ -363,7 +369,17 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C ), ) live_button.place(relx=0.65, rely=0.86, relwidth=0.2, relheight=0.05) - # --- End Camera Selection --- + + # --- Virtual Camera Toggle --- + virtual_cam_button = ctk.CTkButton( + root, + text=_("Toggle Virtual Cam"), + cursor="hand2", + command=toggle_virtual_cam, + state=("normal" if PYVIRTUALCAM_AVAILABLE else "disabled"), + ) + virtual_cam_button.place(relx=0.1, rely=0.92, relwidth=0.35, relheight=0.05) + # --- End Virtual Camera Toggle --- status_label = ctk.CTkLabel(root, text=None, justify="center") status_label.place(relx=0.1, rely=0.9, relwidth=0.8) @@ -797,75 +813,61 @@ def webcam_preview(root: ctk.CTk, camera_index: int): ) +virtual_cam_manager = VirtualCamManager() +virtual_cam_enabled = False # Use a global variable for clarity -def get_available_cameras(): - """Returns a list of available camera names and indices.""" - if platform.system() == "Windows": - try: - graph = FilterGraph() - devices = graph.get_input_devices() - - # Create list of indices and names - camera_indices = list(range(len(devices))) - camera_names = devices - - # If no cameras found through DirectShow, try OpenCV fallback - if not camera_names: - # Try to open camera with index -1 and 0 - test_indices = [-1, 0] - working_cameras = [] - - for idx in test_indices: - cap = cv2.VideoCapture(idx) - if cap.isOpened(): - working_cameras.append(f"Camera {idx}") - cap.release() - - if working_cameras: - return test_indices[: len(working_cameras)], working_cameras - - # If still no cameras found, return empty lists - if not camera_names: - return [], ["No cameras found"] - - return camera_indices, camera_names - - except Exception as e: - print(f"Error detecting cameras: {str(e)}") - return [], ["No cameras found"] +def toggle_virtual_cam(): + global virtual_cam_enabled + if not PYVIRTUALCAM_AVAILABLE: + update_status("pyvirtualcam not installed. 
Cannot enable virtual camera.") + return + if not virtual_cam_enabled: + virtual_cam_manager.start(PREVIEW_DEFAULT_WIDTH, PREVIEW_DEFAULT_HEIGHT, 30) + virtual_cam_enabled = True + update_status("Virtual camera enabled.") else: - # Unix-like systems (Linux/Mac) camera detection - camera_indices = [] - camera_names = [] + virtual_cam_manager.stop() + virtual_cam_enabled = False + update_status("Virtual camera disabled.") - if platform.system() == "Darwin": # macOS specific handling - # Try to open the default FaceTime camera first - cap = cv2.VideoCapture(0) - if cap.isOpened(): - camera_indices.append(0) - camera_names.append("FaceTime Camera") - cap.release() +class VirtualCamManager: + """Manages the virtual camera output using pyvirtualcam.""" + def __init__(self): + self.cam = None + self.enabled = False + self.width = PREVIEW_DEFAULT_WIDTH + self.height = PREVIEW_DEFAULT_HEIGHT + self.fps = 30 - # On macOS, additional cameras typically use indices 1 and 2 - for i in [1, 2]: - cap = cv2.VideoCapture(i) - if cap.isOpened(): - camera_indices.append(i) - camera_names.append(f"Camera {i}") - cap.release() - else: - # Linux camera detection - test first 10 indices - for i in range(10): - cap = cv2.VideoCapture(i) - if cap.isOpened(): - camera_indices.append(i) - camera_names.append(f"Camera {i}") - cap.release() + def start(self, width: int, height: int, fps: int = 30): + if self.cam is None: + try: + self.cam = pyvirtualcam.Camera(width=width, height=height, fps=fps, print_fps=False) + self.enabled = True + print("Virtual camera started.") + except Exception as e: + print(f"Failed to start virtual camera: {e}") + self.cam = None + self.enabled = False - if not camera_names: - return [], ["No cameras found"] + def send(self, frame): + if self.cam and self.enabled: + try: + # pyvirtualcam expects RGB + if frame.shape[2] == 3: + self.cam.send(frame) + self.cam.sleep_until_next_frame() + except Exception as e: + print(f"Error sending frame to virtual camera: {e}") - return camera_indices, camera_names + def stop(self): + if self.cam: + try: + self.cam.close() + except Exception as e: + print(f"Error closing virtual camera: {e}") + self.cam = None + self.enabled = False def create_webcam_preview(camera_index: int): @@ -885,10 +887,23 @@ def create_webcam_preview(camera_index: int): fps_update_interval = 0.5 frame_count = 0 fps = 0 + face_swap_enabled = True # Toggle for live face swap + last_face_detected = True + no_face_counter = 0 + NO_FACE_THRESHOLD = 30 # Number of frames to show warning if no face + + def toggle_face_swap(): + nonlocal face_swap_enabled + face_swap_enabled = not face_swap_enabled + update_status(f"Face Swap {'Enabled' if face_swap_enabled else 'Disabled'}") + + # Optionally, bind a key or button to toggle_face_swap + PREVIEW.bind('', lambda e: toggle_face_swap()) while True: ret, frame = cap.read() if not ret: + update_status("Camera frame read failed.") break temp_frame = frame.copy() @@ -900,30 +915,56 @@ def create_webcam_preview(camera_index: int): temp_frame = fit_image_to_size( temp_frame, PREVIEW.winfo_width(), PREVIEW.winfo_height() ) - else: temp_frame = fit_image_to_size( temp_frame, PREVIEW.winfo_width(), PREVIEW.winfo_height() ) - if not modules.globals.map_faces: - if source_image is None and modules.globals.source_path: - source_image = get_one_face(cv2.imread(modules.globals.source_path)) + face_found = True + if face_swap_enabled: + if not modules.globals.map_faces: + if source_image is None and modules.globals.source_path: + source_image = 
get_one_face(cv2.imread(modules.globals.source_path)) - for frame_processor in frame_processors: - if frame_processor.NAME == "DLC.FACE-ENHANCER": - if modules.globals.fp_ui["face_enhancer"]: - temp_frame = frame_processor.process_frame(None, temp_frame) - else: - temp_frame = frame_processor.process_frame(source_image, temp_frame) - else: - modules.globals.target_path = None - for frame_processor in frame_processors: - if frame_processor.NAME == "DLC.FACE-ENHANCER": - if modules.globals.fp_ui["face_enhancer"]: + for frame_processor in frame_processors: + if frame_processor.NAME == "DLC.FACE-ENHANCER": + if modules.globals.fp_ui["face_enhancer"]: + temp_frame = frame_processor.process_frame(None, temp_frame) + else: + # Check if a face is detected before swapping + detected_face = get_one_face(temp_frame) + if detected_face is not None and source_image is not None: + temp_frame = frame_processor.process_frame(source_image, temp_frame) + last_face_detected = True + no_face_counter = 0 + else: + face_found = False + no_face_counter += 1 + else: + modules.globals.target_path = None + for frame_processor in frame_processors: + if frame_processor.NAME == "DLC.FACE-ENHANCER": + if modules.globals.fp_ui["face_enhancer"]: + temp_frame = frame_processor.process_frame_v2(temp_frame) + else: temp_frame = frame_processor.process_frame_v2(temp_frame) - else: - temp_frame = frame_processor.process_frame_v2(temp_frame) + else: + # Face swap disabled, just show the frame + pass + + # Show warning if no face detected for a while + if not face_found and no_face_counter > NO_FACE_THRESHOLD: + cv2.putText( + temp_frame, + "No face detected!", + (10, 60), + cv2.FONT_HERSHEY_SIMPLEX, + 1.2, + (0, 0, 255), + 3, + ) + elif face_found: + no_face_counter = 0 # Calculate and display FPS current_time = time.time() @@ -958,6 +999,7 @@ def create_webcam_preview(camera_index: int): cap.release() PREVIEW.withdraw() + update_status("Webcam preview closed.") def create_source_target_popup_for_webcam( diff --git a/modules/utilities.py b/modules/utilities.py index fe17997..3a0625d 100644 --- a/modules/utilities.py +++ b/modules/utilities.py @@ -2,10 +2,10 @@ import glob import mimetypes import os import platform -import shutil import ssl import subprocess -import urllib +import cv2 +import modules from pathlib import Path from typing import List, Any from tqdm import tqdm @@ -21,6 +21,7 @@ if platform.system().lower() == "darwin": def run_ffmpeg(args: List[str]) -> bool: + """Run an ffmpeg command with the given arguments.""" commands = [ "ffmpeg", "-hide_banner", @@ -31,14 +32,15 @@ def run_ffmpeg(args: List[str]) -> bool: ] commands.extend(args) try: - subprocess.check_output(commands, stderr=subprocess.STDOUT) + subprocess.run(commands, check=True) return True - except Exception: - pass - return False + except Exception as e: + print(f"Error running ffmpeg: {e}") + return False def detect_fps(target_path: str) -> float: + """Detect the FPS of a video file using ffprobe.""" command = [ "ffprobe", "-v", @@ -51,16 +53,18 @@ def detect_fps(target_path: str) -> float: "default=noprint_wrappers=1:nokey=1", target_path, ] - output = subprocess.check_output(command).decode().strip().split("/") try: - numerator, denominator = map(int, output) - return numerator / denominator - except Exception: - pass - return 30.0 + output = subprocess.check_output(command).decode().strip().split("/") + if len(output) == 2: + return float(output[0]) / float(output[1]) + return float(output[0]) + except Exception as e: + print(f"Error detecting 
FPS: {e}") + return 30.0 def extract_frames(target_path: str) -> None: + """Extract frames from a video file to a temp directory.""" temp_directory_path = get_temp_directory_path(target_path) run_ffmpeg( [ @@ -74,6 +78,7 @@ def extract_frames(target_path: str) -> None: def create_video(target_path: str, fps: float = 30.0) -> None: + """Create a video from frames in the temp directory.""" temp_output_path = get_temp_output_path(target_path) temp_directory_path = get_temp_directory_path(target_path) run_ffmpeg( @@ -97,6 +102,7 @@ def create_video(target_path: str, fps: float = 30.0) -> None: def restore_audio(target_path: str, output_path: str) -> None: + """Restore audio from the original video to the output video.""" temp_output_path = get_temp_output_path(target_path) done = run_ffmpeg( [ @@ -115,95 +121,107 @@ def restore_audio(target_path: str, output_path: str) -> None: ] ) if not done: - move_temp(target_path, output_path) + print(f"Failed to restore audio for {output_path}") def get_temp_frame_paths(target_path: str) -> List[str]: + """Get all temp frame file paths for a given target path.""" temp_directory_path = get_temp_directory_path(target_path) - return glob.glob((os.path.join(glob.escape(temp_directory_path), "*.png"))) + try: + return sorted([ + str(p) for p in Path(temp_directory_path).glob("*.png") + ]) + except Exception as e: + print(f"Error getting temp frame paths: {e}") + return [] def get_temp_directory_path(target_path: str) -> str: - target_name, _ = os.path.splitext(os.path.basename(target_path)) - target_directory_path = os.path.dirname(target_path) - return os.path.join(target_directory_path, TEMP_DIRECTORY, target_name) + """Get the temp directory path for a given target path.""" + base = os.path.splitext(os.path.basename(target_path))[0] + temp_dir = os.path.join(TEMP_DIRECTORY, base) + os.makedirs(temp_dir, exist_ok=True) + return temp_dir def get_temp_output_path(target_path: str) -> str: - temp_directory_path = get_temp_directory_path(target_path) - return os.path.join(temp_directory_path, TEMP_FILE) + """Get the temp output video path for a given target path.""" + base = os.path.splitext(os.path.basename(target_path))[0] + return os.path.join(TEMP_DIRECTORY, f"{base}_out.mp4") def normalize_output_path(source_path: str, target_path: str, output_path: str) -> Any: - if source_path and target_path: - source_name, _ = os.path.splitext(os.path.basename(source_path)) - target_name, target_extension = os.path.splitext(os.path.basename(target_path)) - if os.path.isdir(output_path): - return os.path.join( - output_path, source_name + "-" + target_name + target_extension - ) + """Normalize the output path for saving results.""" + if not output_path: + base = os.path.splitext(os.path.basename(target_path))[0] + return os.path.join(TEMP_DIRECTORY, f"{base}_result.png") return output_path def create_temp(target_path: str) -> None: + """Create a temp directory for a given target path.""" temp_directory_path = get_temp_directory_path(target_path) - Path(temp_directory_path).mkdir(parents=True, exist_ok=True) + os.makedirs(temp_directory_path, exist_ok=True) def move_temp(target_path: str, output_path: str) -> None: + """Move temp output to the final output path.""" temp_output_path = get_temp_output_path(target_path) - if os.path.isfile(temp_output_path): - if os.path.isfile(output_path): - os.remove(output_path) - shutil.move(temp_output_path, output_path) + try: + os.rename(temp_output_path, output_path) + except Exception as e: + print(f"Error moving temp output: 
{e}") def clean_temp(target_path: str) -> None: + """Remove temp directory and files for a given target path.""" temp_directory_path = get_temp_directory_path(target_path) - parent_directory_path = os.path.dirname(temp_directory_path) - if not modules.globals.keep_frames and os.path.isdir(temp_directory_path): - shutil.rmtree(temp_directory_path) - if os.path.exists(parent_directory_path) and not os.listdir(parent_directory_path): - os.rmdir(parent_directory_path) + try: + for p in Path(temp_directory_path).glob("*"): + p.unlink() + os.rmdir(temp_directory_path) + except Exception as e: + print(f"Error cleaning temp directory: {e}") def has_image_extension(image_path: str) -> bool: - return image_path.lower().endswith(("png", "jpg", "jpeg")) + """Check if a file has an image extension.""" + return os.path.splitext(image_path)[1].lower() in [ + ".png", ".jpg", ".jpeg", ".gif", ".bmp" + ] def is_image(image_path: str) -> bool: - if image_path and os.path.isfile(image_path): - mimetype, _ = mimetypes.guess_type(image_path) - return bool(mimetype and mimetype.startswith("image/")) - return False + """Check if a file is an image.""" + return has_image_extension(image_path) def is_video(video_path: str) -> bool: - if video_path and os.path.isfile(video_path): - mimetype, _ = mimetypes.guess_type(video_path) - return bool(mimetype and mimetype.startswith("video/")) - return False + """Check if a file is a video.""" + return os.path.splitext(video_path)[1].lower() in [ + ".mp4", ".mkv" + ] def conditional_download(download_directory_path: str, urls: List[str]) -> None: - if not os.path.exists(download_directory_path): - os.makedirs(download_directory_path) + """Download files from URLs if they do not exist in the directory.""" + import requests for url in urls: - download_file_path = os.path.join( - download_directory_path, os.path.basename(url) - ) - if not os.path.exists(download_file_path): - request = urllib.request.urlopen(url) # type: ignore[attr-defined] - total = int(request.headers.get("Content-Length", 0)) - with tqdm( - total=total, - desc="Downloading", - unit="B", - unit_scale=True, - unit_divisor=1024, - ) as progress: - urllib.request.urlretrieve(url, download_file_path, reporthook=lambda count, block_size, total_size: progress.update(block_size)) # type: ignore[attr-defined] + filename = os.path.basename(url) + file_path = os.path.join(download_directory_path, filename) + if not os.path.exists(file_path): + try: + print(f"Downloading {url}...") + r = requests.get(url, stream=True) + with open(file_path, "wb") as f: + for chunk in r.iter_content(chunk_size=8192): + if chunk: + f.write(chunk) + print(f"Downloaded {filename}") + except Exception as e: + print(f"Error downloading {url}: {e}") def resolve_relative_path(path: str) -> str: - return os.path.abspath(os.path.join(os.path.dirname(__file__), path)) + """Resolve a relative path to an absolute path.""" + return os.path.abspath(path) diff --git a/modules/video_capture.py b/modules/video_capture.py index cab223d..c15c5b6 100644 --- a/modules/video_capture.py +++ b/modules/video_capture.py @@ -1,8 +1,8 @@ import cv2 import numpy as np -from typing import Optional, Tuple, Callable import platform import threading +from typing import Optional, Tuple, Callable # Only import Windows-specific library if on Windows if platform.system() == "Windows": @@ -11,17 +11,15 @@ if platform.system() == "Windows": class VideoCapturer: def __init__(self, device_index: int): + """Initialize the video capturer for a given device index.""" 
diff --git a/modules/video_capture.py b/modules/video_capture.py
index cab223d..c15c5b6 100644
--- a/modules/video_capture.py
+++ b/modules/video_capture.py
@@ -1,8 +1,8 @@
 import cv2
 import numpy as np
-from typing import Optional, Tuple, Callable
 import platform
 import threading
+from typing import Optional, Tuple, Callable

 # Only import Windows-specific library if on Windows
 if platform.system() == "Windows":
@@ -11,17 +11,15 @@ if platform.system() == "Windows":

 class VideoCapturer:
     def __init__(self, device_index: int):
+        """Initialize the video capturer for a given device index."""
         self.device_index = device_index
         self.frame_callback = None
         self._current_frame = None
         self._frame_ready = threading.Event()
         self.is_running = False
         self.cap = None
-
-        # Initialize Windows-specific components if on Windows
         if platform.system() == "Windows":
             self.graph = FilterGraph()
-            # Verify device exists
             devices = self.graph.get_input_devices()
             if self.device_index >= len(devices):
                 raise ValueError(
@@ -29,40 +27,31 @@ class VideoCapturer:
                 )

     def start(self, width: int = 960, height: int = 540, fps: int = 60) -> bool:
-        """Initialize and start video capture"""
+        """Initialize and start video capture."""
        try:
             if platform.system() == "Windows":
-                # Windows-specific capture methods
                 capture_methods = [
-                    (self.device_index, cv2.CAP_DSHOW),  # Try DirectShow first
-                    (self.device_index, cv2.CAP_ANY),  # Then try default backend
-                    (-1, cv2.CAP_ANY),  # Try -1 as fallback
-                    (0, cv2.CAP_ANY),  # Finally try 0 without specific backend
+                    (self.device_index, cv2.CAP_DSHOW),
+                    (self.device_index, cv2.CAP_ANY),
+                    (-1, cv2.CAP_ANY),
+                    (0, cv2.CAP_ANY),
                 ]
-
                 for dev_id, backend in capture_methods:
                     try:
                         self.cap = cv2.VideoCapture(dev_id, backend)
                         if self.cap.isOpened():
                             break
-                        self.cap.release()
-                    except Exception:
-                        continue
+                        self.cap.release()
+                    except Exception as e:
+                        print(f"Error opening camera with backend {backend}: {e}")
             else:
-                # Unix-like systems (Linux/Mac) capture method
                 self.cap = cv2.VideoCapture(self.device_index)
-
             if not self.cap or not self.cap.isOpened():
                 raise RuntimeError("Failed to open camera")
-
-            # Configure format
             self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
             self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
             self.cap.set(cv2.CAP_PROP_FPS, fps)
-
             self.is_running = True
             return True
-
         except Exception as e:
             print(f"Failed to start capture: {str(e)}")
             if self.cap:
@@ -70,10 +59,9 @@ class VideoCapturer:
             return False

     def read(self) -> Tuple[bool, Optional[np.ndarray]]:
-        """Read a frame from the camera"""
+        """Read a frame from the camera."""
         if not self.is_running or self.cap is None:
             return False, None
-
         ret, frame = self.cap.read()
         if ret:
             self._current_frame = frame
@@ -83,12 +71,12 @@
         return False, None

     def release(self) -> None:
-        """Stop capture and release resources"""
+        """Stop capture and release resources."""
         if self.is_running and self.cap is not None:
             self.cap.release()
             self.is_running = False
             self.cap = None

     def set_frame_callback(self, callback: Callable[[np.ndarray], None]) -> None:
-        """Set callback for frame processing"""
+        """Set callback for frame processing."""
         self.frame_callback = callback
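The VideoCapturer above is consumed roughly as follows. A minimal usage sketch, assuming the repository root is on the import path and a camera exists at device index 0; the 960x540 at 60 fps values simply mirror the defaults in start():

from modules.video_capture import VideoCapturer

capturer = VideoCapturer(device_index=0)  # raises ValueError on Windows if the device is missing
if capturer.start(width=960, height=540, fps=60):
    ret, frame = capturer.read()
    if ret and frame is not None:
        print(f"Captured frame with shape {frame.shape}")
    capturer.release()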
diff --git a/push_to_new_branch.sh b/push_to_new_branch.sh
new file mode 100644
index 0000000..b290cab
--- /dev/null
+++ b/push_to_new_branch.sh
@@ -0,0 +1,19 @@
+#!/bin/zsh
+# push_to_new_branch.sh - Commit and push changes to a new branch in Deep-Live-Cam-remote
+
+REPO_DIR="Deep-Live-Cam-remote"
+BRANCH_NAME="feature-$(date +%Y%m%d-%H%M%S)"
+
+if [ ! -d "$REPO_DIR/.git" ]; then
+    echo "Error: $REPO_DIR is not a git repository. Run the clone_or_update_deep_live_cam.sh script first."
+    exit 1
+fi
+
+cd "$REPO_DIR"
+# Create the branch first so the commit never lands on the current branch
+git checkout -b "$BRANCH_NAME"
+git add .
+echo "Enter a commit message: "
+read COMMIT_MSG
+git commit -m "$COMMIT_MSG"
+git push origin "$BRANCH_NAME"
+echo "Pushed to branch $BRANCH_NAME on remote."
diff --git a/push_to_rehanbgmi.sh b/push_to_rehanbgmi.sh
new file mode 100755
index 0000000..e0f0f3c
--- /dev/null
+++ b/push_to_rehanbgmi.sh
@@ -0,0 +1,23 @@
+#!/bin/zsh
+# push_to_rehanbgmi.sh - Commit and push changes to your fork (rehanbgmi/deeplivceam) in Deep-Live-Cam-remote
+
+REPO_DIR="Deep-Live-Cam-remote"
+FORK_URL="https://github.com/rehanbgmi/deeplivceam.git"
+BRANCH_NAME="feature-$(date +%Y%m%d-%H%M%S)"
+
+if [ ! -d "$REPO_DIR/.git" ]; then
+    echo "Error: $REPO_DIR is not a git repository. Run the clone_or_update_deep_live_cam.sh script first."
+    exit 1
+fi
+
+cd "$REPO_DIR"
+# Set your fork as a remote if not already set
+git remote | grep rehanbgmi > /dev/null || git remote add rehanbgmi "$FORK_URL"
+
+# Create the branch first so the commit never lands on the current branch
+git checkout -b "$BRANCH_NAME"
+git add .
+echo "Enter a commit message: "
+read COMMIT_MSG
+git commit -m "$COMMIT_MSG"
+git push rehanbgmi "$BRANCH_NAME"
+echo "Pushed to branch $BRANCH_NAME on your fork (rehanbgmi/deeplivceam)."
diff --git a/run-coreml-macos.sh b/run-coreml-macos.sh
new file mode 100644
index 0000000..c87f43f
--- /dev/null
+++ b/run-coreml-macos.sh
@@ -0,0 +1,4 @@
+#!/bin/zsh
+# run-coreml-macos.sh - Run Deep-Live-Cam with CoreML (Apple Silicon) on macOS
+source venv/bin/activate
+python3.10 run.py --execution-provider coreml
diff --git a/run-coreml.bat b/run-coreml.bat
new file mode 100644
index 0000000..095fa8b
--- /dev/null
+++ b/run-coreml.bat
@@ -0,0 +1,4 @@
+@echo off
+REM run-coreml.bat - Run Deep-Live-Cam with CoreML on Windows (for reference only; CoreML is not available on Windows)
+call venv\Scripts\activate
+python run.py --execution-provider coreml
diff --git a/run-cuda-macos.sh b/run-cuda-macos.sh
new file mode 100644
index 0000000..47d5299
--- /dev/null
+++ b/run-cuda-macos.sh
@@ -0,0 +1,4 @@
+#!/bin/zsh
+# run-cuda-macos.sh - Run Deep-Live-Cam with CUDA on macOS (for reference only; requires an Nvidia GPU with CUDA support)
+source venv/bin/activate
+python run.py --execution-provider cuda
diff --git a/run-cuda.bat b/run-cuda.bat
index 93042a7..aad12f5 100644
--- a/run-cuda.bat
+++ b/run-cuda.bat
@@ -1 +1,2 @@
-python run.py --execution-provider cuda
+call venv\Scripts\activate
+python run.py --execution-provider cuda
\ No newline at end of file