PR #844 - Pygrabber + Mac fix

Pygrabber + Mac fix
pull/846/head
KRSHH 2024-12-22 18:34:32 +05:30 committed by GitHub
commit 47c8f7acc0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 298 additions and 187 deletions

View File

@ -26,7 +26,7 @@ nsfw_filter = False
video_encoder = None
video_quality = None
live_mirror = False
live_resizable = False
live_resizable = True
max_memory = None
execution_providers: List[str] = []
execution_threads = None

View File

@ -1,3 +1,3 @@
name = 'Deep-Live-Cam'
version = '1.7.0'
edition = 'Github Edition'
version = '1.7.5'
edition = 'GitHub Edition'

View File

@ -9,6 +9,8 @@ import modules.processors.frame.core
from modules.core import update_status
from modules.face_analyser import get_one_face
from modules.typing import Frame, Face
import platform
import torch
from modules.utilities import (
conditional_download,
is_image,
@ -21,7 +23,10 @@ THREAD_LOCK = threading.Lock()
NAME = "DLC.FACE-ENHANCER"
abs_dir = os.path.dirname(os.path.abspath(__file__))
models_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(abs_dir))), 'models')
models_dir = os.path.join(
os.path.dirname(os.path.dirname(os.path.dirname(abs_dir))), "models"
)
def pre_check() -> bool:
download_directory_path = models_dir
@ -48,8 +53,14 @@ def get_face_enhancer() -> Any:
with THREAD_LOCK:
if FACE_ENHANCER is None:
model_path = os.path.join(models_dir, 'GFPGANv1.4.pth')
FACE_ENHANCER = gfpgan.GFPGANer(model_path=model_path, upscale=1) # type: ignore[attr-defined]
model_path = os.path.join(models_dir, "GFPGANv1.4.pth")
if platform.system() == "Darwin": # Mac OS
mps_device = None
if torch.backends.mps.is_available():
mps_device = torch.device("mps")
FACE_ENHANCER = gfpgan.GFPGANer(model_path=model_path, upscale=1, device=mps_device) # type: ignore[attr-defined]
else: # Other OS
FACE_ENHANCER = gfpgan.GFPGANer(model_path=model_path, upscale=1) # type: ignore[attr-defined]
return FACE_ENHANCER

View File

@ -21,7 +21,10 @@ THREAD_LOCK = threading.Lock()
NAME = "DLC.FACE-SWAPPER"
abs_dir = os.path.dirname(os.path.abspath(__file__))
models_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(abs_dir))), 'models')
models_dir = os.path.join(
os.path.dirname(os.path.dirname(os.path.dirname(abs_dir))), "models"
)
def pre_check() -> bool:
download_directory_path = abs_dir
@ -56,7 +59,7 @@ def get_face_swapper() -> Any:
with THREAD_LOCK:
if FACE_SWAPPER is None:
model_path = os.path.join(models_dir, 'inswapper_128_fp16.onnx')
model_path = os.path.join(models_dir, "inswapper_128_fp16.onnx")
FACE_SWAPPER = insightface.model_zoo.get_model(
model_path, providers=modules.globals.execution_providers
)

View File

@ -7,7 +7,7 @@ from cv2_enumerate_cameras import enumerate_cameras # Add this import
from PIL import Image, ImageOps
import time
import json
from pygrabber.dshow_graph import FilterGraph
import modules.globals
import modules.metadata
from modules.face_analyser import (
@ -26,6 +26,8 @@ from modules.utilities import (
resolve_relative_path,
has_image_extension,
)
from modules.video_capture import VideoCapturer
import platform
ROOT = None
POPUP = None
@ -96,7 +98,7 @@ def save_switch_states():
"fp_ui": modules.globals.fp_ui,
"show_fps": modules.globals.show_fps,
"mouth_mask": modules.globals.mouth_mask,
"show_mouth_mask_box": modules.globals.show_mouth_mask_box
"show_mouth_mask_box": modules.globals.show_mouth_mask_box,
}
with open("switch_states.json", "w") as f:
json.dump(switch_states, f)
@ -118,7 +120,9 @@ def load_switch_states():
modules.globals.fp_ui = switch_states.get("fp_ui", {"face_enhancer": False})
modules.globals.show_fps = switch_states.get("show_fps", False)
modules.globals.mouth_mask = switch_states.get("mouth_mask", False)
modules.globals.show_mouth_mask_box = switch_states.get("show_mouth_mask_box", False)
modules.globals.show_mouth_mask_box = switch_states.get(
"show_mouth_mask_box", False
)
except FileNotFoundError:
# If the file doesn't exist, use default values
pass
@ -315,18 +319,22 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
camera_label.place(relx=0.1, rely=0.86, relwidth=0.2, relheight=0.05)
available_cameras = get_available_cameras()
# Convert camera indices to strings for CTkOptionMenu
available_camera_indices, available_camera_strings = available_cameras
camera_variable = ctk.StringVar(
value=(
available_camera_strings[0]
if available_camera_strings
else "No cameras found"
camera_indices, camera_names = available_cameras
if not camera_names or camera_names[0] == "No cameras found":
camera_variable = ctk.StringVar(value="No cameras found")
camera_optionmenu = ctk.CTkOptionMenu(
root,
variable=camera_variable,
values=["No cameras found"],
state="disabled",
)
)
camera_optionmenu = ctk.CTkOptionMenu(
root, variable=camera_variable, values=available_camera_strings
)
else:
camera_variable = ctk.StringVar(value=camera_names[0])
camera_optionmenu = ctk.CTkOptionMenu(
root, variable=camera_variable, values=camera_names
)
camera_optionmenu.place(relx=0.35, rely=0.86, relwidth=0.25, relheight=0.05)
live_button = ctk.CTkButton(
@ -335,9 +343,16 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
cursor="hand2",
command=lambda: webcam_preview(
root,
available_camera_indices[
available_camera_strings.index(camera_variable.get())
],
(
camera_indices[camera_names.index(camera_variable.get())]
if camera_names and camera_names[0] != "No cameras found"
else None
),
),
state=(
"normal"
if camera_names and camera_names[0] != "No cameras found"
else "disabled"
),
)
live_button.place(relx=0.65, rely=0.86, relwidth=0.2, relheight=0.05)
@ -745,7 +760,7 @@ def update_preview(frame_number: int = 0) -> None:
def webcam_preview(root: ctk.CTk, camera_index: int):
if not modules.globals.map_faces:
if modules.globals.source_path is None:
# No image selected
update_status("Please select a source image first")
return
create_webcam_preview(camera_index)
else:
@ -757,40 +772,78 @@ def webcam_preview(root: ctk.CTk, camera_index: int):
def get_available_cameras():
"""Returns a list of available camera names and indices."""
camera_indices = []
camera_names = []
if platform.system() == "Windows":
try:
graph = FilterGraph()
devices = graph.get_input_devices()
for camera in enumerate_cameras():
cap = cv2.VideoCapture(camera.index)
if cap.isOpened():
camera_indices.append(camera.index)
camera_names.append(camera.name)
cap.release()
return (camera_indices, camera_names)
# Create list of indices and names
camera_indices = list(range(len(devices)))
camera_names = devices
# If no cameras found through DirectShow, try OpenCV fallback
if not camera_names:
# Try to open camera with index -1 and 0
test_indices = [-1, 0]
working_cameras = []
for idx in test_indices:
cap = cv2.VideoCapture(idx)
if cap.isOpened():
working_cameras.append(f"Camera {idx}")
cap.release()
if working_cameras:
return test_indices[: len(working_cameras)], working_cameras
# If still no cameras found, return empty lists
if not camera_names:
return [], ["No cameras found"]
return camera_indices, camera_names
except Exception as e:
print(f"Error detecting cameras: {str(e)}")
return [], ["No cameras found"]
else:
# Unix-like systems (Linux/Mac) camera detection
camera_indices = []
camera_names = []
# Test the first 10 indices
for i in range(10):
cap = cv2.VideoCapture(i)
if cap.isOpened():
camera_indices.append(i)
camera_names.append(f"Camera {i}")
cap.release()
if not camera_names:
return [], ["No cameras found"]
return camera_indices, camera_names
def create_webcam_preview(camera_index: int):
global preview_label, PREVIEW
camera = cv2.VideoCapture(camera_index)
camera.set(cv2.CAP_PROP_FRAME_WIDTH, PREVIEW_DEFAULT_WIDTH)
camera.set(cv2.CAP_PROP_FRAME_HEIGHT, PREVIEW_DEFAULT_HEIGHT)
camera.set(cv2.CAP_PROP_FPS, 60)
cap = VideoCapturer(camera_index)
if not cap.start(PREVIEW_DEFAULT_WIDTH, PREVIEW_DEFAULT_HEIGHT, 60):
update_status("Failed to start camera")
return
preview_label.configure(width=PREVIEW_DEFAULT_WIDTH, height=PREVIEW_DEFAULT_HEIGHT)
PREVIEW.deiconify()
frame_processors = get_frame_processors_modules(modules.globals.frame_processors)
source_image = None
prev_time = time.time()
fps_update_interval = 0.5 # Update FPS every 0.5 seconds
fps_update_interval = 0.5
frame_count = 0
fps = 0
while camera:
ret, frame = camera.read()
while True:
ret, frame = cap.read()
if not ret:
break
@ -804,6 +857,11 @@ def create_webcam_preview(camera_index: int):
temp_frame, PREVIEW.winfo_width(), PREVIEW.winfo_height()
)
else:
temp_frame = fit_image_to_size(
temp_frame, PREVIEW.winfo_width(), PREVIEW.winfo_height()
)
if not modules.globals.map_faces:
if source_image is None and modules.globals.source_path:
source_image = get_one_face(cv2.imread(modules.globals.source_path))
@ -816,7 +874,6 @@ def create_webcam_preview(camera_index: int):
temp_frame = frame_processor.process_frame(source_image, temp_frame)
else:
modules.globals.target_path = None
for frame_processor in frame_processors:
if frame_processor.NAME == "DLC.FACE-ENHANCER":
if modules.globals.fp_ui["face_enhancer"]:
@ -855,7 +912,7 @@ def create_webcam_preview(camera_index: int):
if PREVIEW.state() == "withdrawn":
break
camera.release()
cap.release()
PREVIEW.withdraw()

View File

@ -12,16 +12,23 @@ from tqdm import tqdm
import modules.globals
TEMP_FILE = 'temp.mp4'
TEMP_DIRECTORY = 'temp'
TEMP_FILE = "temp.mp4"
TEMP_DIRECTORY = "temp"
# monkey patch ssl for mac
if platform.system().lower() == 'darwin':
if platform.system().lower() == "darwin":
ssl._create_default_https_context = ssl._create_unverified_context
def run_ffmpeg(args: List[str]) -> bool:
commands = ['ffmpeg', '-hide_banner', '-hwaccel', 'auto', '-loglevel', modules.globals.log_level]
commands = [
"ffmpeg",
"-hide_banner",
"-hwaccel",
"auto",
"-loglevel",
modules.globals.log_level,
]
commands.extend(args)
try:
subprocess.check_output(commands, stderr=subprocess.STDOUT)
@ -32,8 +39,19 @@ def run_ffmpeg(args: List[str]) -> bool:
def detect_fps(target_path: str) -> float:
command = ['ffprobe', '-v', 'error', '-select_streams', 'v:0', '-show_entries', 'stream=r_frame_rate', '-of', 'default=noprint_wrappers=1:nokey=1', target_path]
output = subprocess.check_output(command).decode().strip().split('/')
command = [
"ffprobe",
"-v",
"error",
"-select_streams",
"v:0",
"-show_entries",
"stream=r_frame_rate",
"-of",
"default=noprint_wrappers=1:nokey=1",
target_path,
]
output = subprocess.check_output(command).decode().strip().split("/")
try:
numerator, denominator = map(int, output)
return numerator / denominator
@ -44,25 +62,65 @@ def detect_fps(target_path: str) -> float:
def extract_frames(target_path: str) -> None:
temp_directory_path = get_temp_directory_path(target_path)
run_ffmpeg(['-i', target_path, '-pix_fmt', 'rgb24', os.path.join(temp_directory_path, '%04d.png')])
run_ffmpeg(
[
"-i",
target_path,
"-pix_fmt",
"rgb24",
os.path.join(temp_directory_path, "%04d.png"),
]
)
def create_video(target_path: str, fps: float = 30.0) -> None:
temp_output_path = get_temp_output_path(target_path)
temp_directory_path = get_temp_directory_path(target_path)
run_ffmpeg(['-r', str(fps), '-i', os.path.join(temp_directory_path, '%04d.png'), '-c:v', modules.globals.video_encoder, '-crf', str(modules.globals.video_quality), '-pix_fmt', 'yuv420p', '-vf', 'colorspace=bt709:iall=bt601-6-625:fast=1', '-y', temp_output_path])
run_ffmpeg(
[
"-r",
str(fps),
"-i",
os.path.join(temp_directory_path, "%04d.png"),
"-c:v",
modules.globals.video_encoder,
"-crf",
str(modules.globals.video_quality),
"-pix_fmt",
"yuv420p",
"-vf",
"colorspace=bt709:iall=bt601-6-625:fast=1",
"-y",
temp_output_path,
]
)
def restore_audio(target_path: str, output_path: str) -> None:
temp_output_path = get_temp_output_path(target_path)
done = run_ffmpeg(['-i', temp_output_path, '-i', target_path, '-c:v', 'copy', '-map', '0:v:0', '-map', '1:a:0', '-y', output_path])
done = run_ffmpeg(
[
"-i",
temp_output_path,
"-i",
target_path,
"-c:v",
"copy",
"-map",
"0:v:0",
"-map",
"1:a:0",
"-y",
output_path,
]
)
if not done:
move_temp(target_path, output_path)
def get_temp_frame_paths(target_path: str) -> List[str]:
temp_directory_path = get_temp_directory_path(target_path)
return glob.glob((os.path.join(glob.escape(temp_directory_path), '*.png')))
return glob.glob((os.path.join(glob.escape(temp_directory_path), "*.png")))
def get_temp_directory_path(target_path: str) -> str:
@ -81,7 +139,9 @@ def normalize_output_path(source_path: str, target_path: str, output_path: str)
source_name, _ = os.path.splitext(os.path.basename(source_path))
target_name, target_extension = os.path.splitext(os.path.basename(target_path))
if os.path.isdir(output_path):
return os.path.join(output_path, source_name + '-' + target_name + target_extension)
return os.path.join(
output_path, source_name + "-" + target_name + target_extension
)
return output_path
@ -108,20 +168,20 @@ def clean_temp(target_path: str) -> None:
def has_image_extension(image_path: str) -> bool:
return image_path.lower().endswith(('png', 'jpg', 'jpeg'))
return image_path.lower().endswith(("png", "jpg", "jpeg"))
def is_image(image_path: str) -> bool:
if image_path and os.path.isfile(image_path):
mimetype, _ = mimetypes.guess_type(image_path)
return bool(mimetype and mimetype.startswith('image/'))
return bool(mimetype and mimetype.startswith("image/"))
return False
def is_video(video_path: str) -> bool:
if video_path and os.path.isfile(video_path):
mimetype, _ = mimetypes.guess_type(video_path)
return bool(mimetype and mimetype.startswith('video/'))
return bool(mimetype and mimetype.startswith("video/"))
return False
@ -129,12 +189,20 @@ def conditional_download(download_directory_path: str, urls: List[str]) -> None:
if not os.path.exists(download_directory_path):
os.makedirs(download_directory_path)
for url in urls:
download_file_path = os.path.join(download_directory_path, os.path.basename(url))
download_file_path = os.path.join(
download_directory_path, os.path.basename(url)
)
if not os.path.exists(download_file_path):
request = urllib.request.urlopen(url) # type: ignore[attr-defined]
total = int(request.headers.get('Content-Length', 0))
with tqdm(total=total, desc='Downloading', unit='B', unit_scale=True, unit_divisor=1024) as progress:
urllib.request.urlretrieve(url, download_file_path, reporthook=lambda count, block_size, total_size: progress.update(block_size)) # type: ignore[attr-defined]
request = urllib.request.urlopen(url) # type: ignore[attr-defined]
total = int(request.headers.get("Content-Length", 0))
with tqdm(
total=total,
desc="Downloading",
unit="B",
unit_scale=True,
unit_divisor=1024,
) as progress:
urllib.request.urlretrieve(url, download_file_path, reporthook=lambda count, block_size, total_size: progress.update(block_size)) # type: ignore[attr-defined]
def resolve_relative_path(path: str) -> str:

View File

@ -0,0 +1,94 @@
import cv2
import numpy as np
from typing import Optional, Tuple, Callable
import platform
import threading
# Only import Windows-specific library if on Windows
if platform.system() == "Windows":
from pygrabber.dshow_graph import FilterGraph
class VideoCapturer:
def __init__(self, device_index: int):
self.device_index = device_index
self.frame_callback = None
self._current_frame = None
self._frame_ready = threading.Event()
self.is_running = False
self.cap = None
# Initialize Windows-specific components if on Windows
if platform.system() == "Windows":
self.graph = FilterGraph()
# Verify device exists
devices = self.graph.get_input_devices()
if self.device_index >= len(devices):
raise ValueError(
f"Invalid device index {device_index}. Available devices: {len(devices)}"
)
def start(self, width: int = 960, height: int = 540, fps: int = 60) -> bool:
"""Initialize and start video capture"""
try:
if platform.system() == "Windows":
# Windows-specific capture methods
capture_methods = [
(self.device_index, cv2.CAP_DSHOW), # Try DirectShow first
(self.device_index, cv2.CAP_ANY), # Then try default backend
(-1, cv2.CAP_ANY), # Try -1 as fallback
(0, cv2.CAP_ANY), # Finally try 0 without specific backend
]
for dev_id, backend in capture_methods:
try:
self.cap = cv2.VideoCapture(dev_id, backend)
if self.cap.isOpened():
break
self.cap.release()
except Exception:
continue
else:
# Unix-like systems (Linux/Mac) capture method
self.cap = cv2.VideoCapture(self.device_index)
if not self.cap or not self.cap.isOpened():
raise RuntimeError("Failed to open camera")
# Configure format
self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
self.cap.set(cv2.CAP_PROP_FPS, fps)
self.is_running = True
return True
except Exception as e:
print(f"Failed to start capture: {str(e)}")
if self.cap:
self.cap.release()
return False
def read(self) -> Tuple[bool, Optional[np.ndarray]]:
"""Read a frame from the camera"""
if not self.is_running or self.cap is None:
return False, None
ret, frame = self.cap.read()
if ret:
self._current_frame = frame
if self.frame_callback:
self.frame_callback(frame)
return True, frame
return False, None
def release(self) -> None:
"""Stop capture and release resources"""
if self.is_running and self.cap is not None:
self.cap.release()
self.is_running = False
self.cap = None
def set_frame_callback(self, callback: Callable[[np.ndarray], None]) -> None:
"""Set callback for frame processing"""
self.frame_callback = callback

View File

@ -1,122 +0,0 @@
@echo off
setlocal EnableDelayedExpansion
:: 1. Setup your platform
echo Setting up your platform...
:: Python
where python >nul 2>&1
if %ERRORLEVEL% neq 0 (
echo Python is not installed. Please install Python 3.10 or later.
pause
exit /b
)
:: Pip
where pip >nul 2>&1
if %ERRORLEVEL% neq 0 (
echo Pip is not installed. Please install Pip.
pause
exit /b
)
:: Git
where git >nul 2>&1
if %ERRORLEVEL% neq 0 (
echo Git is not installed. Installing Git...
winget install --id Git.Git -e --source winget
)
:: FFMPEG
where ffmpeg >nul 2>&1
if %ERRORLEVEL% neq 0 (
echo FFMPEG is not installed. Installing FFMPEG...
winget install --id Gyan.FFmpeg -e --source winget
)
:: Visual Studio 2022 Runtimes
echo Installing Visual Studio 2022 Runtimes...
winget install --id Microsoft.VC++2015-2022Redist-x64 -e --source winget
:: 2. Clone Repository
if exist Deep-Live-Cam (
echo Deep-Live-Cam directory already exists.
set /p overwrite="Do you want to overwrite? (Y/N): "
if /i "%overwrite%"=="Y" (
rmdir /s /q Deep-Live-Cam
git clone https://github.com/hacksider/Deep-Live-Cam.git
) else (
echo Skipping clone, using existing directory.
)
) else (
git clone https://github.com/hacksider/Deep-Live-Cam.git
)
cd Deep-Live-Cam
:: 3. Download Models
echo Downloading models...
mkdir models
curl -L -o models/GFPGANv1.4.pth https://path.to.model/GFPGANv1.4.pth
curl -L -o models/inswapper_128_fp16.onnx https://path.to.model/inswapper_128_fp16.onnx
:: 4. Install dependencies
echo Creating a virtual environment...
python -m venv venv
call venv\Scripts\activate
echo Installing required Python packages...
pip install --upgrade pip
pip install -r requirements.txt
echo Setup complete. You can now run the application.
:: GPU Acceleration Options
echo.
echo Choose the GPU Acceleration Option if applicable:
echo 1. CUDA (Nvidia)
echo 2. CoreML (Apple Silicon)
echo 3. CoreML (Apple Legacy)
echo 4. DirectML (Windows)
echo 5. OpenVINO (Intel)
echo 6. None
set /p choice="Enter your choice (1-6): "
if "%choice%"=="1" (
echo Installing CUDA dependencies...
pip uninstall -y onnxruntime onnxruntime-gpu
pip install onnxruntime-gpu==1.16.3
set exec_provider="cuda"
) else if "%choice%"=="2" (
echo Installing CoreML (Apple Silicon) dependencies...
pip uninstall -y onnxruntime onnxruntime-silicon
pip install onnxruntime-silicon==1.13.1
set exec_provider="coreml"
) else if "%choice%"=="3" (
echo Installing CoreML (Apple Legacy) dependencies...
pip uninstall -y onnxruntime onnxruntime-coreml
pip install onnxruntime-coreml==1.13.1
set exec_provider="coreml"
) else if "%choice%"=="4" (
echo Installing DirectML dependencies...
pip uninstall -y onnxruntime onnxruntime-directml
pip install onnxruntime-directml==1.15.1
set exec_provider="directml"
) else if "%choice%"=="5" (
echo Installing OpenVINO dependencies...
pip uninstall -y onnxruntime onnxruntime-openvino
pip install onnxruntime-openvino==1.15.0
set exec_provider="openvino"
) else (
echo Skipping GPU acceleration setup.
)
:: Run the application
if defined exec_provider (
echo Running the application with %exec_provider% execution provider...
python run.py --execution-provider %exec_provider%
) else (
echo Running the application...
python run.py
)
pause