Merge remote-tracking branch 'parent/experimental' into experimental

pull/372/head
Aleksandr Spiridonov 2024-08-15 12:15:53 -04:00 committed by Kenneth Estanislao
parent 22abb8c25f
commit df99f6ca17
4 changed files with 419 additions and 166 deletions

View File

@ -158,6 +158,8 @@ options:
--many-faces process every face --many-faces process every face
--video-encoder {libx264,libx265,libvpx-vp9} adjust output video encoder --video-encoder {libx264,libx265,libvpx-vp9} adjust output video encoder
--video-quality [0-51] adjust output video quality --video-quality [0-51] adjust output video quality
--live-mirror the live camera display as you see it in the front-facing camera frame
--live-resizable the live camera frame is resizable
--max-memory MAX_MEMORY maximum amount of RAM in GB --max-memory MAX_MEMORY maximum amount of RAM in GB
--execution-provider {cpu} [{cpu} ...] available execution provider (choices: cpu, ...) --execution-provider {cpu} [{cpu} ...] available execution provider (choices: cpu, ...)
--execution-threads EXECUTION_THREADS number of execution threads --execution-threads EXECUTION_THREADS number of execution threads

View File

@ -1,16 +1,17 @@
import os import os
import sys import sys
# single thread doubles cuda performance - needs to be set before torch import
if any(arg.startswith('--execution-provider') for arg in sys.argv):
os.environ['OMP_NUM_THREADS'] = '1'
# reduce tensorflow log level
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
import warnings import warnings
from typing import List
import platform import platform
import signal import signal
import shutil import shutil
import argparse import argparse
from typing import List
# Set environment variables for CUDA performance and TensorFlow logging
if any(arg.startswith('--execution-provider') for arg in sys.argv):
os.environ['OMP_NUM_THREADS'] = '1'
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
import torch import torch
import onnxruntime import onnxruntime
import tensorflow import tensorflow
@ -19,34 +20,60 @@ import modules.globals
import modules.metadata import modules.metadata
import modules.ui as ui import modules.ui as ui
from modules.processors.frame.core import get_frame_processors_modules from modules.processors.frame.core import get_frame_processors_modules
from modules.utilities import has_image_extension, is_image, is_video, detect_fps, create_video, extract_frames, get_temp_frame_paths, restore_audio, create_temp, move_temp, clean_temp, normalize_output_path from modules.utilities import (
has_image_extension,
if 'ROCMExecutionProvider' in modules.globals.execution_providers: is_image,
del torch is_video,
detect_fps,
create_video,
extract_frames,
get_temp_frame_paths,
restore_audio,
create_temp,
move_temp,
clean_temp,
normalize_output_path
)
# Filter warnings
warnings.filterwarnings('ignore', category=FutureWarning, module='insightface') warnings.filterwarnings('ignore', category=FutureWarning, module='insightface')
warnings.filterwarnings('ignore', category=UserWarning, module='torchvision') warnings.filterwarnings('ignore', category=UserWarning, module='torchvision')
# Cross-platform resource management
if platform.system() == 'Darwin' and 'ROCMExecutionProvider' in modules.globals.execution_providers:
del torch
def parse_args() -> None: def parse_args() -> None:
signal.signal(signal.SIGINT, lambda signal_number, frame: destroy()) signal.signal(signal.SIGINT, lambda signal_number, frame: destroy())
program = argparse.ArgumentParser() program = argparse.ArgumentParser()
program.add_argument('-s', '--source', help='select an source image', dest='source_path') program.add_argument('-s', '--source', help='Select a source image', dest='source_path')
program.add_argument('-t', '--target', help='select an target image or video', dest='target_path') program.add_argument('-t', '--target', help='Select a target image or video', dest='target_path')
program.add_argument('-o', '--output', help='select output file or directory', dest='output_path') program.add_argument('-o', '--output', help='Select output file or directory', dest='output_path')
program.add_argument('--frame-processor', help='pipeline of frame processors', dest='frame_processor', default=['face_swapper'], choices=['face_swapper', 'face_enhancer'], nargs='+') program.add_argument('--frame-processor', help='Pipeline of frame processors', dest='frame_processor',
program.add_argument('--keep-fps', help='keep original fps', dest='keep_fps', action='store_true', default=False) default=['face_swapper'], choices=['face_swapper', 'face_enhancer'], nargs='+')
program.add_argument('--keep-audio', help='keep original audio', dest='keep_audio', action='store_true', default=True) program.add_argument('--keep-fps', help='Keep original fps', dest='keep_fps', action='store_true', default=False)
program.add_argument('--keep-frames', help='keep temporary frames', dest='keep_frames', action='store_true', default=False) program.add_argument('--keep-audio', help='Keep original audio', dest='keep_audio', action='store_true', default=True)
program.add_argument('--many-faces', help='process every face', dest='many_faces', action='store_true', default=False) program.add_argument('--keep-frames', help='Keep temporary frames', dest='keep_frames', action='store_true', default=False)
program.add_argument('--video-encoder', help='adjust output video encoder', dest='video_encoder', default='libx264', choices=['libx264', 'libx265', 'libvpx-vp9']) program.add_argument('--many-faces', help='Process every face', dest='many_faces', action='store_true', default=False)
program.add_argument('--video-quality', help='adjust output video quality', dest='video_quality', type=int, default=18, choices=range(52), metavar='[0-51]') program.add_argument('--video-encoder', help='Adjust output video encoder', dest='video_encoder', default='libx264',
program.add_argument('--max-memory', help='maximum amount of RAM in GB', dest='max_memory', type=int, default=suggest_max_memory()) choices=['libx264', 'libx265', 'libvpx-vp9'])
program.add_argument('--execution-provider', help='execution provider', dest='execution_provider', default=['cpu'], choices=suggest_execution_providers(), nargs='+') program.add_argument('--video-quality', help='Adjust output video quality', dest='video_quality', type=int, default=18,
program.add_argument('--execution-threads', help='number of execution threads', dest='execution_threads', type=int, default=suggest_execution_threads()) choices=range(52), metavar='[0-51]')
program.add_argument('-v', '--version', action='version', version=f'{modules.metadata.name} {modules.metadata.version}') program.add_argument('--live-mirror', help='The live camera display as you see it in the front-facing camera frame',
dest='live_mirror', action='store_true', default=False)
program.add_argument('--live-resizable', help='The live camera frame is resizable',
dest='live_resizable', action='store_true', default=False)
program.add_argument('--max-memory', help='Maximum amount of RAM in GB', dest='max_memory', type=int,
default=suggest_max_memory())
program.add_argument('--execution-provider', help='Execution provider', dest='execution_provider', default=['cpu'],
choices=suggest_execution_providers(), nargs='+')
program.add_argument('--execution-threads', help='Number of execution threads', dest='execution_threads', type=int,
default=suggest_execution_threads())
program.add_argument('-v', '--version', action='version',
version=f'{modules.metadata.name} {modules.metadata.version}')
# register deprecated args # Register deprecated args
program.add_argument('-f', '--face', help=argparse.SUPPRESS, dest='source_path_deprecated') program.add_argument('-f', '--face', help=argparse.SUPPRESS, dest='source_path_deprecated')
program.add_argument('--cpu-cores', help=argparse.SUPPRESS, dest='cpu_cores_deprecated', type=int) program.add_argument('--cpu-cores', help=argparse.SUPPRESS, dest='cpu_cores_deprecated', type=int)
program.add_argument('--gpu-vendor', help=argparse.SUPPRESS, dest='gpu_vendor_deprecated') program.add_argument('--gpu-vendor', help=argparse.SUPPRESS, dest='gpu_vendor_deprecated')
@ -56,7 +83,8 @@ def parse_args() -> None:
modules.globals.source_path = args.source_path modules.globals.source_path = args.source_path
modules.globals.target_path = args.target_path modules.globals.target_path = args.target_path
modules.globals.output_path = normalize_output_path(modules.globals.source_path, modules.globals.target_path, args.output_path) modules.globals.output_path = normalize_output_path(modules.globals.source_path, modules.globals.target_path,
args.output_path)
modules.globals.frame_processors = args.frame_processor modules.globals.frame_processors = args.frame_processor
modules.globals.headless = args.source_path or args.target_path or args.output_path modules.globals.headless = args.source_path or args.target_path or args.output_path
modules.globals.keep_fps = args.keep_fps modules.globals.keep_fps = args.keep_fps
@ -65,23 +93,28 @@ def parse_args() -> None:
modules.globals.many_faces = args.many_faces modules.globals.many_faces = args.many_faces
modules.globals.video_encoder = args.video_encoder modules.globals.video_encoder = args.video_encoder
modules.globals.video_quality = args.video_quality modules.globals.video_quality = args.video_quality
modules.globals.live_mirror = args.live_mirror
modules.globals.live_resizable = args.live_resizable
modules.globals.max_memory = args.max_memory modules.globals.max_memory = args.max_memory
modules.globals.execution_providers = decode_execution_providers(args.execution_provider) modules.globals.execution_providers = decode_execution_providers(args.execution_provider)
modules.globals.execution_threads = args.execution_threads modules.globals.execution_threads = args.execution_threads
#for ENHANCER tumbler: # Handle face enhancer tumbler
if 'face_enhancer' in args.frame_processor: modules.globals.fp_ui['face_enhancer'] = 'face_enhancer' in args.frame_processor
modules.globals.fp_ui['face_enhancer'] = True
else:
modules.globals.fp_ui['face_enhancer'] = False
modules.globals.nsfw = False modules.globals.nsfw = False
# translate deprecated args # Handle deprecated arguments
handle_deprecated_args(args)
def handle_deprecated_args(args) -> None:
"""Handle deprecated arguments by translating them to the new format."""
if args.source_path_deprecated: if args.source_path_deprecated:
print('\033[33mArgument -f and --face are deprecated. Use -s and --source instead.\033[0m') print('\033[33mArgument -f and --face are deprecated. Use -s and --source instead.\033[0m')
modules.globals.source_path = args.source_path_deprecated modules.globals.source_path = args.source_path_deprecated
modules.globals.output_path = normalize_output_path(args.source_path_deprecated, modules.globals.target_path, args.output_path) modules.globals.output_path = normalize_output_path(args.source_path_deprecated, modules.globals.target_path,
args.output_path)
if args.cpu_cores_deprecated: if args.cpu_cores_deprecated:
print('\033[33mArgument --cpu-cores is deprecated. Use --execution-threads instead.\033[0m') print('\033[33mArgument --cpu-cores is deprecated. Use --execution-threads instead.\033[0m')
modules.globals.execution_threads = args.cpu_cores_deprecated modules.globals.execution_threads = args.cpu_cores_deprecated
@ -92,7 +125,7 @@ def parse_args() -> None:
print('\033[33mArgument --gpu-vendor nvidia is deprecated. Use --execution-provider cuda instead.\033[0m') print('\033[33mArgument --gpu-vendor nvidia is deprecated. Use --execution-provider cuda instead.\033[0m')
modules.globals.execution_providers = decode_execution_providers(['cuda']) modules.globals.execution_providers = decode_execution_providers(['cuda'])
if args.gpu_vendor_deprecated == 'amd': if args.gpu_vendor_deprecated == 'amd':
print('\033[33mArgument --gpu-vendor amd is deprecated. Use --execution-provider cuda instead.\033[0m') print('\033[33mArgument --gpu-vendor amd is deprecated. Use --execution-provider rocm instead.\033[0m')
modules.globals.execution_providers = decode_execution_providers(['rocm']) modules.globals.execution_providers = decode_execution_providers(['rocm'])
if args.gpu_threads_deprecated: if args.gpu_threads_deprecated:
print('\033[33mArgument --gpu-threads is deprecated. Use --execution-threads instead.\033[0m') print('\033[33mArgument --gpu-threads is deprecated. Use --execution-threads instead.\033[0m')
@ -100,18 +133,22 @@ def parse_args() -> None:
def encode_execution_providers(execution_providers: List[str]) -> List[str]: def encode_execution_providers(execution_providers: List[str]) -> List[str]:
return [execution_provider.replace('ExecutionProvider', '').lower() for execution_provider in execution_providers] return [provider.replace('ExecutionProvider', '').lower() for provider in execution_providers]
def decode_execution_providers(execution_providers: List[str]) -> List[str]: def decode_execution_providers(execution_providers: List[str]) -> List[str]:
return [provider for provider, encoded_execution_provider in zip(onnxruntime.get_available_providers(), encode_execution_providers(onnxruntime.get_available_providers())) available_providers = onnxruntime.get_available_providers()
if any(execution_provider in encoded_execution_provider for execution_provider in execution_providers)] encoded_providers = encode_execution_providers(available_providers)
selected_providers = [available_providers[encoded_providers.index(req)] for req in execution_providers
if req in encoded_providers]
# Default to CPU if no suitable providers are found
return selected_providers if selected_providers else ['CPUExecutionProvider']
def suggest_max_memory() -> int: def suggest_max_memory() -> int:
if platform.system().lower() == 'darwin': return 4 if platform.system().lower() == 'darwin' else 16
return 4
return 16
def suggest_execution_providers() -> List[str]: def suggest_execution_providers() -> List[str]:
@ -119,34 +156,41 @@ def suggest_execution_providers() -> List[str]:
def suggest_execution_threads() -> int: def suggest_execution_threads() -> int:
if 'DmlExecutionProvider' in modules.globals.execution_providers: if 'dml' in modules.globals.execution_providers:
return 1 return 1
if 'ROCMExecutionProvider' in modules.globals.execution_providers: if 'rocm' in modules.globals.execution_providers:
return 1 return 1
return 8 return 8
def limit_resources() -> None: def limit_resources() -> None:
# prevent tensorflow memory leak # Prevent TensorFlow memory leak
gpus = tensorflow.config.experimental.list_physical_devices('GPU') gpus = tensorflow.config.experimental.list_physical_devices('GPU')
for gpu in gpus: for gpu in gpus:
tensorflow.config.experimental.set_memory_growth(gpu, True) tensorflow.config.experimental.set_memory_growth(gpu, True)
# limit memory usage
# Limit memory usage
if modules.globals.max_memory: if modules.globals.max_memory:
memory = modules.globals.max_memory * 1024 ** 3 memory = modules.globals.max_memory * 1024 ** 3
if platform.system().lower() == 'darwin': if platform.system().lower() == 'darwin':
memory = modules.globals.max_memory * 1024 ** 6 memory = modules.globals.max_memory * 1024 ** 3
if platform.system().lower() == 'windows': elif platform.system().lower() == 'windows':
import ctypes import ctypes
kernel32 = ctypes.windll.kernel32 kernel32 = ctypes.windll.kernel32
kernel32.SetProcessWorkingSetSize(-1, ctypes.c_size_t(memory), ctypes.c_size_t(memory)) kernel32.SetProcessWorkingSetSize(-1, ctypes.c_size_t(memory), ctypes.c_size_t(memory))
else: else:
import resource import resource
try:
soft, hard = resource.getrlimit(resource.RLIMIT_DATA)
if memory > hard:
print(f"Warning: Requested memory limit {memory / (1024 ** 3)} GB exceeds system's hard limit. Setting to maximum allowed {hard / (1024 ** 3)} GB.")
memory = hard
resource.setrlimit(resource.RLIMIT_DATA, (memory, memory)) resource.setrlimit(resource.RLIMIT_DATA, (memory, memory))
except ValueError as e:
print(f"Warning: Could not set memory limit: {e}. Continuing with default limits.")
def release_resources() -> None: def release_resources() -> None:
if 'CUDAExecutionProvider' in modules.globals.execution_providers: if 'cuda' in modules.globals.execution_providers:
torch.cuda.empty_cache() torch.cuda.empty_cache()
@ -157,49 +201,85 @@ def pre_check() -> bool:
if not shutil.which('ffmpeg'): if not shutil.which('ffmpeg'):
update_status('ffmpeg is not installed.') update_status('ffmpeg is not installed.')
return False return False
if 'cuda' in modules.globals.execution_providers and not torch.cuda.is_available():
update_status('CUDA is not available. Please check your GPU or CUDA installation.')
return False
return True return True
def update_status(message: str, scope: str = 'DLC.CORE') -> None: def update_status(message: str, scope: str = 'DLC.CORE') -> None:
print(f'[{scope}] {message}') print(f'[{scope}] {message}')
if not modules.globals.headless: if not modules.globals.headless and ui.status_label:
ui.update_status(message) ui.update_status(message)
def start() -> None: def start() -> None:
for frame_processor in get_frame_processors_modules(modules.globals.frame_processors): for frame_processor in get_frame_processors_modules(modules.globals.frame_processors):
if not frame_processor.pre_start(): if not frame_processor.pre_start():
return return
# process image to image
# Process image to image
if has_image_extension(modules.globals.target_path): if has_image_extension(modules.globals.target_path):
if modules.globals.nsfw == False: process_image_to_image()
return
# Process image to video
process_image_to_video()
def process_image_to_image() -> None:
if modules.globals.nsfw:
from modules.predicter import predict_image from modules.predicter import predict_image
if predict_image(modules.globals.target_path): if predict_image(modules.globals.target_path):
destroy() destroy(to_quit=False)
update_status('Processing to image ignored!')
return
try:
shutil.copy2(modules.globals.target_path, modules.globals.output_path) shutil.copy2(modules.globals.target_path, modules.globals.output_path)
except Exception as e:
print("Error copying file:", str(e))
for frame_processor in get_frame_processors_modules(modules.globals.frame_processors): for frame_processor in get_frame_processors_modules(modules.globals.frame_processors):
update_status('Progressing...', frame_processor.NAME) update_status('Processing...', frame_processor.NAME)
frame_processor.process_image(modules.globals.source_path, modules.globals.output_path, modules.globals.output_path) frame_processor.process_image(modules.globals.source_path, modules.globals.output_path, modules.globals.output_path)
release_resources() release_resources()
if is_image(modules.globals.target_path): if is_image(modules.globals.target_path):
update_status('Processing to image succeed!') update_status('Processing to image succeeded!')
else: else:
update_status('Processing to image failed!') update_status('Processing to image failed!')
return
# process image to videos
if modules.globals.nsfw == False: def process_image_to_video() -> None:
if modules.globals.nsfw:
from modules.predicter import predict_video from modules.predicter import predict_video
if predict_video(modules.globals.target_path): if predict_video(modules.globals.target_path):
destroy() destroy(to_quit=False)
update_status('Creating temp resources...') update_status('Processing to video ignored!')
return
update_status('Creating temporary resources...')
create_temp(modules.globals.target_path) create_temp(modules.globals.target_path)
update_status('Extracting frames...') update_status('Extracting frames...')
extract_frames(modules.globals.target_path) extract_frames(modules.globals.target_path)
temp_frame_paths = get_temp_frame_paths(modules.globals.target_path) temp_frame_paths = get_temp_frame_paths(modules.globals.target_path)
for frame_processor in get_frame_processors_modules(modules.globals.frame_processors): for frame_processor in get_frame_processors_modules(modules.globals.frame_processors):
update_status('Progressing...', frame_processor.NAME) update_status('Processing...', frame_processor.NAME)
frame_processor.process_video(modules.globals.source_path, temp_frame_paths) frame_processor.process_video(modules.globals.source_path, temp_frame_paths)
release_resources() release_resources()
# handles fps
handle_video_fps()
handle_video_audio()
clean_temp(modules.globals.target_path)
if is_video(modules.globals.target_path):
update_status('Processing to video succeeded!')
else:
update_status('Processing to video failed!')
def handle_video_fps() -> None:
if modules.globals.keep_fps: if modules.globals.keep_fps:
update_status('Detecting fps...') update_status('Detecting fps...')
fps = detect_fps(modules.globals.target_path) fps = detect_fps(modules.globals.target_path)
@ -208,7 +288,9 @@ def start() -> None:
else: else:
update_status('Creating video with 30.0 fps...') update_status('Creating video with 30.0 fps...')
create_video(modules.globals.target_path) create_video(modules.globals.target_path)
# handle audio
def handle_video_audio() -> None:
if modules.globals.keep_audio: if modules.globals.keep_audio:
if modules.globals.keep_fps: if modules.globals.keep_fps:
update_status('Restoring audio...') update_status('Restoring audio...')
@ -217,21 +299,16 @@ def start() -> None:
restore_audio(modules.globals.target_path, modules.globals.output_path) restore_audio(modules.globals.target_path, modules.globals.output_path)
else: else:
move_temp(modules.globals.target_path, modules.globals.output_path) move_temp(modules.globals.target_path, modules.globals.output_path)
# clean and validate
clean_temp(modules.globals.target_path)
if is_video(modules.globals.target_path):
update_status('Processing to video succeed!')
else:
update_status('Processing to video failed!')
def destroy() -> None: def destroy(to_quit=True) -> None:
if modules.globals.target_path: if modules.globals.target_path:
clean_temp(modules.globals.target_path) clean_temp(modules.globals.target_path)
quit() if to_quit: quit()
def run() -> None: def run() -> None:
try:
parse_args() parse_args()
if not pre_check(): if not pre_check():
return return
@ -244,3 +321,7 @@ def run() -> None:
else: else:
window = ui.init(start, destroy) window = ui.init(start, destroy)
window.mainloop() window.mainloop()
except Exception as e:
print(f"UI initialization failed: {str(e)}")
update_status(f"UI initialization failed: {str(e)}")
destroy() # Ensure any resources are cleaned up on failure

View File

@ -19,6 +19,8 @@ keep_frames = None
many_faces = None many_faces = None
video_encoder = None video_encoder = None
video_quality = None video_quality = None
live_mirror = None
live_resizable = None
max_memory = None max_memory = None
execution_providers: List[str] = [] execution_providers: List[str] = []
execution_threads = None execution_threads = None

View File

@ -1,9 +1,19 @@
import os import os
import platform
import webbrowser import webbrowser
import customtkinter as ctk import customtkinter as ctk
from typing import Callable, Tuple from typing import Callable, Tuple, List, Any
from types import ModuleType
import cv2 import cv2
from PIL import Image, ImageOps from PIL import Image, ImageOps
from pygrabber.dshow_graph import FilterGraph
import pyvirtualcam
# Import OS-specific modules only when necessary
if platform.system() == 'Darwin': # macOS
import objc
from Foundation import NSObject
import AVFoundation
import modules.globals import modules.globals
import modules.metadata import modules.metadata
@ -13,12 +23,14 @@ from modules.processors.frame.core import get_frame_processors_modules
from modules.utilities import is_image, is_video, resolve_relative_path from modules.utilities import is_image, is_video, resolve_relative_path
ROOT = None ROOT = None
ROOT_HEIGHT = 700 ROOT_HEIGHT = 800
ROOT_WIDTH = 600 ROOT_WIDTH = 600
PREVIEW = None PREVIEW = None
PREVIEW_MAX_HEIGHT = 700 PREVIEW_MAX_HEIGHT = 700
PREVIEW_MAX_WIDTH = 1200 PREVIEW_MAX_WIDTH = 1200
PREVIEW_DEFAULT_WIDTH = 960
PREVIEW_DEFAULT_HEIGHT = 540
RECENT_DIRECTORY_SOURCE = None RECENT_DIRECTORY_SOURCE = None
RECENT_DIRECTORY_TARGET = None RECENT_DIRECTORY_TARGET = None
@ -32,10 +44,49 @@ status_label = None
img_ft, vid_ft = modules.globals.file_types img_ft, vid_ft = modules.globals.file_types
camera = None
def check_camera_permissions():
"""Check and request camera access permission on macOS."""
if platform.system() == 'Darwin': # macOS-specific
auth_status = AVFoundation.AVCaptureDevice.authorizationStatusForMediaType_(AVFoundation.AVMediaTypeVideo)
if auth_status == AVFoundation.AVAuthorizationStatusNotDetermined:
# Request access to the camera
def completion_handler(granted):
if granted:
print("Access granted to the camera.")
else:
print("Access denied to the camera.")
AVFoundation.AVCaptureDevice.requestAccessForMediaType_completionHandler_(AVFoundation.AVMediaTypeVideo, completion_handler)
elif auth_status == AVFoundation.AVAuthorizationStatusAuthorized:
print("Camera access already authorized.")
elif auth_status == AVFoundation.AVAuthorizationStatusDenied:
print("Camera access denied. Please enable it in System Preferences.")
elif auth_status == AVFoundation.AVAuthorizationStatusRestricted:
print("Camera access restricted. The app is not allowed to use the camera.")
def select_camera(camera_name: str):
"""Select the appropriate camera based on its name (cross-platform)."""
if platform.system() == 'Darwin': # macOS-specific
devices = AVFoundation.AVCaptureDevice.devicesWithMediaType_(AVFoundation.AVMediaTypeVideo)
for device in devices:
if device.localizedName() == camera_name:
return device
elif platform.system() == 'Windows' or platform.system() == 'Linux':
# On Windows/Linux, simply return the camera name as OpenCV can handle it by index
return camera_name
return None
def init(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.CTk: def init(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.CTk:
global ROOT, PREVIEW global ROOT, PREVIEW
if platform.system() == 'Darwin': # macOS-specific
check_camera_permissions() # Check camera permissions before initializing the UI
ROOT = create_root(start, destroy) ROOT = create_root(start, destroy)
PREVIEW = create_preview(ROOT) PREVIEW = create_preview(ROOT)
@ -49,86 +100,99 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
ctk.set_appearance_mode('system') ctk.set_appearance_mode('system')
ctk.set_default_color_theme(resolve_relative_path('ui.json')) ctk.set_default_color_theme(resolve_relative_path('ui.json'))
print("Creating root window...")
root = ctk.CTk() root = ctk.CTk()
root.minsize(ROOT_WIDTH, ROOT_HEIGHT) root.minsize(ROOT_WIDTH, ROOT_HEIGHT)
root.title(f'{modules.metadata.name} {modules.metadata.version} {modules.metadata.edition}') root.title(f'{modules.metadata.name} {modules.metadata.version} {modules.metadata.edition}')
root.configure()
root.protocol('WM_DELETE_WINDOW', lambda: destroy()) root.protocol('WM_DELETE_WINDOW', lambda: destroy())
source_label = ctk.CTkLabel(root, text=None) source_label = ctk.CTkLabel(root, text=None)
source_label.place(relx=0.1, rely=0.1, relwidth=0.3, relheight=0.25) source_label.place(relx=0.1, rely=0.0875, relwidth=0.3, relheight=0.25)
target_label = ctk.CTkLabel(root, text=None) target_label = ctk.CTkLabel(root, text=None)
target_label.place(relx=0.6, rely=0.1, relwidth=0.3, relheight=0.25) target_label.place(relx=0.6, rely=0.0875, relwidth=0.3, relheight=0.25)
select_face_button = ctk.CTkButton(root, text='Select a face', cursor='hand2', command=lambda: select_source_path()) source_button = ctk.CTkButton(root, text='Select a face', cursor='hand2', command=select_source_path)
select_face_button.place(relx=0.1, rely=0.4, relwidth=0.3, relheight=0.1) source_button.place(relx=0.1, rely=0.35, relwidth=0.3, relheight=0.1)
select_target_button = ctk.CTkButton(root, text='Select a target', cursor='hand2', command=lambda: select_target_path()) target_button = ctk.CTkButton(root, text='Select a target', cursor='hand2', command=select_target_path)
select_target_button.place(relx=0.6, rely=0.4, relwidth=0.3, relheight=0.1) target_button.place(relx=0.6, rely=0.35, relwidth=0.3, relheight=0.1)
keep_fps_value = ctk.BooleanVar(value=modules.globals.keep_fps) keep_fps_value = ctk.BooleanVar(value=modules.globals.keep_fps)
keep_fps_checkbox = ctk.CTkSwitch(root, text='Keep fps', variable=keep_fps_value, cursor='hand2', command=lambda: setattr(modules.globals, 'keep_fps', not modules.globals.keep_fps)) keep_fps_checkbox = ctk.CTkSwitch(root, text='Keep fps', variable=keep_fps_value, cursor='hand2', command=lambda: setattr(modules.globals, 'keep_fps', not modules.globals.keep_fps))
keep_fps_checkbox.place(relx=0.1, rely=0.6) keep_fps_checkbox.place(relx=0.1, rely=0.525)
keep_frames_value = ctk.BooleanVar(value=modules.globals.keep_frames) keep_frames_value = ctk.BooleanVar(value=modules.globals.keep_frames)
keep_frames_switch = ctk.CTkSwitch(root, text='Keep frames', variable=keep_frames_value, cursor='hand2', command=lambda: setattr(modules.globals, 'keep_frames', keep_frames_value.get())) keep_frames_switch = ctk.CTkSwitch(root, text='Keep frames', variable=keep_frames_value, cursor='hand2', command=lambda: setattr(modules.globals, 'keep_frames', keep_frames_value.get()))
keep_frames_switch.place(relx=0.1, rely=0.65) keep_frames_switch.place(relx=0.1, rely=0.56875)
# for FRAME PROCESSOR ENHANCER tumbler:
enhancer_value = ctk.BooleanVar(value=modules.globals.fp_ui['face_enhancer']) enhancer_value = ctk.BooleanVar(value=modules.globals.fp_ui['face_enhancer'])
enhancer_switch = ctk.CTkSwitch(root, text='Face Enhancer', variable=enhancer_value, cursor='hand2', command=lambda: update_tumbler('face_enhancer', enhancer_value.get())) enhancer_switch = ctk.CTkSwitch(root, text='Face Enhancer', variable=enhancer_value, cursor='hand2', command=lambda: update_tumbler('face_enhancer', enhancer_value.get()))
enhancer_switch.place(relx=0.1, rely=0.7) enhancer_switch.place(relx=0.1, rely=0.6125)
keep_audio_value = ctk.BooleanVar(value=modules.globals.keep_audio) keep_audio_value = ctk.BooleanVar(value=modules.globals.keep_audio)
keep_audio_switch = ctk.CTkSwitch(root, text='Keep audio', variable=keep_audio_value, cursor='hand2', command=lambda: setattr(modules.globals, 'keep_audio', keep_audio_value.get())) keep_audio_switch = ctk.CTkSwitch(root, text='Keep audio', variable=keep_audio_value, cursor='hand2', command=lambda: setattr(modules.globals, 'keep_audio', keep_audio_value.get()))
keep_audio_switch.place(relx=0.6, rely=0.6) keep_audio_switch.place(relx=0.6, rely=0.525)
many_faces_value = ctk.BooleanVar(value=modules.globals.many_faces) many_faces_value = ctk.BooleanVar(value=modules.globals.many_faces)
many_faces_switch = ctk.CTkSwitch(root, text='Many faces', variable=many_faces_value, cursor='hand2', command=lambda: setattr(modules.globals, 'many_faces', many_faces_value.get())) many_faces_switch = ctk.CTkSwitch(root, text='Many faces', variable=many_faces_value, cursor='hand2', command=lambda: setattr(modules.globals, 'many_faces', many_faces_value.get()))
many_faces_switch.place(relx=0.6, rely=0.65) many_faces_switch.place(relx=0.6, rely=0.56875)
# nsfw_value = ctk.BooleanVar(value=modules.globals.nsfw) nsfw_value = ctk.BooleanVar(value=modules.globals.nsfw)
# nsfw_switch = ctk.CTkSwitch(root, text='NSFW', variable=nsfw_value, cursor='hand2', command=lambda: setattr(modules.globals, 'nsfw', nsfw_value.get())) nsfw_switch = ctk.CTkSwitch(root, text='NSFW', variable=nsfw_value, cursor='hand2', command=lambda: setattr(modules.globals, 'nsfw', nsfw_value.get()))
# nsfw_switch.place(relx=0.6, rely=0.7) nsfw_switch.place(relx=0.6, rely=0.6125)
start_button = ctk.CTkButton(root, text='Start', cursor='hand2', command=lambda: select_output_path(start)) start_button = ctk.CTkButton(root, text='Start', cursor='hand2', command=lambda: select_output_path(start))
start_button.place(relx=0.15, rely=0.80, relwidth=0.2, relheight=0.05) start_button.place(relx=0.15, rely=0.7, relwidth=0.2, relheight=0.05)
stop_button = ctk.CTkButton(root, text='Destroy', cursor='hand2', command=lambda: destroy()) stop_button = ctk.CTkButton(root, text='Destroy', cursor='hand2', command=destroy)
stop_button.place(relx=0.4, rely=0.80, relwidth=0.2, relheight=0.05) stop_button.place(relx=0.4, rely=0.7, relwidth=0.2, relheight=0.05)
preview_button = ctk.CTkButton(root, text='Preview', cursor='hand2', command=lambda: toggle_preview()) preview_button = ctk.CTkButton(root, text='Preview', cursor='hand2', command=toggle_preview)
preview_button.place(relx=0.65, rely=0.80, relwidth=0.2, relheight=0.05) preview_button.place(relx=0.65, rely=0.7, relwidth=0.2, relheight=0.05)
live_button = ctk.CTkButton(root, text='Live', cursor='hand2', command=lambda: webcam_preview()) camera_label = ctk.CTkLabel(root, text="Select Camera:")
live_button.place(relx=0.40, rely=0.86, relwidth=0.2, relheight=0.05) camera_label.place(relx=0.4, rely=0.7525, relwidth=0.2, relheight=0.05)
available_cameras = get_available_cameras()
available_camera_strings = [str(cam) for cam in available_cameras]
camera_variable = ctk.StringVar(value=available_camera_strings[0] if available_camera_strings else "No cameras found")
camera_optionmenu = ctk.CTkOptionMenu(root, variable=camera_variable, values=available_camera_strings)
camera_optionmenu.place(relx=0.65, rely=0.7525, relwidth=0.2, relheight=0.05)
virtual_cam_out_value = ctk.BooleanVar(value=False)
virtual_cam_out_switch = ctk.CTkSwitch(root, text='Virtual Cam Output (OBS)', variable=virtual_cam_out_value, cursor='hand2')
virtual_cam_out_switch.place(relx=0.4, rely=0.805)
live_button = ctk.CTkButton(root, text='Live', cursor='hand2', command=lambda: webcam_preview(camera_variable.get(), virtual_cam_out_value.get()))
live_button.place(relx=0.15, rely=0.7525, relwidth=0.2, relheight=0.05)
status_label = ctk.CTkLabel(root, text=None, justify='center') status_label = ctk.CTkLabel(root, text=None, justify='center')
status_label.place(relx=0.1, rely=0.9, relwidth=0.8) status_label.place(relx=0.1, relwidth=0.8, rely=0.875)
donate_label = ctk.CTkLabel(root, text='Deep Live Cam', justify='center', cursor='hand2') donate_label = ctk.CTkLabel(root, text='Deep Live Cam', justify='center', cursor='hand2')
donate_label.place(relx=0.1, rely=0.95, relwidth=0.8) donate_label.place(relx=0.1, rely=0.95, relwidth=0.8)
donate_label.configure(text_color=ctk.ThemeManager.theme.get('URL').get('text_color')) donate_label.configure(text_color=ctk.ThemeManager.theme.get('URL').get('text_color'))
donate_label.bind('<Button>', lambda event: webbrowser.open('https://paypal.me/hacksider')) donate_label.bind('<Button-1>', lambda event: webbrowser.open('https://paypal.me/hacksider'))
return root return root
def create_preview(parent: ctk.CTkToplevel) -> ctk.CTkToplevel: def create_preview(parent: ctk.CTk) -> ctk.CTkToplevel:
global preview_label, preview_slider global preview_label, preview_slider
preview = ctk.CTkToplevel(parent) preview = ctk.CTkToplevel(parent)
preview.withdraw() preview.withdraw()
preview.title('Preview') preview.title('Preview')
preview.configure() preview.protocol('WM_DELETE_WINDOW', toggle_preview)
preview.protocol('WM_DELETE_WINDOW', lambda: toggle_preview()) preview.resizable(width=True, height=True)
preview.resizable(width=False, height=False)
preview_label = ctk.CTkLabel(preview, text=None) preview_label = ctk.CTkLabel(preview, text=None)
preview_label.pack(fill='both', expand=True) preview_label.pack(fill='both', expand=True)
preview_slider = ctk.CTkSlider(preview, from_=0, to=0, command=lambda frame_value: update_preview(frame_value)) preview_slider = ctk.CTkSlider(preview, from_=0, to=0, command=update_preview)
return preview return preview
@ -143,10 +207,10 @@ def update_tumbler(var: str, value: bool) -> None:
def select_source_path() -> None: def select_source_path() -> None:
global RECENT_DIRECTORY_SOURCE, img_ft, vid_ft global RECENT_DIRECTORY_SOURCE
PREVIEW.withdraw() PREVIEW.withdraw()
source_path = ctk.filedialog.askopenfilename(title='select an source image', initialdir=RECENT_DIRECTORY_SOURCE, filetypes=[img_ft]) source_path = ctk.filedialog.askopenfilename(title='Select a source image', initialdir=RECENT_DIRECTORY_SOURCE, filetypes=[img_ft])
if is_image(source_path): if is_image(source_path):
modules.globals.source_path = source_path modules.globals.source_path = source_path
RECENT_DIRECTORY_SOURCE = os.path.dirname(modules.globals.source_path) RECENT_DIRECTORY_SOURCE = os.path.dirname(modules.globals.source_path)
@ -158,10 +222,10 @@ def select_source_path() -> None:
def select_target_path() -> None: def select_target_path() -> None:
global RECENT_DIRECTORY_TARGET, img_ft, vid_ft global RECENT_DIRECTORY_TARGET
PREVIEW.withdraw() PREVIEW.withdraw()
target_path = ctk.filedialog.askopenfilename(title='select an target image or video', initialdir=RECENT_DIRECTORY_TARGET, filetypes=[img_ft, vid_ft]) target_path = ctk.filedialog.askopenfilename(title='Select a target image or video', initialdir=RECENT_DIRECTORY_TARGET, filetypes=[img_ft, vid_ft])
if is_image(target_path): if is_image(target_path):
modules.globals.target_path = target_path modules.globals.target_path = target_path
RECENT_DIRECTORY_TARGET = os.path.dirname(modules.globals.target_path) RECENT_DIRECTORY_TARGET = os.path.dirname(modules.globals.target_path)
@ -178,12 +242,12 @@ def select_target_path() -> None:
def select_output_path(start: Callable[[], None]) -> None: def select_output_path(start: Callable[[], None]) -> None:
global RECENT_DIRECTORY_OUTPUT, img_ft, vid_ft global RECENT_DIRECTORY_OUTPUT
if is_image(modules.globals.target_path): if is_image(modules.globals.target_path):
output_path = ctk.filedialog.asksaveasfilename(title='save image output file', filetypes=[img_ft], defaultextension='.png', initialfile='output.png', initialdir=RECENT_DIRECTORY_OUTPUT) output_path = ctk.filedialog.asksaveasfilename(title='Save image output file', filetypes=[img_ft], defaultextension='.png', initialfile='output.png', initialdir=RECENT_DIRECTORY_OUTPUT)
elif is_video(modules.globals.target_path): elif is_video(modules.globals.target_path):
output_path = ctk.filedialog.asksaveasfilename(title='save video output file', filetypes=[vid_ft], defaultextension='.mp4', initialfile='output.mp4', initialdir=RECENT_DIRECTORY_OUTPUT) output_path = ctk.filedialog.asksaveasfilename(title='Save video output file', filetypes=[vid_ft], defaultextension='.mp4', initialfile='output.mp4', initialdir=RECENT_DIRECTORY_OUTPUT)
else: else:
output_path = None output_path = None
if output_path: if output_path:
@ -204,13 +268,13 @@ def render_video_preview(video_path: str, size: Tuple[int, int], frame_number: i
if frame_number: if frame_number:
capture.set(cv2.CAP_PROP_POS_FRAMES, frame_number) capture.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
has_frame, frame = capture.read() has_frame, frame = capture.read()
capture.release()
if has_frame: if has_frame:
image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)) image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
if size: if size:
image = ImageOps.fit(image, size, Image.LANCZOS) image = ImageOps.fit(image, size, Image.LANCZOS)
return ctk.CTkImage(image, size=image.size) return ctk.CTkImage(image, size=image.size)
capture.release() return None
cv2.destroyAllWindows()
def toggle_preview() -> None: def toggle_preview() -> None:
@ -220,12 +284,17 @@ def toggle_preview() -> None:
init_preview() init_preview()
update_preview() update_preview()
PREVIEW.deiconify() PREVIEW.deiconify()
global camera
if PREVIEW.state() == 'withdrawn':
if camera and camera.isOpened():
camera.release()
camera = None
def init_preview() -> None: def init_preview() -> None:
if is_image(modules.globals.target_path): if is_image(modules.globals.target_path):
preview_slider.pack_forget() preview_slider.pack_forget()
if is_video(modules.globals.target_path): elif is_video(modules.globals.target_path):
video_frame_total = get_video_frame_total(modules.globals.target_path) video_frame_total = get_video_frame_total(modules.globals.target_path)
preview_slider.configure(to=video_frame_total) preview_slider.configure(to=video_frame_total)
preview_slider.pack(fill='x') preview_slider.pack(fill='x')
@ -235,7 +304,7 @@ def init_preview() -> None:
def update_preview(frame_number: int = 0) -> None: def update_preview(frame_number: int = 0) -> None:
if modules.globals.source_path and modules.globals.target_path: if modules.globals.source_path and modules.globals.target_path:
temp_frame = get_video_frame(modules.globals.target_path, frame_number) temp_frame = get_video_frame(modules.globals.target_path, frame_number)
if modules.globals.nsfw == False: if not modules.globals.nsfw:
from modules.predicter import predict_frame from modules.predicter import predict_frame
if predict_frame(temp_frame): if predict_frame(temp_frame):
quit() quit()
@ -249,51 +318,150 @@ def update_preview(frame_number: int = 0) -> None:
image = ctk.CTkImage(image, size=image.size) image = ctk.CTkImage(image, size=image.size)
preview_label.configure(image=image) preview_label.configure(image=image)
def webcam_preview(): def webcam_preview_loop(cap: cv2.VideoCapture, source_image: Any, frame_processors: List[ModuleType], virtual_cam: pyvirtualcam.Camera = None) -> bool:
if modules.globals.source_path is None:
# No image selected
return
global preview_label, PREVIEW global preview_label, PREVIEW
cap = cv2.VideoCapture(0) # Use index for the webcam (adjust the index accordingly if necessary)
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 960) # Set the width of the resolution
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 540) # Set the height of the resolution
cap.set(cv2.CAP_PROP_FPS, 60) # Set the frame rate of the webcam
PREVIEW_MAX_WIDTH = 960
PREVIEW_MAX_HEIGHT = 540
preview_label.configure(image=None) # Reset the preview image before startup
PREVIEW.deiconify() # Open preview window
frame_processors = get_frame_processors_modules(modules.globals.frame_processors)
source_image = None # Initialize variable for the selected face image
while True:
ret, frame = cap.read() ret, frame = cap.read()
if not ret: if not ret:
break update_status(f"Error: Frame not received from camera.")
return False
# Select and save face image only once temp_frame = frame.copy()
if source_image is None and modules.globals.source_path:
source_image = get_one_face(cv2.imread(modules.globals.source_path))
temp_frame = frame.copy() #Create a copy of the frame if modules.globals.live_mirror:
temp_frame = cv2.flip(temp_frame, 1) # horizontal flipping
if modules.globals.live_resizable:
temp_frame = fit_image_to_size(temp_frame, PREVIEW.winfo_width(), PREVIEW.winfo_height())
for frame_processor in frame_processors: for frame_processor in frame_processors:
temp_frame = frame_processor.process_frame(source_image, temp_frame) temp_frame = frame_processor.process_frame(source_image, temp_frame)
image = cv2.cvtColor(temp_frame, cv2.COLOR_BGR2RGB) # Convert the image to RGB format to display it with Tkinter image = Image.fromarray(cv2.cvtColor(temp_frame, cv2.COLOR_BGR2RGB))
image = Image.fromarray(image) image = ImageOps.contain(image, (temp_frame.shape[1], temp_frame.shape[0]), Image.LANCZOS)
image = ImageOps.contain(image, (PREVIEW_MAX_WIDTH, PREVIEW_MAX_HEIGHT), Image.LANCZOS)
image = ctk.CTkImage(image, size=image.size) image = ctk.CTkImage(image, size=image.size)
preview_label.configure(image=image) preview_label.configure(image=image)
if virtual_cam:
virtual_cam.send(temp_frame)
virtual_cam.sleep_until_next_frame()
ROOT.update() ROOT.update()
if PREVIEW.state() == 'withdrawn': if PREVIEW.state() == 'withdrawn':
break return False
return True
def fit_image_to_size(image, width: int, height: int):
if width is None and height is None:
return image
h, w, _ = image.shape
ratio_h = 0.0
ratio_w = 0.0
if width > height:
ratio_h = height / h
else:
ratio_w = width / w
ratio = max(ratio_w, ratio_h)
new_size = (int(ratio * w), int(ratio * h))
return cv2.resize(image, dsize=new_size)
def webcam_preview(camera_name: str, virtual_cam_output: bool):
if modules.globals.source_path is None:
return
global preview_label, PREVIEW
WIDTH = 960
HEIGHT = 540
FPS = 60
# Select the camera by its name
selected_camera = select_camera(camera_name)
if selected_camera is None:
update_status(f"No suitable camera found.")
return
# Use OpenCV's camera index for cross-platform compatibility
camera_index = get_camera_index_by_name(camera_name)
global camera
camera = cv2.VideoCapture(camera_index)
if not camera.isOpened():
update_status(f"Error: Could not open camera {camera_name}")
return
camera.set(cv2.CAP_PROP_FRAME_WIDTH, WIDTH)
camera.set(cv2.CAP_PROP_FRAME_HEIGHT, HEIGHT)
camera.set(cv2.CAP_PROP_FPS, FPS)
PREVIEW_MAX_WIDTH = WIDTH
PREVIEW_MAX_HEIGHT = HEIGHT
preview_label.configure(width=PREVIEW_DEFAULT_WIDTH, height=PREVIEW_DEFAULT_HEIGHT)
PREVIEW.deiconify()
frame_processors = get_frame_processors_modules(modules.globals.frame_processors)
source_image = get_one_face(cv2.imread(modules.globals.source_path))
preview_running = True
if virtual_cam_output:
with pyvirtualcam.Camera(width=WIDTH, height=HEIGHT, fps=FPS, fmt=pyvirtualcam.PixelFormat.BGR) as virtual_cam:
while preview_running:
preview_running = webcam_preview_loop(cap, source_image, frame_processors, virtual_cam)
while preview_running:
preview_running = webcam_preview_loop(cap, source_image, frame_processors)
if camera: camera.release()
PREVIEW.withdraw()
def get_camera_index_by_name(camera_name: str) -> int:
"""Map camera name to index for OpenCV."""
if platform.system() == 'Darwin': # macOS-specific
if "FaceTime" in camera_name:
return 0 # Assuming FaceTime is at index 0
elif "iPhone" in camera_name:
return 1 # Assuming iPhone camera is at index 1
elif platform.system() == 'Windows' or platform.system() == 'Linux':
# Map camera name to index dynamically (OpenCV on these platforms usually starts with 0)
return get_available_cameras().index(camera_name)
return -1
def get_available_cameras():
"""Get available camera names (cross-platform)."""
available_cameras = []
if platform.system() == 'Darwin': # macOS-specific
devices = AVFoundation.AVCaptureDevice.devicesWithMediaType_(AVFoundation.AVMediaTypeVideo)
for device in devices:
if device.deviceType() == AVFoundation.AVCaptureDeviceTypeBuiltInWideAngleCamera:
print(f"Found Built-In Camera: {device.localizedName()}")
available_cameras.append(device.localizedName())
elif device.deviceType() == "AVCaptureDeviceTypeExternal":
print(f"Found External Camera: {device.localizedName()}")
available_cameras.append(device.localizedName())
elif device.deviceType() == "AVCaptureDeviceTypeContinuityCamera":
print(f"Skipping Continuity Camera: {device.localizedName()}")
elif platform.system() == 'Windows' or platform.system() == 'Linux':
try:
devices = FilterGraph().get_input_devices()
except Exception as e:
# Use OpenCV to detect camera indexes
index = 0
devices = []
while True:
cap = cv2.VideoCapture(index)
if not cap.isOpened():
break
devices.append(f"Camera {index}")
cap.release() cap.release()
PREVIEW.withdraw() # Close preview window when loop is finished index += 1
available_cameras = devices
return available_cameras