Compare commits

..

No commits in common. "d4905c6bd9e27441eccbf39cbdc2b8168c046bc5" and "8b61cc691fd71eb8e7464afcf3a59e9afd2be6ba" have entirely different histories.

2 changed files with 101 additions and 195 deletions

View File

@ -880,77 +880,42 @@ def create_webcam_preview(camera_index: int):
PREVIEW.deiconify() PREVIEW.deiconify()
frame_processors = get_frame_processors_modules(modules.globals.frame_processors) frame_processors = get_frame_processors_modules(modules.globals.frame_processors)
# Get initial source image if not mapping faces
source_image = None source_image = None
if not modules.globals.map_faces and modules.globals.source_path: prev_time = time.time()
try: fps_update_interval = 0.5
loaded_cv_image = cv2.imread(modules.globals.source_path) frame_count = 0
if loaded_cv_image is None: fps = 0
update_status(f"Error: Could not read source image at {modules.globals.source_path}")
# source_image remains None
else:
source_image = get_one_face(loaded_cv_image)
if source_image is None:
update_status(f"Error: No face detected in source image {os.path.basename(modules.globals.source_path)}")
except Exception as e:
update_status(f"Exception loading source image: {str(e)[:100]}")
source_image = None # Ensure source_image is None on any error
# If source_image is still None AND a source_path was provided (meaning user intended a swap)
# AND we are not using map_faces (which handles its own source logic for sources)
if source_image is None and modules.globals.source_path and not modules.globals.map_faces:
update_status("Warning: Live preview started, but source image is invalid or has no face. No swap will occur.")
# The live preview will start, but no swap will occur if source_image is None.
# Start the update loop
fps_data = { # Moved fps_data initialization here to be passed to the loop
"prev_time": time.time(),
"frame_count": 0,
"fps": 0.0,
"fps_update_interval": 0.5
}
update_webcam_frame_after(cap, frame_processors, source_image, fps_data)
def update_webcam_frame_after(cap, frame_processors, source_image, fps_data, delay_ms=15): # Approx 66 FPS target for UI updates
global preview_label, ROOT, PREVIEW
if PREVIEW.state() == "withdrawn":
cap.release()
PREVIEW.withdraw() # Ensure it's withdrawn if loop exits
return
while True:
ret, frame = cap.read() ret, frame = cap.read()
if not ret: if not ret:
# Handle camera read failure or end of stream (though for webcam, it's usually continuous) break
ROOT.after(delay_ms, lambda: update_webcam_frame_after(cap, frame_processors, source_image, fps_data, delay_ms))
return
temp_frame = frame.copy() temp_frame = frame.copy()
if modules.globals.live_mirror: if modules.globals.live_mirror:
temp_frame = cv2.flip(temp_frame, 1) temp_frame = cv2.flip(temp_frame, 1)
# Resizing based on PREVIEW window dimensions. if modules.globals.live_resizable:
preview_width = PREVIEW.winfo_width() temp_frame = fit_image_to_size(
preview_height = PREVIEW.winfo_height() temp_frame, PREVIEW.winfo_width(), PREVIEW.winfo_height()
if preview_width > 1 and preview_height > 1: # Ensure valid dimensions )
temp_frame = fit_image_to_size(temp_frame, preview_width, preview_height)
else:
temp_frame = fit_image_to_size(
temp_frame, PREVIEW.winfo_width(), PREVIEW.winfo_height()
)
if not modules.globals.map_faces: if not modules.globals.map_faces:
# current_source_image is the source_image passed in from create_webcam_preview if source_image is None and modules.globals.source_path:
# It's determined once before the loop starts. No reloading here. source_image = get_one_face(cv2.imread(modules.globals.source_path))
current_source_image = source_image
for frame_processor in frame_processors: for frame_processor in frame_processors:
if frame_processor.NAME == "DLC.FACE-ENHANCER": if frame_processor.NAME == "DLC.FACE-ENHANCER":
if modules.globals.fp_ui["face_enhancer"]: if modules.globals.fp_ui["face_enhancer"]:
temp_frame = frame_processor.process_frame(None, temp_frame) temp_frame = frame_processor.process_frame(None, temp_frame)
else: # This is the face_swapper processor or other default else:
if current_source_image: # Only process if source_image (from create_webcam_preview) is valid temp_frame = frame_processor.process_frame(source_image, temp_frame)
temp_frame = frame_processor.process_frame(current_source_image, temp_frame)
# If current_source_image is None, the frame is not processed by face_swapper, effectively no swap.
else: else:
modules.globals.target_path = None modules.globals.target_path = None
for frame_processor in frame_processors: for frame_processor in frame_processors:
@ -960,19 +925,18 @@ def update_webcam_frame_after(cap, frame_processors, source_image, fps_data, del
else: else:
temp_frame = frame_processor.process_frame_v2(temp_frame) temp_frame = frame_processor.process_frame_v2(temp_frame)
# Calculate and display FPS
current_time = time.time() current_time = time.time()
fps_data["frame_count"] += 1 frame_count += 1
time_diff = current_time - fps_data["prev_time"] if current_time - prev_time >= fps_update_interval:
fps = frame_count / (current_time - prev_time)
if time_diff >= fps_data.get("fps_update_interval", 0.5): frame_count = 0
fps_data["fps"] = fps_data["frame_count"] / time_diff prev_time = current_time
fps_data["frame_count"] = 0
fps_data["prev_time"] = current_time
if modules.globals.show_fps: if modules.globals.show_fps:
cv2.putText( cv2.putText(
temp_frame, temp_frame,
f"FPS: {fps_data['fps']:.1f}", f"FPS: {fps:.1f}",
(10, 30), (10, 30),
cv2.FONT_HERSHEY_SIMPLEX, cv2.FONT_HERSHEY_SIMPLEX,
1, 1,
@ -980,19 +944,20 @@ def update_webcam_frame_after(cap, frame_processors, source_image, fps_data, del
2, 2,
) )
if temp_frame is not None and temp_frame.size > 0:
image = cv2.cvtColor(temp_frame, cv2.COLOR_BGR2RGB) image = cv2.cvtColor(temp_frame, cv2.COLOR_BGR2RGB)
pil_image = Image.fromarray(image) image = Image.fromarray(image)
image = ImageOps.contain(
contained_image = ImageOps.contain( image, (temp_frame.shape[1], temp_frame.shape[0]), Image.LANCZOS
pil_image, (temp_frame.shape[1], temp_frame.shape[0]), Image.LANCZOS
) )
ctk_image = ctk.CTkImage(contained_image, size=contained_image.size) image = ctk.CTkImage(image, size=image.size)
preview_label.configure(image=ctk_image) preview_label.configure(image=image)
else: ROOT.update()
pass
ROOT.after(delay_ms, lambda: update_webcam_frame_after(cap, frame_processors, source_image, fps_data, delay_ms)) if PREVIEW.state() == "withdrawn":
break
cap.release()
PREVIEW.withdraw()
def create_source_target_popup_for_webcam( def create_source_target_popup_for_webcam(

View File

@ -12,142 +12,83 @@ if platform.system() == "Windows":
class VideoCapturer: class VideoCapturer:
def __init__(self, device_index: int): def __init__(self, device_index: int):
self.device_index = device_index self.device_index = device_index
self._latest_frame: Optional[np.ndarray] = None self.frame_callback = None
self._frame_lock = threading.Lock() self._current_frame = None
self._frame_ready = threading.Event()
self.is_running = False self.is_running = False
self.cap: Optional[cv2.VideoCapture] = None self.cap = None
self._capture_thread: Optional[threading.Thread] = None
# Initialize Windows-specific components if on Windows # Initialize Windows-specific components if on Windows
if platform.system() == "Windows": if platform.system() == "Windows":
try:
self.graph = FilterGraph() self.graph = FilterGraph()
# Verify device exists # Verify device exists
devices = self.graph.get_input_devices() devices = self.graph.get_input_devices()
if self.device_index >= len(devices): if self.device_index >= len(devices):
# Fallback or logging, rather than immediate raise for flexibility raise ValueError(
print(f"Warning: Device index {device_index} might be out of range. Available: {len(devices)}. Will attempt to open anyway.") f"Invalid device index {device_index}. Available devices: {len(devices)}"
except Exception as e: )
print(f"Warning: Could not initialize FilterGraph for device enumeration: {e}")
self.graph = None
def _capture_loop(self) -> None:
while self.is_running and self.cap is not None:
try:
ret, frame = self.cap.read()
if ret:
with self._frame_lock:
self._latest_frame = frame
else:
# Handle camera read failure, e.g., camera disconnected
print("Warning: Failed to read frame from camera in capture loop.")
# Small sleep to prevent tight loop on continuous read errors
threading.Event().wait(0.1)
except Exception as e:
print(f"Error in capture loop: {e}")
self.is_running = False # Stop loop on critical error
break
# Small sleep to yield execution and not busy-wait if camera FPS is low
# Adjust sleep time as needed; too high adds latency, too low uses more CPU.
threading.Event().wait(0.001) # 1 ms sleep
def start(self, width: int = 960, height: int = 540, fps: int = 60) -> bool: def start(self, width: int = 960, height: int = 540, fps: int = 60) -> bool:
"""Initialize and start video capture in a separate thread.""" """Initialize and start video capture"""
if self.is_running:
print("Capture already running.")
return True
try: try:
if platform.system() == "Windows": if platform.system() == "Windows":
# Windows-specific capture methods
capture_methods = [ capture_methods = [
(self.device_index, cv2.CAP_DSHOW), (self.device_index, cv2.CAP_DSHOW), # Try DirectShow first
(self.device_index, cv2.CAP_MSMF), (self.device_index, cv2.CAP_ANY), # Then try default backend
(self.device_index, cv2.CAP_ANY), (-1, cv2.CAP_ANY), # Try -1 as fallback
(-1, cv2.CAP_ANY), (0, cv2.CAP_ANY), # Finally try 0 without specific backend
(0, cv2.CAP_ANY)
] ]
for dev_id, backend in capture_methods: for dev_id, backend in capture_methods:
try: try:
self.cap = cv2.VideoCapture(dev_id, backend) self.cap = cv2.VideoCapture(dev_id, backend)
if self.cap and self.cap.isOpened(): if self.cap.isOpened():
print(f"Successfully opened camera {dev_id} with backend {backend}")
break break
if self.cap:
self.cap.release() self.cap.release()
self.cap = None
except Exception: except Exception:
continue continue
else: # Unix-like else:
# Unix-like systems (Linux/Mac) capture method
self.cap = cv2.VideoCapture(self.device_index) self.cap = cv2.VideoCapture(self.device_index)
if not self.cap or not self.cap.isOpened(): if not self.cap or not self.cap.isOpened():
raise RuntimeError(f"Failed to open camera with device index {self.device_index} using available methods.") raise RuntimeError("Failed to open camera")
# Configure format # Configure format
# Note: Setting properties might not always work or might reset after opening.
# It's often better to request a format the camera natively supports if known.
self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, width) self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height) self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
self.cap.set(cv2.CAP_PROP_FPS, fps) self.cap.set(cv2.CAP_PROP_FPS, fps)
# Verify settings if possible (actual values might differ)
actual_width = self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)
actual_height = self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)
actual_fps = self.cap.get(cv2.CAP_PROP_FPS)
print(f"Requested: {width}x{height}@{fps}fps. Actual: {actual_width}x{actual_height}@{actual_fps}fps")
self.is_running = True self.is_running = True
self._capture_thread = threading.Thread(target=self._capture_loop, daemon=True)
self._capture_thread.start()
# Wait briefly for the first frame to be captured, makes initial read() more likely to succeed.
# This is optional and can be adjusted or removed.
threading.Event().wait(0.5) # Wait up to 0.5 seconds
return True return True
except Exception as e: except Exception as e:
print(f"Failed to start capture: {str(e)}") print(f"Failed to start capture: {str(e)}")
if self.cap: if self.cap:
self.cap.release() self.cap.release()
self.cap = None
self.is_running = False
return False return False
def read(self) -> Tuple[bool, Optional[np.ndarray]]: def read(self) -> Tuple[bool, Optional[np.ndarray]]:
"""Read the latest frame from the camera (non-blocking).""" """Read a frame from the camera"""
if not self.is_running: if not self.is_running or self.cap is None:
return False, None return False, None
frame_copy = None ret, frame = self.cap.read()
with self._frame_lock: if ret:
if self._latest_frame is not None: self._current_frame = frame
frame_copy = self._latest_frame.copy() if self.frame_callback:
self.frame_callback(frame)
if frame_copy is not None: return True, frame
return True, frame_copy
else:
# No frame available yet, or thread stopped
return False, None return False, None
def release(self) -> None: def release(self) -> None:
"""Stop capture thread and release resources.""" """Stop capture and release resources"""
if self.is_running: if self.is_running and self.cap is not None:
self.is_running = False # Signal the thread to stop
if self._capture_thread is not None:
self._capture_thread.join(timeout=1.0) # Wait for thread to finish
if self._capture_thread.is_alive():
print("Warning: Capture thread did not terminate cleanly.")
self._capture_thread = None
if self.cap is not None:
self.cap.release() self.cap.release()
self.is_running = False
self.cap = None self.cap = None
with self._frame_lock: # Clear last frame def set_frame_callback(self, callback: Callable[[np.ndarray], None]) -> None:
self._latest_frame = None """Set callback for frame processing"""
print("Video capture released.") self.frame_callback = callback
# frame_callback is removed as direct polling via read() is now non-blocking and preferred with threaded capture.
# If a callback mechanism is still desired, it would need to be integrated carefully with the thread.