Merge d4905c6bd9 into f0fae811d8
commit 69daf73c19
@@ -39,9 +39,12 @@ def parse_args() -> None:
     program.add_argument('--keep-audio', help='keep original audio', dest='keep_audio', action='store_true', default=True)
     program.add_argument('--keep-frames', help='keep temporary frames', dest='keep_frames', action='store_true', default=False)
     program.add_argument('--many-faces', help='process every face', dest='many_faces', action='store_true', default=False)
+    program.add_argument('--color-correction', help='apply color correction to the swapped face', dest='color_correction', action='store_true', default=False)  # Added this line back
     program.add_argument('--nsfw-filter', help='filter the NSFW image or video', dest='nsfw_filter', action='store_true', default=False)
     program.add_argument('--map-faces', help='map source target faces', dest='map_faces', action='store_true', default=False)
     program.add_argument('--mouth-mask', help='mask the mouth region', dest='mouth_mask', action='store_true', default=False)
+    program.add_argument('--poisson-blending', help='use Poisson blending for smoother face integration', dest='poisson_blending', action='store_true', default=False)
+    program.add_argument('--preserve-ears', help='attempt to preserve target ears by modifying the blend mask', dest='preserve_ears', action='store_true', default=False)
    program.add_argument('--video-encoder', help='adjust output video encoder', dest='video_encoder', default='libx264', choices=['libx264', 'libx265', 'libvpx-vp9'])
     program.add_argument('--video-quality', help='adjust output video quality', dest='video_quality', type=int, default=18, choices=range(52), metavar='[0-51]')
     program.add_argument('-l', '--lang', help='Ui language', default="en")
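For reference, a minimal standalone sketch (not part of this diff; plain argparse, with the three new flags trimmed to their essentials) showing how they parse:

import argparse

program = argparse.ArgumentParser()
program.add_argument('--color-correction', dest='color_correction', action='store_true', default=False)
program.add_argument('--poisson-blending', dest='poisson_blending', action='store_true', default=False)
program.add_argument('--preserve-ears', dest='preserve_ears', action='store_true', default=False)

args = program.parse_args(['--color-correction', '--poisson-blending'])
print(args.color_correction, args.poisson_blending, args.preserve_ears)  # True True False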
@@ -69,7 +72,10 @@ def parse_args() -> None:
     modules.globals.keep_audio = args.keep_audio
     modules.globals.keep_frames = args.keep_frames
     modules.globals.many_faces = args.many_faces
+    modules.globals.color_correction = args.color_correction
     modules.globals.mouth_mask = args.mouth_mask
+    modules.globals.use_poisson_blending = args.poisson_blending
+    modules.globals.preserve_target_ears = args.preserve_ears
     modules.globals.nsfw_filter = args.nsfw_filter
     modules.globals.map_faces = args.map_faces
     modules.globals.video_encoder = args.video_encoder
@@ -41,3 +41,10 @@ show_mouth_mask_box = False
 mask_feather_ratio = 8
 mask_down_size = 0.50
 mask_size = 1
+use_poisson_blending = False  # Added for Poisson blending
+poisson_blending_feather_amount = 5  # Feathering for the mask before Poisson blending
+preserve_target_ears = False  # Flag to enable preserving the target's ears
+ear_width_ratio = 0.18  # Width of the ear exclusion box as a ratio of the face bbox width
+ear_height_ratio = 0.35  # Height of the ear exclusion box as a ratio of the face bbox height
+ear_vertical_offset_ratio = 0.20  # Vertical offset of the ear box from the top of the face bbox
+ear_horizontal_overlap_ratio = 0.03  # How far the ear exclusion zone may overlap into the face bbox
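To make the new ratios concrete, here is a standalone sketch (made-up bbox; the ratios are the defaults above) of how the ear exclusion rectangle scales with the face bounding box, mirroring the arithmetic used later in this diff:

ear_width_ratio = 0.18
ear_height_ratio = 0.35
ear_vertical_offset_ratio = 0.20
ear_horizontal_overlap_ratio = 0.03

x1, y1, x2, y2 = 100, 50, 300, 350   # hypothetical face bbox
w, h = x2 - x1, y2 - y1              # 200 x 300

ear_w = int(w * ear_width_ratio)                     # 36 px wide exclusion box
ear_h = int(h * ear_height_ratio)                    # 105 px tall
ear_v_offset = int(h * ear_vertical_offset_ratio)    # starts 60 px below the bbox top
ear_overlap = int(w * ear_horizontal_overlap_ratio)  # reaches 6 px into the bbox

# Right-ear (image-left) exclusion rectangle, as computed in swap_face below
rex1, rey1 = x1 - ear_w + ear_overlap, y1 + ear_v_offset
rex2, rey2 = x1 + ear_overlap, rey1 + ear_h
print((rex1, rey1), (rex2, rey2))  # (70, 110) (106, 215)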
@@ -71,10 +71,43 @@ def swap_face(source_face: Face, target_face: Face, temp_frame: Frame) -> Frame:
     face_swapper = get_face_swapper()

     # Apply the face swap
-    swapped_frame = face_swapper.get(
+    swapped_frame_result = face_swapper.get(  # Renamed to avoid confusion
         temp_frame, target_face, source_face, paste_back=True
     )

+    # Ensure swapped_frame_result is not None and is a valid image
+    if swapped_frame_result is None or not isinstance(swapped_frame_result, np.ndarray):
+        logging.error("Face swap operation failed or returned an invalid result.")
+        return temp_frame  # Return the original frame if the swap failed
+
+    # Color correction
+    if modules.globals.color_correction:
+        # Use the bounding box of the target face so color correction is applied
+        # to the swapped region rather than the whole frame.
+        # The target_face object should have a bbox attribute (x1, y1, x2, y2).
+        if hasattr(target_face, 'bbox'):
+            x1, y1, x2, y2 = target_face.bbox.astype(int)
+            # Ensure the coordinates are within the frame bounds
+            x1, y1 = max(0, x1), max(0, y1)
+            x2, y2 = min(swapped_frame_result.shape[1], x2), min(swapped_frame_result.shape[0], y2)
+
+            if x1 < x2 and y1 < y2:
+                swapped_face_region = swapped_frame_result[y1:y2, x1:x2]
+                target_face_region_original = temp_frame[y1:y2, x1:x2]
+
+                if swapped_face_region.size > 0 and target_face_region_original.size > 0:
+                    corrected_swapped_face_region = apply_histogram_matching_color_correction(swapped_face_region, target_face_region_original)
+                    swapped_frame_result[y1:y2, x1:x2] = corrected_swapped_face_region
+                else:
+                    # Fall back to full-frame color correction if the regions are empty
+                    swapped_frame_result = apply_histogram_matching_color_correction(swapped_frame_result, temp_frame)
+            else:
+                # Fall back to full-frame color correction if the bbox is invalid
+                swapped_frame_result = apply_histogram_matching_color_correction(swapped_frame_result, temp_frame)
+        else:
+            # Fall back to full-frame color correction if there is no bbox
+            swapped_frame_result = apply_histogram_matching_color_correction(swapped_frame_result, temp_frame)
+
     if modules.globals.mouth_mask:
         # Create a mask for the target face
         face_mask = create_face_mask(target_face, temp_frame)
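The clamping above matters because detector bboxes routinely extend past the frame; a short standalone illustration with made-up numbers:

import numpy as np

frame = np.zeros((480, 640, 3), dtype=np.uint8)
bbox = np.array([-20, 400, 700, 500])  # hypothetical bbox partly outside the frame

x1, y1, x2, y2 = bbox.astype(int)
x1, y1 = max(0, x1), max(0, y1)
x2, y2 = min(frame.shape[1], x2), min(frame.shape[0], y2)
print(x1, y1, x2, y2)                 # 0 400 640 480
region = frame[y1:y2, x1:x2]
print(region.shape, region.size > 0)  # (80, 640, 3) True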
@@ -85,22 +118,136 @@ def swap_face(source_face: Face, target_face: Face, temp_frame: Frame) -> Frame:
         )

         # Apply the mouth area
-        swapped_frame = apply_mouth_area(
-            swapped_frame, mouth_cutout, mouth_box, face_mask, lower_lip_polygon
+        swapped_frame_result = apply_mouth_area(
+            swapped_frame_result, mouth_cutout, mouth_box, face_mask, lower_lip_polygon
         )

         if modules.globals.show_mouth_mask_box:
             mouth_mask_data = (mouth_mask, mouth_cutout, mouth_box, lower_lip_polygon)
-            swapped_frame = draw_mouth_mask_visualization(
-                swapped_frame, target_face, mouth_mask_data
+            swapped_frame_result = draw_mouth_mask_visualization(
+                swapped_frame_result, target_face, mouth_mask_data
             )

-    return swapped_frame
+    # Poisson blending
+    if modules.globals.use_poisson_blending and hasattr(target_face, 'bbox'):
+        # Create a mask covering the swapped face region for Poisson blending.
+        # The target_face.bbox could be expanded slightly, or a more precise mask
+        # from face parsing could be used if available; for simplicity this uses
+        # the bbox, with a feathered convex hull of the landmarks as fallback.
+
+        face_mask_for_blending = np.zeros(temp_frame.shape[:2], dtype=np.uint8)
+
+        # Prioritize the bounding box for a tighter mask
+        if hasattr(target_face, 'bbox'):
+            x1, y1, x2, y2 = target_face.bbox.astype(int)
+            # Clamp coordinates to the frame bounds; separate names avoid
+            # clashing with the center calculation below
+            x1_b, y1_b = max(0, x1), max(0, y1)
+            x2_b, y2_b = min(temp_frame.shape[1], x2), min(temp_frame.shape[0], y2)
+
+            # Create a rectangular mask based on the bounding box
+            if x1_b < x2_b and y1_b < y2_b:
+                face_mask_for_blending[y1_b:y2_b, x1_b:x2_b] = 255
+            else:
+                logging.warning("Invalid bounding box for the Poisson mask. Attempting a landmark-based mask.")
+                # Fall back to a landmark-based convex hull if the bbox is invalid
+                landmarks = target_face.landmark_2d_106 if hasattr(target_face, 'landmark_2d_106') else None
+                if landmarks is not None and len(landmarks) > 0:
+                    try:
+                        hull_points = cv2.convexHull(landmarks.astype(np.int32))
+                        cv2.fillConvexPoly(face_mask_for_blending, hull_points, 255)
+                    except Exception as e:
+                        logging.error(f"Could not form a convex hull for the Poisson mask from landmarks: {e}. Blending will be skipped.")
+                else:
+                    logging.error("No valid bbox or landmarks for the Poisson mask. Blending will be skipped.")
+        else:
+            # Fall back to a landmark-based convex hull if there is no bbox attribute
+            landmarks = target_face.landmark_2d_106 if hasattr(target_face, 'landmark_2d_106') else None
+            if landmarks is not None and len(landmarks) > 0:
+                try:
+                    hull_points = cv2.convexHull(landmarks.astype(np.int32))
+                    cv2.fillConvexPoly(face_mask_for_blending, hull_points, 255)
+                except Exception as e:
+                    logging.error(f"Could not form a convex hull for the Poisson mask from landmarks (no bbox): {e}. Blending will be skipped.")
+            else:
+                logging.error("No bbox or landmarks available for the Poisson mask. Blending will be skipped.")
+
+        # Subtract the ear regions when preserve_target_ears is enabled
+        if modules.globals.preserve_target_ears and np.any(face_mask_for_blending > 0):
+            mfx1, mfy1, mfx2, mfy2 = target_face.bbox.astype(int)
+            mfw = mfx2 - mfx1
+            mfh = mfy2 - mfy1
+
+            ear_w = int(mfw * modules.globals.ear_width_ratio)
+            ear_h = int(mfh * modules.globals.ear_height_ratio)
+            ear_v_offset = int(mfh * modules.globals.ear_vertical_offset_ratio)
+            ear_overlap = int(mfw * modules.globals.ear_horizontal_overlap_ratio)
+
+            # Person's right ear (image-left side of the face bbox);
+            # this region of face_mask_for_blending is zeroed out
+            rex1 = max(0, mfx1 - ear_w + ear_overlap)
+            rey1 = max(0, mfy1 + ear_v_offset)
+            rex2 = min(temp_frame.shape[1], mfx1 + ear_overlap)  # Extends slightly into the face bbox for a smoother transition
+            rey2 = min(temp_frame.shape[0], rey1 + ear_h)
+            if rex1 < rex2 and rey1 < rey2:
+                cv2.rectangle(face_mask_for_blending, (rex1, rey1), (rex2, rey2), 0, -1)
+
+            # Person's left ear (image-right side of the face bbox)
+            lex1 = max(0, mfx2 - ear_overlap)
+            ley1 = max(0, mfy1 + ear_v_offset)
+            lex2 = min(temp_frame.shape[1], mfx2 + ear_w - ear_overlap)
+            ley2 = min(temp_frame.shape[0], ley1 + ear_h)
+            if lex1 < lex2 and ley1 < ley2:
+                cv2.rectangle(face_mask_for_blending, (lex1, ley1), (lex2, ley2), 0, -1)
+
+        # Feather the mask to smooth its edges for Poisson blending
+        if np.any(face_mask_for_blending > 0):  # Only feather if there is a mask
+            feather_amount = modules.globals.poisson_blending_feather_amount
+            if feather_amount > 0:
+                # Ensure the kernel size is odd
+                kernel_size = 2 * feather_amount + 1
+                face_mask_for_blending = cv2.GaussianBlur(face_mask_for_blending, (kernel_size, kernel_size), 0)
+
+        # Calculate the center of the target face bbox for seamlessClone
+        if hasattr(target_face, 'bbox'):
+            x1, y1, x2, y2 = target_face.bbox.astype(int)
+            center_x = (x1 + x2) // 2
+            center_y = (y1 + y2) // 2
+
+            # Ensure the center lies within the frame dimensions
+            center_x = np.clip(center_x, 0, temp_frame.shape[1] - 1)
+            center_y = np.clip(center_y, 0, temp_frame.shape[0] - 1)
+            center = (center_x, center_y)
+
+            # Apply Poisson blending:
+            # swapped_frame_result is the source, temp_frame is the destination
+            if np.any(face_mask_for_blending > 0):  # Proceed only if the mask is not empty
+                try:
+                    # Ensure swapped_frame_result and temp_frame are 8-bit 3-channel images
+                    if swapped_frame_result.dtype != np.uint8:
+                        swapped_frame_result = np.clip(swapped_frame_result, 0, 255).astype(np.uint8)
+                    if temp_frame.dtype != np.uint8:
+                        temp_frame_uint8 = np.clip(temp_frame, 0, 255).astype(np.uint8)
+                    else:
+                        temp_frame_uint8 = temp_frame
+
+                    swapped_frame_result = cv2.seamlessClone(swapped_frame_result, temp_frame_uint8, face_mask_for_blending, center, cv2.NORMAL_CLONE)
+                except cv2.error as e:
+                    logging.error(f"Error during Poisson blending: {e}")
+                    # Fall back to the non-blended result if seamlessClone fails
+                    pass  # swapped_frame_result remains as is
+            else:
+                logging.warning("Poisson blending mask is empty. Skipping Poisson blending.")
+
+    return swapped_frame_result


 def process_frame(source_face: Face, temp_frame: Frame) -> Frame:
-    if modules.globals.color_correction:
-        temp_frame = cv2.cvtColor(temp_frame, cv2.COLOR_BGR2RGB)
+    # The color_correction logic was moved into swap_face.
+    # The initial `temp_frame = cv2.cvtColor(temp_frame, cv2.COLOR_BGR2RGB)` here
+    # was incorrect: it changed the color space of the whole frame before processing,
+    # which is not what we want when color-correcting only the swapped region.
+    # Histogram matching is now done BGR to BGR.

     if modules.globals.many_faces:
         many_faces = get_many_faces(temp_frame)
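As a self-contained reference for the cv2.seamlessClone call used above (synthetic images; NORMAL_CLONE matches the mode chosen in this diff, and the feathered mask mirrors the GaussianBlur step):

import cv2
import numpy as np

dst = np.full((200, 200, 3), 80, dtype=np.uint8)   # stand-in for temp_frame
src = np.full((200, 200, 3), 200, dtype=np.uint8)  # stand-in for the swapped frame

mask = np.zeros((200, 200), dtype=np.uint8)
cv2.rectangle(mask, (60, 60), (140, 140), 255, -1)  # region to blend
mask = cv2.GaussianBlur(mask, (11, 11), 0)          # feathered, as with feather_amount = 5

center = (100, 100)  # center of the blended region in dst
blended = cv2.seamlessClone(src, dst, mask, center, cv2.NORMAL_CLONE)
print(blended.shape, blended.dtype)  # (200, 200, 3) uint8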
@@ -620,3 +767,37 @@ def apply_color_transfer(source, target):
     source = (source - source_mean) * (target_std / source_std) + target_mean

     return cv2.cvtColor(np.clip(source, 0, 255).astype("uint8"), cv2.COLOR_LAB2BGR)
+
+
+def apply_histogram_matching_color_correction(source_img: Frame, target_img: Frame) -> Frame:
+    """
+    Applies color correction to the source image to match the target image's color
+    distribution, using histogram matching on each color channel.
+    """
+    corrected_img = np.zeros_like(source_img)
+    for i in range(source_img.shape[2]):  # Iterate over the color channels (B, G, R)
+        source_hist, _ = np.histogram(source_img[:, :, i].flatten(), 256, [0, 256])
+        target_hist, _ = np.histogram(target_img[:, :, i].flatten(), 256, [0, 256])
+
+        # Compute cumulative distribution functions (CDFs)
+        source_cdf = source_hist.cumsum()
+        source_cdf_normalized = source_cdf * source_hist.max() / source_cdf.max()  # Normalize
+
+        target_cdf = target_hist.cumsum()
+        target_cdf_normalized = target_cdf * target_hist.max() / target_cdf.max()  # Normalize
+
+        # Create the lookup table
+        lookup_table = np.zeros(256, 'uint8')
+
+        gj = 0
+        for gi in range(256):
+            while gj < 256 and target_cdf_normalized[gj] < source_cdf_normalized[gi]:
+                gj += 1
+            if gj == 256:  # If we reach the end of the target CDF, map the remaining values to the maximum
+                lookup_table[gi] = 255
+            else:
+                lookup_table[gi] = gj
+
+        corrected_img[:, :, i] = cv2.LUT(source_img[:, :, i], lookup_table)
+
+    return corrected_img
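A quick sanity check for this kind of per-channel histogram matching (a sketch only; it assumes the function above is importable from this module and that cv2/numpy are available). After matching, the corrected image's channel means should land near the target's rather than the source's:

import numpy as np

rng = np.random.default_rng(0)
dark = rng.integers(0, 100, (64, 64, 3), dtype=np.uint8)      # dark "swapped" region
bright = rng.integers(150, 255, (64, 64, 3), dtype=np.uint8)  # bright "target" region

matched = apply_histogram_matching_color_correction(dark, bright)
print(dark.mean(axis=(0, 1)).round(1))     # roughly [49.5, 49.5, 49.5]
print(bright.mean(axis=(0, 1)).round(1))   # roughly [202, 202, 202]
print(matched.mean(axis=(0, 1)).round(1))  # close to the bright means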
modules/ui.py
@@ -880,42 +880,77 @@ def create_webcam_preview(camera_index: int):
     PREVIEW.deiconify()

     frame_processors = get_frame_processors_modules(modules.globals.frame_processors)

+    # Get the initial source image if not mapping faces
     source_image = None
-    prev_time = time.time()
-    fps_update_interval = 0.5
-    frame_count = 0
-    fps = 0
+    if not modules.globals.map_faces and modules.globals.source_path:
+        try:
+            loaded_cv_image = cv2.imread(modules.globals.source_path)
+            if loaded_cv_image is None:
+                update_status(f"Error: Could not read source image at {modules.globals.source_path}")
+                # source_image remains None
+            else:
+                source_image = get_one_face(loaded_cv_image)
+                if source_image is None:
+                    update_status(f"Error: No face detected in source image {os.path.basename(modules.globals.source_path)}")
+        except Exception as e:
+            update_status(f"Exception loading source image: {str(e)[:100]}")
+            source_image = None  # Ensure source_image is None on any error
+
+    # If source_image is still None AND a source_path was provided (meaning the user intended a swap)
+    # AND we are not using map_faces (which handles its own source logic),
+    # warn that the preview will run without swapping.
+    if source_image is None and modules.globals.source_path and not modules.globals.map_faces:
+        update_status("Warning: Live preview started, but the source image is invalid or has no face. No swap will occur.")
+
+    # Start the update loop
+    fps_data = {  # Initialized here so it can be passed into the update loop
+        "prev_time": time.time(),
+        "frame_count": 0,
+        "fps": 0.0,
+        "fps_update_interval": 0.5
+    }
+    update_webcam_frame_after(cap, frame_processors, source_image, fps_data)
+
+
+def update_webcam_frame_after(cap, frame_processors, source_image, fps_data, delay_ms=15):  # ~66 FPS target for UI updates
+    global preview_label, ROOT, PREVIEW
+
+    if PREVIEW.state() == "withdrawn":
+        cap.release()
+        PREVIEW.withdraw()  # Ensure the window is withdrawn when the loop exits
+        return

-    while True:
-        ret, frame = cap.read()
-        if not ret:
-            break
+    ret, frame = cap.read()
+    if not ret:
+        # Handle camera read failure or end of stream (for a webcam the stream is usually continuous)
+        ROOT.after(delay_ms, lambda: update_webcam_frame_after(cap, frame_processors, source_image, fps_data, delay_ms))
+        return

-        temp_frame = frame.copy()
+    temp_frame = frame.copy()

-        if modules.globals.live_mirror:
-            temp_frame = cv2.flip(temp_frame, 1)
+    if modules.globals.live_mirror:
+        temp_frame = cv2.flip(temp_frame, 1)

-        if modules.globals.live_resizable:
-            temp_frame = fit_image_to_size(
-                temp_frame, PREVIEW.winfo_width(), PREVIEW.winfo_height()
-            )
-        else:
-            temp_frame = fit_image_to_size(
-                temp_frame, PREVIEW.winfo_width(), PREVIEW.winfo_height()
-            )
+    # Resizing based on the PREVIEW window dimensions
+    preview_width = PREVIEW.winfo_width()
+    preview_height = PREVIEW.winfo_height()
+    if preview_width > 1 and preview_height > 1:  # Ensure valid dimensions
+        temp_frame = fit_image_to_size(temp_frame, preview_width, preview_height)

-        if not modules.globals.map_faces:
-            if source_image is None and modules.globals.source_path:
-                source_image = get_one_face(cv2.imread(modules.globals.source_path))
-
-            for frame_processor in frame_processors:
-                if frame_processor.NAME == "DLC.FACE-ENHANCER":
-                    if modules.globals.fp_ui["face_enhancer"]:
-                        temp_frame = frame_processor.process_frame(None, temp_frame)
-                else:
-                    temp_frame = frame_processor.process_frame(source_image, temp_frame)
-        else:
-            modules.globals.target_path = None
-            for frame_processor in frame_processors:
+    if not modules.globals.map_faces:
+        # current_source_image is the source_image passed in from create_webcam_preview.
+        # It is determined once before the loop starts; no reloading here.
+        current_source_image = source_image
+
+        for frame_processor in frame_processors:
+            if frame_processor.NAME == "DLC.FACE-ENHANCER":
+                if modules.globals.fp_ui["face_enhancer"]:
+                    temp_frame = frame_processor.process_frame(None, temp_frame)
+            else:  # The face_swapper processor, or another default processor
+                if current_source_image is not None:  # Only process if the source face from create_webcam_preview is valid
+                    temp_frame = frame_processor.process_frame(current_source_image, temp_frame)
+                # If current_source_image is None, face_swapper skips the frame: effectively no swap.
+    else:
+        modules.globals.target_path = None
+        for frame_processor in frame_processors:
@@ -925,18 +960,19 @@ def create_webcam_preview(camera_index: int):
             else:
                 temp_frame = frame_processor.process_frame_v2(temp_frame)

-        # Calculate and display FPS
     current_time = time.time()
-    frame_count += 1
-    if current_time - prev_time >= fps_update_interval:
-        fps = frame_count / (current_time - prev_time)
-        frame_count = 0
-        prev_time = current_time
+    fps_data["frame_count"] += 1
+    time_diff = current_time - fps_data["prev_time"]
+    if time_diff >= fps_data.get("fps_update_interval", 0.5):
+        fps_data["fps"] = fps_data["frame_count"] / time_diff
+        fps_data["frame_count"] = 0
+        fps_data["prev_time"] = current_time

     if modules.globals.show_fps:
         cv2.putText(
             temp_frame,
-            f"FPS: {fps:.1f}",
+            f"FPS: {fps_data['fps']:.1f}",
             (10, 30),
             cv2.FONT_HERSHEY_SIMPLEX,
             1,
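The fps_data dict replaces the old loop-local counters because, after the ROOT.after refactor, the function is re-entered for every frame and state must travel with the call. A minimal standalone sketch of the same accounting (timings are illustrative):

import time

fps_data = {"prev_time": time.time(), "frame_count": 0, "fps": 0.0, "fps_update_interval": 0.5}

def tick(fps_data):
    # Call once per rendered frame; refreshes fps_data["fps"] roughly twice a second
    fps_data["frame_count"] += 1
    now = time.time()
    time_diff = now - fps_data["prev_time"]
    if time_diff >= fps_data.get("fps_update_interval", 0.5):
        fps_data["fps"] = fps_data["frame_count"] / time_diff
        fps_data["frame_count"] = 0
        fps_data["prev_time"] = now

for _ in range(100):
    tick(fps_data)
    time.sleep(0.01)
print(f"{fps_data['fps']:.1f}")  # ~90-100, depending on sleep overhead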
@@ -944,20 +980,19 @@ def create_webcam_preview(camera_index: int):
             2,
         )

-    image = cv2.cvtColor(temp_frame, cv2.COLOR_BGR2RGB)
-    image = Image.fromarray(image)
-    image = ImageOps.contain(
-        image, (temp_frame.shape[1], temp_frame.shape[0]), Image.LANCZOS
-    )
-    image = ctk.CTkImage(image, size=image.size)
-    preview_label.configure(image=image)
-    ROOT.update()
-
-    if PREVIEW.state() == "withdrawn":
-        break
-
-    cap.release()
-    PREVIEW.withdraw()
+    if temp_frame is not None and temp_frame.size > 0:
+        image = cv2.cvtColor(temp_frame, cv2.COLOR_BGR2RGB)
+        pil_image = Image.fromarray(image)
+        contained_image = ImageOps.contain(
+            pil_image, (temp_frame.shape[1], temp_frame.shape[0]), Image.LANCZOS
+        )
+        ctk_image = ctk.CTkImage(contained_image, size=contained_image.size)
+        preview_label.configure(image=ctk_image)
+
+    ROOT.after(delay_ms, lambda: update_webcam_frame_after(cap, frame_processors, source_image, fps_data, delay_ms))


 def create_source_target_popup_for_webcam(
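The refactor above follows the standard tkinter pattern of replacing a blocking while True loop with a self-rescheduling after() callback, so the UI event loop keeps running between frames. A minimal sketch of the pattern, independent of this codebase:

import tkinter as tk

root = tk.Tk()
label = tk.Label(root, text="0")
label.pack()

counter = {"n": 0}

def update_loop(delay_ms=15):
    # Do one unit of work, then hand control back to the Tk event loop
    counter["n"] += 1
    label.configure(text=str(counter["n"]))
    root.after(delay_ms, update_loop)  # reschedule instead of blocking

update_loop()
root.mainloop()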
@@ -12,83 +12,142 @@ if platform.system() == "Windows":
 class VideoCapturer:
     def __init__(self, device_index: int):
         self.device_index = device_index
-        self.frame_callback = None
-        self._current_frame = None
-        self._frame_ready = threading.Event()
+        self._latest_frame: Optional[np.ndarray] = None
+        self._frame_lock = threading.Lock()
         self.is_running = False
-        self.cap = None
+        self.cap: Optional[cv2.VideoCapture] = None
+        self._capture_thread: Optional[threading.Thread] = None

         # Initialize Windows-specific components if on Windows
         if platform.system() == "Windows":
+            try:
                 self.graph = FilterGraph()
                 # Verify device exists
                 devices = self.graph.get_input_devices()
                 if self.device_index >= len(devices):
-                    raise ValueError(
-                        f"Invalid device index {device_index}. Available devices: {len(devices)}"
-                    )
+                    # Fall back to logging rather than raising immediately, for flexibility
+                    print(f"Warning: Device index {device_index} might be out of range. Available: {len(devices)}. Will attempt to open anyway.")
+            except Exception as e:
+                print(f"Warning: Could not initialize FilterGraph for device enumeration: {e}")
+                self.graph = None
+
+    def _capture_loop(self) -> None:
+        while self.is_running and self.cap is not None:
+            try:
+                ret, frame = self.cap.read()
+                if ret:
+                    with self._frame_lock:
+                        self._latest_frame = frame
+                else:
+                    # Handle camera read failure, e.g. a disconnected camera
+                    print("Warning: Failed to read frame from camera in capture loop.")
+                    # Small sleep to prevent a tight loop on continuous read errors
+                    threading.Event().wait(0.1)
+            except Exception as e:
+                print(f"Error in capture loop: {e}")
+                self.is_running = False  # Stop the loop on a critical error
+                break
+            # Small sleep to yield execution and avoid busy-waiting when the camera FPS is low.
+            # Adjust as needed: too high adds latency, too low uses more CPU.
+            threading.Event().wait(0.001)  # 1 ms sleep

     def start(self, width: int = 960, height: int = 540, fps: int = 60) -> bool:
-        """Initialize and start video capture"""
+        """Initialize and start video capture in a separate thread."""
+        if self.is_running:
+            print("Capture already running.")
+            return True
         try:
             if platform.system() == "Windows":
-                # Windows-specific capture methods
                 capture_methods = [
-                    (self.device_index, cv2.CAP_DSHOW),  # Try DirectShow first
-                    (self.device_index, cv2.CAP_ANY),    # Then try default backend
-                    (-1, cv2.CAP_ANY),                   # Try -1 as fallback
-                    (0, cv2.CAP_ANY),                    # Finally try 0 without specific backend
+                    (self.device_index, cv2.CAP_DSHOW),
+                    (self.device_index, cv2.CAP_MSMF),
+                    (self.device_index, cv2.CAP_ANY),
+                    (-1, cv2.CAP_ANY),
+                    (0, cv2.CAP_ANY)
                 ]

                 for dev_id, backend in capture_methods:
                     try:
                         self.cap = cv2.VideoCapture(dev_id, backend)
-                        if self.cap.isOpened():
+                        if self.cap and self.cap.isOpened():
+                            print(f"Successfully opened camera {dev_id} with backend {backend}")
                             break
+                        if self.cap:
                             self.cap.release()
+                            self.cap = None
                     except Exception:
                         continue
-            else:
-                # Unix-like systems (Linux/Mac) capture method
+            else:  # Unix-like
                 self.cap = cv2.VideoCapture(self.device_index)

             if not self.cap or not self.cap.isOpened():
-                raise RuntimeError("Failed to open camera")
+                raise RuntimeError(f"Failed to open camera with device index {self.device_index} using available methods.")

             # Configure format
+            # Note: setting properties might not always work, or might reset after opening.
+            # It is often better to request a format the camera natively supports, if known.
             self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
             self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
             self.cap.set(cv2.CAP_PROP_FPS, fps)

+            # Verify the settings where possible (actual values might differ)
+            actual_width = self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)
+            actual_height = self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)
+            actual_fps = self.cap.get(cv2.CAP_PROP_FPS)
+            print(f"Requested: {width}x{height}@{fps}fps. Actual: {actual_width}x{actual_height}@{actual_fps}fps")

             self.is_running = True
+            self._capture_thread = threading.Thread(target=self._capture_loop, daemon=True)
+            self._capture_thread.start()
+
+            # Wait briefly for the first frame to be captured, so an initial read() is more likely to succeed.
+            # This is optional and can be adjusted or removed.
+            threading.Event().wait(0.5)  # Wait up to 0.5 seconds

             return True

         except Exception as e:
             print(f"Failed to start capture: {str(e)}")
             if self.cap:
                 self.cap.release()
+                self.cap = None
+            self.is_running = False
             return False

     def read(self) -> Tuple[bool, Optional[np.ndarray]]:
-        """Read a frame from the camera"""
-        if not self.is_running or self.cap is None:
+        """Read the latest frame from the camera (non-blocking)."""
+        if not self.is_running:
             return False, None

-        ret, frame = self.cap.read()
-        if ret:
-            self._current_frame = frame
-            if self.frame_callback:
-                self.frame_callback(frame)
-            return True, frame
+        frame_copy = None
+        with self._frame_lock:
+            if self._latest_frame is not None:
+                frame_copy = self._latest_frame.copy()
+
+        if frame_copy is not None:
+            return True, frame_copy
+        else:
+            # No frame available yet, or the capture thread has stopped
             return False, None

     def release(self) -> None:
-        """Stop capture and release resources"""
-        if self.is_running and self.cap is not None:
+        """Stop the capture thread and release resources."""
+        if self.is_running:
+            self.is_running = False  # Signal the thread to stop
+            if self._capture_thread is not None:
+                self._capture_thread.join(timeout=1.0)  # Wait for the thread to finish
+                if self._capture_thread.is_alive():
+                    print("Warning: Capture thread did not terminate cleanly.")
+                self._capture_thread = None
+
+        if self.cap is not None:
             self.cap.release()
-            self.is_running = False
             self.cap = None

-    def set_frame_callback(self, callback: Callable[[np.ndarray], None]) -> None:
-        """Set callback for frame processing"""
-        self.frame_callback = callback
+        with self._frame_lock:  # Clear the last frame
+            self._latest_frame = None
+        print("Video capture released.")
+
+    # frame_callback has been removed: direct polling via read() is non-blocking and preferred with threaded capture.
+    # If a callback mechanism is still desired, it would need to be integrated carefully with the capture thread.
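A hedged usage sketch of the reworked class (device index, resolution, and the iteration count are placeholders; it assumes VideoCapturer is importable from this module). Because read() now only copies the latest frame under the lock, callers poll and simply skip iterations where no frame is ready yet:

capturer = VideoCapturer(device_index=0)
if capturer.start(width=960, height=540, fps=30):
    for _ in range(100):
        ret, frame = capturer.read()
        if not ret:
            continue  # no new frame yet; the capture thread fills _latest_frame
        # ... process frame here ...
    capturer.release()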