Revert "Merge pull request #293 from vic4key/experimental"

This reverts commit eab5ba7027.
pull/372/head
Kenneth Estanislao 2024-08-16 13:47:12 +08:00
parent eab5ba7027
commit 22abb8c25f
4 changed files with 152 additions and 364 deletions

View File

@ -158,8 +158,6 @@ options:
--many-faces process every face --many-faces process every face
--video-encoder {libx264,libx265,libvpx-vp9} adjust output video encoder --video-encoder {libx264,libx265,libvpx-vp9} adjust output video encoder
--video-quality [0-51] adjust output video quality --video-quality [0-51] adjust output video quality
--live-mirror the live camera display as you see it in the front-facing camera frame
--live-resizable the live camera frame is resizable
--max-memory MAX_MEMORY maximum amount of RAM in GB --max-memory MAX_MEMORY maximum amount of RAM in GB
--execution-provider {cpu} [{cpu} ...] available execution provider (choices: cpu, ...) --execution-provider {cpu} [{cpu} ...] available execution provider (choices: cpu, ...)
--execution-threads EXECUTION_THREADS number of execution threads --execution-threads EXECUTION_THREADS number of execution threads

View File

@ -1,17 +1,16 @@
import os import os
import sys import sys
# single thread doubles cuda performance - needs to be set before torch import
if any(arg.startswith('--execution-provider') for arg in sys.argv):
os.environ['OMP_NUM_THREADS'] = '1'
# reduce tensorflow log level
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
import warnings import warnings
from typing import List
import platform import platform
import signal import signal
import shutil import shutil
import argparse import argparse
from typing import List
# Set environment variables for CUDA performance and TensorFlow logging
if any(arg.startswith('--execution-provider') for arg in sys.argv):
os.environ['OMP_NUM_THREADS'] = '1'
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
import torch import torch
import onnxruntime import onnxruntime
import tensorflow import tensorflow
@ -20,60 +19,34 @@ import modules.globals
import modules.metadata import modules.metadata
import modules.ui as ui import modules.ui as ui
from modules.processors.frame.core import get_frame_processors_modules from modules.processors.frame.core import get_frame_processors_modules
from modules.utilities import ( from modules.utilities import has_image_extension, is_image, is_video, detect_fps, create_video, extract_frames, get_temp_frame_paths, restore_audio, create_temp, move_temp, clean_temp, normalize_output_path
has_image_extension,
is_image, if 'ROCMExecutionProvider' in modules.globals.execution_providers:
is_video, del torch
detect_fps,
create_video,
extract_frames,
get_temp_frame_paths,
restore_audio,
create_temp,
move_temp,
clean_temp,
normalize_output_path
)
# Filter warnings
warnings.filterwarnings('ignore', category=FutureWarning, module='insightface') warnings.filterwarnings('ignore', category=FutureWarning, module='insightface')
warnings.filterwarnings('ignore', category=UserWarning, module='torchvision') warnings.filterwarnings('ignore', category=UserWarning, module='torchvision')
# Cross-platform resource management
if platform.system() == 'Darwin' and 'ROCMExecutionProvider' in modules.globals.execution_providers:
del torch
def parse_args() -> None: def parse_args() -> None:
signal.signal(signal.SIGINT, lambda signal_number, frame: destroy()) signal.signal(signal.SIGINT, lambda signal_number, frame: destroy())
program = argparse.ArgumentParser() program = argparse.ArgumentParser()
program.add_argument('-s', '--source', help='Select a source image', dest='source_path') program.add_argument('-s', '--source', help='select an source image', dest='source_path')
program.add_argument('-t', '--target', help='Select a target image or video', dest='target_path') program.add_argument('-t', '--target', help='select an target image or video', dest='target_path')
program.add_argument('-o', '--output', help='Select output file or directory', dest='output_path') program.add_argument('-o', '--output', help='select output file or directory', dest='output_path')
program.add_argument('--frame-processor', help='Pipeline of frame processors', dest='frame_processor', program.add_argument('--frame-processor', help='pipeline of frame processors', dest='frame_processor', default=['face_swapper'], choices=['face_swapper', 'face_enhancer'], nargs='+')
default=['face_swapper'], choices=['face_swapper', 'face_enhancer'], nargs='+') program.add_argument('--keep-fps', help='keep original fps', dest='keep_fps', action='store_true', default=False)
program.add_argument('--keep-fps', help='Keep original fps', dest='keep_fps', action='store_true', default=False) program.add_argument('--keep-audio', help='keep original audio', dest='keep_audio', action='store_true', default=True)
program.add_argument('--keep-audio', help='Keep original audio', dest='keep_audio', action='store_true', default=True) program.add_argument('--keep-frames', help='keep temporary frames', dest='keep_frames', action='store_true', default=False)
program.add_argument('--keep-frames', help='Keep temporary frames', dest='keep_frames', action='store_true', default=False) program.add_argument('--many-faces', help='process every face', dest='many_faces', action='store_true', default=False)
program.add_argument('--many-faces', help='Process every face', dest='many_faces', action='store_true', default=False) program.add_argument('--video-encoder', help='adjust output video encoder', dest='video_encoder', default='libx264', choices=['libx264', 'libx265', 'libvpx-vp9'])
program.add_argument('--video-encoder', help='Adjust output video encoder', dest='video_encoder', default='libx264', program.add_argument('--video-quality', help='adjust output video quality', dest='video_quality', type=int, default=18, choices=range(52), metavar='[0-51]')
choices=['libx264', 'libx265', 'libvpx-vp9']) program.add_argument('--max-memory', help='maximum amount of RAM in GB', dest='max_memory', type=int, default=suggest_max_memory())
program.add_argument('--video-quality', help='Adjust output video quality', dest='video_quality', type=int, default=18, program.add_argument('--execution-provider', help='execution provider', dest='execution_provider', default=['cpu'], choices=suggest_execution_providers(), nargs='+')
choices=range(52), metavar='[0-51]') program.add_argument('--execution-threads', help='number of execution threads', dest='execution_threads', type=int, default=suggest_execution_threads())
program.add_argument('--live-mirror', help='The live camera display as you see it in the front-facing camera frame', program.add_argument('-v', '--version', action='version', version=f'{modules.metadata.name} {modules.metadata.version}')
dest='live_mirror', action='store_true', default=False)
program.add_argument('--live-resizable', help='The live camera frame is resizable',
dest='live_resizable', action='store_true', default=False)
program.add_argument('--max-memory', help='Maximum amount of RAM in GB', dest='max_memory', type=int,
default=suggest_max_memory())
program.add_argument('--execution-provider', help='Execution provider', dest='execution_provider', default=['cpu'],
choices=suggest_execution_providers(), nargs='+')
program.add_argument('--execution-threads', help='Number of execution threads', dest='execution_threads', type=int,
default=suggest_execution_threads())
program.add_argument('-v', '--version', action='version',
version=f'{modules.metadata.name} {modules.metadata.version}')
# Register deprecated args # register deprecated args
program.add_argument('-f', '--face', help=argparse.SUPPRESS, dest='source_path_deprecated') program.add_argument('-f', '--face', help=argparse.SUPPRESS, dest='source_path_deprecated')
program.add_argument('--cpu-cores', help=argparse.SUPPRESS, dest='cpu_cores_deprecated', type=int) program.add_argument('--cpu-cores', help=argparse.SUPPRESS, dest='cpu_cores_deprecated', type=int)
program.add_argument('--gpu-vendor', help=argparse.SUPPRESS, dest='gpu_vendor_deprecated') program.add_argument('--gpu-vendor', help=argparse.SUPPRESS, dest='gpu_vendor_deprecated')
@ -83,8 +56,7 @@ def parse_args() -> None:
modules.globals.source_path = args.source_path modules.globals.source_path = args.source_path
modules.globals.target_path = args.target_path modules.globals.target_path = args.target_path
modules.globals.output_path = normalize_output_path(modules.globals.source_path, modules.globals.target_path, modules.globals.output_path = normalize_output_path(modules.globals.source_path, modules.globals.target_path, args.output_path)
args.output_path)
modules.globals.frame_processors = args.frame_processor modules.globals.frame_processors = args.frame_processor
modules.globals.headless = args.source_path or args.target_path or args.output_path modules.globals.headless = args.source_path or args.target_path or args.output_path
modules.globals.keep_fps = args.keep_fps modules.globals.keep_fps = args.keep_fps
@ -93,28 +65,23 @@ def parse_args() -> None:
modules.globals.many_faces = args.many_faces modules.globals.many_faces = args.many_faces
modules.globals.video_encoder = args.video_encoder modules.globals.video_encoder = args.video_encoder
modules.globals.video_quality = args.video_quality modules.globals.video_quality = args.video_quality
modules.globals.live_mirror = args.live_mirror
modules.globals.live_resizable = args.live_resizable
modules.globals.max_memory = args.max_memory modules.globals.max_memory = args.max_memory
modules.globals.execution_providers = decode_execution_providers(args.execution_provider) modules.globals.execution_providers = decode_execution_providers(args.execution_provider)
modules.globals.execution_threads = args.execution_threads modules.globals.execution_threads = args.execution_threads
# Handle face enhancer tumbler #for ENHANCER tumbler:
modules.globals.fp_ui['face_enhancer'] = 'face_enhancer' in args.frame_processor if 'face_enhancer' in args.frame_processor:
modules.globals.fp_ui['face_enhancer'] = True
else:
modules.globals.fp_ui['face_enhancer'] = False
modules.globals.nsfw = False modules.globals.nsfw = False
# Handle deprecated arguments # translate deprecated args
handle_deprecated_args(args)
def handle_deprecated_args(args) -> None:
"""Handle deprecated arguments by translating them to the new format."""
if args.source_path_deprecated: if args.source_path_deprecated:
print('\033[33mArgument -f and --face are deprecated. Use -s and --source instead.\033[0m') print('\033[33mArgument -f and --face are deprecated. Use -s and --source instead.\033[0m')
modules.globals.source_path = args.source_path_deprecated modules.globals.source_path = args.source_path_deprecated
modules.globals.output_path = normalize_output_path(args.source_path_deprecated, modules.globals.target_path, modules.globals.output_path = normalize_output_path(args.source_path_deprecated, modules.globals.target_path, args.output_path)
args.output_path)
if args.cpu_cores_deprecated: if args.cpu_cores_deprecated:
print('\033[33mArgument --cpu-cores is deprecated. Use --execution-threads instead.\033[0m') print('\033[33mArgument --cpu-cores is deprecated. Use --execution-threads instead.\033[0m')
modules.globals.execution_threads = args.cpu_cores_deprecated modules.globals.execution_threads = args.cpu_cores_deprecated
@ -125,7 +92,7 @@ def handle_deprecated_args(args) -> None:
print('\033[33mArgument --gpu-vendor nvidia is deprecated. Use --execution-provider cuda instead.\033[0m') print('\033[33mArgument --gpu-vendor nvidia is deprecated. Use --execution-provider cuda instead.\033[0m')
modules.globals.execution_providers = decode_execution_providers(['cuda']) modules.globals.execution_providers = decode_execution_providers(['cuda'])
if args.gpu_vendor_deprecated == 'amd': if args.gpu_vendor_deprecated == 'amd':
print('\033[33mArgument --gpu-vendor amd is deprecated. Use --execution-provider rocm instead.\033[0m') print('\033[33mArgument --gpu-vendor amd is deprecated. Use --execution-provider cuda instead.\033[0m')
modules.globals.execution_providers = decode_execution_providers(['rocm']) modules.globals.execution_providers = decode_execution_providers(['rocm'])
if args.gpu_threads_deprecated: if args.gpu_threads_deprecated:
print('\033[33mArgument --gpu-threads is deprecated. Use --execution-threads instead.\033[0m') print('\033[33mArgument --gpu-threads is deprecated. Use --execution-threads instead.\033[0m')
@ -133,22 +100,18 @@ def handle_deprecated_args(args) -> None:
def encode_execution_providers(execution_providers: List[str]) -> List[str]: def encode_execution_providers(execution_providers: List[str]) -> List[str]:
return [provider.replace('ExecutionProvider', '').lower() for provider in execution_providers] return [execution_provider.replace('ExecutionProvider', '').lower() for execution_provider in execution_providers]
def decode_execution_providers(execution_providers: List[str]) -> List[str]: def decode_execution_providers(execution_providers: List[str]) -> List[str]:
available_providers = onnxruntime.get_available_providers() return [provider for provider, encoded_execution_provider in zip(onnxruntime.get_available_providers(), encode_execution_providers(onnxruntime.get_available_providers()))
encoded_providers = encode_execution_providers(available_providers) if any(execution_provider in encoded_execution_provider for execution_provider in execution_providers)]
selected_providers = [available_providers[encoded_providers.index(req)] for req in execution_providers
if req in encoded_providers]
# Default to CPU if no suitable providers are found
return selected_providers if selected_providers else ['CPUExecutionProvider']
def suggest_max_memory() -> int: def suggest_max_memory() -> int:
return 4 if platform.system().lower() == 'darwin' else 16 if platform.system().lower() == 'darwin':
return 4
return 16
def suggest_execution_providers() -> List[str]: def suggest_execution_providers() -> List[str]:
@ -156,41 +119,34 @@ def suggest_execution_providers() -> List[str]:
def suggest_execution_threads() -> int: def suggest_execution_threads() -> int:
if 'dml' in modules.globals.execution_providers: if 'DmlExecutionProvider' in modules.globals.execution_providers:
return 1 return 1
if 'rocm' in modules.globals.execution_providers: if 'ROCMExecutionProvider' in modules.globals.execution_providers:
return 1 return 1
return 8 return 8
def limit_resources() -> None: def limit_resources() -> None:
# Prevent TensorFlow memory leak # prevent tensorflow memory leak
gpus = tensorflow.config.experimental.list_physical_devices('GPU') gpus = tensorflow.config.experimental.list_physical_devices('GPU')
for gpu in gpus: for gpu in gpus:
tensorflow.config.experimental.set_memory_growth(gpu, True) tensorflow.config.experimental.set_memory_growth(gpu, True)
# limit memory usage
# Limit memory usage
if modules.globals.max_memory: if modules.globals.max_memory:
memory = modules.globals.max_memory * 1024 ** 3 memory = modules.globals.max_memory * 1024 ** 3
if platform.system().lower() == 'darwin': if platform.system().lower() == 'darwin':
memory = modules.globals.max_memory * 1024 ** 3 memory = modules.globals.max_memory * 1024 ** 6
elif platform.system().lower() == 'windows': if platform.system().lower() == 'windows':
import ctypes import ctypes
kernel32 = ctypes.windll.kernel32 kernel32 = ctypes.windll.kernel32
kernel32.SetProcessWorkingSetSize(-1, ctypes.c_size_t(memory), ctypes.c_size_t(memory)) kernel32.SetProcessWorkingSetSize(-1, ctypes.c_size_t(memory), ctypes.c_size_t(memory))
else: else:
import resource import resource
try:
soft, hard = resource.getrlimit(resource.RLIMIT_DATA)
if memory > hard:
print(f"Warning: Requested memory limit {memory / (1024 ** 3)} GB exceeds system's hard limit. Setting to maximum allowed {hard / (1024 ** 3)} GB.")
memory = hard
resource.setrlimit(resource.RLIMIT_DATA, (memory, memory)) resource.setrlimit(resource.RLIMIT_DATA, (memory, memory))
except ValueError as e:
print(f"Warning: Could not set memory limit: {e}. Continuing with default limits.")
def release_resources() -> None: def release_resources() -> None:
if 'cuda' in modules.globals.execution_providers: if 'CUDAExecutionProvider' in modules.globals.execution_providers:
torch.cuda.empty_cache() torch.cuda.empty_cache()
@ -201,85 +157,49 @@ def pre_check() -> bool:
if not shutil.which('ffmpeg'): if not shutil.which('ffmpeg'):
update_status('ffmpeg is not installed.') update_status('ffmpeg is not installed.')
return False return False
if 'cuda' in modules.globals.execution_providers and not torch.cuda.is_available():
update_status('CUDA is not available. Please check your GPU or CUDA installation.')
return False
return True return True
def update_status(message: str, scope: str = 'DLC.CORE') -> None: def update_status(message: str, scope: str = 'DLC.CORE') -> None:
print(f'[{scope}] {message}') print(f'[{scope}] {message}')
if not modules.globals.headless and ui.status_label: if not modules.globals.headless:
ui.update_status(message) ui.update_status(message)
def start() -> None: def start() -> None:
for frame_processor in get_frame_processors_modules(modules.globals.frame_processors): for frame_processor in get_frame_processors_modules(modules.globals.frame_processors):
if not frame_processor.pre_start(): if not frame_processor.pre_start():
return return
# process image to image
# Process image to image
if has_image_extension(modules.globals.target_path): if has_image_extension(modules.globals.target_path):
process_image_to_image() if modules.globals.nsfw == False:
return
# Process image to video
process_image_to_video()
def process_image_to_image() -> None:
if modules.globals.nsfw:
from modules.predicter import predict_image from modules.predicter import predict_image
if predict_image(modules.globals.target_path): if predict_image(modules.globals.target_path):
destroy(to_quit=False) destroy()
update_status('Processing to image ignored!')
return
try:
shutil.copy2(modules.globals.target_path, modules.globals.output_path) shutil.copy2(modules.globals.target_path, modules.globals.output_path)
except Exception as e:
print("Error copying file:", str(e))
for frame_processor in get_frame_processors_modules(modules.globals.frame_processors): for frame_processor in get_frame_processors_modules(modules.globals.frame_processors):
update_status('Processing...', frame_processor.NAME) update_status('Progressing...', frame_processor.NAME)
frame_processor.process_image(modules.globals.source_path, modules.globals.output_path, modules.globals.output_path) frame_processor.process_image(modules.globals.source_path, modules.globals.output_path, modules.globals.output_path)
release_resources() release_resources()
if is_image(modules.globals.target_path): if is_image(modules.globals.target_path):
update_status('Processing to image succeeded!') update_status('Processing to image succeed!')
else: else:
update_status('Processing to image failed!') update_status('Processing to image failed!')
return
# process image to videos
def process_image_to_video() -> None: if modules.globals.nsfw == False:
if modules.globals.nsfw:
from modules.predicter import predict_video from modules.predicter import predict_video
if predict_video(modules.globals.target_path): if predict_video(modules.globals.target_path):
destroy(to_quit=False) destroy()
update_status('Processing to video ignored!') update_status('Creating temp resources...')
return
update_status('Creating temporary resources...')
create_temp(modules.globals.target_path) create_temp(modules.globals.target_path)
update_status('Extracting frames...') update_status('Extracting frames...')
extract_frames(modules.globals.target_path) extract_frames(modules.globals.target_path)
temp_frame_paths = get_temp_frame_paths(modules.globals.target_path) temp_frame_paths = get_temp_frame_paths(modules.globals.target_path)
for frame_processor in get_frame_processors_modules(modules.globals.frame_processors): for frame_processor in get_frame_processors_modules(modules.globals.frame_processors):
update_status('Processing...', frame_processor.NAME) update_status('Progressing...', frame_processor.NAME)
frame_processor.process_video(modules.globals.source_path, temp_frame_paths) frame_processor.process_video(modules.globals.source_path, temp_frame_paths)
release_resources() release_resources()
# handles fps
handle_video_fps()
handle_video_audio()
clean_temp(modules.globals.target_path)
if is_video(modules.globals.target_path):
update_status('Processing to video succeeded!')
else:
update_status('Processing to video failed!')
def handle_video_fps() -> None:
if modules.globals.keep_fps: if modules.globals.keep_fps:
update_status('Detecting fps...') update_status('Detecting fps...')
fps = detect_fps(modules.globals.target_path) fps = detect_fps(modules.globals.target_path)
@ -288,9 +208,7 @@ def handle_video_fps() -> None:
else: else:
update_status('Creating video with 30.0 fps...') update_status('Creating video with 30.0 fps...')
create_video(modules.globals.target_path) create_video(modules.globals.target_path)
# handle audio
def handle_video_audio() -> None:
if modules.globals.keep_audio: if modules.globals.keep_audio:
if modules.globals.keep_fps: if modules.globals.keep_fps:
update_status('Restoring audio...') update_status('Restoring audio...')
@ -299,16 +217,21 @@ def handle_video_audio() -> None:
restore_audio(modules.globals.target_path, modules.globals.output_path) restore_audio(modules.globals.target_path, modules.globals.output_path)
else: else:
move_temp(modules.globals.target_path, modules.globals.output_path) move_temp(modules.globals.target_path, modules.globals.output_path)
# clean and validate
clean_temp(modules.globals.target_path)
if is_video(modules.globals.target_path):
update_status('Processing to video succeed!')
else:
update_status('Processing to video failed!')
def destroy(to_quit=True) -> None: def destroy() -> None:
if modules.globals.target_path: if modules.globals.target_path:
clean_temp(modules.globals.target_path) clean_temp(modules.globals.target_path)
if to_quit: quit() quit()
def run() -> None: def run() -> None:
try:
parse_args() parse_args()
if not pre_check(): if not pre_check():
return return
@ -321,7 +244,3 @@ def run() -> None:
else: else:
window = ui.init(start, destroy) window = ui.init(start, destroy)
window.mainloop() window.mainloop()
except Exception as e:
print(f"UI initialization failed: {str(e)}")
update_status(f"UI initialization failed: {str(e)}")
destroy() # Ensure any resources are cleaned up on failure

View File

@ -19,8 +19,6 @@ keep_frames = None
many_faces = None many_faces = None
video_encoder = None video_encoder = None
video_quality = None video_quality = None
live_mirror = None
live_resizable = None
max_memory = None max_memory = None
execution_providers: List[str] = [] execution_providers: List[str] = []
execution_threads = None execution_threads = None

View File

@ -1,17 +1,10 @@
import os import os
import platform
import webbrowser import webbrowser
import customtkinter as ctk import customtkinter as ctk
from typing import Callable, Tuple from typing import Callable, Tuple
import cv2 import cv2
from PIL import Image, ImageOps from PIL import Image, ImageOps
# Import OS-specific modules only when necessary
if platform.system() == 'Darwin': # macOS
import objc
from Foundation import NSObject
import AVFoundation
import modules.globals import modules.globals
import modules.metadata import modules.metadata
from modules.face_analyser import get_one_face from modules.face_analyser import get_one_face
@ -26,8 +19,6 @@ ROOT_WIDTH = 600
PREVIEW = None PREVIEW = None
PREVIEW_MAX_HEIGHT = 700 PREVIEW_MAX_HEIGHT = 700
PREVIEW_MAX_WIDTH = 1200 PREVIEW_MAX_WIDTH = 1200
PREVIEW_DEFAULT_WIDTH = 960
PREVIEW_DEFAULT_HEIGHT = 540
RECENT_DIRECTORY_SOURCE = None RECENT_DIRECTORY_SOURCE = None
RECENT_DIRECTORY_TARGET = None RECENT_DIRECTORY_TARGET = None
@ -41,49 +32,10 @@ status_label = None
img_ft, vid_ft = modules.globals.file_types img_ft, vid_ft = modules.globals.file_types
camera = None
def check_camera_permissions():
"""Check and request camera access permission on macOS."""
if platform.system() == 'Darwin': # macOS-specific
auth_status = AVFoundation.AVCaptureDevice.authorizationStatusForMediaType_(AVFoundation.AVMediaTypeVideo)
if auth_status == AVFoundation.AVAuthorizationStatusNotDetermined:
# Request access to the camera
def completion_handler(granted):
if granted:
print("Access granted to the camera.")
else:
print("Access denied to the camera.")
AVFoundation.AVCaptureDevice.requestAccessForMediaType_completionHandler_(AVFoundation.AVMediaTypeVideo, completion_handler)
elif auth_status == AVFoundation.AVAuthorizationStatusAuthorized:
print("Camera access already authorized.")
elif auth_status == AVFoundation.AVAuthorizationStatusDenied:
print("Camera access denied. Please enable it in System Preferences.")
elif auth_status == AVFoundation.AVAuthorizationStatusRestricted:
print("Camera access restricted. The app is not allowed to use the camera.")
def select_camera(camera_name: str):
"""Select the appropriate camera based on its name (cross-platform)."""
if platform.system() == 'Darwin': # macOS-specific
devices = AVFoundation.AVCaptureDevice.devicesWithMediaType_(AVFoundation.AVMediaTypeVideo)
for device in devices:
if device.localizedName() == camera_name:
return device
elif platform.system() == 'Windows' or platform.system() == 'Linux':
# On Windows/Linux, simply return the camera name as OpenCV can handle it by index
return camera_name
return None
def init(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.CTk: def init(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.CTk:
global ROOT, PREVIEW global ROOT, PREVIEW
if platform.system() == 'Darwin': # macOS-specific
check_camera_permissions() # Check camera permissions before initializing the UI
ROOT = create_root(start, destroy) ROOT = create_root(start, destroy)
PREVIEW = create_preview(ROOT) PREVIEW = create_preview(ROOT)
@ -97,11 +49,10 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
ctk.set_appearance_mode('system') ctk.set_appearance_mode('system')
ctk.set_default_color_theme(resolve_relative_path('ui.json')) ctk.set_default_color_theme(resolve_relative_path('ui.json'))
print("Creating root window...")
root = ctk.CTk() root = ctk.CTk()
root.minsize(ROOT_WIDTH, ROOT_HEIGHT) root.minsize(ROOT_WIDTH, ROOT_HEIGHT)
root.title(f'{modules.metadata.name} {modules.metadata.version} {modules.metadata.edition}') root.title(f'{modules.metadata.name} {modules.metadata.version} {modules.metadata.edition}')
root.configure()
root.protocol('WM_DELETE_WINDOW', lambda: destroy()) root.protocol('WM_DELETE_WINDOW', lambda: destroy())
source_label = ctk.CTkLabel(root, text=None) source_label = ctk.CTkLabel(root, text=None)
@ -110,11 +61,11 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
target_label = ctk.CTkLabel(root, text=None) target_label = ctk.CTkLabel(root, text=None)
target_label.place(relx=0.6, rely=0.1, relwidth=0.3, relheight=0.25) target_label.place(relx=0.6, rely=0.1, relwidth=0.3, relheight=0.25)
source_button = ctk.CTkButton(root, text='Select a face', cursor='hand2', command=select_source_path) select_face_button = ctk.CTkButton(root, text='Select a face', cursor='hand2', command=lambda: select_source_path())
source_button.place(relx=0.1, rely=0.4, relwidth=0.3, relheight=0.1) select_face_button.place(relx=0.1, rely=0.4, relwidth=0.3, relheight=0.1)
target_button = ctk.CTkButton(root, text='Select a target', cursor='hand2', command=select_target_path) select_target_button = ctk.CTkButton(root, text='Select a target', cursor='hand2', command=lambda: select_target_path())
target_button.place(relx=0.6, rely=0.4, relwidth=0.3, relheight=0.1) select_target_button.place(relx=0.6, rely=0.4, relwidth=0.3, relheight=0.1)
keep_fps_value = ctk.BooleanVar(value=modules.globals.keep_fps) keep_fps_value = ctk.BooleanVar(value=modules.globals.keep_fps)
keep_fps_checkbox = ctk.CTkSwitch(root, text='Keep fps', variable=keep_fps_value, cursor='hand2', command=lambda: setattr(modules.globals, 'keep_fps', not modules.globals.keep_fps)) keep_fps_checkbox = ctk.CTkSwitch(root, text='Keep fps', variable=keep_fps_value, cursor='hand2', command=lambda: setattr(modules.globals, 'keep_fps', not modules.globals.keep_fps))
@ -124,8 +75,9 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
keep_frames_switch = ctk.CTkSwitch(root, text='Keep frames', variable=keep_frames_value, cursor='hand2', command=lambda: setattr(modules.globals, 'keep_frames', keep_frames_value.get())) keep_frames_switch = ctk.CTkSwitch(root, text='Keep frames', variable=keep_frames_value, cursor='hand2', command=lambda: setattr(modules.globals, 'keep_frames', keep_frames_value.get()))
keep_frames_switch.place(relx=0.1, rely=0.65) keep_frames_switch.place(relx=0.1, rely=0.65)
# for FRAME PROCESSOR ENHANCER tumbler:
enhancer_value = ctk.BooleanVar(value=modules.globals.fp_ui['face_enhancer']) enhancer_value = ctk.BooleanVar(value=modules.globals.fp_ui['face_enhancer'])
enhancer_switch = ctk.CTkSwitch(root, text='Face Enhancer', variable=enhancer_value, cursor='hand2', command=lambda: update_tumbler('face_enhancer', enhancer_value.get())) enhancer_switch = ctk.CTkSwitch(root, text='Face Enhancer', variable=enhancer_value, cursor='hand2', command=lambda: update_tumbler('face_enhancer',enhancer_value.get()))
enhancer_switch.place(relx=0.1, rely=0.7) enhancer_switch.place(relx=0.1, rely=0.7)
keep_audio_value = ctk.BooleanVar(value=modules.globals.keep_audio) keep_audio_value = ctk.BooleanVar(value=modules.globals.keep_audio)
@ -136,56 +88,47 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
many_faces_switch = ctk.CTkSwitch(root, text='Many faces', variable=many_faces_value, cursor='hand2', command=lambda: setattr(modules.globals, 'many_faces', many_faces_value.get())) many_faces_switch = ctk.CTkSwitch(root, text='Many faces', variable=many_faces_value, cursor='hand2', command=lambda: setattr(modules.globals, 'many_faces', many_faces_value.get()))
many_faces_switch.place(relx=0.6, rely=0.65) many_faces_switch.place(relx=0.6, rely=0.65)
nsfw_value = ctk.BooleanVar(value=modules.globals.nsfw) # nsfw_value = ctk.BooleanVar(value=modules.globals.nsfw)
nsfw_switch = ctk.CTkSwitch(root, text='NSFW', variable=nsfw_value, cursor='hand2', command=lambda: setattr(modules.globals, 'nsfw', nsfw_value.get())) # nsfw_switch = ctk.CTkSwitch(root, text='NSFW', variable=nsfw_value, cursor='hand2', command=lambda: setattr(modules.globals, 'nsfw', nsfw_value.get()))
nsfw_switch.place(relx=0.6, rely=0.7) # nsfw_switch.place(relx=0.6, rely=0.7)
start_button = ctk.CTkButton(root, text='Start', cursor='hand2', command=lambda: select_output_path(start)) start_button = ctk.CTkButton(root, text='Start', cursor='hand2', command=lambda: select_output_path(start))
start_button.place(relx=0.15, rely=0.8, relwidth=0.2, relheight=0.05) start_button.place(relx=0.15, rely=0.80, relwidth=0.2, relheight=0.05)
stop_button = ctk.CTkButton(root, text='Destroy', cursor='hand2', command=destroy) stop_button = ctk.CTkButton(root, text='Destroy', cursor='hand2', command=lambda: destroy())
stop_button.place(relx=0.4, rely=0.8, relwidth=0.2, relheight=0.05) stop_button.place(relx=0.4, rely=0.80, relwidth=0.2, relheight=0.05)
preview_button = ctk.CTkButton(root, text='Preview', cursor='hand2', command=toggle_preview) preview_button = ctk.CTkButton(root, text='Preview', cursor='hand2', command=lambda: toggle_preview())
preview_button.place(relx=0.65, rely=0.8, relwidth=0.2, relheight=0.05) preview_button.place(relx=0.65, rely=0.80, relwidth=0.2, relheight=0.05)
camera_label = ctk.CTkLabel(root, text="Select Camera:") live_button = ctk.CTkButton(root, text='Live', cursor='hand2', command=lambda: webcam_preview())
camera_label.place(relx=0.4, rely=0.86, relwidth=0.2, relheight=0.05) live_button.place(relx=0.40, rely=0.86, relwidth=0.2, relheight=0.05)
available_cameras = get_available_cameras()
available_camera_strings = [str(cam) for cam in available_cameras]
camera_variable = ctk.StringVar(value=available_camera_strings[0] if available_camera_strings else "No cameras found")
camera_optionmenu = ctk.CTkOptionMenu(root, variable=camera_variable, values=available_camera_strings)
camera_optionmenu.place(relx=0.65, rely=0.86, relwidth=0.2, relheight=0.05)
live_button = ctk.CTkButton(root, text='Live', cursor='hand2', command=lambda: webcam_preview(camera_variable.get()))
live_button.place(relx=0.15, rely=0.86, relwidth=0.2, relheight=0.05)
status_label = ctk.CTkLabel(root, text=None, justify='center') status_label = ctk.CTkLabel(root, text=None, justify='center')
status_label.place(relx=0.1, relwidth=0.8, rely=0.9) status_label.place(relx=0.1, rely=0.9, relwidth=0.8)
donate_label = ctk.CTkLabel(root, text='Deep Live Cam', justify='center', cursor='hand2') donate_label = ctk.CTkLabel(root, text='Deep Live Cam', justify='center', cursor='hand2')
donate_label.place(relx=0.1, rely=0.95, relwidth=0.8) donate_label.place(relx=0.1, rely=0.95, relwidth=0.8)
donate_label.configure(text_color=ctk.ThemeManager.theme.get('URL').get('text_color')) donate_label.configure(text_color=ctk.ThemeManager.theme.get('URL').get('text_color'))
donate_label.bind('<Button-1>', lambda event: webbrowser.open('https://paypal.me/hacksider')) donate_label.bind('<Button>', lambda event: webbrowser.open('https://paypal.me/hacksider'))
return root return root
def create_preview(parent: ctk.CTk) -> ctk.CTkToplevel: def create_preview(parent: ctk.CTkToplevel) -> ctk.CTkToplevel:
global preview_label, preview_slider global preview_label, preview_slider
preview = ctk.CTkToplevel(parent) preview = ctk.CTkToplevel(parent)
preview.withdraw() preview.withdraw()
preview.title('Preview') preview.title('Preview')
preview.protocol('WM_DELETE_WINDOW', toggle_preview) preview.configure()
preview.resizable(width=True, height=True) preview.protocol('WM_DELETE_WINDOW', lambda: toggle_preview())
preview.resizable(width=False, height=False)
preview_label = ctk.CTkLabel(preview, text=None) preview_label = ctk.CTkLabel(preview, text=None)
preview_label.pack(fill='both', expand=True) preview_label.pack(fill='both', expand=True)
preview_slider = ctk.CTkSlider(preview, from_=0, to=0, command=update_preview) preview_slider = ctk.CTkSlider(preview, from_=0, to=0, command=lambda frame_value: update_preview(frame_value))
return preview return preview
@ -200,10 +143,10 @@ def update_tumbler(var: str, value: bool) -> None:
def select_source_path() -> None: def select_source_path() -> None:
global RECENT_DIRECTORY_SOURCE global RECENT_DIRECTORY_SOURCE, img_ft, vid_ft
PREVIEW.withdraw() PREVIEW.withdraw()
source_path = ctk.filedialog.askopenfilename(title='Select a source image', initialdir=RECENT_DIRECTORY_SOURCE, filetypes=[img_ft]) source_path = ctk.filedialog.askopenfilename(title='select an source image', initialdir=RECENT_DIRECTORY_SOURCE, filetypes=[img_ft])
if is_image(source_path): if is_image(source_path):
modules.globals.source_path = source_path modules.globals.source_path = source_path
RECENT_DIRECTORY_SOURCE = os.path.dirname(modules.globals.source_path) RECENT_DIRECTORY_SOURCE = os.path.dirname(modules.globals.source_path)
@ -215,10 +158,10 @@ def select_source_path() -> None:
def select_target_path() -> None: def select_target_path() -> None:
global RECENT_DIRECTORY_TARGET global RECENT_DIRECTORY_TARGET, img_ft, vid_ft
PREVIEW.withdraw() PREVIEW.withdraw()
target_path = ctk.filedialog.askopenfilename(title='Select a target image or video', initialdir=RECENT_DIRECTORY_TARGET, filetypes=[img_ft, vid_ft]) target_path = ctk.filedialog.askopenfilename(title='select an target image or video', initialdir=RECENT_DIRECTORY_TARGET, filetypes=[img_ft, vid_ft])
if is_image(target_path): if is_image(target_path):
modules.globals.target_path = target_path modules.globals.target_path = target_path
RECENT_DIRECTORY_TARGET = os.path.dirname(modules.globals.target_path) RECENT_DIRECTORY_TARGET = os.path.dirname(modules.globals.target_path)
@ -235,12 +178,12 @@ def select_target_path() -> None:
def select_output_path(start: Callable[[], None]) -> None: def select_output_path(start: Callable[[], None]) -> None:
global RECENT_DIRECTORY_OUTPUT global RECENT_DIRECTORY_OUTPUT, img_ft, vid_ft
if is_image(modules.globals.target_path): if is_image(modules.globals.target_path):
output_path = ctk.filedialog.asksaveasfilename(title='Save image output file', filetypes=[img_ft], defaultextension='.png', initialfile='output.png', initialdir=RECENT_DIRECTORY_OUTPUT) output_path = ctk.filedialog.asksaveasfilename(title='save image output file', filetypes=[img_ft], defaultextension='.png', initialfile='output.png', initialdir=RECENT_DIRECTORY_OUTPUT)
elif is_video(modules.globals.target_path): elif is_video(modules.globals.target_path):
output_path = ctk.filedialog.asksaveasfilename(title='Save video output file', filetypes=[vid_ft], defaultextension='.mp4', initialfile='output.mp4', initialdir=RECENT_DIRECTORY_OUTPUT) output_path = ctk.filedialog.asksaveasfilename(title='save video output file', filetypes=[vid_ft], defaultextension='.mp4', initialfile='output.mp4', initialdir=RECENT_DIRECTORY_OUTPUT)
else: else:
output_path = None output_path = None
if output_path: if output_path:
@ -261,13 +204,13 @@ def render_video_preview(video_path: str, size: Tuple[int, int], frame_number: i
if frame_number: if frame_number:
capture.set(cv2.CAP_PROP_POS_FRAMES, frame_number) capture.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
has_frame, frame = capture.read() has_frame, frame = capture.read()
capture.release()
if has_frame: if has_frame:
image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)) image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
if size: if size:
image = ImageOps.fit(image, size, Image.LANCZOS) image = ImageOps.fit(image, size, Image.LANCZOS)
return ctk.CTkImage(image, size=image.size) return ctk.CTkImage(image, size=image.size)
return None capture.release()
cv2.destroyAllWindows()
def toggle_preview() -> None: def toggle_preview() -> None:
@ -277,17 +220,12 @@ def toggle_preview() -> None:
init_preview() init_preview()
update_preview() update_preview()
PREVIEW.deiconify() PREVIEW.deiconify()
global camera
if PREVIEW.state() == 'withdrawn':
if camera and camera.isOpened():
camera.release()
camera = None
def init_preview() -> None: def init_preview() -> None:
if is_image(modules.globals.target_path): if is_image(modules.globals.target_path):
preview_slider.pack_forget() preview_slider.pack_forget()
elif is_video(modules.globals.target_path): if is_video(modules.globals.target_path):
video_frame_total = get_video_frame_total(modules.globals.target_path) video_frame_total = get_video_frame_total(modules.globals.target_path)
preview_slider.configure(to=video_frame_total) preview_slider.configure(to=video_frame_total)
preview_slider.pack(fill='x') preview_slider.pack(fill='x')
@ -297,7 +235,7 @@ def init_preview() -> None:
def update_preview(frame_number: int = 0) -> None: def update_preview(frame_number: int = 0) -> None:
if modules.globals.source_path and modules.globals.target_path: if modules.globals.source_path and modules.globals.target_path:
temp_frame = get_video_frame(modules.globals.target_path, frame_number) temp_frame = get_video_frame(modules.globals.target_path, frame_number)
if not modules.globals.nsfw: if modules.globals.nsfw == False:
from modules.predicter import predict_frame from modules.predicter import predict_frame
if predict_frame(temp_frame): if predict_frame(temp_frame):
quit() quit()
@ -311,116 +249,51 @@ def update_preview(frame_number: int = 0) -> None:
image = ctk.CTkImage(image, size=image.size) image = ctk.CTkImage(image, size=image.size)
preview_label.configure(image=image) preview_label.configure(image=image)
def webcam_preview():
def fit_image_to_size(image, width: int, height: int):
if width is None and height is None:
return image
h, w, _ = image.shape
ratio_h = 0.0
ratio_w = 0.0
if width > height:
ratio_h = height / h
else:
ratio_w = width / w
ratio = max(ratio_w, ratio_h)
new_size = (int(ratio * w), int(ratio * h))
return cv2.resize(image, dsize=new_size)
def webcam_preview(camera_name: str):
if modules.globals.source_path is None: if modules.globals.source_path is None:
# No image selected
return return
global preview_label, PREVIEW global preview_label, PREVIEW
# Select the camera by its name cap = cv2.VideoCapture(0) # Use index for the webcam (adjust the index accordingly if necessary)
selected_camera = select_camera(camera_name) cap.set(cv2.CAP_PROP_FRAME_WIDTH, 960) # Set the width of the resolution
if selected_camera is None: cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 540) # Set the height of the resolution
update_status(f"No suitable camera found.") cap.set(cv2.CAP_PROP_FPS, 60) # Set the frame rate of the webcam
return PREVIEW_MAX_WIDTH = 960
PREVIEW_MAX_HEIGHT = 540
# Use OpenCV's camera index for cross-platform compatibility preview_label.configure(image=None) # Reset the preview image before startup
camera_index = get_camera_index_by_name(camera_name)
global camera PREVIEW.deiconify() # Open preview window
camera = cv2.VideoCapture(camera_index)
if not camera.isOpened():
update_status(f"Error: Could not open camera {camera_name}")
return
camera.set(cv2.CAP_PROP_FRAME_WIDTH, 960)
camera.set(cv2.CAP_PROP_FRAME_HEIGHT, 540)
camera.set(cv2.CAP_PROP_FPS, 60)
preview_label.configure(width=PREVIEW_DEFAULT_WIDTH, height=PREVIEW_DEFAULT_HEIGHT)
PREVIEW.deiconify()
frame_processors = get_frame_processors_modules(modules.globals.frame_processors) frame_processors = get_frame_processors_modules(modules.globals.frame_processors)
source_image = get_one_face(cv2.imread(modules.globals.source_path))
while camera: source_image = None # Initialize variable for the selected face image
ret, frame = camera.read()
while True:
ret, frame = cap.read()
if not ret: if not ret:
update_status(f"Error: Frame not received from camera.")
break break
temp_frame = frame.copy() # Select and save face image only once
if source_image is None and modules.globals.source_path:
source_image = get_one_face(cv2.imread(modules.globals.source_path))
if modules.globals.live_mirror: temp_frame = frame.copy() #Create a copy of the frame
temp_frame = cv2.flip(temp_frame, 1) # horizontal flipping
if modules.globals.live_resizable:
temp_frame = fit_image_to_size(temp_frame, PREVIEW.winfo_width(), PREVIEW.winfo_height())
for frame_processor in frame_processors: for frame_processor in frame_processors:
temp_frame = frame_processor.process_frame(source_image, temp_frame) temp_frame = frame_processor.process_frame(source_image, temp_frame)
image = Image.fromarray(cv2.cvtColor(temp_frame, cv2.COLOR_BGR2RGB)) image = cv2.cvtColor(temp_frame, cv2.COLOR_BGR2RGB) # Convert the image to RGB format to display it with Tkinter
image = ImageOps.contain(image, (temp_frame.shape[1], temp_frame.shape[0]), Image.LANCZOS) image = Image.fromarray(image)
image = ImageOps.contain(image, (PREVIEW_MAX_WIDTH, PREVIEW_MAX_HEIGHT), Image.LANCZOS)
image = ctk.CTkImage(image, size=image.size) image = ctk.CTkImage(image, size=image.size)
preview_label.configure(image=image) preview_label.configure(image=image)
ROOT.update() ROOT.update()
if camera: camera.release() if PREVIEW.state() == 'withdrawn':
PREVIEW.withdraw()
def get_camera_index_by_name(camera_name: str) -> int:
"""Map camera name to index for OpenCV."""
if platform.system() == 'Darwin': # macOS-specific
if "FaceTime" in camera_name:
return 0 # Assuming FaceTime is at index 0
elif "iPhone" in camera_name:
return 1 # Assuming iPhone camera is at index 1
elif platform.system() == 'Windows' or platform.system() == 'Linux':
# Map camera name to index dynamically (OpenCV on these platforms usually starts with 0)
return get_available_cameras().index(camera_name)
return -1
def get_available_cameras():
"""Get available camera names (cross-platform)."""
available_cameras = []
if platform.system() == 'Darwin': # macOS-specific
devices = AVFoundation.AVCaptureDevice.devicesWithMediaType_(AVFoundation.AVMediaTypeVideo)
for device in devices:
if device.deviceType() == AVFoundation.AVCaptureDeviceTypeBuiltInWideAngleCamera:
print(f"Found Built-In Camera: {device.localizedName()}")
available_cameras.append(device.localizedName())
elif device.deviceType() == "AVCaptureDeviceTypeExternal":
print(f"Found External Camera: {device.localizedName()}")
available_cameras.append(device.localizedName())
elif device.deviceType() == "AVCaptureDeviceTypeContinuityCamera":
print(f"Skipping Continuity Camera: {device.localizedName()}")
elif platform.system() == 'Windows' or platform.system() == 'Linux':
# Use OpenCV to detect camera indexes
index = 0
while True:
cap = cv2.VideoCapture(index)
if not cap.isOpened():
break break
available_cameras.append(f"Camera {index}")
cap.release() cap.release()
index += 1 PREVIEW.withdraw() # Close preview window when loop is finished
return available_cameras