WIP fixes and performance

pull/286/head
Jason Kneen 2024-08-13 20:54:50 +01:00
parent 1570d91a12
commit 6cda41de6c
4 changed files with 205 additions and 51 deletions

View File

@ -29,6 +29,40 @@ if 'ROCMExecutionProvider' in modules.globals.execution_providers:
warnings.filterwarnings('ignore', category=FutureWarning, module='insightface')
warnings.filterwarnings('ignore', category=UserWarning, module='torchvision')
def get_system_memory() -> int:
    """
    Get the total system memory in GB.

    psutil is preferred when it is installed. Without it, the size is read
    through ``os.sysconf`` on platforms that expose the page size and the
    physical page count. Only when both lookups fail is a platform default
    assumed (16GB on macOS, 8GB elsewhere).

    Returns:
        int: Total system memory in whole GB (rounded down).
    """
    import os

    bytes_per_gb = 1024 ** 3

    try:
        import psutil
        return psutil.virtual_memory().total // bytes_per_gb
    except ImportError:
        # psutil is optional; fall through to the standard-library lookup.
        pass

    try:
        page_size = os.sysconf('SC_PAGE_SIZE')
        page_count = os.sysconf('SC_PHYS_PAGES')
        if page_size > 0 and page_count > 0:
            return (page_size * page_count) // bytes_per_gb
    except (AttributeError, ValueError, OSError):
        # os.sysconf is unavailable or does not know these names here.
        pass

    # Last resort: assume 16GB on macOS and 8GB on every other system.
    return 16 if platform.system().lower() == 'darwin' else 8
def suggest_max_memory() -> int:
    """
    Recommend a RAM budget in GB derived from the machine's total memory.

    The recommendation is 70% of the total reported by get_system_memory(),
    capped at 64GB and never lower than 4GB.

    Returns:
        int: Recommended maximum memory in GB.
    """
    upper_limit = 64
    lower_limit = 4
    seventy_percent = int(get_system_memory() * 0.7)
    if seventy_percent > upper_limit:
        return upper_limit
    if seventy_percent < lower_limit:
        return lower_limit
    return seventy_percent
def parse_args() -> None:
signal.signal(signal.SIGINT, lambda signal_number, frame: destroy())
@ -46,6 +80,8 @@ def parse_args() -> None:
program.add_argument('--max-memory', help='maximum amount of RAM in GB', dest='max_memory', type=int, default=suggest_max_memory())
program.add_argument('--execution-provider', help='execution provider', dest='execution_provider', default=['coreml'], choices=suggest_execution_providers(), nargs='+')
program.add_argument('--execution-threads', help='number of execution threads', dest='execution_threads', type=int, default=suggest_execution_threads())
program.add_argument('--video-processor', help='video processor to use', dest='video_processor', default='cv2', choices=['cv2', 'ffmpeg'])
program.add_argument('--model', help='model to use for face swapping', dest='model', default='inswapper_128v2.fp16.onnx')
program.add_argument('-v', '--version', action='version', version=f'{modules.metadata.name} {modules.metadata.version}')
args = program.parse_args()
@ -64,6 +100,8 @@ def parse_args() -> None:
modules.globals.max_memory = args.max_memory
modules.globals.execution_providers = ['CoreMLExecutionProvider'] # Force CoreML
modules.globals.execution_threads = args.execution_threads
modules.globals.video_processor = args.video_processor
modules.globals.model = args.model
if 'face_enhancer' in args.frame_processor:
modules.globals.fp_ui['face_enhancer'] = True
@ -89,7 +127,6 @@ def suggest_execution_threads() -> int:
return 4
def limit_resources() -> None:
if modules.globals.max_memory:
memory = modules.globals.max_memory * 1024 ** 6
@ -150,7 +187,10 @@ def process_video():
update_status('Creating temp resources...')
create_temp(modules.globals.target_path)
update_status('Extracting frames...')
extract_frames(modules.globals.target_path)
if modules.globals.video_processor == 'cv2':
extract_frames_cv2(modules.globals.target_path)
else:
extract_frames_ffmpeg(modules.globals.target_path)
temp_frame_paths = get_temp_frame_paths(modules.globals.target_path)
for frame_processor in get_frame_processors_modules(modules.globals.frame_processors):
update_status('Progressing...', frame_processor.NAME)
@ -178,6 +218,30 @@ def process_video():
update_status('Processing to video failed!')
def extract_frames_cv2(target_path: str) -> None:
    """
    Decode a video with OpenCV and save every frame as a PNG.

    Frames are written with zero-based, zero-padded names (0000.png,
    0001.png, ...) until the capture stops returning frames.

    Args:
        target_path: Path of the video to split into frames.
    """
    import cv2

    # NOTE(review): get_temp_frame_paths() is used here as the output directory,
    # while process_video() assigns the same call to `temp_frame_paths` - confirm
    # that it returns a directory and not a collection of frame files.
    # It is resolved once up front instead of once per frame.
    frames_directory = get_temp_frame_paths(target_path)

    capture = cv2.VideoCapture(target_path)
    try:
        frame_num = 0
        while True:
            success, frame = capture.read()
            if not success:
                break
            # Format the index inside the f-string: applying `%` to the whole
            # path fails when the directory itself contains a '%' character.
            cv2.imwrite(f'{frames_directory}/{frame_num:04d}.png', frame)
            frame_num += 1
    finally:
        # Release the decoder even when reading or writing raises.
        capture.release()
def extract_frames_ffmpeg(target_path: str) -> None:
    """
    Split a video into numbered PNG frames through ffmpeg-python.

    The output pattern is `%04d.png` with numbering starting at 0, existing
    files are overwritten, and ffmpeg's stdout/stderr are captured rather than
    echoed.

    Args:
        target_path: Path of the video to split into frames.
    """
    import ffmpeg

    source_stream = ffmpeg.input(target_path)
    # NOTE(review): presumably get_temp_frame_paths() yields the frame
    # directory here - verify against its definition.
    frame_pattern = f'{get_temp_frame_paths(target_path)}/%04d.png'
    frame_writer = source_stream.output(frame_pattern, start_number=0)
    frame_writer = frame_writer.overwrite_output()
    frame_writer.run(capture_stdout=True, capture_stderr=True)
def destroy() -> None:
if modules.globals.target_path:
clean_temp(modules.globals.target_path)

View File

@ -2,6 +2,7 @@ from typing import Any, List
import cv2
import insightface
import threading
import numpy as np
import modules.globals
import modules.processors.frame.core
@ -15,7 +16,6 @@ THREAD_LOCK = threading.Lock()
NAME = 'DLC.FACE-SWAPPER'
# model update to use GPU / Metal on MacOS
def pre_check() -> bool:
download_directory_path = resolve_relative_path('../models')
conditional_download(download_directory_path, ['https://huggingface.co/hacksider/deep-live-cam/blob/main/inswapper_128.onnx'])
@ -46,7 +46,18 @@ def get_face_swapper() -> Any:
def swap_face(source_face: Face, target_face: Face, temp_frame: Frame) -> Frame:
    """
    Run the face swapper on one frame, falling back to the input frame on error.

    The previous unconditional `return` ahead of the `try` made the guarded
    path unreachable; it is removed so failures are actually caught.

    Args:
        source_face: Face passed to the swapper as the source.
        target_face: Face passed to the swapper as the target.
        temp_frame: Frame handed to the swapper (paste_back=True).

    Returns:
        The swapper's result, or `temp_frame` unchanged when the swap raises.
    """
    try:
        print("Debug: Starting face swap")
        print(f"Debug: temp_frame shape: {temp_frame.shape}, dtype: {temp_frame.dtype}")
        print(f"Debug: target_face keys: {target_face.keys()}")
        print(f"Debug: source_face keys: {source_face.keys()}")
        result = get_face_swapper().get(temp_frame, target_face, source_face, paste_back=True)
        print("Debug: Face swap completed successfully")
        return result
    except Exception as e:
        # Deliberate best effort: report the failure and keep the pipeline
        # running with the unmodified frame.
        print(f"Error in swap_face: {str(e)}")
        return temp_frame
def process_frame(source_face: Face, temp_frame: Frame) -> Frame:

View File

@ -1,9 +1,10 @@
import os
import time
import webbrowser
import customtkinter as ctk
from typing import Callable, Tuple
import cv2
from PIL import Image, ImageOps
from PIL import Image, ImageOps, ImageDraw, ImageFont
import numpy as np
import modules.globals
import modules.metadata
@ -17,8 +18,8 @@ ROOT_HEIGHT = 700
ROOT_WIDTH = 600
PREVIEW = None
PREVIEW_MAX_HEIGHT = 700
PREVIEW_MAX_WIDTH = 1200
PREVIEW_MAX_HEIGHT = 720
PREVIEW_MAX_WIDTH = 1280
RECENT_DIRECTORY_SOURCE = None
RECENT_DIRECTORY_TARGET = None
@ -75,7 +76,6 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
keep_frames_switch = ctk.CTkSwitch(root, text='Keep frames', variable=keep_frames_value, cursor='hand2', command=lambda: setattr(modules.globals, 'keep_frames', keep_frames_value.get()))
keep_frames_switch.place(relx=0.1, rely=0.65)
# for FRAME PROCESSOR ENHANCER tumbler:
enhancer_value = ctk.BooleanVar(value=modules.globals.fp_ui['face_enhancer'])
enhancer_switch = ctk.CTkSwitch(root, text='Face Enhancer', variable=enhancer_value, cursor='hand2', command=lambda: update_tumbler('face_enhancer',enhancer_value.get()))
enhancer_switch.place(relx=0.1, rely=0.7)
@ -92,23 +92,36 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
nsfw_switch = ctk.CTkSwitch(root, text='NSFW', variable=nsfw_value, cursor='hand2', command=lambda: setattr(modules.globals, 'nsfw', nsfw_value.get()))
nsfw_switch.place(relx=0.6, rely=0.7)
video_processor_label = ctk.CTkLabel(root, text="Video Processor:")
video_processor_label.place(relx=0.1, rely=0.75)
video_processor_var = ctk.StringVar(value=modules.globals.video_processor)
video_processor_menu = ctk.CTkOptionMenu(root, variable=video_processor_var, values=["cv2", "ffmpeg"], command=lambda choice: setattr(modules.globals, 'video_processor', choice))
video_processor_menu.place(relx=0.3, rely=0.75)
model_label = ctk.CTkLabel(root, text="Model:")
model_label.place(relx=0.1, rely=0.8)
model_var = ctk.StringVar(value=modules.globals.model)
model_entry = ctk.CTkEntry(root, textvariable=model_var)
model_entry.place(relx=0.3, rely=0.8, relwidth=0.4)
model_entry.bind("<FocusOut>", lambda event: setattr(modules.globals, 'model', model_var.get()))
start_button = ctk.CTkButton(root, text='Start', cursor='hand2', command=lambda: select_output_path(start))
start_button.place(relx=0.15, rely=0.80, relwidth=0.2, relheight=0.05)
start_button.place(relx=0.15, rely=0.85, relwidth=0.2, relheight=0.05)
stop_button = ctk.CTkButton(root, text='Destroy', cursor='hand2', command=lambda: destroy())
stop_button.place(relx=0.4, rely=0.80, relwidth=0.2, relheight=0.05)
stop_button.place(relx=0.4, rely=0.85, relwidth=0.2, relheight=0.05)
preview_button = ctk.CTkButton(root, text='Preview', cursor='hand2', command=lambda: toggle_preview())
preview_button.place(relx=0.65, rely=0.80, relwidth=0.2, relheight=0.05)
preview_button.place(relx=0.65, rely=0.85, relwidth=0.2, relheight=0.05)
live_button = ctk.CTkButton(root, text='Live', cursor='hand2', command=lambda: webcam_preview())
live_button.place(relx=0.40, rely=0.86, relwidth=0.2, relheight=0.05)
live_button.place(relx=0.40, rely=0.91, relwidth=0.2, relheight=0.05)
status_label = ctk.CTkLabel(root, text=None, justify='center')
status_label.place(relx=0.1, rely=0.9, relwidth=0.8)
status_label.place(relx=0.1, rely=0.95, relwidth=0.8)
donate_label = ctk.CTkLabel(root, text='Deep Live Cam', justify='center', cursor='hand2')
donate_label.place(relx=0.1, rely=0.95, relwidth=0.8)
donate_label.place(relx=0.1, rely=0.98, relwidth=0.8)
donate_label.configure(text_color=ctk.ThemeManager.theme.get('URL').get('text_color'))
donate_label.bind('<Button>', lambda event: webbrowser.open('https://paypal.me/hacksider'))
@ -200,17 +213,32 @@ def render_image_preview(image_path: str, size: Tuple[int, int]) -> ctk.CTkImage
def render_video_preview(video_path: str, size: Tuple[int, int], frame_number: int = 0) -> ctk.CTkImage:
    """
    Build a preview image from a single frame of a video.

    With the 'cv2' video processor the frame at `frame_number` is read through
    OpenCV. Otherwise ffmpeg-python grabs one frame at half of the first
    stream's duration, scaled to `size`.

    Args:
        video_path: Path of the video to preview.
        size: (width, height) of the preview. The OpenCV path skips the resize
            when it is falsy; the ffmpeg path always uses it.
        frame_number: Frame to show on the OpenCV path (0 reads the first
            frame). The ffmpeg path does not use it.

    Returns:
        The preview image, or None when OpenCV cannot read a frame.
    """
    if modules.globals.video_processor == 'cv2':
        import cv2
        capture = cv2.VideoCapture(video_path)
        try:
            if frame_number:
                capture.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
            has_frame, frame = capture.read()
        finally:
            # Always free the capture; previously the success path returned
            # before release() was reached.
            capture.release()
        if not has_frame:
            cv2.destroyAllWindows()
            return None
        image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
        if size:
            image = ImageOps.fit(image, size, Image.LANCZOS)
        return ctk.CTkImage(image, size=image.size)

    import ffmpeg
    probe = ffmpeg.probe(video_path)
    # Named so it does not shadow the imported `time` module.
    midpoint_seconds = float(probe['streams'][0]['duration']) // 2
    out, _ = (
        ffmpeg
        .input(video_path, ss=midpoint_seconds)
        .filter('scale', size[0], size[1])
        .output('pipe:', vframes=1, format='rawvideo', pix_fmt='rgb24')
        .run(capture_stdout=True)
    )
    image = Image.frombytes('RGB', size, out)
    return ctk.CTkImage(image, size=image.size)
def toggle_preview() -> None:
@ -241,14 +269,20 @@ def update_preview(frame_number: int = 0) -> None:
quit()
for frame_processor in get_frame_processors_modules(modules.globals.frame_processors):
temp_frame = frame_processor.process_frame(
get_one_face(cv2.imread(modules.globals.source_path)),
get_one_face(modules.globals.source_path),
temp_frame
)
image = Image.fromarray(cv2.cvtColor(temp_frame, cv2.COLOR_BGR2RGB))
image = Image.fromarray(temp_frame)
image = ImageOps.contain(image, (PREVIEW_MAX_WIDTH, PREVIEW_MAX_HEIGHT), Image.LANCZOS)
image = ctk.CTkImage(image, size=image.size)
preview_label.configure(image=image)
def draw_fps(image, fps):
    """
    Draw an "FPS: x.xx" label near the top-left corner of an image.

    The font is loaded once and reused on later calls. When the macOS Arial
    file cannot be read, Pillow's built-in default font is used instead of
    raising.

    Args:
        image: PIL image to annotate; it is drawn on in place.
        fps: Frames-per-second value, rendered with two decimals.

    Returns:
        The same image object that was passed in.
    """
    font = getattr(draw_fps, '_font', None)
    if font is None:
        try:
            font = ImageFont.truetype("/System/Library/Fonts/Supplemental/Arial.ttf", 36)
        except OSError:
            # The path above only exists on macOS installations.
            font = ImageFont.load_default()
        # Cache on the function so the per-frame caller does not reload it.
        draw_fps._font = font
    draw = ImageDraw.Draw(image)
    draw.text((10, 10), f"FPS: {fps:.2f}", font=font, fill=(255, 255, 255))
    return image
def webcam_preview():
if modules.globals.source_path is None:
# No image selected
@ -256,12 +290,31 @@ def webcam_preview():
global preview_label, PREVIEW
cap = cv2.VideoCapture(0) # Use index for the webcam (adjust the index accordingly if necessary)
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1024) # Set the width of the resolution
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 768) # Set the height of the resolution
cap.set(cv2.CAP_PROP_FPS, 60) # Set the frame rate of the webcam
PREVIEW_MAX_WIDTH = 1024
PREVIEW_MAX_HEIGHT = 768
if modules.globals.video_processor == 'cv2':
import cv2
cap = cv2.VideoCapture(0) # Use index for the webcam (adjust the index accordingly if necessary)
if not cap.isOpened():
update_status("Error: Unable to open webcam. Please check your camera connection.")
return
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280) # Set the width of the resolution
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720) # Set the height of the resolution
cap.set(cv2.CAP_PROP_FPS, 30) # Set the frame rate of the webcam
else:
import ffmpeg
import subprocess
command = [
'ffmpeg',
'-f', 'avfoundation',
'-framerate', '30',
'-video_size', '1280x720',
'-i', '0:none',
'-f', 'rawvideo',
'-pix_fmt', 'rgb24',
'-'
]
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
preview_label.configure(image=None) # Reset the preview image before startup
@ -269,28 +322,51 @@ def webcam_preview():
frame_processors = get_frame_processors_modules(modules.globals.frame_processors)
source_image = None # Initialize variable for the selected face image
# Load the source image
if modules.globals.video_processor == 'cv2':
import cv2
source_image = cv2.imread(modules.globals.source_path)
source_image = cv2.cvtColor(source_image, cv2.COLOR_BGR2RGB)
else:
source_image = np.array(Image.open(modules.globals.source_path))
source_face = get_one_face(source_image)
prev_frame_time = time.time()
fps = 0
while True:
ret, frame = cap.read()
if not ret:
break
# Select and save face image only once
if source_image is None and modules.globals.source_path:
source_image = get_one_face(cv2.imread(modules.globals.source_path))
temp_frame = frame.copy() #Create a copy of the frame
if modules.globals.video_processor == 'cv2':
ret, frame = cap.read()
if not ret:
break
temp_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
else:
in_bytes = process.stdout.read(1280 * 720 * 3)
if not in_bytes:
break
temp_frame = np.frombuffer(in_bytes, np.uint8).reshape([720, 1280, 3])
for frame_processor in frame_processors:
temp_frame = frame_processor.process_frame(source_image, temp_frame)
temp_frame = frame_processor.process_frame(source_face, temp_frame)
image = cv2.cvtColor(temp_frame, cv2.COLOR_BGR2RGB) # Convert the image to RGB format to display it with Tkinter
image = Image.fromarray(image)
# Calculate FPS
current_time = time.time()
fps = 1 / (current_time - prev_frame_time)
prev_frame_time = current_time
image = Image.fromarray(temp_frame)
image = ImageOps.contain(image, (PREVIEW_MAX_WIDTH, PREVIEW_MAX_HEIGHT), Image.LANCZOS)
# Draw FPS on the image
image = draw_fps(image, fps)
image = ctk.CTkImage(image, size=image.size)
preview_label.configure(image=image)
ROOT.update()
cap.release()
if modules.globals.video_processor == 'cv2':
cap.release()
else:
process.terminate()
PREVIEW.withdraw() # Close preview window when loop is finished

View File

@ -3,12 +3,11 @@
# Core dependencies
numpy==1.26.4
onnxruntime-silicon==1.16.3
opencv-python==4.8.1.78
pillow==9.5.0
insightface==0.7.3
torch==2.1.0 # Add the specific version you're using
tensorflow-macos==2.16.2 # Add the specific version you're using
tensorflow-metal==1.1.0 # Add the specific version you're using
torch==2.1.0
tensorflow-macos==2.16.2
tensorflow-metal==1.1.0
# Image processing
scikit-image==0.24.0
@ -22,6 +21,10 @@ tqdm==4.66.4
requests==2.32.3
prettytable==3.11.0
# Video processing
opencv-python==4.8.1.78 # Required: cv2 is imported at module level by the UI and the face swapper
ffmpeg-python==0.2.0 # Optional: only imported when the ffmpeg video processor is selected
# Optional dependencies (comment out if not needed)
# albumentations==1.4.13
# coloredlogs==15.0.1