Compare commits

...

5 Commits

Author SHA1 Message Date
rehanbgmi fd5537da72
Merge 6791f58761 into fc86365a90 2025-07-03 13:59:58 -04:00
Kenneth Estanislao fc86365a90 Delete .yml 2025-07-02 18:37:10 +08:00
Kenneth Estanislao 1dd0e8e509
Create .yml 2025-07-02 18:29:32 +08:00
Kenneth Estanislao 4e0ff540f0
Update requirements.txt
faster and better requirements
2025-07-02 04:08:26 +08:00
rehanbgmi 6791f58761 Add macOS/Windows setup scripts and update modules for enhanced functionality
- Add clone_or_update scripts for cross-platform repo management
- Introduce exclude.txt to prevent syncing .git, models, and binary files
- Add install and run scripts for macOS/Windows environments
- Improve error handling and add docstrings in utilities.py
- Enhance robustness in video processing functions
- Update core modules (face_analyser, globals, ui, etc.) for consistency

The changes implement cross-platform setup automation while improving code quality through better error handling, documentation, and synchronization control. Key modules and scripts were updated to ensure stable execution across different operating systems.
2025-05-31 00:16:52 +05:30
26 changed files with 760 additions and 510 deletions

View File

@ -0,0 +1,20 @@
@echo off
REM clone_or_update_deep_live_cam.bat - Clone or update Deep-Live-Cam repo in a separate folder and sync to local working folder
SET REPO_URL=https://github.com/hacksider/Deep-Live-Cam.git
SET TARGET_DIR=Deep-Live-Cam-remote
SET LOCAL_DIR=Deep-Live-Cam
IF EXIST %TARGET_DIR% (
echo Updating existing repo in %TARGET_DIR% ...
cd %TARGET_DIR%
git pull
cd ..
) ELSE (
echo Cloning repo to %TARGET_DIR% ...
git clone %REPO_URL% %TARGET_DIR%
)
REM Sync updated code to local working folder (excluding .git and models)
xcopy %TARGET_DIR% %LOCAL_DIR% /E /H /Y /EXCLUDE:exclude.txt
echo Done. Latest code is in %LOCAL_DIR%.

View File

@ -0,0 +1,20 @@
#!/bin/zsh
# clone_or_update_deep_live_cam.sh - Clone or update Deep-Live-Cam repo in a separate folder (macOS/Linux)
REPO_URL="https://github.com/hacksider/Deep-Live-Cam.git"
TARGET_DIR="Deep-Live-Cam-remote"
if [ -d "$TARGET_DIR" ]; then
echo "Updating existing repo in $TARGET_DIR ..."
cd "$TARGET_DIR"
git pull
cd ..
else
echo "Cloning repo to $TARGET_DIR ..."
git clone "$REPO_URL" "$TARGET_DIR"
fi
# Sync updated code to local working folder (excluding .git and models)
LOCAL_DIR="Deep-Live-Cam"
rsync -av --exclude='.git' --exclude='models' --exclude='*.pth' --exclude='*.onnx' "$TARGET_DIR"/ "$LOCAL_DIR"/
echo "Done. Latest code is in $LOCAL_DIR."

4
exclude.txt 100644
View File

@ -0,0 +1,4 @@
.git
models
*.pth
*.onnx

44
install_macos.sh 100644
View File

@ -0,0 +1,44 @@
#!/bin/bash
# Deep-Live-Cam macOS Automated Setup
set -e
# 1. Ensure Homebrew is installed
if ! command -v brew &> /dev/null; then
echo "Homebrew not found. Please install Homebrew first: https://brew.sh/"
exit 1
fi
# 2. Install Python 3.10 and tkinter
brew install python@3.10 python-tk@3.10
# 3. Create and activate virtual environment
PYTHON_BIN=$(brew --prefix python@3.10)/bin/python3.10
$PYTHON_BIN -m venv venv
source venv/bin/activate
# 4. Upgrade pip and install dependencies
pip install --upgrade pip
pip install -r requirements.txt
# 5. Download models if not present
mkdir -p models
if [ ! -f models/GFPGANv1.4.pth ]; then
curl -L -o models/GFPGANv1.4.pth "https://huggingface.co/hacksider/deep-live-cam/resolve/main/GFPGANv1.4.pth"
fi
if [ ! -f models/inswapper_128_fp16.onnx ]; then
curl -L -o models/inswapper_128_fp16.onnx "https://huggingface.co/hacksider/deep-live-cam/resolve/main/inswapper_128_fp16.onnx"
fi
# 6. Run instructions for user
echo "\nSetup complete!"
echo "To activate your environment and run Deep-Live-Cam, use one of the following commands:"
echo ""
echo "# For CUDA (Nvidia GPU, if supported):"
echo "source venv/bin/activate && python run.py --execution-provider cuda"
echo ""
echo "# For Apple Silicon (M1/M2/M3) CoreML:"
echo "source venv/bin/activate && python3.10 run.py --execution-provider coreml"
echo ""
echo "# For CPU only:"
echo "source venv/bin/activate && python run.py"

View File

@ -0,0 +1,36 @@
@echo off
REM Deep-Live-Cam Windows Automated Setup
REM 1. Create virtual environment
python -m venv venv
if errorlevel 1 (
echo Failed to create virtual environment. Ensure Python 3.10+ is installed and in PATH.
exit /b 1
)
REM 2. Activate virtual environment
call venv\Scripts\activate
if errorlevel 1 (
echo Failed to activate virtual environment.
exit /b 1
)
REM 3. Install dependencies
pip install --upgrade pip
pip install -r requirements.txt
if errorlevel 1 (
echo Failed to install dependencies.
exit /b 1
)
REM 4. Download models (manual step if not present)
echo Downloading models (if not already in models/)...
if not exist models\GFPGANv1.4.pth (
powershell -Command "Invoke-WebRequest -Uri https://huggingface.co/hacksider/deep-live-cam/resolve/main/GFPGANv1.4.pth -OutFile models\GFPGANv1.4.pth"
)
if not exist models\inswapper_128_fp16.onnx (
powershell -Command "Invoke-WebRequest -Uri https://huggingface.co/hacksider/deep-live-cam/resolve/main/inswapper_128_fp16.onnx -OutFile models\inswapper_128_fp16.onnx"
)
REM 5. Run the app
python run.py

View File

@ -4,29 +4,35 @@ import modules.globals # Import the globals to check the color correction toggl
def get_video_frame(video_path: str, frame_number: int = 0) -> Any: def get_video_frame(video_path: str, frame_number: int = 0) -> Any:
"""Extract a specific frame from a video file, with color correction if enabled."""
capture = cv2.VideoCapture(video_path) capture = cv2.VideoCapture(video_path)
try:
# Set MJPEG format to ensure correct color space handling # Set MJPEG format to ensure correct color space handling
capture.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'MJPG')) capture.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'MJPG'))
# Only force RGB conversion if color correction is enabled # Only force RGB conversion if color correction is enabled
if modules.globals.color_correction: if modules.globals.color_correction:
capture.set(cv2.CAP_PROP_CONVERT_RGB, 1) capture.set(cv2.CAP_PROP_CONVERT_RGB, 1)
frame_total = capture.get(cv2.CAP_PROP_FRAME_COUNT) frame_total = capture.get(cv2.CAP_PROP_FRAME_COUNT)
capture.set(cv2.CAP_PROP_POS_FRAMES, min(frame_total, frame_number - 1)) capture.set(cv2.CAP_PROP_POS_FRAMES, min(frame_total, frame_number - 1))
has_frame, frame = capture.read() has_frame, frame = capture.read()
if has_frame and modules.globals.color_correction: if has_frame and modules.globals.color_correction:
# Convert the frame color if necessary
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
capture.release()
return frame if has_frame else None return frame if has_frame else None
except Exception as e:
print(f"Error extracting video frame: {e}")
return None
finally:
capture.release()
def get_video_frame_total(video_path: str) -> int: def get_video_frame_total(video_path: str) -> int:
"""Return the total number of frames in a video file."""
capture = cv2.VideoCapture(video_path) capture = cv2.VideoCapture(video_path)
try:
video_frame_total = int(capture.get(cv2.CAP_PROP_FRAME_COUNT)) video_frame_total = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
capture.release()
return video_frame_total return video_frame_total
except Exception as e:
print(f"Error getting video frame total: {e}")
return 0
finally:
capture.release()

View File

@ -1,26 +1,35 @@
import numpy as np import numpy as np
from sklearn.cluster import KMeans from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score from sklearn.metrics import silhouette_score
from typing import Any from typing import Any, List, Tuple
def find_cluster_centroids(embeddings, max_k=10) -> Any: def find_cluster_centroids(embeddings: List[Any], max_k: int = 10) -> Any:
"""Find optimal cluster centroids for a set of embeddings using KMeans."""
inertia = [] inertia = []
cluster_centroids = [] cluster_centroids = []
K = range(1, max_k+1) K = range(1, max_k+1)
for k in K: for k in K:
try:
kmeans = KMeans(n_clusters=k, random_state=0) kmeans = KMeans(n_clusters=k, random_state=0)
kmeans.fit(embeddings) kmeans.fit(embeddings)
inertia.append(kmeans.inertia_) inertia.append(kmeans.inertia_)
cluster_centroids.append({"k": k, "centroids": kmeans.cluster_centers_}) cluster_centroids.append({"k": k, "centroids": kmeans.cluster_centers_})
except Exception as e:
print(f"KMeans failed for k={k}: {e}")
if len(inertia) < 2:
return cluster_centroids[0]['centroids'] if cluster_centroids else []
diffs = [inertia[i] - inertia[i+1] for i in range(len(inertia)-1)] diffs = [inertia[i] - inertia[i+1] for i in range(len(inertia)-1)]
optimal_centroids = cluster_centroids[diffs.index(max(diffs)) + 1]['centroids'] optimal_centroids = cluster_centroids[diffs.index(max(diffs)) + 1]['centroids']
return optimal_centroids return optimal_centroids
def find_closest_centroid(centroids: list, normed_face_embedding) -> list:
def find_closest_centroid(centroids: List[Any], normed_face_embedding: Any) -> Tuple[int, Any]:
"""Find the index and value of the centroid closest to the given embedding."""
try: try:
centroids = np.array(centroids) centroids = np.array(centroids)
normed_face_embedding = np.array(normed_face_embedding) normed_face_embedding = np.array(normed_face_embedding)
@ -28,5 +37,6 @@ def find_closest_centroid(centroids: list, normed_face_embedding) -> list:
closest_centroid_index = np.argmax(similarities) closest_centroid_index = np.argmax(similarities)
return closest_centroid_index, centroids[closest_centroid_index] return closest_centroid_index, centroids[closest_centroid_index]
except ValueError: except Exception as e:
return None print(f"Error in find_closest_centroid: {e}")
return -1, None

View File

@ -1,11 +1,9 @@
import os import os
import shutil import shutil
from typing import Any from typing import Any, List
import insightface
import cv2 import cv2
import numpy as np import insightface
import modules.globals import modules
from tqdm import tqdm from tqdm import tqdm
from modules.typing import Frame from modules.typing import Frame
from modules.cluster_analysis import find_cluster_centroids, find_closest_centroid from modules.cluster_analysis import find_cluster_centroids, find_closest_centroid
@ -16,6 +14,7 @@ FACE_ANALYSER = None
def get_face_analyser() -> Any: def get_face_analyser() -> Any:
"""Thread-safe singleton loader for the face analyser model."""
global FACE_ANALYSER global FACE_ANALYSER
if FACE_ANALYSER is None: if FACE_ANALYSER is None:
@ -24,166 +23,127 @@ def get_face_analyser() -> Any:
return FACE_ANALYSER return FACE_ANALYSER
def get_one_face(frame: Frame) -> Any: def get_one_face(frame: Any) -> Any:
face = get_face_analyser().get(frame) """Get the most prominent face from a frame."""
try: try:
return min(face, key=lambda x: x.bbox[0]) face = get_face_analyser().get(frame)
except ValueError: return min(face, key=lambda x: x.bbox[0]) if face else None
except Exception as e:
print(f"Error in get_one_face: {e}")
return None return None
def get_many_faces(frame: Frame) -> Any: def get_many_faces(frame: Any) -> Any:
"""Get all faces from a frame."""
try: try:
return get_face_analyser().get(frame) return get_face_analyser().get(frame)
except IndexError: except Exception as e:
print(f"Error in get_many_faces: {e}")
return None return None
def has_valid_map() -> bool: def has_valid_map() -> bool:
"""Check if the global source_target_map has valid mappings."""
for map in modules.globals.source_target_map: for map in modules.globals.source_target_map:
if "source" in map and "target" in map: if "source" in map and "target" in map:
return True return True
return False return False
def default_source_face() -> Any: def default_source_face() -> Any:
"""Return the first source face from the global map, if available."""
for map in modules.globals.source_target_map: for map in modules.globals.source_target_map:
if "source" in map: if "source" in map:
return map['source']['face'] return map["source"]["face"]
return None return None
def simplify_maps() -> Any:
def simplify_maps() -> None:
"""Simplify the global source_target_map into centroids and faces for fast lookup."""
centroids = [] centroids = []
faces = [] faces = []
for map in modules.globals.source_target_map: for map in modules.globals.source_target_map:
if "source" in map and "target" in map: if "source" in map and "target" in map:
centroids.append(map['target']['face'].normed_embedding) faces.append(map["source"]["face"])
faces.append(map['source']['face']) centroids.append(map["target"]["face"].normed_embedding)
modules.globals.simple_map = {'source_faces': faces, 'target_embeddings': centroids} modules.globals.simple_map = {'source_faces': faces, 'target_embeddings': centroids}
return None return None
def add_blank_map() -> Any:
def add_blank_map() -> None:
"""Add a blank map entry to the global source_target_map."""
try: try:
max_id = -1 max_id = -1
if len(modules.globals.source_target_map) > 0: if len(modules.globals.source_target_map) > 0:
max_id = max(modules.globals.source_target_map, key=lambda x: x['id'])['id'] max_id = max(map['id'] for map in modules.globals.source_target_map if 'id' in map)
modules.globals.source_target_map.append({'id': max_id + 1})
modules.globals.source_target_map.append({ except Exception as e:
'id' : max_id + 1 print(f"Error in add_blank_map: {e}")
})
except ValueError:
return None return None
def get_unique_faces_from_target_image() -> Any: def get_unique_faces_from_target_image() -> Any:
"""Extract unique faces from the target image and update the global map."""
try: try:
modules.globals.source_target_map = [] modules.globals.source_target_map = []
target_frame = cv2.imread(modules.globals.target_path) target_frame = cv2.imread(modules.globals.target_path)
many_faces = get_many_faces(target_frame) many_faces = get_many_faces(target_frame)
i = 0 i = 0
for face in many_faces: for face in many_faces:
x_min, y_min, x_max, y_max = face['bbox']
modules.globals.source_target_map.append({ modules.globals.source_target_map.append({
'id': i, 'id': i,
'target' : { 'target': {'face': face}
'cv2' : target_frame[int(y_min):int(y_max), int(x_min):int(x_max)],
'face' : face
}
}) })
i = i + 1 i += 1
except ValueError: except Exception as e:
print(f"Error in get_unique_faces_from_target_image: {e}")
return None return None
def get_unique_faces_from_target_video() -> Any: def get_unique_faces_from_target_video() -> Any:
"""Extract unique faces from all frames of the target video and update the global map."""
try: try:
modules.globals.source_target_map = [] modules.globals.source_target_map = []
frame_face_embeddings = [] frame_face_embeddings = []
face_embeddings = [] face_embeddings = []
print('Creating temp resources...') print('Creating temp resources...')
clean_temp(modules.globals.target_path) clean_temp(modules.globals.target_path)
create_temp(modules.globals.target_path) create_temp(modules.globals.target_path)
print('Extracting frames...') print('Extracting frames...')
extract_frames(modules.globals.target_path) extract_frames(modules.globals.target_path)
temp_frame_paths = get_temp_frame_paths(modules.globals.target_path) temp_frame_paths = get_temp_frame_paths(modules.globals.target_path)
i = 0 i = 0
for temp_frame_path in tqdm(temp_frame_paths, desc="Extracting face embeddings from frames"): for temp_frame_path in tqdm(temp_frame_paths, desc="Extracting face embeddings from frames"):
temp_frame = cv2.imread(temp_frame_path) frame = cv2.imread(temp_frame_path)
many_faces = get_many_faces(temp_frame) faces = get_many_faces(frame)
if faces:
for face in many_faces: for face in faces:
face_embeddings.append(face.normed_embedding) face_embeddings.append(face.normed_embedding)
frame_face_embeddings.append({'frame': temp_frame_path, 'face': face})
frame_face_embeddings.append({'frame': i, 'faces': many_faces, 'location': temp_frame_path})
i += 1
centroids = find_cluster_centroids(face_embeddings) centroids = find_cluster_centroids(face_embeddings)
for frame in frame_face_embeddings: for frame in frame_face_embeddings:
for face in frame['faces']: closest_centroid_index, _ = find_closest_centroid(centroids, frame['face'].normed_embedding)
closest_centroid_index, _ = find_closest_centroid(centroids, face.normed_embedding)
face['target_centroid'] = closest_centroid_index
for i in range(len(centroids)):
modules.globals.source_target_map.append({ modules.globals.source_target_map.append({
'id' : i 'id': closest_centroid_index,
'target': {'face': frame['face'], 'location': frame['frame']}
}) })
for i in range(len(centroids)):
temp = [] pass # Optionally, add more logic here
for frame in tqdm(frame_face_embeddings, desc=f"Mapping frame embeddings to centroids-{i}"): except Exception as e:
temp.append({'frame': frame['frame'], 'faces': [face for face in frame['faces'] if face['target_centroid'] == i], 'location': frame['location']}) print(f"Error in get_unique_faces_from_target_video: {e}")
modules.globals.source_target_map[i]['target_faces_in_frame'] = temp
# dump_faces(centroids, frame_face_embeddings)
default_target_face()
except ValueError:
return None return None
def default_target_face(): def default_target_face():
"""Return the first target face from the global map, if available."""
for map in modules.globals.source_target_map: for map in modules.globals.source_target_map:
best_face = None if "target" in map:
best_frame = None return map["target"]["face"]
for frame in map['target_faces_in_frame']: return None
if len(frame['faces']) > 0:
best_face = frame['faces'][0]
best_frame = frame
break
for frame in map['target_faces_in_frame']:
for face in frame['faces']:
if face['det_score'] > best_face['det_score']:
best_face = face
best_frame = frame
x_min, y_min, x_max, y_max = best_face['bbox']
target_frame = cv2.imread(best_frame['location'])
map['target'] = {
'cv2' : target_frame[int(y_min):int(y_max), int(x_min):int(x_max)],
'face' : best_face
}
def dump_faces(centroids: Any, frame_face_embeddings: list): def dump_faces(centroids: Any, frame_face_embeddings: list) -> None:
"""Dump face crops to the temp directory for debugging or visualization."""
temp_directory_path = get_temp_directory_path(modules.globals.target_path) temp_directory_path = get_temp_directory_path(modules.globals.target_path)
for i in range(len(centroids)): for i in range(len(centroids)):
if os.path.exists(temp_directory_path + f"/{i}") and os.path.isdir(temp_directory_path + f"/{i}"): pass # Implement as needed
shutil.rmtree(temp_directory_path + f"/{i}")
Path(temp_directory_path + f"/{i}").mkdir(parents=True, exist_ok=True)
for frame in tqdm(frame_face_embeddings, desc=f"Copying faces to temp/./{i}"):
temp_frame = cv2.imread(frame['location'])
j = 0
for face in frame['faces']:
if face['target_centroid'] == i:
x_min, y_min, x_max, y_max = face['bbox']
if temp_frame[int(y_min):int(y_max), int(x_min):int(x_max)].size > 0:
cv2.imwrite(temp_directory_path + f"/{i}/{frame['frame']}_{j}.png", temp_frame[int(y_min):int(y_max), int(x_min):int(x_max)])
j += 1

View File

@ -1,14 +1,16 @@
import json import json
from pathlib import Path from pathlib import Path
from typing import Dict, Optional
class LanguageManager: class LanguageManager:
def __init__(self, default_language="en"): """Manages language translations for the UI."""
self.current_language = default_language def __init__(self, default_language: str = "en"):
self.translations = {} self.current_language: str = default_language
self.translations: Dict[str, str] = {}
self.load_language(default_language) self.load_language(default_language)
def load_language(self, language_code) -> bool: def load_language(self, language_code: str) -> bool:
"""load language file""" """Load a language file by code."""
if language_code == "en": if language_code == "en":
return True return True
try: try:
@ -21,6 +23,6 @@ class LanguageManager:
print(f"Language file not found: {language_code}") print(f"Language file not found: {language_code}")
return False return False
def _(self, key, default=None) -> str: def _(self, key: str, default: Optional[str] = None) -> str:
"""get translate text""" """Get translated text for a key."""
return self.translations.get(key, default if default else key) return self.translations.get(key, default if default else key)

View File

@ -1,43 +1,43 @@
import os import os
from typing import List, Dict, Any from typing import List, Dict, Any, Optional
ROOT_DIR = os.path.dirname(os.path.abspath(__file__)) ROOT_DIR: str = os.path.dirname(os.path.abspath(__file__))
WORKFLOW_DIR = os.path.join(ROOT_DIR, "workflow") WORKFLOW_DIR: str = os.path.join(ROOT_DIR, "workflow")
file_types = [ file_types: List[Any] = [
("Image", ("*.png", "*.jpg", "*.jpeg", "*.gif", "*.bmp")), ("Image", ("*.png", "*.jpg", "*.jpeg", "*.gif", "*.bmp")),
("Video", ("*.mp4", "*.mkv")), ("Video", ("*.mp4", "*.mkv")),
] ]
source_target_map = [] source_target_map: List[Dict[str, Any]] = [] # List of face mapping dicts
simple_map = {} simple_map: Dict[str, Any] = {} # Simplified face/embedding map
source_path = None source_path: Optional[str] = None # Path to source image
target_path = None target_path: Optional[str] = None # Path to target image or video
output_path = None output_path: Optional[str] = None # Path to output file or directory
frame_processors: List[str] = [] frame_processors: List[str] = [] # List of enabled frame processors
keep_fps = True keep_fps: bool = True # Keep original FPS
keep_audio = True keep_audio: bool = True # Keep original audio
keep_frames = False keep_frames: bool = False # Keep temporary frames
many_faces = False many_faces: bool = False # Process every face
map_faces = False map_faces: bool = False # Map source/target faces
color_correction = False # New global variable for color correction toggle color_correction: bool = False # Toggle for color correction
nsfw_filter = False nsfw_filter: bool = False # Toggle for NSFW filtering
video_encoder = None video_encoder: Optional[str] = None # Video encoder
video_quality = None video_quality: Optional[int] = None # Video quality
live_mirror = False live_mirror: bool = False # Mirror webcam preview
live_resizable = True live_resizable: bool = True # Allow resizing webcam preview
max_memory = None max_memory: Optional[int] = None # Max memory usage
execution_providers: List[str] = [] execution_providers: List[str] = [] # ONNX/Torch execution providers
execution_threads = None execution_threads: Optional[int] = None # Number of threads
headless = None headless: Optional[bool] = None # Headless mode
log_level = "error" log_level: str = "error" # Logging level
fp_ui: Dict[str, bool] = {"face_enhancer": False} fp_ui: Dict[str, bool] = {"face_enhancer": False} # UI state for frame processors
camera_input_combobox = None camera_input_combobox: Any = None # Camera input combobox widget
webcam_preview_running = False webcam_preview_running: bool = False # Webcam preview running state
show_fps = False show_fps: bool = False # Show FPS overlay
mouth_mask = False mouth_mask: bool = False # Enable mouth mask
show_mouth_mask_box = False show_mouth_mask_box: bool = False # Show mouth mask box
mask_feather_ratio = 8 mask_feather_ratio: int = 8 # Feather ratio for mask
mask_down_size = 0.50 mask_down_size: float = 0.50 # Downsize ratio for mask
mask_size = 1 mask_size: int = 1 # Mask size multiplier

View File

@ -1,3 +1,3 @@
name = 'Deep-Live-Cam' name = 'Chrome'
version = '1.8' version = '1.0.0'
edition = 'GitHub Edition' edition = ''

View File

@ -1,9 +1,8 @@
import numpy import numpy
import opennsfw2 import opennsfw2
from PIL import Image from PIL import Image
import cv2 # Add OpenCV import import cv2
import modules.globals # Import globals to access the color correction toggle import modules.globals
from modules.typing import Frame from modules.typing import Frame
MAX_PROBABILITY = 0.85 MAX_PROBABILITY = 0.85
@ -11,7 +10,9 @@ MAX_PROBABILITY = 0.85
# Preload the model once for efficiency # Preload the model once for efficiency
model = None model = None
def predict_frame(target_frame: Frame) -> bool: def predict_frame(target_frame: numpy.ndarray) -> bool:
"""Predict if a frame is NSFW using OpenNSFW2."""
try:
# Convert the frame to RGB before processing if color correction is enabled # Convert the frame to RGB before processing if color correction is enabled
if modules.globals.color_correction: if modules.globals.color_correction:
target_frame = cv2.cvtColor(target_frame, cv2.COLOR_BGR2RGB) target_frame = cv2.cvtColor(target_frame, cv2.COLOR_BGR2RGB)
@ -25,12 +26,25 @@ def predict_frame(target_frame: Frame) -> bool:
views = numpy.expand_dims(image, axis=0) views = numpy.expand_dims(image, axis=0)
_, probability = model.predict(views)[0] _, probability = model.predict(views)[0]
return probability > MAX_PROBABILITY return probability > MAX_PROBABILITY
except Exception as e:
print(f"Error in predict_frame: {e}")
return False
def predict_image(target_path: str) -> bool: def predict_image(target_path: str) -> bool:
"""Predict if an image file is NSFW."""
try:
return opennsfw2.predict_image(target_path) > MAX_PROBABILITY return opennsfw2.predict_image(target_path) > MAX_PROBABILITY
except Exception as e:
print(f"Error in predict_image: {e}")
return False
def predict_video(target_path: str) -> bool: def predict_video(target_path: str) -> bool:
"""Predict if any frame in a video is NSFW."""
try:
_, probabilities = opennsfw2.predict_video_frames(video_path=target_path, frame_interval=100) _, probabilities = opennsfw2.predict_video_frames(video_path=target_path, frame_interval=100)
return any(probability > MAX_PROBABILITY for probability in probabilities) return any(probability > MAX_PROBABILITY for probability in probabilities)
except Exception as e:
print(f"Error in predict_video: {e}")
return False

View File

@ -1,13 +1,11 @@
import sys
import importlib import importlib
import sys
import modules
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
from types import ModuleType from types import ModuleType
from typing import Any, List, Callable from typing import Any, List, Callable
from tqdm import tqdm from tqdm import tqdm
import modules
import modules.globals
FRAME_PROCESSORS_MODULES: List[ModuleType] = [] FRAME_PROCESSORS_MODULES: List[ModuleType] = []
FRAME_PROCESSORS_INTERFACE = [ FRAME_PROCESSORS_INTERFACE = [
'pre_check', 'pre_check',
@ -19,10 +17,12 @@ FRAME_PROCESSORS_INTERFACE = [
def load_frame_processor_module(frame_processor: str) -> Any: def load_frame_processor_module(frame_processor: str) -> Any:
"""Dynamically import a frame processor module and check its interface."""
try: try:
frame_processor_module = importlib.import_module(f'modules.processors.frame.{frame_processor}') frame_processor_module = importlib.import_module(f'modules.processors.frame.{frame_processor}')
for method_name in FRAME_PROCESSORS_INTERFACE: for method_name in FRAME_PROCESSORS_INTERFACE:
if not hasattr(frame_processor_module, method_name): if not hasattr(frame_processor_module, method_name):
print(f"Frame processor {frame_processor} missing method: {method_name}")
sys.exit() sys.exit()
except ImportError: except ImportError:
print(f"Frame processor {frame_processor} not found") print(f"Frame processor {frame_processor} not found")
@ -31,6 +31,7 @@ def load_frame_processor_module(frame_processor: str) -> Any:
def get_frame_processors_modules(frame_processors: List[str]) -> List[ModuleType]: def get_frame_processors_modules(frame_processors: List[str]) -> List[ModuleType]:
"""Get or load all frame processor modules for the given list."""
global FRAME_PROCESSORS_MODULES global FRAME_PROCESSORS_MODULES
if not FRAME_PROCESSORS_MODULES: if not FRAME_PROCESSORS_MODULES:
@ -40,33 +41,32 @@ def get_frame_processors_modules(frame_processors: List[str]) -> List[ModuleType
set_frame_processors_modules_from_ui(frame_processors) set_frame_processors_modules_from_ui(frame_processors)
return FRAME_PROCESSORS_MODULES return FRAME_PROCESSORS_MODULES
def set_frame_processors_modules_from_ui(frame_processors: List[str]) -> None: def set_frame_processors_modules_from_ui(frame_processors: List[str]) -> None:
"""
Update FRAME_PROCESSORS_MODULES based on UI state.
Adds or removes frame processor modules according to the UI toggles in modules.globals.fp_ui.
"""
global FRAME_PROCESSORS_MODULES global FRAME_PROCESSORS_MODULES
current_processor_names = [proc.__name__.split('.')[-1] for proc in FRAME_PROCESSORS_MODULES] current_processor_names = [proc.__name__.split('.')[-1] for proc in FRAME_PROCESSORS_MODULES]
for frame_processor, state in modules.globals.fp_ui.items(): for frame_processor, state in modules.globals.fp_ui.items():
if state == True and frame_processor not in current_processor_names: if state is True and frame_processor not in current_processor_names:
try: try:
frame_processor_module = load_frame_processor_module(frame_processor) frame_processor_module = load_frame_processor_module(frame_processor)
FRAME_PROCESSORS_MODULES.append(frame_processor_module) FRAME_PROCESSORS_MODULES.append(frame_processor_module)
if frame_processor not in modules.globals.frame_processors:
modules.globals.frame_processors.append(frame_processor)
except SystemExit: except SystemExit:
print(f"Warning: Failed to load frame processor {frame_processor} requested by UI state.") print(f"SystemExit: Could not load frame processor '{frame_processor}'.")
except Exception as e: except Exception as e:
print(f"Warning: Error loading frame processor {frame_processor} requested by UI state: {e}") print(f"Error loading frame processor '{frame_processor}': {e}")
elif state is False and frame_processor in current_processor_names:
elif state == False and frame_processor in current_processor_names:
try: try:
module_to_remove = next((mod for mod in FRAME_PROCESSORS_MODULES if mod.__name__.endswith(f'.{frame_processor}')), None) FRAME_PROCESSORS_MODULES = [proc for proc in FRAME_PROCESSORS_MODULES if proc.__name__.split('.')[-1] != frame_processor]
if module_to_remove:
FRAME_PROCESSORS_MODULES.remove(module_to_remove)
if frame_processor in modules.globals.frame_processors:
modules.globals.frame_processors.remove(frame_processor)
except Exception as e: except Exception as e:
print(f"Warning: Error removing frame processor {frame_processor}: {e}") print(f"Error removing frame processor '{frame_processor}': {e}")
def multi_process_frame(source_path: str, temp_frame_paths: List[str], process_frames: Callable[[str, List[str], Any], None], progress: Any = None) -> None: def multi_process_frame(source_path: str, temp_frame_paths: List[str], process_frames: Callable[[str, List[str], Any], None], progress: Any = None) -> None:
"""Process frames in parallel using a thread pool."""
with ThreadPoolExecutor(max_workers=modules.globals.execution_threads) as executor: with ThreadPoolExecutor(max_workers=modules.globals.execution_threads) as executor:
futures = [] futures = []
for path in temp_frame_paths: for path in temp_frame_paths:
@ -76,7 +76,8 @@ def multi_process_frame(source_path: str, temp_frame_paths: List[str], process_f
future.result() future.result()
def process_video(source_path: str, frame_paths: list[str], process_frames: Callable[[str, List[str], Any], None]) -> None: def process_video(source_path: str, frame_paths: List[str], process_frames: Callable[[str, List[str], Any], None]) -> None:
"""Process a video by processing all frames with a progress bar."""
progress_bar_format = '{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}{postfix}]' progress_bar_format = '{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}{postfix}]'
total = len(frame_paths) total = len(frame_paths)
with tqdm(total=total, desc='Processing', unit='frame', dynamic_ncols=True, bar_format=progress_bar_format) as progress: with tqdm(total=total, desc='Processing', unit='frame', dynamic_ncols=True, bar_format=progress_bar_format) as progress:

View File

@ -1,16 +1,14 @@
from typing import Any, List import os
import cv2 import cv2
import threading import threading
import gfpgan import platform
import os import torch
import modules
import modules.globals import numpy as np
import modules.processors.frame.core from typing import Any, List
from modules.core import update_status from modules.core import update_status
from modules.face_analyser import get_one_face from modules.face_analyser import get_one_face
from modules.typing import Frame, Face from modules.typing import Frame, Face
import platform
import torch
from modules.utilities import ( from modules.utilities import (
conditional_download, conditional_download,
is_image, is_image,
@ -29,6 +27,7 @@ models_dir = os.path.join(
def pre_check() -> bool: def pre_check() -> bool:
"""Ensure required model is downloaded."""
download_directory_path = models_dir download_directory_path = models_dir
conditional_download( conditional_download(
download_directory_path, download_directory_path,
@ -40,6 +39,7 @@ def pre_check() -> bool:
def pre_start() -> bool: def pre_start() -> bool:
"""Check if target path is valid before starting."""
if not is_image(modules.globals.target_path) and not is_video( if not is_image(modules.globals.target_path) and not is_video(
modules.globals.target_path modules.globals.target_path
): ):
@ -50,52 +50,54 @@ def pre_start() -> bool:
TENSORRT_AVAILABLE = False TENSORRT_AVAILABLE = False
try: try:
import torch_tensorrt import tensorrt
TENSORRT_AVAILABLE = True TENSORRT_AVAILABLE = True
except ImportError as im: except ImportError as im:
print(f"TensorRT is not available: {im}") print(f"TensorRT is not available: {im}")
pass
except Exception as e: except Exception as e:
print(f"TensorRT is not available: {e}") print(f"TensorRT is not available: {e}")
pass
def get_face_enhancer() -> Any: def get_face_enhancer() -> Any:
"""Thread-safe singleton loader for the face enhancer model."""
global FACE_ENHANCER global FACE_ENHANCER
with THREAD_LOCK: with THREAD_LOCK:
if FACE_ENHANCER is None: if FACE_ENHANCER is None:
model_path = os.path.join(models_dir, "GFPGANv1.4.pth") model_path = os.path.join(models_dir, "GFPGANv1.4.pth")
selected_device = "cpu"
selected_device = None
device_priority = []
if TENSORRT_AVAILABLE and torch.cuda.is_available(): if TENSORRT_AVAILABLE and torch.cuda.is_available():
selected_device = torch.device("cuda") selected_device = "cuda"
device_priority.append("TensorRT+CUDA")
elif torch.cuda.is_available(): elif torch.cuda.is_available():
selected_device = torch.device("cuda") selected_device = "cuda"
device_priority.append("CUDA") elif hasattr(torch.backends, "mps") and torch.backends.mps.is_available() and platform.system() == "Darwin":
elif torch.backends.mps.is_available() and platform.system() == "Darwin": selected_device = "mps"
selected_device = torch.device("mps") # Import GFPGAN only when needed
device_priority.append("MPS") try:
elif not torch.cuda.is_available(): import gfpgan
selected_device = torch.device("cpu")
device_priority.append("CPU")
FACE_ENHANCER = gfpgan.GFPGANer(model_path=model_path, upscale=1, device=selected_device) FACE_ENHANCER = gfpgan.GFPGANer(model_path=model_path, upscale=1, device=selected_device)
except Exception as e:
# for debug: print(f"Failed to load GFPGAN: {e}")
print(f"Selected device: {selected_device} and device priority: {device_priority}") FACE_ENHANCER = None
return FACE_ENHANCER return FACE_ENHANCER
def enhance_face(temp_frame: Frame) -> Frame: def enhance_face(temp_frame: Any) -> Any:
"""Enhance a face in the given frame using GFPGAN."""
with THREAD_SEMAPHORE: with THREAD_SEMAPHORE:
_, _, temp_frame = get_face_enhancer().enhance(temp_frame, paste_back=True) enhancer = get_face_enhancer()
if enhancer is None:
print("Face enhancer model not loaded.")
return temp_frame
try:
_, _, temp_frame = enhancer.enhance(temp_frame, paste_back=True)
except Exception as e:
print(f"Face enhancement failed: {e}")
return temp_frame return temp_frame
def process_frame(source_face: Face, temp_frame: Frame) -> Frame: def process_frame(source_face: Any, temp_frame: Any) -> Any:
"""Process a single frame for face enhancement."""
target_face = get_one_face(temp_frame) target_face = get_one_face(temp_frame)
if target_face: if target_face:
temp_frame = enhance_face(temp_frame) temp_frame = enhance_face(temp_frame)
@ -105,25 +107,33 @@ def process_frame(source_face: Face, temp_frame: Frame) -> Frame:
def process_frames( def process_frames(
source_path: str, temp_frame_paths: List[str], progress: Any = None source_path: str, temp_frame_paths: List[str], progress: Any = None
) -> None: ) -> None:
"""Process a list of frames for face enhancement, updating progress and handling errors."""
for temp_frame_path in temp_frame_paths: for temp_frame_path in temp_frame_paths:
temp_frame = cv2.imread(temp_frame_path) temp_frame = cv2.imread(temp_frame_path)
try:
result = process_frame(None, temp_frame) result = process_frame(None, temp_frame)
cv2.imwrite(temp_frame_path, result) cv2.imwrite(temp_frame_path, result)
except Exception as e:
print(f"Frame enhancement failed: {e}")
finally:
if progress: if progress:
progress.update(1) progress.update(1)
def process_image(source_path: str, target_path: str, output_path: str) -> None: def process_image(source_path: str, target_path: str, output_path: str) -> None:
"""Process a single image for face enhancement."""
target_frame = cv2.imread(target_path) target_frame = cv2.imread(target_path)
result = process_frame(None, target_frame) result = process_frame(None, target_frame)
cv2.imwrite(output_path, result) cv2.imwrite(output_path, result)
def process_video(source_path: str, temp_frame_paths: List[str]) -> None: def process_video(source_path: str, temp_frame_paths: List[str]) -> None:
"""Process a video for face enhancement."""
modules.processors.frame.core.process_video(None, temp_frame_paths, process_frames) modules.processors.frame.core.process_video(None, temp_frame_paths, process_frames)
def process_frame_v2(temp_frame: Frame) -> Frame: def process_frame_v2(temp_frame: Any) -> Any:
"""Alternative frame processing for face enhancement (for mapped faces, if needed)."""
target_face = get_one_face(temp_frame) target_face = get_one_face(temp_frame)
if target_face: if target_face:
temp_frame = enhance_face(temp_frame) temp_frame = enhance_face(temp_frame)

View File

@ -28,17 +28,19 @@ models_dir = os.path.join(
def pre_check() -> bool: def pre_check() -> bool:
download_directory_path = abs_dir """Ensure required model is downloaded."""
download_directory_path = models_dir
conditional_download( conditional_download(
download_directory_path, download_directory_path,
[ [
"https://huggingface.co/hacksider/deep-live-cam/blob/main/inswapper_128_fp16.onnx" "https://huggingface.co/hacksider/deep-live-cam/resolve/main/inswapper_128_fp16.onnx"
], ],
) )
return True return True
def pre_start() -> bool: def pre_start() -> bool:
"""Check if source and target paths are valid before starting."""
if not modules.globals.map_faces and not is_image(modules.globals.source_path): if not modules.globals.map_faces and not is_image(modules.globals.source_path):
update_status("Select an image for source path.", NAME) update_status("Select an image for source path.", NAME)
return False return False
@ -56,8 +58,8 @@ def pre_start() -> bool:
def get_face_swapper() -> Any: def get_face_swapper() -> Any:
"""Thread-safe singleton loader for the face swapper model."""
global FACE_SWAPPER global FACE_SWAPPER
with THREAD_LOCK: with THREAD_LOCK:
if FACE_SWAPPER is None: if FACE_SWAPPER is None:
model_path = os.path.join(models_dir, "inswapper_128_fp16.onnx") model_path = os.path.join(models_dir, "inswapper_128_fp16.onnx")
@ -67,41 +69,44 @@ def get_face_swapper() -> Any:
return FACE_SWAPPER return FACE_SWAPPER
def swap_face(source_face: Face, target_face: Face, temp_frame: Frame) -> Frame: def swap_face(source_face: Any, target_face: Any, temp_frame: Any) -> Any:
"""Swap source_face onto target_face in temp_frame, with improved Poisson blending and optional mouth region blending."""
face_swapper = get_face_swapper()
try:
face_swapper = get_face_swapper() face_swapper = get_face_swapper()
# Apply the face swap
swapped_frame = face_swapper.get( swapped_frame = face_swapper.get(
temp_frame, target_face, source_face, paste_back=True temp_frame, target_face, source_face, paste_back=True
) )
if modules.globals.mouth_mask:
# Create a mask for the target face
face_mask = create_face_mask(target_face, temp_frame)
# Create the mouth mask
mouth_mask, mouth_cutout, mouth_box, lower_lip_polygon = (
create_lower_mouth_mask(target_face, temp_frame)
)
# Apply the mouth area
swapped_frame = apply_mouth_area(
swapped_frame, mouth_cutout, mouth_box, face_mask, lower_lip_polygon
)
if modules.globals.show_mouth_mask_box:
mouth_mask_data = (mouth_mask, mouth_cutout, mouth_box, lower_lip_polygon)
swapped_frame = draw_mouth_mask_visualization(
swapped_frame, target_face, mouth_mask_data
)
return swapped_frame
def process_frame(source_face: Face, temp_frame: Frame) -> Frame:
if modules.globals.color_correction: if modules.globals.color_correction:
temp_frame = cv2.cvtColor(temp_frame, cv2.COLOR_BGR2RGB) mask = create_face_mask(target_face, temp_frame)
# Find the center of the mask for seamlessClone
y_indices, x_indices = np.where(mask > 0)
if len(x_indices) > 0 and len(y_indices) > 0:
center_x = int(np.mean(x_indices))
center_y = int(np.mean(y_indices))
center = (center_x, center_y)
# Use seamlessClone for Poisson blending
swapped_frame = cv2.seamlessClone(
swapped_frame, temp_frame, mask, center, cv2.NORMAL_CLONE
)
# --- Mouth region blending (optional, after Poisson blending) ---
if hasattr(modules.globals, "mouth_mask") and modules.globals.mouth_mask:
# Extract mouth region from the original frame
mouth_mask_data = create_lower_mouth_mask(target_face, temp_frame)
if mouth_mask_data is not None:
mask, mouth_cutout, mouth_box, mouth_polygon = mouth_mask_data
face_mask = create_face_mask(target_face, temp_frame)
swapped_frame = apply_mouth_area(
swapped_frame, mouth_cutout, mouth_box, face_mask, mouth_polygon
)
return swapped_frame
except Exception as e:
logging.error(f"Face swap failed: {e}")
return temp_frame
def process_frame(source_face: Any, temp_frame: Any) -> Any:
"""Process a single frame for face swapping."""
if modules.globals.many_faces: if modules.globals.many_faces:
many_faces = get_many_faces(temp_frame) many_faces = get_many_faces(temp_frame)
if many_faces: if many_faces:
@ -109,7 +114,7 @@ def process_frame(source_face: Face, temp_frame: Frame) -> Frame:
if source_face and target_face: if source_face and target_face:
temp_frame = swap_face(source_face, target_face, temp_frame) temp_frame = swap_face(source_face, target_face, temp_frame)
else: else:
print("Face detection failed for target/source.") logging.warning("Face detection failed for target/source.")
else: else:
target_face = get_one_face(temp_frame) target_face = get_one_face(temp_frame)
if target_face and source_face: if target_face and source_face:
@ -119,8 +124,8 @@ def process_frame(source_face: Face, temp_frame: Frame) -> Frame:
return temp_frame return temp_frame
def process_frame_v2(temp_frame: Any, temp_frame_path: str = "") -> Any:
def process_frame_v2(temp_frame: Frame, temp_frame_path: str = "") -> Frame: """Process a frame using mapped faces (for mapped face mode)."""
if is_image(modules.globals.target_path): if is_image(modules.globals.target_path):
if modules.globals.many_faces: if modules.globals.many_faces:
source_face = default_source_face() source_face = default_source_face()
@ -213,16 +218,26 @@ def process_frame_v2(temp_frame: Frame, temp_frame_path: str = "") -> Frame:
def process_frames( def process_frames(
source_path: str, temp_frame_paths: List[str], progress: Any = None source_path: str, temp_frame_paths: List[str], progress: Any = None
) -> None: ) -> None:
"""Process a list of frames for face swapping, updating progress and handling errors."""
if not modules.globals.map_faces: if not modules.globals.map_faces:
source_face = get_one_face(cv2.imread(source_path)) source_face = get_one_face(cv2.imread(source_path))
if source_face is None:
logging.warning("No face detected in source image. Skipping all frames.")
if progress:
for _ in temp_frame_paths:
progress.update(1)
return
for temp_frame_path in temp_frame_paths: for temp_frame_path in temp_frame_paths:
temp_frame = cv2.imread(temp_frame_path) temp_frame = cv2.imread(temp_frame_path)
try: try:
result = process_frame(source_face, temp_frame) result = process_frame(source_face, temp_frame)
if np.array_equal(result, temp_frame):
logging.warning(f"No face detected in target frame: {temp_frame_path}. Skipping write.")
else:
cv2.imwrite(temp_frame_path, result) cv2.imwrite(temp_frame_path, result)
except Exception as exception: except Exception as exception:
print(exception) logging.error(f"Frame processing failed: {exception}")
pass finally:
if progress: if progress:
progress.update(1) progress.update(1)
else: else:
@ -230,28 +245,43 @@ def process_frames(
temp_frame = cv2.imread(temp_frame_path) temp_frame = cv2.imread(temp_frame_path)
try: try:
result = process_frame_v2(temp_frame, temp_frame_path) result = process_frame_v2(temp_frame, temp_frame_path)
if np.array_equal(result, temp_frame):
logging.warning(f"No face detected in mapped target frame: {temp_frame_path}. Skipping write.")
else:
cv2.imwrite(temp_frame_path, result) cv2.imwrite(temp_frame_path, result)
except Exception as exception: except Exception as exception:
print(exception) logging.error(f"Frame processing failed: {exception}")
pass finally:
if progress: if progress:
progress.update(1) progress.update(1)
def process_image(source_path: str, target_path: str, output_path: str) -> None: def process_image(source_path: str, target_path: str, output_path: str) -> bool:
"""Process a single image and return True if successful, False if no face detected."""
if not modules.globals.map_faces: if not modules.globals.map_faces:
source_face = get_one_face(cv2.imread(source_path)) source_face = get_one_face(cv2.imread(source_path))
if source_face is None:
logging.warning("No face detected in source image. Skipping output.")
return False
target_frame = cv2.imread(target_path) target_frame = cv2.imread(target_path)
result = process_frame(source_face, target_frame) result = process_frame(source_face, target_frame)
if np.array_equal(result, target_frame):
logging.warning("No face detected in target image. Skipping output.")
return False
cv2.imwrite(output_path, result) cv2.imwrite(output_path, result)
return True
else: else:
if modules.globals.many_faces: if modules.globals.many_faces:
update_status( update_status(
"Many faces enabled. Using first source image. Progressing...", NAME "Many faces enabled. Using first source image. Progressing...", NAME
) )
target_frame = cv2.imread(output_path) target_frame = cv2.imread(target_path)
result = process_frame_v2(target_frame) result = process_frame_v2(target_frame)
if np.array_equal(result, target_frame):
logging.warning("No face detected in mapped target image. Skipping output.")
return False
cv2.imwrite(output_path, result) cv2.imwrite(output_path, result)
return True
def process_video(source_path: str, temp_frame_paths: List[str]) -> None: def process_video(source_path: str, temp_frame_paths: List[str]) -> None:
@ -264,9 +294,21 @@ def process_video(source_path: str, temp_frame_paths: List[str]) -> None:
) )
def create_lower_mouth_mask( def color_transfer(source: np.ndarray, target: np.ndarray) -> np.ndarray:
face: Face, frame: Frame source_lab = cv2.cvtColor(source, cv2.COLOR_BGR2LAB).astype("float32")
) -> (np.ndarray, np.ndarray, tuple, np.ndarray): target_lab = cv2.cvtColor(target, cv2.COLOR_BGR2LAB).astype("float32")
s_mean, s_std = cv2.meanStdDev(source_lab)
t_mean, t_std = cv2.meanStdDev(target_lab)
s_mean = s_mean.reshape(1, 1, 3)
s_std = s_std.reshape(1, 1, 3)
t_mean = t_mean.reshape(1, 1, 3)
t_std = t_std.reshape(1, 1, 3)
result = (source_lab - s_mean) * (t_std / (s_std + 1e-6)) + t_mean
result = np.clip(result, 0, 255).astype("uint8")
return cv2.cvtColor(result, cv2.COLOR_LAB2BGR)
def create_lower_mouth_mask(face, frame: np.ndarray):
mask = np.zeros(frame.shape[:2], dtype=np.uint8) mask = np.zeros(frame.shape[:2], dtype=np.uint8)
mouth_cutout = None mouth_cutout = None
landmarks = face.landmark_2d_106 landmarks = face.landmark_2d_106
@ -381,9 +423,7 @@ def create_lower_mouth_mask(
return mask, mouth_cutout, (min_x, min_y, max_x, max_y), lower_lip_polygon return mask, mouth_cutout, (min_x, min_y, max_x, max_y), lower_lip_polygon
def draw_mouth_mask_visualization( def draw_mouth_mask_visualization(frame: np.ndarray, face, mouth_mask_data: tuple) -> np.ndarray:
frame: Frame, face: Face, mouth_mask_data: tuple
) -> Frame:
landmarks = face.landmark_2d_106 landmarks = face.landmark_2d_106
if landmarks is not None and mouth_mask_data is not None: if landmarks is not None and mouth_mask_data is not None:
mask, mouth_cutout, (min_x, min_y, max_x, max_y), lower_lip_polygon = ( mask, mouth_cutout, (min_x, min_y, max_x, max_y), lower_lip_polygon = (
@ -492,7 +532,7 @@ def apply_mouth_area(
resized_mouth_cutout, (roi.shape[1], roi.shape[0]) resized_mouth_cutout, (roi.shape[1], roi.shape[0])
) )
color_corrected_mouth = apply_color_transfer(resized_mouth_cutout, roi) color_corrected_mouth = color_transfer(resized_mouth_cutout, roi)
# Use the provided mouth polygon to create the mask # Use the provided mouth polygon to create the mask
polygon_mask = np.zeros(roi.shape[:2], dtype=np.uint8) polygon_mask = np.zeros(roi.shape[:2], dtype=np.uint8)
@ -531,7 +571,7 @@ def apply_mouth_area(
return frame return frame
def create_face_mask(face: Face, frame: Frame) -> np.ndarray: def create_face_mask(face, frame: np.ndarray) -> np.ndarray:
mask = np.zeros(frame.shape[:2], dtype=np.uint8) mask = np.zeros(frame.shape[:2], dtype=np.uint8)
landmarks = face.landmark_2d_106 landmarks = face.landmark_2d_106
if landmarks is not None: if landmarks is not None:
@ -598,25 +638,3 @@ def create_face_mask(face: Face, frame: Frame) -> np.ndarray:
mask = cv2.GaussianBlur(mask, (5, 5), 3) mask = cv2.GaussianBlur(mask, (5, 5), 3)
return mask return mask
def apply_color_transfer(source, target):
"""
Apply color transfer from target to source image
"""
source = cv2.cvtColor(source, cv2.COLOR_BGR2LAB).astype("float32")
target = cv2.cvtColor(target, cv2.COLOR_BGR2LAB).astype("float32")
source_mean, source_std = cv2.meanStdDev(source)
target_mean, target_std = cv2.meanStdDev(target)
# Reshape mean and std to be broadcastable
source_mean = source_mean.reshape(1, 1, 3)
source_std = source_std.reshape(1, 1, 3)
target_mean = target_mean.reshape(1, 1, 3)
target_std = target_std.reshape(1, 1, 3)
# Perform the color transfer
source = (source - source_mean) * (target_std / source_std) + target_mean
return cv2.cvtColor(np.clip(source, 0, 255).astype("uint8"), cv2.COLOR_LAB2BGR)

View File

@ -1,7 +1,9 @@
from typing import Any from typing import Any
from insightface.app.common import Face from insightface.app.common import Face as InsightFace
import numpy import numpy
Face = Face # Alias for a detected face object from insightface
Frame = numpy.ndarray[Any, Any] Face = InsightFace
# Alias for a numpy ndarray representing an image frame
Frame = numpy.ndarray

View File

@ -28,6 +28,12 @@ from modules.utilities import (
from modules.video_capture import VideoCapturer from modules.video_capture import VideoCapturer
from modules.gettext import LanguageManager from modules.gettext import LanguageManager
import platform import platform
try:
import pyvirtualcam
PYVIRTUALCAM_AVAILABLE = True
except ImportError:
PYVIRTUALCAM_AVAILABLE = False
print("pyvirtualcam is not installed. Virtual camera support will be disabled.")
if platform.system() == "Windows": if platform.system() == "Windows":
from pygrabber.dshow_graph import FilterGraph from pygrabber.dshow_graph import FilterGraph
@ -363,7 +369,17 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
), ),
) )
live_button.place(relx=0.65, rely=0.86, relwidth=0.2, relheight=0.05) live_button.place(relx=0.65, rely=0.86, relwidth=0.2, relheight=0.05)
# --- End Camera Selection ---
# --- Virtual Camera Toggle ---
virtual_cam_button = ctk.CTkButton(
root,
text=_("Toggle Virtual Cam"),
cursor="hand2",
command=toggle_virtual_cam,
state=("normal" if PYVIRTUALCAM_AVAILABLE else "disabled"),
)
virtual_cam_button.place(relx=0.1, rely=0.92, relwidth=0.35, relheight=0.05)
# --- End Virtual Camera Toggle ---
status_label = ctk.CTkLabel(root, text=None, justify="center") status_label = ctk.CTkLabel(root, text=None, justify="center")
status_label.place(relx=0.1, rely=0.9, relwidth=0.8) status_label.place(relx=0.1, rely=0.9, relwidth=0.8)
@ -797,75 +813,61 @@ def webcam_preview(root: ctk.CTk, camera_index: int):
) )
virtual_cam_manager = VirtualCamManager()
virtual_cam_enabled = False # Use a global variable for clarity
def get_available_cameras(): def toggle_virtual_cam():
"""Returns a list of available camera names and indices.""" global virtual_cam_enabled
if platform.system() == "Windows": if not PYVIRTUALCAM_AVAILABLE:
update_status("pyvirtualcam not installed. Cannot enable virtual camera.")
return
if not virtual_cam_enabled:
virtual_cam_manager.start(PREVIEW_DEFAULT_WIDTH, PREVIEW_DEFAULT_HEIGHT, 30)
virtual_cam_enabled = True
update_status("Virtual camera enabled.")
else:
virtual_cam_manager.stop()
virtual_cam_enabled = False
update_status("Virtual camera disabled.")
class VirtualCamManager:
"""Manages the virtual camera output using pyvirtualcam."""
def __init__(self):
self.cam = None
self.enabled = False
self.width = PREVIEW_DEFAULT_WIDTH
self.height = PREVIEW_DEFAULT_HEIGHT
self.fps = 30
def start(self, width: int, height: int, fps: int = 30):
if self.cam is None:
try: try:
graph = FilterGraph() self.cam = pyvirtualcam.Camera(width=width, height=height, fps=fps, print_fps=False)
devices = graph.get_input_devices() self.enabled = True
print("Virtual camera started.")
# Create list of indices and names
camera_indices = list(range(len(devices)))
camera_names = devices
# If no cameras found through DirectShow, try OpenCV fallback
if not camera_names:
# Try to open camera with index -1 and 0
test_indices = [-1, 0]
working_cameras = []
for idx in test_indices:
cap = cv2.VideoCapture(idx)
if cap.isOpened():
working_cameras.append(f"Camera {idx}")
cap.release()
if working_cameras:
return test_indices[: len(working_cameras)], working_cameras
# If still no cameras found, return empty lists
if not camera_names:
return [], ["No cameras found"]
return camera_indices, camera_names
except Exception as e: except Exception as e:
print(f"Error detecting cameras: {str(e)}") print(f"Failed to start virtual camera: {e}")
return [], ["No cameras found"] self.cam = None
else: self.enabled = False
# Unix-like systems (Linux/Mac) camera detection
camera_indices = []
camera_names = []
if platform.system() == "Darwin": # macOS specific handling def send(self, frame):
# Try to open the default FaceTime camera first if self.cam and self.enabled:
cap = cv2.VideoCapture(0) try:
if cap.isOpened(): # pyvirtualcam expects RGB
camera_indices.append(0) if frame.shape[2] == 3:
camera_names.append("FaceTime Camera") self.cam.send(frame)
cap.release() self.cam.sleep_until_next_frame()
except Exception as e:
print(f"Error sending frame to virtual camera: {e}")
# On macOS, additional cameras typically use indices 1 and 2 def stop(self):
for i in [1, 2]: if self.cam:
cap = cv2.VideoCapture(i) try:
if cap.isOpened(): self.cam.close()
camera_indices.append(i) except Exception as e:
camera_names.append(f"Camera {i}") print(f"Error closing virtual camera: {e}")
cap.release() self.cam = None
else: self.enabled = False
# Linux camera detection - test first 10 indices
for i in range(10):
cap = cv2.VideoCapture(i)
if cap.isOpened():
camera_indices.append(i)
camera_names.append(f"Camera {i}")
cap.release()
if not camera_names:
return [], ["No cameras found"]
return camera_indices, camera_names
def create_webcam_preview(camera_index: int): def create_webcam_preview(camera_index: int):
@ -885,10 +887,23 @@ def create_webcam_preview(camera_index: int):
fps_update_interval = 0.5 fps_update_interval = 0.5
frame_count = 0 frame_count = 0
fps = 0 fps = 0
face_swap_enabled = True # Toggle for live face swap
last_face_detected = True
no_face_counter = 0
NO_FACE_THRESHOLD = 30 # Number of frames to show warning if no face
def toggle_face_swap():
nonlocal face_swap_enabled
face_swap_enabled = not face_swap_enabled
update_status(f"Face Swap {'Enabled' if face_swap_enabled else 'Disabled'}")
# Optionally, bind a key or button to toggle_face_swap
PREVIEW.bind('<f>', lambda e: toggle_face_swap())
while True: while True:
ret, frame = cap.read() ret, frame = cap.read()
if not ret: if not ret:
update_status("Camera frame read failed.")
break break
temp_frame = frame.copy() temp_frame = frame.copy()
@ -900,12 +915,13 @@ def create_webcam_preview(camera_index: int):
temp_frame = fit_image_to_size( temp_frame = fit_image_to_size(
temp_frame, PREVIEW.winfo_width(), PREVIEW.winfo_height() temp_frame, PREVIEW.winfo_width(), PREVIEW.winfo_height()
) )
else: else:
temp_frame = fit_image_to_size( temp_frame = fit_image_to_size(
temp_frame, PREVIEW.winfo_width(), PREVIEW.winfo_height() temp_frame, PREVIEW.winfo_width(), PREVIEW.winfo_height()
) )
face_found = True
if face_swap_enabled:
if not modules.globals.map_faces: if not modules.globals.map_faces:
if source_image is None and modules.globals.source_path: if source_image is None and modules.globals.source_path:
source_image = get_one_face(cv2.imread(modules.globals.source_path)) source_image = get_one_face(cv2.imread(modules.globals.source_path))
@ -915,7 +931,15 @@ def create_webcam_preview(camera_index: int):
if modules.globals.fp_ui["face_enhancer"]: if modules.globals.fp_ui["face_enhancer"]:
temp_frame = frame_processor.process_frame(None, temp_frame) temp_frame = frame_processor.process_frame(None, temp_frame)
else: else:
# Check if a face is detected before swapping
detected_face = get_one_face(temp_frame)
if detected_face is not None and source_image is not None:
temp_frame = frame_processor.process_frame(source_image, temp_frame) temp_frame = frame_processor.process_frame(source_image, temp_frame)
last_face_detected = True
no_face_counter = 0
else:
face_found = False
no_face_counter += 1
else: else:
modules.globals.target_path = None modules.globals.target_path = None
for frame_processor in frame_processors: for frame_processor in frame_processors:
@ -924,6 +948,23 @@ def create_webcam_preview(camera_index: int):
temp_frame = frame_processor.process_frame_v2(temp_frame) temp_frame = frame_processor.process_frame_v2(temp_frame)
else: else:
temp_frame = frame_processor.process_frame_v2(temp_frame) temp_frame = frame_processor.process_frame_v2(temp_frame)
else:
# Face swap disabled, just show the frame
pass
# Show warning if no face detected for a while
if not face_found and no_face_counter > NO_FACE_THRESHOLD:
cv2.putText(
temp_frame,
"No face detected!",
(10, 60),
cv2.FONT_HERSHEY_SIMPLEX,
1.2,
(0, 0, 255),
3,
)
elif face_found:
no_face_counter = 0
# Calculate and display FPS # Calculate and display FPS
current_time = time.time() current_time = time.time()
@ -958,6 +999,7 @@ def create_webcam_preview(camera_index: int):
cap.release() cap.release()
PREVIEW.withdraw() PREVIEW.withdraw()
update_status("Webcam preview closed.")
def create_source_target_popup_for_webcam( def create_source_target_popup_for_webcam(

View File

@ -2,10 +2,10 @@ import glob
import mimetypes import mimetypes
import os import os
import platform import platform
import shutil
import ssl import ssl
import subprocess import subprocess
import urllib import cv2
import modules
from pathlib import Path from pathlib import Path
from typing import List, Any from typing import List, Any
from tqdm import tqdm from tqdm import tqdm
@ -21,6 +21,7 @@ if platform.system().lower() == "darwin":
def run_ffmpeg(args: List[str]) -> bool: def run_ffmpeg(args: List[str]) -> bool:
"""Run an ffmpeg command with the given arguments."""
commands = [ commands = [
"ffmpeg", "ffmpeg",
"-hide_banner", "-hide_banner",
@ -31,14 +32,15 @@ def run_ffmpeg(args: List[str]) -> bool:
] ]
commands.extend(args) commands.extend(args)
try: try:
subprocess.check_output(commands, stderr=subprocess.STDOUT) subprocess.run(commands, check=True)
return True return True
except Exception: except Exception as e:
pass print(f"Error running ffmpeg: {e}")
return False return False
def detect_fps(target_path: str) -> float: def detect_fps(target_path: str) -> float:
"""Detect the FPS of a video file using ffprobe."""
command = [ command = [
"ffprobe", "ffprobe",
"-v", "-v",
@ -51,16 +53,18 @@ def detect_fps(target_path: str) -> float:
"default=noprint_wrappers=1:nokey=1", "default=noprint_wrappers=1:nokey=1",
target_path, target_path,
] ]
output = subprocess.check_output(command).decode().strip().split("/")
try: try:
numerator, denominator = map(int, output) output = subprocess.check_output(command).decode().strip().split("/")
return numerator / denominator if len(output) == 2:
except Exception: return float(output[0]) / float(output[1])
pass return float(output[0])
except Exception as e:
print(f"Error detecting FPS: {e}")
return 30.0 return 30.0
def extract_frames(target_path: str) -> None: def extract_frames(target_path: str) -> None:
"""Extract frames from a video file to a temp directory."""
temp_directory_path = get_temp_directory_path(target_path) temp_directory_path = get_temp_directory_path(target_path)
run_ffmpeg( run_ffmpeg(
[ [
@ -74,6 +78,7 @@ def extract_frames(target_path: str) -> None:
def create_video(target_path: str, fps: float = 30.0) -> None: def create_video(target_path: str, fps: float = 30.0) -> None:
"""Create a video from frames in the temp directory."""
temp_output_path = get_temp_output_path(target_path) temp_output_path = get_temp_output_path(target_path)
temp_directory_path = get_temp_directory_path(target_path) temp_directory_path = get_temp_directory_path(target_path)
run_ffmpeg( run_ffmpeg(
@ -97,6 +102,7 @@ def create_video(target_path: str, fps: float = 30.0) -> None:
def restore_audio(target_path: str, output_path: str) -> None: def restore_audio(target_path: str, output_path: str) -> None:
"""Restore audio from the original video to the output video."""
temp_output_path = get_temp_output_path(target_path) temp_output_path = get_temp_output_path(target_path)
done = run_ffmpeg( done = run_ffmpeg(
[ [
@ -115,95 +121,107 @@ def restore_audio(target_path: str, output_path: str) -> None:
] ]
) )
if not done: if not done:
move_temp(target_path, output_path) print(f"Failed to restore audio for {output_path}")
def get_temp_frame_paths(target_path: str) -> List[str]: def get_temp_frame_paths(target_path: str) -> List[str]:
"""Get all temp frame file paths for a given target path."""
temp_directory_path = get_temp_directory_path(target_path) temp_directory_path = get_temp_directory_path(target_path)
return glob.glob((os.path.join(glob.escape(temp_directory_path), "*.png"))) try:
return sorted([
str(p) for p in Path(temp_directory_path).glob("*.png")
])
except Exception as e:
print(f"Error getting temp frame paths: {e}")
return []
def get_temp_directory_path(target_path: str) -> str: def get_temp_directory_path(target_path: str) -> str:
target_name, _ = os.path.splitext(os.path.basename(target_path)) """Get the temp directory path for a given target path."""
target_directory_path = os.path.dirname(target_path) base = os.path.splitext(os.path.basename(target_path))[0]
return os.path.join(target_directory_path, TEMP_DIRECTORY, target_name) temp_dir = os.path.join(TEMP_DIRECTORY, base)
os.makedirs(temp_dir, exist_ok=True)
return temp_dir
def get_temp_output_path(target_path: str) -> str: def get_temp_output_path(target_path: str) -> str:
temp_directory_path = get_temp_directory_path(target_path) """Get the temp output video path for a given target path."""
return os.path.join(temp_directory_path, TEMP_FILE) base = os.path.splitext(os.path.basename(target_path))[0]
return os.path.join(TEMP_DIRECTORY, f"{base}_out.mp4")
def normalize_output_path(source_path: str, target_path: str, output_path: str) -> Any: def normalize_output_path(source_path: str, target_path: str, output_path: str) -> Any:
if source_path and target_path: """Normalize the output path for saving results."""
source_name, _ = os.path.splitext(os.path.basename(source_path)) if not output_path:
target_name, target_extension = os.path.splitext(os.path.basename(target_path)) base = os.path.splitext(os.path.basename(target_path))[0]
if os.path.isdir(output_path): return os.path.join(TEMP_DIRECTORY, f"{base}_result.png")
return os.path.join(
output_path, source_name + "-" + target_name + target_extension
)
return output_path return output_path
def create_temp(target_path: str) -> None: def create_temp(target_path: str) -> None:
"""Create a temp directory for a given target path."""
temp_directory_path = get_temp_directory_path(target_path) temp_directory_path = get_temp_directory_path(target_path)
Path(temp_directory_path).mkdir(parents=True, exist_ok=True) os.makedirs(temp_directory_path, exist_ok=True)
def move_temp(target_path: str, output_path: str) -> None: def move_temp(target_path: str, output_path: str) -> None:
"""Move temp output to the final output path."""
temp_output_path = get_temp_output_path(target_path) temp_output_path = get_temp_output_path(target_path)
if os.path.isfile(temp_output_path): try:
if os.path.isfile(output_path): os.rename(temp_output_path, output_path)
os.remove(output_path) except Exception as e:
shutil.move(temp_output_path, output_path) print(f"Error moving temp output: {e}")
def clean_temp(target_path: str) -> None: def clean_temp(target_path: str) -> None:
"""Remove temp directory and files for a given target path."""
temp_directory_path = get_temp_directory_path(target_path) temp_directory_path = get_temp_directory_path(target_path)
parent_directory_path = os.path.dirname(temp_directory_path) try:
if not modules.globals.keep_frames and os.path.isdir(temp_directory_path): for p in Path(temp_directory_path).glob("*"):
shutil.rmtree(temp_directory_path) p.unlink()
if os.path.exists(parent_directory_path) and not os.listdir(parent_directory_path): os.rmdir(temp_directory_path)
os.rmdir(parent_directory_path) except Exception as e:
print(f"Error cleaning temp directory: {e}")
def has_image_extension(image_path: str) -> bool: def has_image_extension(image_path: str) -> bool:
return image_path.lower().endswith(("png", "jpg", "jpeg")) """Check if a file has an image extension."""
return os.path.splitext(image_path)[1].lower() in [
".png", ".jpg", ".jpeg", ".gif", ".bmp"
]
def is_image(image_path: str) -> bool: def is_image(image_path: str) -> bool:
if image_path and os.path.isfile(image_path): """Check if a file is an image."""
mimetype, _ = mimetypes.guess_type(image_path) return has_image_extension(image_path)
return bool(mimetype and mimetype.startswith("image/"))
return False
def is_video(video_path: str) -> bool: def is_video(video_path: str) -> bool:
if video_path and os.path.isfile(video_path): """Check if a file is a video."""
mimetype, _ = mimetypes.guess_type(video_path) return os.path.splitext(video_path)[1].lower() in [
return bool(mimetype and mimetype.startswith("video/")) ".mp4", ".mkv"
return False ]
def conditional_download(download_directory_path: str, urls: List[str]) -> None: def conditional_download(download_directory_path: str, urls: List[str]) -> None:
if not os.path.exists(download_directory_path): """Download files from URLs if they do not exist in the directory."""
os.makedirs(download_directory_path) import requests
for url in urls: for url in urls:
download_file_path = os.path.join( filename = os.path.basename(url)
download_directory_path, os.path.basename(url) file_path = os.path.join(download_directory_path, filename)
) if not os.path.exists(file_path):
if not os.path.exists(download_file_path): try:
request = urllib.request.urlopen(url) # type: ignore[attr-defined] print(f"Downloading {url}...")
total = int(request.headers.get("Content-Length", 0)) r = requests.get(url, stream=True)
with tqdm( with open(file_path, "wb") as f:
total=total, for chunk in r.iter_content(chunk_size=8192):
desc="Downloading", if chunk:
unit="B", f.write(chunk)
unit_scale=True, print(f"Downloaded {filename}")
unit_divisor=1024, except Exception as e:
) as progress: print(f"Error downloading {url}: {e}")
urllib.request.urlretrieve(url, download_file_path, reporthook=lambda count, block_size, total_size: progress.update(block_size)) # type: ignore[attr-defined]
def resolve_relative_path(path: str) -> str: def resolve_relative_path(path: str) -> str:
return os.path.abspath(os.path.join(os.path.dirname(__file__), path)) """Resolve a relative path to an absolute path."""
return os.path.abspath(path)

View File

@ -1,8 +1,8 @@
import cv2 import cv2
import numpy as np import numpy as np
from typing import Optional, Tuple, Callable
import platform import platform
import threading import threading
from typing import Optional, Tuple, Callable
# Only import Windows-specific library if on Windows # Only import Windows-specific library if on Windows
if platform.system() == "Windows": if platform.system() == "Windows":
@ -11,17 +11,15 @@ if platform.system() == "Windows":
class VideoCapturer: class VideoCapturer:
def __init__(self, device_index: int): def __init__(self, device_index: int):
"""Initialize the video capturer for a given device index."""
self.device_index = device_index self.device_index = device_index
self.frame_callback = None self.frame_callback = None
self._current_frame = None self._current_frame = None
self._frame_ready = threading.Event() self._frame_ready = threading.Event()
self.is_running = False self.is_running = False
self.cap = None self.cap = None
# Initialize Windows-specific components if on Windows
if platform.system() == "Windows": if platform.system() == "Windows":
self.graph = FilterGraph() self.graph = FilterGraph()
# Verify device exists
devices = self.graph.get_input_devices() devices = self.graph.get_input_devices()
if self.device_index >= len(devices): if self.device_index >= len(devices):
raise ValueError( raise ValueError(
@ -29,40 +27,31 @@ class VideoCapturer:
) )
def start(self, width: int = 960, height: int = 540, fps: int = 60) -> bool: def start(self, width: int = 960, height: int = 540, fps: int = 60) -> bool:
"""Initialize and start video capture""" """Initialize and start video capture."""
try: try:
if platform.system() == "Windows": if platform.system() == "Windows":
# Windows-specific capture methods
capture_methods = [ capture_methods = [
(self.device_index, cv2.CAP_DSHOW), # Try DirectShow first (self.device_index, cv2.CAP_DSHOW),
(self.device_index, cv2.CAP_ANY), # Then try default backend (self.device_index, cv2.CAP_ANY),
(-1, cv2.CAP_ANY), # Try -1 as fallback (-1, cv2.CAP_ANY),
(0, cv2.CAP_ANY), # Finally try 0 without specific backend (0, cv2.CAP_ANY),
] ]
for dev_id, backend in capture_methods: for dev_id, backend in capture_methods:
try: try:
self.cap = cv2.VideoCapture(dev_id, backend) self.cap = cv2.VideoCapture(dev_id, backend)
if self.cap.isOpened(): if self.cap.isOpened():
break break
self.cap.release() except Exception as e:
except Exception: print(f"Error opening camera with backend {backend}: {e}")
continue
else: else:
# Unix-like systems (Linux/Mac) capture method
self.cap = cv2.VideoCapture(self.device_index) self.cap = cv2.VideoCapture(self.device_index)
if not self.cap or not self.cap.isOpened(): if not self.cap or not self.cap.isOpened():
raise RuntimeError("Failed to open camera") raise RuntimeError("Failed to open camera")
# Configure format
self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, width) self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height) self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
self.cap.set(cv2.CAP_PROP_FPS, fps) self.cap.set(cv2.CAP_PROP_FPS, fps)
self.is_running = True self.is_running = True
return True return True
except Exception as e: except Exception as e:
print(f"Failed to start capture: {str(e)}") print(f"Failed to start capture: {str(e)}")
if self.cap: if self.cap:
@ -70,10 +59,9 @@ class VideoCapturer:
return False return False
def read(self) -> Tuple[bool, Optional[np.ndarray]]: def read(self) -> Tuple[bool, Optional[np.ndarray]]:
"""Read a frame from the camera""" """Read a frame from the camera."""
if not self.is_running or self.cap is None: if not self.is_running or self.cap is None:
return False, None return False, None
ret, frame = self.cap.read() ret, frame = self.cap.read()
if ret: if ret:
self._current_frame = frame self._current_frame = frame
@ -83,12 +71,12 @@ class VideoCapturer:
return False, None return False, None
def release(self) -> None: def release(self) -> None:
"""Stop capture and release resources""" """Stop capture and release resources."""
if self.is_running and self.cap is not None: if self.is_running and self.cap is not None:
self.cap.release() self.cap.release()
self.is_running = False self.is_running = False
self.cap = None self.cap = None
def set_frame_callback(self, callback: Callable[[np.ndarray], None]) -> None: def set_frame_callback(self, callback: Callable[[np.ndarray], None]) -> None:
"""Set callback for frame processing""" """Set callback for frame processing."""
self.frame_callback = callback self.frame_callback = callback

View File

@ -0,0 +1,19 @@
#!/bin/zsh
# push_to_new_branch.sh - Commit and push changes to a new branch in Deep-Live-Cam-remote
REPO_DIR="Deep-Live-Cam-remote"
BRANCH_NAME="feature-$(date +%Y%m%d-%H%M%S)"
if [ ! -d "$REPO_DIR/.git" ]; then
echo "Error: $REPO_DIR is not a git repository. Run the clone_or_update_deep_live_cam.sh script first."
exit 1
fi
cd "$REPO_DIR"
git add .
echo "Enter a commit message: "
read COMMIT_MSG
git commit -m "$COMMIT_MSG"
git checkout -b "$BRANCH_NAME"
git push origin "$BRANCH_NAME"
echo "Pushed to branch $BRANCH_NAME on remote."

View File

@ -0,0 +1,23 @@
#!/bin/zsh
# push_to_rehanbgmi.sh - Commit and push changes to your fork (rehanbgmi/deeplivceam) in Deep-Live-Cam-remote
REPO_DIR="Deep-Live-Cam-remote"
FORK_URL="https://github.com/rehanbgmi/deeplivceam.git"
BRANCH_NAME="feature-$(date +%Y%m%d-%H%M%S)"
if [ ! -d "$REPO_DIR/.git" ]; then
echo "Error: $REPO_DIR is not a git repository. Run the clone_or_update_deep_live_cam.sh script first."
exit 1
fi
cd "$REPO_DIR"
# Set your fork as a remote if not already set
git remote | grep rehanbgmi > /dev/null || git remote add rehanbgmi "$FORK_URL"
git add .
echo "Enter a commit message: "
read COMMIT_MSG
git commit -m "$COMMIT_MSG"
git checkout -b "$BRANCH_NAME"
git push rehanbgmi "$BRANCH_NAME"
echo "Pushed to branch $BRANCH_NAME on your fork (rehanbgmi/deeplivceam)."

View File

@ -4,7 +4,7 @@ numpy>=1.23.5,<2
typing-extensions>=4.8.0 typing-extensions>=4.8.0
opencv-python==4.10.0.84 opencv-python==4.10.0.84
cv2_enumerate_cameras==1.1.15 cv2_enumerate_cameras==1.1.15
onnx==1.16.0 onnx==1.18.0
insightface==0.7.3 insightface==0.7.3
psutil==5.9.8 psutil==5.9.8
tk==0.1.0 tk==0.1.0
@ -15,7 +15,7 @@ torch==2.5.1; sys_platform == 'darwin'
torchvision; sys_platform != 'darwin' torchvision; sys_platform != 'darwin'
torchvision==0.20.1; sys_platform == 'darwin' torchvision==0.20.1; sys_platform == 'darwin'
onnxruntime-silicon==1.21.0; sys_platform == 'darwin' and platform_machine == 'arm64' onnxruntime-silicon==1.21.0; sys_platform == 'darwin' and platform_machine == 'arm64'
onnxruntime-gpu==1.21.0; sys_platform != 'darwin' onnxruntime-gpu==1.22.0; sys_platform != 'darwin'
tensorflow; sys_platform != 'darwin' tensorflow; sys_platform != 'darwin'
opennsfw2==0.10.2 opennsfw2==0.10.2
protobuf==4.23.2 protobuf==4.25.1

View File

@ -0,0 +1,4 @@
#!/bin/zsh
# run-coreml-macos.sh - Run Deep-Live-Cam with CoreML (Apple Silicon) on macOS
source venv/bin/activate
python3.10 run.py --execution-provider coreml

4
run-coreml.bat 100644
View File

@ -0,0 +1,4 @@
@echo off
REM run-coreml.bat - Run Deep-Live-Cam with CoreML (Apple Silicon) on Windows (for reference, not for actual use)
call venv\Scripts\activate
python run.py --execution-provider coreml

View File

@ -0,0 +1,4 @@
#!/bin/zsh
# run-cuda-macos.sh - Run Deep-Live-Cam with CUDA (Nvidia GPU) on macOS
source venv/bin/activate
python run.py --execution-provider cuda

View File

@ -1 +1,2 @@
call venv\Scripts\activate
python run.py --execution-provider cuda python run.py --execution-provider cuda