Improve FPS, blending, maintainability, and UI/UX for Deep-Live-Cam; address all review and Sourcery feedback; ready for deployment

pull/1313/head
rehanbgmi 2025-05-26 15:37:02 +00:00
parent 348e6c424e
commit ba008e72eb
4 changed files with 130 additions and 116 deletions


@@ -9,12 +9,14 @@ HAIR_SEGMENTER_PROCESSOR = None
 HAIR_SEGMENTER_MODEL = None
 MODEL_NAME = "isjackwild/segformer-b0-finetuned-segments-skin-hair-clothing"
 
-def segment_hair(image_np: np.ndarray) -> np.ndarray:
+def segment_hair(image_np: np.ndarray, device: str = "cpu", hair_label_index: int = None) -> np.ndarray:
     """
     Segments hair from an image.
 
     Args:
         image_np: NumPy array representing the image (BGR format from OpenCV).
+        device: Device to run the model on ("cpu" or "cuda").
+        hair_label_index: Optional; index of the hair label in the segmentation map. If not provided, will use model config or default to 2.
 
     Returns:
         NumPy array representing the binary hair mask.
@@ -26,48 +28,38 @@ def segment_hair(image_np: np.ndarray) -> np.ndarray:
         try:
             HAIR_SEGMENTER_PROCESSOR = SegformerImageProcessor.from_pretrained(MODEL_NAME)
             HAIR_SEGMENTER_MODEL = SegformerForSemanticSegmentation.from_pretrained(MODEL_NAME)
-            # Optional: Move model to GPU if available and if other models use GPU
-            # if torch.cuda.is_available():
-            #     HAIR_SEGMENTER_MODEL = HAIR_SEGMENTER_MODEL.to('cuda')
-            #     print("Hair segmentation model moved to GPU.")
-            print("Hair segmentation model and processor loaded successfully.")
+            HAIR_SEGMENTER_MODEL = HAIR_SEGMENTER_MODEL.to(device)
+            print(f"Hair segmentation model and processor loaded successfully. Model moved to device: {device}")
         except Exception as e:
             print(f"Failed to load hair segmentation model/processor: {e}")
             # Return an empty mask compatible with expected output shape (H, W)
             return np.zeros((image_np.shape[0], image_np.shape[1]), dtype=np.uint8)
 
     # Ensure processor and model are loaded before proceeding
     if HAIR_SEGMENTER_PROCESSOR is None or HAIR_SEGMENTER_MODEL is None:
         print("Error: Hair segmentation models are not available.")
         return np.zeros((image_np.shape[0], image_np.shape[1]), dtype=np.uint8)
 
     # Convert BGR (OpenCV) to RGB (PIL)
     image_rgb = cv2.cvtColor(image_np, cv2.COLOR_BGR2RGB)
     image_pil = Image.fromarray(image_rgb)
 
     inputs = HAIR_SEGMENTER_PROCESSOR(images=image_pil, return_tensors="pt")
 
-    # Optional: Move inputs to GPU if model is on GPU
-    # if HAIR_SEGMENTER_MODEL.device.type == 'cuda':
-    #     inputs = inputs.to(HAIR_SEGMENTER_MODEL.device)
+    if device == "cuda" and hasattr(HAIR_SEGMENTER_MODEL, "device") and HAIR_SEGMENTER_MODEL.device.type == "cuda":
+        inputs = {k: v.to("cuda") for k, v in inputs.items()}
 
-    with torch.no_grad(): # Important for inference
+    with torch.no_grad():
         outputs = HAIR_SEGMENTER_MODEL(**inputs)
 
-    logits = outputs.logits # Shape: batch_size, num_labels, height, width
-
-    # Upsample logits to original image size
+    logits = outputs.logits
     upsampled_logits = torch.nn.functional.interpolate(
         logits,
-        size=(image_np.shape[0], image_np.shape[1]), # H, W
+        size=(image_np.shape[0], image_np.shape[1]),
         mode='bilinear',
         align_corners=False
     )
 
     segmentation_map = upsampled_logits.argmax(dim=1).squeeze().cpu().numpy().astype(np.uint8)
 
-    # Label 2 is for hair in this model
-    return np.where(segmentation_map == 2, 255, 0).astype(np.uint8)
+    if hair_label_index is None:
+        hair_label_index = getattr(HAIR_SEGMENTER_MODEL, "hair_label_index", 2)
+    return np.where(segmentation_map == hair_label_index, 255, 0).astype(np.uint8)
 
 if __name__ == '__main__':
     # This is a conceptual test.

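As a quick reference for the new parameters, here is a minimal usage sketch; the module path modules.hair_segmenter and the image file names are illustrative assumptions, not taken from the diff:

import cv2
from modules.hair_segmenter import segment_hair  # module path assumed

image = cv2.imread("face.jpg")  # BGR, as OpenCV loads it

# Default: CPU inference, hair label resolved from the model or falling back to 2.
mask = segment_hair(image)

# GPU inference with an explicit hair label index.
mask_gpu = segment_hair(image, device="cuda", hair_label_index=2)

# The mask is uint8: 255 on hair pixels, 0 elsewhere.
cv2.imwrite("hair_mask.png", mask)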

@@ -162,67 +162,44 @@ def swap_face(source_face_obj: Face, target_face: Face, source_frame_full: Frame, temp_frame: Frame) -> Frame:
     swapped_frame = face_swapper.get(temp_frame, target_face, source_face_obj, paste_back=True)
     final_swapped_frame = swapped_frame
 
-    if modules.globals.enable_hair_swapping:
-        if not (source_face_obj.kps is not None and \
-                target_face.kps is not None and \
-                source_face_obj.kps.shape[0] >= 3 and \
-                target_face.kps.shape[0] >= 3):
+    def do_hair_blending():
+        if not (source_face_obj.kps is not None and target_face.kps is not None and source_face_obj.kps.shape[0] >= 3 and target_face.kps.shape[0] >= 3):
             logging.warning(
                 f"Skipping hair blending due to insufficient keypoints. "
                 f"Source kps: {source_face_obj.kps.shape if source_face_obj.kps is not None else 'None'}, "
                 f"Target kps: {target_face.kps.shape if target_face.kps is not None else 'None'}."
             )
-        else:
-            source_kps_float = source_face_obj.kps.astype(np.float32)
-            target_kps_float = target_face.kps.astype(np.float32)
-            matrix, _ = cv2.estimateAffinePartial2D(source_kps_float, target_kps_float, method=cv2.LMEDS)
-            if matrix is None:
-                logging.warning("Failed to estimate affine transformation matrix for hair. Skipping hair blending.")
-            else:
-                dsize = (temp_frame.shape[1], temp_frame.shape[0]) # width, height
-                warped_material, warped_mask = _prepare_warped_source_material_and_mask(
-                    source_face_obj, source_frame_full, matrix, dsize
-                )
-                if warped_material is not None and warped_mask is not None:
-                    # Make a copy only now that we are sure we will modify it for hair.
-                    final_swapped_frame = swapped_frame.copy()
-                    color_corrected_material = apply_color_transfer(warped_material, final_swapped_frame) # Use final_swapped_frame for color context
-                    final_swapped_frame = _blend_material_onto_frame(
-                        final_swapped_frame,
-                        color_corrected_material,
-                        warped_mask
-                    )
-
-    # Mouth Mask Logic (operates on final_swapped_frame)
-    if modules.globals.mouth_mask:
-        # If final_swapped_frame wasn't copied for hair, it needs to be copied now before mouth mask modification.
-        if final_swapped_frame is swapped_frame: # Check if it's still the same object
-            final_swapped_frame = swapped_frame.copy()
-
-        # Create a mask for the target face
-        face_mask = create_face_mask(target_face, temp_frame)
-
-        # Create the mouth mask
-        mouth_mask, mouth_cutout, mouth_box, lower_lip_polygon = (
-            create_lower_mouth_mask(target_face, temp_frame)
-        )
-
-        # Apply the mouth area
-        # Apply to final_swapped_frame if hair blending happened, otherwise to swapped_frame
-        final_swapped_frame = apply_mouth_area(
-            final_swapped_frame, mouth_cutout, mouth_box, face_mask, lower_lip_polygon
-        )
-
-        if modules.globals.show_mouth_mask_box:
-            mouth_mask_data = (mouth_mask, mouth_cutout, mouth_box, lower_lip_polygon)
-            final_swapped_frame = draw_mouth_mask_visualization(
-                final_swapped_frame, target_face, mouth_mask_data
-            )
+            return swapped_frame
+        source_kps_float = source_face_obj.kps.astype(np.float32)
+        target_kps_float = target_face.kps.astype(np.float32)
+        matrix, _ = cv2.estimateAffinePartial2D(source_kps_float, target_kps_float, method=cv2.LMEDS)
+        if matrix is None:
+            logging.warning("Failed to estimate affine transformation matrix for hair. Skipping hair blending.")
+            return swapped_frame
+        dsize = (temp_frame.shape[1], temp_frame.shape[0])
+        warped_material, warped_mask = _prepare_warped_source_material_and_mask(
+            source_face_obj, source_frame_full, matrix, dsize
+        )
+        if warped_material is not None and warped_mask is not None:
+            out = swapped_frame.copy()
+            color_corrected_material = apply_color_transfer(warped_material, out)
+            return _blend_material_onto_frame(out, color_corrected_material, warped_mask)
+        return swapped_frame
+
+    def do_mouth_mask(frame):
+        out = frame.copy() if frame is swapped_frame else frame
+        face_mask = create_face_mask(target_face, temp_frame)
+        mouth_mask, mouth_cutout, mouth_box, lower_lip_polygon = create_lower_mouth_mask(target_face, temp_frame)
+        out = apply_mouth_area(out, mouth_cutout, mouth_box, face_mask, lower_lip_polygon)
+        if modules.globals.show_mouth_mask_box:
+            mouth_mask_data = (mouth_mask, mouth_cutout, mouth_box, lower_lip_polygon)
+            out = draw_mouth_mask_visualization(out, target_face, mouth_mask_data)
+        return out
+
+    if modules.globals.enable_hair_swapping:
+        final_swapped_frame = do_hair_blending()
+    if modules.globals.mouth_mask:
+        final_swapped_frame = do_mouth_mask(final_swapped_frame)
 
     if PROFILE_FACE_SWAP:
         elapsed = time.time() - start_time
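The hair path above leans on cv2.estimateAffinePartial2D to map source keypoints onto the target before warping. A standalone sketch of that step, with made-up keypoint coordinates:

import cv2
import numpy as np

# Five illustrative face keypoints (eyes, nose, mouth corners) in each frame.
source_kps = np.array([[38, 52], [72, 50], [55, 70], [42, 88], [68, 87]], dtype=np.float32)
target_kps = np.array([[140, 110], [180, 108], [160, 132], [144, 152], [176, 150]], dtype=np.float32)

# LMEDS is robust to a minority of bad correspondences; matrix is 2x3 or None.
matrix, inliers = cv2.estimateAffinePartial2D(source_kps, target_kps, method=cv2.LMEDS)

if matrix is None:
    print("Could not estimate a transform; skip blending.")
else:
    source = np.zeros((200, 200, 3), dtype=np.uint8)  # stand-in source frame
    # Warp into target coordinates; note dsize is (width, height).
    warped = cv2.warpAffine(source, matrix, (320, 240))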
@@ -293,10 +270,14 @@ def _process_live_target_v2(source_frame_full: Frame, temp_frame: Frame) -> Frame:
         return temp_frame
 
     if modules.globals.many_faces:
-        source_face_obj = default_source_face()
-        if source_face_obj:
+        if source_face_obj := default_source_face():
+            swapped_faces = set()
             for target_face in detected_faces:
+                face_id = id(target_face)
+                if face_id in swapped_faces:
+                    continue
                 temp_frame = swap_face(source_face_obj, target_face, source_frame_full, temp_frame)
+                swapped_faces.add(face_id)
     else: # not many_faces (apply simple_map logic)
         if not modules.globals.simple_map or \
            not modules.globals.simple_map.get("target_embeddings") or \

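The many-faces change pairs an assignment expression with an id()-keyed set so the same detected face object is only swapped once per frame. The pattern in isolation, with placeholder names:

def swap_each_once(detected_faces, get_source_face, swap_face):
    # Assignment expression: bind the source face and test it in one step.
    if source_face := get_source_face():
        swapped_ids = set()
        for face in detected_faces:
            if id(face) in swapped_ids:  # same object already processed
                continue
            swap_face(source_face, face)
            swapped_ids.add(id(face))

# Example: the third entry is the same object as the first and is skipped.
face_a, face_b = object(), object()
swap_each_once([face_a, face_b, face_a], lambda: object(),
               lambda src, dst: print("swapping", id(dst)))

Note that id() only deduplicates identical detection objects within one frame; it does not recognize the same person re-detected as a new object in a later frame.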

@@ -289,6 +289,7 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.CTk:
     show_fps_switch.place(relx=0.6, rely=0.75)
 
     # Hair Swapping Switch (placed below "Show FPS" on the right column)
+    segmentation_model_available = getattr(modules.globals, "segmentation_model_available", True)
    hair_swapping_value = ctk.BooleanVar(value=modules.globals.enable_hair_swapping)
    hair_swapping_switch = ctk.CTkSwitch(
        root,
@@ -298,9 +299,10 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.CTk:
         command=lambda: (
             setattr(modules.globals, "enable_hair_swapping", hair_swapping_value.get()),
             save_switch_states(),
-        )
+        ),
+        state="normal" if segmentation_model_available else "disabled"
     )
-    hair_swapping_switch.place(relx=0.6, rely=0.80) # Adjusted rely from 0.75 to 0.80
+    hair_swapping_switch.place(relx=0.6, rely=0.80)
 
     mouth_mask_var = ctk.BooleanVar(value=modules.globals.mouth_mask)
     mouth_mask_switch = ctk.CTkSwitch(
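For context, gating a CTkSwitch through its state option looks like this in isolation; the label text and the availability flag below are invented for the sketch:

import customtkinter as ctk

root = ctk.CTk()
feature_available = False  # e.g., the segmentation model failed to load

switch_value = ctk.BooleanVar(value=False)
switch = ctk.CTkSwitch(
    root,
    text="Swap Hair",  # illustrative label
    variable=switch_value,
    state="normal" if feature_available else "disabled",
)
switch.place(relx=0.6, rely=0.80)
root.mainloop()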
@@ -911,74 +913,82 @@ def create_webcam_preview(camera_index: int):
             update_status("Error: No source image selected for webcam mode.")
             cap.release()
             PREVIEW.withdraw()
-            while PREVIEW.state() != "withdrawn" and ROOT.winfo_exists():
-                ROOT.update_idletasks()
-                ROOT.update()
-                time.sleep(0.05)
+            def wait_for_withdraw():
+                if PREVIEW.state() != "withdrawn" and ROOT.winfo_exists():
+                    ROOT.update_idletasks()
+                    ROOT.update()
+                    PREVIEW.after(50, wait_for_withdraw)
+            wait_for_withdraw()
             return
 
         if not os.path.exists(modules.globals.source_path):
             update_status(f"Error: Source image not found at {modules.globals.source_path}")
             cap.release()
             PREVIEW.withdraw()
-            while PREVIEW.state() != "withdrawn" and ROOT.winfo_exists():
-                ROOT.update_idletasks()
-                ROOT.update()
-                time.sleep(0.05)
+            def wait_for_withdraw():
+                if PREVIEW.state() != "withdrawn" and ROOT.winfo_exists():
+                    ROOT.update_idletasks()
+                    ROOT.update()
+                    PREVIEW.after(50, wait_for_withdraw)
+            wait_for_withdraw()
             return
 
         source_frame_full_for_cam = cv2.imread(modules.globals.source_path)
         if source_frame_full_for_cam is None:
             update_status(f"Error: Could not read source image at {modules.globals.source_path}")
             cap.release()
             PREVIEW.withdraw()
-            while PREVIEW.state() != "withdrawn" and ROOT.winfo_exists():
-                ROOT.update_idletasks()
-                ROOT.update()
-                time.sleep(0.05)
+            def wait_for_withdraw():
+                if PREVIEW.state() != "withdrawn" and ROOT.winfo_exists():
+                    ROOT.update_idletasks()
+                    ROOT.update()
+                    PREVIEW.after(50, wait_for_withdraw)
+            wait_for_withdraw()
             return
 
         source_face_obj_for_cam = get_one_face(source_frame_full_for_cam)
         if source_face_obj_for_cam is None:
             update_status(f"Error: No face detected in source image {modules.globals.source_path}")
             # This error is less critical for stopping immediately, but we'll make it persistent too.
             # The loop below will run, but processing for frames will effectively be skipped.
             # For consistency in error handling, make it persistent.
             cap.release()
             PREVIEW.withdraw()
-            while PREVIEW.state() != "withdrawn" and ROOT.winfo_exists():
-                ROOT.update_idletasks()
-                ROOT.update()
-                time.sleep(0.05)
+            def wait_for_withdraw():
+                if PREVIEW.state() != "withdrawn" and ROOT.winfo_exists():
+                    ROOT.update_idletasks()
+                    ROOT.update()
+                    PREVIEW.after(50, wait_for_withdraw)
+            wait_for_withdraw()
             return
     else: # modules.globals.map_faces is True
         if not modules.globals.source_path:
             update_status("Error: No global source image selected (for hair/background in map_faces mode).")
             cap.release()
             PREVIEW.withdraw()
-            while PREVIEW.state() != "withdrawn" and ROOT.winfo_exists():
-                ROOT.update_idletasks()
-                ROOT.update()
-                time.sleep(0.05)
+            def wait_for_withdraw():
+                if PREVIEW.state() != "withdrawn" and ROOT.winfo_exists():
+                    ROOT.update_idletasks()
+                    ROOT.update()
+                    PREVIEW.after(50, wait_for_withdraw)
+            wait_for_withdraw()
             return
 
         if not os.path.exists(modules.globals.source_path):
             update_status(f"Error: Source image (for hair/background) not found at {modules.globals.source_path}")
             cap.release()
             PREVIEW.withdraw()
-            while PREVIEW.state() != "withdrawn" and ROOT.winfo_exists():
-                ROOT.update_idletasks()
-                ROOT.update()
-                time.sleep(0.05)
+            def wait_for_withdraw():
+                if PREVIEW.state() != "withdrawn" and ROOT.winfo_exists():
+                    ROOT.update_idletasks()
+                    ROOT.update()
+                    PREVIEW.after(50, wait_for_withdraw)
+            wait_for_withdraw()
             return
 
         source_frame_full_for_cam_map_faces = cv2.imread(modules.globals.source_path)
         if source_frame_full_for_cam_map_faces is None:
             update_status(f"Error: Could not read source image (for hair/background) at {modules.globals.source_path}")
             cap.release()
             PREVIEW.withdraw()
-            while PREVIEW.state() != "withdrawn" and ROOT.winfo_exists():
-                ROOT.update_idletasks()
-                ROOT.update()
-                time.sleep(0.05)
+            def wait_for_withdraw():
+                if PREVIEW.state() != "withdrawn" and ROOT.winfo_exists():
+                    ROOT.update_idletasks()
+                    ROOT.update()
+                    PREVIEW.after(50, wait_for_withdraw)
+            wait_for_withdraw()
             return
 
         if not modules.globals.source_target_map and not modules.globals.simple_map:

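The repeated wait_for_withdraw helper swaps the old blocking while/time.sleep poll for Tkinter's after() scheduling. A minimal sketch of the idea, with generic widget names rather than the app's:

import tkinter as tk

root = tk.Tk()
preview = tk.Toplevel(root)

def wait_for_withdraw():
    # Re-check state on the event loop instead of blocking in a while loop.
    if preview.state() != "withdrawn" and root.winfo_exists():
        preview.after(50, wait_for_withdraw)  # poll again in 50 ms
    else:
        print("Preview withdrawn; safe to continue cleanup.")

preview.withdraw()
wait_for_withdraw()
root.mainloop()

One behavioral note: because after() returns immediately, the enclosing function's return now executes before withdrawal is confirmed; the polling merely keeps the UI responsive while that completes in the background.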

@@ -50,17 +50,48 @@ class VideoCapturer:
                     continue
         else:
             # Unix-like systems (Linux/Mac) capture method
+            backend = getattr(self, "camera_backend", None)
+            if backend is None:
+                import os
+                backend_env = os.environ.get("VIDEO_CAPTURE_BACKEND")
+                if backend_env is not None:
+                    try:
+                        backend = int(backend_env)
+                    except ValueError:
+                        backend = getattr(cv2, backend_env, None)
             if platform.system() == "Darwin": # macOS
-                print("INFO: Attempting to use cv2.CAP_AVFOUNDATION for macOS camera.")
-                self.cap = cv2.VideoCapture(self.device_index, cv2.CAP_AVFOUNDATION)
+                tried_backends = []
+                if backend is not None:
+                    print(f"INFO: Attempting to use user-specified backend {backend} for macOS camera.")
+                    self.cap = cv2.VideoCapture(self.device_index, backend)
+                    tried_backends.append(backend)
+                else:
+                    print("INFO: Attempting to use cv2.CAP_AVFOUNDATION for macOS camera.")
+                    self.cap = cv2.VideoCapture(self.device_index, cv2.CAP_AVFOUNDATION)
+                    tried_backends.append(cv2.CAP_AVFOUNDATION)
                 if not self.cap or not self.cap.isOpened():
-                    print("WARN: cv2.CAP_AVFOUNDATION failed to open camera. Trying default backend for macOS.")
-                    # Release the failed attempt before trying again
+                    print("WARN: First backend failed to open camera. Trying cv2.CAP_QT for macOS.")
                     if self.cap:
                         self.cap.release()
+                    if cv2.CAP_QT not in tried_backends:
+                        self.cap = cv2.VideoCapture(self.device_index, cv2.CAP_QT)
+                        tried_backends.append(cv2.CAP_QT)
+                if not self.cap or not self.cap.isOpened():
+                    print("WARN: cv2.CAP_QT failed to open camera. Trying default backend for macOS.")
+                    if self.cap:
+                        self.cap.release()
                     self.cap = cv2.VideoCapture(self.device_index) # Fallback to default
             else: # Other Unix-like systems (e.g., Linux)
-                self.cap = cv2.VideoCapture(self.device_index)
+                if backend is not None:
+                    print(f"INFO: Attempting to use user-specified backend {backend} for camera.")
+                    self.cap = cv2.VideoCapture(self.device_index, backend)
+                    if not self.cap or not self.cap.isOpened():
+                        print("WARN: User-specified backend failed. Trying default backend.")
+                        if self.cap:
+                            self.cap.release()
+                        self.cap = cv2.VideoCapture(self.device_index)
+                else:
+                    self.cap = cv2.VideoCapture(self.device_index)
 
         if not self.cap or not self.cap.isOpened():
             raise RuntimeError("Failed to open camera")
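Finally, a sketch of how the new VIDEO_CAPTURE_BACKEND override could be exercised; per the parsing above, the variable accepts either a numeric backend id or the name of a cv2 constant (the device index 0 is illustrative):

import os
import cv2

# Either form works with the parsing in the diff:
os.environ["VIDEO_CAPTURE_BACKEND"] = "CAP_V4L2"  # a cv2 constant name...
# os.environ["VIDEO_CAPTURE_BACKEND"] = "200"     # ...or its numeric id (CAP_V4L2 on Linux builds)

# Equivalent manual resolution, mirroring the fallback logic:
raw = os.environ["VIDEO_CAPTURE_BACKEND"]
try:
    backend = int(raw)                  # numeric backend id
except ValueError:
    backend = getattr(cv2, raw, None)   # cv2 constant name, else None

cap = cv2.VideoCapture(0, backend) if backend is not None else cv2.VideoCapture(0)
print("opened:", cap.isOpened())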