diff --git a/modules/hair_segmenter.py b/modules/hair_segmenter.py
index 69ae5f1..9d96a6b 100644
--- a/modules/hair_segmenter.py
+++ b/modules/hair_segmenter.py
@@ -9,12 +9,14 @@
 HAIR_SEGMENTER_PROCESSOR = None
 HAIR_SEGMENTER_MODEL = None
 MODEL_NAME = "isjackwild/segformer-b0-finetuned-segments-skin-hair-clothing"
 
-def segment_hair(image_np: np.ndarray) -> np.ndarray:
+def segment_hair(image_np: np.ndarray, device: str = "cpu", hair_label_index: int = None) -> np.ndarray:
     """
     Segments hair from an image.
 
     Args:
         image_np: NumPy array representing the image (BGR format from OpenCV).
+        device: Device to run the model on ("cpu" or "cuda").
+        hair_label_index: Optional; index of the hair label in the segmentation map. If not provided, falls back to a hair_label_index attribute on the model when present, otherwise 2 (hair for this model).
 
     Returns:
         NumPy array representing the binary hair mask.
@@ -26,48 +28,38 @@
     try:
         HAIR_SEGMENTER_PROCESSOR = SegformerImageProcessor.from_pretrained(MODEL_NAME)
         HAIR_SEGMENTER_MODEL = SegformerForSemanticSegmentation.from_pretrained(MODEL_NAME)
-        # Optional: Move model to GPU if available and if other models use GPU
-        # if torch.cuda.is_available():
-        #     HAIR_SEGMENTER_MODEL = HAIR_SEGMENTER_MODEL.to('cuda')
-        #     print("Hair segmentation model moved to GPU.")
-        print("Hair segmentation model and processor loaded successfully.")
+        HAIR_SEGMENTER_MODEL = HAIR_SEGMENTER_MODEL.to(device)
+        print(f"Hair segmentation model and processor loaded successfully. Model moved to device: {device}")
     except Exception as e:
         print(f"Failed to load hair segmentation model/processor: {e}")
-        # Return an empty mask compatible with expected output shape (H, W)
         return np.zeros((image_np.shape[0], image_np.shape[1]), dtype=np.uint8)
 
-    # Ensure processor and model are loaded before proceeding
     if HAIR_SEGMENTER_PROCESSOR is None or HAIR_SEGMENTER_MODEL is None:
         print("Error: Hair segmentation models are not available.")
         return np.zeros((image_np.shape[0], image_np.shape[1]), dtype=np.uint8)
 
-    # Convert BGR (OpenCV) to RGB (PIL)
     image_rgb = cv2.cvtColor(image_np, cv2.COLOR_BGR2RGB)
     image_pil = Image.fromarray(image_rgb)
 
     inputs = HAIR_SEGMENTER_PROCESSOR(images=image_pil, return_tensors="pt")
-
-    # Optional: Move inputs to GPU if model is on GPU
-    # if HAIR_SEGMENTER_MODEL.device.type == 'cuda':
-    #     inputs = inputs.to(HAIR_SEGMENTER_MODEL.device)
+    if device == "cuda" and hasattr(HAIR_SEGMENTER_MODEL, "device") and HAIR_SEGMENTER_MODEL.device.type == "cuda":
+        inputs = {k: v.to("cuda") for k, v in inputs.items()}
 
-    with torch.no_grad(): # Important for inference
+    with torch.no_grad():
         outputs = HAIR_SEGMENTER_MODEL(**inputs)
-
-    logits = outputs.logits # Shape: batch_size, num_labels, height, width
-    # Upsample logits to original image size
+    logits = outputs.logits
     upsampled_logits = torch.nn.functional.interpolate(
         logits,
-        size=(image_np.shape[0], image_np.shape[1]), # H, W
+        size=(image_np.shape[0], image_np.shape[1]),
         mode='bilinear',
         align_corners=False
     )
 
     segmentation_map = upsampled_logits.argmax(dim=1).squeeze().cpu().numpy().astype(np.uint8)
 
-    # Label 2 is for hair in this model
-    return np.where(segmentation_map == 2, 255, 0).astype(np.uint8)
+    if hair_label_index is None:
+        hair_label_index = getattr(HAIR_SEGMENTER_MODEL, "hair_label_index", 2)
+    return np.where(segmentation_map == hair_label_index, 255, 0).astype(np.uint8)
 
 if __name__ == '__main__':
     # This is a conceptual test.
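Note: a minimal sketch of how the reworked segment_hair signature might be called. The image path and the torch.cuda check are illustrative, not part of this patch:

import cv2
import torch
from modules.hair_segmenter import segment_hair

frame = cv2.imread("example.jpg")  # hypothetical BGR input, as produced by OpenCV
device = "cuda" if torch.cuda.is_available() else "cpu"

# Let segment_hair resolve the hair label itself (falls back to 2) ...
mask = segment_hair(frame, device=device)

# ... or pin the label index explicitly, e.g. for a retrained checkpoint.
mask = segment_hair(frame, device=device, hair_label_index=2)
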
diff --git a/modules/processors/frame/face_swapper.py b/modules/processors/frame/face_swapper.py
index 9f33e3a..85eb9b7 100644
--- a/modules/processors/frame/face_swapper.py
+++ b/modules/processors/frame/face_swapper.py
@@ -162,67 +162,44 @@ def swap_face(source_face_obj: Face, target_face: Face, source_frame_full: Frame, temp_frame: Frame) -> Frame:
     swapped_frame = face_swapper.get(temp_frame, target_face, source_face_obj, paste_back=True)
     final_swapped_frame = swapped_frame
 
-    if modules.globals.enable_hair_swapping:
-        if not (source_face_obj.kps is not None and \
-                target_face.kps is not None and \
-                source_face_obj.kps.shape[0] >= 3 and \
-                target_face.kps.shape[0] >= 3):
+    def do_hair_blending():
+        if not (source_face_obj.kps is not None and target_face.kps is not None and source_face_obj.kps.shape[0] >= 3 and target_face.kps.shape[0] >= 3):
             logging.warning(
                 f"Skipping hair blending due to insufficient keypoints. "
                 f"Source kps: {source_face_obj.kps.shape if source_face_obj.kps is not None else 'None'}, "
                 f"Target kps: {target_face.kps.shape if target_face.kps is not None else 'None'}."
             )
-        else:
-            source_kps_float = source_face_obj.kps.astype(np.float32)
-            target_kps_float = target_face.kps.astype(np.float32)
-            matrix, _ = cv2.estimateAffinePartial2D(source_kps_float, target_kps_float, method=cv2.LMEDS)
+            return swapped_frame
+        source_kps_float = source_face_obj.kps.astype(np.float32)
+        target_kps_float = target_face.kps.astype(np.float32)
+        matrix, _ = cv2.estimateAffinePartial2D(source_kps_float, target_kps_float, method=cv2.LMEDS)
+        if matrix is None:
+            logging.warning("Failed to estimate affine transformation matrix for hair. Skipping hair blending.")
+            return swapped_frame
+        dsize = (temp_frame.shape[1], temp_frame.shape[0])
+        warped_material, warped_mask = _prepare_warped_source_material_and_mask(
+            source_face_obj, source_frame_full, matrix, dsize
+        )
+        if warped_material is not None and warped_mask is not None:
+            out = swapped_frame.copy()
+            color_corrected_material = apply_color_transfer(warped_material, out)
+            return _blend_material_onto_frame(out, color_corrected_material, warped_mask)
+        return swapped_frame
 
-            if matrix is None:
-                logging.warning("Failed to estimate affine transformation matrix for hair. Skipping hair blending.")
-            else:
-                dsize = (temp_frame.shape[1], temp_frame.shape[0]) # width, height
-
-                warped_material, warped_mask = _prepare_warped_source_material_and_mask(
-                    source_face_obj, source_frame_full, matrix, dsize
-                )
-
-                if warped_material is not None and warped_mask is not None:
-                    # Make a copy only now that we are sure we will modify it for hair.
-                    final_swapped_frame = swapped_frame.copy()
-
-                    color_corrected_material = apply_color_transfer(warped_material, final_swapped_frame) # Use final_swapped_frame for color context
-
-                    final_swapped_frame = _blend_material_onto_frame(
-                        final_swapped_frame,
-                        color_corrected_material,
-                        warped_mask
-                    )
-
-    # Mouth Mask Logic (operates on final_swapped_frame)
-    if modules.globals.mouth_mask:
-        # If final_swapped_frame wasn't copied for hair, it needs to be copied now before mouth mask modification.
-        if final_swapped_frame is swapped_frame: # Check if it's still the same object
-            final_swapped_frame = swapped_frame.copy()
-
-        # Create a mask for the target face
+    def do_mouth_mask(frame):
+        out = frame.copy() if frame is swapped_frame else frame
         face_mask = create_face_mask(target_face, temp_frame)
-
-        # Create the mouth mask
-        mouth_mask, mouth_cutout, mouth_box, lower_lip_polygon = (
-            create_lower_mouth_mask(target_face, temp_frame)
-        )
-
-        # Apply the mouth area
-        # Apply to final_swapped_frame if hair blending happened, otherwise to swapped_frame
-        final_swapped_frame = apply_mouth_area(
-            final_swapped_frame, mouth_cutout, mouth_box, face_mask, lower_lip_polygon
-        )
-
+        mouth_mask, mouth_cutout, mouth_box, lower_lip_polygon = create_lower_mouth_mask(target_face, temp_frame)
+        out = apply_mouth_area(out, mouth_cutout, mouth_box, face_mask, lower_lip_polygon)
         if modules.globals.show_mouth_mask_box:
             mouth_mask_data = (mouth_mask, mouth_cutout, mouth_box, lower_lip_polygon)
-            final_swapped_frame = draw_mouth_mask_visualization(
-                final_swapped_frame, target_face, mouth_mask_data
-            )
+            out = draw_mouth_mask_visualization(out, target_face, mouth_mask_data)
+        return out
+
+    if modules.globals.enable_hair_swapping:
+        final_swapped_frame = do_hair_blending()
+    if modules.globals.mouth_mask:
+        final_swapped_frame = do_mouth_mask(final_swapped_frame)
 
     if PROFILE_FACE_SWAP:
         elapsed = time.time() - start_time
@@ -293,10 +270,14 @@ def _process_live_target_v2(source_frame_full: Frame, temp_frame: Frame) -> Frame:
         return temp_frame
 
     if modules.globals.many_faces:
-        source_face_obj = default_source_face()
-        if source_face_obj:
+        if source_face_obj := default_source_face():
+            swapped_faces = set()
             for target_face in detected_faces:
+                face_id = id(target_face)
+                if face_id in swapped_faces:
+                    continue
                 temp_frame = swap_face(source_face_obj, target_face, source_frame_full, temp_frame)
+                swapped_faces.add(face_id)
     else: # not many_faces (apply simple_map logic)
         if not modules.globals.simple_map or \
            not modules.globals.simple_map.get("target_embeddings") or \
diff --git a/modules/ui.py b/modules/ui.py
index 6f50274..e38866f 100644
--- a/modules/ui.py
+++ b/modules/ui.py
@@ -289,6 +289,7 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.CTk:
     show_fps_switch.place(relx=0.6, rely=0.75)
 
     # Hair Swapping Switch (placed below "Show FPS" on the right column)
+    segmentation_model_available = getattr(modules.globals, "segmentation_model_available", True)
    hair_swapping_value = ctk.BooleanVar(value=modules.globals.enable_hair_swapping)
     hair_swapping_switch = ctk.CTkSwitch(
         root,
@@ -298,9 +299,10 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.CTk:
         command=lambda: (
             setattr(modules.globals, "enable_hair_swapping", hair_swapping_value.get()),
             save_switch_states(),
-        )
+        ),
+        state="normal" if segmentation_model_available else "disabled"
     )
-    hair_swapping_switch.place(relx=0.6, rely=0.80) # Adjusted rely from 0.75 to 0.80
+    hair_swapping_switch.place(relx=0.6, rely=0.80)
 
     mouth_mask_var = ctk.BooleanVar(value=modules.globals.mouth_mask)
     mouth_mask_switch = ctk.CTkSwitch(
@@ -911,74 +913,82 @@ def create_webcam_preview(camera_index: int):
             update_status("Error: No source image selected for webcam mode.")
             cap.release()
             PREVIEW.withdraw()
-            while PREVIEW.state() != "withdrawn" and ROOT.winfo_exists():
-                ROOT.update_idletasks()
-                ROOT.update()
-                time.sleep(0.05)
+            def wait_for_withdraw():
+                if PREVIEW.state() != "withdrawn" and ROOT.winfo_exists():
+                    ROOT.update_idletasks()
+                    ROOT.update()
+                    PREVIEW.after(50, wait_for_withdraw)
+            wait_for_withdraw()
             return
         if not os.path.exists(modules.globals.source_path):
             update_status(f"Error: Source image not found at {modules.globals.source_path}")
             cap.release()
             PREVIEW.withdraw()
-            while PREVIEW.state() != "withdrawn" and ROOT.winfo_exists():
-                ROOT.update_idletasks()
-                ROOT.update()
-                time.sleep(0.05)
+            def wait_for_withdraw():
+                if PREVIEW.state() != "withdrawn" and ROOT.winfo_exists():
+                    ROOT.update_idletasks()
+                    ROOT.update()
+                    PREVIEW.after(50, wait_for_withdraw)
+            wait_for_withdraw()
             return
-
         source_frame_full_for_cam = cv2.imread(modules.globals.source_path)
         if source_frame_full_for_cam is None:
             update_status(f"Error: Could not read source image at {modules.globals.source_path}")
             cap.release()
             PREVIEW.withdraw()
-            while PREVIEW.state() != "withdrawn" and ROOT.winfo_exists():
-                ROOT.update_idletasks()
-                ROOT.update()
-                time.sleep(0.05)
+            def wait_for_withdraw():
+                if PREVIEW.state() != "withdrawn" and ROOT.winfo_exists():
+                    ROOT.update_idletasks()
+                    ROOT.update()
+                    PREVIEW.after(50, wait_for_withdraw)
+            wait_for_withdraw()
            return
-
         source_face_obj_for_cam = get_one_face(source_frame_full_for_cam)
         if source_face_obj_for_cam is None:
             update_status(f"Error: No face detected in source image {modules.globals.source_path}")
-            # This error is less critical for stopping immediately, but we'll make it persistent too.
-            # The loop below will run, but processing for frames will effectively be skipped.
-            # For consistency in error handling, make it persistent.
             cap.release()
             PREVIEW.withdraw()
-            while PREVIEW.state() != "withdrawn" and ROOT.winfo_exists():
-                ROOT.update_idletasks()
-                ROOT.update()
-                time.sleep(0.05)
+            def wait_for_withdraw():
+                if PREVIEW.state() != "withdrawn" and ROOT.winfo_exists():
+                    ROOT.update_idletasks()
+                    ROOT.update()
+                    PREVIEW.after(50, wait_for_withdraw)
+            wait_for_withdraw()
             return
     else: # modules.globals.map_faces is True
         if not modules.globals.source_path:
             update_status("Error: No global source image selected (for hair/background in map_faces mode).")
             cap.release()
             PREVIEW.withdraw()
-            while PREVIEW.state() != "withdrawn" and ROOT.winfo_exists():
-                ROOT.update_idletasks()
-                ROOT.update()
-                time.sleep(0.05)
+            def wait_for_withdraw():
+                if PREVIEW.state() != "withdrawn" and ROOT.winfo_exists():
+                    ROOT.update_idletasks()
+                    ROOT.update()
+                    PREVIEW.after(50, wait_for_withdraw)
+            wait_for_withdraw()
             return
         if not os.path.exists(modules.globals.source_path):
            update_status(f"Error: Source image (for hair/background) not found at {modules.globals.source_path}")
             cap.release()
             PREVIEW.withdraw()
-            while PREVIEW.state() != "withdrawn" and ROOT.winfo_exists():
-                ROOT.update_idletasks()
-                ROOT.update()
-                time.sleep(0.05)
+            def wait_for_withdraw():
+                if PREVIEW.state() != "withdrawn" and ROOT.winfo_exists():
+                    ROOT.update_idletasks()
+                    ROOT.update()
+                    PREVIEW.after(50, wait_for_withdraw)
+            wait_for_withdraw()
             return
-
         source_frame_full_for_cam_map_faces = cv2.imread(modules.globals.source_path)
         if source_frame_full_for_cam_map_faces is None:
             update_status(f"Error: Could not read source image (for hair/background) at {modules.globals.source_path}")
             cap.release()
             PREVIEW.withdraw()
-            while PREVIEW.state() != "withdrawn" and ROOT.winfo_exists():
-                ROOT.update_idletasks()
-                ROOT.update()
-                time.sleep(0.05)
+            def wait_for_withdraw():
+                if PREVIEW.state() != "withdrawn" and ROOT.winfo_exists():
+                    ROOT.update_idletasks()
+                    ROOT.update()
+                    PREVIEW.after(50, wait_for_withdraw)
+            wait_for_withdraw()
             return
 
         if not modules.globals.source_target_map and not modules.globals.simple_map:
diff --git a/modules/video_capture.py b/modules/video_capture.py
index 5721212..76f2b49 100644
--- a/modules/video_capture.py
+++ b/modules/video_capture.py
@@ -50,17 +50,48 @@ class VideoCapturer:
                     continue
             else:
                 # Unix-like systems (Linux/Mac) capture method
+                backend = getattr(self, "camera_backend", None)
+                if backend is None:
+                    import os
+                    backend_env = os.environ.get("VIDEO_CAPTURE_BACKEND")
+                    if backend_env is not None:
+                        try:
+                            backend = int(backend_env)
+                        except ValueError:
+                            backend = getattr(cv2, backend_env, None)
                 if platform.system() == "Darwin": # macOS
-                    print("INFO: Attempting to use cv2.CAP_AVFOUNDATION for macOS camera.")
-                    self.cap = cv2.VideoCapture(self.device_index, cv2.CAP_AVFOUNDATION)
+                    tried_backends = []
+                    if backend is not None:
+                        print(f"INFO: Attempting to use user-specified backend {backend} for macOS camera.")
+                        self.cap = cv2.VideoCapture(self.device_index, backend)
+                        tried_backends.append(backend)
+                    else:
+                        print("INFO: Attempting to use cv2.CAP_AVFOUNDATION for macOS camera.")
+                        self.cap = cv2.VideoCapture(self.device_index, cv2.CAP_AVFOUNDATION)
+                        tried_backends.append(cv2.CAP_AVFOUNDATION)
                     if not self.cap or not self.cap.isOpened():
-                        print("WARN: cv2.CAP_AVFOUNDATION failed to open camera. Trying default backend for macOS.")
-                        # Release the failed attempt before trying again
+                        print("WARN: First backend failed to open camera. Trying cv2.CAP_QT for macOS.")
+                        if self.cap:
+                            self.cap.release()
+                        if cv2.CAP_QT not in tried_backends:
+                            self.cap = cv2.VideoCapture(self.device_index, cv2.CAP_QT)
+                            tried_backends.append(cv2.CAP_QT)
+                    if not self.cap or not self.cap.isOpened():
+                        print("WARN: cv2.CAP_QT failed to open camera. Trying default backend for macOS.")
                         if self.cap:
                             self.cap.release()
                         self.cap = cv2.VideoCapture(self.device_index) # Fallback to default
                 else: # Other Unix-like systems (e.g., Linux)
-                    self.cap = cv2.VideoCapture(self.device_index)
+                    if backend is not None:
+                        print(f"INFO: Attempting to use user-specified backend {backend} for camera.")
+                        self.cap = cv2.VideoCapture(self.device_index, backend)
+                        if not self.cap or not self.cap.isOpened():
+                            print("WARN: User-specified backend failed. Trying default backend.")
+                            if self.cap:
+                                self.cap.release()
+                            self.cap = cv2.VideoCapture(self.device_index)
+                    else:
+                        self.cap = cv2.VideoCapture(self.device_index)
 
             if not self.cap or not self.cap.isOpened():
                 raise RuntimeError("Failed to open camera")
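Note on the face_swapper.py refactor above: do_hair_blending estimates a partial affine transform between source and target keypoints and warps the source material into the target frame. A self-contained sketch of that OpenCV pattern, with dummy keypoints standing in for the detected faces' kps arrays:

import cv2
import numpy as np

# Dummy 5-point landmarks (x, y); in the patch these come from Face.kps.
source_kps = np.array([[30, 40], [70, 40], [50, 60], [35, 80], [65, 80]], dtype=np.float32)
target_kps = np.array([[32, 42], [71, 41], [52, 61], [36, 82], [66, 81]], dtype=np.float32)

# LMEDS tolerates a minority of outlier correspondences; the result is None on
# failure, which is why the patch checks the matrix before warping.
matrix, _ = cv2.estimateAffinePartial2D(source_kps, target_kps, method=cv2.LMEDS)
if matrix is not None:
    source_img = np.zeros((100, 100, 3), dtype=np.uint8)  # stand-in for source_frame_full
    dsize = (100, 100)  # (width, height) of the target frame, as in the patch
    warped = cv2.warpAffine(source_img, matrix, dsize)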
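Note on the ui.py change: the blocking while/time.sleep(0.05) poll is replaced with Tk's after scheduling, so the event loop stays responsive while waiting for the preview window to withdraw. The pattern in isolation (widget names are placeholders):

import tkinter as tk

root = tk.Tk()

def poll():
    # Re-check the condition on the event loop instead of blocking in a while loop.
    if root.winfo_exists():
        root.after(50, poll)  # reschedule in 50 ms

poll()
root.mainloop()

One behavioral difference worth noting: unlike the old loop, create_webcam_preview now returns before the withdraw completes, so any follow-up work has to happen in the callback. And since the same closure is repeated in every error branch, defining wait_for_withdraw once near the top of the function would be a natural follow-up.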
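Note on the video_capture.py change: the backend override is read either from a camera_backend attribute on the VideoCapturer or from a VIDEO_CAPTURE_BACKEND environment variable, which may hold a numeric OpenCV backend ID or the name of a cv2 constant. For example (the values shown are illustrative):

import os

# By cv2 constant name, resolved via getattr(cv2, name) ...
os.environ["VIDEO_CAPTURE_BACKEND"] = "CAP_V4L2"

# ... or by numeric backend ID (cv2.CAP_V4L2 == 200).
os.environ["VIDEO_CAPTURE_BACKEND"] = "200"

Either form must be set before VideoCapturer opens the device; an unrecognized constant name resolves to None and the code falls back to the default backend.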