Merge d4905c6bd9 into d0d90ecc03
				
					
				
			
						commit
						608056eec6
					
				|  | @ -39,9 +39,12 @@ def parse_args() -> None: | |||
|     program.add_argument('--keep-audio', help='keep original audio', dest='keep_audio', action='store_true', default=True) | ||||
|     program.add_argument('--keep-frames', help='keep temporary frames', dest='keep_frames', action='store_true', default=False) | ||||
|     program.add_argument('--many-faces', help='process every face', dest='many_faces', action='store_true', default=False) | ||||
|     program.add_argument('--color-correction', help='apply color correction to the swapped face', dest='color_correction', action='store_true', default=False) # Added this line back | ||||
|     program.add_argument('--nsfw-filter', help='filter the NSFW image or video', dest='nsfw_filter', action='store_true', default=False) | ||||
|     program.add_argument('--map-faces', help='map source target faces', dest='map_faces', action='store_true', default=False) | ||||
|     program.add_argument('--mouth-mask', help='mask the mouth region', dest='mouth_mask', action='store_true', default=False) | ||||
|     program.add_argument('--poisson-blending', help='use Poisson blending for smoother face integration', dest='poisson_blending', action='store_true', default=False) | ||||
|     program.add_argument('--preserve-ears', help='attempt to preserve target ears by modifying the blend mask', dest='preserve_ears', action='store_true', default=False) | ||||
|     program.add_argument('--video-encoder', help='adjust output video encoder', dest='video_encoder', default='libx264', choices=['libx264', 'libx265', 'libvpx-vp9']) | ||||
|     program.add_argument('--video-quality', help='adjust output video quality', dest='video_quality', type=int, default=18, choices=range(52), metavar='[0-51]') | ||||
|     program.add_argument('-l', '--lang', help='Ui language', default="en") | ||||
|  | @ -69,7 +72,10 @@ def parse_args() -> None: | |||
|     modules.globals.keep_audio = args.keep_audio | ||||
|     modules.globals.keep_frames = args.keep_frames | ||||
|     modules.globals.many_faces = args.many_faces | ||||
|     modules.globals.color_correction = args.color_correction | ||||
|     modules.globals.mouth_mask = args.mouth_mask | ||||
|     modules.globals.use_poisson_blending = args.poisson_blending | ||||
|     modules.globals.preserve_target_ears = args.preserve_ears | ||||
|     modules.globals.nsfw_filter = args.nsfw_filter | ||||
|     modules.globals.map_faces = args.map_faces | ||||
|     modules.globals.video_encoder = args.video_encoder | ||||
|  |  | |||
|  | @ -41,3 +41,10 @@ show_mouth_mask_box = False | |||
| mask_feather_ratio = 8 | ||||
| mask_down_size = 0.50 | ||||
| mask_size = 1 | ||||
| use_poisson_blending = False # Added for Poisson blending | ||||
| poisson_blending_feather_amount = 5 # Feathering for the mask before Poisson blending | ||||
| preserve_target_ears = False # Flag to enable preserving target's ears | ||||
| ear_width_ratio = 0.18 # Width of the ear exclusion box as a ratio of face bbox width | ||||
| ear_height_ratio = 0.35 # Height of the ear exclusion box as a ratio of face bbox height | ||||
| ear_vertical_offset_ratio = 0.20 # Vertical offset of the ear box from top of face bbox | ||||
| ear_horizontal_overlap_ratio = 0.03 # How much the ear exclusion zone can overlap into the face bbox | ||||
|  |  | |||
|  | @ -76,10 +76,43 @@ def swap_face(source_face: Face, target_face: Face, temp_frame: Frame) -> Frame: | |||
|     face_swapper = get_face_swapper() | ||||
| 
 | ||||
|     # Apply the face swap | ||||
|     swapped_frame = face_swapper.get( | ||||
|     swapped_frame_result = face_swapper.get( # Renamed to avoid confusion | ||||
|         temp_frame, target_face, source_face, paste_back=True | ||||
|     ) | ||||
| 
 | ||||
|     # Ensure swapped_frame_result is not None and is a valid image | ||||
|     if swapped_frame_result is None or not isinstance(swapped_frame_result, np.ndarray): | ||||
|         logging.error("Face swap operation failed or returned invalid result.") | ||||
|         return temp_frame # Return original frame if swap failed | ||||
| 
 | ||||
|     # Color Correction | ||||
|     if modules.globals.color_correction: | ||||
|         # Get the bounding box of the target face to apply color correction | ||||
|         # more accurately to the swapped region. | ||||
|         # The target_face object should have bbox attribute (x1, y1, x2, y2) | ||||
|         if hasattr(target_face, 'bbox'): | ||||
|             x1, y1, x2, y2 = target_face.bbox.astype(int) | ||||
|             # Ensure coordinates are within frame bounds | ||||
|             x1, y1 = max(0, x1), max(0, y1) | ||||
|             x2, y2 = min(swapped_frame_result.shape[1], x2), min(swapped_frame_result.shape[0], y2) | ||||
| 
 | ||||
|             if x1 < x2 and y1 < y2: | ||||
|                 swapped_face_region = swapped_frame_result[y1:y2, x1:x2] | ||||
|                 target_face_region_original = temp_frame[y1:y2, x1:x2] | ||||
| 
 | ||||
|                 if swapped_face_region.size > 0 and target_face_region_original.size > 0: | ||||
|                     corrected_swapped_face_region = apply_histogram_matching_color_correction(swapped_face_region, target_face_region_original) | ||||
|                     swapped_frame_result[y1:y2, x1:x2] = corrected_swapped_face_region | ||||
|                 else: | ||||
|                     # Fallback to full frame color correction if regions are invalid | ||||
|                     swapped_frame_result = apply_histogram_matching_color_correction(swapped_frame_result, temp_frame) | ||||
|             else: | ||||
|                 # Fallback to full frame color correction if bbox is invalid | ||||
|                 swapped_frame_result = apply_histogram_matching_color_correction(swapped_frame_result, temp_frame) | ||||
|         else: | ||||
|             # Fallback to full frame color correction if no bbox | ||||
|             swapped_frame_result = apply_histogram_matching_color_correction(swapped_frame_result, temp_frame) | ||||
| 
 | ||||
|     if modules.globals.mouth_mask: | ||||
|         # Create a mask for the target face | ||||
|         face_mask = create_face_mask(target_face, temp_frame) | ||||
|  | @ -90,22 +123,136 @@ def swap_face(source_face: Face, target_face: Face, temp_frame: Frame) -> Frame: | |||
|         ) | ||||
| 
 | ||||
|         # Apply the mouth area | ||||
|         swapped_frame = apply_mouth_area( | ||||
|             swapped_frame, mouth_cutout, mouth_box, face_mask, lower_lip_polygon | ||||
|         swapped_frame_result = apply_mouth_area( | ||||
|             swapped_frame_result, mouth_cutout, mouth_box, face_mask, lower_lip_polygon | ||||
|         ) | ||||
| 
 | ||||
|         if modules.globals.show_mouth_mask_box: | ||||
|             mouth_mask_data = (mouth_mask, mouth_cutout, mouth_box, lower_lip_polygon) | ||||
|             swapped_frame = draw_mouth_mask_visualization( | ||||
|                 swapped_frame, target_face, mouth_mask_data | ||||
|             swapped_frame_result = draw_mouth_mask_visualization( | ||||
|                 swapped_frame_result, target_face, mouth_mask_data | ||||
|             ) | ||||
| 
 | ||||
|     return swapped_frame | ||||
|     # Poisson Blending | ||||
|     if modules.globals.use_poisson_blending and hasattr(target_face, 'bbox'): | ||||
|         # Create a mask for the swapped face region for Poisson blending | ||||
|         # This mask should cover the area of the swapped face. | ||||
|         # We can use the target_face.bbox and perhaps expand it slightly, | ||||
|         # or use a more precise mask from face parsing if available. | ||||
|         # For simplicity, using a slightly feathered convex hull of landmarks. | ||||
| 
 | ||||
|         face_mask_for_blending = np.zeros(temp_frame.shape[:2], dtype=np.uint8) | ||||
| 
 | ||||
|         # Prioritize using the bounding box for a tighter mask | ||||
|         if hasattr(target_face, 'bbox'): | ||||
|             x1, y1, x2, y2 = target_face.bbox.astype(int) | ||||
|             # Ensure coordinates are within frame bounds | ||||
|             x1_b, y1_b = max(0, x1), max(0, y1) # Use different var names to avoid conflict with center calculation | ||||
|             x2_b, y2_b = min(temp_frame.shape[1], x2), min(temp_frame.shape[0], y2) | ||||
| 
 | ||||
|             # Create a rectangular mask based on the bounding box | ||||
|             if x1_b < x2_b and y1_b < y2_b: | ||||
|                 face_mask_for_blending[y1_b:y2_b, x1_b:x2_b] = 255 | ||||
|             else: | ||||
|                 logging.warning("Invalid bounding box for Poisson mask. Attempting landmark-based mask.") | ||||
|                 # Fallback to landmark-based convex hull if bbox is invalid | ||||
|                 landmarks = target_face.landmark_2d_106 if hasattr(target_face, 'landmark_2d_106') else None | ||||
|                 if landmarks is not None and len(landmarks) > 0: | ||||
|                     try: | ||||
|                         hull_points = cv2.convexHull(landmarks.astype(np.int32)) | ||||
|                         cv2.fillConvexPoly(face_mask_for_blending, hull_points, 255) | ||||
|                     except Exception as e: | ||||
|                         logging.error(f"Could not form convex hull for Poisson mask from landmarks: {e}. Blending will be skipped.") | ||||
|                 else: | ||||
|                     logging.error("No valid bbox or landmarks for Poisson mask. Blending will be skipped.") | ||||
|         else: | ||||
|             # Fallback to landmark-based convex hull if no bbox attribute | ||||
|             landmarks = target_face.landmark_2d_106 if hasattr(target_face, 'landmark_2d_106') else None | ||||
|             if landmarks is not None and len(landmarks) > 0: | ||||
|                 try: | ||||
|                     hull_points = cv2.convexHull(landmarks.astype(np.int32)) | ||||
|                     cv2.fillConvexPoly(face_mask_for_blending, hull_points, 255) | ||||
|                 except Exception as e: | ||||
|                     logging.error(f"Could not form convex hull for Poisson mask from landmarks (no bbox): {e}. Blending will be skipped.") | ||||
|             else: | ||||
|                 logging.error("No bbox or landmarks available for Poisson mask. Blending will be skipped.") | ||||
| 
 | ||||
|         # Subtract ear regions if preserve_target_ears is enabled | ||||
|         if modules.globals.preserve_target_ears and np.any(face_mask_for_blending > 0): | ||||
|             mfx1, mfy1, mfx2, mfy2 = target_face.bbox.astype(int) | ||||
|             mfw = mfx2 - mfx1 | ||||
|             mfh = mfy2 - mfy1 | ||||
| 
 | ||||
|             ear_w = int(mfw * modules.globals.ear_width_ratio) | ||||
|             ear_h = int(mfh * modules.globals.ear_height_ratio) | ||||
|             ear_v_offset = int(mfh * modules.globals.ear_vertical_offset_ratio) | ||||
|             ear_overlap = int(mfw * modules.globals.ear_horizontal_overlap_ratio) | ||||
| 
 | ||||
|             # Person's Right Ear (image left side of face bbox) | ||||
|             # This region in face_mask_for_blending will be set to 0 | ||||
|             rex1 = max(0, mfx1 - ear_w + ear_overlap) | ||||
|             rey1 = max(0, mfy1 + ear_v_offset) | ||||
|             rex2 = min(temp_frame.shape[1], mfx1 + ear_overlap) # Extends slightly into face bbox for smoother transition | ||||
|             rey2 = min(temp_frame.shape[0], rey1 + ear_h) | ||||
|             if rex1 < rex2 and rey1 < rey2: | ||||
|                 cv2.rectangle(face_mask_for_blending, (rex1, rey1), (rex2, rey2), 0, -1) | ||||
| 
 | ||||
|             # Person's Left Ear (image right side of face bbox) | ||||
|             lex1 = max(0, mfx2 - ear_overlap) | ||||
|             ley1 = max(0, mfy1 + ear_v_offset) | ||||
|             lex2 = min(temp_frame.shape[1], mfx2 + ear_w - ear_overlap) | ||||
|             ley2 = min(temp_frame.shape[0], ley1 + ear_h) | ||||
|             if lex1 < lex2 and ley1 < ley2: | ||||
|                 cv2.rectangle(face_mask_for_blending, (lex1, ley1), (lex2, ley2), 0, -1) | ||||
| 
 | ||||
|         # Feather the mask to smooth edges for Poisson blending | ||||
|         if np.any(face_mask_for_blending > 0): # Only feather if there's a mask | ||||
|             feather_amount = modules.globals.poisson_blending_feather_amount | ||||
|             if feather_amount > 0: | ||||
|                 # Ensure kernel size is odd | ||||
|                 kernel_size = 2 * feather_amount + 1 | ||||
|                 face_mask_for_blending = cv2.GaussianBlur(face_mask_for_blending, (kernel_size, kernel_size), 0) | ||||
| 
 | ||||
|         # Calculate the center of the target face bbox for seamlessClone | ||||
|         if hasattr(target_face, 'bbox'): | ||||
|             x1, y1, x2, y2 = target_face.bbox.astype(int) | ||||
|             center_x = (x1 + x2) // 2 | ||||
|             center_y = (y1 + y2) // 2 | ||||
| 
 | ||||
|             # Ensure center is within frame dimensions | ||||
|             center_x = np.clip(center_x, 0, temp_frame.shape[1] -1) | ||||
|             center_y = np.clip(center_y, 0, temp_frame.shape[0] -1) | ||||
|             center = (center_x, center_y) | ||||
| 
 | ||||
|             # Apply Poisson blending | ||||
|             # swapped_frame_result is the source, temp_frame is the destination | ||||
|             if np.any(face_mask_for_blending > 0): # Proceed only if mask is not empty | ||||
|                 try: | ||||
|                     # Ensure swapped_frame_result and temp_frame are 8-bit 3-channel images | ||||
|                     if swapped_frame_result.dtype != np.uint8: | ||||
|                         swapped_frame_result = np.clip(swapped_frame_result, 0, 255).astype(np.uint8) | ||||
|                     if temp_frame.dtype != np.uint8: | ||||
|                         temp_frame_uint8 = np.clip(temp_frame, 0, 255).astype(np.uint8) | ||||
|                     else: | ||||
|                         temp_frame_uint8 = temp_frame | ||||
| 
 | ||||
|                     swapped_frame_result = cv2.seamlessClone(swapped_frame_result, temp_frame_uint8, face_mask_for_blending, center, cv2.NORMAL_CLONE) | ||||
|                 except cv2.error as e: | ||||
|                     logging.error(f"Error during Poisson blending: {e}") | ||||
|                     # Fallback to non-blended result if seamlessClone fails | ||||
|                     pass # swapped_frame_result remains as is | ||||
|             else: | ||||
|                 logging.warning("Poisson blending mask is empty. Skipping Poisson blending.") | ||||
| 
 | ||||
|     return swapped_frame_result | ||||
| 
 | ||||
| 
 | ||||
| def process_frame(source_face: Face, temp_frame: Frame) -> Frame: | ||||
|     if modules.globals.color_correction: | ||||
|         temp_frame = cv2.cvtColor(temp_frame, cv2.COLOR_BGR2RGB) | ||||
|     # The color_correction logic was moved into swap_face. | ||||
|     # The initial temp_frame modification `cv2.cvtColor(temp_frame, cv2.COLOR_BGR2RGB)` | ||||
|     # was incorrect as it changes the color space of the whole frame before processing, | ||||
|     # which is not what we want for color correction of the swapped part. | ||||
|     # Histogram matching is now done BGR to BGR. | ||||
| 
 | ||||
|     if modules.globals.many_faces: | ||||
|         many_faces = get_many_faces(temp_frame) | ||||
|  | @ -625,3 +772,37 @@ def apply_color_transfer(source, target): | |||
|     source = (source - source_mean) * (target_std / source_std) + target_mean | ||||
| 
 | ||||
|     return cv2.cvtColor(np.clip(source, 0, 255).astype("uint8"), cv2.COLOR_LAB2BGR) | ||||
| 
 | ||||
| 
 | ||||
| def apply_histogram_matching_color_correction(source_img: Frame, target_img: Frame) -> Frame: | ||||
|     """ | ||||
|     Applies color correction to the source image to match the target image's color distribution | ||||
|     using histogram matching on each color channel. | ||||
|     """ | ||||
|     corrected_img = np.zeros_like(source_img) | ||||
|     for i in range(source_img.shape[2]):  # Iterate over color channels (B, G, R) | ||||
|         source_hist, _ = np.histogram(source_img[:, :, i].flatten(), 256, [0, 256]) | ||||
|         target_hist, _ = np.histogram(target_img[:, :, i].flatten(), 256, [0, 256]) | ||||
| 
 | ||||
|         # Compute cumulative distribution functions (CDFs) | ||||
|         source_cdf = source_hist.cumsum() | ||||
|         source_cdf_normalized = source_cdf * source_hist.max() / source_cdf.max() # Normalize | ||||
| 
 | ||||
|         target_cdf = target_hist.cumsum() | ||||
|         target_cdf_normalized = target_cdf * target_hist.max() / target_cdf.max() # Normalize | ||||
| 
 | ||||
|         # Create lookup table | ||||
|         lookup_table = np.zeros(256, 'uint8') | ||||
| 
 | ||||
|         gj = 0 | ||||
|         for gi in range(256): | ||||
|             while gj < 256 and target_cdf_normalized[gj] < source_cdf_normalized[gi]: | ||||
|                 gj += 1 | ||||
|             if gj == 256: # If we reach end of target_cdf, map remaining to max value | ||||
|                 lookup_table[gi] = 255 | ||||
|             else: | ||||
|                 lookup_table[gi] = gj | ||||
| 
 | ||||
|         corrected_img[:, :, i] = cv2.LUT(source_img[:, :, i], lookup_table) | ||||
| 
 | ||||
|     return corrected_img | ||||
|  |  | |||
								
									
									
										
											107
										
									
									modules/ui.py
									
									
									
									
								
								
							
							
										
											107
										
									
									modules/ui.py
									
									
									
									
								|  | @ -880,42 +880,77 @@ def create_webcam_preview(camera_index: int): | |||
|     PREVIEW.deiconify() | ||||
| 
 | ||||
|     frame_processors = get_frame_processors_modules(modules.globals.frame_processors) | ||||
|     # Get initial source image if not mapping faces | ||||
|     source_image = None | ||||
|     prev_time = time.time() | ||||
|     fps_update_interval = 0.5 | ||||
|     frame_count = 0 | ||||
|     fps = 0 | ||||
|     if not modules.globals.map_faces and modules.globals.source_path: | ||||
|         try: | ||||
|             loaded_cv_image = cv2.imread(modules.globals.source_path) | ||||
|             if loaded_cv_image is None: | ||||
|                 update_status(f"Error: Could not read source image at {modules.globals.source_path}") | ||||
|                 # source_image remains None | ||||
|             else: | ||||
|                 source_image = get_one_face(loaded_cv_image) | ||||
|                 if source_image is None: | ||||
|                     update_status(f"Error: No face detected in source image {os.path.basename(modules.globals.source_path)}") | ||||
|         except Exception as e: | ||||
|             update_status(f"Exception loading source image: {str(e)[:100]}") | ||||
|             source_image = None # Ensure source_image is None on any error | ||||
| 
 | ||||
|     # If source_image is still None AND a source_path was provided (meaning user intended a swap) | ||||
|     # AND we are not using map_faces (which handles its own source logic for sources) | ||||
|     if source_image is None and modules.globals.source_path and not modules.globals.map_faces: | ||||
|         update_status("Warning: Live preview started, but source image is invalid or has no face. No swap will occur.") | ||||
|         # The live preview will start, but no swap will occur if source_image is None. | ||||
| 
 | ||||
|     # Start the update loop | ||||
|     fps_data = { # Moved fps_data initialization here to be passed to the loop | ||||
|         "prev_time": time.time(), | ||||
|         "frame_count": 0, | ||||
|         "fps": 0.0, | ||||
|         "fps_update_interval": 0.5 | ||||
|     } | ||||
|     update_webcam_frame_after(cap, frame_processors, source_image, fps_data) | ||||
| 
 | ||||
| 
 | ||||
| def update_webcam_frame_after(cap, frame_processors, source_image, fps_data, delay_ms=15): # Approx 66 FPS target for UI updates | ||||
|     global preview_label, ROOT, PREVIEW | ||||
| 
 | ||||
|     if PREVIEW.state() == "withdrawn": | ||||
|         cap.release() | ||||
|         PREVIEW.withdraw() # Ensure it's withdrawn if loop exits | ||||
|         return | ||||
| 
 | ||||
|     while True: | ||||
|     ret, frame = cap.read() | ||||
|     if not ret: | ||||
|             break | ||||
|         # Handle camera read failure or end of stream (though for webcam, it's usually continuous) | ||||
|         ROOT.after(delay_ms, lambda: update_webcam_frame_after(cap, frame_processors, source_image, fps_data, delay_ms)) | ||||
|         return | ||||
| 
 | ||||
|     temp_frame = frame.copy() | ||||
| 
 | ||||
|     if modules.globals.live_mirror: | ||||
|         temp_frame = cv2.flip(temp_frame, 1) | ||||
| 
 | ||||
|         if modules.globals.live_resizable: | ||||
|             temp_frame = fit_image_to_size( | ||||
|                 temp_frame, PREVIEW.winfo_width(), PREVIEW.winfo_height() | ||||
|             ) | ||||
|     # Resizing based on PREVIEW window dimensions. | ||||
|     preview_width = PREVIEW.winfo_width() | ||||
|     preview_height = PREVIEW.winfo_height() | ||||
|     if preview_width > 1 and preview_height > 1: # Ensure valid dimensions | ||||
|          temp_frame = fit_image_to_size(temp_frame, preview_width, preview_height) | ||||
| 
 | ||||
|         else: | ||||
|             temp_frame = fit_image_to_size( | ||||
|                 temp_frame, PREVIEW.winfo_width(), PREVIEW.winfo_height() | ||||
|             ) | ||||
| 
 | ||||
|     if not modules.globals.map_faces: | ||||
|             if source_image is None and modules.globals.source_path: | ||||
|                 source_image = get_one_face(cv2.imread(modules.globals.source_path)) | ||||
|         # current_source_image is the source_image passed in from create_webcam_preview | ||||
|         # It's determined once before the loop starts. No reloading here. | ||||
|         current_source_image = source_image | ||||
| 
 | ||||
|         for frame_processor in frame_processors: | ||||
|             if frame_processor.NAME == "DLC.FACE-ENHANCER": | ||||
|                 if modules.globals.fp_ui["face_enhancer"]: | ||||
|                     temp_frame = frame_processor.process_frame(None, temp_frame) | ||||
|                 else: | ||||
|                     temp_frame = frame_processor.process_frame(source_image, temp_frame) | ||||
|             else: # This is the face_swapper processor or other default | ||||
|                 if current_source_image: # Only process if source_image (from create_webcam_preview) is valid | ||||
|                     temp_frame = frame_processor.process_frame(current_source_image, temp_frame) | ||||
|                 # If current_source_image is None, the frame is not processed by face_swapper, effectively no swap. | ||||
|     else: | ||||
|         modules.globals.target_path = None | ||||
|         for frame_processor in frame_processors: | ||||
|  | @ -925,18 +960,19 @@ def create_webcam_preview(camera_index: int): | |||
|             else: | ||||
|                 temp_frame = frame_processor.process_frame_v2(temp_frame) | ||||
| 
 | ||||
|         # Calculate and display FPS | ||||
|     current_time = time.time() | ||||
|         frame_count += 1 | ||||
|         if current_time - prev_time >= fps_update_interval: | ||||
|             fps = frame_count / (current_time - prev_time) | ||||
|             frame_count = 0 | ||||
|             prev_time = current_time | ||||
|     fps_data["frame_count"] += 1 | ||||
|     time_diff = current_time - fps_data["prev_time"] | ||||
| 
 | ||||
|     if time_diff >= fps_data.get("fps_update_interval", 0.5): | ||||
|         fps_data["fps"] = fps_data["frame_count"] / time_diff | ||||
|         fps_data["frame_count"] = 0 | ||||
|         fps_data["prev_time"] = current_time | ||||
| 
 | ||||
|     if modules.globals.show_fps: | ||||
|         cv2.putText( | ||||
|             temp_frame, | ||||
|                 f"FPS: {fps:.1f}", | ||||
|             f"FPS: {fps_data['fps']:.1f}", | ||||
|             (10, 30), | ||||
|             cv2.FONT_HERSHEY_SIMPLEX, | ||||
|             1, | ||||
|  | @ -944,20 +980,19 @@ def create_webcam_preview(camera_index: int): | |||
|             2, | ||||
|         ) | ||||
| 
 | ||||
|     if temp_frame is not None and temp_frame.size > 0: | ||||
|         image = cv2.cvtColor(temp_frame, cv2.COLOR_BGR2RGB) | ||||
|         image = Image.fromarray(image) | ||||
|         image = ImageOps.contain( | ||||
|             image, (temp_frame.shape[1], temp_frame.shape[0]), Image.LANCZOS | ||||
|         pil_image = Image.fromarray(image) | ||||
| 
 | ||||
|         contained_image = ImageOps.contain( | ||||
|             pil_image, (temp_frame.shape[1], temp_frame.shape[0]), Image.LANCZOS | ||||
|         ) | ||||
|         image = ctk.CTkImage(image, size=image.size) | ||||
|         preview_label.configure(image=image) | ||||
|         ROOT.update() | ||||
|         ctk_image = ctk.CTkImage(contained_image, size=contained_image.size) | ||||
|         preview_label.configure(image=ctk_image) | ||||
|     else: | ||||
|         pass | ||||
| 
 | ||||
|         if PREVIEW.state() == "withdrawn": | ||||
|             break | ||||
| 
 | ||||
|     cap.release() | ||||
|     PREVIEW.withdraw() | ||||
|     ROOT.after(delay_ms, lambda: update_webcam_frame_after(cap, frame_processors, source_image, fps_data, delay_ms)) | ||||
| 
 | ||||
| 
 | ||||
| def create_source_target_popup_for_webcam( | ||||
|  |  | |||
|  | @ -12,83 +12,142 @@ if platform.system() == "Windows": | |||
| class VideoCapturer: | ||||
|     def __init__(self, device_index: int): | ||||
|         self.device_index = device_index | ||||
|         self.frame_callback = None | ||||
|         self._current_frame = None | ||||
|         self._frame_ready = threading.Event() | ||||
|         self._latest_frame: Optional[np.ndarray] = None | ||||
|         self._frame_lock = threading.Lock() | ||||
|         self.is_running = False | ||||
|         self.cap = None | ||||
|         self.cap: Optional[cv2.VideoCapture] = None | ||||
|         self._capture_thread: Optional[threading.Thread] = None | ||||
| 
 | ||||
|         # Initialize Windows-specific components if on Windows | ||||
|         if platform.system() == "Windows": | ||||
|             try: | ||||
|                 self.graph = FilterGraph() | ||||
|                 # Verify device exists | ||||
|                 devices = self.graph.get_input_devices() | ||||
|                 if self.device_index >= len(devices): | ||||
|                 raise ValueError( | ||||
|                     f"Invalid device index {device_index}. Available devices: {len(devices)}" | ||||
|                 ) | ||||
|                     # Fallback or logging, rather than immediate raise for flexibility | ||||
|                     print(f"Warning: Device index {device_index} might be out of range. Available: {len(devices)}. Will attempt to open anyway.") | ||||
|             except Exception as e: | ||||
|                 print(f"Warning: Could not initialize FilterGraph for device enumeration: {e}") | ||||
|                 self.graph = None | ||||
| 
 | ||||
| 
 | ||||
|     def _capture_loop(self) -> None: | ||||
|         while self.is_running and self.cap is not None: | ||||
|             try: | ||||
|                 ret, frame = self.cap.read() | ||||
|                 if ret: | ||||
|                     with self._frame_lock: | ||||
|                         self._latest_frame = frame | ||||
|                 else: | ||||
|                     # Handle camera read failure, e.g., camera disconnected | ||||
|                     print("Warning: Failed to read frame from camera in capture loop.") | ||||
|                     # Small sleep to prevent tight loop on continuous read errors | ||||
|                     threading.Event().wait(0.1) | ||||
|             except Exception as e: | ||||
|                 print(f"Error in capture loop: {e}") | ||||
|                 self.is_running = False # Stop loop on critical error | ||||
|                 break | ||||
|             # Small sleep to yield execution and not busy-wait if camera FPS is low | ||||
|             # Adjust sleep time as needed; too high adds latency, too low uses more CPU. | ||||
|             threading.Event().wait(0.001) # 1 ms sleep | ||||
| 
 | ||||
|     def start(self, width: int = 960, height: int = 540, fps: int = 60) -> bool: | ||||
|         """Initialize and start video capture""" | ||||
|         """Initialize and start video capture in a separate thread.""" | ||||
|         if self.is_running: | ||||
|             print("Capture already running.") | ||||
|             return True | ||||
|         try: | ||||
|             if platform.system() == "Windows": | ||||
|                 # Windows-specific capture methods | ||||
|                 capture_methods = [ | ||||
|                     (self.device_index, cv2.CAP_DSHOW),  # Try DirectShow first | ||||
|                     (self.device_index, cv2.CAP_ANY),  # Then try default backend | ||||
|                     (-1, cv2.CAP_ANY),  # Try -1 as fallback | ||||
|                     (0, cv2.CAP_ANY),  # Finally try 0 without specific backend | ||||
|                     (self.device_index, cv2.CAP_DSHOW), | ||||
|                     (self.device_index, cv2.CAP_MSMF), | ||||
|                     (self.device_index, cv2.CAP_ANY), | ||||
|                     (-1, cv2.CAP_ANY), | ||||
|                     (0, cv2.CAP_ANY) | ||||
|                 ] | ||||
| 
 | ||||
|                 for dev_id, backend in capture_methods: | ||||
|                     try: | ||||
|                         self.cap = cv2.VideoCapture(dev_id, backend) | ||||
|                         if self.cap.isOpened(): | ||||
|                         if self.cap and self.cap.isOpened(): | ||||
|                             print(f"Successfully opened camera {dev_id} with backend {backend}") | ||||
|                             break | ||||
|                         if self.cap: | ||||
|                             self.cap.release() | ||||
|                             self.cap = None | ||||
|                     except Exception: | ||||
|                         continue | ||||
|             else: | ||||
|                 # Unix-like systems (Linux/Mac) capture method | ||||
|             else: # Unix-like | ||||
|                 self.cap = cv2.VideoCapture(self.device_index) | ||||
| 
 | ||||
|             if not self.cap or not self.cap.isOpened(): | ||||
|                 raise RuntimeError("Failed to open camera") | ||||
|                 raise RuntimeError(f"Failed to open camera with device index {self.device_index} using available methods.") | ||||
| 
 | ||||
|             # Configure format | ||||
|             # Note: Setting properties might not always work or might reset after opening. | ||||
|             # It's often better to request a format the camera natively supports if known. | ||||
|             self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, width) | ||||
|             self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height) | ||||
|             self.cap.set(cv2.CAP_PROP_FPS, fps) | ||||
| 
 | ||||
|             # Verify settings if possible (actual values might differ) | ||||
|             actual_width = self.cap.get(cv2.CAP_PROP_FRAME_WIDTH) | ||||
|             actual_height = self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT) | ||||
|             actual_fps = self.cap.get(cv2.CAP_PROP_FPS) | ||||
|             print(f"Requested: {width}x{height}@{fps}fps. Actual: {actual_width}x{actual_height}@{actual_fps}fps") | ||||
| 
 | ||||
| 
 | ||||
|             self.is_running = True | ||||
|             self._capture_thread = threading.Thread(target=self._capture_loop, daemon=True) | ||||
|             self._capture_thread.start() | ||||
| 
 | ||||
|             # Wait briefly for the first frame to be captured, makes initial read() more likely to succeed. | ||||
|             # This is optional and can be adjusted or removed. | ||||
|             threading.Event().wait(0.5) # Wait up to 0.5 seconds | ||||
| 
 | ||||
|             return True | ||||
| 
 | ||||
|         except Exception as e: | ||||
|             print(f"Failed to start capture: {str(e)}") | ||||
|             if self.cap: | ||||
|                 self.cap.release() | ||||
|                 self.cap = None | ||||
|             self.is_running = False | ||||
|             return False | ||||
| 
 | ||||
|     def read(self) -> Tuple[bool, Optional[np.ndarray]]: | ||||
|         """Read a frame from the camera""" | ||||
|         if not self.is_running or self.cap is None: | ||||
|         """Read the latest frame from the camera (non-blocking).""" | ||||
|         if not self.is_running: | ||||
|             return False, None | ||||
| 
 | ||||
|         ret, frame = self.cap.read() | ||||
|         if ret: | ||||
|             self._current_frame = frame | ||||
|             if self.frame_callback: | ||||
|                 self.frame_callback(frame) | ||||
|             return True, frame | ||||
|         frame_copy = None | ||||
|         with self._frame_lock: | ||||
|             if self._latest_frame is not None: | ||||
|                 frame_copy = self._latest_frame.copy() | ||||
| 
 | ||||
|         if frame_copy is not None: | ||||
|             return True, frame_copy | ||||
|         else: | ||||
|             # No frame available yet, or thread stopped | ||||
|             return False, None | ||||
| 
 | ||||
|     def release(self) -> None: | ||||
|         """Stop capture and release resources""" | ||||
|         if self.is_running and self.cap is not None: | ||||
|         """Stop capture thread and release resources.""" | ||||
|         if self.is_running: | ||||
|             self.is_running = False # Signal the thread to stop | ||||
|             if self._capture_thread is not None: | ||||
|                 self._capture_thread.join(timeout=1.0) # Wait for thread to finish | ||||
|                 if self._capture_thread.is_alive(): | ||||
|                     print("Warning: Capture thread did not terminate cleanly.") | ||||
|             self._capture_thread = None | ||||
| 
 | ||||
|         if self.cap is not None: | ||||
|             self.cap.release() | ||||
|             self.is_running = False | ||||
|             self.cap = None | ||||
| 
 | ||||
|     def set_frame_callback(self, callback: Callable[[np.ndarray], None]) -> None: | ||||
|         """Set callback for frame processing""" | ||||
|         self.frame_callback = callback | ||||
|         with self._frame_lock: # Clear last frame | ||||
|             self._latest_frame = None | ||||
|         print("Video capture released.") | ||||
| 
 | ||||
|     # frame_callback is removed as direct polling via read() is now non-blocking and preferred with threaded capture. | ||||
|     # If a callback mechanism is still desired, it would need to be integrated carefully with the thread. | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue