FIX: Flake8 Violations

pull/391/head
undeƒined 2024-08-18 03:58:26 +02:00
parent 9a4b5d6405
commit 5a5a38b501
7 changed files with 265 additions and 54 deletions

View File

@ -19,7 +19,8 @@ import modules.globals
import modules.metadata
import modules.ui as ui
from modules.processors.frame.core import get_frame_processors_modules
from modules.utilities import has_image_extension, is_image, is_video, detect_fps, create_video, extract_frames, get_temp_frame_paths, restore_audio, create_temp, move_temp, clean_temp, normalize_output_path
from modules.utilities import has_image_extension, is_image, is_video, detect_fps, create_video, extract_frames, \
get_temp_frame_paths, restore_audio, create_temp, move_temp, clean_temp, normalize_output_path
if 'ROCMExecutionProvider' in modules.globals.execution_providers:
del torch
@ -31,20 +32,104 @@ warnings.filterwarnings('ignore', category=UserWarning, module='torchvision')
def parse_args() -> None:
signal.signal(signal.SIGINT, lambda signal_number, frame: destroy())
program = argparse.ArgumentParser()
program.add_argument('-s', '--source', help='select an source image', dest='source_path')
program.add_argument('-t', '--target', help='select an target image or video', dest='target_path')
program.add_argument('-o', '--output', help='select output file or directory', dest='output_path')
program.add_argument('--frame-processor', help='pipeline of frame processors', dest='frame_processor', default=['face_swapper'], choices=['face_swapper', 'face_enhancer'], nargs='+')
program.add_argument('--keep-fps', help='keep original fps', dest='keep_fps', action='store_true', default=False)
program.add_argument('--keep-audio', help='keep original audio', dest='keep_audio', action='store_true', default=True)
program.add_argument('--keep-frames', help='keep temporary frames', dest='keep_frames', action='store_true', default=False)
program.add_argument('--many-faces', help='process every face', dest='many_faces', action='store_true', default=False)
program.add_argument('--video-encoder', help='adjust output video encoder', dest='video_encoder', default='libx264', choices=['libx264', 'libx265', 'libvpx-vp9'])
program.add_argument('--video-quality', help='adjust output video quality', dest='video_quality', type=int, default=18, choices=range(52), metavar='[0-51]')
program.add_argument('--max-memory', help='maximum amount of RAM in GB', dest='max_memory', type=int, default=suggest_max_memory())
program.add_argument('--execution-provider', help='execution provider', dest='execution_provider', default=['cpu'], choices=suggest_execution_providers(), nargs='+')
program.add_argument('--execution-threads', help='number of execution threads', dest='execution_threads', type=int, default=suggest_execution_threads())
program.add_argument('-v', '--version', action='version', version=f'{modules.metadata.name} {modules.metadata.version}')
program.add_argument(
'-s',
'--source',
help='select an source image',
dest='source_path'
)
program.add_argument(
'-t',
'--target',
help='select an target image or video',
dest='target_path'
)
program.add_argument(
'-o',
'--output',
help='select output file or directory',
dest='output_path'
)
program.add_argument(
'--frame-processor',
help='pipeline of frame processors',
dest='frame_processor',
default=['face_swapper'],
choices=['face_swapper', 'face_enhancer'],
nargs='+'
)
program.add_argument(
'--keep-fps',
help='keep original fps',
dest='keep_fps',
action='store_true',
default=False
)
program.add_argument(
'--keep-audio',
help='keep original audio',
dest='keep_audio',
action='store_true',
default=True
)
program.add_argument(
'--keep-frames',
help='keep temporary frames',
dest='keep_frames',
action='store_true',
default=False
)
program.add_argument(
'--many-faces',
help='process every face',
dest='many_faces',
action='store_true',
default=False
)
program.add_argument(
'--video-encoder',
help='adjust output video encoder',
dest='video_encoder',
default='libx264',
choices=['libx264', 'libx265', 'libvpx-vp9']
)
program.add_argument(
'--video-quality',
help='adjust output video quality',
dest='video_quality',
type=int,
default=18,
choices=range(52),
metavar='[0-51]'
)
program.add_argument(
'--max-memory',
help='maximum amount of RAM in GB',
dest='max_memory',
type=int,
default=suggest_max_memory()
)
program.add_argument(
'--execution-provider',
help='execution provider',
dest='execution_provider',
default=['cpu'],
choices=suggest_execution_providers(),
nargs='+'
)
program.add_argument(
'--execution-threads',
help='number of execution threads',
dest='execution_threads',
type=int,
default=suggest_execution_threads()
)
program.add_argument(
'-v',
'--version',
action='version',
version=f'{modules.metadata.name} {modules.metadata.version}'
)
# register deprecated args
program.add_argument('-f', '--face', help=argparse.SUPPRESS, dest='source_path_deprecated')
@ -69,7 +154,7 @@ def parse_args() -> None:
modules.globals.execution_providers = decode_execution_providers(args.execution_provider)
modules.globals.execution_threads = args.execution_threads
#for ENHANCER tumbler:
# for ENHANCER tumbler:
if 'face_enhancer' in args.frame_processor:
modules.globals.fp_ui['face_enhancer'] = True
else:
@ -81,7 +166,10 @@ def parse_args() -> None:
if args.source_path_deprecated:
print('\033[33mArgument -f and --face are deprecated. Use -s and --source instead.\033[0m')
modules.globals.source_path = args.source_path_deprecated
modules.globals.output_path = normalize_output_path(args.source_path_deprecated, modules.globals.target_path, args.output_path)
modules.globals.output_path = normalize_output_path(
args.source_path_deprecated,
modules.globals.target_path, args.output_path
)
if args.cpu_cores_deprecated:
print('\033[33mArgument --cpu-cores is deprecated. Use --execution-threads instead.\033[0m')
modules.globals.execution_threads = args.cpu_cores_deprecated
@ -104,8 +192,14 @@ def encode_execution_providers(execution_providers: List[str]) -> List[str]:
def decode_execution_providers(execution_providers: List[str]) -> List[str]:
return [provider for provider, encoded_execution_provider in zip(onnxruntime.get_available_providers(), encode_execution_providers(onnxruntime.get_available_providers()))
if any(execution_provider in encoded_execution_provider for execution_provider in execution_providers)]
return [
provider for provider, encoded_execution_provider in zip(
onnxruntime.get_available_providers(),
encode_execution_providers(onnxruntime.get_available_providers()),
strict=False
)
if any(execution_provider in encoded_execution_provider for execution_provider in execution_providers)
]
def suggest_max_memory() -> int:
@ -165,13 +259,14 @@ def update_status(message: str, scope: str = 'DLC.CORE') -> None:
if not modules.globals.headless:
ui.update_status(message)
def start() -> None:
for frame_processor in get_frame_processors_modules(modules.globals.frame_processors):
if not frame_processor.pre_start():
return
# process image to image
if has_image_extension(modules.globals.target_path):
if modules.globals.nsfw == False:
if not modules.globals.nsfw:
from modules.predicter import predict_image
if predict_image(modules.globals.target_path):
destroy()
@ -186,7 +281,7 @@ def start() -> None:
update_status('Processing to image failed!')
return
# process image to videos
if modules.globals.nsfw == False:
if not modules.globals.nsfw:
from modules.predicter import predict_video
if predict_video(modules.globals.target_path):
destroy()

View File

@ -5,8 +5,8 @@ ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
WORKFLOW_DIR = os.path.join(ROOT_DIR, 'workflow')
file_types = [
('Image', ('*.png','*.jpg','*.jpeg','*.gif','*.bmp')),
('Video', ('*.mp4','*.mkv'))
('Image', ('*.png', '*.jpg', '*.jpeg', '*.gif', '*.bmp')),
('Video', ('*.mp4', '*.mkv'))
]
source_path = None
@ -27,4 +27,4 @@ log_level = 'error'
fp_ui: Dict[str, bool] = {}
nsfw = None
camera_input_combobox = None
webcam_preview_running = False
webcam_preview_running = False

View File

@ -6,7 +6,7 @@ from typing import Any, List, Callable
from tqdm import tqdm
import modules
import modules.globals
import modules.globals
FRAME_PROCESSORS_MODULES: List[ModuleType] = []
FRAME_PROCESSORS_INTERFACE = [
@ -40,22 +40,29 @@ def get_frame_processors_modules(frame_processors: List[str]) -> List[ModuleType
set_frame_processors_modules_from_ui(frame_processors)
return FRAME_PROCESSORS_MODULES
def set_frame_processors_modules_from_ui(frame_processors: List[str]) -> None:
global FRAME_PROCESSORS_MODULES
for frame_processor, state in modules.globals.fp_ui.items():
if state == True and frame_processor not in frame_processors:
if state and frame_processor not in frame_processors:
frame_processor_module = load_frame_processor_module(frame_processor)
FRAME_PROCESSORS_MODULES.append(frame_processor_module)
modules.globals.frame_processors.append(frame_processor)
if state == False:
if not state:
try:
frame_processor_module = load_frame_processor_module(frame_processor)
FRAME_PROCESSORS_MODULES.remove(frame_processor_module)
modules.globals.frame_processors.remove(frame_processor)
except:
except: # noqa: B001
pass
def multi_process_frame(source_path: str, temp_frame_paths: List[str], process_frames: Callable[[str, List[str], Any], None], progress: Any = None) -> None:
def multi_process_frame(
source_path: str,
temp_frame_paths: List[str],
process_frames: Callable[[str, List[str], Any], None],
progress: Any = None
) -> None:
with ThreadPoolExecutor(max_workers=modules.globals.execution_threads) as executor:
futures = []
for path in temp_frame_paths:
@ -69,5 +76,9 @@ def process_video(source_path: str, frame_paths: list[str], process_frames: Call
progress_bar_format = '{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}{postfix}]'
total = len(frame_paths)
with tqdm(total=total, desc='Processing', unit='frame', dynamic_ncols=True, bar_format=progress_bar_format) as progress:
progress.set_postfix({'execution_providers': modules.globals.execution_providers, 'execution_threads': modules.globals.execution_threads, 'max_memory': modules.globals.max_memory})
progress.set_postfix({
'execution_providers': modules.globals.execution_providers,
'execution_threads': modules.globals.execution_threads,
'max_memory': modules.globals.max_memory
})
multi_process_frame(source_path, frame_paths, process_frames, progress)

View File

@ -18,7 +18,7 @@ NAME = 'DLC.FACE-ENHANCER'
def pre_check() -> bool:
download_directory_path = resolve_relative_path('..\models')
download_directory_path = resolve_relative_path('..\\models')
conditional_download(download_directory_path, ['https://github.com/TencentARC/GFPGAN/releases/download/v1.3.4/GFPGANv1.4.pth'])
return True
@ -36,11 +36,11 @@ def get_face_enhancer() -> Any:
with THREAD_LOCK:
if FACE_ENHANCER is None:
if os.name == 'nt':
model_path = resolve_relative_path('..\models\GFPGANv1.4.pth')
model_path = resolve_relative_path('..\\models\\GFPGANv1.4.pth')
# todo: set models path https://github.com/TencentARC/GFPGAN/issues/399
else:
model_path = resolve_relative_path('../models/GFPGANv1.4.pth')
FACE_ENHANCER = gfpgan.GFPGANer(model_path=model_path, upscale=1) # type: ignore[attr-defined]
FACE_ENHANCER = gfpgan.GFPGANer(model_path=model_path, upscale=1) # type: ignore[attr-defined]
return FACE_ENHANCER

View File

@ -17,7 +17,10 @@ NAME = 'DLC.FACE-SWAPPER'
def pre_check() -> bool:
download_directory_path = resolve_relative_path('../models')
conditional_download(download_directory_path, ['https://huggingface.co/hacksider/deep-live-cam/blob/main/inswapper_128_fp16.onnx'])
conditional_download(
download_directory_path,
['https://huggingface.co/hacksider/deep-live-cam/blob/main/inswapper_128_fp16.onnx']
)
return True

View File

@ -68,29 +68,65 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
select_target_button.place(relx=0.6, rely=0.4, relwidth=0.3, relheight=0.1)
keep_fps_value = ctk.BooleanVar(value=modules.globals.keep_fps)
keep_fps_checkbox = ctk.CTkSwitch(root, text='Keep fps', variable=keep_fps_value, cursor='hand2', command=lambda: setattr(modules.globals, 'keep_fps', not modules.globals.keep_fps))
keep_fps_checkbox = ctk.CTkSwitch(
root,
text='Keep fps',
variable=keep_fps_value,
cursor='hand2',
command=lambda: setattr(modules.globals, 'keep_fps', not modules.globals.keep_fps)
)
keep_fps_checkbox.place(relx=0.1, rely=0.6)
keep_frames_value = ctk.BooleanVar(value=modules.globals.keep_frames)
keep_frames_switch = ctk.CTkSwitch(root, text='Keep frames', variable=keep_frames_value, cursor='hand2', command=lambda: setattr(modules.globals, 'keep_frames', keep_frames_value.get()))
keep_frames_switch = ctk.CTkSwitch(
root,
text='Keep frames',
variable=keep_frames_value,
cursor='hand2',
command=lambda: setattr(modules.globals, 'keep_frames', keep_frames_value.get())
)
keep_frames_switch.place(relx=0.1, rely=0.65)
# for FRAME PROCESSOR ENHANCER tumbler:
enhancer_value = ctk.BooleanVar(value=modules.globals.fp_ui['face_enhancer'])
enhancer_switch = ctk.CTkSwitch(root, text='Face Enhancer', variable=enhancer_value, cursor='hand2', command=lambda: update_tumbler('face_enhancer',enhancer_value.get()))
enhancer_switch = ctk.CTkSwitch(
root,
text='Face Enhancer',
variable=enhancer_value,
cursor='hand2',
command=lambda: update_tumbler('face_enhancer', enhancer_value.get())
)
enhancer_switch.place(relx=0.1, rely=0.7)
keep_audio_value = ctk.BooleanVar(value=modules.globals.keep_audio)
keep_audio_switch = ctk.CTkSwitch(root, text='Keep audio', variable=keep_audio_value, cursor='hand2', command=lambda: setattr(modules.globals, 'keep_audio', keep_audio_value.get()))
keep_audio_switch = ctk.CTkSwitch(
root,
text='Keep audio',
variable=keep_audio_value,
cursor='hand2',
command=lambda: setattr(modules.globals, 'keep_audio', keep_audio_value.get())
)
keep_audio_switch.place(relx=0.6, rely=0.6)
many_faces_value = ctk.BooleanVar(value=modules.globals.many_faces)
many_faces_switch = ctk.CTkSwitch(root, text='Many faces', variable=many_faces_value, cursor='hand2', command=lambda: setattr(modules.globals, 'many_faces', many_faces_value.get()))
many_faces_switch = ctk.CTkSwitch(
root,
text='Many faces',
variable=many_faces_value,
cursor='hand2',
command=lambda: setattr(modules.globals, 'many_faces', many_faces_value.get())
)
many_faces_switch.place(relx=0.6, rely=0.65)
# nsfw_value = ctk.BooleanVar(value=modules.globals.nsfw)
# nsfw_switch = ctk.CTkSwitch(root, text='NSFW', variable=nsfw_value, cursor='hand2', command=lambda: setattr(modules.globals, 'nsfw', nsfw_value.get()))
# nsfw_switch.place(relx=0.6, rely=0.7)
# nsfw_value = ctk.BooleanVar(value=modules.globals.nsfw)
# nsfw_switch = ctk.CTkSwitch(
# root,
# text='NSFW',
# variable=nsfw_value,
# cursor='hand2',
# command=lambda: setattr(modules.globals, 'nsfw', nsfw_value.get())
# )
# nsfw_switch.place(relx=0.6, rely=0.7)
start_button = ctk.CTkButton(root, text='Start', cursor='hand2', command=lambda: select_output_path(start))
start_button.place(relx=0.15, rely=0.80, relwidth=0.2, relheight=0.05)
@ -146,7 +182,11 @@ def select_source_path() -> None:
global RECENT_DIRECTORY_SOURCE, img_ft, vid_ft
PREVIEW.withdraw()
source_path = ctk.filedialog.askopenfilename(title='select an source image', initialdir=RECENT_DIRECTORY_SOURCE, filetypes=[img_ft])
source_path = ctk.filedialog.askopenfilename(
title='select an source image',
initialdir=RECENT_DIRECTORY_SOURCE,
filetypes=[img_ft]
)
if is_image(source_path):
modules.globals.source_path = source_path
RECENT_DIRECTORY_SOURCE = os.path.dirname(modules.globals.source_path)
@ -161,7 +201,11 @@ def select_target_path() -> None:
global RECENT_DIRECTORY_TARGET, img_ft, vid_ft
PREVIEW.withdraw()
target_path = ctk.filedialog.askopenfilename(title='select an target image or video', initialdir=RECENT_DIRECTORY_TARGET, filetypes=[img_ft, vid_ft])
target_path = ctk.filedialog.askopenfilename(
title='select an target image or video',
initialdir=RECENT_DIRECTORY_TARGET,
filetypes=[img_ft, vid_ft]
)
if is_image(target_path):
modules.globals.target_path = target_path
RECENT_DIRECTORY_TARGET = os.path.dirname(modules.globals.target_path)
@ -181,9 +225,21 @@ def select_output_path(start: Callable[[], None]) -> None:
global RECENT_DIRECTORY_OUTPUT, img_ft, vid_ft
if is_image(modules.globals.target_path):
output_path = ctk.filedialog.asksaveasfilename(title='save image output file', filetypes=[img_ft], defaultextension='.png', initialfile='output.png', initialdir=RECENT_DIRECTORY_OUTPUT)
output_path = ctk.filedialog.asksaveasfilename(
title='save image output file',
filetypes=[img_ft],
defaultextension='.png',
initialfile='output.png',
initialdir=RECENT_DIRECTORY_OUTPUT
)
elif is_video(modules.globals.target_path):
output_path = ctk.filedialog.asksaveasfilename(title='save video output file', filetypes=[vid_ft], defaultextension='.mp4', initialfile='output.mp4', initialdir=RECENT_DIRECTORY_OUTPUT)
output_path = ctk.filedialog.asksaveasfilename(
title='save video output file',
filetypes=[vid_ft],
defaultextension='.mp4',
initialfile='output.mp4',
initialdir=RECENT_DIRECTORY_OUTPUT
)
else:
output_path = None
if output_path:
@ -235,7 +291,7 @@ def init_preview() -> None:
def update_preview(frame_number: int = 0) -> None:
if modules.globals.source_path and modules.globals.target_path:
temp_frame = get_video_frame(modules.globals.target_path, frame_number)
if modules.globals.nsfw == False:
if not modules.globals.nsfw:
from modules.predicter import predict_frame
if predict_frame(temp_frame):
quit()
@ -249,6 +305,7 @@ def update_preview(frame_number: int = 0) -> None:
image = ctk.CTkImage(image, size=image.size)
preview_label.configure(image=image)
def webcam_preview():
if modules.globals.source_path is None:
# No image selected
@ -256,7 +313,7 @@ def webcam_preview():
global preview_label, PREVIEW
cap = cv2.VideoCapture(0) # Use index for the webcam (adjust the index accordingly if necessary)
cap = cv2.VideoCapture(0) # Use index for the webcam (adjust the index accordingly if necessary)
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 960) # Set the width of the resolution
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 540) # Set the height of the resolution
cap.set(cv2.CAP_PROP_FPS, 60) # Set the frame rate of the webcam
@ -280,7 +337,7 @@ def webcam_preview():
if source_image is None and modules.globals.source_path:
source_image = get_one_face(cv2.imread(modules.globals.source_path))
temp_frame = frame.copy() #Create a copy of the frame
temp_frame = frame.copy() # Create a copy of the frame
for frame_processor in frame_processors:
temp_frame = frame_processor.process_frame(source_image, temp_frame)

View File

@ -32,7 +32,18 @@ def run_ffmpeg(args: List[str]) -> bool:
def detect_fps(target_path: str) -> float:
command = ['ffprobe', '-v', 'error', '-select_streams', 'v:0', '-show_entries', 'stream=r_frame_rate', '-of', 'default=noprint_wrappers=1:nokey=1', target_path]
command = [
'ffprobe',
'-v',
'error',
'-select_streams',
'v:0',
'-show_entries',
'stream=r_frame_rate',
'-of',
'default=noprint_wrappers=1:nokey=1',
target_path
]
output = subprocess.check_output(command).decode().strip().split('/')
try:
numerator, denominator = map(int, output)
@ -50,12 +61,40 @@ def extract_frames(target_path: str) -> None:
def create_video(target_path: str, fps: float = 30.0) -> None:
temp_output_path = get_temp_output_path(target_path)
temp_directory_path = get_temp_directory_path(target_path)
run_ffmpeg(['-r', str(fps), '-i', os.path.join(temp_directory_path, '%04d.png'), '-c:v', modules.globals.video_encoder, '-crf', str(modules.globals.video_quality), '-pix_fmt', 'yuv420p', '-vf', 'colorspace=bt709:iall=bt601-6-625:fast=1', '-y', temp_output_path])
run_ffmpeg([
'-r',
str(fps),
'-i',
os.path.join(temp_directory_path, '%04d.png'),
'-c:v',
modules.globals.video_encoder,
'-crf',
str(modules.globals.video_quality),
'-pix_fmt',
'yuv420p',
'-vf',
'colorspace=bt709:iall=bt601-6-625:fast=1',
'-y',
temp_output_path
])
def restore_audio(target_path: str, output_path: str) -> None:
temp_output_path = get_temp_output_path(target_path)
done = run_ffmpeg(['-i', temp_output_path, '-i', target_path, '-c:v', 'copy', '-map', '0:v:0', '-map', '1:a:0', '-y', output_path])
done = run_ffmpeg([
'-i',
temp_output_path,
'-i',
target_path,
'-c:v',
'copy',
'-map',
'0:v:0',
'-map',
'1:a:0',
'-y',
output_path
])
if not done:
move_temp(target_path, output_path)
@ -131,10 +170,16 @@ def conditional_download(download_directory_path: str, urls: List[str]) -> None:
for url in urls:
download_file_path = os.path.join(download_directory_path, os.path.basename(url))
if not os.path.exists(download_file_path):
request = urllib.request.urlopen(url) # type: ignore[attr-defined]
request = urllib.request.urlopen(url) # type: ignore[attr-defined]
total = int(request.headers.get('Content-Length', 0))
with tqdm(total=total, desc='Downloading', unit='B', unit_scale=True, unit_divisor=1024) as progress:
urllib.request.urlretrieve(url, download_file_path, reporthook=lambda count, block_size, total_size: progress.update(block_size)) # type: ignore[attr-defined]
urllib.request.urlretrieve(
url,
download_file_path,
reporthook=lambda count,
block_size,
total_size: progress.update(block_size)
) # type: ignore[attr-defined]
def resolve_relative_path(path: str) -> str: