Merge d4905c6bd9 into 2b70131e6a
				
					
				
			
						commit
						44c60aa8d4
					
				|  | @ -39,9 +39,12 @@ def parse_args() -> None: | ||||||
|     program.add_argument('--keep-audio', help='keep original audio', dest='keep_audio', action='store_true', default=True) |     program.add_argument('--keep-audio', help='keep original audio', dest='keep_audio', action='store_true', default=True) | ||||||
|     program.add_argument('--keep-frames', help='keep temporary frames', dest='keep_frames', action='store_true', default=False) |     program.add_argument('--keep-frames', help='keep temporary frames', dest='keep_frames', action='store_true', default=False) | ||||||
|     program.add_argument('--many-faces', help='process every face', dest='many_faces', action='store_true', default=False) |     program.add_argument('--many-faces', help='process every face', dest='many_faces', action='store_true', default=False) | ||||||
|  |     program.add_argument('--color-correction', help='apply color correction to the swapped face', dest='color_correction', action='store_true', default=False) # Added this line back | ||||||
|     program.add_argument('--nsfw-filter', help='filter the NSFW image or video', dest='nsfw_filter', action='store_true', default=False) |     program.add_argument('--nsfw-filter', help='filter the NSFW image or video', dest='nsfw_filter', action='store_true', default=False) | ||||||
|     program.add_argument('--map-faces', help='map source target faces', dest='map_faces', action='store_true', default=False) |     program.add_argument('--map-faces', help='map source target faces', dest='map_faces', action='store_true', default=False) | ||||||
|     program.add_argument('--mouth-mask', help='mask the mouth region', dest='mouth_mask', action='store_true', default=False) |     program.add_argument('--mouth-mask', help='mask the mouth region', dest='mouth_mask', action='store_true', default=False) | ||||||
|  |     program.add_argument('--poisson-blending', help='use Poisson blending for smoother face integration', dest='poisson_blending', action='store_true', default=False) | ||||||
|  |     program.add_argument('--preserve-ears', help='attempt to preserve target ears by modifying the blend mask', dest='preserve_ears', action='store_true', default=False) | ||||||
|     program.add_argument('--video-encoder', help='adjust output video encoder', dest='video_encoder', default='libx264', choices=['libx264', 'libx265', 'libvpx-vp9']) |     program.add_argument('--video-encoder', help='adjust output video encoder', dest='video_encoder', default='libx264', choices=['libx264', 'libx265', 'libvpx-vp9']) | ||||||
|     program.add_argument('--video-quality', help='adjust output video quality', dest='video_quality', type=int, default=18, choices=range(52), metavar='[0-51]') |     program.add_argument('--video-quality', help='adjust output video quality', dest='video_quality', type=int, default=18, choices=range(52), metavar='[0-51]') | ||||||
|     program.add_argument('-l', '--lang', help='Ui language', default="en") |     program.add_argument('-l', '--lang', help='Ui language', default="en") | ||||||
|  | @ -69,7 +72,10 @@ def parse_args() -> None: | ||||||
|     modules.globals.keep_audio = args.keep_audio |     modules.globals.keep_audio = args.keep_audio | ||||||
|     modules.globals.keep_frames = args.keep_frames |     modules.globals.keep_frames = args.keep_frames | ||||||
|     modules.globals.many_faces = args.many_faces |     modules.globals.many_faces = args.many_faces | ||||||
|  |     modules.globals.color_correction = args.color_correction | ||||||
|     modules.globals.mouth_mask = args.mouth_mask |     modules.globals.mouth_mask = args.mouth_mask | ||||||
|  |     modules.globals.use_poisson_blending = args.poisson_blending | ||||||
|  |     modules.globals.preserve_target_ears = args.preserve_ears | ||||||
|     modules.globals.nsfw_filter = args.nsfw_filter |     modules.globals.nsfw_filter = args.nsfw_filter | ||||||
|     modules.globals.map_faces = args.map_faces |     modules.globals.map_faces = args.map_faces | ||||||
|     modules.globals.video_encoder = args.video_encoder |     modules.globals.video_encoder = args.video_encoder | ||||||
|  |  | ||||||
|  | @ -41,3 +41,10 @@ show_mouth_mask_box = False | ||||||
| mask_feather_ratio = 8 | mask_feather_ratio = 8 | ||||||
| mask_down_size = 0.50 | mask_down_size = 0.50 | ||||||
| mask_size = 1 | mask_size = 1 | ||||||
|  | use_poisson_blending = False # Added for Poisson blending | ||||||
|  | poisson_blending_feather_amount = 5 # Feathering for the mask before Poisson blending | ||||||
|  | preserve_target_ears = False # Flag to enable preserving target's ears | ||||||
|  | ear_width_ratio = 0.18 # Width of the ear exclusion box as a ratio of face bbox width | ||||||
|  | ear_height_ratio = 0.35 # Height of the ear exclusion box as a ratio of face bbox height | ||||||
|  | ear_vertical_offset_ratio = 0.20 # Vertical offset of the ear box from top of face bbox | ||||||
|  | ear_horizontal_overlap_ratio = 0.03 # How much the ear exclusion zone can overlap into the face bbox | ||||||
|  |  | ||||||
|  | @ -71,10 +71,43 @@ def swap_face(source_face: Face, target_face: Face, temp_frame: Frame) -> Frame: | ||||||
|     face_swapper = get_face_swapper() |     face_swapper = get_face_swapper() | ||||||
| 
 | 
 | ||||||
|     # Apply the face swap |     # Apply the face swap | ||||||
|     swapped_frame = face_swapper.get( |     swapped_frame_result = face_swapper.get( # Renamed to avoid confusion | ||||||
|         temp_frame, target_face, source_face, paste_back=True |         temp_frame, target_face, source_face, paste_back=True | ||||||
|     ) |     ) | ||||||
| 
 | 
 | ||||||
|  |     # Ensure swapped_frame_result is not None and is a valid image | ||||||
|  |     if swapped_frame_result is None or not isinstance(swapped_frame_result, np.ndarray): | ||||||
|  |         logging.error("Face swap operation failed or returned invalid result.") | ||||||
|  |         return temp_frame # Return original frame if swap failed | ||||||
|  | 
 | ||||||
|  |     # Color Correction | ||||||
|  |     if modules.globals.color_correction: | ||||||
|  |         # Get the bounding box of the target face to apply color correction | ||||||
|  |         # more accurately to the swapped region. | ||||||
|  |         # The target_face object should have bbox attribute (x1, y1, x2, y2) | ||||||
|  |         if hasattr(target_face, 'bbox'): | ||||||
|  |             x1, y1, x2, y2 = target_face.bbox.astype(int) | ||||||
|  |             # Ensure coordinates are within frame bounds | ||||||
|  |             x1, y1 = max(0, x1), max(0, y1) | ||||||
|  |             x2, y2 = min(swapped_frame_result.shape[1], x2), min(swapped_frame_result.shape[0], y2) | ||||||
|  | 
 | ||||||
|  |             if x1 < x2 and y1 < y2: | ||||||
|  |                 swapped_face_region = swapped_frame_result[y1:y2, x1:x2] | ||||||
|  |                 target_face_region_original = temp_frame[y1:y2, x1:x2] | ||||||
|  | 
 | ||||||
|  |                 if swapped_face_region.size > 0 and target_face_region_original.size > 0: | ||||||
|  |                     corrected_swapped_face_region = apply_histogram_matching_color_correction(swapped_face_region, target_face_region_original) | ||||||
|  |                     swapped_frame_result[y1:y2, x1:x2] = corrected_swapped_face_region | ||||||
|  |                 else: | ||||||
|  |                     # Fallback to full frame color correction if regions are invalid | ||||||
|  |                     swapped_frame_result = apply_histogram_matching_color_correction(swapped_frame_result, temp_frame) | ||||||
|  |             else: | ||||||
|  |                 # Fallback to full frame color correction if bbox is invalid | ||||||
|  |                 swapped_frame_result = apply_histogram_matching_color_correction(swapped_frame_result, temp_frame) | ||||||
|  |         else: | ||||||
|  |             # Fallback to full frame color correction if no bbox | ||||||
|  |             swapped_frame_result = apply_histogram_matching_color_correction(swapped_frame_result, temp_frame) | ||||||
|  | 
 | ||||||
|     if modules.globals.mouth_mask: |     if modules.globals.mouth_mask: | ||||||
|         # Create a mask for the target face |         # Create a mask for the target face | ||||||
|         face_mask = create_face_mask(target_face, temp_frame) |         face_mask = create_face_mask(target_face, temp_frame) | ||||||
|  | @ -85,22 +118,136 @@ def swap_face(source_face: Face, target_face: Face, temp_frame: Frame) -> Frame: | ||||||
|         ) |         ) | ||||||
| 
 | 
 | ||||||
|         # Apply the mouth area |         # Apply the mouth area | ||||||
|         swapped_frame = apply_mouth_area( |         swapped_frame_result = apply_mouth_area( | ||||||
|             swapped_frame, mouth_cutout, mouth_box, face_mask, lower_lip_polygon |             swapped_frame_result, mouth_cutout, mouth_box, face_mask, lower_lip_polygon | ||||||
|         ) |         ) | ||||||
| 
 | 
 | ||||||
|         if modules.globals.show_mouth_mask_box: |         if modules.globals.show_mouth_mask_box: | ||||||
|             mouth_mask_data = (mouth_mask, mouth_cutout, mouth_box, lower_lip_polygon) |             mouth_mask_data = (mouth_mask, mouth_cutout, mouth_box, lower_lip_polygon) | ||||||
|             swapped_frame = draw_mouth_mask_visualization( |             swapped_frame_result = draw_mouth_mask_visualization( | ||||||
|                 swapped_frame, target_face, mouth_mask_data |                 swapped_frame_result, target_face, mouth_mask_data | ||||||
|             ) |             ) | ||||||
| 
 | 
 | ||||||
|     return swapped_frame |     # Poisson Blending | ||||||
|  |     if modules.globals.use_poisson_blending and hasattr(target_face, 'bbox'): | ||||||
|  |         # Create a mask for the swapped face region for Poisson blending | ||||||
|  |         # This mask should cover the area of the swapped face. | ||||||
|  |         # We can use the target_face.bbox and perhaps expand it slightly, | ||||||
|  |         # or use a more precise mask from face parsing if available. | ||||||
|  |         # For simplicity, using a slightly feathered convex hull of landmarks. | ||||||
|  | 
 | ||||||
|  |         face_mask_for_blending = np.zeros(temp_frame.shape[:2], dtype=np.uint8) | ||||||
|  | 
 | ||||||
|  |         # Prioritize using the bounding box for a tighter mask | ||||||
|  |         if hasattr(target_face, 'bbox'): | ||||||
|  |             x1, y1, x2, y2 = target_face.bbox.astype(int) | ||||||
|  |             # Ensure coordinates are within frame bounds | ||||||
|  |             x1_b, y1_b = max(0, x1), max(0, y1) # Use different var names to avoid conflict with center calculation | ||||||
|  |             x2_b, y2_b = min(temp_frame.shape[1], x2), min(temp_frame.shape[0], y2) | ||||||
|  | 
 | ||||||
|  |             # Create a rectangular mask based on the bounding box | ||||||
|  |             if x1_b < x2_b and y1_b < y2_b: | ||||||
|  |                 face_mask_for_blending[y1_b:y2_b, x1_b:x2_b] = 255 | ||||||
|  |             else: | ||||||
|  |                 logging.warning("Invalid bounding box for Poisson mask. Attempting landmark-based mask.") | ||||||
|  |                 # Fallback to landmark-based convex hull if bbox is invalid | ||||||
|  |                 landmarks = target_face.landmark_2d_106 if hasattr(target_face, 'landmark_2d_106') else None | ||||||
|  |                 if landmarks is not None and len(landmarks) > 0: | ||||||
|  |                     try: | ||||||
|  |                         hull_points = cv2.convexHull(landmarks.astype(np.int32)) | ||||||
|  |                         cv2.fillConvexPoly(face_mask_for_blending, hull_points, 255) | ||||||
|  |                     except Exception as e: | ||||||
|  |                         logging.error(f"Could not form convex hull for Poisson mask from landmarks: {e}. Blending will be skipped.") | ||||||
|  |                 else: | ||||||
|  |                     logging.error("No valid bbox or landmarks for Poisson mask. Blending will be skipped.") | ||||||
|  |         else: | ||||||
|  |             # Fallback to landmark-based convex hull if no bbox attribute | ||||||
|  |             landmarks = target_face.landmark_2d_106 if hasattr(target_face, 'landmark_2d_106') else None | ||||||
|  |             if landmarks is not None and len(landmarks) > 0: | ||||||
|  |                 try: | ||||||
|  |                     hull_points = cv2.convexHull(landmarks.astype(np.int32)) | ||||||
|  |                     cv2.fillConvexPoly(face_mask_for_blending, hull_points, 255) | ||||||
|  |                 except Exception as e: | ||||||
|  |                     logging.error(f"Could not form convex hull for Poisson mask from landmarks (no bbox): {e}. Blending will be skipped.") | ||||||
|  |             else: | ||||||
|  |                 logging.error("No bbox or landmarks available for Poisson mask. Blending will be skipped.") | ||||||
|  | 
 | ||||||
|  |         # Subtract ear regions if preserve_target_ears is enabled | ||||||
|  |         if modules.globals.preserve_target_ears and np.any(face_mask_for_blending > 0): | ||||||
|  |             mfx1, mfy1, mfx2, mfy2 = target_face.bbox.astype(int) | ||||||
|  |             mfw = mfx2 - mfx1 | ||||||
|  |             mfh = mfy2 - mfy1 | ||||||
|  | 
 | ||||||
|  |             ear_w = int(mfw * modules.globals.ear_width_ratio) | ||||||
|  |             ear_h = int(mfh * modules.globals.ear_height_ratio) | ||||||
|  |             ear_v_offset = int(mfh * modules.globals.ear_vertical_offset_ratio) | ||||||
|  |             ear_overlap = int(mfw * modules.globals.ear_horizontal_overlap_ratio) | ||||||
|  | 
 | ||||||
|  |             # Person's Right Ear (image left side of face bbox) | ||||||
|  |             # This region in face_mask_for_blending will be set to 0 | ||||||
|  |             rex1 = max(0, mfx1 - ear_w + ear_overlap) | ||||||
|  |             rey1 = max(0, mfy1 + ear_v_offset) | ||||||
|  |             rex2 = min(temp_frame.shape[1], mfx1 + ear_overlap) # Extends slightly into face bbox for smoother transition | ||||||
|  |             rey2 = min(temp_frame.shape[0], rey1 + ear_h) | ||||||
|  |             if rex1 < rex2 and rey1 < rey2: | ||||||
|  |                 cv2.rectangle(face_mask_for_blending, (rex1, rey1), (rex2, rey2), 0, -1) | ||||||
|  | 
 | ||||||
|  |             # Person's Left Ear (image right side of face bbox) | ||||||
|  |             lex1 = max(0, mfx2 - ear_overlap) | ||||||
|  |             ley1 = max(0, mfy1 + ear_v_offset) | ||||||
|  |             lex2 = min(temp_frame.shape[1], mfx2 + ear_w - ear_overlap) | ||||||
|  |             ley2 = min(temp_frame.shape[0], ley1 + ear_h) | ||||||
|  |             if lex1 < lex2 and ley1 < ley2: | ||||||
|  |                 cv2.rectangle(face_mask_for_blending, (lex1, ley1), (lex2, ley2), 0, -1) | ||||||
|  | 
 | ||||||
|  |         # Feather the mask to smooth edges for Poisson blending | ||||||
|  |         if np.any(face_mask_for_blending > 0): # Only feather if there's a mask | ||||||
|  |             feather_amount = modules.globals.poisson_blending_feather_amount | ||||||
|  |             if feather_amount > 0: | ||||||
|  |                 # Ensure kernel size is odd | ||||||
|  |                 kernel_size = 2 * feather_amount + 1 | ||||||
|  |                 face_mask_for_blending = cv2.GaussianBlur(face_mask_for_blending, (kernel_size, kernel_size), 0) | ||||||
|  | 
 | ||||||
|  |         # Calculate the center of the target face bbox for seamlessClone | ||||||
|  |         if hasattr(target_face, 'bbox'): | ||||||
|  |             x1, y1, x2, y2 = target_face.bbox.astype(int) | ||||||
|  |             center_x = (x1 + x2) // 2 | ||||||
|  |             center_y = (y1 + y2) // 2 | ||||||
|  | 
 | ||||||
|  |             # Ensure center is within frame dimensions | ||||||
|  |             center_x = np.clip(center_x, 0, temp_frame.shape[1] -1) | ||||||
|  |             center_y = np.clip(center_y, 0, temp_frame.shape[0] -1) | ||||||
|  |             center = (center_x, center_y) | ||||||
|  | 
 | ||||||
|  |             # Apply Poisson blending | ||||||
|  |             # swapped_frame_result is the source, temp_frame is the destination | ||||||
|  |             if np.any(face_mask_for_blending > 0): # Proceed only if mask is not empty | ||||||
|  |                 try: | ||||||
|  |                     # Ensure swapped_frame_result and temp_frame are 8-bit 3-channel images | ||||||
|  |                     if swapped_frame_result.dtype != np.uint8: | ||||||
|  |                         swapped_frame_result = np.clip(swapped_frame_result, 0, 255).astype(np.uint8) | ||||||
|  |                     if temp_frame.dtype != np.uint8: | ||||||
|  |                         temp_frame_uint8 = np.clip(temp_frame, 0, 255).astype(np.uint8) | ||||||
|  |                     else: | ||||||
|  |                         temp_frame_uint8 = temp_frame | ||||||
|  | 
 | ||||||
|  |                     swapped_frame_result = cv2.seamlessClone(swapped_frame_result, temp_frame_uint8, face_mask_for_blending, center, cv2.NORMAL_CLONE) | ||||||
|  |                 except cv2.error as e: | ||||||
|  |                     logging.error(f"Error during Poisson blending: {e}") | ||||||
|  |                     # Fallback to non-blended result if seamlessClone fails | ||||||
|  |                     pass # swapped_frame_result remains as is | ||||||
|  |             else: | ||||||
|  |                 logging.warning("Poisson blending mask is empty. Skipping Poisson blending.") | ||||||
|  | 
 | ||||||
|  |     return swapped_frame_result | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def process_frame(source_face: Face, temp_frame: Frame) -> Frame: | def process_frame(source_face: Face, temp_frame: Frame) -> Frame: | ||||||
|     if modules.globals.color_correction: |     # The color_correction logic was moved into swap_face. | ||||||
|         temp_frame = cv2.cvtColor(temp_frame, cv2.COLOR_BGR2RGB) |     # The initial temp_frame modification `cv2.cvtColor(temp_frame, cv2.COLOR_BGR2RGB)` | ||||||
|  |     # was incorrect as it changes the color space of the whole frame before processing, | ||||||
|  |     # which is not what we want for color correction of the swapped part. | ||||||
|  |     # Histogram matching is now done BGR to BGR. | ||||||
| 
 | 
 | ||||||
|     if modules.globals.many_faces: |     if modules.globals.many_faces: | ||||||
|         many_faces = get_many_faces(temp_frame) |         many_faces = get_many_faces(temp_frame) | ||||||
|  | @ -620,3 +767,37 @@ def apply_color_transfer(source, target): | ||||||
|     source = (source - source_mean) * (target_std / source_std) + target_mean |     source = (source - source_mean) * (target_std / source_std) + target_mean | ||||||
| 
 | 
 | ||||||
|     return cv2.cvtColor(np.clip(source, 0, 255).astype("uint8"), cv2.COLOR_LAB2BGR) |     return cv2.cvtColor(np.clip(source, 0, 255).astype("uint8"), cv2.COLOR_LAB2BGR) | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def apply_histogram_matching_color_correction(source_img: Frame, target_img: Frame) -> Frame: | ||||||
|  |     """ | ||||||
|  |     Applies color correction to the source image to match the target image's color distribution | ||||||
|  |     using histogram matching on each color channel. | ||||||
|  |     """ | ||||||
|  |     corrected_img = np.zeros_like(source_img) | ||||||
|  |     for i in range(source_img.shape[2]):  # Iterate over color channels (B, G, R) | ||||||
|  |         source_hist, _ = np.histogram(source_img[:, :, i].flatten(), 256, [0, 256]) | ||||||
|  |         target_hist, _ = np.histogram(target_img[:, :, i].flatten(), 256, [0, 256]) | ||||||
|  | 
 | ||||||
|  |         # Compute cumulative distribution functions (CDFs) | ||||||
|  |         source_cdf = source_hist.cumsum() | ||||||
|  |         source_cdf_normalized = source_cdf * source_hist.max() / source_cdf.max() # Normalize | ||||||
|  | 
 | ||||||
|  |         target_cdf = target_hist.cumsum() | ||||||
|  |         target_cdf_normalized = target_cdf * target_hist.max() / target_cdf.max() # Normalize | ||||||
|  | 
 | ||||||
|  |         # Create lookup table | ||||||
|  |         lookup_table = np.zeros(256, 'uint8') | ||||||
|  | 
 | ||||||
|  |         gj = 0 | ||||||
|  |         for gi in range(256): | ||||||
|  |             while gj < 256 and target_cdf_normalized[gj] < source_cdf_normalized[gi]: | ||||||
|  |                 gj += 1 | ||||||
|  |             if gj == 256: # If we reach end of target_cdf, map remaining to max value | ||||||
|  |                 lookup_table[gi] = 255 | ||||||
|  |             else: | ||||||
|  |                 lookup_table[gi] = gj | ||||||
|  | 
 | ||||||
|  |         corrected_img[:, :, i] = cv2.LUT(source_img[:, :, i], lookup_table) | ||||||
|  | 
 | ||||||
|  |     return corrected_img | ||||||
|  |  | ||||||
								
									
									
										
											161
										
									
									modules/ui.py
									
									
									
									
								
								
							
							
										
											161
										
									
									modules/ui.py
									
									
									
									
								|  | @ -880,84 +880,119 @@ def create_webcam_preview(camera_index: int): | ||||||
|     PREVIEW.deiconify() |     PREVIEW.deiconify() | ||||||
| 
 | 
 | ||||||
|     frame_processors = get_frame_processors_modules(modules.globals.frame_processors) |     frame_processors = get_frame_processors_modules(modules.globals.frame_processors) | ||||||
|  |     # Get initial source image if not mapping faces | ||||||
|     source_image = None |     source_image = None | ||||||
|     prev_time = time.time() |     if not modules.globals.map_faces and modules.globals.source_path: | ||||||
|     fps_update_interval = 0.5 |         try: | ||||||
|     frame_count = 0 |             loaded_cv_image = cv2.imread(modules.globals.source_path) | ||||||
|     fps = 0 |             if loaded_cv_image is None: | ||||||
|  |                 update_status(f"Error: Could not read source image at {modules.globals.source_path}") | ||||||
|  |                 # source_image remains None | ||||||
|  |             else: | ||||||
|  |                 source_image = get_one_face(loaded_cv_image) | ||||||
|  |                 if source_image is None: | ||||||
|  |                     update_status(f"Error: No face detected in source image {os.path.basename(modules.globals.source_path)}") | ||||||
|  |         except Exception as e: | ||||||
|  |             update_status(f"Exception loading source image: {str(e)[:100]}") | ||||||
|  |             source_image = None # Ensure source_image is None on any error | ||||||
| 
 | 
 | ||||||
|     while True: |     # If source_image is still None AND a source_path was provided (meaning user intended a swap) | ||||||
|         ret, frame = cap.read() |     # AND we are not using map_faces (which handles its own source logic for sources) | ||||||
|         if not ret: |     if source_image is None and modules.globals.source_path and not modules.globals.map_faces: | ||||||
|             break |         update_status("Warning: Live preview started, but source image is invalid or has no face. No swap will occur.") | ||||||
|  |         # The live preview will start, but no swap will occur if source_image is None. | ||||||
| 
 | 
 | ||||||
|         temp_frame = frame.copy() |     # Start the update loop | ||||||
|  |     fps_data = { # Moved fps_data initialization here to be passed to the loop | ||||||
|  |         "prev_time": time.time(), | ||||||
|  |         "frame_count": 0, | ||||||
|  |         "fps": 0.0, | ||||||
|  |         "fps_update_interval": 0.5 | ||||||
|  |     } | ||||||
|  |     update_webcam_frame_after(cap, frame_processors, source_image, fps_data) | ||||||
| 
 | 
 | ||||||
|         if modules.globals.live_mirror: |  | ||||||
|             temp_frame = cv2.flip(temp_frame, 1) |  | ||||||
| 
 | 
 | ||||||
|         if modules.globals.live_resizable: | def update_webcam_frame_after(cap, frame_processors, source_image, fps_data, delay_ms=15): # Approx 66 FPS target for UI updates | ||||||
|             temp_frame = fit_image_to_size( |     global preview_label, ROOT, PREVIEW | ||||||
|                 temp_frame, PREVIEW.winfo_width(), PREVIEW.winfo_height() |  | ||||||
|             ) |  | ||||||
| 
 | 
 | ||||||
|         else: |     if PREVIEW.state() == "withdrawn": | ||||||
|             temp_frame = fit_image_to_size( |         cap.release() | ||||||
|                 temp_frame, PREVIEW.winfo_width(), PREVIEW.winfo_height() |         PREVIEW.withdraw() # Ensure it's withdrawn if loop exits | ||||||
|             ) |         return | ||||||
| 
 | 
 | ||||||
|         if not modules.globals.map_faces: |     ret, frame = cap.read() | ||||||
|             if source_image is None and modules.globals.source_path: |     if not ret: | ||||||
|                 source_image = get_one_face(cv2.imread(modules.globals.source_path)) |         # Handle camera read failure or end of stream (though for webcam, it's usually continuous) | ||||||
|  |         ROOT.after(delay_ms, lambda: update_webcam_frame_after(cap, frame_processors, source_image, fps_data, delay_ms)) | ||||||
|  |         return | ||||||
| 
 | 
 | ||||||
|             for frame_processor in frame_processors: |     temp_frame = frame.copy() | ||||||
|                 if frame_processor.NAME == "DLC.FACE-ENHANCER": | 
 | ||||||
|                     if modules.globals.fp_ui["face_enhancer"]: |     if modules.globals.live_mirror: | ||||||
|                         temp_frame = frame_processor.process_frame(None, temp_frame) |         temp_frame = cv2.flip(temp_frame, 1) | ||||||
|                 else: | 
 | ||||||
|                     temp_frame = frame_processor.process_frame(source_image, temp_frame) |     # Resizing based on PREVIEW window dimensions. | ||||||
|         else: |     preview_width = PREVIEW.winfo_width() | ||||||
|             modules.globals.target_path = None |     preview_height = PREVIEW.winfo_height() | ||||||
|             for frame_processor in frame_processors: |     if preview_width > 1 and preview_height > 1: # Ensure valid dimensions | ||||||
|                 if frame_processor.NAME == "DLC.FACE-ENHANCER": |          temp_frame = fit_image_to_size(temp_frame, preview_width, preview_height) | ||||||
|                     if modules.globals.fp_ui["face_enhancer"]: | 
 | ||||||
|                         temp_frame = frame_processor.process_frame_v2(temp_frame) | 
 | ||||||
|                 else: |     if not modules.globals.map_faces: | ||||||
|  |         # current_source_image is the source_image passed in from create_webcam_preview | ||||||
|  |         # It's determined once before the loop starts. No reloading here. | ||||||
|  |         current_source_image = source_image | ||||||
|  | 
 | ||||||
|  |         for frame_processor in frame_processors: | ||||||
|  |             if frame_processor.NAME == "DLC.FACE-ENHANCER": | ||||||
|  |                 if modules.globals.fp_ui["face_enhancer"]: | ||||||
|  |                     temp_frame = frame_processor.process_frame(None, temp_frame) | ||||||
|  |             else: # This is the face_swapper processor or other default | ||||||
|  |                 if current_source_image: # Only process if source_image (from create_webcam_preview) is valid | ||||||
|  |                     temp_frame = frame_processor.process_frame(current_source_image, temp_frame) | ||||||
|  |                 # If current_source_image is None, the frame is not processed by face_swapper, effectively no swap. | ||||||
|  |     else: | ||||||
|  |         modules.globals.target_path = None | ||||||
|  |         for frame_processor in frame_processors: | ||||||
|  |             if frame_processor.NAME == "DLC.FACE-ENHANCER": | ||||||
|  |                 if modules.globals.fp_ui["face_enhancer"]: | ||||||
|                     temp_frame = frame_processor.process_frame_v2(temp_frame) |                     temp_frame = frame_processor.process_frame_v2(temp_frame) | ||||||
|  |             else: | ||||||
|  |                 temp_frame = frame_processor.process_frame_v2(temp_frame) | ||||||
| 
 | 
 | ||||||
|         # Calculate and display FPS |     current_time = time.time() | ||||||
|         current_time = time.time() |     fps_data["frame_count"] += 1 | ||||||
|         frame_count += 1 |     time_diff = current_time - fps_data["prev_time"] | ||||||
|         if current_time - prev_time >= fps_update_interval: |  | ||||||
|             fps = frame_count / (current_time - prev_time) |  | ||||||
|             frame_count = 0 |  | ||||||
|             prev_time = current_time |  | ||||||
| 
 | 
 | ||||||
|         if modules.globals.show_fps: |     if time_diff >= fps_data.get("fps_update_interval", 0.5): | ||||||
|             cv2.putText( |         fps_data["fps"] = fps_data["frame_count"] / time_diff | ||||||
|                 temp_frame, |         fps_data["frame_count"] = 0 | ||||||
|                 f"FPS: {fps:.1f}", |         fps_data["prev_time"] = current_time | ||||||
|                 (10, 30), |  | ||||||
|                 cv2.FONT_HERSHEY_SIMPLEX, |  | ||||||
|                 1, |  | ||||||
|                 (0, 255, 0), |  | ||||||
|                 2, |  | ||||||
|             ) |  | ||||||
| 
 | 
 | ||||||
|         image = cv2.cvtColor(temp_frame, cv2.COLOR_BGR2RGB) |     if modules.globals.show_fps: | ||||||
|         image = Image.fromarray(image) |         cv2.putText( | ||||||
|         image = ImageOps.contain( |             temp_frame, | ||||||
|             image, (temp_frame.shape[1], temp_frame.shape[0]), Image.LANCZOS |             f"FPS: {fps_data['fps']:.1f}", | ||||||
|  |             (10, 30), | ||||||
|  |             cv2.FONT_HERSHEY_SIMPLEX, | ||||||
|  |             1, | ||||||
|  |             (0, 255, 0), | ||||||
|  |             2, | ||||||
|         ) |         ) | ||||||
|         image = ctk.CTkImage(image, size=image.size) |  | ||||||
|         preview_label.configure(image=image) |  | ||||||
|         ROOT.update() |  | ||||||
| 
 | 
 | ||||||
|         if PREVIEW.state() == "withdrawn": |     if temp_frame is not None and temp_frame.size > 0: | ||||||
|             break |         image = cv2.cvtColor(temp_frame, cv2.COLOR_BGR2RGB) | ||||||
|  |         pil_image = Image.fromarray(image) | ||||||
| 
 | 
 | ||||||
|     cap.release() |         contained_image = ImageOps.contain( | ||||||
|     PREVIEW.withdraw() |             pil_image, (temp_frame.shape[1], temp_frame.shape[0]), Image.LANCZOS | ||||||
|  |         ) | ||||||
|  |         ctk_image = ctk.CTkImage(contained_image, size=contained_image.size) | ||||||
|  |         preview_label.configure(image=ctk_image) | ||||||
|  |     else: | ||||||
|  |         pass | ||||||
|  | 
 | ||||||
|  |     ROOT.after(delay_ms, lambda: update_webcam_frame_after(cap, frame_processors, source_image, fps_data, delay_ms)) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def create_source_target_popup_for_webcam( | def create_source_target_popup_for_webcam( | ||||||
|  |  | ||||||
|  | @ -12,83 +12,142 @@ if platform.system() == "Windows": | ||||||
| class VideoCapturer: | class VideoCapturer: | ||||||
|     def __init__(self, device_index: int): |     def __init__(self, device_index: int): | ||||||
|         self.device_index = device_index |         self.device_index = device_index | ||||||
|         self.frame_callback = None |         self._latest_frame: Optional[np.ndarray] = None | ||||||
|         self._current_frame = None |         self._frame_lock = threading.Lock() | ||||||
|         self._frame_ready = threading.Event() |  | ||||||
|         self.is_running = False |         self.is_running = False | ||||||
|         self.cap = None |         self.cap: Optional[cv2.VideoCapture] = None | ||||||
|  |         self._capture_thread: Optional[threading.Thread] = None | ||||||
| 
 | 
 | ||||||
|         # Initialize Windows-specific components if on Windows |         # Initialize Windows-specific components if on Windows | ||||||
|         if platform.system() == "Windows": |         if platform.system() == "Windows": | ||||||
|             self.graph = FilterGraph() |             try: | ||||||
|             # Verify device exists |                 self.graph = FilterGraph() | ||||||
|             devices = self.graph.get_input_devices() |                 # Verify device exists | ||||||
|             if self.device_index >= len(devices): |                 devices = self.graph.get_input_devices() | ||||||
|                 raise ValueError( |                 if self.device_index >= len(devices): | ||||||
|                     f"Invalid device index {device_index}. Available devices: {len(devices)}" |                     # Fallback or logging, rather than immediate raise for flexibility | ||||||
|                 ) |                     print(f"Warning: Device index {device_index} might be out of range. Available: {len(devices)}. Will attempt to open anyway.") | ||||||
|  |             except Exception as e: | ||||||
|  |                 print(f"Warning: Could not initialize FilterGraph for device enumeration: {e}") | ||||||
|  |                 self.graph = None | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  |     def _capture_loop(self) -> None: | ||||||
|  |         while self.is_running and self.cap is not None: | ||||||
|  |             try: | ||||||
|  |                 ret, frame = self.cap.read() | ||||||
|  |                 if ret: | ||||||
|  |                     with self._frame_lock: | ||||||
|  |                         self._latest_frame = frame | ||||||
|  |                 else: | ||||||
|  |                     # Handle camera read failure, e.g., camera disconnected | ||||||
|  |                     print("Warning: Failed to read frame from camera in capture loop.") | ||||||
|  |                     # Small sleep to prevent tight loop on continuous read errors | ||||||
|  |                     threading.Event().wait(0.1) | ||||||
|  |             except Exception as e: | ||||||
|  |                 print(f"Error in capture loop: {e}") | ||||||
|  |                 self.is_running = False # Stop loop on critical error | ||||||
|  |                 break | ||||||
|  |             # Small sleep to yield execution and not busy-wait if camera FPS is low | ||||||
|  |             # Adjust sleep time as needed; too high adds latency, too low uses more CPU. | ||||||
|  |             threading.Event().wait(0.001) # 1 ms sleep | ||||||
| 
 | 
 | ||||||
|     def start(self, width: int = 960, height: int = 540, fps: int = 60) -> bool: |     def start(self, width: int = 960, height: int = 540, fps: int = 60) -> bool: | ||||||
|         """Initialize and start video capture""" |         """Initialize and start video capture in a separate thread.""" | ||||||
|  |         if self.is_running: | ||||||
|  |             print("Capture already running.") | ||||||
|  |             return True | ||||||
|         try: |         try: | ||||||
|             if platform.system() == "Windows": |             if platform.system() == "Windows": | ||||||
|                 # Windows-specific capture methods |  | ||||||
|                 capture_methods = [ |                 capture_methods = [ | ||||||
|                     (self.device_index, cv2.CAP_DSHOW),  # Try DirectShow first |                     (self.device_index, cv2.CAP_DSHOW), | ||||||
|                     (self.device_index, cv2.CAP_ANY),  # Then try default backend |                     (self.device_index, cv2.CAP_MSMF), | ||||||
|                     (-1, cv2.CAP_ANY),  # Try -1 as fallback |                     (self.device_index, cv2.CAP_ANY), | ||||||
|                     (0, cv2.CAP_ANY),  # Finally try 0 without specific backend |                     (-1, cv2.CAP_ANY), | ||||||
|  |                     (0, cv2.CAP_ANY) | ||||||
|                 ] |                 ] | ||||||
| 
 |  | ||||||
|                 for dev_id, backend in capture_methods: |                 for dev_id, backend in capture_methods: | ||||||
|                     try: |                     try: | ||||||
|                         self.cap = cv2.VideoCapture(dev_id, backend) |                         self.cap = cv2.VideoCapture(dev_id, backend) | ||||||
|                         if self.cap.isOpened(): |                         if self.cap and self.cap.isOpened(): | ||||||
|  |                             print(f"Successfully opened camera {dev_id} with backend {backend}") | ||||||
|                             break |                             break | ||||||
|                         self.cap.release() |                         if self.cap: | ||||||
|  |                             self.cap.release() | ||||||
|  |                             self.cap = None | ||||||
|                     except Exception: |                     except Exception: | ||||||
|                         continue |                         continue | ||||||
|             else: |             else: # Unix-like | ||||||
|                 # Unix-like systems (Linux/Mac) capture method |  | ||||||
|                 self.cap = cv2.VideoCapture(self.device_index) |                 self.cap = cv2.VideoCapture(self.device_index) | ||||||
| 
 | 
 | ||||||
|             if not self.cap or not self.cap.isOpened(): |             if not self.cap or not self.cap.isOpened(): | ||||||
|                 raise RuntimeError("Failed to open camera") |                 raise RuntimeError(f"Failed to open camera with device index {self.device_index} using available methods.") | ||||||
| 
 | 
 | ||||||
|             # Configure format |             # Configure format | ||||||
|  |             # Note: Setting properties might not always work or might reset after opening. | ||||||
|  |             # It's often better to request a format the camera natively supports if known. | ||||||
|             self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, width) |             self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, width) | ||||||
|             self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height) |             self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height) | ||||||
|             self.cap.set(cv2.CAP_PROP_FPS, fps) |             self.cap.set(cv2.CAP_PROP_FPS, fps) | ||||||
| 
 | 
 | ||||||
|  |             # Verify settings if possible (actual values might differ) | ||||||
|  |             actual_width = self.cap.get(cv2.CAP_PROP_FRAME_WIDTH) | ||||||
|  |             actual_height = self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT) | ||||||
|  |             actual_fps = self.cap.get(cv2.CAP_PROP_FPS) | ||||||
|  |             print(f"Requested: {width}x{height}@{fps}fps. Actual: {actual_width}x{actual_height}@{actual_fps}fps") | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|             self.is_running = True |             self.is_running = True | ||||||
|  |             self._capture_thread = threading.Thread(target=self._capture_loop, daemon=True) | ||||||
|  |             self._capture_thread.start() | ||||||
|  | 
 | ||||||
|  |             # Wait briefly for the first frame to be captured, makes initial read() more likely to succeed. | ||||||
|  |             # This is optional and can be adjusted or removed. | ||||||
|  |             threading.Event().wait(0.5) # Wait up to 0.5 seconds | ||||||
|  | 
 | ||||||
|             return True |             return True | ||||||
| 
 | 
 | ||||||
|         except Exception as e: |         except Exception as e: | ||||||
|             print(f"Failed to start capture: {str(e)}") |             print(f"Failed to start capture: {str(e)}") | ||||||
|             if self.cap: |             if self.cap: | ||||||
|                 self.cap.release() |                 self.cap.release() | ||||||
|  |                 self.cap = None | ||||||
|  |             self.is_running = False | ||||||
|             return False |             return False | ||||||
| 
 | 
 | ||||||
|     def read(self) -> Tuple[bool, Optional[np.ndarray]]: |     def read(self) -> Tuple[bool, Optional[np.ndarray]]: | ||||||
|         """Read a frame from the camera""" |         """Read the latest frame from the camera (non-blocking).""" | ||||||
|         if not self.is_running or self.cap is None: |         if not self.is_running: | ||||||
|             return False, None |             return False, None | ||||||
| 
 | 
 | ||||||
|         ret, frame = self.cap.read() |         frame_copy = None | ||||||
|         if ret: |         with self._frame_lock: | ||||||
|             self._current_frame = frame |             if self._latest_frame is not None: | ||||||
|             if self.frame_callback: |                 frame_copy = self._latest_frame.copy() | ||||||
|                 self.frame_callback(frame) | 
 | ||||||
|             return True, frame |         if frame_copy is not None: | ||||||
|         return False, None |             return True, frame_copy | ||||||
|  |         else: | ||||||
|  |             # No frame available yet, or thread stopped | ||||||
|  |             return False, None | ||||||
| 
 | 
 | ||||||
|     def release(self) -> None: |     def release(self) -> None: | ||||||
|         """Stop capture and release resources""" |         """Stop capture thread and release resources.""" | ||||||
|         if self.is_running and self.cap is not None: |         if self.is_running: | ||||||
|  |             self.is_running = False # Signal the thread to stop | ||||||
|  |             if self._capture_thread is not None: | ||||||
|  |                 self._capture_thread.join(timeout=1.0) # Wait for thread to finish | ||||||
|  |                 if self._capture_thread.is_alive(): | ||||||
|  |                     print("Warning: Capture thread did not terminate cleanly.") | ||||||
|  |             self._capture_thread = None | ||||||
|  | 
 | ||||||
|  |         if self.cap is not None: | ||||||
|             self.cap.release() |             self.cap.release() | ||||||
|             self.is_running = False |  | ||||||
|             self.cap = None |             self.cap = None | ||||||
| 
 | 
 | ||||||
|     def set_frame_callback(self, callback: Callable[[np.ndarray], None]) -> None: |         with self._frame_lock: # Clear last frame | ||||||
|         """Set callback for frame processing""" |             self._latest_frame = None | ||||||
|         self.frame_callback = callback |         print("Video capture released.") | ||||||
|  | 
 | ||||||
|  |     # frame_callback is removed as direct polling via read() is now non-blocking and preferred with threaded capture. | ||||||
|  |     # If a callback mechanism is still desired, it would need to be integrated carefully with the thread. | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue