Revert "Merge remote-tracking branch 'parent/experimental' into experimental"

This reverts commit df99f6ca17.
pull/368/head
Kenneth Estanislao 2024-08-16 21:03:14 +08:00
parent d10314c8d6
commit 69d863b44a
4 changed files with 166 additions and 419 deletions

View File

@ -158,8 +158,6 @@ options:
--many-faces process every face
--video-encoder {libx264,libx265,libvpx-vp9} adjust output video encoder
--video-quality [0-51] adjust output video quality
--live-mirror the live camera display as you see it in the front-facing camera frame
--live-resizable the live camera frame is resizable
--max-memory MAX_MEMORY maximum amount of RAM in GB
--execution-provider {cpu} [{cpu} ...] available execution provider (choices: cpu, ...)
--execution-threads EXECUTION_THREADS number of execution threads

View File

@ -1,17 +1,16 @@
import os
import sys
# single thread doubles cuda performance - needs to be set before torch import
if any(arg.startswith('--execution-provider') for arg in sys.argv):
os.environ['OMP_NUM_THREADS'] = '1'
# reduce tensorflow log level
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
import warnings
from typing import List
import platform
import signal
import shutil
import argparse
from typing import List
# Set environment variables for CUDA performance and TensorFlow logging
if any(arg.startswith('--execution-provider') for arg in sys.argv):
os.environ['OMP_NUM_THREADS'] = '1'
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
import torch
import onnxruntime
import tensorflow
@ -20,60 +19,34 @@ import modules.globals
import modules.metadata
import modules.ui as ui
from modules.processors.frame.core import get_frame_processors_modules
from modules.utilities import (
has_image_extension,
is_image,
is_video,
detect_fps,
create_video,
extract_frames,
get_temp_frame_paths,
restore_audio,
create_temp,
move_temp,
clean_temp,
normalize_output_path
)
from modules.utilities import has_image_extension, is_image, is_video, detect_fps, create_video, extract_frames, get_temp_frame_paths, restore_audio, create_temp, move_temp, clean_temp, normalize_output_path
if 'ROCMExecutionProvider' in modules.globals.execution_providers:
del torch
# Filter warnings
warnings.filterwarnings('ignore', category=FutureWarning, module='insightface')
warnings.filterwarnings('ignore', category=UserWarning, module='torchvision')
# Cross-platform resource management
if platform.system() == 'Darwin' and 'ROCMExecutionProvider' in modules.globals.execution_providers:
del torch
def parse_args() -> None:
signal.signal(signal.SIGINT, lambda signal_number, frame: destroy())
program = argparse.ArgumentParser()
program.add_argument('-s', '--source', help='Select a source image', dest='source_path')
program.add_argument('-t', '--target', help='Select a target image or video', dest='target_path')
program.add_argument('-o', '--output', help='Select output file or directory', dest='output_path')
program.add_argument('--frame-processor', help='Pipeline of frame processors', dest='frame_processor',
default=['face_swapper'], choices=['face_swapper', 'face_enhancer'], nargs='+')
program.add_argument('--keep-fps', help='Keep original fps', dest='keep_fps', action='store_true', default=False)
program.add_argument('--keep-audio', help='Keep original audio', dest='keep_audio', action='store_true', default=True)
program.add_argument('--keep-frames', help='Keep temporary frames', dest='keep_frames', action='store_true', default=False)
program.add_argument('--many-faces', help='Process every face', dest='many_faces', action='store_true', default=False)
program.add_argument('--video-encoder', help='Adjust output video encoder', dest='video_encoder', default='libx264',
choices=['libx264', 'libx265', 'libvpx-vp9'])
program.add_argument('--video-quality', help='Adjust output video quality', dest='video_quality', type=int, default=18,
choices=range(52), metavar='[0-51]')
program.add_argument('--live-mirror', help='The live camera display as you see it in the front-facing camera frame',
dest='live_mirror', action='store_true', default=False)
program.add_argument('--live-resizable', help='The live camera frame is resizable',
dest='live_resizable', action='store_true', default=False)
program.add_argument('--max-memory', help='Maximum amount of RAM in GB', dest='max_memory', type=int,
default=suggest_max_memory())
program.add_argument('--execution-provider', help='Execution provider', dest='execution_provider', default=['cpu'],
choices=suggest_execution_providers(), nargs='+')
program.add_argument('--execution-threads', help='Number of execution threads', dest='execution_threads', type=int,
default=suggest_execution_threads())
program.add_argument('-v', '--version', action='version',
version=f'{modules.metadata.name} {modules.metadata.version}')
program.add_argument('-s', '--source', help='select an source image', dest='source_path')
program.add_argument('-t', '--target', help='select an target image or video', dest='target_path')
program.add_argument('-o', '--output', help='select output file or directory', dest='output_path')
program.add_argument('--frame-processor', help='pipeline of frame processors', dest='frame_processor', default=['face_swapper'], choices=['face_swapper', 'face_enhancer'], nargs='+')
program.add_argument('--keep-fps', help='keep original fps', dest='keep_fps', action='store_true', default=False)
program.add_argument('--keep-audio', help='keep original audio', dest='keep_audio', action='store_true', default=True)
program.add_argument('--keep-frames', help='keep temporary frames', dest='keep_frames', action='store_true', default=False)
program.add_argument('--many-faces', help='process every face', dest='many_faces', action='store_true', default=False)
program.add_argument('--video-encoder', help='adjust output video encoder', dest='video_encoder', default='libx264', choices=['libx264', 'libx265', 'libvpx-vp9'])
program.add_argument('--video-quality', help='adjust output video quality', dest='video_quality', type=int, default=18, choices=range(52), metavar='[0-51]')
program.add_argument('--max-memory', help='maximum amount of RAM in GB', dest='max_memory', type=int, default=suggest_max_memory())
program.add_argument('--execution-provider', help='execution provider', dest='execution_provider', default=['cpu'], choices=suggest_execution_providers(), nargs='+')
program.add_argument('--execution-threads', help='number of execution threads', dest='execution_threads', type=int, default=suggest_execution_threads())
program.add_argument('-v', '--version', action='version', version=f'{modules.metadata.name} {modules.metadata.version}')
# Register deprecated args
# register deprecated args
program.add_argument('-f', '--face', help=argparse.SUPPRESS, dest='source_path_deprecated')
program.add_argument('--cpu-cores', help=argparse.SUPPRESS, dest='cpu_cores_deprecated', type=int)
program.add_argument('--gpu-vendor', help=argparse.SUPPRESS, dest='gpu_vendor_deprecated')
@ -83,8 +56,7 @@ def parse_args() -> None:
modules.globals.source_path = args.source_path
modules.globals.target_path = args.target_path
modules.globals.output_path = normalize_output_path(modules.globals.source_path, modules.globals.target_path,
args.output_path)
modules.globals.output_path = normalize_output_path(modules.globals.source_path, modules.globals.target_path, args.output_path)
modules.globals.frame_processors = args.frame_processor
modules.globals.headless = args.source_path or args.target_path or args.output_path
modules.globals.keep_fps = args.keep_fps
@ -93,28 +65,23 @@ def parse_args() -> None:
modules.globals.many_faces = args.many_faces
modules.globals.video_encoder = args.video_encoder
modules.globals.video_quality = args.video_quality
modules.globals.live_mirror = args.live_mirror
modules.globals.live_resizable = args.live_resizable
modules.globals.max_memory = args.max_memory
modules.globals.execution_providers = decode_execution_providers(args.execution_provider)
modules.globals.execution_threads = args.execution_threads
# Handle face enhancer tumbler
modules.globals.fp_ui['face_enhancer'] = 'face_enhancer' in args.frame_processor
#for ENHANCER tumbler:
if 'face_enhancer' in args.frame_processor:
modules.globals.fp_ui['face_enhancer'] = True
else:
modules.globals.fp_ui['face_enhancer'] = False
modules.globals.nsfw = False
# Handle deprecated arguments
handle_deprecated_args(args)
def handle_deprecated_args(args) -> None:
"""Handle deprecated arguments by translating them to the new format."""
# translate deprecated args
if args.source_path_deprecated:
print('\033[33mArgument -f and --face are deprecated. Use -s and --source instead.\033[0m')
modules.globals.source_path = args.source_path_deprecated
modules.globals.output_path = normalize_output_path(args.source_path_deprecated, modules.globals.target_path,
args.output_path)
modules.globals.output_path = normalize_output_path(args.source_path_deprecated, modules.globals.target_path, args.output_path)
if args.cpu_cores_deprecated:
print('\033[33mArgument --cpu-cores is deprecated. Use --execution-threads instead.\033[0m')
modules.globals.execution_threads = args.cpu_cores_deprecated
@ -125,7 +92,7 @@ def handle_deprecated_args(args) -> None:
print('\033[33mArgument --gpu-vendor nvidia is deprecated. Use --execution-provider cuda instead.\033[0m')
modules.globals.execution_providers = decode_execution_providers(['cuda'])
if args.gpu_vendor_deprecated == 'amd':
print('\033[33mArgument --gpu-vendor amd is deprecated. Use --execution-provider rocm instead.\033[0m')
print('\033[33mArgument --gpu-vendor amd is deprecated. Use --execution-provider cuda instead.\033[0m')
modules.globals.execution_providers = decode_execution_providers(['rocm'])
if args.gpu_threads_deprecated:
print('\033[33mArgument --gpu-threads is deprecated. Use --execution-threads instead.\033[0m')
@ -133,22 +100,18 @@ def handle_deprecated_args(args) -> None:
def encode_execution_providers(execution_providers: List[str]) -> List[str]:
return [provider.replace('ExecutionProvider', '').lower() for provider in execution_providers]
return [execution_provider.replace('ExecutionProvider', '').lower() for execution_provider in execution_providers]
def decode_execution_providers(execution_providers: List[str]) -> List[str]:
available_providers = onnxruntime.get_available_providers()
encoded_providers = encode_execution_providers(available_providers)
selected_providers = [available_providers[encoded_providers.index(req)] for req in execution_providers
if req in encoded_providers]
# Default to CPU if no suitable providers are found
return selected_providers if selected_providers else ['CPUExecutionProvider']
return [provider for provider, encoded_execution_provider in zip(onnxruntime.get_available_providers(), encode_execution_providers(onnxruntime.get_available_providers()))
if any(execution_provider in encoded_execution_provider for execution_provider in execution_providers)]
def suggest_max_memory() -> int:
return 4 if platform.system().lower() == 'darwin' else 16
if platform.system().lower() == 'darwin':
return 4
return 16
def suggest_execution_providers() -> List[str]:
@ -156,41 +119,34 @@ def suggest_execution_providers() -> List[str]:
def suggest_execution_threads() -> int:
if 'dml' in modules.globals.execution_providers:
if 'DmlExecutionProvider' in modules.globals.execution_providers:
return 1
if 'rocm' in modules.globals.execution_providers:
if 'ROCMExecutionProvider' in modules.globals.execution_providers:
return 1
return 8
def limit_resources() -> None:
# Prevent TensorFlow memory leak
# prevent tensorflow memory leak
gpus = tensorflow.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
tensorflow.config.experimental.set_memory_growth(gpu, True)
# Limit memory usage
# limit memory usage
if modules.globals.max_memory:
memory = modules.globals.max_memory * 1024 ** 3
if platform.system().lower() == 'darwin':
memory = modules.globals.max_memory * 1024 ** 3
elif platform.system().lower() == 'windows':
memory = modules.globals.max_memory * 1024 ** 6
if platform.system().lower() == 'windows':
import ctypes
kernel32 = ctypes.windll.kernel32
kernel32.SetProcessWorkingSetSize(-1, ctypes.c_size_t(memory), ctypes.c_size_t(memory))
else:
import resource
try:
soft, hard = resource.getrlimit(resource.RLIMIT_DATA)
if memory > hard:
print(f"Warning: Requested memory limit {memory / (1024 ** 3)} GB exceeds system's hard limit. Setting to maximum allowed {hard / (1024 ** 3)} GB.")
memory = hard
resource.setrlimit(resource.RLIMIT_DATA, (memory, memory))
except ValueError as e:
print(f"Warning: Could not set memory limit: {e}. Continuing with default limits.")
resource.setrlimit(resource.RLIMIT_DATA, (memory, memory))
def release_resources() -> None:
if 'cuda' in modules.globals.execution_providers:
if 'CUDAExecutionProvider' in modules.globals.execution_providers:
torch.cuda.empty_cache()
@ -201,85 +157,49 @@ def pre_check() -> bool:
if not shutil.which('ffmpeg'):
update_status('ffmpeg is not installed.')
return False
if 'cuda' in modules.globals.execution_providers and not torch.cuda.is_available():
update_status('CUDA is not available. Please check your GPU or CUDA installation.')
return False
return True
def update_status(message: str, scope: str = 'DLC.CORE') -> None:
print(f'[{scope}] {message}')
if not modules.globals.headless and ui.status_label:
if not modules.globals.headless:
ui.update_status(message)
def start() -> None:
for frame_processor in get_frame_processors_modules(modules.globals.frame_processors):
if not frame_processor.pre_start():
return
# Process image to image
# process image to image
if has_image_extension(modules.globals.target_path):
process_image_to_image()
return
# Process image to video
process_image_to_video()
def process_image_to_image() -> None:
if modules.globals.nsfw:
from modules.predicter import predict_image
if predict_image(modules.globals.target_path):
destroy(to_quit=False)
update_status('Processing to image ignored!')
return
try:
if modules.globals.nsfw == False:
from modules.predicter import predict_image
if predict_image(modules.globals.target_path):
destroy()
shutil.copy2(modules.globals.target_path, modules.globals.output_path)
except Exception as e:
print("Error copying file:", str(e))
for frame_processor in get_frame_processors_modules(modules.globals.frame_processors):
update_status('Processing...', frame_processor.NAME)
frame_processor.process_image(modules.globals.source_path, modules.globals.output_path, modules.globals.output_path)
release_resources()
if is_image(modules.globals.target_path):
update_status('Processing to image succeeded!')
else:
update_status('Processing to image failed!')
def process_image_to_video() -> None:
if modules.globals.nsfw:
for frame_processor in get_frame_processors_modules(modules.globals.frame_processors):
update_status('Progressing...', frame_processor.NAME)
frame_processor.process_image(modules.globals.source_path, modules.globals.output_path, modules.globals.output_path)
release_resources()
if is_image(modules.globals.target_path):
update_status('Processing to image succeed!')
else:
update_status('Processing to image failed!')
return
# process image to videos
if modules.globals.nsfw == False:
from modules.predicter import predict_video
if predict_video(modules.globals.target_path):
destroy(to_quit=False)
update_status('Processing to video ignored!')
return
update_status('Creating temporary resources...')
destroy()
update_status('Creating temp resources...')
create_temp(modules.globals.target_path)
update_status('Extracting frames...')
extract_frames(modules.globals.target_path)
temp_frame_paths = get_temp_frame_paths(modules.globals.target_path)
for frame_processor in get_frame_processors_modules(modules.globals.frame_processors):
update_status('Processing...', frame_processor.NAME)
update_status('Progressing...', frame_processor.NAME)
frame_processor.process_video(modules.globals.source_path, temp_frame_paths)
release_resources()
handle_video_fps()
handle_video_audio()
clean_temp(modules.globals.target_path)
if is_video(modules.globals.target_path):
update_status('Processing to video succeeded!')
else:
update_status('Processing to video failed!')
def handle_video_fps() -> None:
# handles fps
if modules.globals.keep_fps:
update_status('Detecting fps...')
fps = detect_fps(modules.globals.target_path)
@ -288,9 +208,7 @@ def handle_video_fps() -> None:
else:
update_status('Creating video with 30.0 fps...')
create_video(modules.globals.target_path)
def handle_video_audio() -> None:
# handle audio
if modules.globals.keep_audio:
if modules.globals.keep_fps:
update_status('Restoring audio...')
@ -299,29 +217,30 @@ def handle_video_audio() -> None:
restore_audio(modules.globals.target_path, modules.globals.output_path)
else:
move_temp(modules.globals.target_path, modules.globals.output_path)
# clean and validate
clean_temp(modules.globals.target_path)
if is_video(modules.globals.target_path):
update_status('Processing to video succeed!')
else:
update_status('Processing to video failed!')
def destroy(to_quit=True) -> None:
def destroy() -> None:
if modules.globals.target_path:
clean_temp(modules.globals.target_path)
if to_quit: quit()
quit()
def run() -> None:
try:
parse_args()
if not pre_check():
parse_args()
if not pre_check():
return
for frame_processor in get_frame_processors_modules(modules.globals.frame_processors):
if not frame_processor.pre_check():
return
for frame_processor in get_frame_processors_modules(modules.globals.frame_processors):
if not frame_processor.pre_check():
return
limit_resources()
if modules.globals.headless:
start()
else:
window = ui.init(start, destroy)
window.mainloop()
except Exception as e:
print(f"UI initialization failed: {str(e)}")
update_status(f"UI initialization failed: {str(e)}")
destroy() # Ensure any resources are cleaned up on failure
limit_resources()
if modules.globals.headless:
start()
else:
window = ui.init(start, destroy)
window.mainloop()

View File

@ -19,8 +19,6 @@ keep_frames = None
many_faces = None
video_encoder = None
video_quality = None
live_mirror = None
live_resizable = None
max_memory = None
execution_providers: List[str] = []
execution_threads = None

View File

@ -1,19 +1,9 @@
import os
import platform
import webbrowser
import customtkinter as ctk
from typing import Callable, Tuple, List, Any
from types import ModuleType
from typing import Callable, Tuple
import cv2
from PIL import Image, ImageOps
from pygrabber.dshow_graph import FilterGraph
import pyvirtualcam
# Import OS-specific modules only when necessary
if platform.system() == 'Darwin': # macOS
import objc
from Foundation import NSObject
import AVFoundation
import modules.globals
import modules.metadata
@ -23,14 +13,12 @@ from modules.processors.frame.core import get_frame_processors_modules
from modules.utilities import is_image, is_video, resolve_relative_path
ROOT = None
ROOT_HEIGHT = 800
ROOT_HEIGHT = 700
ROOT_WIDTH = 600
PREVIEW = None
PREVIEW_MAX_HEIGHT = 700
PREVIEW_MAX_WIDTH = 1200
PREVIEW_DEFAULT_WIDTH = 960
PREVIEW_DEFAULT_HEIGHT = 540
PREVIEW_MAX_WIDTH = 1200
RECENT_DIRECTORY_SOURCE = None
RECENT_DIRECTORY_TARGET = None
@ -44,49 +32,10 @@ status_label = None
img_ft, vid_ft = modules.globals.file_types
camera = None
def check_camera_permissions():
"""Check and request camera access permission on macOS."""
if platform.system() == 'Darwin': # macOS-specific
auth_status = AVFoundation.AVCaptureDevice.authorizationStatusForMediaType_(AVFoundation.AVMediaTypeVideo)
if auth_status == AVFoundation.AVAuthorizationStatusNotDetermined:
# Request access to the camera
def completion_handler(granted):
if granted:
print("Access granted to the camera.")
else:
print("Access denied to the camera.")
AVFoundation.AVCaptureDevice.requestAccessForMediaType_completionHandler_(AVFoundation.AVMediaTypeVideo, completion_handler)
elif auth_status == AVFoundation.AVAuthorizationStatusAuthorized:
print("Camera access already authorized.")
elif auth_status == AVFoundation.AVAuthorizationStatusDenied:
print("Camera access denied. Please enable it in System Preferences.")
elif auth_status == AVFoundation.AVAuthorizationStatusRestricted:
print("Camera access restricted. The app is not allowed to use the camera.")
def select_camera(camera_name: str):
"""Select the appropriate camera based on its name (cross-platform)."""
if platform.system() == 'Darwin': # macOS-specific
devices = AVFoundation.AVCaptureDevice.devicesWithMediaType_(AVFoundation.AVMediaTypeVideo)
for device in devices:
if device.localizedName() == camera_name:
return device
elif platform.system() == 'Windows' or platform.system() == 'Linux':
# On Windows/Linux, simply return the camera name as OpenCV can handle it by index
return camera_name
return None
def init(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.CTk:
global ROOT, PREVIEW
if platform.system() == 'Darwin': # macOS-specific
check_camera_permissions() # Check camera permissions before initializing the UI
ROOT = create_root(start, destroy)
PREVIEW = create_preview(ROOT)
@ -100,99 +49,86 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
ctk.set_appearance_mode('system')
ctk.set_default_color_theme(resolve_relative_path('ui.json'))
print("Creating root window...")
root = ctk.CTk()
root.minsize(ROOT_WIDTH, ROOT_HEIGHT)
root.title(f'{modules.metadata.name} {modules.metadata.version} {modules.metadata.edition}')
root.configure()
root.protocol('WM_DELETE_WINDOW', lambda: destroy())
source_label = ctk.CTkLabel(root, text=None)
source_label.place(relx=0.1, rely=0.0875, relwidth=0.3, relheight=0.25)
source_label.place(relx=0.1, rely=0.1, relwidth=0.3, relheight=0.25)
target_label = ctk.CTkLabel(root, text=None)
target_label.place(relx=0.6, rely=0.0875, relwidth=0.3, relheight=0.25)
target_label.place(relx=0.6, rely=0.1, relwidth=0.3, relheight=0.25)
source_button = ctk.CTkButton(root, text='Select a face', cursor='hand2', command=select_source_path)
source_button.place(relx=0.1, rely=0.35, relwidth=0.3, relheight=0.1)
select_face_button = ctk.CTkButton(root, text='Select a face', cursor='hand2', command=lambda: select_source_path())
select_face_button.place(relx=0.1, rely=0.4, relwidth=0.3, relheight=0.1)
target_button = ctk.CTkButton(root, text='Select a target', cursor='hand2', command=select_target_path)
target_button.place(relx=0.6, rely=0.35, relwidth=0.3, relheight=0.1)
select_target_button = ctk.CTkButton(root, text='Select a target', cursor='hand2', command=lambda: select_target_path())
select_target_button.place(relx=0.6, rely=0.4, relwidth=0.3, relheight=0.1)
keep_fps_value = ctk.BooleanVar(value=modules.globals.keep_fps)
keep_fps_checkbox = ctk.CTkSwitch(root, text='Keep fps', variable=keep_fps_value, cursor='hand2', command=lambda: setattr(modules.globals, 'keep_fps', not modules.globals.keep_fps))
keep_fps_checkbox.place(relx=0.1, rely=0.525)
keep_fps_checkbox.place(relx=0.1, rely=0.6)
keep_frames_value = ctk.BooleanVar(value=modules.globals.keep_frames)
keep_frames_switch = ctk.CTkSwitch(root, text='Keep frames', variable=keep_frames_value, cursor='hand2', command=lambda: setattr(modules.globals, 'keep_frames', keep_frames_value.get()))
keep_frames_switch.place(relx=0.1, rely=0.56875)
keep_frames_switch.place(relx=0.1, rely=0.65)
# for FRAME PROCESSOR ENHANCER tumbler:
enhancer_value = ctk.BooleanVar(value=modules.globals.fp_ui['face_enhancer'])
enhancer_switch = ctk.CTkSwitch(root, text='Face Enhancer', variable=enhancer_value, cursor='hand2', command=lambda: update_tumbler('face_enhancer', enhancer_value.get()))
enhancer_switch.place(relx=0.1, rely=0.6125)
enhancer_switch = ctk.CTkSwitch(root, text='Face Enhancer', variable=enhancer_value, cursor='hand2', command=lambda: update_tumbler('face_enhancer',enhancer_value.get()))
enhancer_switch.place(relx=0.1, rely=0.7)
keep_audio_value = ctk.BooleanVar(value=modules.globals.keep_audio)
keep_audio_switch = ctk.CTkSwitch(root, text='Keep audio', variable=keep_audio_value, cursor='hand2', command=lambda: setattr(modules.globals, 'keep_audio', keep_audio_value.get()))
keep_audio_switch.place(relx=0.6, rely=0.525)
keep_audio_switch.place(relx=0.6, rely=0.6)
many_faces_value = ctk.BooleanVar(value=modules.globals.many_faces)
many_faces_switch = ctk.CTkSwitch(root, text='Many faces', variable=many_faces_value, cursor='hand2', command=lambda: setattr(modules.globals, 'many_faces', many_faces_value.get()))
many_faces_switch.place(relx=0.6, rely=0.56875)
many_faces_switch.place(relx=0.6, rely=0.65)
nsfw_value = ctk.BooleanVar(value=modules.globals.nsfw)
nsfw_switch = ctk.CTkSwitch(root, text='NSFW', variable=nsfw_value, cursor='hand2', command=lambda: setattr(modules.globals, 'nsfw', nsfw_value.get()))
nsfw_switch.place(relx=0.6, rely=0.6125)
# nsfw_value = ctk.BooleanVar(value=modules.globals.nsfw)
# nsfw_switch = ctk.CTkSwitch(root, text='NSFW', variable=nsfw_value, cursor='hand2', command=lambda: setattr(modules.globals, 'nsfw', nsfw_value.get()))
# nsfw_switch.place(relx=0.6, rely=0.7)
start_button = ctk.CTkButton(root, text='Start', cursor='hand2', command=lambda: select_output_path(start))
start_button.place(relx=0.15, rely=0.7, relwidth=0.2, relheight=0.05)
start_button.place(relx=0.15, rely=0.80, relwidth=0.2, relheight=0.05)
stop_button = ctk.CTkButton(root, text='Destroy', cursor='hand2', command=destroy)
stop_button.place(relx=0.4, rely=0.7, relwidth=0.2, relheight=0.05)
stop_button = ctk.CTkButton(root, text='Destroy', cursor='hand2', command=lambda: destroy())
stop_button.place(relx=0.4, rely=0.80, relwidth=0.2, relheight=0.05)
preview_button = ctk.CTkButton(root, text='Preview', cursor='hand2', command=toggle_preview)
preview_button.place(relx=0.65, rely=0.7, relwidth=0.2, relheight=0.05)
preview_button = ctk.CTkButton(root, text='Preview', cursor='hand2', command=lambda: toggle_preview())
preview_button.place(relx=0.65, rely=0.80, relwidth=0.2, relheight=0.05)
camera_label = ctk.CTkLabel(root, text="Select Camera:")
camera_label.place(relx=0.4, rely=0.7525, relwidth=0.2, relheight=0.05)
available_cameras = get_available_cameras()
available_camera_strings = [str(cam) for cam in available_cameras]
camera_variable = ctk.StringVar(value=available_camera_strings[0] if available_camera_strings else "No cameras found")
camera_optionmenu = ctk.CTkOptionMenu(root, variable=camera_variable, values=available_camera_strings)
camera_optionmenu.place(relx=0.65, rely=0.7525, relwidth=0.2, relheight=0.05)
virtual_cam_out_value = ctk.BooleanVar(value=False)
virtual_cam_out_switch = ctk.CTkSwitch(root, text='Virtual Cam Output (OBS)', variable=virtual_cam_out_value, cursor='hand2')
virtual_cam_out_switch.place(relx=0.4, rely=0.805)
live_button = ctk.CTkButton(root, text='Live', cursor='hand2', command=lambda: webcam_preview(camera_variable.get(), virtual_cam_out_value.get()))
live_button.place(relx=0.15, rely=0.7525, relwidth=0.2, relheight=0.05)
live_button = ctk.CTkButton(root, text='Live', cursor='hand2', command=lambda: webcam_preview())
live_button.place(relx=0.40, rely=0.86, relwidth=0.2, relheight=0.05)
status_label = ctk.CTkLabel(root, text=None, justify='center')
status_label.place(relx=0.1, relwidth=0.8, rely=0.875)
status_label.place(relx=0.1, rely=0.9, relwidth=0.8)
donate_label = ctk.CTkLabel(root, text='Deep Live Cam', justify='center', cursor='hand2')
donate_label.place(relx=0.1, rely=0.95, relwidth=0.8)
donate_label.configure(text_color=ctk.ThemeManager.theme.get('URL').get('text_color'))
donate_label.bind('<Button-1>', lambda event: webbrowser.open('https://paypal.me/hacksider'))
donate_label.bind('<Button>', lambda event: webbrowser.open('https://paypal.me/hacksider'))
return root
def create_preview(parent: ctk.CTk) -> ctk.CTkToplevel:
def create_preview(parent: ctk.CTkToplevel) -> ctk.CTkToplevel:
global preview_label, preview_slider
preview = ctk.CTkToplevel(parent)
preview.withdraw()
preview.title('Preview')
preview.protocol('WM_DELETE_WINDOW', toggle_preview)
preview.resizable(width=True, height=True)
preview.configure()
preview.protocol('WM_DELETE_WINDOW', lambda: toggle_preview())
preview.resizable(width=False, height=False)
preview_label = ctk.CTkLabel(preview, text=None)
preview_label.pack(fill='both', expand=True)
preview_slider = ctk.CTkSlider(preview, from_=0, to=0, command=update_preview)
preview_slider = ctk.CTkSlider(preview, from_=0, to=0, command=lambda frame_value: update_preview(frame_value))
return preview
@ -207,10 +143,10 @@ def update_tumbler(var: str, value: bool) -> None:
def select_source_path() -> None:
global RECENT_DIRECTORY_SOURCE
global RECENT_DIRECTORY_SOURCE, img_ft, vid_ft
PREVIEW.withdraw()
source_path = ctk.filedialog.askopenfilename(title='Select a source image', initialdir=RECENT_DIRECTORY_SOURCE, filetypes=[img_ft])
source_path = ctk.filedialog.askopenfilename(title='select an source image', initialdir=RECENT_DIRECTORY_SOURCE, filetypes=[img_ft])
if is_image(source_path):
modules.globals.source_path = source_path
RECENT_DIRECTORY_SOURCE = os.path.dirname(modules.globals.source_path)
@ -222,10 +158,10 @@ def select_source_path() -> None:
def select_target_path() -> None:
global RECENT_DIRECTORY_TARGET
global RECENT_DIRECTORY_TARGET, img_ft, vid_ft
PREVIEW.withdraw()
target_path = ctk.filedialog.askopenfilename(title='Select a target image or video', initialdir=RECENT_DIRECTORY_TARGET, filetypes=[img_ft, vid_ft])
target_path = ctk.filedialog.askopenfilename(title='select an target image or video', initialdir=RECENT_DIRECTORY_TARGET, filetypes=[img_ft, vid_ft])
if is_image(target_path):
modules.globals.target_path = target_path
RECENT_DIRECTORY_TARGET = os.path.dirname(modules.globals.target_path)
@ -242,12 +178,12 @@ def select_target_path() -> None:
def select_output_path(start: Callable[[], None]) -> None:
global RECENT_DIRECTORY_OUTPUT
global RECENT_DIRECTORY_OUTPUT, img_ft, vid_ft
if is_image(modules.globals.target_path):
output_path = ctk.filedialog.asksaveasfilename(title='Save image output file', filetypes=[img_ft], defaultextension='.png', initialfile='output.png', initialdir=RECENT_DIRECTORY_OUTPUT)
output_path = ctk.filedialog.asksaveasfilename(title='save image output file', filetypes=[img_ft], defaultextension='.png', initialfile='output.png', initialdir=RECENT_DIRECTORY_OUTPUT)
elif is_video(modules.globals.target_path):
output_path = ctk.filedialog.asksaveasfilename(title='Save video output file', filetypes=[vid_ft], defaultextension='.mp4', initialfile='output.mp4', initialdir=RECENT_DIRECTORY_OUTPUT)
output_path = ctk.filedialog.asksaveasfilename(title='save video output file', filetypes=[vid_ft], defaultextension='.mp4', initialfile='output.mp4', initialdir=RECENT_DIRECTORY_OUTPUT)
else:
output_path = None
if output_path:
@ -268,13 +204,13 @@ def render_video_preview(video_path: str, size: Tuple[int, int], frame_number: i
if frame_number:
capture.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
has_frame, frame = capture.read()
capture.release()
if has_frame:
image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
if size:
image = ImageOps.fit(image, size, Image.LANCZOS)
return ctk.CTkImage(image, size=image.size)
return None
capture.release()
cv2.destroyAllWindows()
def toggle_preview() -> None:
@ -284,17 +220,12 @@ def toggle_preview() -> None:
init_preview()
update_preview()
PREVIEW.deiconify()
global camera
if PREVIEW.state() == 'withdrawn':
if camera and camera.isOpened():
camera.release()
camera = None
def init_preview() -> None:
if is_image(modules.globals.target_path):
preview_slider.pack_forget()
elif is_video(modules.globals.target_path):
if is_video(modules.globals.target_path):
video_frame_total = get_video_frame_total(modules.globals.target_path)
preview_slider.configure(to=video_frame_total)
preview_slider.pack(fill='x')
@ -304,7 +235,7 @@ def init_preview() -> None:
def update_preview(frame_number: int = 0) -> None:
if modules.globals.source_path and modules.globals.target_path:
temp_frame = get_video_frame(modules.globals.target_path, frame_number)
if not modules.globals.nsfw:
if modules.globals.nsfw == False:
from modules.predicter import predict_frame
if predict_frame(temp_frame):
quit()
@ -318,150 +249,51 @@ def update_preview(frame_number: int = 0) -> None:
image = ctk.CTkImage(image, size=image.size)
preview_label.configure(image=image)
def webcam_preview_loop(cap: cv2.VideoCapture, source_image: Any, frame_processors: List[ModuleType], virtual_cam: pyvirtualcam.Camera = None) -> bool:
global preview_label, PREVIEW
ret, frame = cap.read()
if not ret:
update_status(f"Error: Frame not received from camera.")
return False
temp_frame = frame.copy()
if modules.globals.live_mirror:
temp_frame = cv2.flip(temp_frame, 1) # horizontal flipping
if modules.globals.live_resizable:
temp_frame = fit_image_to_size(temp_frame, PREVIEW.winfo_width(), PREVIEW.winfo_height())
for frame_processor in frame_processors:
temp_frame = frame_processor.process_frame(source_image, temp_frame)
image = Image.fromarray(cv2.cvtColor(temp_frame, cv2.COLOR_BGR2RGB))
image = ImageOps.contain(image, (temp_frame.shape[1], temp_frame.shape[0]), Image.LANCZOS)
image = ctk.CTkImage(image, size=image.size)
preview_label.configure(image=image)
if virtual_cam:
virtual_cam.send(temp_frame)
virtual_cam.sleep_until_next_frame()
ROOT.update()
if PREVIEW.state() == 'withdrawn':
return False
return True
def fit_image_to_size(image, width: int, height: int):
if width is None and height is None:
return image
h, w, _ = image.shape
ratio_h = 0.0
ratio_w = 0.0
if width > height:
ratio_h = height / h
else:
ratio_w = width / w
ratio = max(ratio_w, ratio_h)
new_size = (int(ratio * w), int(ratio * h))
return cv2.resize(image, dsize=new_size)
def webcam_preview(camera_name: str, virtual_cam_output: bool):
def webcam_preview():
if modules.globals.source_path is None:
# No image selected
return
global preview_label, PREVIEW
WIDTH = 960
HEIGHT = 540
FPS = 60
cap = cv2.VideoCapture(0) # Use index for the webcam (adjust the index accordingly if necessary)
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 960) # Set the width of the resolution
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 540) # Set the height of the resolution
cap.set(cv2.CAP_PROP_FPS, 60) # Set the frame rate of the webcam
PREVIEW_MAX_WIDTH = 960
PREVIEW_MAX_HEIGHT = 540
# Select the camera by its name
selected_camera = select_camera(camera_name)
if selected_camera is None:
update_status(f"No suitable camera found.")
return
preview_label.configure(image=None) # Reset the preview image before startup
# Use OpenCV's camera index for cross-platform compatibility
camera_index = get_camera_index_by_name(camera_name)
global camera
camera = cv2.VideoCapture(camera_index)
if not camera.isOpened():
update_status(f"Error: Could not open camera {camera_name}")
return
camera.set(cv2.CAP_PROP_FRAME_WIDTH, WIDTH)
camera.set(cv2.CAP_PROP_FRAME_HEIGHT, HEIGHT)
camera.set(cv2.CAP_PROP_FPS, FPS)
PREVIEW_MAX_WIDTH = WIDTH
PREVIEW_MAX_HEIGHT = HEIGHT
preview_label.configure(width=PREVIEW_DEFAULT_WIDTH, height=PREVIEW_DEFAULT_HEIGHT)
PREVIEW.deiconify()
PREVIEW.deiconify() # Open preview window
frame_processors = get_frame_processors_modules(modules.globals.frame_processors)
source_image = get_one_face(cv2.imread(modules.globals.source_path))
preview_running = True
source_image = None # Initialize variable for the selected face image
if virtual_cam_output:
with pyvirtualcam.Camera(width=WIDTH, height=HEIGHT, fps=FPS, fmt=pyvirtualcam.PixelFormat.BGR) as virtual_cam:
while preview_running:
preview_running = webcam_preview_loop(cap, source_image, frame_processors, virtual_cam)
while True:
ret, frame = cap.read()
if not ret:
break
while preview_running:
preview_running = webcam_preview_loop(cap, source_image, frame_processors)
# Select and save face image only once
if source_image is None and modules.globals.source_path:
source_image = get_one_face(cv2.imread(modules.globals.source_path))
temp_frame = frame.copy() #Create a copy of the frame
if camera: camera.release()
PREVIEW.withdraw()
for frame_processor in frame_processors:
temp_frame = frame_processor.process_frame(source_image, temp_frame)
image = cv2.cvtColor(temp_frame, cv2.COLOR_BGR2RGB) # Convert the image to RGB format to display it with Tkinter
image = Image.fromarray(image)
image = ImageOps.contain(image, (PREVIEW_MAX_WIDTH, PREVIEW_MAX_HEIGHT), Image.LANCZOS)
image = ctk.CTkImage(image, size=image.size)
preview_label.configure(image=image)
ROOT.update()
def get_camera_index_by_name(camera_name: str) -> int:
"""Map camera name to index for OpenCV."""
if platform.system() == 'Darwin': # macOS-specific
if "FaceTime" in camera_name:
return 0 # Assuming FaceTime is at index 0
elif "iPhone" in camera_name:
return 1 # Assuming iPhone camera is at index 1
elif platform.system() == 'Windows' or platform.system() == 'Linux':
# Map camera name to index dynamically (OpenCV on these platforms usually starts with 0)
return get_available_cameras().index(camera_name)
return -1
if PREVIEW.state() == 'withdrawn':
break
def get_available_cameras():
"""Get available camera names (cross-platform)."""
available_cameras = []
if platform.system() == 'Darwin': # macOS-specific
devices = AVFoundation.AVCaptureDevice.devicesWithMediaType_(AVFoundation.AVMediaTypeVideo)
for device in devices:
if device.deviceType() == AVFoundation.AVCaptureDeviceTypeBuiltInWideAngleCamera:
print(f"Found Built-In Camera: {device.localizedName()}")
available_cameras.append(device.localizedName())
elif device.deviceType() == "AVCaptureDeviceTypeExternal":
print(f"Found External Camera: {device.localizedName()}")
available_cameras.append(device.localizedName())
elif device.deviceType() == "AVCaptureDeviceTypeContinuityCamera":
print(f"Skipping Continuity Camera: {device.localizedName()}")
elif platform.system() == 'Windows' or platform.system() == 'Linux':
try:
devices = FilterGraph().get_input_devices()
except Exception as e:
# Use OpenCV to detect camera indexes
index = 0
devices = []
while True:
cap = cv2.VideoCapture(index)
if not cap.isOpened():
break
devices.append(f"Camera {index}")
cap.release()
index += 1
available_cameras = devices
return available_cameras
cap.release()
PREVIEW.withdraw() # Close preview window when loop is finished