# --- START OF FILE core.py ---
import os
import sys
# single thread doubles cuda performance - needs to be set before torch import
# Check if CUDAExecutionProvider is likely intended
_cuda_intended = False
if '--execution-provider' in sys.argv:
    try:
        providers_index = sys.argv.index('--execution-provider')
        # Check subsequent arguments until the next option (starts with '-') or the end of the list.
        for i in range(providers_index + 1, len(sys.argv)):
            if sys.argv[i].startswith('-'):
                break
            if 'cuda' in sys.argv[i].lower():
                _cuda_intended = True
                break
    except ValueError:
        pass  # --execution-provider not found
# Less precise check if the above fails or isn't used (e.g. the deprecated --gpu-vendor nvidia).
if not _cuda_intended and any('cuda' in arg.lower() or 'nvidia' in arg.lower() for arg in sys.argv):
    _cuda_intended = True
if _cuda_intended:
    print("[DLC.CORE] CUDA execution provider detected or inferred, setting OMP_NUM_THREADS=1.")
    os.environ['OMP_NUM_THREADS'] = '1'
# reduce tensorflow log level
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
import warnings
from typing import List, Optional
import platform
import signal
import shutil
import argparse
import gc # Garbage Collector
# --- ONNX Runtime Version Check ---
# Ensure ONNX Runtime is imported and check version compatibility if needed.
# As of onnxruntime 1.19, the core APIs used here (get_available_providers, InferenceSession config)
# remain stable. No specific code changes are required *in this file* for 1.19 compatibility,
# assuming frame processors use standard SessionOptions/InferenceSession creation.
try:
    import onnxruntime
    # print(f"[DLC.CORE] Using ONNX Runtime version: {onnxruntime.__version__}")  # Optional: uncomment for debugging
    # Example future check:
    # from packaging import version
    # if version.parse(onnxruntime.__version__) < version.parse("1.19.0"):
    #     print(f"Warning: ONNX Runtime version {onnxruntime.__version__} is older than 1.19. Some features might differ.")
except ImportError:
    print("\033[31m[DLC.CORE] Error: ONNX Runtime is not installed. Please install it (e.g., `pip install onnxruntime` or `pip install onnxruntime-gpu`).\033[0m")
    sys.exit(1)

# --- PyTorch Conditional Import ---
_torch_available = False
_torch_cuda_available = False
try:
    import torch
    _torch_available = True
    if torch.cuda.is_available():
        _torch_cuda_available = True
except ImportError:
    # Warn only if the CUDA EP might be used; otherwise PyTorch is optional.
    if _cuda_intended:
        print("[DLC.CORE] Warning: PyTorch is not installed, so GPU memory limiting via Torch is disabled.")

# --- TensorFlow Conditional Import (for resource limiting) ---
_tensorflow_available = False
try:
    import tensorflow
    _tensorflow_available = True
except ImportError:
    print("[DLC.CORE] Info: TensorFlow not found. GPU memory growth configuration for TensorFlow will be skipped.")
import modules.globals
import modules.metadata
import modules.ui as ui
from modules.processors.frame.core import get_frame_processors_modules
from modules.utilities import (
    has_image_extension, is_image, is_video, detect_fps, create_video, extract_frames,
    get_temp_frame_paths, restore_audio, create_temp, move_temp, clean_temp,
    normalize_output_path,
)
# Configuration for GPU Memory Limit (0.8 = 80%)
GPU_MEMORY_LIMIT_FRACTION = 0.8

# Check if ROCm is chosen early, before parse_args if possible, or handle it after.
_is_rocm_selected = False
# A simple check; parse_args will give the definitive list later.
if any('rocm' in arg.lower() for arg in sys.argv):
    _is_rocm_selected = True
if _is_rocm_selected and _torch_available:
    # If ROCm is selected, torch might interfere or not be needed.
    # Keep the behavior of unloading it for safety, as ROCm support in PyTorch can be complex.
    print("[DLC.CORE] ROCm detected or selected, unloading PyTorch to prevent potential conflicts.")
    del torch
    _torch_available = False
    _torch_cuda_available = False
    gc.collect()  # try to explicitly collect garbage

warnings.filterwarnings('ignore', category=FutureWarning, module='insightface')
warnings.filterwarnings('ignore', category=UserWarning, module='torchvision')

def parse_args() -> None:
    signal.signal(signal.SIGINT, lambda signal_number, frame: destroy())
    program = argparse.ArgumentParser(formatter_class=lambda prog: argparse.ArgumentDefaultsHelpFormatter(prog, max_help_position=40))  # wider help column
    program.add_argument('-s', '--source', help='Path to the source image file', dest='source_path')
    program.add_argument('-t', '--target', help='Path to the target image or video file', dest='target_path')
    program.add_argument('-o', '--output', help='Path for the output file or directory', dest='output_path')
    # Frame processors - update the choices if new processors are added.
    available_processors = ['face_swapper', 'face_enhancer']  # TODO: discover these dynamically in the future
    program.add_argument('--frame-processor', help='Pipeline of frame processors', dest='frame_processor', default=['face_swapper'], choices=available_processors, nargs='+')
    program.add_argument('--keep-fps', help='Keep the original frames per second (FPS) of the target video', dest='keep_fps', action='store_true')
    # Note: store_true with default=True means this flag is effectively always on.
    program.add_argument('--keep-audio', help='Keep the original audio of the target video (requires --keep-fps for perfect sync)', dest='keep_audio', action='store_true', default=True)
    program.add_argument('--keep-frames', help='Keep the temporary extracted frames after processing', dest='keep_frames', action='store_true')
    program.add_argument('--many-faces', help='Process all detected faces in the target, not just the most similar', dest='many_faces', action='store_true')
    program.add_argument('--nsfw-filter', help='Enable NSFW content filtering (experimental, image-only currently)', dest='nsfw_filter', action='store_true')
    program.add_argument('--map-faces', help='EXPERIMENTAL: Map source faces to target faces based on order or index. Requires manual setup or specific naming conventions.', dest='map_faces', action='store_true')
    program.add_argument('--mouth-mask', help='Apply a mask over the mouth region during processing (specific to certain processors)', dest='mouth_mask', action='store_true')
    program.add_argument('--video-encoder', help='Encoder for the output video', dest='video_encoder', default='libx264', choices=['libx264', 'libx265', 'libvpx-vp9', 'h264_nvenc', 'hevc_nvenc'])  # includes NVENC options
    program.add_argument('--video-quality', help='Quality for the output video (lower value means higher quality; range depends on encoder)', dest='video_quality', type=int, default=18, metavar='[0-51 for x264/x265, 0-63 for vp9]')
    program.add_argument('-l', '--lang', help='User interface language code (e.g., "en", "es")', default="en")
    program.add_argument('--live-mirror', help='Mirror the live camera preview (like a webcam)', dest='live_mirror', action='store_true')
    program.add_argument('--live-resizable', help='Allow resizing the live camera preview window', dest='live_resizable', action='store_true')
    program.add_argument('--max-memory', help='DEPRECATED (use with caution): Approx. maximum CPU RAM in GB. Less effective than GPU limits.', dest='max_memory', type=int)  # no default; suggest_max_memory() fills it in below
    # Execution provider - the choices reflect the providers available in this ONNX Runtime build.
    program.add_argument('--execution-provider', help='Execution provider(s) to use (e.g., cuda, cpu, rocm, dml, coreml). Order determines priority.', dest='execution_provider', default=suggest_execution_providers(), choices=get_available_execution_providers_short(), nargs='+')
    program.add_argument('--execution-threads', help='Number of threads for the execution provider', dest='execution_threads', type=int, default=suggest_execution_threads())
    program.add_argument('-v', '--version', action='version', version=f'{modules.metadata.name} {modules.metadata.version} (ONNX Runtime: {onnxruntime.__version__})')
    # Register deprecated arguments (hidden from --help).
    program.add_argument('-f', '--face', help=argparse.SUPPRESS, dest='source_path_deprecated')
    program.add_argument('--cpu-cores', help=argparse.SUPPRESS, dest='cpu_cores_deprecated', type=int)
    program.add_argument('--gpu-vendor', help=argparse.SUPPRESS, dest='gpu_vendor_deprecated', choices=['apple', 'nvidia', 'amd'])
    program.add_argument('--gpu-threads', help=argparse.SUPPRESS, dest='gpu_threads_deprecated', type=int)
    args = program.parse_args()

    # Set a default for max_memory if it was not provided.
    if args.max_memory is None:
        args.max_memory = suggest_max_memory()
    # Process deprecated arguments first.
    handle_deprecated_args(args)

    # Assign to globals.
    modules.globals.source_path = args.source_path
    modules.globals.target_path = args.target_path
    modules.globals.output_path = normalize_output_path(modules.globals.source_path, modules.globals.target_path, args.output_path)
    modules.globals.frame_processors = args.frame_processor
    # Headless mode is determined by the presence of CLI arguments for paths.
    modules.globals.headless = bool(args.source_path or args.target_path or args.output_path)
    modules.globals.keep_fps = args.keep_fps
    modules.globals.keep_audio = args.keep_audio  # note: keep_audio without keep_fps can cause sync issues
    modules.globals.keep_frames = args.keep_frames
    modules.globals.many_faces = args.many_faces
    modules.globals.mouth_mask = args.mouth_mask
    modules.globals.nsfw_filter = args.nsfw_filter
    modules.globals.map_faces = args.map_faces
    modules.globals.video_encoder = args.video_encoder
    modules.globals.video_quality = args.video_quality
    modules.globals.live_mirror = args.live_mirror
    modules.globals.live_resizable = args.live_resizable
    modules.globals.max_memory = args.max_memory  # still set, but primarily a CPU RAM limit now
    modules.globals.execution_providers = decode_execution_providers(args.execution_provider)  # decode the selected short names
    modules.globals.execution_threads = args.execution_threads
    modules.globals.lang = args.lang
    # Update derived globals.
    modules.globals.fp_ui = {proc: (proc in modules.globals.frame_processors) for proc in available_processors}  # simplified UI state init

    # Validate the keep_audio / keep_fps combination.
    if modules.globals.keep_audio and not modules.globals.keep_fps and not modules.globals.headless:
        # Only warn in interactive mode; CLI users are expected to know.
        print("\033[33mWarning: --keep-audio is enabled but --keep-fps is disabled. This might cause audio/video synchronization issues.\033[0m")
    elif modules.globals.keep_audio and not modules.globals.target_path:
        print("\033[33mWarning: --keep-audio is enabled but no target video path is provided. Audio cannot be kept.\033[0m")
        modules.globals.keep_audio = False
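
# Example headless invocation (illustrative only; the file paths are placeholders and
# the entry point is assumed to be the project's usual run.py wrapper):
#   python run.py -s source_face.jpg -t target.mp4 -o swapped.mp4 \
#       --frame-processor face_swapper face_enhancer \
#       --execution-provider cuda cpu --keep-fps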

def handle_deprecated_args(args: argparse.Namespace) -> None:
    """Handles deprecated arguments and updates the corresponding new arguments if necessary."""
    if args.source_path_deprecated:
        print('\033[33mArgument -f/--face is deprecated. Use -s/--source instead.\033[0m')
        if not args.source_path:  # only override if --source wasn't set
            args.source_path = args.source_path_deprecated
            # The output path is re-evaluated from this source later by normalize_output_path.
    # Track whether execution_threads was explicitly set by the user via --execution-threads.
    # This requires checking sys.argv, as argparse doesn't directly expose it.
    threads_explicitly_set = '--execution-threads' in sys.argv
    if args.cpu_cores_deprecated is not None:
        print('\033[33mArgument --cpu-cores is deprecated. Use --execution-threads instead.\033[0m')
        # Only override if --execution-threads wasn't explicitly set.
        if not threads_explicitly_set:
            args.execution_threads = args.cpu_cores_deprecated
            threads_explicitly_set = True  # mark as set now
    if args.gpu_threads_deprecated is not None:
        print('\033[33mArgument --gpu-threads is deprecated. Use --execution-threads instead.\033[0m')
        # Only override if --execution-threads wasn't explicitly set (by the user or --cpu-cores).
        if not threads_explicitly_set:
            args.execution_threads = args.gpu_threads_deprecated
            threads_explicitly_set = True  # mark as set
    # Handle the --gpu-vendor deprecation by modifying the execution_provider list *if not explicitly set*.
    ep_explicitly_set = '--execution-provider' in sys.argv
    if args.gpu_vendor_deprecated:
        print(f'\033[33mArgument --gpu-vendor {args.gpu_vendor_deprecated} is deprecated. Use --execution-provider instead.\033[0m')
        if not ep_explicitly_set:
            provider_map = {
                # Map vendor to preferred execution provider short names.
                'apple': ['coreml', 'cpu'],   # CoreML first
                'nvidia': ['cuda', 'cpu'],    # CUDA first
                'amd': ['rocm', 'cpu'],       # ROCm first
                # 'intel': ['openvino', 'cpu'],  # example if OpenVINO support becomes relevant
            }
            if args.gpu_vendor_deprecated in provider_map:
                suggested_providers = provider_map[args.gpu_vendor_deprecated]
                print(f"Mapping deprecated --gpu-vendor {args.gpu_vendor_deprecated} to --execution-provider {' '.join(suggested_providers)}")
                args.execution_provider = suggested_providers  # set the list of short names
            else:
                print(f'\033[33mWarning: Unknown --gpu-vendor {args.gpu_vendor_deprecated}. Default execution providers will be used.\033[0m')
        else:
            print(f'\033[33mWarning: --gpu-vendor {args.gpu_vendor_deprecated} is ignored because --execution-provider was explicitly set.\033[0m')

def get_available_execution_providers_full() -> List[str]:
    """Returns the full names of the available ONNX Runtime execution providers."""
    try:
        return onnxruntime.get_available_providers()
    except AttributeError:
        # Fallback for very old versions or unexpected issues.
        print("\033[33mWarning: Could not dynamically get available providers. Falling back to common defaults.\033[0m")
        # Provide a reasonable guess.
        defaults = ['CPUExecutionProvider']
        if _cuda_intended:
            defaults.insert(0, 'CUDAExecutionProvider')
        if _is_rocm_selected:
            defaults.insert(0, 'ROCMExecutionProvider')
        # Add others based on platform if needed.
        return defaults
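
# Illustrative probe of what this function wraps (the output varies with the installed
# onnxruntime build; the list below is just an example):
#   >>> onnxruntime.get_available_providers()
#   ['CUDAExecutionProvider', 'CPUExecutionProvider']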

def get_available_execution_providers_short() -> List[str]:
    """Returns the short names (lowercase) of the available ONNX Runtime execution providers."""
    full_names = get_available_execution_providers_full()
    return [name.replace('ExecutionProvider', '').lower() for name in full_names]

def decode_execution_providers(selected_short_names: List[str]) -> List[str]:
    """Converts selected short names back to full ONNX Runtime provider names, preserving order and checking availability."""
    available_full_names = get_available_execution_providers_full()
    available_short_map = {name.replace('ExecutionProvider', '').lower(): name for name in available_full_names}
    decoded_providers = []
    valid_short_names_found = []
    for short_name in selected_short_names:
        name_lower = short_name.lower()
        if name_lower in available_short_map:
            full_name = available_short_map[name_lower]
            if full_name not in decoded_providers:  # avoid duplicates
                decoded_providers.append(full_name)
                valid_short_names_found.append(name_lower)
        else:
            print(f"\033[33mWarning: Requested execution provider '{short_name}' is not available or not recognized. Skipping.\033[0m")
    if not decoded_providers:
        print("\033[33mWarning: No valid execution providers selected or available. Falling back to CPU.\033[0m")
        if 'CPUExecutionProvider' in available_full_names:
            decoded_providers = ['CPUExecutionProvider']
            valid_short_names_found.append('cpu')
        else:
            print("\033[31mError: CPUExecutionProvider is not available in this build of ONNX Runtime. Cannot proceed.\033[0m")
            sys.exit(1)  # critical error
    print(f"[DLC.CORE] Using execution providers: {valid_short_names_found} (full names: {decoded_providers})")
    return decoded_providers
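
# Example behavior (assuming a CUDA-capable onnxruntime build):
#   decode_execution_providers(['cuda', 'cpu'])
#   -> ['CUDAExecutionProvider', 'CPUExecutionProvider']
# Unavailable or unrecognized names are skipped with a warning; if nothing valid
# remains, the function falls back to CPU, or exits if even CPU is unavailable.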

def suggest_max_memory() -> int:
    """Suggests a default max CPU RAM limit in GB. Less critical now that GPU limits exist."""
    try:
        import psutil
        total_ram_gb = psutil.virtual_memory().total / (1024 ** 3)
        # Suggest roughly 40% of total RAM, clamped to a minimum of 4 GB and a maximum of 64 GB.
        suggested = max(4, min(int(total_ram_gb * 0.4), 64))
        # print(f"[DLC.CORE] Auto-suggesting max_memory: {suggested} GB (based on total system RAM: {total_ram_gb:.1f} GB)")
        return suggested
    except (ImportError, OSError):
        print("[DLC.CORE] Info: psutil not found or failed. Using a platform fallback default for the max_memory suggestion.")
        # Fallback defaults similar to the original code.
        if platform.system().lower() == 'darwin':
            return 8  # slightly increased macOS default
        return 16  # higher default for Linux/Windows

def suggest_execution_providers() -> List[str]:
    """Suggests a default list of execution providers based on availability and platform."""
    available_short = get_available_execution_providers_short()
    preferred_providers = []
    # Prioritize GPU providers if available.
    if 'cuda' in available_short:
        preferred_providers.append('cuda')
    elif 'rocm' in available_short:
        preferred_providers.append('rocm')
    elif 'dml' in available_short and platform.system().lower() == 'windows':
        preferred_providers.append('dml')  # DirectML on Windows
    elif 'coreml' in available_short and platform.system().lower() == 'darwin':
        preferred_providers.append('coreml')  # CoreML on macOS
    # Always include CPU as a fallback.
    if 'cpu' in available_short:
        preferred_providers.append('cpu')
    elif available_short:  # if CPU is somehow missing, add the first available provider
        preferred_providers.append(available_short[0])
    # If the list is empty (shouldn't happen if the availability query works), default to CPU.
    if not preferred_providers:
        return ['cpu']
    # print(f"[DLC.CORE] Suggested execution providers: {preferred_providers}")  # optional debug info
    return preferred_providers

def suggest_execution_threads() -> int:
    """Suggests a sensible default number of execution threads based on CPU cores."""
    try:
        logical_cores = os.cpu_count() or 4  # default to 4 if cpu_count fails
        # Use slightly fewer threads than logical cores, capped at 16:
        # a good balance between parallelism and overhead.
        suggested_threads = max(1, min(logical_cores - 1 if logical_cores > 1 else 1, 16))
        # Don't implicitly suggest 1 for CUDA/ROCm here; let the user override or the frame processors decide.
        # The SessionOptions in the processors should handle provider-specific thread settings if needed.
        # print(f"[DLC.CORE] Auto-suggesting execution_threads: {suggested_threads} (based on {logical_cores} logical cores)")
        return suggested_threads
    except NotImplementedError:
        print("[DLC.CORE] Warning: os.cpu_count() is not implemented on this platform. Using the fallback default for execution_threads (4).")
        return 4  # fallback
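
# Illustrative sketch (an assumption, not code from this file): how a frame processor
# might apply the chosen thread count and providers when it builds its ONNX Runtime
# session. The model path is a placeholder; SessionOptions.intra_op_num_threads and
# the providers argument are standard ONNX Runtime API.
#   sess_options = onnxruntime.SessionOptions()
#   sess_options.intra_op_num_threads = modules.globals.execution_threads
#   session = onnxruntime.InferenceSession(
#       'path/to/model.onnx', sess_options,
#       providers=modules.globals.execution_providers)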

def limit_gpu_memory(fraction: float) -> None:
    """Attempts to limit GPU memory usage, primarily via PyTorch if CUDA is used."""
    # Check if CUDAExecutionProvider is among the *actually selected* providers.
    if 'CUDAExecutionProvider' in modules.globals.execution_providers:
        if _torch_cuda_available:
            try:
                # Ensure CUDA is initialized if needed (might not be necessary, but safe).
                if not torch.cuda.is_initialized():
                    torch.cuda.init()
                device_count = torch.cuda.device_count()
                if device_count > 0:
                    # Limit memory on the default device (usually device 0).
                    # Note: this limits PyTorch's allocation pool. ONNX Runtime manages its
                    # CUDA memory somewhat separately, but this can still help prevent
                    # PyTorch from grabbing everything.
                    print(f"[DLC.CORE] Attempting to limit PyTorch CUDA memory fraction to {fraction:.1%} on device 0")
                    torch.cuda.set_per_process_memory_fraction(fraction, 0)
                    # Optional: check memory after setting the limit.
                    total_mem = torch.cuda.get_device_properties(0).total_memory
                    reserved_mem = torch.cuda.memory_reserved(0)
                    allocated_mem = torch.cuda.memory_allocated(0)
                    print(f"[DLC.CORE] PyTorch CUDA memory limit hint set. Device 0 total: {total_mem / 1024**3:.2f} GB. "
                          f"PyTorch reserved: {reserved_mem / 1024**3:.2f} GB, allocated: {allocated_mem / 1024**3:.2f} GB.")
                else:
                    print("\033[33mWarning: PyTorch reports no CUDA devices available, cannot set memory limit.\033[0m")
            except RuntimeError as e:
                print(f"\033[33mWarning: PyTorch CUDA runtime error while setting the memory limit (may already be initialized?): {e}\033[0m")
            except Exception as e:
                print(f"\033[33mWarning: Failed to set PyTorch CUDA memory fraction: {e}\033[0m")
        else:
            # Only warn if the CUDA EP was chosen but PyTorch CUDA specifically isn't available.
            if _cuda_intended:  # check original intent
                print("\033[33mWarning: CUDAExecutionProvider selected, but PyTorch CUDA is not available. Cannot apply the PyTorch memory limit.\033[0m")
    # Add future limits for other providers here if ONNX Runtime supports them directly.
    # Placeholder for a hypothetical future ONNX Runtime API:
    # elif 'ROCMExecutionProvider' in modules.globals.execution_providers:
    #     try:
    #         ort_options = onnxruntime.SessionOptions()
    #         ort_options.add_provider_options('rocm', {'gpu_mem_limit': str(int(total_mem_bytes * fraction))})
    #         print("[DLC.CORE] Note: ROCm memory limit set via ONNX Runtime provider options (if such an API exists).")
    #     except Exception as e:
    #         print(f"\033[33mWarning: Failed to set ROCm memory limit via hypothetical ORT options: {e}\033[0m")
    # else:
    #     print("[DLC.CORE] GPU memory limit not applied (PyTorch CUDA not used or unavailable).")

def limit_resources() -> None:
    """Limits system resources: CPU RAM (best effort) and TensorFlow GPU options."""
    # 1. Limit CPU RAM (best effort, OS-dependent).
    if modules.globals.max_memory and modules.globals.max_memory > 0:
        limit_gb = modules.globals.max_memory
        limit_bytes = limit_gb * (1024 ** 3)
        current_system = platform.system().lower()
        try:
            if current_system in ('linux', 'darwin'):
                import resource
                # RLIMIT_AS (virtual memory) is often more effective than RLIMIT_DATA.
                try:
                    soft, hard = resource.getrlimit(resource.RLIMIT_AS)
                    # Set the soft limit; raising the hard limit usually requires root.
                    # Don't exceed the current hard limit (and treat RLIM_INFINITY, which
                    # compares as -1, as "no cap" rather than letting min() pick it).
                    new_soft = min(limit_bytes, hard) if hard != resource.RLIM_INFINITY else limit_bytes
                    resource.setrlimit(resource.RLIMIT_AS, (new_soft, hard))
                    print(f"[DLC.CORE] Limited process virtual memory (CPU RAM approximation) soft limit towards ~{limit_gb} GB.")
                except (ValueError, resource.error) as e:
                    print(f"\033[33mWarning: Failed to set virtual memory limit (RLIMIT_AS): {e}\033[0m")
                    # Fallback attempt using RLIMIT_DATA (less effective for total memory).
                    try:
                        soft_data, hard_data = resource.getrlimit(resource.RLIMIT_DATA)
                        new_soft_data = min(limit_bytes, hard_data) if hard_data != resource.RLIM_INFINITY else limit_bytes
                        resource.setrlimit(resource.RLIMIT_DATA, (new_soft_data, hard_data))
                        print(f"[DLC.CORE] Limited process data segment (partial CPU RAM) soft limit towards ~{limit_gb} GB.")
                    except (ValueError, resource.error) as e_data:
                        print(f"\033[33mWarning: Failed to set data segment limit (RLIMIT_DATA): {e_data}\033[0m")
            elif current_system == 'windows':
                # Windows memory limiting is complex. SetProcessWorkingSetSizeEx is only a hint;
                # Job Objects are the robust mechanism but are much more involved.
                import ctypes
                # use_last_error=True is required for ctypes.get_last_error() to be meaningful.
                kernel32 = ctypes.WinDLL('kernel32', use_last_error=True)
                process_handle = kernel32.GetCurrentProcess()
                # Flags: QUOTA_LIMITS_HARDWS_ENABLE (1) requires special privileges; use 0 for a min/max hint only.
                # Using min=1 MB and max=limit_bytes. Returns non-zero on success.
                min_ws = ctypes.c_size_t(1024 * 1024)
                max_ws = ctypes.c_size_t(limit_bytes)
                if not kernel32.SetProcessWorkingSetSizeEx(process_handle, min_ws, max_ws, 0):
                    error_code = ctypes.get_last_error()
                    print(f"\033[33mWarning: Failed to set the process working set size hint (Windows). Error code: {error_code}. This limit may not be enforced.\033[0m")
                else:
                    print(f"[DLC.CORE] Requested a process working set size hint (Windows memory guidance) of max ~{limit_gb} GB.")
            else:
                print(f"\033[33mWarning: CPU RAM limiting is not implemented for platform {current_system}. --max-memory is ignored.\033[0m")
        except ImportError:
            print("\033[33mWarning: The 'resource' module (Unix) is not available. Cannot limit CPU RAM via setrlimit.\033[0m")
        except Exception as e:
            print(f"\033[33mWarning: An unexpected error occurred during CPU RAM limiting: {e}\033[0m")
    # else:
    #     print("[DLC.CORE] Info: CPU RAM limit (--max-memory) not set or disabled.")

    # 2. Configure TensorFlow GPU memory (if TensorFlow is installed).
    if _tensorflow_available:
        try:
            gpus = tensorflow.config.experimental.list_physical_devices('GPU')
            if gpus:
                configured_gpus = 0
                for gpu in gpus:
                    try:
                        # Allow memory growth instead of pre-allocating everything.
                        tensorflow.config.experimental.set_memory_growth(gpu, True)
                        # print(f"[DLC.CORE] Enabled TensorFlow memory growth for GPU: {gpu.name}")
                        configured_gpus += 1
                    except RuntimeError as e:
                        # Memory growth must be set before the GPUs have been initialized.
                        print(f"\033[33mWarning: Could not set TensorFlow memory growth for {gpu.name} (may already be initialized): {e}\033[0m")
                    except Exception as e_inner:  # catch other potential TF config errors
                        print(f"\033[33mWarning: Error configuring TensorFlow memory growth for {gpu.name}: {e_inner}\033[0m")
                if configured_gpus > 0:
                    print(f"[DLC.CORE] Enabled TensorFlow memory growth for {configured_gpus} GPU(s).")
            # else:
            #     print("[DLC.CORE] No TensorFlow physical GPUs detected.")
        except Exception as e:
            print(f"\033[33mWarning: Error listing or configuring TensorFlow GPU devices: {e}\033[0m")
    # else:
    #     print("[DLC.CORE] TensorFlow not available, skipping TF GPU configuration.")

def release_resources() -> None:
    """Releases resources, especially GPU memory caches."""
    # Clear the PyTorch CUDA cache if applicable and PyTorch CUDA is available.
    if 'CUDAExecutionProvider' in modules.globals.execution_providers and _torch_cuda_available:
        try:
            torch.cuda.empty_cache()
            # print("[DLC.CORE] Cleared PyTorch CUDA cache.")  # optional: uncomment for verbose logging
        except Exception as e:
            print(f"\033[33mWarning: Failed to clear PyTorch CUDA cache: {e}\033[0m")
    # Potential cleanup for other frameworks or ONNX Runtime sessions could go here.
    # (Usually, session objects going out of scope plus gc.collect() is sufficient for the ORT C++ backend.)
    # Explicitly run garbage collection. This helps release Python-level objects, which
    # might in turn trigger the release of underlying resources (like ONNX Runtime session memory).
    gc.collect()
    # print("[DLC.CORE] Ran garbage collector.")  # optional: uncomment for verbose logging

def pre_check() -> bool:
    """Performs essential pre-run checks for dependencies and versions."""
    if sys.version_info < (3, 9):
        update_status('Python version is not supported - please upgrade to Python 3.9 or higher.')
        return False
    if not shutil.which('ffmpeg'):
        update_status('ffmpeg command not found in PATH. Please install ffmpeg and ensure it is accessible.')
        return False
    # ONNX Runtime was checked at import time; the import would have failed earlier if it's not installed.
    # print(f"[DLC.CORE] Using ONNX Runtime version: {onnxruntime.__version__}")
    # TensorFlow check (optional; only issue a warning if unavailable).
    if not _tensorflow_available:
        update_status('TensorFlow not found. Some features like the GPU memory growth setting will be skipped.', scope='INFO')
        # If any processor strictly requires TF, change this to an error and return False.
        # Currently it is only used for optional resource limiting.
    # Check PyTorch availability *only if* the CUDA EP is selected.
    if 'CUDAExecutionProvider' in modules.globals.execution_providers:
        if not _torch_available:
            update_status('CUDAExecutionProvider selected, but PyTorch is not installed. Install PyTorch with CUDA support (see the PyTorch website).', scope='ERROR')
            return False
        if not _torch_cuda_available:
            update_status('CUDAExecutionProvider selected, but torch.cuda.is_available() is False. Check the PyTorch CUDA installation, GPU drivers, and CUDA toolkit compatibility.', scope='ERROR')
            return False
    # Check whether the selected video encoder requires specific hardware/drivers (e.g., NVENC).
    if modules.globals.video_encoder in ['h264_nvenc', 'hevc_nvenc']:
        # This check is basic: FFmpeg must be compiled with NVENC support and NVIDIA
        # drivers must be installed, which we can't easily verify from Python.
        # So just issue an informational note.
        update_status(f"Selected video encoder '{modules.globals.video_encoder}' requires an NVIDIA GPU and correctly configured FFmpeg/drivers.", scope='INFO')
        if 'CUDAExecutionProvider' not in modules.globals.execution_providers:
            update_status("Warning: NVENC encoder selected, but CUDAExecutionProvider is not active. Ensure FFmpeg can access the GPU independently.", scope='WARN')
    return True
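
# Hedged sketch of a stronger, best-effort NVENC check than the informational note in
# pre_check() above. This helper is an assumption (it is not called anywhere in this
# file): it asks the local ffmpeg for its compiled-in encoders and looks for the
# requested name. It assumes ffmpeg is on PATH, which pre_check() has already verified.
def _ffmpeg_supports_encoder(encoder_name: str) -> bool:
    """Returns True if the local ffmpeg build lists the given encoder (best effort)."""
    import subprocess
    try:
        result = subprocess.run(['ffmpeg', '-hide_banner', '-encoders'],
                                capture_output=True, text=True, timeout=10)
        return encoder_name in result.stdout
    except (OSError, subprocess.SubprocessError):
        # If ffmpeg cannot be queried, do not block processing on this probe.
        return False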

def update_status(message: str, scope: str = 'DLC.CORE') -> None:
    """Prints status messages and updates the UI if not headless."""
    formatted_message = f'[{scope}] {message}'
    print(formatted_message)
    if not modules.globals.headless:
        # Ensure the ui module's update_status function exists and is callable.
        if hasattr(ui, 'update_status') and callable(ui.update_status):
            try:
                # Use a mechanism that is safe for cross-thread UI updates if necessary
                # (e.g., a queue, or the toolkit's "call on main thread" helper).
                # A direct call is assumed to be fine here, based on the original structure.
                ui.update_status(message)  # pass the original message without the scope prefix
            except Exception as e:
                # Avoid crashing the core process on UI update errors.
                print(f"[DLC.CORE] Error updating UI status: {e}")
        # else:
        #     print("[DLC.CORE] UI or ui.update_status not available for status update.")

def start() -> None:
    """Main processing logic: routes to image or video processing."""
    # Ensure the frame processors are ready (this also initializes them).
    try:
        active_processors = get_frame_processors_modules(modules.globals.frame_processors)
        if not active_processors:
            update_status("No valid frame processors selected or loaded. Aborting.", "ERROR")
            return
        all_processors_initialized = True
        for frame_processor in active_processors:
            update_status(f'Initializing frame processor: {getattr(frame_processor, "NAME", "UnknownProcessor")}...')
            # The pre_start method should handle model loading and initial setup.
            # It might raise exceptions or return False on failure.
            if not hasattr(frame_processor, 'pre_start') or not callable(frame_processor.pre_start):
                update_status(f'Processor {getattr(frame_processor, "NAME", "UnknownProcessor")} lacks a pre_start method.', 'WARN')
                continue  # or treat as a failure?
            if not frame_processor.pre_start():
                update_status(f'Initialization failed for {getattr(frame_processor, "NAME", "UnknownProcessor")}. Aborting.', 'ERROR')
                all_processors_initialized = False
                break  # stop initialization if one fails
        if not all_processors_initialized:
            return  # abort if any processor failed to initialize
    except Exception as e:
        update_status(f"Error during frame processor initialization: {e}", "ERROR")
        import traceback
        traceback.print_exc()
        return

    # --- Route based on target type ---
    if not modules.globals.target_path or not os.path.exists(modules.globals.target_path):
        update_status(f"Target path '{modules.globals.target_path}' not found or not specified.", "ERROR")
        return
    if has_image_extension(modules.globals.target_path) and is_image(modules.globals.target_path):
        process_image_target(active_processors)
    elif is_video(modules.globals.target_path):
        process_video_target(active_processors)
    else:
        update_status(f"Target path '{modules.globals.target_path}' is not a recognized image or video file.", "ERROR")

def process_image_target(active_processors: List) -> None:
    """Handles processing when the target is an image."""
    update_status('Processing image target...')
    # NSFW check (basic, image-only).
    if modules.globals.nsfw_filter:
        update_status('Checking image for NSFW content...', 'NSFW')
        # Assuming ui.check_and_ignore_nsfw is suitable for this.
        if ui.check_and_ignore_nsfw(modules.globals.target_path, destroy):
            update_status('NSFW content detected and processing skipped.', 'NSFW')
            return  # stop processing
    try:
        # Ensure the source path exists if the processors need it.
        if not modules.globals.source_path or not os.path.exists(modules.globals.source_path):
            # Face swapping requires a source; the enhancer might not.
            if any(getattr(proc, 'NAME', '') == 'face_swapper' for proc in active_processors):
                update_status(f"Source image path '{modules.globals.source_path}' not found or not specified, but it is required for face swapping.", "ERROR")
                return
        # Ensure the output directory exists.
        output_dir = os.path.dirname(modules.globals.output_path)
        if output_dir and not os.path.exists(output_dir):
            try:
                os.makedirs(output_dir, exist_ok=True)
                print(f"[DLC.CORE] Created output directory: {output_dir}")
            except OSError as e:
                update_status(f"Error creating output directory '{output_dir}': {e}", "ERROR")
                return
        # Copy the target to the output path first, to preserve metadata where possible.
        final_output_path = modules.globals.output_path
        temp_output_path = None  # used if the output would overwrite an input file
        # Avoid overwriting input files directly during processing if they are the same as the output.
        if os.path.abspath(modules.globals.target_path) == os.path.abspath(final_output_path) or \
                (modules.globals.source_path and os.path.abspath(modules.globals.source_path) == os.path.abspath(final_output_path)):
            temp_output_path = os.path.join(output_dir, f"temp_image_{os.path.basename(final_output_path)}")
            print(f"[DLC.CORE] Output path conflicts with an input, using temporary file: {temp_output_path}")
            shutil.copy2(modules.globals.target_path, temp_output_path)
            current_processing_file = temp_output_path
        else:
            # Copy the target to the final destination to start.
            shutil.copy2(modules.globals.target_path, final_output_path)
            current_processing_file = final_output_path

        # Apply the processors sequentially to the current file path.
        source_for_processing = modules.globals.source_path
        output_for_processing = current_processing_file  # processors modify this file in place
        for frame_processor in active_processors:
            processor_name = getattr(frame_processor, "NAME", "UnknownProcessor")
            update_status(f'Applying {processor_name}...', processor_name)
            try:
                # Pass the source, the input path (current state), and the output path
                # (same as the input, for in-place modification).
                frame_processor.process_image(source_for_processing, output_for_processing, output_for_processing)
                release_resources()  # release memory after each processor step
            except Exception as e:
                update_status(f'Error during {processor_name} processing: {e}', 'ERROR')
                import traceback
                traceback.print_exc()
                # Clean up the temp file and abort.
                if temp_output_path and os.path.exists(temp_output_path):
                    os.remove(temp_output_path)
                return

        # If a temporary file was used, move it to the final destination.
        if temp_output_path:
            try:
                shutil.move(temp_output_path, final_output_path)
                print(f"[DLC.CORE] Moved temporary result to final output: {final_output_path}")
            except Exception as e:
                update_status(f"Error moving the temporary file to the final output: {e}", "ERROR")
                # The temp file might still exist; leave it for inspection.
                return

        # Final check that the output exists and is an image.
        if os.path.exists(final_output_path) and is_image(final_output_path):
            update_status('Processing image finished successfully.')
        else:
            update_status('Processing image failed: output file not found or invalid after processing.', 'ERROR')
    except Exception as e:
        update_status(f'An unexpected error occurred during image processing: {e}', 'ERROR')
        import traceback
        traceback.print_exc()
        # Be cautious about cleaning up here: deleting a potentially corrupt output could
        # remove the only result (or, if an earlier copy failed, the original), so files
        # are intentionally left in place for inspection.

def process_video_target(active_processors: List) -> None:
    """Handles processing when the target is a video."""
    update_status('Processing video target...')
    # Basic check for the source if needed (similar to image processing).
    if not modules.globals.source_path or not os.path.exists(modules.globals.source_path):
        if any(getattr(proc, 'NAME', '') == 'face_swapper' for proc in active_processors):
            update_status(f"Source image path '{modules.globals.source_path}' not found or not specified, but it is required for face swapping.", "ERROR")
            return
    # NSFW check (could be enhanced to sample frames; currently basic/skipped for video).
    if modules.globals.nsfw_filter:
        update_status('The NSFW check for video is basic/experimental. Checking the first frame...', 'NSFW')
        # Consider implementing frame sampling for a more robust check if needed.
        # if ui.check_and_ignore_nsfw(modules.globals.target_path, destroy):  # this might not work well for video
        #     update_status('NSFW content potentially detected (based on the first-frame check). Skipping.', 'NSFW')
        #     return
        update_status('NSFW check passed or skipped for video.', 'NSFW INFO')

    temp_output_video_path = None
    temp_frame_dir = None  # keep track of the temp frame directory
    try:
        # --- Frame Extraction ---
        # map_faces might imply frames are already extracted or handled differently.
        if not modules.globals.map_faces:
            update_status('Creating temporary resources for video frames...')
            # create_temp should return the path of the temp directory it created.
            temp_frame_dir = create_temp(modules.globals.target_path)
            if not temp_frame_dir:
                update_status("Failed to create a temporary directory for frames.", "ERROR")
                return
            update_status('Extracting video frames...')
            # extract_frames needs the temp directory path. It should also set
            # modules.globals.video_fps based on the extracted video.
            extract_frames(modules.globals.target_path, temp_frame_dir)
            update_status('Frame extraction complete.')
        else:
            update_status('Skipping frame extraction due to the --map-faces flag.', 'INFO')
            # Assume frames are already in the expected temp location or handled by the processors.
            temp_frame_dir = os.path.join(modules.globals.TEMP_DIRECTORY, os.path.basename(modules.globals.target_path))  # needs consistent temp path logic

        # Get the paths of the frames (extracted or pre-existing).
        temp_frame_paths = get_temp_frame_paths(modules.globals.target_path)  # this relies on the temp dir structure
        if not temp_frame_paths:
            update_status('No frames found to process. Check the temp folder or the extraction step.', 'ERROR')
            # Clean up if a temp dir was created.
            if temp_frame_dir and not modules.globals.keep_frames:
                clean_temp(modules.globals.target_path)
            return
        update_status(f'Processing {len(temp_frame_paths)} frames...')

        # --- Frame Processing ---
        source_for_processing = modules.globals.source_path
        for frame_processor in active_processors:
            processor_name = getattr(frame_processor, "NAME", "UnknownProcessor")
            update_status(f'Applying {processor_name}...', processor_name)
            try:
                # process_video should modify the frames in place in the temp directory.
                # It needs the source path and the list of frame paths.
                frame_processor.process_video(source_for_processing, temp_frame_paths)
                release_resources()  # release memory after each processor completes its pass
            except Exception as e:
                update_status(f'Error during {processor_name} frame processing: {e}', 'ERROR')
                import traceback
                traceback.print_exc()
                # Abort processing; clean up the temp frames if we are not keeping them.
                if temp_frame_dir and not modules.globals.keep_frames:
                    clean_temp(modules.globals.target_path)
                return

        # --- Video Creation ---
        update_status('Reconstructing the video from the processed frames...')
        fps = modules.globals.video_fps  # should be set by extract_frames or detected earlier
        if modules.globals.keep_fps:
            # Use the FPS detected during extraction (stored in globals.video_fps).
            if fps is None:
                update_status('Original FPS was not detected during extraction; attempting fallback detection...', 'WARN')
                detected_fps = detect_fps(modules.globals.target_path)
                if detected_fps is not None:
                    fps = detected_fps
                    modules.globals.video_fps = fps  # store it back
                    update_status(f'Using fallback detected FPS: {fps:.2f}')
                else:
                    fps = 30.0  # ultimate fallback
                    update_status("Could not detect FPS, using the default of 30.", "WARN")
            else:
                update_status(f'Using the original detected FPS: {fps:.2f}')
        else:
            fps = 30.0  # use the default FPS if not keeping the original
            update_status(f'Using fixed FPS: {fps:.2f}')

        # Define a temporary path for the video created *without* audio.
        output_dir = os.path.dirname(modules.globals.output_path)
        if not output_dir:
            output_dir = '.'  # handle the case where the output is in the current directory
        temp_output_video_filename = f"temp_{os.path.basename(modules.globals.output_path)}"
        # The temp filename could clash if multiple runs happen concurrently (unlikely in this app).
        temp_output_video_path = os.path.join(output_dir, temp_output_video_filename)
        # create_video needs the target path (for context), the fps, and the *temp* output path.
        # It internally uses get_temp_frame_paths based on the target_path context.
        create_video(modules.globals.target_path, fps, temp_output_video_path)

        # --- Audio Handling ---
        final_output_path = modules.globals.output_path
        if modules.globals.keep_audio:
            update_status('Restoring audio...')
            if not modules.globals.keep_fps:
                update_status('Audio restoration may cause sync issues because the FPS was not kept.', 'WARN')
            # restore_audio needs: the original video (with audio), the temp video (no audio), and the final output path.
            restore_success = restore_audio(modules.globals.target_path, temp_output_video_path, final_output_path)
            if restore_success:
                update_status('Audio restoration complete.')
                # Remove the intermediate temp video *after* a successful audio merge.
                if os.path.exists(temp_output_video_path):
                    try:
                        os.remove(temp_output_video_path)
                    except OSError as e:
                        print(f"\033[33mWarning: Could not remove intermediate video file {temp_output_video_path}: {e}\033[0m")
                temp_output_video_path = None  # mark as removed
            else:
                update_status('Audio restoration failed. The output video will be silent.', 'ERROR')
                # Audio failed; move the silent video to the final path as a fallback.
                update_status('Moving the silent video to the final output path as a fallback.')
                try:
                    shutil.move(temp_output_video_path, final_output_path)
                    temp_output_video_path = None  # mark as moved
                except Exception as e:
                    update_status(f"Error moving the silent video to the final output: {e}", "ERROR")
                    # Both the audio merge and the move failed; the temp video might still exist.
        else:
            # No audio requested; move the temp video to the final output path.
            update_status('Moving the temporary video to the final output path (no audio).')
            try:
                if os.path.abspath(temp_output_video_path) == os.path.abspath(final_output_path):
                    update_status("The temporary path is the same as the final path; no move needed.", "INFO")
                    temp_output_video_path = None  # no deletion needed later
                else:
                    # Ensure the target directory exists (it should already, but double check).
                    # The `or '.'` guards against os.makedirs('') when the output is in the current directory.
                    os.makedirs(os.path.dirname(final_output_path) or '.', exist_ok=True)
                    shutil.move(temp_output_video_path, final_output_path)
                    temp_output_video_path = None  # mark as moved successfully
            except Exception as e:
                update_status(f"Error moving the temporary video to the final output: {e}", "ERROR")
                # The temp video might still exist.

        # --- Validation ---
        if os.path.exists(final_output_path) and is_video(final_output_path):
            update_status('Processing video finished successfully.')
        else:
            update_status('Processing video failed: output file not found or invalid after processing.', 'ERROR')
    except Exception as e:
        update_status(f'An unexpected error occurred during video processing: {e}', 'ERROR')
        import traceback
        traceback.print_exc()  # print a detailed traceback for debugging
    finally:
        # --- Cleanup ---
        # Clean up the temporary frames if they exist and keep_frames is false.
        if temp_frame_dir and os.path.exists(temp_frame_dir) and not modules.globals.keep_frames:
            update_status("Cleaning up temporary frames...")
            clean_temp(modules.globals.target_path)  # clean_temp uses the target_path context to find the directory
        # Clean up the intermediate temp video file if it still exists (e.g., the audio merge and the move both failed).
        if temp_output_video_path and os.path.exists(temp_output_video_path):
            try:
                os.remove(temp_output_video_path)
                print(f"[DLC.CORE] Removed intermediate temporary video file: {temp_output_video_path}")
            except OSError as e:
                print(f"\033[33mWarning: Could not remove intermediate temporary video file {temp_output_video_path}: {e}\033[0m")

def destroy(to_quit: bool = True) -> None:
    """Cleans up temporary files, releases resources, and optionally exits."""
    update_status("Cleaning up temporary resources...", "CLEANUP")
    # Use the target_path context to find the temp directory.
    if modules.globals.target_path and not modules.globals.keep_frames:
        clean_temp(modules.globals.target_path)
    release_resources()  # final resource release (GPU cache, GC)
    update_status("Cleanup complete.", "CLEANUP")
    if to_quit:
        print("[DLC.CORE] Exiting application.")
        os._exit(0)  # os._exit is more forceful than sys.exit if lingering threads would otherwise hang the process
        # sys.exit(0)  # standard exit

def run() -> None:
    """Parses arguments, sets up the environment, performs checks, and starts processing or the UI."""
    try:
        parse_args()  # parse arguments first to set globals like execution_providers, paths, etc.
        # Apply the GPU memory limit early; this requires execution_providers to be set by parse_args.
        limit_gpu_memory(GPU_MEMORY_LIMIT_FRACTION)
        # Limit other resources (CPU RAM approximation, TF GPU options).
        # Call this *after* the potential PyTorch limit and the TensorFlow import check.
        limit_resources()
        # Perform pre-checks (Python version, ffmpeg, libraries, provider checks).
        update_status("Performing pre-run checks...")
        if not pre_check():
            update_status("Pre-run checks failed. Please see the messages above.", "ERROR")
            return  # exit run(); the finally block handles cleanup
        update_status("Pre-run checks passed.")

        # Pre-check the frame processors (model downloads, per-processor requirements).
        # This needs globals to be set by parse_args and must happen before starting work.
        active_processor_modules = get_frame_processors_modules(modules.globals.frame_processors)
        all_processors_reqs_met = True
        for frame_processor_module in active_processor_modules:
            processor_name = getattr(frame_processor_module, "NAME", "UnknownProcessor")
            update_status(f'Checking requirements for {processor_name}...')
            if hasattr(frame_processor_module, 'pre_check') and callable(frame_processor_module.pre_check):
                if not frame_processor_module.pre_check():
                    update_status(f'The requirements check failed for {processor_name}. See the processor messages for details.', 'ERROR')
                    all_processors_reqs_met = False
                    # Don't break early; check all processors to report every issue.
            else:
                update_status(f'Processor {processor_name} does not have a pre_check method. Assuming its requirements are met.', 'WARN')
        if not all_processors_reqs_met:
            update_status('Some frame processors failed their requirement checks. Please resolve the issues and retry.', 'ERROR')
            return  # the finally block handles cleanup
        update_status("All frame processor requirements are met.")

        # --- Start processing (headless) or launch the UI ---
        if modules.globals.headless:
            # Check for the essential paths in headless mode.
            if not modules.globals.source_path:
                update_status("Error: Headless mode requires the --source argument.", "ERROR")
                # program.print_help()  # the parser object isn't accessible here
                print("Use -h or --help for usage details.")
                return
            if not modules.globals.target_path:
                update_status("Error: Headless mode requires the --target argument.", "ERROR")
                print("Use -h or --help for usage details.")
                return
            if not modules.globals.output_path:
                update_status("Error: Headless mode requires the --output argument.", "ERROR")
                print("Use -h or --help for usage details.")
                return
            update_status('Running in headless mode.')
            start()  # execute the main processing logic
            # destroy() is called by the finally block.
        else:
            # --- Launch UI ---
            update_status('Launching the graphical user interface...')
            # Ensure destroy is callable without arguments for the UI close button.
            destroy_wrapper = lambda: destroy(to_quit=True)
            try:
                # Pass start (the processing function) and destroy (cleanup) to the UI.
                window = ui.init(start, destroy_wrapper, modules.globals.lang)
                if window:
                    window.mainloop()  # start the UI event loop
                else:
                    update_status("UI initialization failed.", "ERROR")
            except Exception as e:
                update_status(f"Error initializing or running the UI: {e}", "FATAL")
                import traceback
                traceback.print_exc()
                # The finally block attempts cleanup even if the UI fails.
    except Exception as e:
        # Catch any unexpected errors during setup or execution.
        update_status(f"A critical error occurred: {e}", "FATAL")
        import traceback
        traceback.print_exc()
    finally:
        # Ensure cleanup happens regardless of success or failure.
        destroy(to_quit=True)  # clean up and exit

# --- Main execution entry point ---
if __name__ == "__main__":
    # This ensures run() is called only when the script is executed directly.
    run()
# --- END OF FILE core.py ---