Merge f7fdaaec20
into 745d449ca6
commit
075c9f4399
|
@ -19,7 +19,24 @@ import modules.globals
|
||||||
import modules.metadata
|
import modules.metadata
|
||||||
import modules.ui as ui
|
import modules.ui as ui
|
||||||
from modules.processors.frame.core import get_frame_processors_modules
|
from modules.processors.frame.core import get_frame_processors_modules
|
||||||
from modules.utilities import has_image_extension, is_image, is_video, detect_fps, create_video, extract_frames, get_temp_frame_paths, restore_audio, create_temp, move_temp, clean_temp, normalize_output_path
|
from modules.utilities import (
|
||||||
|
has_image_extension,
|
||||||
|
is_image,
|
||||||
|
is_video,
|
||||||
|
detect_fps,
|
||||||
|
create_video,
|
||||||
|
extract_frames,
|
||||||
|
get_temp_frame_paths,
|
||||||
|
restore_audio,
|
||||||
|
create_temp,
|
||||||
|
move_temp,
|
||||||
|
clean_temp,
|
||||||
|
normalize_output_path,
|
||||||
|
start_ffmpeg_writer,
|
||||||
|
get_temp_output_path,
|
||||||
|
)
|
||||||
|
import cv2
|
||||||
|
from tqdm import tqdm
|
||||||
|
|
||||||
if 'ROCMExecutionProvider' in modules.globals.execution_providers:
|
if 'ROCMExecutionProvider' in modules.globals.execution_providers:
|
||||||
del torch
|
del torch
|
||||||
|
@ -175,6 +192,47 @@ def update_status(message: str, scope: str = 'DLC.CORE') -> None:
|
||||||
if not modules.globals.headless:
|
if not modules.globals.headless:
|
||||||
ui.update_status(message)
|
ui.update_status(message)
|
||||||
|
|
||||||
|
|
||||||
|
def stream_video() -> None:
|
||||||
|
capture = cv2.VideoCapture(modules.globals.target_path)
|
||||||
|
if not capture.isOpened():
|
||||||
|
update_status('Failed to open video file.')
|
||||||
|
return
|
||||||
|
fps = capture.get(cv2.CAP_PROP_FPS) if modules.globals.keep_fps else 30.0
|
||||||
|
width = int(capture.get(cv2.CAP_PROP_FRAME_WIDTH))
|
||||||
|
height = int(capture.get(cv2.CAP_PROP_FRAME_HEIGHT))
|
||||||
|
total = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
|
||||||
|
|
||||||
|
update_status('Creating temp resources...')
|
||||||
|
create_temp(modules.globals.target_path)
|
||||||
|
temp_output_path = get_temp_output_path(modules.globals.target_path)
|
||||||
|
writer = start_ffmpeg_writer(width, height, fps, temp_output_path)
|
||||||
|
|
||||||
|
progress_bar_format = '{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}{postfix}]'
|
||||||
|
with tqdm(total=total, desc='Processing', unit='frame', dynamic_ncols=True, bar_format=progress_bar_format) as progress:
|
||||||
|
progress.set_postfix({'execution_providers': modules.globals.execution_providers, 'execution_threads': modules.globals.execution_threads, 'max_memory': modules.globals.max_memory})
|
||||||
|
while True:
|
||||||
|
ret, frame = capture.read()
|
||||||
|
if not ret:
|
||||||
|
break
|
||||||
|
for frame_processor in get_frame_processors_modules(modules.globals.frame_processors):
|
||||||
|
frame = frame_processor.process_frame_stream(modules.globals.source_path, frame)
|
||||||
|
writer.stdin.write(frame.tobytes())
|
||||||
|
progress.update(1)
|
||||||
|
|
||||||
|
capture.release()
|
||||||
|
writer.stdin.close()
|
||||||
|
exit_code = writer.wait()
|
||||||
|
if exit_code != 0:
|
||||||
|
raise RuntimeError(f"ffmpeg writer exited with non-zero status: {exit_code}")
|
||||||
|
|
||||||
|
if modules.globals.keep_audio:
|
||||||
|
update_status('Restoring audio...')
|
||||||
|
restore_audio(modules.globals.target_path, modules.globals.output_path)
|
||||||
|
else:
|
||||||
|
move_temp(modules.globals.target_path, modules.globals.output_path)
|
||||||
|
clean_temp(modules.globals.target_path)
|
||||||
|
|
||||||
def start() -> None:
|
def start() -> None:
|
||||||
for frame_processor in get_frame_processors_modules(modules.globals.frame_processors):
|
for frame_processor in get_frame_processors_modules(modules.globals.frame_processors):
|
||||||
if not frame_processor.pre_start():
|
if not frame_processor.pre_start():
|
||||||
|
@ -202,10 +260,17 @@ def start() -> None:
|
||||||
return
|
return
|
||||||
|
|
||||||
if not modules.globals.map_faces:
|
if not modules.globals.map_faces:
|
||||||
update_status('Creating temp resources...')
|
stream_video()
|
||||||
create_temp(modules.globals.target_path)
|
if is_video(modules.globals.target_path):
|
||||||
update_status('Extracting frames...')
|
update_status('Processing to video succeed!')
|
||||||
extract_frames(modules.globals.target_path)
|
else:
|
||||||
|
update_status('Processing to video failed!')
|
||||||
|
return
|
||||||
|
|
||||||
|
update_status('Creating temp resources...')
|
||||||
|
create_temp(modules.globals.target_path)
|
||||||
|
update_status('Extracting frames...')
|
||||||
|
extract_frames(modules.globals.target_path)
|
||||||
|
|
||||||
temp_frame_paths = get_temp_frame_paths(modules.globals.target_path)
|
temp_frame_paths = get_temp_frame_paths(modules.globals.target_path)
|
||||||
for frame_processor in get_frame_processors_modules(modules.globals.frame_processors):
|
for frame_processor in get_frame_processors_modules(modules.globals.frame_processors):
|
||||||
|
|
|
@ -14,7 +14,8 @@ FRAME_PROCESSORS_INTERFACE = [
|
||||||
'pre_start',
|
'pre_start',
|
||||||
'process_frame',
|
'process_frame',
|
||||||
'process_image',
|
'process_image',
|
||||||
'process_video'
|
'process_video',
|
||||||
|
'process_frame_stream'
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -48,17 +48,6 @@ def pre_start() -> bool:
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
TENSORRT_AVAILABLE = False
|
|
||||||
try:
|
|
||||||
import torch_tensorrt
|
|
||||||
TENSORRT_AVAILABLE = True
|
|
||||||
except ImportError as im:
|
|
||||||
print(f"TensorRT is not available: {im}")
|
|
||||||
pass
|
|
||||||
except Exception as e:
|
|
||||||
print(f"TensorRT is not available: {e}")
|
|
||||||
pass
|
|
||||||
|
|
||||||
def get_face_enhancer() -> Any:
|
def get_face_enhancer() -> Any:
|
||||||
global FACE_ENHANCER
|
global FACE_ENHANCER
|
||||||
|
|
||||||
|
@ -66,26 +55,16 @@ def get_face_enhancer() -> Any:
|
||||||
if FACE_ENHANCER is None:
|
if FACE_ENHANCER is None:
|
||||||
model_path = os.path.join(models_dir, "GFPGANv1.4.pth")
|
model_path = os.path.join(models_dir, "GFPGANv1.4.pth")
|
||||||
|
|
||||||
selected_device = None
|
match platform.system():
|
||||||
device_priority = []
|
case "Darwin": # Mac OS
|
||||||
|
if torch.backends.mps.is_available():
|
||||||
|
mps_device = torch.device("mps")
|
||||||
|
FACE_ENHANCER = gfpgan.GFPGANer(model_path=model_path, upscale=1, device=mps_device) # type: ignore[attr-defined]
|
||||||
|
else:
|
||||||
|
FACE_ENHANCER = gfpgan.GFPGANer(model_path=model_path, upscale=1) # type: ignore[attr-defined]
|
||||||
|
case _: # Other OS
|
||||||
|
FACE_ENHANCER = gfpgan.GFPGANer(model_path=model_path, upscale=1) # type: ignore[attr-defined]
|
||||||
|
|
||||||
if TENSORRT_AVAILABLE and torch.cuda.is_available():
|
|
||||||
selected_device = torch.device("cuda")
|
|
||||||
device_priority.append("TensorRT+CUDA")
|
|
||||||
elif torch.cuda.is_available():
|
|
||||||
selected_device = torch.device("cuda")
|
|
||||||
device_priority.append("CUDA")
|
|
||||||
elif torch.backends.mps.is_available() and platform.system() == "Darwin":
|
|
||||||
selected_device = torch.device("mps")
|
|
||||||
device_priority.append("MPS")
|
|
||||||
elif not torch.cuda.is_available():
|
|
||||||
selected_device = torch.device("cpu")
|
|
||||||
device_priority.append("CPU")
|
|
||||||
|
|
||||||
FACE_ENHANCER = gfpgan.GFPGANer(model_path=model_path, upscale=1, device=selected_device)
|
|
||||||
|
|
||||||
# for debug:
|
|
||||||
print(f"Selected device: {selected_device} and device priority: {device_priority}")
|
|
||||||
return FACE_ENHANCER
|
return FACE_ENHANCER
|
||||||
|
|
||||||
|
|
||||||
|
@ -128,3 +107,8 @@ def process_frame_v2(temp_frame: Frame) -> Frame:
|
||||||
if target_face:
|
if target_face:
|
||||||
temp_frame = enhance_face(temp_frame)
|
temp_frame = enhance_face(temp_frame)
|
||||||
return temp_frame
|
return temp_frame
|
||||||
|
|
||||||
|
|
||||||
|
def process_frame_stream(source_path: str, frame: Frame) -> Frame:
|
||||||
|
return process_frame(None, frame)
|
||||||
|
|
||||||
|
|
|
@ -125,25 +125,21 @@ def process_frame_v2(temp_frame: Frame, temp_frame_path: str = "") -> Frame:
|
||||||
if modules.globals.many_faces:
|
if modules.globals.many_faces:
|
||||||
source_face = default_source_face()
|
source_face = default_source_face()
|
||||||
for map in modules.globals.source_target_map:
|
for map in modules.globals.source_target_map:
|
||||||
target_face = map["target"]["face"]
|
target_face = map['target']['face']
|
||||||
temp_frame = swap_face(source_face, target_face, temp_frame)
|
temp_frame = swap_face(source_face, target_face, temp_frame)
|
||||||
|
|
||||||
elif not modules.globals.many_faces:
|
elif not modules.globals.many_faces:
|
||||||
for map in modules.globals.source_target_map:
|
for map in modules.globals.source_target_map:
|
||||||
if "source" in map:
|
if "source" in map:
|
||||||
source_face = map["source"]["face"]
|
source_face = map['source']['face']
|
||||||
target_face = map["target"]["face"]
|
target_face = map['target']['face']
|
||||||
temp_frame = swap_face(source_face, target_face, temp_frame)
|
temp_frame = swap_face(source_face, target_face, temp_frame)
|
||||||
|
|
||||||
elif is_video(modules.globals.target_path):
|
elif is_video(modules.globals.target_path):
|
||||||
if modules.globals.many_faces:
|
if modules.globals.many_faces:
|
||||||
source_face = default_source_face()
|
source_face = default_source_face()
|
||||||
for map in modules.globals.source_target_map:
|
for map in modules.globals.source_target_map:
|
||||||
target_frame = [
|
target_frame = [f for f in map['target_faces_in_frame'] if f['location'] == temp_frame_path]
|
||||||
f
|
|
||||||
for f in map["target_faces_in_frame"]
|
|
||||||
if f["location"] == temp_frame_path
|
|
||||||
]
|
|
||||||
|
|
||||||
for frame in target_frame:
|
for frame in target_frame:
|
||||||
for target_face in frame["faces"]:
|
for target_face in frame["faces"]:
|
||||||
|
@ -152,12 +148,8 @@ def process_frame_v2(temp_frame: Frame, temp_frame_path: str = "") -> Frame:
|
||||||
elif not modules.globals.many_faces:
|
elif not modules.globals.many_faces:
|
||||||
for map in modules.globals.source_target_map:
|
for map in modules.globals.source_target_map:
|
||||||
if "source" in map:
|
if "source" in map:
|
||||||
target_frame = [
|
target_frame = [f for f in map['target_faces_in_frame'] if f['location'] == temp_frame_path]
|
||||||
f
|
source_face = map['source']['face']
|
||||||
for f in map["target_faces_in_frame"]
|
|
||||||
if f["location"] == temp_frame_path
|
|
||||||
]
|
|
||||||
source_face = map["source"]["face"]
|
|
||||||
|
|
||||||
for frame in target_frame:
|
for frame in target_frame:
|
||||||
for target_face in frame["faces"]:
|
for target_face in frame["faces"]:
|
||||||
|
@ -256,367 +248,26 @@ def process_image(source_path: str, target_path: str, output_path: str) -> None:
|
||||||
|
|
||||||
def process_video(source_path: str, temp_frame_paths: List[str]) -> None:
|
def process_video(source_path: str, temp_frame_paths: List[str]) -> None:
|
||||||
if modules.globals.map_faces and modules.globals.many_faces:
|
if modules.globals.map_faces and modules.globals.many_faces:
|
||||||
update_status(
|
update_status('Many faces enabled. Using first source image (if applicable in v2). Processing...', NAME)
|
||||||
"Many faces enabled. Using first source image. Progressing...", NAME
|
# The core processing logic is delegated, which is good.
|
||||||
)
|
modules.processors.frame.core.process_video(source_path, temp_frame_paths, process_frames)
|
||||||
modules.processors.frame.core.process_video(
|
|
||||||
source_path, temp_frame_paths, process_frames
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def create_lower_mouth_mask(
|
STREAM_SOURCE_FACE = None
|
||||||
face: Face, frame: Frame
|
|
||||||
) -> (np.ndarray, np.ndarray, tuple, np.ndarray):
|
|
||||||
mask = np.zeros(frame.shape[:2], dtype=np.uint8)
|
|
||||||
mouth_cutout = None
|
|
||||||
landmarks = face.landmark_2d_106
|
|
||||||
if landmarks is not None:
|
|
||||||
# 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
|
|
||||||
lower_lip_order = [
|
|
||||||
65,
|
|
||||||
66,
|
|
||||||
62,
|
|
||||||
70,
|
|
||||||
69,
|
|
||||||
18,
|
|
||||||
19,
|
|
||||||
20,
|
|
||||||
21,
|
|
||||||
22,
|
|
||||||
23,
|
|
||||||
24,
|
|
||||||
0,
|
|
||||||
8,
|
|
||||||
7,
|
|
||||||
6,
|
|
||||||
5,
|
|
||||||
4,
|
|
||||||
3,
|
|
||||||
2,
|
|
||||||
65,
|
|
||||||
]
|
|
||||||
lower_lip_landmarks = landmarks[lower_lip_order].astype(
|
|
||||||
np.float32
|
|
||||||
) # Use float for precise calculations
|
|
||||||
|
|
||||||
# Calculate the center of the landmarks
|
|
||||||
center = np.mean(lower_lip_landmarks, axis=0)
|
|
||||||
|
|
||||||
# Expand the landmarks outward
|
|
||||||
expansion_factor = (
|
|
||||||
1 + modules.globals.mask_down_size
|
|
||||||
) # Adjust this for more or less expansion
|
|
||||||
expanded_landmarks = (lower_lip_landmarks - center) * expansion_factor + center
|
|
||||||
|
|
||||||
# Extend the top lip part
|
|
||||||
toplip_indices = [
|
|
||||||
20,
|
|
||||||
0,
|
|
||||||
1,
|
|
||||||
2,
|
|
||||||
3,
|
|
||||||
4,
|
|
||||||
5,
|
|
||||||
] # Indices for landmarks 2, 65, 66, 62, 70, 69, 18
|
|
||||||
toplip_extension = (
|
|
||||||
modules.globals.mask_size * 0.5
|
|
||||||
) # Adjust this factor to control the extension
|
|
||||||
for idx in toplip_indices:
|
|
||||||
direction = expanded_landmarks[idx] - center
|
|
||||||
direction = direction / np.linalg.norm(direction)
|
|
||||||
expanded_landmarks[idx] += direction * toplip_extension
|
|
||||||
|
|
||||||
# Extend the bottom part (chin area)
|
|
||||||
chin_indices = [
|
|
||||||
11,
|
|
||||||
12,
|
|
||||||
13,
|
|
||||||
14,
|
|
||||||
15,
|
|
||||||
16,
|
|
||||||
] # Indices for landmarks 21, 22, 23, 24, 0, 8
|
|
||||||
chin_extension = 2 * 0.2 # Adjust this factor to control the extension
|
|
||||||
for idx in chin_indices:
|
|
||||||
expanded_landmarks[idx][1] += (
|
|
||||||
expanded_landmarks[idx][1] - center[1]
|
|
||||||
) * chin_extension
|
|
||||||
|
|
||||||
# Convert back to integer coordinates
|
|
||||||
expanded_landmarks = expanded_landmarks.astype(np.int32)
|
|
||||||
|
|
||||||
# Calculate bounding box for the expanded lower mouth
|
|
||||||
min_x, min_y = np.min(expanded_landmarks, axis=0)
|
|
||||||
max_x, max_y = np.max(expanded_landmarks, axis=0)
|
|
||||||
|
|
||||||
# Add some padding to the bounding box
|
|
||||||
padding = int((max_x - min_x) * 0.1) # 10% padding
|
|
||||||
min_x = max(0, min_x - padding)
|
|
||||||
min_y = max(0, min_y - padding)
|
|
||||||
max_x = min(frame.shape[1], max_x + padding)
|
|
||||||
max_y = min(frame.shape[0], max_y + padding)
|
|
||||||
|
|
||||||
# Ensure the bounding box dimensions are valid
|
|
||||||
if max_x <= min_x or max_y <= min_y:
|
|
||||||
if (max_x - min_x) <= 1:
|
|
||||||
max_x = min_x + 1
|
|
||||||
if (max_y - min_y) <= 1:
|
|
||||||
max_y = min_y + 1
|
|
||||||
|
|
||||||
# Create the mask
|
|
||||||
mask_roi = np.zeros((max_y - min_y, max_x - min_x), dtype=np.uint8)
|
|
||||||
cv2.fillPoly(mask_roi, [expanded_landmarks - [min_x, min_y]], 255)
|
|
||||||
|
|
||||||
# Apply Gaussian blur to soften the mask edges
|
|
||||||
mask_roi = cv2.GaussianBlur(mask_roi, (15, 15), 5)
|
|
||||||
|
|
||||||
# Place the mask ROI in the full-sized mask
|
|
||||||
mask[min_y:max_y, min_x:max_x] = mask_roi
|
|
||||||
|
|
||||||
# Extract the masked area from the frame
|
|
||||||
mouth_cutout = frame[min_y:max_y, min_x:max_x].copy()
|
|
||||||
|
|
||||||
# Return the expanded lower lip polygon in original frame coordinates
|
|
||||||
lower_lip_polygon = expanded_landmarks
|
|
||||||
|
|
||||||
return mask, mouth_cutout, (min_x, min_y, max_x, max_y), lower_lip_polygon
|
|
||||||
|
|
||||||
|
|
||||||
def draw_mouth_mask_visualization(
|
def process_frame_stream(source_path: str, frame: Frame) -> Frame:
|
||||||
frame: Frame, face: Face, mouth_mask_data: tuple
|
global STREAM_SOURCE_FACE
|
||||||
) -> Frame:
|
if modules.globals.map_faces:
|
||||||
landmarks = face.landmark_2d_106
|
result = process_frame_v2(frame)
|
||||||
if landmarks is not None and mouth_mask_data is not None:
|
if result is not None:
|
||||||
mask, mouth_cutout, (min_x, min_y, max_x, max_y), lower_lip_polygon = (
|
return result
|
||||||
mouth_mask_data
|
else:
|
||||||
)
|
return frame # Fallback to original frame if process_frame_v2 returns None
|
||||||
|
if STREAM_SOURCE_FACE is None:
|
||||||
vis_frame = frame.copy()
|
source_img = cv2.imread(source_path)
|
||||||
|
if source_img is not None:
|
||||||
# Ensure coordinates are within frame bounds
|
STREAM_SOURCE_FACE = get_one_face(source_img)
|
||||||
height, width = vis_frame.shape[:2]
|
if STREAM_SOURCE_FACE is not None:
|
||||||
min_x, min_y = max(0, min_x), max(0, min_y)
|
return process_frame(STREAM_SOURCE_FACE, frame)
|
||||||
max_x, max_y = min(width, max_x), min(height, max_y)
|
|
||||||
|
|
||||||
# Adjust mask to match the region size
|
|
||||||
mask_region = mask[0 : max_y - min_y, 0 : max_x - min_x]
|
|
||||||
|
|
||||||
# Remove the color mask overlay
|
|
||||||
# color_mask = cv2.applyColorMap((mask_region * 255).astype(np.uint8), cv2.COLORMAP_JET)
|
|
||||||
|
|
||||||
# Ensure shapes match before blending
|
|
||||||
vis_region = vis_frame[min_y:max_y, min_x:max_x]
|
|
||||||
# Remove blending with color_mask
|
|
||||||
# if vis_region.shape[:2] == color_mask.shape[:2]:
|
|
||||||
# blended = cv2.addWeighted(vis_region, 0.7, color_mask, 0.3, 0)
|
|
||||||
# vis_frame[min_y:max_y, min_x:max_x] = blended
|
|
||||||
|
|
||||||
# Draw the lower lip polygon
|
|
||||||
cv2.polylines(vis_frame, [lower_lip_polygon], True, (0, 255, 0), 2)
|
|
||||||
|
|
||||||
# Remove the red box
|
|
||||||
# cv2.rectangle(vis_frame, (min_x, min_y), (max_x, max_y), (0, 0, 255), 2)
|
|
||||||
|
|
||||||
# Visualize the feathered mask
|
|
||||||
feather_amount = max(
|
|
||||||
1,
|
|
||||||
min(
|
|
||||||
30,
|
|
||||||
(max_x - min_x) // modules.globals.mask_feather_ratio,
|
|
||||||
(max_y - min_y) // modules.globals.mask_feather_ratio,
|
|
||||||
),
|
|
||||||
)
|
|
||||||
# Ensure kernel size is odd
|
|
||||||
kernel_size = 2 * feather_amount + 1
|
|
||||||
feathered_mask = cv2.GaussianBlur(
|
|
||||||
mask_region.astype(float), (kernel_size, kernel_size), 0
|
|
||||||
)
|
|
||||||
feathered_mask = (feathered_mask / feathered_mask.max() * 255).astype(np.uint8)
|
|
||||||
# Remove the feathered mask color overlay
|
|
||||||
# color_feathered_mask = cv2.applyColorMap(feathered_mask, cv2.COLORMAP_VIRIDIS)
|
|
||||||
|
|
||||||
# Ensure shapes match before blending feathered mask
|
|
||||||
# if vis_region.shape == color_feathered_mask.shape:
|
|
||||||
# blended_feathered = cv2.addWeighted(vis_region, 0.7, color_feathered_mask, 0.3, 0)
|
|
||||||
# vis_frame[min_y:max_y, min_x:max_x] = blended_feathered
|
|
||||||
|
|
||||||
# Add labels
|
|
||||||
cv2.putText(
|
|
||||||
vis_frame,
|
|
||||||
"Lower Mouth Mask",
|
|
||||||
(min_x, min_y - 10),
|
|
||||||
cv2.FONT_HERSHEY_SIMPLEX,
|
|
||||||
0.5,
|
|
||||||
(255, 255, 255),
|
|
||||||
1,
|
|
||||||
)
|
|
||||||
cv2.putText(
|
|
||||||
vis_frame,
|
|
||||||
"Feathered Mask",
|
|
||||||
(min_x, max_y + 20),
|
|
||||||
cv2.FONT_HERSHEY_SIMPLEX,
|
|
||||||
0.5,
|
|
||||||
(255, 255, 255),
|
|
||||||
1,
|
|
||||||
)
|
|
||||||
|
|
||||||
return vis_frame
|
|
||||||
return frame
|
return frame
|
||||||
|
|
||||||
|
|
||||||
def apply_mouth_area(
|
|
||||||
frame: np.ndarray,
|
|
||||||
mouth_cutout: np.ndarray,
|
|
||||||
mouth_box: tuple,
|
|
||||||
face_mask: np.ndarray,
|
|
||||||
mouth_polygon: np.ndarray,
|
|
||||||
) -> np.ndarray:
|
|
||||||
min_x, min_y, max_x, max_y = mouth_box
|
|
||||||
box_width = max_x - min_x
|
|
||||||
box_height = max_y - min_y
|
|
||||||
|
|
||||||
if (
|
|
||||||
mouth_cutout is None
|
|
||||||
or box_width is None
|
|
||||||
or box_height is None
|
|
||||||
or face_mask is None
|
|
||||||
or mouth_polygon is None
|
|
||||||
):
|
|
||||||
return frame
|
|
||||||
|
|
||||||
try:
|
|
||||||
resized_mouth_cutout = cv2.resize(mouth_cutout, (box_width, box_height))
|
|
||||||
roi = frame[min_y:max_y, min_x:max_x]
|
|
||||||
|
|
||||||
if roi.shape != resized_mouth_cutout.shape:
|
|
||||||
resized_mouth_cutout = cv2.resize(
|
|
||||||
resized_mouth_cutout, (roi.shape[1], roi.shape[0])
|
|
||||||
)
|
|
||||||
|
|
||||||
color_corrected_mouth = apply_color_transfer(resized_mouth_cutout, roi)
|
|
||||||
|
|
||||||
# Use the provided mouth polygon to create the mask
|
|
||||||
polygon_mask = np.zeros(roi.shape[:2], dtype=np.uint8)
|
|
||||||
adjusted_polygon = mouth_polygon - [min_x, min_y]
|
|
||||||
cv2.fillPoly(polygon_mask, [adjusted_polygon], 255)
|
|
||||||
|
|
||||||
# Apply feathering to the polygon mask
|
|
||||||
feather_amount = min(
|
|
||||||
30,
|
|
||||||
box_width // modules.globals.mask_feather_ratio,
|
|
||||||
box_height // modules.globals.mask_feather_ratio,
|
|
||||||
)
|
|
||||||
feathered_mask = cv2.GaussianBlur(
|
|
||||||
polygon_mask.astype(float), (0, 0), feather_amount
|
|
||||||
)
|
|
||||||
feathered_mask = feathered_mask / feathered_mask.max()
|
|
||||||
|
|
||||||
face_mask_roi = face_mask[min_y:max_y, min_x:max_x]
|
|
||||||
combined_mask = feathered_mask * (face_mask_roi / 255.0)
|
|
||||||
|
|
||||||
combined_mask = combined_mask[:, :, np.newaxis]
|
|
||||||
blended = (
|
|
||||||
color_corrected_mouth * combined_mask + roi * (1 - combined_mask)
|
|
||||||
).astype(np.uint8)
|
|
||||||
|
|
||||||
# Apply face mask to blended result
|
|
||||||
face_mask_3channel = (
|
|
||||||
np.repeat(face_mask_roi[:, :, np.newaxis], 3, axis=2) / 255.0
|
|
||||||
)
|
|
||||||
final_blend = blended * face_mask_3channel + roi * (1 - face_mask_3channel)
|
|
||||||
|
|
||||||
frame[min_y:max_y, min_x:max_x] = final_blend.astype(np.uint8)
|
|
||||||
except Exception as e:
|
|
||||||
pass
|
|
||||||
|
|
||||||
return frame
|
|
||||||
|
|
||||||
|
|
||||||
def create_face_mask(face: Face, frame: Frame) -> np.ndarray:
|
|
||||||
mask = np.zeros(frame.shape[:2], dtype=np.uint8)
|
|
||||||
landmarks = face.landmark_2d_106
|
|
||||||
if landmarks is not None:
|
|
||||||
# Convert landmarks to int32
|
|
||||||
landmarks = landmarks.astype(np.int32)
|
|
||||||
|
|
||||||
# Extract facial features
|
|
||||||
right_side_face = landmarks[0:16]
|
|
||||||
left_side_face = landmarks[17:32]
|
|
||||||
right_eye = landmarks[33:42]
|
|
||||||
right_eye_brow = landmarks[43:51]
|
|
||||||
left_eye = landmarks[87:96]
|
|
||||||
left_eye_brow = landmarks[97:105]
|
|
||||||
|
|
||||||
# Calculate forehead extension
|
|
||||||
right_eyebrow_top = np.min(right_eye_brow[:, 1])
|
|
||||||
left_eyebrow_top = np.min(left_eye_brow[:, 1])
|
|
||||||
eyebrow_top = min(right_eyebrow_top, left_eyebrow_top)
|
|
||||||
|
|
||||||
face_top = np.min([right_side_face[0, 1], left_side_face[-1, 1]])
|
|
||||||
forehead_height = face_top - eyebrow_top
|
|
||||||
extended_forehead_height = int(forehead_height * 5.0) # Extend by 50%
|
|
||||||
|
|
||||||
# Create forehead points
|
|
||||||
forehead_left = right_side_face[0].copy()
|
|
||||||
forehead_right = left_side_face[-1].copy()
|
|
||||||
forehead_left[1] -= extended_forehead_height
|
|
||||||
forehead_right[1] -= extended_forehead_height
|
|
||||||
|
|
||||||
# Combine all points to create the face outline
|
|
||||||
face_outline = np.vstack(
|
|
||||||
[
|
|
||||||
[forehead_left],
|
|
||||||
right_side_face,
|
|
||||||
left_side_face[
|
|
||||||
::-1
|
|
||||||
], # Reverse left side to create a continuous outline
|
|
||||||
[forehead_right],
|
|
||||||
]
|
|
||||||
)
|
|
||||||
|
|
||||||
# Calculate padding
|
|
||||||
padding = int(
|
|
||||||
np.linalg.norm(right_side_face[0] - left_side_face[-1]) * 0.05
|
|
||||||
) # 5% of face width
|
|
||||||
|
|
||||||
# Create a slightly larger convex hull for padding
|
|
||||||
hull = cv2.convexHull(face_outline)
|
|
||||||
hull_padded = []
|
|
||||||
for point in hull:
|
|
||||||
x, y = point[0]
|
|
||||||
center = np.mean(face_outline, axis=0)
|
|
||||||
direction = np.array([x, y]) - center
|
|
||||||
direction = direction / np.linalg.norm(direction)
|
|
||||||
padded_point = np.array([x, y]) + direction * padding
|
|
||||||
hull_padded.append(padded_point)
|
|
||||||
|
|
||||||
hull_padded = np.array(hull_padded, dtype=np.int32)
|
|
||||||
|
|
||||||
# Fill the padded convex hull
|
|
||||||
cv2.fillConvexPoly(mask, hull_padded, 255)
|
|
||||||
|
|
||||||
# Smooth the mask edges
|
|
||||||
mask = cv2.GaussianBlur(mask, (5, 5), 3)
|
|
||||||
|
|
||||||
return mask
|
|
||||||
|
|
||||||
|
|
||||||
def apply_color_transfer(source, target):
|
|
||||||
"""
|
|
||||||
Apply color transfer from target to source image
|
|
||||||
"""
|
|
||||||
source = cv2.cvtColor(source, cv2.COLOR_BGR2LAB).astype("float32")
|
|
||||||
target = cv2.cvtColor(target, cv2.COLOR_BGR2LAB).astype("float32")
|
|
||||||
|
|
||||||
source_mean, source_std = cv2.meanStdDev(source)
|
|
||||||
target_mean, target_std = cv2.meanStdDev(target)
|
|
||||||
|
|
||||||
# Reshape mean and std to be broadcastable
|
|
||||||
source_mean = source_mean.reshape(1, 1, 3)
|
|
||||||
source_std = source_std.reshape(1, 1, 3)
|
|
||||||
target_mean = target_mean.reshape(1, 1, 3)
|
|
||||||
target_std = target_std.reshape(1, 1, 3)
|
|
||||||
|
|
||||||
# Perform the color transfer
|
|
||||||
source = (source - source_mean) * (target_std / source_std) + target_mean
|
|
||||||
|
|
||||||
return cv2.cvtColor(np.clip(source, 0, 255).astype("uint8"), cv2.COLOR_LAB2BGR)
|
|
||||||
|
|
|
@ -38,6 +38,39 @@ def run_ffmpeg(args: List[str]) -> bool:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def start_ffmpeg_writer(width: int, height: int, fps: float, output_path: str) -> subprocess.Popen:
|
||||||
|
# Pass all arguments as a list to avoid shell injection
|
||||||
|
commands = [
|
||||||
|
"ffmpeg",
|
||||||
|
"-hide_banner",
|
||||||
|
"-hwaccel",
|
||||||
|
"auto",
|
||||||
|
"-loglevel",
|
||||||
|
str(modules.globals.log_level),
|
||||||
|
"-f",
|
||||||
|
"rawvideo",
|
||||||
|
"-pix_fmt",
|
||||||
|
"bgr24",
|
||||||
|
"-s",
|
||||||
|
f"{width}x{height}",
|
||||||
|
"-r",
|
||||||
|
str(fps),
|
||||||
|
"-i",
|
||||||
|
"-",
|
||||||
|
"-c:v",
|
||||||
|
str(modules.globals.video_encoder),
|
||||||
|
"-crf",
|
||||||
|
str(modules.globals.video_quality),
|
||||||
|
"-pix_fmt",
|
||||||
|
"yuv420p",
|
||||||
|
"-vf",
|
||||||
|
"colorspace=bt709:iall=bt601-6-625:fast=1",
|
||||||
|
"-y",
|
||||||
|
str(output_path),
|
||||||
|
]
|
||||||
|
return subprocess.Popen(commands, stdin=subprocess.PIPE)
|
||||||
|
|
||||||
|
|
||||||
def detect_fps(target_path: str) -> float:
|
def detect_fps(target_path: str) -> float:
|
||||||
command = [
|
command = [
|
||||||
"ffprobe",
|
"ffprobe",
|
||||||
|
|
Loading…
Reference in New Issue