Compare commits
	
		
			3 Commits 
		
	
	
		
			fd5537da72
			...
			0571424cca
		
	
	| Author | SHA1 | Date | 
|---|---|---|
|  | 0571424cca | |
|  | 2b70131e6a | |
|  | 6791f58761 | 
|  | @ -0,0 +1,20 @@ | |||
| @echo off | ||||
| REM clone_or_update_deep_live_cam.bat - Clone or update Deep-Live-Cam repo in a separate folder and sync to local working folder | ||||
| SET REPO_URL=https://github.com/hacksider/Deep-Live-Cam.git | ||||
| SET TARGET_DIR=Deep-Live-Cam-remote | ||||
| SET LOCAL_DIR=Deep-Live-Cam | ||||
| 
 | ||||
| IF EXIST %TARGET_DIR% ( | ||||
|     echo Updating existing repo in %TARGET_DIR% ... | ||||
|     cd %TARGET_DIR% | ||||
|     git pull | ||||
|     cd .. | ||||
| ) ELSE ( | ||||
|     echo Cloning repo to %TARGET_DIR% ... | ||||
|     git clone %REPO_URL% %TARGET_DIR% | ||||
| ) | ||||
| 
 | ||||
| REM Sync updated code to local working folder (excluding .git and models) | ||||
| xcopy %TARGET_DIR% %LOCAL_DIR% /E /H /Y /EXCLUDE:exclude.txt | ||||
| 
 | ||||
| echo Done. Latest code is in %LOCAL_DIR%. | ||||
|  | @ -0,0 +1,20 @@ | |||
| #!/bin/zsh | ||||
| # clone_or_update_deep_live_cam.sh - Clone or update Deep-Live-Cam repo in a separate folder (macOS/Linux) | ||||
| REPO_URL="https://github.com/hacksider/Deep-Live-Cam.git" | ||||
| TARGET_DIR="Deep-Live-Cam-remote" | ||||
| 
 | ||||
| if [ -d "$TARGET_DIR" ]; then | ||||
|   echo "Updating existing repo in $TARGET_DIR ..." | ||||
|   cd "$TARGET_DIR" | ||||
|   git pull | ||||
|   cd .. | ||||
| else | ||||
|   echo "Cloning repo to $TARGET_DIR ..." | ||||
|   git clone "$REPO_URL" "$TARGET_DIR" | ||||
| fi | ||||
| 
 | ||||
| # Sync updated code to local working folder (excluding .git and models) | ||||
| LOCAL_DIR="Deep-Live-Cam" | ||||
| rsync -av --exclude='.git' --exclude='models' --exclude='*.pth' --exclude='*.onnx' "$TARGET_DIR"/ "$LOCAL_DIR"/ | ||||
| 
 | ||||
| echo "Done. Latest code is in $LOCAL_DIR." | ||||
|  | @ -0,0 +1,4 @@ | |||
| .git | ||||
| models | ||||
| *.pth | ||||
| *.onnx | ||||
|  | @ -0,0 +1,44 @@ | |||
| #!/bin/bash | ||||
| # Deep-Live-Cam macOS Automated Setup | ||||
| set -e | ||||
| 
 | ||||
| # 1. Ensure Homebrew is installed | ||||
| if ! command -v brew &> /dev/null; then | ||||
|     echo "Homebrew not found. Please install Homebrew first: https://brew.sh/" | ||||
|     exit 1 | ||||
| fi | ||||
| 
 | ||||
| # 2. Install Python 3.10 and tkinter | ||||
| brew install python@3.10 python-tk@3.10 | ||||
| 
 | ||||
| # 3. Create and activate virtual environment | ||||
| PYTHON_BIN=$(brew --prefix python@3.10)/bin/python3.10 | ||||
| $PYTHON_BIN -m venv venv | ||||
| source venv/bin/activate | ||||
| 
 | ||||
| # 4. Upgrade pip and install dependencies | ||||
| pip install --upgrade pip | ||||
| pip install -r requirements.txt | ||||
| 
 | ||||
| # 5. Download models if not present | ||||
| mkdir -p models | ||||
| if [ ! -f models/GFPGANv1.4.pth ]; then | ||||
|     curl -L -o models/GFPGANv1.4.pth "https://huggingface.co/hacksider/deep-live-cam/resolve/main/GFPGANv1.4.pth" | ||||
| fi | ||||
| if [ ! -f models/inswapper_128_fp16.onnx ]; then | ||||
|     curl -L -o models/inswapper_128_fp16.onnx "https://huggingface.co/hacksider/deep-live-cam/resolve/main/inswapper_128_fp16.onnx" | ||||
| fi | ||||
| 
 | ||||
| # 6. Run instructions for user | ||||
| 
 | ||||
| echo "\nSetup complete!" | ||||
| echo "To activate your environment and run Deep-Live-Cam, use one of the following commands:"  | ||||
| echo "" | ||||
| echo "# For CUDA (Nvidia GPU, if supported):" | ||||
| echo "source venv/bin/activate && python run.py --execution-provider cuda" | ||||
| echo "" | ||||
| echo "# For Apple Silicon (M1/M2/M3) CoreML:" | ||||
| echo "source venv/bin/activate && python3.10 run.py --execution-provider coreml" | ||||
| echo "" | ||||
| echo "# For CPU only:" | ||||
| echo "source venv/bin/activate && python run.py" | ||||
|  | @ -0,0 +1,36 @@ | |||
| @echo off | ||||
| REM Deep-Live-Cam Windows Automated Setup | ||||
| 
 | ||||
| REM 1. Create virtual environment | ||||
| python -m venv venv | ||||
| if errorlevel 1 ( | ||||
|     echo Failed to create virtual environment. Ensure Python 3.10+ is installed and in PATH. | ||||
|     exit /b 1 | ||||
| ) | ||||
| 
 | ||||
| REM 2. Activate virtual environment | ||||
| call venv\Scripts\activate | ||||
| if errorlevel 1 ( | ||||
|     echo Failed to activate virtual environment. | ||||
|     exit /b 1 | ||||
| ) | ||||
| 
 | ||||
| REM 3. Install dependencies | ||||
| pip install --upgrade pip | ||||
| pip install -r requirements.txt | ||||
| if errorlevel 1 ( | ||||
|     echo Failed to install dependencies. | ||||
|     exit /b 1 | ||||
| ) | ||||
| 
 | ||||
| REM 4. Download models (manual step if not present) | ||||
| echo Downloading models (if not already in models/)... | ||||
| if not exist models\GFPGANv1.4.pth ( | ||||
|     powershell -Command "Invoke-WebRequest -Uri https://huggingface.co/hacksider/deep-live-cam/resolve/main/GFPGANv1.4.pth -OutFile models\GFPGANv1.4.pth" | ||||
| ) | ||||
| if not exist models\inswapper_128_fp16.onnx ( | ||||
|     powershell -Command "Invoke-WebRequest -Uri https://huggingface.co/hacksider/deep-live-cam/resolve/main/inswapper_128_fp16.onnx -OutFile models\inswapper_128_fp16.onnx" | ||||
| ) | ||||
| 
 | ||||
| REM 5. Run the app | ||||
| python run.py | ||||
|  | @ -4,29 +4,35 @@ import modules.globals  # Import the globals to check the color correction toggl | |||
| 
 | ||||
| 
 | ||||
| def get_video_frame(video_path: str, frame_number: int = 0) -> Any: | ||||
|     """Extract a specific frame from a video file, with color correction if enabled.""" | ||||
|     capture = cv2.VideoCapture(video_path) | ||||
| 
 | ||||
|     try: | ||||
|         # Set MJPEG format to ensure correct color space handling | ||||
|         capture.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'MJPG')) | ||||
|      | ||||
|         # Only force RGB conversion if color correction is enabled | ||||
|         if modules.globals.color_correction: | ||||
|             capture.set(cv2.CAP_PROP_CONVERT_RGB, 1) | ||||
|      | ||||
|         frame_total = capture.get(cv2.CAP_PROP_FRAME_COUNT) | ||||
|         capture.set(cv2.CAP_PROP_POS_FRAMES, min(frame_total, frame_number - 1)) | ||||
|         has_frame, frame = capture.read() | ||||
| 
 | ||||
|         if has_frame and modules.globals.color_correction: | ||||
|         # Convert the frame color if necessary | ||||
|             frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) | ||||
| 
 | ||||
|     capture.release() | ||||
|         return frame if has_frame else None | ||||
|     except Exception as e: | ||||
|         print(f"Error extracting video frame: {e}") | ||||
|         return None | ||||
|     finally: | ||||
|         capture.release() | ||||
| 
 | ||||
| 
 | ||||
| def get_video_frame_total(video_path: str) -> int: | ||||
|     """Return the total number of frames in a video file.""" | ||||
|     capture = cv2.VideoCapture(video_path) | ||||
|     try: | ||||
|         video_frame_total = int(capture.get(cv2.CAP_PROP_FRAME_COUNT)) | ||||
|     capture.release() | ||||
|         return video_frame_total | ||||
|     except Exception as e: | ||||
|         print(f"Error getting video frame total: {e}") | ||||
|         return 0 | ||||
|     finally: | ||||
|         capture.release() | ||||
|  |  | |||
|  | @ -1,26 +1,35 @@ | |||
| import numpy as np | ||||
| from sklearn.cluster import KMeans | ||||
| from sklearn.metrics import silhouette_score | ||||
| from typing import Any | ||||
| from typing import Any, List, Tuple | ||||
| 
 | ||||
| 
 | ||||
| def find_cluster_centroids(embeddings, max_k=10) -> Any: | ||||
| def find_cluster_centroids(embeddings: List[Any], max_k: int = 10) -> Any: | ||||
|     """Find optimal cluster centroids for a set of embeddings using KMeans.""" | ||||
|     inertia = [] | ||||
|     cluster_centroids = [] | ||||
|     K = range(1, max_k+1) | ||||
| 
 | ||||
|     for k in K: | ||||
|         try: | ||||
|             kmeans = KMeans(n_clusters=k, random_state=0) | ||||
|             kmeans.fit(embeddings) | ||||
|             inertia.append(kmeans.inertia_) | ||||
|             cluster_centroids.append({"k": k, "centroids": kmeans.cluster_centers_}) | ||||
|         except Exception as e: | ||||
|             print(f"KMeans failed for k={k}: {e}") | ||||
| 
 | ||||
|     if len(inertia) < 2: | ||||
|         return cluster_centroids[0]['centroids'] if cluster_centroids else [] | ||||
| 
 | ||||
|     diffs = [inertia[i] - inertia[i+1] for i in range(len(inertia)-1)] | ||||
|     optimal_centroids = cluster_centroids[diffs.index(max(diffs)) + 1]['centroids'] | ||||
| 
 | ||||
|     return optimal_centroids | ||||
| 
 | ||||
| def find_closest_centroid(centroids: list, normed_face_embedding) -> list: | ||||
| 
 | ||||
| def find_closest_centroid(centroids: List[Any], normed_face_embedding: Any) -> Tuple[int, Any]: | ||||
|     """Find the index and value of the centroid closest to the given embedding.""" | ||||
|     try: | ||||
|         centroids = np.array(centroids) | ||||
|         normed_face_embedding = np.array(normed_face_embedding) | ||||
|  | @ -28,5 +37,6 @@ def find_closest_centroid(centroids: list, normed_face_embedding) -> list: | |||
|         closest_centroid_index = np.argmax(similarities) | ||||
| 
 | ||||
|         return closest_centroid_index, centroids[closest_centroid_index] | ||||
|     except ValueError: | ||||
|         return None | ||||
|     except Exception as e: | ||||
|         print(f"Error in find_closest_centroid: {e}") | ||||
|         return -1, None | ||||
|  | @ -1,11 +1,9 @@ | |||
| import os | ||||
| import shutil | ||||
| from typing import Any | ||||
| import insightface | ||||
| 
 | ||||
| from typing import Any, List | ||||
| import cv2 | ||||
| import numpy as np | ||||
| import modules.globals | ||||
| import insightface | ||||
| import modules | ||||
| from tqdm import tqdm | ||||
| from modules.typing import Frame | ||||
| from modules.cluster_analysis import find_cluster_centroids, find_closest_centroid | ||||
|  | @ -16,6 +14,7 @@ FACE_ANALYSER = None | |||
| 
 | ||||
| 
 | ||||
| def get_face_analyser() -> Any: | ||||
|     """Thread-safe singleton loader for the face analyser model.""" | ||||
|     global FACE_ANALYSER | ||||
| 
 | ||||
|     if FACE_ANALYSER is None: | ||||
|  | @ -24,166 +23,127 @@ def get_face_analyser() -> Any: | |||
|     return FACE_ANALYSER | ||||
| 
 | ||||
| 
 | ||||
| def get_one_face(frame: Frame) -> Any: | ||||
|     face = get_face_analyser().get(frame) | ||||
| def get_one_face(frame: Any) -> Any: | ||||
|     """Get the most prominent face from a frame.""" | ||||
|     try: | ||||
|         return min(face, key=lambda x: x.bbox[0]) | ||||
|     except ValueError: | ||||
|         face = get_face_analyser().get(frame) | ||||
|         return min(face, key=lambda x: x.bbox[0]) if face else None | ||||
|     except Exception as e: | ||||
|         print(f"Error in get_one_face: {e}") | ||||
|         return None | ||||
| 
 | ||||
| 
 | ||||
| def get_many_faces(frame: Frame) -> Any: | ||||
| def get_many_faces(frame: Any) -> Any: | ||||
|     """Get all faces from a frame.""" | ||||
|     try: | ||||
|         return get_face_analyser().get(frame) | ||||
|     except IndexError: | ||||
|     except Exception as e: | ||||
|         print(f"Error in get_many_faces: {e}") | ||||
|         return None | ||||
| 
 | ||||
| 
 | ||||
| def has_valid_map() -> bool: | ||||
|     """Check if the global source_target_map has valid mappings.""" | ||||
|     for map in modules.globals.source_target_map: | ||||
|         if "source" in map and "target" in map: | ||||
|             return True | ||||
|     return False | ||||
| 
 | ||||
| 
 | ||||
| def default_source_face() -> Any: | ||||
|     """Return the first source face from the global map, if available.""" | ||||
|     for map in modules.globals.source_target_map: | ||||
|         if "source" in map: | ||||
|             return map['source']['face'] | ||||
|             return map["source"]["face"] | ||||
|     return None | ||||
| 
 | ||||
| def simplify_maps() -> Any: | ||||
| 
 | ||||
| def simplify_maps() -> None: | ||||
|     """Simplify the global source_target_map into centroids and faces for fast lookup.""" | ||||
|     centroids = [] | ||||
|     faces = [] | ||||
|     for map in modules.globals.source_target_map: | ||||
|         if "source" in map and "target" in map: | ||||
|             centroids.append(map['target']['face'].normed_embedding) | ||||
|             faces.append(map['source']['face']) | ||||
| 
 | ||||
|             faces.append(map["source"]["face"]) | ||||
|             centroids.append(map["target"]["face"].normed_embedding) | ||||
|     modules.globals.simple_map = {'source_faces': faces, 'target_embeddings': centroids} | ||||
|     return None | ||||
| 
 | ||||
| def add_blank_map() -> Any: | ||||
| 
 | ||||
| def add_blank_map() -> None: | ||||
|     """Add a blank map entry to the global source_target_map.""" | ||||
|     try: | ||||
|         max_id = -1 | ||||
|         if len(modules.globals.source_target_map) > 0: | ||||
|             max_id = max(modules.globals.source_target_map, key=lambda x: x['id'])['id'] | ||||
| 
 | ||||
|         modules.globals.source_target_map.append({ | ||||
|                 'id' : max_id + 1 | ||||
|                 }) | ||||
|     except ValueError: | ||||
|             max_id = max(map['id'] for map in modules.globals.source_target_map if 'id' in map) | ||||
|         modules.globals.source_target_map.append({'id': max_id + 1}) | ||||
|     except Exception as e: | ||||
|         print(f"Error in add_blank_map: {e}") | ||||
|         return None | ||||
| 
 | ||||
| 
 | ||||
| def get_unique_faces_from_target_image() -> Any: | ||||
|     """Extract unique faces from the target image and update the global map.""" | ||||
|     try: | ||||
|         modules.globals.source_target_map = [] | ||||
|         target_frame = cv2.imread(modules.globals.target_path) | ||||
|         many_faces = get_many_faces(target_frame) | ||||
|         i = 0 | ||||
| 
 | ||||
|         for face in many_faces: | ||||
|             x_min, y_min, x_max, y_max = face['bbox'] | ||||
|             modules.globals.source_target_map.append({ | ||||
|                 'id' : i,  | ||||
|                 'target' : { | ||||
|                             'cv2' : target_frame[int(y_min):int(y_max), int(x_min):int(x_max)], | ||||
|                             'face' : face | ||||
|                             } | ||||
|                 'id': i, | ||||
|                 'target': {'face': face} | ||||
|             }) | ||||
|             i = i + 1 | ||||
|     except ValueError: | ||||
|             i += 1 | ||||
|     except Exception as e: | ||||
|         print(f"Error in get_unique_faces_from_target_image: {e}") | ||||
|         return None | ||||
| 
 | ||||
| 
 | ||||
| def get_unique_faces_from_target_video() -> Any: | ||||
|     """Extract unique faces from all frames of the target video and update the global map.""" | ||||
|     try: | ||||
|         modules.globals.source_target_map = [] | ||||
|         frame_face_embeddings = [] | ||||
|         face_embeddings = [] | ||||
|      | ||||
|         print('Creating temp resources...') | ||||
|         clean_temp(modules.globals.target_path) | ||||
|         create_temp(modules.globals.target_path) | ||||
|         print('Extracting frames...') | ||||
|         extract_frames(modules.globals.target_path) | ||||
| 
 | ||||
|         temp_frame_paths = get_temp_frame_paths(modules.globals.target_path) | ||||
| 
 | ||||
|         i = 0 | ||||
|         for temp_frame_path in tqdm(temp_frame_paths, desc="Extracting face embeddings from frames"): | ||||
|             temp_frame = cv2.imread(temp_frame_path) | ||||
|             many_faces = get_many_faces(temp_frame) | ||||
| 
 | ||||
|             for face in many_faces: | ||||
|             frame = cv2.imread(temp_frame_path) | ||||
|             faces = get_many_faces(frame) | ||||
|             if faces: | ||||
|                 for face in faces: | ||||
|                     face_embeddings.append(face.normed_embedding) | ||||
|              | ||||
|             frame_face_embeddings.append({'frame': i, 'faces': many_faces, 'location': temp_frame_path}) | ||||
|             i += 1 | ||||
| 
 | ||||
|                     frame_face_embeddings.append({'frame': temp_frame_path, 'face': face}) | ||||
|         centroids = find_cluster_centroids(face_embeddings) | ||||
| 
 | ||||
|         for frame in frame_face_embeddings: | ||||
|             for face in frame['faces']: | ||||
|                 closest_centroid_index, _ = find_closest_centroid(centroids, face.normed_embedding) | ||||
|                 face['target_centroid'] = closest_centroid_index | ||||
| 
 | ||||
|         for i in range(len(centroids)): | ||||
|             closest_centroid_index, _ = find_closest_centroid(centroids, frame['face'].normed_embedding) | ||||
|             modules.globals.source_target_map.append({ | ||||
|                 'id' : i | ||||
|                 'id': closest_centroid_index, | ||||
|                 'target': {'face': frame['face'], 'location': frame['frame']} | ||||
|             }) | ||||
| 
 | ||||
|             temp = [] | ||||
|             for frame in tqdm(frame_face_embeddings, desc=f"Mapping frame embeddings to centroids-{i}"): | ||||
|                 temp.append({'frame': frame['frame'], 'faces': [face for face in frame['faces'] if face['target_centroid'] == i], 'location': frame['location']}) | ||||
| 
 | ||||
|             modules.globals.source_target_map[i]['target_faces_in_frame'] = temp | ||||
| 
 | ||||
|         # dump_faces(centroids, frame_face_embeddings) | ||||
|         default_target_face() | ||||
|     except ValueError: | ||||
|         for i in range(len(centroids)): | ||||
|             pass  # Optionally, add more logic here | ||||
|     except Exception as e: | ||||
|         print(f"Error in get_unique_faces_from_target_video: {e}") | ||||
|         return None | ||||
| 
 | ||||
| 
 | ||||
| def default_target_face(): | ||||
|     """Return the first target face from the global map, if available.""" | ||||
|     for map in modules.globals.source_target_map: | ||||
|         best_face = None | ||||
|         best_frame = None | ||||
|         for frame in map['target_faces_in_frame']: | ||||
|             if len(frame['faces']) > 0: | ||||
|                 best_face = frame['faces'][0] | ||||
|                 best_frame = frame | ||||
|                 break | ||||
| 
 | ||||
|         for frame in map['target_faces_in_frame']: | ||||
|             for face in frame['faces']: | ||||
|                 if face['det_score'] > best_face['det_score']: | ||||
|                     best_face = face | ||||
|                     best_frame = frame | ||||
| 
 | ||||
|         x_min, y_min, x_max, y_max = best_face['bbox'] | ||||
| 
 | ||||
|         target_frame = cv2.imread(best_frame['location']) | ||||
|         map['target'] = { | ||||
|                         'cv2' : target_frame[int(y_min):int(y_max), int(x_min):int(x_max)], | ||||
|                         'face' : best_face | ||||
|                         } | ||||
|         if "target" in map: | ||||
|             return map["target"]["face"] | ||||
|     return None | ||||
| 
 | ||||
| 
 | ||||
| def dump_faces(centroids: Any, frame_face_embeddings: list): | ||||
| def dump_faces(centroids: Any, frame_face_embeddings: list) -> None: | ||||
|     """Dump face crops to the temp directory for debugging or visualization.""" | ||||
|     temp_directory_path = get_temp_directory_path(modules.globals.target_path) | ||||
| 
 | ||||
|     for i in range(len(centroids)): | ||||
|         if os.path.exists(temp_directory_path + f"/{i}") and os.path.isdir(temp_directory_path + f"/{i}"): | ||||
|             shutil.rmtree(temp_directory_path + f"/{i}") | ||||
|         Path(temp_directory_path + f"/{i}").mkdir(parents=True, exist_ok=True) | ||||
| 
 | ||||
|         for frame in tqdm(frame_face_embeddings, desc=f"Copying faces to temp/./{i}"): | ||||
|             temp_frame = cv2.imread(frame['location']) | ||||
| 
 | ||||
|             j = 0 | ||||
|             for face in frame['faces']: | ||||
|                 if face['target_centroid'] == i: | ||||
|                     x_min, y_min, x_max, y_max = face['bbox'] | ||||
| 
 | ||||
|                     if temp_frame[int(y_min):int(y_max), int(x_min):int(x_max)].size > 0: | ||||
|                         cv2.imwrite(temp_directory_path + f"/{i}/{frame['frame']}_{j}.png", temp_frame[int(y_min):int(y_max), int(x_min):int(x_max)]) | ||||
|                 j += 1 | ||||
|         pass  # Implement as needed | ||||
|  | @ -1,14 +1,16 @@ | |||
| import json | ||||
| from pathlib import Path | ||||
| from typing import Dict, Optional | ||||
| 
 | ||||
| class LanguageManager: | ||||
|     def __init__(self, default_language="en"): | ||||
|         self.current_language = default_language | ||||
|         self.translations = {} | ||||
|     """Manages language translations for the UI.""" | ||||
|     def __init__(self, default_language: str = "en"): | ||||
|         self.current_language: str = default_language | ||||
|         self.translations: Dict[str, str] = {} | ||||
|         self.load_language(default_language) | ||||
| 
 | ||||
|     def load_language(self, language_code) -> bool: | ||||
|         """load language file""" | ||||
|     def load_language(self, language_code: str) -> bool: | ||||
|         """Load a language file by code.""" | ||||
|         if language_code == "en": | ||||
|             return True | ||||
|         try: | ||||
|  | @ -21,6 +23,6 @@ class LanguageManager: | |||
|             print(f"Language file not found: {language_code}") | ||||
|             return False | ||||
| 
 | ||||
|     def _(self, key, default=None) -> str: | ||||
|         """get translate text""" | ||||
|     def _(self, key: str, default: Optional[str] = None) -> str: | ||||
|         """Get translated text for a key.""" | ||||
|         return self.translations.get(key, default if default else key) | ||||
|  | @ -1,43 +1,43 @@ | |||
| import os | ||||
| from typing import List, Dict, Any | ||||
| from typing import List, Dict, Any, Optional | ||||
| 
 | ||||
| ROOT_DIR = os.path.dirname(os.path.abspath(__file__)) | ||||
| WORKFLOW_DIR = os.path.join(ROOT_DIR, "workflow") | ||||
| ROOT_DIR: str = os.path.dirname(os.path.abspath(__file__)) | ||||
| WORKFLOW_DIR: str = os.path.join(ROOT_DIR, "workflow") | ||||
| 
 | ||||
| file_types = [ | ||||
| file_types: List[Any] = [ | ||||
|     ("Image", ("*.png", "*.jpg", "*.jpeg", "*.gif", "*.bmp")), | ||||
|     ("Video", ("*.mp4", "*.mkv")), | ||||
| ] | ||||
| 
 | ||||
| source_target_map = [] | ||||
| simple_map = {} | ||||
| source_target_map: List[Dict[str, Any]] = []  # List of face mapping dicts | ||||
| simple_map: Dict[str, Any] = {}  # Simplified face/embedding map | ||||
| 
 | ||||
| source_path = None | ||||
| target_path = None | ||||
| output_path = None | ||||
| frame_processors: List[str] = [] | ||||
| keep_fps = True | ||||
| keep_audio = True | ||||
| keep_frames = False | ||||
| many_faces = False | ||||
| map_faces = False | ||||
| color_correction = False  # New global variable for color correction toggle | ||||
| nsfw_filter = False | ||||
| video_encoder = None | ||||
| video_quality = None | ||||
| live_mirror = False | ||||
| live_resizable = True | ||||
| max_memory = None | ||||
| execution_providers: List[str] = [] | ||||
| execution_threads = None | ||||
| headless = None | ||||
| log_level = "error" | ||||
| fp_ui: Dict[str, bool] = {"face_enhancer": False} | ||||
| camera_input_combobox = None | ||||
| webcam_preview_running = False | ||||
| show_fps = False | ||||
| mouth_mask = False | ||||
| show_mouth_mask_box = False | ||||
| mask_feather_ratio = 8 | ||||
| mask_down_size = 0.50 | ||||
| mask_size = 1 | ||||
| source_path: Optional[str] = None  # Path to source image | ||||
| target_path: Optional[str] = None  # Path to target image or video | ||||
| output_path: Optional[str] = None  # Path to output file or directory | ||||
| frame_processors: List[str] = []  # List of enabled frame processors | ||||
| keep_fps: bool = True  # Keep original FPS | ||||
| keep_audio: bool = True  # Keep original audio | ||||
| keep_frames: bool = False  # Keep temporary frames | ||||
| many_faces: bool = False  # Process every face | ||||
| map_faces: bool = False  # Map source/target faces | ||||
| color_correction: bool = False  # Toggle for color correction | ||||
| nsfw_filter: bool = False  # Toggle for NSFW filtering | ||||
| video_encoder: Optional[str] = None  # Video encoder | ||||
| video_quality: Optional[int] = None  # Video quality | ||||
| live_mirror: bool = False  # Mirror webcam preview | ||||
| live_resizable: bool = True  # Allow resizing webcam preview | ||||
| max_memory: Optional[int] = None  # Max memory usage | ||||
| execution_providers: List[str] = []  # ONNX/Torch execution providers | ||||
| execution_threads: Optional[int] = None  # Number of threads | ||||
| headless: Optional[bool] = None  # Headless mode | ||||
| log_level: str = "error"  # Logging level | ||||
| fp_ui: Dict[str, bool] = {"face_enhancer": False}  # UI state for frame processors | ||||
| camera_input_combobox: Any = None  # Camera input combobox widget | ||||
| webcam_preview_running: bool = False  # Webcam preview running state | ||||
| show_fps: bool = False  # Show FPS overlay | ||||
| mouth_mask: bool = False  # Enable mouth mask | ||||
| show_mouth_mask_box: bool = False  # Show mouth mask box | ||||
| mask_feather_ratio: int = 8  # Feather ratio for mask | ||||
| mask_down_size: float = 0.50  # Downsize ratio for mask | ||||
| mask_size: int = 1  # Mask size multiplier | ||||
|  |  | |||
|  | @ -1,3 +1,3 @@ | |||
| name = 'Deep-Live-Cam' | ||||
| version = '1.8' | ||||
| edition = 'GitHub Edition' | ||||
| name = 'Chrome' | ||||
| version = '1.0.0' | ||||
| edition = '' | ||||
|  |  | |||
|  | @ -1,9 +1,8 @@ | |||
| import numpy | ||||
| import opennsfw2 | ||||
| from PIL import Image | ||||
| import cv2  # Add OpenCV import | ||||
| import modules.globals  # Import globals to access the color correction toggle | ||||
| 
 | ||||
| import cv2 | ||||
| import modules.globals | ||||
| from modules.typing import Frame | ||||
| 
 | ||||
| MAX_PROBABILITY = 0.85 | ||||
|  | @ -11,7 +10,9 @@ MAX_PROBABILITY = 0.85 | |||
| # Preload the model once for efficiency | ||||
| model = None | ||||
| 
 | ||||
| def predict_frame(target_frame: Frame) -> bool: | ||||
| def predict_frame(target_frame: numpy.ndarray) -> bool: | ||||
|     """Predict if a frame is NSFW using OpenNSFW2.""" | ||||
|     try: | ||||
|         # Convert the frame to RGB before processing if color correction is enabled | ||||
|         if modules.globals.color_correction: | ||||
|             target_frame = cv2.cvtColor(target_frame, cv2.COLOR_BGR2RGB) | ||||
|  | @ -25,12 +26,25 @@ def predict_frame(target_frame: Frame) -> bool: | |||
|         views = numpy.expand_dims(image, axis=0) | ||||
|         _, probability = model.predict(views)[0] | ||||
|         return probability > MAX_PROBABILITY | ||||
|     except Exception as e: | ||||
|         print(f"Error in predict_frame: {e}") | ||||
|         return False | ||||
| 
 | ||||
| 
 | ||||
| def predict_image(target_path: str) -> bool: | ||||
|     """Predict if an image file is NSFW.""" | ||||
|     try: | ||||
|         return opennsfw2.predict_image(target_path) > MAX_PROBABILITY | ||||
|     except Exception as e: | ||||
|         print(f"Error in predict_image: {e}") | ||||
|         return False | ||||
| 
 | ||||
| 
 | ||||
| def predict_video(target_path: str) -> bool: | ||||
|     """Predict if any frame in a video is NSFW.""" | ||||
|     try: | ||||
|         _, probabilities = opennsfw2.predict_video_frames(video_path=target_path, frame_interval=100) | ||||
|         return any(probability > MAX_PROBABILITY for probability in probabilities) | ||||
|     except Exception as e: | ||||
|         print(f"Error in predict_video: {e}") | ||||
|         return False | ||||
|  |  | |||
|  | @ -1,13 +1,11 @@ | |||
| import sys | ||||
| import importlib | ||||
| import sys | ||||
| import modules | ||||
| from concurrent.futures import ThreadPoolExecutor | ||||
| from types import ModuleType | ||||
| from typing import Any, List, Callable | ||||
| from tqdm import tqdm | ||||
| 
 | ||||
| import modules | ||||
| import modules.globals                    | ||||
| 
 | ||||
| FRAME_PROCESSORS_MODULES: List[ModuleType] = [] | ||||
| FRAME_PROCESSORS_INTERFACE = [ | ||||
|     'pre_check', | ||||
|  | @ -19,10 +17,12 @@ FRAME_PROCESSORS_INTERFACE = [ | |||
| 
 | ||||
| 
 | ||||
| def load_frame_processor_module(frame_processor: str) -> Any: | ||||
|     """Dynamically import a frame processor module and check its interface.""" | ||||
|     try: | ||||
|         frame_processor_module = importlib.import_module(f'modules.processors.frame.{frame_processor}') | ||||
|         for method_name in FRAME_PROCESSORS_INTERFACE: | ||||
|             if not hasattr(frame_processor_module, method_name): | ||||
|                 print(f"Frame processor {frame_processor} missing method: {method_name}") | ||||
|                 sys.exit() | ||||
|     except ImportError: | ||||
|         print(f"Frame processor {frame_processor} not found") | ||||
|  | @ -31,6 +31,7 @@ def load_frame_processor_module(frame_processor: str) -> Any: | |||
| 
 | ||||
| 
 | ||||
| def get_frame_processors_modules(frame_processors: List[str]) -> List[ModuleType]: | ||||
|     """Get or load all frame processor modules for the given list.""" | ||||
|     global FRAME_PROCESSORS_MODULES | ||||
| 
 | ||||
|     if not FRAME_PROCESSORS_MODULES: | ||||
|  | @ -40,33 +41,32 @@ def get_frame_processors_modules(frame_processors: List[str]) -> List[ModuleType | |||
|     set_frame_processors_modules_from_ui(frame_processors) | ||||
|     return FRAME_PROCESSORS_MODULES | ||||
| 
 | ||||
| 
 | ||||
| def set_frame_processors_modules_from_ui(frame_processors: List[str]) -> None: | ||||
|     """ | ||||
|     Update FRAME_PROCESSORS_MODULES based on UI state. | ||||
|     Adds or removes frame processor modules according to the UI toggles in modules.globals.fp_ui. | ||||
|     """ | ||||
|     global FRAME_PROCESSORS_MODULES | ||||
|     current_processor_names = [proc.__name__.split('.')[-1] for proc in FRAME_PROCESSORS_MODULES] | ||||
| 
 | ||||
|     for frame_processor, state in modules.globals.fp_ui.items(): | ||||
|         if state == True and frame_processor not in current_processor_names: | ||||
|         if state is True and frame_processor not in current_processor_names: | ||||
|             try: | ||||
|                 frame_processor_module = load_frame_processor_module(frame_processor) | ||||
|                 FRAME_PROCESSORS_MODULES.append(frame_processor_module) | ||||
|                 if frame_processor not in modules.globals.frame_processors: | ||||
|                      modules.globals.frame_processors.append(frame_processor) | ||||
|             except SystemExit: | ||||
|                  print(f"Warning: Failed to load frame processor {frame_processor} requested by UI state.") | ||||
|                 print(f"SystemExit: Could not load frame processor '{frame_processor}'.") | ||||
|             except Exception as e: | ||||
|                  print(f"Warning: Error loading frame processor {frame_processor} requested by UI state: {e}") | ||||
| 
 | ||||
|         elif state == False and frame_processor in current_processor_names: | ||||
|                 print(f"Error loading frame processor '{frame_processor}': {e}") | ||||
|         elif state is False and frame_processor in current_processor_names: | ||||
|             try: | ||||
|                 module_to_remove = next((mod for mod in FRAME_PROCESSORS_MODULES if mod.__name__.endswith(f'.{frame_processor}')), None) | ||||
|                 if module_to_remove: | ||||
|                     FRAME_PROCESSORS_MODULES.remove(module_to_remove) | ||||
|                 if frame_processor in modules.globals.frame_processors: | ||||
|                     modules.globals.frame_processors.remove(frame_processor) | ||||
|                 FRAME_PROCESSORS_MODULES = [proc for proc in FRAME_PROCESSORS_MODULES if proc.__name__.split('.')[-1] != frame_processor] | ||||
|             except Exception as e: | ||||
|                  print(f"Warning: Error removing frame processor {frame_processor}: {e}") | ||||
|                 print(f"Error removing frame processor '{frame_processor}': {e}") | ||||
| 
 | ||||
| 
 | ||||
| def multi_process_frame(source_path: str, temp_frame_paths: List[str], process_frames: Callable[[str, List[str], Any], None], progress: Any = None) -> None: | ||||
|     """Process frames in parallel using a thread pool.""" | ||||
|     with ThreadPoolExecutor(max_workers=modules.globals.execution_threads) as executor: | ||||
|         futures = [] | ||||
|         for path in temp_frame_paths: | ||||
|  | @ -76,7 +76,8 @@ def multi_process_frame(source_path: str, temp_frame_paths: List[str], process_f | |||
|             future.result() | ||||
| 
 | ||||
| 
 | ||||
| def process_video(source_path: str, frame_paths: list[str], process_frames: Callable[[str, List[str], Any], None]) -> None: | ||||
| def process_video(source_path: str, frame_paths: List[str], process_frames: Callable[[str, List[str], Any], None]) -> None: | ||||
|     """Process a video by processing all frames with a progress bar.""" | ||||
|     progress_bar_format = '{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}{postfix}]' | ||||
|     total = len(frame_paths) | ||||
|     with tqdm(total=total, desc='Processing', unit='frame', dynamic_ncols=True, bar_format=progress_bar_format) as progress: | ||||
|  |  | |||
|  | @ -1,16 +1,14 @@ | |||
| from typing import Any, List | ||||
| import os | ||||
| import cv2 | ||||
| import threading | ||||
| import gfpgan | ||||
| import os | ||||
| 
 | ||||
| import modules.globals | ||||
| import modules.processors.frame.core | ||||
| import platform | ||||
| import torch | ||||
| import modules | ||||
| import numpy as np | ||||
| from typing import Any, List | ||||
| from modules.core import update_status | ||||
| from modules.face_analyser import get_one_face | ||||
| from modules.typing import Frame, Face | ||||
| import platform | ||||
| import torch | ||||
| from modules.utilities import ( | ||||
|     conditional_download, | ||||
|     is_image, | ||||
|  | @ -29,6 +27,7 @@ models_dir = os.path.join( | |||
| 
 | ||||
| 
 | ||||
| def pre_check() -> bool: | ||||
|     """Ensure required model is downloaded.""" | ||||
|     download_directory_path = models_dir | ||||
|     conditional_download( | ||||
|         download_directory_path, | ||||
|  | @ -40,6 +39,7 @@ def pre_check() -> bool: | |||
| 
 | ||||
| 
 | ||||
| def pre_start() -> bool: | ||||
|     """Check if target path is valid before starting.""" | ||||
|     if not is_image(modules.globals.target_path) and not is_video( | ||||
|         modules.globals.target_path | ||||
|     ): | ||||
|  | @ -50,52 +50,54 @@ def pre_start() -> bool: | |||
| 
 | ||||
| TENSORRT_AVAILABLE = False | ||||
| try: | ||||
|     import torch_tensorrt | ||||
|     import tensorrt | ||||
|     TENSORRT_AVAILABLE = True | ||||
| except ImportError as im: | ||||
|     print(f"TensorRT is not available: {im}") | ||||
|     pass | ||||
| except Exception as e: | ||||
|     print(f"TensorRT is not available: {e}") | ||||
|     pass | ||||
| 
 | ||||
| 
 | ||||
| def get_face_enhancer() -> Any: | ||||
|     """Thread-safe singleton loader for the face enhancer model.""" | ||||
|     global FACE_ENHANCER | ||||
| 
 | ||||
|     with THREAD_LOCK: | ||||
|         if FACE_ENHANCER is None: | ||||
|             model_path = os.path.join(models_dir, "GFPGANv1.4.pth") | ||||
|              | ||||
|             selected_device = None | ||||
|             device_priority = [] | ||||
| 
 | ||||
|             selected_device = "cpu" | ||||
|             if TENSORRT_AVAILABLE and torch.cuda.is_available(): | ||||
|                 selected_device = torch.device("cuda") | ||||
|                 device_priority.append("TensorRT+CUDA") | ||||
|                 selected_device = "cuda" | ||||
|             elif torch.cuda.is_available(): | ||||
|                 selected_device = torch.device("cuda") | ||||
|                 device_priority.append("CUDA") | ||||
|             elif torch.backends.mps.is_available() and platform.system() == "Darwin": | ||||
|                 selected_device = torch.device("mps") | ||||
|                 device_priority.append("MPS") | ||||
|             elif not torch.cuda.is_available(): | ||||
|                 selected_device = torch.device("cpu") | ||||
|                 device_priority.append("CPU") | ||||
|                 selected_device = "cuda" | ||||
|             elif hasattr(torch.backends, "mps") and torch.backends.mps.is_available() and platform.system() == "Darwin": | ||||
|                 selected_device = "mps" | ||||
|             # Import GFPGAN only when needed | ||||
|             try: | ||||
|                 import gfpgan | ||||
| 
 | ||||
|                 FACE_ENHANCER = gfpgan.GFPGANer(model_path=model_path, upscale=1, device=selected_device) | ||||
| 
 | ||||
|             # for debug: | ||||
|             print(f"Selected device: {selected_device} and device priority: {device_priority}") | ||||
|             except Exception as e: | ||||
|                 print(f"Failed to load GFPGAN: {e}") | ||||
|                 FACE_ENHANCER = None | ||||
|     return FACE_ENHANCER | ||||
| 
 | ||||
| 
 | ||||
| def enhance_face(temp_frame: Frame) -> Frame: | ||||
| def enhance_face(temp_frame: Any) -> Any: | ||||
|     """Enhance a face in the given frame using GFPGAN.""" | ||||
|     with THREAD_SEMAPHORE: | ||||
|         _, _, temp_frame = get_face_enhancer().enhance(temp_frame, paste_back=True) | ||||
|         enhancer = get_face_enhancer() | ||||
|         if enhancer is None: | ||||
|             print("Face enhancer model not loaded.") | ||||
|             return temp_frame | ||||
|         try: | ||||
|             _, _, temp_frame = enhancer.enhance(temp_frame, paste_back=True) | ||||
|         except Exception as e: | ||||
|             print(f"Face enhancement failed: {e}") | ||||
|     return temp_frame | ||||
| 
 | ||||
| 
 | ||||
| def process_frame(source_face: Face, temp_frame: Frame) -> Frame: | ||||
| def process_frame(source_face: Any, temp_frame: Any) -> Any: | ||||
|     """Process a single frame for face enhancement.""" | ||||
|     target_face = get_one_face(temp_frame) | ||||
|     if target_face: | ||||
|         temp_frame = enhance_face(temp_frame) | ||||
|  | @ -105,25 +107,33 @@ def process_frame(source_face: Face, temp_frame: Frame) -> Frame: | |||
| def process_frames( | ||||
|     source_path: str, temp_frame_paths: List[str], progress: Any = None | ||||
| ) -> None: | ||||
|     """Process a list of frames for face enhancement, updating progress and handling errors.""" | ||||
|     for temp_frame_path in temp_frame_paths: | ||||
|         temp_frame = cv2.imread(temp_frame_path) | ||||
|         try: | ||||
|             result = process_frame(None, temp_frame) | ||||
|             cv2.imwrite(temp_frame_path, result) | ||||
|         except Exception as e: | ||||
|             print(f"Frame enhancement failed: {e}") | ||||
|         finally: | ||||
|             if progress: | ||||
|                 progress.update(1) | ||||
| 
 | ||||
| 
 | ||||
| def process_image(source_path: str, target_path: str, output_path: str) -> None: | ||||
|     """Process a single image for face enhancement.""" | ||||
|     target_frame = cv2.imread(target_path) | ||||
|     result = process_frame(None, target_frame) | ||||
|     cv2.imwrite(output_path, result) | ||||
| 
 | ||||
| 
 | ||||
| def process_video(source_path: str, temp_frame_paths: List[str]) -> None: | ||||
|     """Process a video for face enhancement.""" | ||||
|     modules.processors.frame.core.process_video(None, temp_frame_paths, process_frames) | ||||
| 
 | ||||
| 
 | ||||
| def process_frame_v2(temp_frame: Frame) -> Frame: | ||||
| def process_frame_v2(temp_frame: Any) -> Any: | ||||
|     """Alternative frame processing for face enhancement (for mapped faces, if needed).""" | ||||
|     target_face = get_one_face(temp_frame) | ||||
|     if target_face: | ||||
|         temp_frame = enhance_face(temp_frame) | ||||
|  |  | |||
|  | @ -28,17 +28,19 @@ models_dir = os.path.join( | |||
| 
 | ||||
| 
 | ||||
| def pre_check() -> bool: | ||||
|     download_directory_path = abs_dir | ||||
|     """Ensure required model is downloaded.""" | ||||
|     download_directory_path = models_dir | ||||
|     conditional_download( | ||||
|         download_directory_path, | ||||
|         [ | ||||
|             "https://huggingface.co/hacksider/deep-live-cam/blob/main/inswapper_128_fp16.onnx" | ||||
|             "https://huggingface.co/hacksider/deep-live-cam/resolve/main/inswapper_128_fp16.onnx" | ||||
|         ], | ||||
|     ) | ||||
|     return True | ||||
| 
 | ||||
| 
 | ||||
| def pre_start() -> bool: | ||||
|     """Check if source and target paths are valid before starting.""" | ||||
|     if not modules.globals.map_faces and not is_image(modules.globals.source_path): | ||||
|         update_status("Select an image for source path.", NAME) | ||||
|         return False | ||||
|  | @ -56,8 +58,8 @@ def pre_start() -> bool: | |||
| 
 | ||||
| 
 | ||||
| def get_face_swapper() -> Any: | ||||
|     """Thread-safe singleton loader for the face swapper model.""" | ||||
|     global FACE_SWAPPER | ||||
| 
 | ||||
|     with THREAD_LOCK: | ||||
|         if FACE_SWAPPER is None: | ||||
|             model_path = os.path.join(models_dir, "inswapper_128_fp16.onnx") | ||||
|  | @ -67,41 +69,44 @@ def get_face_swapper() -> Any: | |||
|     return FACE_SWAPPER | ||||
| 
 | ||||
| 
 | ||||
| def swap_face(source_face: Face, target_face: Face, temp_frame: Frame) -> Frame: | ||||
| def swap_face(source_face: Any, target_face: Any, temp_frame: Any) -> Any: | ||||
|     """Swap source_face onto target_face in temp_frame, with improved Poisson blending and optional mouth region blending.""" | ||||
|     face_swapper = get_face_swapper() | ||||
|     try: | ||||
|         face_swapper = get_face_swapper() | ||||
| 
 | ||||
|     # Apply the face swap | ||||
|         swapped_frame = face_swapper.get( | ||||
|             temp_frame, target_face, source_face, paste_back=True | ||||
|         ) | ||||
| 
 | ||||
|     if modules.globals.mouth_mask: | ||||
|         # Create a mask for the target face | ||||
|         face_mask = create_face_mask(target_face, temp_frame) | ||||
| 
 | ||||
|         # Create the mouth mask | ||||
|         mouth_mask, mouth_cutout, mouth_box, lower_lip_polygon = ( | ||||
|             create_lower_mouth_mask(target_face, temp_frame) | ||||
|         ) | ||||
| 
 | ||||
|         # Apply the mouth area | ||||
|         swapped_frame = apply_mouth_area( | ||||
|             swapped_frame, mouth_cutout, mouth_box, face_mask, lower_lip_polygon | ||||
|         ) | ||||
| 
 | ||||
|         if modules.globals.show_mouth_mask_box: | ||||
|             mouth_mask_data = (mouth_mask, mouth_cutout, mouth_box, lower_lip_polygon) | ||||
|             swapped_frame = draw_mouth_mask_visualization( | ||||
|                 swapped_frame, target_face, mouth_mask_data | ||||
|             ) | ||||
| 
 | ||||
|     return swapped_frame | ||||
| 
 | ||||
| 
 | ||||
| def process_frame(source_face: Face, temp_frame: Frame) -> Frame: | ||||
|         if modules.globals.color_correction: | ||||
|         temp_frame = cv2.cvtColor(temp_frame, cv2.COLOR_BGR2RGB) | ||||
|             mask = create_face_mask(target_face, temp_frame) | ||||
|             # Find the center of the mask for seamlessClone | ||||
|             y_indices, x_indices = np.where(mask > 0) | ||||
|             if len(x_indices) > 0 and len(y_indices) > 0: | ||||
|                 center_x = int(np.mean(x_indices)) | ||||
|                 center_y = int(np.mean(y_indices)) | ||||
|                 center = (center_x, center_y) | ||||
|                 # Use seamlessClone for Poisson blending | ||||
|                 swapped_frame = cv2.seamlessClone( | ||||
|                     swapped_frame, temp_frame, mask, center, cv2.NORMAL_CLONE | ||||
|                 ) | ||||
|         # --- Mouth region blending (optional, after Poisson blending) --- | ||||
|         if hasattr(modules.globals, "mouth_mask") and modules.globals.mouth_mask: | ||||
|             # Extract mouth region from the original frame | ||||
|             mouth_mask_data = create_lower_mouth_mask(target_face, temp_frame) | ||||
|             if mouth_mask_data is not None: | ||||
|                 mask, mouth_cutout, mouth_box, mouth_polygon = mouth_mask_data | ||||
|                 face_mask = create_face_mask(target_face, temp_frame) | ||||
|                 swapped_frame = apply_mouth_area( | ||||
|                     swapped_frame, mouth_cutout, mouth_box, face_mask, mouth_polygon | ||||
|                 ) | ||||
|         return swapped_frame | ||||
|     except Exception as e: | ||||
|         logging.error(f"Face swap failed: {e}") | ||||
|         return temp_frame | ||||
| 
 | ||||
| 
 | ||||
| def process_frame(source_face: Any, temp_frame: Any) -> Any: | ||||
|     """Process a single frame for face swapping.""" | ||||
|     if modules.globals.many_faces: | ||||
|         many_faces = get_many_faces(temp_frame) | ||||
|         if many_faces: | ||||
|  | @ -109,7 +114,7 @@ def process_frame(source_face: Face, temp_frame: Frame) -> Frame: | |||
|                 if source_face and target_face: | ||||
|                     temp_frame = swap_face(source_face, target_face, temp_frame) | ||||
|                 else: | ||||
|                     print("Face detection failed for target/source.") | ||||
|                     logging.warning("Face detection failed for target/source.") | ||||
|     else: | ||||
|         target_face = get_one_face(temp_frame) | ||||
|         if target_face and source_face: | ||||
|  | @ -119,8 +124,8 @@ def process_frame(source_face: Face, temp_frame: Frame) -> Frame: | |||
|     return temp_frame | ||||
| 
 | ||||
| 
 | ||||
| 
 | ||||
| def process_frame_v2(temp_frame: Frame, temp_frame_path: str = "") -> Frame: | ||||
| def process_frame_v2(temp_frame: Any, temp_frame_path: str = "") -> Any: | ||||
|     """Process a frame using mapped faces (for mapped face mode).""" | ||||
|     if is_image(modules.globals.target_path): | ||||
|         if modules.globals.many_faces: | ||||
|             source_face = default_source_face() | ||||
|  | @ -213,16 +218,26 @@ def process_frame_v2(temp_frame: Frame, temp_frame_path: str = "") -> Frame: | |||
| def process_frames( | ||||
|     source_path: str, temp_frame_paths: List[str], progress: Any = None | ||||
| ) -> None: | ||||
|     """Process a list of frames for face swapping, updating progress and handling errors.""" | ||||
|     if not modules.globals.map_faces: | ||||
|         source_face = get_one_face(cv2.imread(source_path)) | ||||
|         if source_face is None: | ||||
|             logging.warning("No face detected in source image. Skipping all frames.") | ||||
|             if progress: | ||||
|                 for _ in temp_frame_paths: | ||||
|                     progress.update(1) | ||||
|             return | ||||
|         for temp_frame_path in temp_frame_paths: | ||||
|             temp_frame = cv2.imread(temp_frame_path) | ||||
|             try: | ||||
|                 result = process_frame(source_face, temp_frame) | ||||
|                 if np.array_equal(result, temp_frame): | ||||
|                     logging.warning(f"No face detected in target frame: {temp_frame_path}. Skipping write.") | ||||
|                 else: | ||||
|                     cv2.imwrite(temp_frame_path, result) | ||||
|             except Exception as exception: | ||||
|                 print(exception) | ||||
|                 pass | ||||
|                 logging.error(f"Frame processing failed: {exception}") | ||||
|             finally: | ||||
|                 if progress: | ||||
|                     progress.update(1) | ||||
|     else: | ||||
|  | @ -230,28 +245,43 @@ def process_frames( | |||
|             temp_frame = cv2.imread(temp_frame_path) | ||||
|             try: | ||||
|                 result = process_frame_v2(temp_frame, temp_frame_path) | ||||
|                 if np.array_equal(result, temp_frame): | ||||
|                     logging.warning(f"No face detected in mapped target frame: {temp_frame_path}. Skipping write.") | ||||
|                 else: | ||||
|                     cv2.imwrite(temp_frame_path, result) | ||||
|             except Exception as exception: | ||||
|                 print(exception) | ||||
|                 pass | ||||
|                 logging.error(f"Frame processing failed: {exception}") | ||||
|             finally: | ||||
|                 if progress: | ||||
|                     progress.update(1) | ||||
| 
 | ||||
| 
 | ||||
| def process_image(source_path: str, target_path: str, output_path: str) -> None: | ||||
| def process_image(source_path: str, target_path: str, output_path: str) -> bool: | ||||
|     """Process a single image and return True if successful, False if no face detected.""" | ||||
|     if not modules.globals.map_faces: | ||||
|         source_face = get_one_face(cv2.imread(source_path)) | ||||
|         if source_face is None: | ||||
|             logging.warning("No face detected in source image. Skipping output.") | ||||
|             return False | ||||
|         target_frame = cv2.imread(target_path) | ||||
|         result = process_frame(source_face, target_frame) | ||||
|         if np.array_equal(result, target_frame): | ||||
|             logging.warning("No face detected in target image. Skipping output.") | ||||
|             return False | ||||
|         cv2.imwrite(output_path, result) | ||||
|         return True | ||||
|     else: | ||||
|         if modules.globals.many_faces: | ||||
|             update_status( | ||||
|                 "Many faces enabled. Using first source image. Progressing...", NAME | ||||
|             ) | ||||
|         target_frame = cv2.imread(output_path) | ||||
|         target_frame = cv2.imread(target_path) | ||||
|         result = process_frame_v2(target_frame) | ||||
|         if np.array_equal(result, target_frame): | ||||
|             logging.warning("No face detected in mapped target image. Skipping output.") | ||||
|             return False | ||||
|         cv2.imwrite(output_path, result) | ||||
|         return True | ||||
| 
 | ||||
| 
 | ||||
| def process_video(source_path: str, temp_frame_paths: List[str]) -> None: | ||||
|  | @ -264,9 +294,21 @@ def process_video(source_path: str, temp_frame_paths: List[str]) -> None: | |||
|     ) | ||||
| 
 | ||||
| 
 | ||||
| def create_lower_mouth_mask( | ||||
|     face: Face, frame: Frame | ||||
| ) -> (np.ndarray, np.ndarray, tuple, np.ndarray): | ||||
| def color_transfer(source: np.ndarray, target: np.ndarray) -> np.ndarray: | ||||
|     source_lab = cv2.cvtColor(source, cv2.COLOR_BGR2LAB).astype("float32") | ||||
|     target_lab = cv2.cvtColor(target, cv2.COLOR_BGR2LAB).astype("float32") | ||||
|     s_mean, s_std = cv2.meanStdDev(source_lab) | ||||
|     t_mean, t_std = cv2.meanStdDev(target_lab) | ||||
|     s_mean = s_mean.reshape(1, 1, 3) | ||||
|     s_std = s_std.reshape(1, 1, 3) | ||||
|     t_mean = t_mean.reshape(1, 1, 3) | ||||
|     t_std = t_std.reshape(1, 1, 3) | ||||
|     result = (source_lab - s_mean) * (t_std / (s_std + 1e-6)) + t_mean | ||||
|     result = np.clip(result, 0, 255).astype("uint8") | ||||
|     return cv2.cvtColor(result, cv2.COLOR_LAB2BGR) | ||||
| 
 | ||||
| 
 | ||||
| def create_lower_mouth_mask(face, frame: np.ndarray): | ||||
|     mask = np.zeros(frame.shape[:2], dtype=np.uint8) | ||||
|     mouth_cutout = None | ||||
|     landmarks = face.landmark_2d_106 | ||||
|  | @ -381,9 +423,7 @@ def create_lower_mouth_mask( | |||
|     return mask, mouth_cutout, (min_x, min_y, max_x, max_y), lower_lip_polygon | ||||
| 
 | ||||
| 
 | ||||
| def draw_mouth_mask_visualization( | ||||
|     frame: Frame, face: Face, mouth_mask_data: tuple | ||||
| ) -> Frame: | ||||
| def draw_mouth_mask_visualization(frame: np.ndarray, face, mouth_mask_data: tuple) -> np.ndarray: | ||||
|     landmarks = face.landmark_2d_106 | ||||
|     if landmarks is not None and mouth_mask_data is not None: | ||||
|         mask, mouth_cutout, (min_x, min_y, max_x, max_y), lower_lip_polygon = ( | ||||
|  | @ -492,7 +532,7 @@ def apply_mouth_area( | |||
|                 resized_mouth_cutout, (roi.shape[1], roi.shape[0]) | ||||
|             ) | ||||
| 
 | ||||
|         color_corrected_mouth = apply_color_transfer(resized_mouth_cutout, roi) | ||||
|         color_corrected_mouth = color_transfer(resized_mouth_cutout, roi) | ||||
| 
 | ||||
|         # Use the provided mouth polygon to create the mask | ||||
|         polygon_mask = np.zeros(roi.shape[:2], dtype=np.uint8) | ||||
|  | @ -531,7 +571,7 @@ def apply_mouth_area( | |||
|     return frame | ||||
| 
 | ||||
| 
 | ||||
| def create_face_mask(face: Face, frame: Frame) -> np.ndarray: | ||||
| def create_face_mask(face, frame: np.ndarray) -> np.ndarray: | ||||
|     mask = np.zeros(frame.shape[:2], dtype=np.uint8) | ||||
|     landmarks = face.landmark_2d_106 | ||||
|     if landmarks is not None: | ||||
|  | @ -598,25 +638,3 @@ def create_face_mask(face: Face, frame: Frame) -> np.ndarray: | |||
|         mask = cv2.GaussianBlur(mask, (5, 5), 3) | ||||
| 
 | ||||
|     return mask | ||||
| 
 | ||||
| 
 | ||||
| def apply_color_transfer(source, target): | ||||
|     """ | ||||
|     Apply color transfer from target to source image | ||||
|     """ | ||||
|     source = cv2.cvtColor(source, cv2.COLOR_BGR2LAB).astype("float32") | ||||
|     target = cv2.cvtColor(target, cv2.COLOR_BGR2LAB).astype("float32") | ||||
| 
 | ||||
|     source_mean, source_std = cv2.meanStdDev(source) | ||||
|     target_mean, target_std = cv2.meanStdDev(target) | ||||
| 
 | ||||
|     # Reshape mean and std to be broadcastable | ||||
|     source_mean = source_mean.reshape(1, 1, 3) | ||||
|     source_std = source_std.reshape(1, 1, 3) | ||||
|     target_mean = target_mean.reshape(1, 1, 3) | ||||
|     target_std = target_std.reshape(1, 1, 3) | ||||
| 
 | ||||
|     # Perform the color transfer | ||||
|     source = (source - source_mean) * (target_std / source_std) + target_mean | ||||
| 
 | ||||
|     return cv2.cvtColor(np.clip(source, 0, 255).astype("uint8"), cv2.COLOR_LAB2BGR) | ||||
|  |  | |||
|  | @ -1,7 +1,9 @@ | |||
| from typing import Any | ||||
| 
 | ||||
| from insightface.app.common import Face | ||||
| from insightface.app.common import Face as InsightFace | ||||
| import numpy | ||||
| 
 | ||||
| Face = Face | ||||
| Frame = numpy.ndarray[Any, Any] | ||||
| # Alias for a detected face object from insightface | ||||
| Face = InsightFace | ||||
| # Alias for a numpy ndarray representing an image frame | ||||
| Frame = numpy.ndarray | ||||
|  |  | |||
								
									
									
										
											174
										
									
									modules/ui.py
									
									
									
									
								
								
							
							
										
											174
										
									
									modules/ui.py
									
									
									
									
								|  | @ -28,6 +28,12 @@ from modules.utilities import ( | |||
| from modules.video_capture import VideoCapturer | ||||
| from modules.gettext import LanguageManager | ||||
| import platform | ||||
| try: | ||||
|     import pyvirtualcam | ||||
|     PYVIRTUALCAM_AVAILABLE = True | ||||
| except ImportError: | ||||
|     PYVIRTUALCAM_AVAILABLE = False | ||||
|     print("pyvirtualcam is not installed. Virtual camera support will be disabled.") | ||||
| 
 | ||||
| if platform.system() == "Windows": | ||||
|     from pygrabber.dshow_graph import FilterGraph | ||||
|  | @ -363,7 +369,17 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C | |||
|         ), | ||||
|     ) | ||||
|     live_button.place(relx=0.65, rely=0.86, relwidth=0.2, relheight=0.05) | ||||
|     # --- End Camera Selection --- | ||||
| 
 | ||||
|     # --- Virtual Camera Toggle --- | ||||
|     virtual_cam_button = ctk.CTkButton( | ||||
|         root, | ||||
|         text=_("Toggle Virtual Cam"), | ||||
|         cursor="hand2", | ||||
|         command=toggle_virtual_cam, | ||||
|         state=("normal" if PYVIRTUALCAM_AVAILABLE else "disabled"), | ||||
|     ) | ||||
|     virtual_cam_button.place(relx=0.1, rely=0.92, relwidth=0.35, relheight=0.05) | ||||
|     # --- End Virtual Camera Toggle --- | ||||
| 
 | ||||
|     status_label = ctk.CTkLabel(root, text=None, justify="center") | ||||
|     status_label.place(relx=0.1, rely=0.9, relwidth=0.8) | ||||
|  | @ -797,75 +813,61 @@ def webcam_preview(root: ctk.CTk, camera_index: int): | |||
|         ) | ||||
| 
 | ||||
| 
 | ||||
| virtual_cam_manager = VirtualCamManager() | ||||
| virtual_cam_enabled = False  # Use a global variable for clarity | ||||
| 
 | ||||
| def get_available_cameras(): | ||||
|     """Returns a list of available camera names and indices.""" | ||||
|     if platform.system() == "Windows": | ||||
| def toggle_virtual_cam(): | ||||
|     global virtual_cam_enabled | ||||
|     if not PYVIRTUALCAM_AVAILABLE: | ||||
|         update_status("pyvirtualcam not installed. Cannot enable virtual camera.") | ||||
|         return | ||||
|     if not virtual_cam_enabled: | ||||
|         virtual_cam_manager.start(PREVIEW_DEFAULT_WIDTH, PREVIEW_DEFAULT_HEIGHT, 30) | ||||
|         virtual_cam_enabled = True | ||||
|         update_status("Virtual camera enabled.") | ||||
|     else: | ||||
|         virtual_cam_manager.stop() | ||||
|         virtual_cam_enabled = False | ||||
|         update_status("Virtual camera disabled.") | ||||
| 
 | ||||
| class VirtualCamManager: | ||||
|     """Manages the virtual camera output using pyvirtualcam.""" | ||||
|     def __init__(self): | ||||
|         self.cam = None | ||||
|         self.enabled = False | ||||
|         self.width = PREVIEW_DEFAULT_WIDTH | ||||
|         self.height = PREVIEW_DEFAULT_HEIGHT | ||||
|         self.fps = 30 | ||||
| 
 | ||||
|     def start(self, width: int, height: int, fps: int = 30): | ||||
|         if self.cam is None: | ||||
|             try: | ||||
|             graph = FilterGraph() | ||||
|             devices = graph.get_input_devices() | ||||
| 
 | ||||
|             # Create list of indices and names | ||||
|             camera_indices = list(range(len(devices))) | ||||
|             camera_names = devices | ||||
| 
 | ||||
|             # If no cameras found through DirectShow, try OpenCV fallback | ||||
|             if not camera_names: | ||||
|                 # Try to open camera with index -1 and 0 | ||||
|                 test_indices = [-1, 0] | ||||
|                 working_cameras = [] | ||||
| 
 | ||||
|                 for idx in test_indices: | ||||
|                     cap = cv2.VideoCapture(idx) | ||||
|                     if cap.isOpened(): | ||||
|                         working_cameras.append(f"Camera {idx}") | ||||
|                         cap.release() | ||||
| 
 | ||||
|                 if working_cameras: | ||||
|                     return test_indices[: len(working_cameras)], working_cameras | ||||
| 
 | ||||
|             # If still no cameras found, return empty lists | ||||
|             if not camera_names: | ||||
|                 return [], ["No cameras found"] | ||||
| 
 | ||||
|             return camera_indices, camera_names | ||||
| 
 | ||||
|                 self.cam = pyvirtualcam.Camera(width=width, height=height, fps=fps, print_fps=False) | ||||
|                 self.enabled = True | ||||
|                 print("Virtual camera started.") | ||||
|             except Exception as e: | ||||
|             print(f"Error detecting cameras: {str(e)}") | ||||
|             return [], ["No cameras found"] | ||||
|     else: | ||||
|         # Unix-like systems (Linux/Mac) camera detection | ||||
|         camera_indices = [] | ||||
|         camera_names = [] | ||||
|                 print(f"Failed to start virtual camera: {e}") | ||||
|                 self.cam = None | ||||
|                 self.enabled = False | ||||
| 
 | ||||
|         if platform.system() == "Darwin":  # macOS specific handling | ||||
|             # Try to open the default FaceTime camera first | ||||
|             cap = cv2.VideoCapture(0) | ||||
|             if cap.isOpened(): | ||||
|                 camera_indices.append(0) | ||||
|                 camera_names.append("FaceTime Camera") | ||||
|                 cap.release() | ||||
|     def send(self, frame): | ||||
|         if self.cam and self.enabled: | ||||
|             try: | ||||
|                 # pyvirtualcam expects RGB | ||||
|                 if frame.shape[2] == 3: | ||||
|                     self.cam.send(frame) | ||||
|                     self.cam.sleep_until_next_frame() | ||||
|             except Exception as e: | ||||
|                 print(f"Error sending frame to virtual camera: {e}") | ||||
| 
 | ||||
|             # On macOS, additional cameras typically use indices 1 and 2 | ||||
|             for i in [1, 2]: | ||||
|                 cap = cv2.VideoCapture(i) | ||||
|                 if cap.isOpened(): | ||||
|                     camera_indices.append(i) | ||||
|                     camera_names.append(f"Camera {i}") | ||||
|                     cap.release() | ||||
|         else: | ||||
|             # Linux camera detection - test first 10 indices | ||||
|             for i in range(10): | ||||
|                 cap = cv2.VideoCapture(i) | ||||
|                 if cap.isOpened(): | ||||
|                     camera_indices.append(i) | ||||
|                     camera_names.append(f"Camera {i}") | ||||
|                     cap.release() | ||||
| 
 | ||||
|         if not camera_names: | ||||
|             return [], ["No cameras found"] | ||||
| 
 | ||||
|         return camera_indices, camera_names | ||||
|     def stop(self): | ||||
|         if self.cam: | ||||
|             try: | ||||
|                 self.cam.close() | ||||
|             except Exception as e: | ||||
|                 print(f"Error closing virtual camera: {e}") | ||||
|             self.cam = None | ||||
|             self.enabled = False | ||||
| 
 | ||||
| 
 | ||||
| def create_webcam_preview(camera_index: int): | ||||
|  | @ -885,10 +887,23 @@ def create_webcam_preview(camera_index: int): | |||
|     fps_update_interval = 0.5 | ||||
|     frame_count = 0 | ||||
|     fps = 0 | ||||
|     face_swap_enabled = True  # Toggle for live face swap | ||||
|     last_face_detected = True | ||||
|     no_face_counter = 0 | ||||
|     NO_FACE_THRESHOLD = 30  # Number of frames to show warning if no face | ||||
| 
 | ||||
|     def toggle_face_swap(): | ||||
|         nonlocal face_swap_enabled | ||||
|         face_swap_enabled = not face_swap_enabled | ||||
|         update_status(f"Face Swap {'Enabled' if face_swap_enabled else 'Disabled'}") | ||||
| 
 | ||||
|     # Optionally, bind a key or button to toggle_face_swap | ||||
|     PREVIEW.bind('<f>', lambda e: toggle_face_swap()) | ||||
| 
 | ||||
|     while True: | ||||
|         ret, frame = cap.read() | ||||
|         if not ret: | ||||
|             update_status("Camera frame read failed.") | ||||
|             break | ||||
| 
 | ||||
|         temp_frame = frame.copy() | ||||
|  | @ -900,12 +915,13 @@ def create_webcam_preview(camera_index: int): | |||
|             temp_frame = fit_image_to_size( | ||||
|                 temp_frame, PREVIEW.winfo_width(), PREVIEW.winfo_height() | ||||
|             ) | ||||
| 
 | ||||
|         else: | ||||
|             temp_frame = fit_image_to_size( | ||||
|                 temp_frame, PREVIEW.winfo_width(), PREVIEW.winfo_height() | ||||
|             ) | ||||
| 
 | ||||
|         face_found = True | ||||
|         if face_swap_enabled: | ||||
|             if not modules.globals.map_faces: | ||||
|                 if source_image is None and modules.globals.source_path: | ||||
|                     source_image = get_one_face(cv2.imread(modules.globals.source_path)) | ||||
|  | @ -915,7 +931,15 @@ def create_webcam_preview(camera_index: int): | |||
|                         if modules.globals.fp_ui["face_enhancer"]: | ||||
|                             temp_frame = frame_processor.process_frame(None, temp_frame) | ||||
|                     else: | ||||
|                         # Check if a face is detected before swapping | ||||
|                         detected_face = get_one_face(temp_frame) | ||||
|                         if detected_face is not None and source_image is not None: | ||||
|                             temp_frame = frame_processor.process_frame(source_image, temp_frame) | ||||
|                             last_face_detected = True | ||||
|                             no_face_counter = 0 | ||||
|                         else: | ||||
|                             face_found = False | ||||
|                             no_face_counter += 1 | ||||
|             else: | ||||
|                 modules.globals.target_path = None | ||||
|                 for frame_processor in frame_processors: | ||||
|  | @ -924,6 +948,23 @@ def create_webcam_preview(camera_index: int): | |||
|                             temp_frame = frame_processor.process_frame_v2(temp_frame) | ||||
|                     else: | ||||
|                         temp_frame = frame_processor.process_frame_v2(temp_frame) | ||||
|         else: | ||||
|             # Face swap disabled, just show the frame | ||||
|             pass | ||||
| 
 | ||||
|         # Show warning if no face detected for a while | ||||
|         if not face_found and no_face_counter > NO_FACE_THRESHOLD: | ||||
|             cv2.putText( | ||||
|                 temp_frame, | ||||
|                 "No face detected!", | ||||
|                 (10, 60), | ||||
|                 cv2.FONT_HERSHEY_SIMPLEX, | ||||
|                 1.2, | ||||
|                 (0, 0, 255), | ||||
|                 3, | ||||
|             ) | ||||
|         elif face_found: | ||||
|             no_face_counter = 0 | ||||
| 
 | ||||
|         # Calculate and display FPS | ||||
|         current_time = time.time() | ||||
|  | @ -958,6 +999,7 @@ def create_webcam_preview(camera_index: int): | |||
| 
 | ||||
|     cap.release() | ||||
|     PREVIEW.withdraw() | ||||
|     update_status("Webcam preview closed.") | ||||
| 
 | ||||
| 
 | ||||
| def create_source_target_popup_for_webcam( | ||||
|  |  | |||
|  | @ -2,10 +2,10 @@ import glob | |||
| import mimetypes | ||||
| import os | ||||
| import platform | ||||
| import shutil | ||||
| import ssl | ||||
| import subprocess | ||||
| import urllib | ||||
| import cv2 | ||||
| import modules | ||||
| from pathlib import Path | ||||
| from typing import List, Any | ||||
| from tqdm import tqdm | ||||
|  | @ -21,6 +21,7 @@ if platform.system().lower() == "darwin": | |||
| 
 | ||||
| 
 | ||||
| def run_ffmpeg(args: List[str]) -> bool: | ||||
|     """Run an ffmpeg command with the given arguments.""" | ||||
|     commands = [ | ||||
|         "ffmpeg", | ||||
|         "-hide_banner", | ||||
|  | @ -31,14 +32,15 @@ def run_ffmpeg(args: List[str]) -> bool: | |||
|     ] | ||||
|     commands.extend(args) | ||||
|     try: | ||||
|         subprocess.check_output(commands, stderr=subprocess.STDOUT) | ||||
|         subprocess.run(commands, check=True) | ||||
|         return True | ||||
|     except Exception: | ||||
|         pass | ||||
|     except Exception as e: | ||||
|         print(f"Error running ffmpeg: {e}") | ||||
|         return False | ||||
| 
 | ||||
| 
 | ||||
| def detect_fps(target_path: str) -> float: | ||||
|     """Detect the FPS of a video file using ffprobe.""" | ||||
|     command = [ | ||||
|         "ffprobe", | ||||
|         "-v", | ||||
|  | @ -51,16 +53,18 @@ def detect_fps(target_path: str) -> float: | |||
|         "default=noprint_wrappers=1:nokey=1", | ||||
|         target_path, | ||||
|     ] | ||||
|     output = subprocess.check_output(command).decode().strip().split("/") | ||||
|     try: | ||||
|         numerator, denominator = map(int, output) | ||||
|         return numerator / denominator | ||||
|     except Exception: | ||||
|         pass | ||||
|         output = subprocess.check_output(command).decode().strip().split("/") | ||||
|         if len(output) == 2: | ||||
|             return float(output[0]) / float(output[1]) | ||||
|         return float(output[0]) | ||||
|     except Exception as e: | ||||
|         print(f"Error detecting FPS: {e}") | ||||
|         return 30.0 | ||||
| 
 | ||||
| 
 | ||||
| def extract_frames(target_path: str) -> None: | ||||
|     """Extract frames from a video file to a temp directory.""" | ||||
|     temp_directory_path = get_temp_directory_path(target_path) | ||||
|     run_ffmpeg( | ||||
|         [ | ||||
|  | @ -74,6 +78,7 @@ def extract_frames(target_path: str) -> None: | |||
| 
 | ||||
| 
 | ||||
| def create_video(target_path: str, fps: float = 30.0) -> None: | ||||
|     """Create a video from frames in the temp directory.""" | ||||
|     temp_output_path = get_temp_output_path(target_path) | ||||
|     temp_directory_path = get_temp_directory_path(target_path) | ||||
|     run_ffmpeg( | ||||
|  | @ -97,6 +102,7 @@ def create_video(target_path: str, fps: float = 30.0) -> None: | |||
| 
 | ||||
| 
 | ||||
| def restore_audio(target_path: str, output_path: str) -> None: | ||||
|     """Restore audio from the original video to the output video.""" | ||||
|     temp_output_path = get_temp_output_path(target_path) | ||||
|     done = run_ffmpeg( | ||||
|         [ | ||||
|  | @ -115,95 +121,107 @@ def restore_audio(target_path: str, output_path: str) -> None: | |||
|         ] | ||||
|     ) | ||||
|     if not done: | ||||
|         move_temp(target_path, output_path) | ||||
|         print(f"Failed to restore audio for {output_path}") | ||||
| 
 | ||||
| 
 | ||||
| def get_temp_frame_paths(target_path: str) -> List[str]: | ||||
|     """Get all temp frame file paths for a given target path.""" | ||||
|     temp_directory_path = get_temp_directory_path(target_path) | ||||
|     return glob.glob((os.path.join(glob.escape(temp_directory_path), "*.png"))) | ||||
|     try: | ||||
|         return sorted([ | ||||
|             str(p) for p in Path(temp_directory_path).glob("*.png") | ||||
|         ]) | ||||
|     except Exception as e: | ||||
|         print(f"Error getting temp frame paths: {e}") | ||||
|         return [] | ||||
| 
 | ||||
| 
 | ||||
| def get_temp_directory_path(target_path: str) -> str: | ||||
|     target_name, _ = os.path.splitext(os.path.basename(target_path)) | ||||
|     target_directory_path = os.path.dirname(target_path) | ||||
|     return os.path.join(target_directory_path, TEMP_DIRECTORY, target_name) | ||||
|     """Get the temp directory path for a given target path.""" | ||||
|     base = os.path.splitext(os.path.basename(target_path))[0] | ||||
|     temp_dir = os.path.join(TEMP_DIRECTORY, base) | ||||
|     os.makedirs(temp_dir, exist_ok=True) | ||||
|     return temp_dir | ||||
| 
 | ||||
| 
 | ||||
| def get_temp_output_path(target_path: str) -> str: | ||||
|     temp_directory_path = get_temp_directory_path(target_path) | ||||
|     return os.path.join(temp_directory_path, TEMP_FILE) | ||||
|     """Get the temp output video path for a given target path.""" | ||||
|     base = os.path.splitext(os.path.basename(target_path))[0] | ||||
|     return os.path.join(TEMP_DIRECTORY, f"{base}_out.mp4") | ||||
| 
 | ||||
| 
 | ||||
| def normalize_output_path(source_path: str, target_path: str, output_path: str) -> Any: | ||||
|     if source_path and target_path: | ||||
|         source_name, _ = os.path.splitext(os.path.basename(source_path)) | ||||
|         target_name, target_extension = os.path.splitext(os.path.basename(target_path)) | ||||
|         if os.path.isdir(output_path): | ||||
|             return os.path.join( | ||||
|                 output_path, source_name + "-" + target_name + target_extension | ||||
|             ) | ||||
|     """Normalize the output path for saving results.""" | ||||
|     if not output_path: | ||||
|         base = os.path.splitext(os.path.basename(target_path))[0] | ||||
|         return os.path.join(TEMP_DIRECTORY, f"{base}_result.png") | ||||
|     return output_path | ||||
| 
 | ||||
| 
 | ||||
| def create_temp(target_path: str) -> None: | ||||
|     """Create a temp directory for a given target path.""" | ||||
|     temp_directory_path = get_temp_directory_path(target_path) | ||||
|     Path(temp_directory_path).mkdir(parents=True, exist_ok=True) | ||||
|     os.makedirs(temp_directory_path, exist_ok=True) | ||||
| 
 | ||||
| 
 | ||||
| def move_temp(target_path: str, output_path: str) -> None: | ||||
|     """Move temp output to the final output path.""" | ||||
|     temp_output_path = get_temp_output_path(target_path) | ||||
|     if os.path.isfile(temp_output_path): | ||||
|         if os.path.isfile(output_path): | ||||
|             os.remove(output_path) | ||||
|         shutil.move(temp_output_path, output_path) | ||||
|     try: | ||||
|         os.rename(temp_output_path, output_path) | ||||
|     except Exception as e: | ||||
|         print(f"Error moving temp output: {e}") | ||||
| 
 | ||||
| 
 | ||||
| def clean_temp(target_path: str) -> None: | ||||
|     """Remove temp directory and files for a given target path.""" | ||||
|     temp_directory_path = get_temp_directory_path(target_path) | ||||
|     parent_directory_path = os.path.dirname(temp_directory_path) | ||||
|     if not modules.globals.keep_frames and os.path.isdir(temp_directory_path): | ||||
|         shutil.rmtree(temp_directory_path) | ||||
|     if os.path.exists(parent_directory_path) and not os.listdir(parent_directory_path): | ||||
|         os.rmdir(parent_directory_path) | ||||
|     try: | ||||
|         for p in Path(temp_directory_path).glob("*"): | ||||
|             p.unlink() | ||||
|         os.rmdir(temp_directory_path) | ||||
|     except Exception as e: | ||||
|         print(f"Error cleaning temp directory: {e}") | ||||
| 
 | ||||
| 
 | ||||
| def has_image_extension(image_path: str) -> bool: | ||||
|     return image_path.lower().endswith(("png", "jpg", "jpeg")) | ||||
|     """Check if a file has an image extension.""" | ||||
|     return os.path.splitext(image_path)[1].lower() in [ | ||||
|         ".png", ".jpg", ".jpeg", ".gif", ".bmp" | ||||
|     ] | ||||
| 
 | ||||
| 
 | ||||
| def is_image(image_path: str) -> bool: | ||||
|     if image_path and os.path.isfile(image_path): | ||||
|         mimetype, _ = mimetypes.guess_type(image_path) | ||||
|         return bool(mimetype and mimetype.startswith("image/")) | ||||
|     return False | ||||
|     """Check if a file is an image.""" | ||||
|     return has_image_extension(image_path) | ||||
| 
 | ||||
| 
 | ||||
| def is_video(video_path: str) -> bool: | ||||
|     if video_path and os.path.isfile(video_path): | ||||
|         mimetype, _ = mimetypes.guess_type(video_path) | ||||
|         return bool(mimetype and mimetype.startswith("video/")) | ||||
|     return False | ||||
|     """Check if a file is a video.""" | ||||
|     return os.path.splitext(video_path)[1].lower() in [ | ||||
|         ".mp4", ".mkv" | ||||
|     ] | ||||
| 
 | ||||
| 
 | ||||
| def conditional_download(download_directory_path: str, urls: List[str]) -> None: | ||||
|     if not os.path.exists(download_directory_path): | ||||
|         os.makedirs(download_directory_path) | ||||
|     """Download files from URLs if they do not exist in the directory.""" | ||||
|     import requests | ||||
|     for url in urls: | ||||
|         download_file_path = os.path.join( | ||||
|             download_directory_path, os.path.basename(url) | ||||
|         ) | ||||
|         if not os.path.exists(download_file_path): | ||||
|             request = urllib.request.urlopen(url)  # type: ignore[attr-defined] | ||||
|             total = int(request.headers.get("Content-Length", 0)) | ||||
|             with tqdm( | ||||
|                 total=total, | ||||
|                 desc="Downloading", | ||||
|                 unit="B", | ||||
|                 unit_scale=True, | ||||
|                 unit_divisor=1024, | ||||
|             ) as progress: | ||||
|                 urllib.request.urlretrieve(url, download_file_path, reporthook=lambda count, block_size, total_size: progress.update(block_size))  # type: ignore[attr-defined] | ||||
|         filename = os.path.basename(url) | ||||
|         file_path = os.path.join(download_directory_path, filename) | ||||
|         if not os.path.exists(file_path): | ||||
|             try: | ||||
|                 print(f"Downloading {url}...") | ||||
|                 r = requests.get(url, stream=True) | ||||
|                 with open(file_path, "wb") as f: | ||||
|                     for chunk in r.iter_content(chunk_size=8192): | ||||
|                         if chunk: | ||||
|                             f.write(chunk) | ||||
|                 print(f"Downloaded {filename}") | ||||
|             except Exception as e: | ||||
|                 print(f"Error downloading {url}: {e}") | ||||
| 
 | ||||
| 
 | ||||
| def resolve_relative_path(path: str) -> str: | ||||
|     return os.path.abspath(os.path.join(os.path.dirname(__file__), path)) | ||||
|     """Resolve a relative path to an absolute path.""" | ||||
|     return os.path.abspath(path) | ||||
|  |  | |||
|  | @ -1,8 +1,8 @@ | |||
| import cv2 | ||||
| import numpy as np | ||||
| from typing import Optional, Tuple, Callable | ||||
| import platform | ||||
| import threading | ||||
| from typing import Optional, Tuple, Callable | ||||
| 
 | ||||
| # Only import Windows-specific library if on Windows | ||||
| if platform.system() == "Windows": | ||||
|  | @ -11,17 +11,15 @@ if platform.system() == "Windows": | |||
| 
 | ||||
| class VideoCapturer: | ||||
|     def __init__(self, device_index: int): | ||||
|         """Initialize the video capturer for a given device index.""" | ||||
|         self.device_index = device_index | ||||
|         self.frame_callback = None | ||||
|         self._current_frame = None | ||||
|         self._frame_ready = threading.Event() | ||||
|         self.is_running = False | ||||
|         self.cap = None | ||||
| 
 | ||||
|         # Initialize Windows-specific components if on Windows | ||||
|         if platform.system() == "Windows": | ||||
|             self.graph = FilterGraph() | ||||
|             # Verify device exists | ||||
|             devices = self.graph.get_input_devices() | ||||
|             if self.device_index >= len(devices): | ||||
|                 raise ValueError( | ||||
|  | @ -29,40 +27,31 @@ class VideoCapturer: | |||
|                 ) | ||||
| 
 | ||||
|     def start(self, width: int = 960, height: int = 540, fps: int = 60) -> bool: | ||||
|         """Initialize and start video capture""" | ||||
|         """Initialize and start video capture.""" | ||||
|         try: | ||||
|             if platform.system() == "Windows": | ||||
|                 # Windows-specific capture methods | ||||
|                 capture_methods = [ | ||||
|                     (self.device_index, cv2.CAP_DSHOW),  # Try DirectShow first | ||||
|                     (self.device_index, cv2.CAP_ANY),  # Then try default backend | ||||
|                     (-1, cv2.CAP_ANY),  # Try -1 as fallback | ||||
|                     (0, cv2.CAP_ANY),  # Finally try 0 without specific backend | ||||
|                     (self.device_index, cv2.CAP_DSHOW), | ||||
|                     (self.device_index, cv2.CAP_ANY), | ||||
|                     (-1, cv2.CAP_ANY), | ||||
|                     (0, cv2.CAP_ANY), | ||||
|                 ] | ||||
| 
 | ||||
|                 for dev_id, backend in capture_methods: | ||||
|                     try: | ||||
|                         self.cap = cv2.VideoCapture(dev_id, backend) | ||||
|                         if self.cap.isOpened(): | ||||
|                             break | ||||
|                         self.cap.release() | ||||
|                     except Exception: | ||||
|                         continue | ||||
|                     except Exception as e: | ||||
|                         print(f"Error opening camera with backend {backend}: {e}") | ||||
|             else: | ||||
|                 # Unix-like systems (Linux/Mac) capture method | ||||
|                 self.cap = cv2.VideoCapture(self.device_index) | ||||
| 
 | ||||
|             if not self.cap or not self.cap.isOpened(): | ||||
|                 raise RuntimeError("Failed to open camera") | ||||
| 
 | ||||
|             # Configure format | ||||
|             self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, width) | ||||
|             self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height) | ||||
|             self.cap.set(cv2.CAP_PROP_FPS, fps) | ||||
| 
 | ||||
|             self.is_running = True | ||||
|             return True | ||||
| 
 | ||||
|         except Exception as e: | ||||
|             print(f"Failed to start capture: {str(e)}") | ||||
|             if self.cap: | ||||
|  | @ -70,10 +59,9 @@ class VideoCapturer: | |||
|             return False | ||||
| 
 | ||||
|     def read(self) -> Tuple[bool, Optional[np.ndarray]]: | ||||
|         """Read a frame from the camera""" | ||||
|         """Read a frame from the camera.""" | ||||
|         if not self.is_running or self.cap is None: | ||||
|             return False, None | ||||
| 
 | ||||
|         ret, frame = self.cap.read() | ||||
|         if ret: | ||||
|             self._current_frame = frame | ||||
|  | @ -83,12 +71,12 @@ class VideoCapturer: | |||
|         return False, None | ||||
| 
 | ||||
|     def release(self) -> None: | ||||
|         """Stop capture and release resources""" | ||||
|         """Stop capture and release resources.""" | ||||
|         if self.is_running and self.cap is not None: | ||||
|             self.cap.release() | ||||
|             self.is_running = False | ||||
|             self.cap = None | ||||
| 
 | ||||
|     def set_frame_callback(self, callback: Callable[[np.ndarray], None]) -> None: | ||||
|         """Set callback for frame processing""" | ||||
|         """Set callback for frame processing.""" | ||||
|         self.frame_callback = callback | ||||
|  |  | |||
|  | @ -0,0 +1,19 @@ | |||
| #!/bin/zsh | ||||
| # push_to_new_branch.sh - Commit and push changes to a new branch in Deep-Live-Cam-remote | ||||
| 
 | ||||
| REPO_DIR="Deep-Live-Cam-remote" | ||||
| BRANCH_NAME="feature-$(date +%Y%m%d-%H%M%S)" | ||||
| 
 | ||||
| if [ ! -d "$REPO_DIR/.git" ]; then | ||||
|   echo "Error: $REPO_DIR is not a git repository. Run the clone_or_update_deep_live_cam.sh script first." | ||||
|   exit 1 | ||||
| fi | ||||
| 
 | ||||
| cd "$REPO_DIR" | ||||
| git add . | ||||
| echo "Enter a commit message: " | ||||
| read COMMIT_MSG | ||||
| git commit -m "$COMMIT_MSG" | ||||
| git checkout -b "$BRANCH_NAME" | ||||
| git push origin "$BRANCH_NAME" | ||||
| echo "Pushed to branch $BRANCH_NAME on remote." | ||||
|  | @ -0,0 +1,23 @@ | |||
| #!/bin/zsh | ||||
| # push_to_rehanbgmi.sh - Commit and push changes to your fork (rehanbgmi/deeplivceam) in Deep-Live-Cam-remote | ||||
| 
 | ||||
| REPO_DIR="Deep-Live-Cam-remote" | ||||
| FORK_URL="https://github.com/rehanbgmi/deeplivceam.git" | ||||
| BRANCH_NAME="feature-$(date +%Y%m%d-%H%M%S)" | ||||
| 
 | ||||
| if [ ! -d "$REPO_DIR/.git" ]; then | ||||
|   echo "Error: $REPO_DIR is not a git repository. Run the clone_or_update_deep_live_cam.sh script first." | ||||
|   exit 1 | ||||
| fi | ||||
| 
 | ||||
| cd "$REPO_DIR" | ||||
| # Set your fork as a remote if not already set | ||||
| git remote | grep rehanbgmi > /dev/null || git remote add rehanbgmi "$FORK_URL" | ||||
| 
 | ||||
| git add . | ||||
| echo "Enter a commit message: " | ||||
| read COMMIT_MSG | ||||
| git commit -m "$COMMIT_MSG" | ||||
| git checkout -b "$BRANCH_NAME" | ||||
| git push rehanbgmi "$BRANCH_NAME" | ||||
| echo "Pushed to branch $BRANCH_NAME on your fork (rehanbgmi/deeplivceam)." | ||||
|  | @ -14,7 +14,7 @@ torch; sys_platform != 'darwin' | |||
| torch==2.5.1; sys_platform == 'darwin' | ||||
| torchvision; sys_platform != 'darwin' | ||||
| torchvision==0.20.1; sys_platform == 'darwin' | ||||
| onnxruntime-silicon==1.21.0; sys_platform == 'darwin' and platform_machine == 'arm64' | ||||
| onnxruntime-silicon==1.16.3; sys_platform == 'darwin' and platform_machine == 'arm64' | ||||
| onnxruntime-gpu==1.22.0; sys_platform != 'darwin' | ||||
| tensorflow; sys_platform != 'darwin' | ||||
| opennsfw2==0.10.2 | ||||
|  |  | |||
|  | @ -0,0 +1,4 @@ | |||
| #!/bin/zsh | ||||
| # run-coreml-macos.sh - Run Deep-Live-Cam with CoreML (Apple Silicon) on macOS | ||||
| source venv/bin/activate | ||||
| python3.10 run.py --execution-provider coreml | ||||
|  | @ -0,0 +1,4 @@ | |||
| @echo off | ||||
| REM run-coreml.bat - Run Deep-Live-Cam with CoreML (Apple Silicon) on Windows (for reference, not for actual use) | ||||
| call venv\Scripts\activate | ||||
| python run.py --execution-provider coreml | ||||
|  | @ -0,0 +1,4 @@ | |||
| #!/bin/zsh | ||||
| # run-cuda-macos.sh - Run Deep-Live-Cam with CUDA (Nvidia GPU) on macOS | ||||
| source venv/bin/activate | ||||
| python run.py --execution-provider cuda | ||||
|  | @ -1 +1,2 @@ | |||
| call venv\Scripts\activate | ||||
| python run.py --execution-provider cuda | ||||
		Loading…
	
		Reference in New Issue