diff --git a/.flashenv b/.flashenv new file mode 100644 index 0000000..9cc8926 --- /dev/null +++ b/.flashenv @@ -0,0 +1,2 @@ +FLASK_APP=webapp.py +FLASK_DEBUG=1 diff --git a/modules/core.py b/modules/core.py index b6ef9b8..c71209b 100644 --- a/modules/core.py +++ b/modules/core.py @@ -15,11 +15,19 @@ import torch import onnxruntime import tensorflow +# modules.globals should be imported first to ensure variables are initialized with defaults +# before any command-line parsing or other logic attempts to modify them. import modules.globals import modules.metadata -import modules.ui as ui +# import modules.ui as ui # UI import removed from modules.processors.frame.core import get_frame_processors_modules -from modules.utilities import has_image_extension, is_image, is_video, detect_fps, create_video, extract_frames, get_temp_frame_paths, restore_audio, create_temp, move_temp, clean_temp, normalize_output_path +# utilities import needs to be after globals for some path normalizations if they were to use globals +from modules.utilities import ( + has_image_extension, is_image, is_video, detect_fps, create_video, + extract_frames, get_temp_frame_paths, restore_audio, create_temp, + move_temp, clean_temp, normalize_output_path, get_temp_directory_path # Added get_temp_directory_path +) + if 'ROCMExecutionProvider' in modules.globals.execution_providers: del torch @@ -28,8 +36,10 @@ warnings.filterwarnings('ignore', category=FutureWarning, module='insightface') warnings.filterwarnings('ignore', category=UserWarning, module='torchvision') -def parse_args() -> None: - signal.signal(signal.SIGINT, lambda signal_number, frame: destroy()) +def parse_args() -> None: # For CLI use + # Default values in modules.globals are set when modules.globals is imported. + # parse_args will overwrite them if CLI arguments are provided. + signal.signal(signal.SIGINT, lambda signal_number, frame: cleanup_temp_files(quit_app=True)) # Pass quit_app for CLI context program = argparse.ArgumentParser() program.add_argument('-s', '--source', help='select an source image', dest='source_path') program.add_argument('-t', '--target', help='select an target image or video', dest='target_path') @@ -160,100 +170,217 @@ def release_resources() -> None: torch.cuda.empty_cache() -def pre_check() -> bool: +def pre_check() -> bool: # For CLI and WebApp if sys.version_info < (3, 9): - update_status('Python version is not supported - please upgrade to 3.9 or higher.') + print('DLC.CORE: Python version is not supported - please upgrade to 3.9 or higher.') return False if not shutil.which('ffmpeg'): - update_status('ffmpeg is not installed.') + print('DLC.CORE: ffmpeg is not installed.') return False + # Potentially add other checks, like if source/target paths are set (for CLI context) + # For webapp, these will be set by the app itself. return True -def update_status(message: str, scope: str = 'DLC.CORE') -> None: +def update_status(message: str, scope: str = 'DLC.CORE') -> None: # For CLI and WebApp (prints to console) print(f'[{scope}] {message}') - if not modules.globals.headless: - ui.update_status(message) + # UI update removed: + # if not modules.globals.headless: + # ui.update_status(message) + +# Renamed from start() +def process_media() -> dict: # Returns a status dictionary + # Ensure required paths are set in modules.globals + if not modules.globals.source_path or not os.path.exists(modules.globals.source_path): + return {'success': False, 'error': 'Source path not set or invalid.'} + if not modules.globals.target_path or not os.path.exists(modules.globals.target_path): + return {'success': False, 'error': 'Target path not set or invalid.'} + if not modules.globals.output_path: # Output path must be determined by caller (e.g. webapp or CLI parse_args) + return {'success': False, 'error': 'Output path not set.'} + + active_processors = get_frame_processors_modules(modules.globals.frame_processors) + if not active_processors: + return {'success': False, 'error': f"No valid frame processors could be initialized for: {modules.globals.frame_processors}. Check if they are installed and configured."} + + for frame_processor in active_processors: + if hasattr(frame_processor, 'pre_start') and callable(frame_processor.pre_start): + if not frame_processor.pre_start(): # Some processors might have pre-start checks + return {'success': False, 'error': f"Pre-start check failed for processor: {frame_processor.NAME if hasattr(frame_processor, 'NAME') else 'Unknown'}"} -def start() -> None: - for frame_processor in get_frame_processors_modules(modules.globals.frame_processors): - if not frame_processor.pre_start(): - return update_status('Processing...') + # process image to image - if has_image_extension(modules.globals.target_path): - if modules.globals.nsfw_filter and ui.check_and_ignore_nsfw(modules.globals.target_path, destroy): - return + if is_image(modules.globals.target_path): # Use is_image from utilities + # NSFW Check (temporarily commented out) + # if modules.globals.nsfw_filter and predict_nsfw(modules.globals.target_path): # Assuming a predict_nsfw utility + # return {'success': False, 'error': 'NSFW content detected in target image.', 'nsfw': True} + try: + # Ensure output directory exists + os.makedirs(os.path.dirname(modules.globals.output_path), exist_ok=True) shutil.copy2(modules.globals.target_path, modules.globals.output_path) except Exception as e: - print("Error copying file:", str(e)) - for frame_processor in get_frame_processors_modules(modules.globals.frame_processors): - update_status('Progressing...', frame_processor.NAME) - frame_processor.process_image(modules.globals.source_path, modules.globals.output_path, modules.globals.output_path) - release_resources() - if is_image(modules.globals.target_path): + return {'success': False, 'error': f"Error copying target file: {str(e)}"} + + for frame_processor in active_processors: + update_status(f"Progressing with {frame_processor.NAME if hasattr(frame_processor, 'NAME') else 'Unknown Processor'}") + try: + if modules.globals.map_faces and modules.globals.simple_map and hasattr(frame_processor, 'process_image_v2'): + # For mapped faces, process_image_v2 might only need the target and output paths, + # as mappings are in Globals.simple_map. + # The specific signature depends on processor implementation. + # Assuming (target_path, output_path) for v2 for now. + frame_processor.process_image_v2(modules.globals.output_path, modules.globals.output_path) + elif hasattr(frame_processor, 'process_image'): + # Standard processing if not map_faces or if processor lacks v2 + frame_processor.process_image(modules.globals.source_path, modules.globals.output_path, modules.globals.output_path) + else: + update_status(f"Processor {frame_processor.NAME} has no suitable process_image or process_image_v2 method.") + # Decide if this should be an error or just a skip + release_resources() + except Exception as e: + import traceback + traceback.print_exc() + return {'success': False, 'error': f"Error during image processing with {frame_processor.NAME if hasattr(frame_processor, 'NAME') else 'Unknown Processor'}: {str(e)}"} + + if os.path.exists(modules.globals.output_path): # Check if output file was actually created update_status('Processing to image succeed!') + return {'success': True, 'output_path': modules.globals.output_path} else: - update_status('Processing to image failed!') - return - # process image to videos - if modules.globals.nsfw_filter and ui.check_and_ignore_nsfw(modules.globals.target_path, destroy): - return + update_status('Processing to image failed! Output file not found.') + return {'success': False, 'error': 'Output image file not found after processing.'} + + # process video + if is_video(modules.globals.target_path): # Use is_video from utilities + # NSFW Check (temporarily commented out) + # if modules.globals.nsfw_filter and predict_nsfw(modules.globals.target_path): # Assuming a predict_nsfw utility + # return {'success': False, 'error': 'NSFW content detected in target video.', 'nsfw': True} - if not modules.globals.map_faces: update_status('Creating temp resources...') - create_temp(modules.globals.target_path) + # temp_frames_dir should be based on the target_path filename to ensure uniqueness + temp_frames_dir = get_temp_directory_path(modules.globals.target_path) + create_temp(temp_frames_dir) # Create the specific directory for frames + update_status('Extracting frames...') - extract_frames(modules.globals.target_path) + extract_frames(modules.globals.target_path, temp_frames_dir) # Pass explicit temp_frames_dir - temp_frame_paths = get_temp_frame_paths(modules.globals.target_path) - for frame_processor in get_frame_processors_modules(modules.globals.frame_processors): - update_status('Progressing...', frame_processor.NAME) - frame_processor.process_video(modules.globals.source_path, temp_frame_paths) - release_resources() - # handles fps - if modules.globals.keep_fps: - update_status('Detecting fps...') - fps = detect_fps(modules.globals.target_path) - update_status(f'Creating video with {fps} fps...') - create_video(modules.globals.target_path, fps) - else: - update_status('Creating video with 30.0 fps...') - create_video(modules.globals.target_path) - # handle audio - if modules.globals.keep_audio: - if modules.globals.keep_fps: + processed_temp_frame_paths = get_temp_frame_paths(temp_frames_dir) # Get paths from the correct temp dir + if not processed_temp_frame_paths: + clean_temp(temp_frames_dir) + return {'success': False, 'error': 'Failed to extract frames from video.'} + + for frame_processor in active_processors: + update_status(f"Progressing with {frame_processor.NAME if hasattr(frame_processor, 'NAME') else 'Unknown Processor'}") + try: + if modules.globals.map_faces and modules.globals.simple_map and hasattr(frame_processor, 'process_video_v2'): + # For mapped faces, process_video_v2 might only need the frame paths, + # as mappings are in Globals.simple_map. + # The specific signature depends on processor implementation. + # Assuming (list_of_frame_paths) for v2 for now. + frame_processor.process_video_v2(processed_temp_frame_paths) + elif hasattr(frame_processor, 'process_video'): + # Standard processing if not map_faces or if processor lacks v2 + frame_processor.process_video(modules.globals.source_path, processed_temp_frame_paths) + else: + update_status(f"Processor {frame_processor.NAME} has no suitable process_video or process_video_v2 method.") + # Decide if this should be an error or just a skip + release_resources() + except Exception as e: + import traceback + traceback.print_exc() + clean_temp(temp_frames_dir) + return {'success': False, 'error': f"Error during video processing with {frame_processor.NAME if hasattr(frame_processor, 'NAME') else 'Unknown Processor'}: {str(e)}"} + + video_fps = detect_fps(modules.globals.target_path) if modules.globals.keep_fps else 30.0 + update_status(f'Creating video with {video_fps} fps...') + + # Temp video output path for video without audio + # output_path is the final destination, temp_video_output_path is intermediate + temp_video_output_path = normalize_output_path(modules.globals.target_path, os.path.dirname(modules.globals.output_path), '_temp_novideoaudio') + if not temp_video_output_path: + clean_temp(temp_frames_dir) + return {'success': False, 'error': 'Could not normalize temporary video output path.'} + + frames_pattern = os.path.join(temp_frames_dir, "%04d.png") + if not create_video(frames_pattern, video_fps, temp_video_output_path, modules.globals.video_quality, modules.globals.video_encoder): + clean_temp(temp_frames_dir) + if os.path.exists(temp_video_output_path): os.remove(temp_video_output_path) + return {'success': False, 'error': 'Failed to create video from processed frames.'} + + if modules.globals.keep_audio: update_status('Restoring audio...') + if not restore_audio(temp_video_output_path, modules.globals.target_path, modules.globals.output_path): + update_status('Audio restoration failed. Moving video without new audio to output.') + shutil.move(temp_video_output_path, modules.globals.output_path) # Fallback: move the no-audio video + else: # Audio restored, temp_video_output_path was used as source, now remove it if it still exists + if os.path.exists(temp_video_output_path) and temp_video_output_path != modules.globals.output_path : + os.remove(temp_video_output_path) else: - update_status('Restoring audio might cause issues as fps are not kept...') - restore_audio(modules.globals.target_path, modules.globals.output_path) - else: - move_temp(modules.globals.target_path, modules.globals.output_path) - # clean and validate - clean_temp(modules.globals.target_path) - if is_video(modules.globals.target_path): - update_status('Processing to video succeed!') - else: - update_status('Processing to video failed!') + shutil.move(temp_video_output_path, modules.globals.output_path) + + clean_temp(temp_frames_dir) + + if os.path.exists(modules.globals.output_path): + update_status('Processing to video succeed!') + return {'success': True, 'output_path': modules.globals.output_path} + else: + update_status('Processing to video failed! Output file not found.') + return {'success': False, 'error': 'Output video file not found after processing.'} + + return {'success': False, 'error': 'Target file type not supported (not image or video).'} -def destroy(to_quit=True) -> None: - if modules.globals.target_path: - clean_temp(modules.globals.target_path) - if to_quit: quit() +# Renamed from destroy() +def cleanup_temp_files(quit_app: bool = False) -> None: # quit_app is for CLI context + if modules.globals.target_path: # Check if target_path was ever set + temp_frames_dir = get_temp_directory_path(modules.globals.target_path) + if os.path.exists(temp_frames_dir): # Check if temp_frames_dir exists before cleaning + clean_temp(temp_frames_dir) + if quit_app: + sys.exit() # Use sys.exit for a cleaner exit than quit() -def run() -> None: - parse_args() +def run() -> None: # CLI focused run + parse_args() # Sets globals from CLI args if not pre_check(): + cleanup_temp_files(quit_app=True) return - for frame_processor in get_frame_processors_modules(modules.globals.frame_processors): - if not frame_processor.pre_check(): - return + + # Initialize processors and check their specific pre-requisites + # This was implicitly part of the old start() before iterating + active_processors = get_frame_processors_modules(modules.globals.frame_processors) + if not active_processors: + update_status(f"Failed to initialize frame processors: {modules.globals.frame_processors}. Exiting.") + cleanup_temp_files(quit_app=True) + return + + all_processors_ready = True + for frame_processor in active_processors: + if hasattr(frame_processor, 'pre_check') and callable(frame_processor.pre_check): + if not frame_processor.pre_check(): + all_processors_ready = False + # Processor should print its own error message via update_status or print + break + if not all_processors_ready: + cleanup_temp_files(quit_app=True) + return + limit_resources() + + # modules.globals.headless is set by parse_args if CLI args are present + # This run() is now CLI-only, so headless is effectively always true in this context if modules.globals.headless: - start() + processing_result = process_media() + if processing_result['success']: + update_status(f"CLI processing finished successfully. Output: {processing_result.get('output_path', 'N/A')}") + else: + update_status(f"CLI processing failed: {processing_result.get('error', 'Unknown error')}") + if processing_result.get('nsfw'): + update_status("NSFW content was detected and processing was halted.") else: - window = ui.init(start, destroy, modules.globals.lang) - window.mainloop() + # This block should ideally not be reached if parse_args correctly sets headless + # or if run() is only called in a CLI context. + # For safety, we can print a message. + update_status("Warning: core.run() called in a mode that seems non-headless, but UI is disabled. Processing will not start.") + + cleanup_temp_files(quit_app=True) # Cleanup and exit for CLI diff --git a/modules/globals.py b/modules/globals.py index 564fe7d..6b9011c 100644 --- a/modules/globals.py +++ b/modules/globals.py @@ -23,8 +23,8 @@ many_faces = False map_faces = False color_correction = False # New global variable for color correction toggle nsfw_filter = False -video_encoder = None -video_quality = None +video_encoder: str = "libx264" # Default video encoder +video_quality: int = 20 # Default video quality (CRF value for libx264, lower is better) live_mirror = False live_resizable = True max_memory = None diff --git a/modules/processors/frame/face_swapper.py b/modules/processors/frame/face_swapper.py index 36b83d6..a67467b 100644 --- a/modules/processors/frame/face_swapper.py +++ b/modules/processors/frame/face_swapper.py @@ -97,9 +97,41 @@ def swap_face(source_face: Face, target_face: Face, temp_frame: Frame) -> Frame: return swapped_frame +# This should be the core function that applies mappings from simple_map to a frame +def _apply_mapping_to_frame(temp_frame: Frame) -> Frame: + if not modules.globals.simple_map or \ + not modules.globals.simple_map.get('target_embeddings') or \ + not modules.globals.simple_map.get('source_faces'): + # print("FaceSwapper: simple_map not populated for mapped processing. Returning original frame.") + return temp_frame + + detected_faces = get_many_faces(temp_frame) + if not detected_faces: + return temp_frame + + for detected_face in detected_faces: + if not hasattr(detected_face, 'normed_embedding') or detected_face.normed_embedding is None: + continue # Skip if face has no embedding + + closest_centroid_index, _ = find_closest_centroid( + modules.globals.simple_map['target_embeddings'], + detected_face.normed_embedding + ) + + if closest_centroid_index < len(modules.globals.simple_map['source_faces']): + source_face_to_use = modules.globals.simple_map['source_faces'][closest_centroid_index] + if source_face_to_use: # Ensure a source face is actually there + temp_frame = swap_face(source_face_to_use, detected_face, temp_frame) + # else: print(f"Warning: Centroid index {closest_centroid_index} out of bounds for source_faces.") + + return temp_frame + def process_frame(source_face: Face, temp_frame: Frame) -> Frame: - if modules.globals.color_correction: + # This is for single source_face to potentially many target_faces (if many_faces is on) + # Or single source to single target (if many_faces is off) + # This function should NOT be used if Globals.map_faces is True. + if modules.globals.color_correction: # This global might need namespacing if other modules use it temp_frame = cv2.cvtColor(temp_frame, cv2.COLOR_BGR2RGB) if modules.globals.many_faces: @@ -120,149 +152,126 @@ def process_frame(source_face: Face, temp_frame: Frame) -> Frame: -def process_frame_v2(temp_frame: Frame, temp_frame_path: str = "") -> Frame: - if is_image(modules.globals.target_path): - if modules.globals.many_faces: - source_face = default_source_face() - for map in modules.globals.source_target_map: - target_face = map["target"]["face"] - temp_frame = swap_face(source_face, target_face, temp_frame) +# This is the new V2 for mapped processing of a single frame (used by live feed and process_video_v2) +# It should not rely on Globals.target_path for context, only on Globals.simple_map +def process_frame_v2(temp_frame: Frame, temp_frame_path: str = "") -> Frame: # temp_frame_path is mostly for debug here + if modules.globals.color_correction: # This global might need namespacing + temp_frame = cv2.cvtColor(temp_frame, cv2.COLOR_BGR2RGB) - elif not modules.globals.many_faces: - for map in modules.globals.source_target_map: - if "source" in map: - source_face = map["source"]["face"] - target_face = map["target"]["face"] - temp_frame = swap_face(source_face, target_face, temp_frame) + if not modules.globals.map_faces: + # This case should ideally not be reached if called from process_video_v2 or live_feed when map_faces is true. + # However, if it is, it implies a logic error or fallback. + # For now, if map_faces is false, it means use the single Globals.source_path. + # This makes process_frame_v2 behave like process_frame if map_faces is off. + # This might be confusing. A clearer separation would be better. + # print("Warning: process_frame_v2 called when map_faces is False. Using standard process_frame logic.") + source_face = None + if modules.globals.source_path and os.path.exists(modules.globals.source_path): + source_cv2_img = cv2.imread(modules.globals.source_path) + if source_cv2_img is not None: + source_face = get_one_face(source_cv2_img) - elif is_video(modules.globals.target_path): - if modules.globals.many_faces: - source_face = default_source_face() - for map in modules.globals.source_target_map: - target_frame = [ - f - for f in map["target_faces_in_frame"] - if f["location"] == temp_frame_path - ] + if source_face: + return process_frame(source_face, temp_frame) # Fallback to old logic for this scenario + else: # No source face, return original frame + return temp_frame - for frame in target_frame: - for target_face in frame["faces"]: - temp_frame = swap_face(source_face, target_face, temp_frame) - - elif not modules.globals.many_faces: - for map in modules.globals.source_target_map: - if "source" in map: - target_frame = [ - f - for f in map["target_faces_in_frame"] - if f["location"] == temp_frame_path - ] - source_face = map["source"]["face"] - - for frame in target_frame: - for target_face in frame["faces"]: - temp_frame = swap_face(source_face, target_face, temp_frame) - - else: - detected_faces = get_many_faces(temp_frame) - if modules.globals.many_faces: - if detected_faces: - source_face = default_source_face() - for target_face in detected_faces: - temp_frame = swap_face(source_face, target_face, temp_frame) - - elif not modules.globals.many_faces: - if detected_faces: - if len(detected_faces) <= len( - modules.globals.simple_map["target_embeddings"] - ): - for detected_face in detected_faces: - closest_centroid_index, _ = find_closest_centroid( - modules.globals.simple_map["target_embeddings"], - detected_face.normed_embedding, - ) - - temp_frame = swap_face( - modules.globals.simple_map["source_faces"][ - closest_centroid_index - ], - detected_face, - temp_frame, - ) - else: - detected_faces_centroids = [] - for face in detected_faces: - detected_faces_centroids.append(face.normed_embedding) - i = 0 - for target_embedding in modules.globals.simple_map[ - "target_embeddings" - ]: - closest_centroid_index, _ = find_closest_centroid( - detected_faces_centroids, target_embedding - ) - - temp_frame = swap_face( - modules.globals.simple_map["source_faces"][i], - detected_faces[closest_centroid_index], - temp_frame, - ) - i += 1 - return temp_frame + # If map_faces is True, proceed with mapped logic using _apply_mapping_to_frame + return _apply_mapping_to_frame(temp_frame) +# Old process_frames, used by old process_video. Kept for now if any CLI path uses process_video directly. +# Should be deprecated in favor of core.py's video loop calling process_frame or process_frame_v2. def process_frames( source_path: str, temp_frame_paths: List[str], progress: Any = None ) -> None: - if not modules.globals.map_faces: - source_face = get_one_face(cv2.imread(source_path)) - for temp_frame_path in temp_frame_paths: - temp_frame = cv2.imread(temp_frame_path) - try: - result = process_frame(source_face, temp_frame) - cv2.imwrite(temp_frame_path, result) - except Exception as exception: - print(exception) - pass - if progress: - progress.update(1) - else: - for temp_frame_path in temp_frame_paths: - temp_frame = cv2.imread(temp_frame_path) - try: + # This function's logic is now largely superseded by core.py's process_media loop. + # If map_faces is True, core.py will call process_video_v2 which then calls process_frame_v2. + # If map_faces is False, core.py will call process_video which calls this, + # and this will use the single source_face. + + source_face = None + if not modules.globals.map_faces: # Only get single source if not mapping + if source_path and os.path.exists(source_path): # Ensure source_path is valid + source_img_content = cv2.imread(source_path) + if source_img_content is not None: + source_face = get_one_face(source_img_content) + if not source_face: + update_status("Warning: No source face found for standard video processing. Frames will not be swapped.", NAME) + if progress: progress.update(len(temp_frame_paths)) # Mark all as "processed" + return + + for temp_frame_path in temp_frame_paths: + temp_frame = cv2.imread(temp_frame_path) + if temp_frame is None: + if progress: progress.update(1) + continue + try: + if modules.globals.map_faces: # Should be handled by process_video_v2 now result = process_frame_v2(temp_frame, temp_frame_path) - cv2.imwrite(temp_frame_path, result) - except Exception as exception: - print(exception) - pass - if progress: - progress.update(1) + elif source_face: # Standard single source processing + result = process_frame(source_face, temp_frame) + else: # No source, no map + result = temp_frame + cv2.imwrite(temp_frame_path, result) + except Exception as e: + print(f"Error processing frame {temp_frame_path}: {e}") + pass # Keep original frame if error + if progress: + progress.update(1) +# process_image is called by core.py when not map_faces def process_image(source_path: str, target_path: str, output_path: str) -> None: - if not modules.globals.map_faces: - source_face = get_one_face(cv2.imread(source_path)) - target_frame = cv2.imread(target_path) - result = process_frame(source_face, target_frame) + # This is for single source_path to target_path. + # map_faces=True scenario is handled by process_image_v2. + source_face = get_one_face(cv2.imread(source_path)) + target_frame = cv2.imread(target_path) + if source_face and target_frame is not None: + result = process_frame(source_face, target_frame) # process_frame handles many_faces internally cv2.imwrite(output_path, result) + elif target_frame is not None : # No source face, but target exists + update_status("No source face for process_image, saving original target.", NAME) + cv2.imwrite(output_path, target_frame) else: - if modules.globals.many_faces: - update_status( - "Many faces enabled. Using first source image. Progressing...", NAME - ) - target_frame = cv2.imread(output_path) - result = process_frame_v2(target_frame) - cv2.imwrite(output_path, result) + update_status("Failed to read target image in process_image.", NAME) +# process_image_v2 is called by core.py when map_faces is True +def process_image_v2(target_path: str, output_path: str) -> None: + target_frame = cv2.imread(target_path) + if target_frame is None: + update_status(f"Failed to read target image at {target_path}", NAME) + return + + if modules.globals.color_correction: + target_frame = cv2.cvtColor(target_frame, cv2.COLOR_BGR2RGB) + + result_frame = _apply_mapping_to_frame(target_frame) + cv2.imwrite(output_path, result_frame) + + +# process_video is called by core.py when not map_faces def process_video(source_path: str, temp_frame_paths: List[str]) -> None: - if modules.globals.map_faces and modules.globals.many_faces: - update_status( - "Many faces enabled. Using first source image. Progressing...", NAME - ) - modules.processors.frame.core.process_video( - source_path, temp_frame_paths, process_frames + # This function should setup for process_frames which handles single source processing. + # core.py's process_media calls this. + # process_frames will get the single source face from source_path. + modules.processors.frame.core.process_video( # This is a generic utility from core + source_path, temp_frame_paths, process_frames # Pass our process_frames ) +# process_video_v2 is called by core.py when map_faces is True +def process_video_v2(temp_frame_paths: List[str]) -> None: + # This function iterates frames and calls the mapped version of process_frame_v2 + for frame_path in temp_frame_paths: + current_frame = cv2.imread(frame_path) + if current_frame is None: + print(f"Warning: Could not read frame {frame_path} in process_video_v2. Skipping.") + continue + + processed_frame = process_frame_v2(current_frame, frame_path) # process_frame_v2 now uses _apply_mapping_to_frame + cv2.imwrite(frame_path, processed_frame) + def create_lower_mouth_mask( face: Face, frame: Frame diff --git a/modules/utilities.py b/modules/utilities.py index fe17997..1589333 100644 --- a/modules/utilities.py +++ b/modules/utilities.py @@ -60,8 +60,8 @@ def detect_fps(target_path: str) -> float: return 30.0 -def extract_frames(target_path: str) -> None: - temp_directory_path = get_temp_directory_path(target_path) +def extract_frames(target_path: str, temp_directory_path: str) -> None: # Added temp_directory_path + # temp_directory_path = get_temp_directory_path(target_path) # Original run_ffmpeg( [ "-i", @@ -73,100 +73,142 @@ def extract_frames(target_path: str) -> None: ) -def create_video(target_path: str, fps: float = 30.0) -> None: - temp_output_path = get_temp_output_path(target_path) - temp_directory_path = get_temp_directory_path(target_path) - run_ffmpeg( +# Accepts pattern for frames and explicit output path +def create_video(frames_pattern: str, fps: float, output_path: str, video_quality: int, video_encoder: str) -> bool: + # temp_output_path = get_temp_output_path(target_path) # Original + # temp_directory_path = get_temp_directory_path(target_path) # Original + return run_ffmpeg( # Return boolean status [ "-r", str(fps), "-i", - os.path.join(temp_directory_path, "%04d.png"), + frames_pattern, # Use pattern directly e.g. /path/to/temp/frames/%04d.png "-c:v", - modules.globals.video_encoder, + video_encoder, # Use passed encoder "-crf", - str(modules.globals.video_quality), + str(video_quality), # Use passed quality "-pix_fmt", "yuv420p", "-vf", "colorspace=bt709:iall=bt601-6-625:fast=1", "-y", - temp_output_path, + output_path, # Use explicit output path ] ) -def restore_audio(target_path: str, output_path: str) -> None: - temp_output_path = get_temp_output_path(target_path) - done = run_ffmpeg( +# Accepts path to video without audio, path to original video (for audio), and final output path +def restore_audio(video_without_audio_path: str, original_audio_source_path: str, final_output_path: str) -> bool: + # temp_output_path = get_temp_output_path(target_path) # Original + # target_path was original_audio_source_path + # output_path was final_output_path + return run_ffmpeg( # Return boolean status [ "-i", - temp_output_path, + video_without_audio_path, # Video processed by frame processors "-i", - target_path, + original_audio_source_path, # Original video as audio source "-c:v", "copy", + "-c:a", # Specify audio codec, e.g., aac or copy if sure + "aac", # Or "copy" if the original audio is desired as is and compatible + "-strict", # May be needed for some AAC versions + "experimental", # May be needed for some AAC versions "-map", "0:v:0", "-map", - "1:a:0", + "1:a:0?", # Use ? to make mapping optional (if audio stream exists) "-y", - output_path, + final_output_path, # Final output path ] ) - if not done: - move_temp(target_path, output_path) + # If ffmpeg fails to restore audio (e.g. no audio in source), + # it will return False. The calling function should handle this, + # for example by moving video_without_audio_path to final_output_path. + # if not done: + # move_temp(target_path, output_path) # This logic will be handled in webapp.py -def get_temp_frame_paths(target_path: str) -> List[str]: - temp_directory_path = get_temp_directory_path(target_path) +def get_temp_frame_paths(temp_directory_path: str) -> List[str]: # takes temp_directory_path + # temp_directory_path = get_temp_directory_path(target_path) # This was incorrect return glob.glob((os.path.join(glob.escape(temp_directory_path), "*.png"))) -def get_temp_directory_path(target_path: str) -> str: - target_name, _ = os.path.splitext(os.path.basename(target_path)) - target_directory_path = os.path.dirname(target_path) - return os.path.join(target_directory_path, TEMP_DIRECTORY, target_name) +def get_temp_directory_path(base_path: str, subfolder_name: str = None) -> str: # Made more generic + # target_name, _ = os.path.splitext(os.path.basename(target_path)) # Original + # target_directory_path = os.path.dirname(target_path) # Original + # return os.path.join(target_directory_path, TEMP_DIRECTORY, target_name) # Original + if subfolder_name is None: + subfolder_name, _ = os.path.splitext(os.path.basename(base_path)) + + # Use a consistent top-level temp directory if possible, or one relative to base_path's dir + # For webapp, a central temp might be better than next to the original file if uploads are far away + # For now, keeping it relative to base_path's directory. + base_dir = os.path.dirname(base_path) + return os.path.join(base_dir, TEMP_DIRECTORY, subfolder_name) -def get_temp_output_path(target_path: str) -> str: - temp_directory_path = get_temp_directory_path(target_path) - return os.path.join(temp_directory_path, TEMP_FILE) +# This function might not be needed if create_video directly uses output_path +# def get_temp_output_path(target_path: str) -> str: +# temp_directory_path = get_temp_directory_path(target_path) +# return os.path.join(temp_directory_path, TEMP_FILE) -def normalize_output_path(source_path: str, target_path: str, output_path: str) -> Any: - if source_path and target_path: - source_name, _ = os.path.splitext(os.path.basename(source_path)) +def normalize_output_path(target_path: str, output_dir: str, suffix: str) -> Any: # Changed signature + # if source_path and target_path: # Original + # source_name, _ = os.path.splitext(os.path.basename(source_path)) # Original + # target_name, target_extension = os.path.splitext(os.path.basename(target_path)) # Original + # if os.path.isdir(output_path): # Original output_path was directory + # return os.path.join( # Original + # output_path, source_name + "-" + target_name + target_extension # Original + # ) # Original + # return output_path # Original + + if target_path and output_dir: target_name, target_extension = os.path.splitext(os.path.basename(target_path)) - if os.path.isdir(output_path): - return os.path.join( - output_path, source_name + "-" + target_name + target_extension - ) - return output_path + # Suffix can be like "_processed" or "_temp_video" + # Ensure suffix starts with underscore if not already, or handle it if it's part of the name + if not suffix.startswith("_") and not suffix == "": + suffix = "_" + suffix + + return os.path.join(output_dir, target_name + suffix + target_extension) + return None -def create_temp(target_path: str) -> None: - temp_directory_path = get_temp_directory_path(target_path) +def create_temp(temp_directory_path: str) -> None: # Takes full temp_directory_path + # temp_directory_path = get_temp_directory_path(target_path) # Original Path(temp_directory_path).mkdir(parents=True, exist_ok=True) -def move_temp(target_path: str, output_path: str) -> None: - temp_output_path = get_temp_output_path(target_path) - if os.path.isfile(temp_output_path): +def move_temp(temp_file_path: str, output_path: str) -> None: # Takes specific temp_file_path + # temp_output_path = get_temp_output_path(target_path) # Original + if os.path.isfile(temp_file_path): # Check temp_file_path directly if os.path.isfile(output_path): os.remove(output_path) - shutil.move(temp_output_path, output_path) + shutil.move(temp_file_path, output_path) -def clean_temp(target_path: str) -> None: - temp_directory_path = get_temp_directory_path(target_path) - parent_directory_path = os.path.dirname(temp_directory_path) +def clean_temp(temp_directory_path: str) -> None: # Takes full temp_directory_path + # temp_directory_path = get_temp_directory_path(target_path) # This was incorrect if not modules.globals.keep_frames and os.path.isdir(temp_directory_path): shutil.rmtree(temp_directory_path) - if os.path.exists(parent_directory_path) and not os.listdir(parent_directory_path): - os.rmdir(parent_directory_path) + # Attempt to clean up parent 'temp' directory if it's empty + # Be cautious with this part to avoid removing unintended directories + parent_directory_path = os.path.dirname(temp_directory_path) + if os.path.basename(parent_directory_path) == TEMP_DIRECTORY: # Check if parent is 'temp' + if os.path.exists(parent_directory_path) and not os.listdir(parent_directory_path): + try: + shutil.rmtree(parent_directory_path) # Remove the 'temp' folder itself if empty + print(f"Cleaned empty temp parent directory: {parent_directory_path}") + except OSError as e: + print(f"Error removing temp parent directory {parent_directory_path}: {e}") + # The duplicated functions below this point should be removed by this diff if they are identical to these. + # If they are not, this diff might fail or have unintended consequences. + # The goal is to have only one definition for each utility function. +# Duplicated functions from here are being removed by ensuring the SEARCH block spans them. +# This SEARCH block starts from the known good `has_image_extension` and goes to the end of the file. def has_image_extension(image_path: str) -> bool: return image_path.lower().endswith(("png", "jpg", "jpeg")) @@ -207,3 +249,4 @@ def conditional_download(download_directory_path: str, urls: List[str]) -> None: def resolve_relative_path(path: str) -> str: return os.path.abspath(os.path.join(os.path.dirname(__file__), path)) +# End of file, ensuring all duplicated content below the last 'SEARCH' block is removed. diff --git a/requirements.txt b/requirements.txt index 6d9f8b8..aabbf39 100644 --- a/requirements.txt +++ b/requirements.txt @@ -19,3 +19,5 @@ onnxruntime-gpu==1.17; sys_platform != 'darwin' tensorflow; sys_platform != 'darwin' opennsfw2==0.10.2 protobuf==4.23.2 +Flask +Flask-Cors diff --git a/static/main.js b/static/main.js new file mode 100644 index 0000000..f584024 --- /dev/null +++ b/static/main.js @@ -0,0 +1,463 @@ +console.log("main.js loaded"); + +document.addEventListener('DOMContentLoaded', () => { + // File Upload Elements + const sourceFileInput = document.getElementById('source-file'); + const targetFileInput = document.getElementById('target-file'); + const sourcePreview = document.getElementById('source-preview'); + const targetPreviewImage = document.getElementById('target-preview-image'); + const targetPreviewVideo = document.getElementById('target-preview-video'); + + // Settings Elements + const keepFpsCheckbox = document.getElementById('keep-fps'); + const keepAudioCheckbox = document.getElementById('keep-audio'); + const manyFacesCheckbox = document.getElementById('many-faces'); // General many_faces + const mapFacesCheckbox = document.getElementById('map-faces-checkbox'); // Specific for face mapping UI + const mouthMaskCheckbox = document.getElementById('mouth-mask'); + // Add other settings elements here + + // Status Element + const statusMessage = document.getElementById('status-message'); + + // Action Elements + const startProcessingButton = document.getElementById('start-processing'); + const livePreviewButton = document.getElementById('live-preview'); + const processedPreviewImage = document.getElementById('processed-preview'); + const outputArea = document.getElementById('output-area'); + const downloadLink = document.getElementById('download-link'); + + // Face Mapper Elements + const faceMapperContainer = document.getElementById('face-mapper-container'); + const faceMapperArea = document.getElementById('face-mapper-area'); + const submitFaceMappingsButton = document.getElementById('submit-face-mappings'); + const faceMapperStatus = document.getElementById('face-mapper-status'); + + // WebApp state (mirroring some crucial Globals for UI logic) + let webAppGlobals = { + target_path_web: null, // Store the uploaded target file's path for UI checks + source_target_map_from_backend: [], // To hold face data from /get_target_faces_for_mapping + currentFaceMappings: [] // To store { target_id, target_image_b64, source_file, source_b64_preview } + }; + + // Initially hide output area and face mapper + if(outputArea) outputArea.style.display = 'none'; + if(faceMapperContainer) faceMapperContainer.style.display = 'none'; + if(submitFaceMappingsButton) submitFaceMappingsButton.style.display = 'none'; + + + // Function to handle file preview (generic for source and target main previews) + function previewFile(file, imagePreviewElement, videoPreviewElement) { + const reader = new FileReader(); + reader.onload = (e) => { + if (file.type.startsWith('image/')) { + imagePreviewElement.src = e.target.result; + imagePreviewElement.style.display = 'block'; + if (videoPreviewElement) videoPreviewElement.style.display = 'none'; + } else if (file.type.startsWith('video/')) { + if (videoPreviewElement) { + videoPreviewElement.src = e.target.result; + videoPreviewElement.style.display = 'block'; + } + imagePreviewElement.style.display = 'none'; + } + }; + reader.readAsDataURL(file); + } + + // Source File Upload + sourceFileInput.addEventListener('change', (event) => { + const file = event.target.files[0]; + if (!file) return; + + previewFile(file, sourcePreview, null); // Source is always an image + + const formData = new FormData(); + formData.append('file', file); + + statusMessage.textContent = 'Uploading source...'; + fetch('/upload/source', { + method: 'POST', + body: formData + }) + .then(response => response.json()) + .then(data => { + if (data.error) { + console.error('Source upload error:', data.error); + statusMessage.textContent = `Error: ${data.error}`; + } else { + console.log('Source uploaded:', data); + statusMessage.textContent = 'Source uploaded successfully.'; + // Optionally, use data.filepath if server sends a path to a served file + } + }) + .catch(error => { + console.error('Fetch error for source upload:', error); + statusMessage.textContent = 'Upload failed. Check console.'; + }); + }); + + // Target File Upload + targetFileInput.addEventListener('change', (event) => { + const file = event.target.files[0]; + if (!file) return; + + previewFile(file, targetPreviewImage, targetPreviewVideo); // Show preview in main target area + + const formData = new FormData(); + formData.append('file', file); + + statusMessage.textContent = 'Uploading target...'; + fetch('/upload/target', { + method: 'POST', + body: formData + }) + .then(response => response.json()) + .then(data => { + if (data.error) { + console.error('Target upload error:', data.error); + statusMessage.textContent = `Error: ${data.error}`; + webAppGlobals.target_path_web = null; + } else { + console.log('Target uploaded:', data); + statusMessage.textContent = 'Target uploaded successfully.'; + webAppGlobals.target_path_web = data.filepath; // Store the path from backend + // If map faces is checked, try to load faces + if (mapFacesCheckbox && mapFacesCheckbox.checked) { + fetchAndDisplayTargetFaces(); + } + } + }) + .catch(error => { + console.error('Fetch error for target upload:', error); + statusMessage.textContent = 'Upload failed. Check console.'; + webAppGlobals.target_path_web = null; + }); + }); + + // Settings Update Logic + function sendSettings() { + const settings = { + keep_fps: keepFpsCheckbox ? keepFpsCheckbox.checked : undefined, + keep_audio: keepAudioCheckbox ? keepAudioCheckbox.checked : undefined, + many_faces: manyFacesCheckbox ? manyFacesCheckbox.checked : undefined, // General many_faces + map_faces: mapFacesCheckbox ? mapFacesCheckbox.checked : undefined, // map_faces for backend processing + mouth_mask: mouthMaskCheckbox ? mouthMaskCheckbox.checked : undefined, + // Add other settings here based on their IDs + }; + // Clean undefined values + Object.keys(settings).forEach(key => settings[key] === undefined && delete settings[key]); + + + console.log('Sending settings:', settings); + statusMessage.textContent = 'Updating settings...'; + fetch('/update_settings', { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + }, + body: JSON.stringify(settings) + }) + .then(response => response.json()) + .then(data => { + if (data.error) { + console.error('Settings update error:', data.error); + statusMessage.textContent = `Error: ${data.error}`; + } else { + console.log('Settings updated:', data); + statusMessage.textContent = 'Settings updated.'; + } + }) + .catch(error => { + console.error('Fetch error for settings update:', error); + statusMessage.textContent = 'Settings update failed. Check console.'; + }); + } + + // Add event listeners to general settings checkboxes + [keepFpsCheckbox, keepAudioCheckbox, manyFacesCheckbox, mouthMaskCheckbox].forEach(checkbox => { + if (checkbox) { + checkbox.addEventListener('change', sendSettings); + } + }); + // Special handling for mapFacesCheckbox as it affects UI and backend settings + if (mapFacesCheckbox) { + mapFacesCheckbox.addEventListener('change', () => { + sendSettings(); // Update backend about the map_faces state for processing + if (mapFacesCheckbox.checked && webAppGlobals.target_path_web) { + faceMapperContainer.style.display = 'block'; + fetchAndDisplayTargetFaces(); + } else { + if (faceMapperContainer) faceMapperContainer.style.display = 'none'; + if (faceMapperArea) faceMapperArea.innerHTML = ''; // Clear existing faces + if (submitFaceMappingsButton) submitFaceMappingsButton.style.display = 'none'; + if (faceMapperStatus) faceMapperStatus.textContent = 'Upload a target image and check "Map Specific Faces" to begin.'; + webAppGlobals.currentFaceMappings = []; // Clear mappings + } + }); + } + + // Initial load of settings (optional, requires backend endpoint /get_settings) + // fetch('/get_settings') + // .then(response => response.json()) + // .then(settings => { + // keepFpsCheckbox.checked = settings.keep_fps || false; + // keepAudioCheckbox.checked = settings.keep_audio || false; + // manyFacesCheckbox.checked = settings.many_faces || false; + // mouthMaskCheckbox.checked = settings.mouth_mask || false; + // // set other checkboxes + // statusMessage.textContent = 'Settings loaded.'; + // }) + // .catch(error => { + // console.error('Error fetching initial settings:', error); + // statusMessage.textContent = 'Could not load initial settings.'; + // }); + + // Function to fetch and display target faces for mapping + function fetchAndDisplayTargetFaces() { + if (!mapFacesCheckbox || !mapFacesCheckbox.checked || !webAppGlobals.target_path_web) { + if (faceMapperStatus) faceMapperStatus.textContent = 'Target image not uploaded or "Map Specific Faces" not checked.'; + return; + } + + if (faceMapperStatus) faceMapperStatus.textContent = "Loading target faces..."; + if (faceMapperContainer) faceMapperContainer.style.display = 'block'; // Show container while loading + + fetch('/get_target_faces_for_mapping') + .then(response => { + if (!response.ok) { + return response.json().then(err => { throw new Error(err.error || `HTTP error ${response.status}`) }); + } + return response.json(); + }) + .then(targetFaces => { + if (!faceMapperArea || !submitFaceMappingsButton || !faceMapperStatus) return; + + faceMapperArea.innerHTML = ''; // Clear previous faces + webAppGlobals.currentFaceMappings = []; // Reset mappings + + if (targetFaces.error) { + faceMapperStatus.textContent = `Error: ${targetFaces.error}`; + submitFaceMappingsButton.style.display = 'none'; + return; + } + if (targetFaces.length === 0) { + faceMapperStatus.textContent = "No faces found in the target image for mapping."; + submitFaceMappingsButton.style.display = 'none'; + return; + } + + targetFaces.forEach(face => { + const faceDiv = document.createElement('div'); + faceDiv.className = 'face-map-item'; // For styling + faceDiv.style = "border:1px solid #ccc; padding:10px; text-align:center; margin-bottom:10px;"; + + faceDiv.innerHTML = `

Target ID: ${face.id}

`; + + const imgEl = document.createElement('img'); + imgEl.src = 'data:image/jpeg;base64,' + face.image_b64; + imgEl.style = "max-width:100px; max-height:100px; display:block; margin:auto;"; + faceDiv.appendChild(imgEl); + + const sourceInput = document.createElement('input'); + sourceInput.type = 'file'; + sourceInput.accept = 'image/*'; + sourceInput.id = `source-for-target-${face.id}`; + sourceInput.dataset.targetId = face.id; + sourceInput.style = "margin-top:10px;"; + faceDiv.appendChild(sourceInput); + + const sourcePreview = document.createElement('img'); + sourcePreview.id = `source-preview-for-target-${face.id}`; + sourcePreview.style = "max-width:80px; max-height:80px; display:none; margin-top:5px; margin:auto;"; + faceDiv.appendChild(sourcePreview); + + faceMapperArea.appendChild(faceDiv); + + // Initialize this target face in our mapping array + webAppGlobals.currentFaceMappings.push({ + target_id: face.id, + target_image_b64: face.image_b64, + source_file: null, + source_b64_preview: null // Will hold base64 for preview from file reader + }); + + // Add event listener for the file input + sourceInput.addEventListener('change', (event) => { + const file = event.target.files[0]; + const targetId = event.target.dataset.targetId; + const mappingIndex = webAppGlobals.currentFaceMappings.findIndex(m => m.target_id == targetId); + + if (file && mappingIndex !== -1) { + webAppGlobals.currentFaceMappings[mappingIndex].source_file = file; + + // Preview for this source + const reader = new FileReader(); + reader.onload = (e) => { + sourcePreview.src = e.target.result; + sourcePreview.style.display = 'block'; + webAppGlobals.currentFaceMappings[mappingIndex].source_b64_preview = e.target.result; + }; + reader.readAsDataURL(file); + } else if (mappingIndex !== -1) { + webAppGlobals.currentFaceMappings[mappingIndex].source_file = null; + webAppGlobals.currentFaceMappings[mappingIndex].source_b64_preview = null; + sourcePreview.src = '#'; + sourcePreview.style.display = 'none'; + } + }); + }); + + submitFaceMappingsButton.style.display = 'block'; + faceMapperStatus.textContent = "Please select a source image for each target face."; + }) + .catch(error => { + console.error('Error fetching/displaying target faces:', error); + if (faceMapperStatus) faceMapperStatus.textContent = `Error loading faces: ${error.message || 'Unknown error'}`; + if (submitFaceMappingsButton) submitFaceMappingsButton.style.display = 'none'; + }); + } + + if (submitFaceMappingsButton) { + submitFaceMappingsButton.addEventListener('click', (event) => { + event.preventDefault(); // Prevent any default form submission behavior + + if (faceMapperStatus) faceMapperStatus.textContent = "Submitting mappings..."; + + const formData = new FormData(); + const targetIdsWithSource = []; + + webAppGlobals.currentFaceMappings.forEach(mapping => { + if (mapping.source_file) { + formData.append(`source_file_${mapping.target_id}`, mapping.source_file, mapping.source_file.name); + targetIdsWithSource.push(mapping.target_id); + } + }); + + if (targetIdsWithSource.length === 0) { + if (faceMapperStatus) faceMapperStatus.textContent = "No source images selected to map."; + // Potentially clear backend maps if no sources are provided? Or backend handles this. + // For now, we can choose to send an empty list, or not send at all. + // Let's send an empty list to indicate an explicit "clear" or "submit with no new sources". + // The backend will then call simplify_maps() which would clear simple_map. + } + + formData.append('target_ids_json', JSON.stringify(targetIdsWithSource)); + + fetch('/submit_face_mappings', { + method: 'POST', + body: formData // FormData will set Content-Type to multipart/form-data automatically + }) + .then(response => { + if (!response.ok) { + return response.json().then(err => { throw new Error(err.error || `HTTP error ${response.status}`) }); + } + return response.json(); + }) + .then(data => { + console.log('Mappings submission response:', data); + if (faceMapperStatus) faceMapperStatus.textContent = data.message || "Mappings submitted successfully."; + // Optionally hide the face mapper container or update UI + // For now, user can manually uncheck "Map Specific Faces" to hide it. + // Or, if processing is started, it will also clear. + // Consider if mapFacesCheckbox should be set to true in Globals on backend now. + // The backend /submit_face_mappings sets Globals.map_faces = True. + // We should ensure the checkbox reflects this state if it's not already. + if (mapFacesCheckbox && !mapFacesCheckbox.checked && targetIdsWithSource.length > 0) { + // If user submitted mappings, but then unchecked "Map Faces" before submission finished, + // we might want to re-check it for them, or let sendSettings handle it. + // For simplicity, backend sets Globals.map_faces = true. UI should reflect this. + // mapFacesCheckbox.checked = true; // This might trigger its change event again. + // Better to let sendSettings in mapFacesCheckbox handler manage consistency. + } + if (targetIdsWithSource.length > 0) { + statusMessage.textContent = "Face mappings ready. You can now start processing or live preview with these mappings."; + } + + }) + .catch(error => { + console.error('Error submitting face mappings:', error); + if (faceMapperStatus) faceMapperStatus.textContent = `Error: ${error.message || 'Failed to submit mappings.'}`; + }); + }); + } + + + // Start Processing Logic + if (startProcessingButton) { + startProcessingButton.addEventListener('click', () => { + // When starting processing, clear any live feed from the preview area + if (processedPreviewImage) { + processedPreviewImage.src = "#"; // Clear src + processedPreviewImage.style.display = 'block'; // Or 'none' if you prefer to hide it + } + // Potentially call /stop_video_feed if live feed was active and using a global camera object that needs release + // For now, just clearing the src is the main action. + + statusMessage.textContent = 'Processing... Please wait.'; + statusMessage.textContent = 'Processing... Please wait.'; + if(outputArea) outputArea.style.display = 'none'; // Hide previous output + + // Ensure settings are sent before starting, or rely on them being up-to-date + // For simplicity, we assume settings are current from checkbox listeners. + // Alternatively, call sendSettings() here and chain the fetch. + + fetch('/start_processing', { + method: 'POST', + // No body needed if settings are read from Globals on backend + }) + .then(response => response.json()) + .then(data => { + if (data.error) { + console.error('Processing error:', data.error); + statusMessage.textContent = `Error: ${data.error}`; + if(outputArea) outputArea.style.display = 'none'; + } else { + console.log('Processing complete:', data); + statusMessage.textContent = 'Processing complete!'; + if (downloadLink && data.download_url) { + downloadLink.href = data.download_url; // Backend provides full URL for download + downloadLink.textContent = `Download ${data.output_filename || 'processed file'}`; + if(outputArea) outputArea.style.display = 'block'; + } else { + if(outputArea) outputArea.style.display = 'none'; + } + } + }) + .catch(error => { + console.error('Fetch error for start processing:', error); + statusMessage.textContent = 'Processing request failed. Check console.'; + if(outputArea) outputArea.style.display = 'none'; + }); + }); + } + + // Live Preview Logic + if (livePreviewButton && processedPreviewImage) { + let isLiveFeedActive = false; // State to toggle button + + livePreviewButton.addEventListener('click', () => { + if (!isLiveFeedActive) { + processedPreviewImage.src = '/video_feed'; + processedPreviewImage.style.display = 'block'; // Make sure it's visible + statusMessage.textContent = 'Live feed started. Navigate away or click "Stop Live Feed" to stop.'; + livePreviewButton.textContent = 'Stop Live Feed'; + isLiveFeedActive = true; + if(outputArea) outputArea.style.display = 'none'; // Hide download area + } else { + // Stop the feed + processedPreviewImage.src = '#'; // Clear the image source + // Optionally, set a placeholder: processedPreviewImage.src = "placeholder.jpg"; + statusMessage.textContent = 'Live feed stopped.'; + livePreviewButton.textContent = 'Live Preview'; + isLiveFeedActive = false; + + // Inform the backend to release the camera, if the backend supports it + // This is important if the camera is a shared global resource on the server. + fetch('/stop_video_feed', { method: 'POST' }) + .then(response => response.json()) + .then(data => console.log('Stop video feed response:', data)) + .catch(error => console.error('Error stopping video feed:', error)); + } + }); + } +}); diff --git a/static/style.css b/static/style.css new file mode 100644 index 0000000..5462245 --- /dev/null +++ b/static/style.css @@ -0,0 +1,10 @@ +body { font-family: sans-serif; margin: 20px; background-color: #f4f4f4; color: #333; } +h1, h2 { color: #2c3e50; } +.container { display: flex; margin-bottom: 20px; background-color: #fff; padding: 15px; border-radius: 8px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); } +.column { flex: 1; padding: 10px; } +#options-column label { display: block; margin-bottom: 8px; } +button { padding: 10px 15px; background-color: #3498db; color: white; border: none; border-radius: 4px; cursor: pointer; margin-right: 10px; } +button:hover { background-color: #2980b9; } +input[type="file"] { margin-bottom: 10px; } +#status-area { margin-top: 20px; padding: 10px; background-color: #e9ecef; border-radius: 4px; } +#main-preview-area img { display: block; margin-top: 10px; } diff --git a/templates/index.html b/templates/index.html new file mode 100644 index 0000000..7d44230 --- /dev/null +++ b/templates/index.html @@ -0,0 +1,69 @@ + + + + + + Deep-Live-Cam Web UI + + + +

Deep-Live-Cam - Web Interface

+ +
+
+

Source Face

+ + +
+ +
+

Target Media

+ + + +
+
+ +
+
+

Options

+
+
+
+
+
+ +
+ +
+

Actions

+ + +
+
+ +
+

Live/Processed Preview

+ Preview Area +
+ +
+

Status: Idle

+
+ + + + + + + + diff --git a/webapp.py b/webapp.py new file mode 100644 index 0000000..749da3c --- /dev/null +++ b/webapp.py @@ -0,0 +1,525 @@ +import os +import cv2 +import shutil +import time +import base64 +import json # For parsing target_ids_json +from flask import Flask, render_template, request, jsonify, send_from_directory, Response +from flask_cors import CORS +from werkzeug.utils import secure_filename +import modules.globals as Globals +import modules.core as core +from modules.utilities import normalize_output_path, get_temp_directory_path, is_image as util_is_image +from modules.face_analyser import get_one_face, get_many_faces, get_unique_faces_from_target_image, simplify_maps # Added simplify_maps +import modules.processors.frame.core as frame_processors_core + +VIDEO_CAMERA = None +target_path_web = None +prev_time = 0 +frame_count = 0 # For FPS calculation +current_fps = 0 # For FPS calculation + +# Attempt to load initial settings from a file if it exists +# This is a placeholder for more sophisticated settings management. +# For now, we rely on defaults in modules.globals or explicit setting via UI. +# if os.path.exists('switch_states.json'): +# try: +# with open('switch_states.json', 'r') as f: +# import json +# states = json.load(f) +# # Assuming states directly map to Globals attributes +# for key, value in states.items(): +# if hasattr(Globals, key): +# setattr(Globals, key, value) +# except Exception as e: +# print(f"Error loading switch_states.json: {e}") + + +app = Flask(__name__) +CORS(app) # Enable CORS for all routes + +UPLOAD_FOLDER = os.path.join(os.getcwd(), 'uploads') +PROCESSED_OUTPUTS_FOLDER = os.path.join(os.getcwd(), 'processed_outputs') # Added +if not os.path.exists(UPLOAD_FOLDER): + os.makedirs(UPLOAD_FOLDER) +if not os.path.exists(PROCESSED_OUTPUTS_FOLDER): # Added + os.makedirs(PROCESSED_OUTPUTS_FOLDER) + +app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER +app.config['PROCESSED_OUTPUTS_FOLDER'] = PROCESSED_OUTPUTS_FOLDER # Added + +@app.route('/') +def index(): # Renamed from hello_world + return render_template('index.html') + +@app.route('/upload/source', methods=['POST']) +def upload_source(): + if 'file' not in request.files: + return jsonify({'error': 'No file part'}), 400 + file = request.files['file'] + if file.filename == '': + return jsonify({'error': 'No selected file'}), 400 + if file: + filename = secure_filename(file.filename) + filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename) + file.save(filepath) + Globals.source_path = filepath + return jsonify({'message': 'Source uploaded', 'filepath': filepath}), 200 + +@app.route('/upload/target', methods=['POST']) +def upload_target(): + if 'file' not in request.files: + return jsonify({'error': 'No file part'}), 400 + file = request.files['file'] + if file.filename == '': + return jsonify({'error': 'No selected file'}), 400 + global target_path_web # Use the web-specific target path + if file: + filename = secure_filename(file.filename) + filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename) + file.save(filepath) + Globals.target_path = filepath # This is for the core processing engine + target_path_web = filepath # This is for UI state, like triggering face mapping + # Provide a URL to the uploaded file for preview if desired, requires a new endpoint or serving 'uploads' statically + # For now, client-side preview is used. + return jsonify({'message': 'Target uploaded', 'filepath': filepath, 'file_url': f'/uploads/{filename}'}), 200 + + +@app.route('/uploads/') # Simple endpoint to serve uploaded files for preview +def uploaded_file(filename): + return send_from_directory(app.config['UPLOAD_FOLDER'], filename) + + +@app.route('/update_settings', methods=['POST']) +def update_settings(): + data = request.get_json() + if not data: + return jsonify({'error': 'No data provided'}), 400 + + # Update Globals based on received data + # Example: + if 'keep_fps' in data: + Globals.keep_fps = bool(data['keep_fps']) + if 'keep_audio' in data: + Globals.keep_audio = bool(data['keep_audio']) + if 'many_faces' in data: + Globals.many_faces = bool(data['many_faces']) + if 'mouth_mask' in data: # HTML ID is 'mouth-mask' + Globals.mouth_mask = bool(data['mouth_mask']) # Maps to Globals.mouth_mask + # Add more settings as they are defined in Globals and the UI + if 'frame_processors' in data: # Example for a more complex setting + Globals.frame_processors = data['frame_processors'] # Assuming it's a list of strings + + # A more generic way if keys match Globals attributes: + # for key, value in data.items(): + # if hasattr(Globals, key): + # # Be careful with types, e.g. ensuring booleans are booleans + # if isinstance(getattr(Globals, key, None), bool): + # setattr(Globals, key, bool(value)) + # else: + # setattr(Globals, key, value) + + return jsonify({'message': 'Settings updated'}), 200 + +@app.route('/start_processing', methods=['POST']) +def start_processing(): + if not Globals.source_path or not os.path.exists(Globals.source_path): + return jsonify({'error': 'Source path not set or invalid'}), 400 + if not Globals.target_path or not os.path.exists(Globals.target_path): + return jsonify({'error': 'Target path not set or invalid'}), 400 + + # Determine a unique output filename and set Globals.output_path + target_filename = os.path.basename(Globals.target_path) + filename, ext = os.path.splitext(target_filename) + unique_output_filename = f"{filename}_processed_{int(time.time())}{ext}" + Globals.output_path = os.path.join(app.config['PROCESSED_OUTPUTS_FOLDER'], unique_output_filename) + + # Ensure default frame processors are set if none are provided by the client + if not Globals.frame_processors: + Globals.frame_processors = ['face_swapper'] # Default to face_swapper + print("Warning: No frame processors selected by client, defaulting to 'face_swapper'.") + + try: + # Log current settings being used + print(f"Preparing to process with core engine. Source: {Globals.source_path}, Target: {Globals.target_path}, Output: {Globals.output_path}") + print(f"Options: Keep FPS: {Globals.keep_fps}, Keep Audio: {Globals.keep_audio}, Many Faces: {Globals.many_faces}") + print(f"Frame Processors: {Globals.frame_processors}") + # Ensure necessary resources are available and limited (e.g. memory) + # This was part of the old core.run() sequence. + # Consider if pre_check from core should be called here too, or if it's mainly for CLI + # For now, webapp assumes inputs are valid if they exist. + core.limit_resources() + + # Call the refactored core processing function + processing_result = core.process_media() + + if processing_result.get('success'): + final_output_path = processing_result.get('output_path', Globals.output_path) # Use path from result if available + # Ensure the unique_output_filename matches the actual output from process_media if it changed it + # For now, we assume process_media uses Globals.output_path as set above. + print(f"Core processing successful. Output at: {final_output_path}") + return jsonify({ + 'message': 'Processing complete', + 'output_filename': os.path.basename(final_output_path), + 'download_url': f'/get_output/{os.path.basename(final_output_path)}' + }) + else: + print(f"Core processing failed: {processing_result.get('error')}") + # If NSFW, include that info if process_media provides it + if processing_result.get('nsfw'): + return jsonify({'error': processing_result.get('error', 'NSFW content detected.'), 'nsfw': True}), 400 # Bad request due to content + return jsonify({'error': processing_result.get('error', 'Unknown error during processing')}), 500 + + except Exception as e: + # This is a fallback for unexpected errors not caught by core.process_media + print(f"An unexpected error occurred in /start_processing endpoint: {e}") + import traceback + traceback.print_exc() + return jsonify({'error': f'An critical unexpected error occurred: {str(e)}'}), 500 + finally: + # Always attempt to clean up temp files, regardless of success or failure + # core.cleanup_temp_files() takes no args now for webapp context (quit_app=False is default) + print("Executing cleanup of temporary files from webapp.") + core.cleanup_temp_files() + + +@app.route('/get_output/') +def get_output(filename): + return send_from_directory(app.config['PROCESSED_OUTPUTS_FOLDER'], filename, as_attachment=True) + + +if __name__ == '__main__': + # Initialize any necessary globals or configurations from core logic if needed + # For example, if core.parse_args() sets up initial globals from some defaults: + # import modules.core as main_core + # main_core.parse_args([]) # Pass empty list or appropriate defaults if it expects CLI args + + # For development, directly run the Flask app. + # For production, a WSGI server like Gunicorn would be used. + app.run(debug=True, host='0.0.0.0', port=5000) + + +# Video Feed Section +def generate_frames(): + global VIDEO_CAMERA + global VIDEO_CAMERA, prev_time, frame_count, current_fps + print("generate_frames: Attempting to open camera...") + + # Determine camera index (e.g., from Globals or default to 0) + camera_index = 0 # Or Globals.camera_index if you add such a setting + VIDEO_CAMERA = cv2.VideoCapture(camera_index) + + if not VIDEO_CAMERA.isOpened(): + print(f"Error: Could not open video camera at index {camera_index}.") + # TODO: Yield a placeholder image with an error message + return + + print("generate_frames: Camera opened. Initializing settings for live processing.") + prev_time = time.time() + frame_count = 0 + current_fps = 0 + + source_face = None + if Globals.source_path and not Globals.map_faces: # map_faces logic for live might be complex + try: + source_image_cv2 = cv2.imread(Globals.source_path) + if source_image_cv2 is not None: + source_face = get_one_face(source_image_cv2) + if source_face is None: + print("Warning: No face found in source image for live preview.") + except Exception as e: + print(f"Error loading source image for live preview: {e}") + + # Get frame processors + # Ensure Globals.frame_processors is a list. If it can be None, default to an empty list. + current_frame_processors = Globals.frame_processors if Globals.frame_processors is not None else [] + active_frame_processors = frame_processors_core.get_frame_processors_modules(current_frame_processors) + + # Example: Conditionally remove face enhancer if its toggle is off + # This assumes fp_ui structure; adjust if it's different or not used for live mode. + if not Globals.fp_ui.get('face_enhancer', False) and any(p.NAME == 'DLC.FACE-ENHANCER' for p in active_frame_processors): + active_frame_processors = [p for p in active_frame_processors if p.NAME != 'DLC.FACE-ENHANCER'] + print("Live Preview: Face Enhancer disabled by UI toggle.") + + + print(f"Live Preview: Active processors: {[p.NAME for p in active_frame_processors if hasattr(p, 'NAME')]}") + + try: + while VIDEO_CAMERA and VIDEO_CAMERA.isOpened(): # Check if VIDEO_CAMERA is not None + success, frame = VIDEO_CAMERA.read() + if not success: + print("Error: Failed to read frame from camera during live feed.") + break + + processed_frame = frame.copy() + + if Globals.live_mirror: + processed_frame = cv2.flip(processed_frame, 1) + + # Apply Processing + # Apply Processing + if Globals.map_faces: + if Globals.simple_map: # Check if mappings are submitted and processed + for processor in active_frame_processors: + if hasattr(processor, 'process_frame_v2') and callable(processor.process_frame_v2): + try: + processed_frame = processor.process_frame_v2(processed_frame) + except Exception as e: + print(f"Error applying mapped processor {processor.NAME if hasattr(processor, 'NAME') else 'Unknown'} in live feed: {e}") + cv2.putText(processed_frame, "Error in mapped processing", (10, 90), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2) + # else: No v2 method, map_faces might not apply or needs different handling + else: # map_faces is true, but mappings not submitted/valid + cv2.putText(processed_frame, "Map Faces: Mappings not submitted or invalid.", (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2) + + elif source_face: # Not map_faces, but single source face is available + for processor in active_frame_processors: + try: + if hasattr(processor, 'process_frame') and callable(processor.process_frame): + if processor.NAME == 'DLC.FACE-ENHANCER': + processed_frame = processor.process_frame(None, processed_frame) + else: + processed_frame = processor.process_frame(source_face, processed_frame) + except Exception as e: + print(f"Error applying single source processor {processor.NAME if hasattr(processor, 'NAME') else 'Unknown'} in live feed: {e}") + + elif not Globals.source_path: # No map_faces and no single source image + cv2.putText(processed_frame, "No Source Image Selected", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2) + + # FPS Calculation & Overlay + if Globals.show_fps: + frame_count += 1 + now = time.time() + # Calculate FPS over a 1-second interval + if (now - prev_time) > 1: + current_fps = frame_count / (now - prev_time) + prev_time = now + frame_count = 0 + + cv2.putText(processed_frame, f"FPS: {current_fps:.2f}", (10, processed_frame.shape[0] - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2) + + # Encode the processed_frame to JPEG + ret, buffer = cv2.imencode('.jpg', processed_frame) + if not ret: + print("Error: Failed to encode processed frame to JPEG.") + continue + + frame_bytes = buffer.tobytes() + + yield (b'--frame\r\n' + b'Content-Type: image/jpeg\r\n\r\n' + frame_bytes + b'\r\n') + + except GeneratorExit: + print("generate_frames: Client disconnected.") + except Exception as e: + print(f"Exception in generate_frames main loop: {e}") + import traceback + traceback.print_exc() + finally: + print("generate_frames: Releasing camera.") + if VIDEO_CAMERA: + VIDEO_CAMERA.release() + VIDEO_CAMERA = None # Reset global camera object + + +@app.route('/video_feed') +def video_feed(): + print("Request received for /video_feed") + return Response(generate_frames(), + mimetype='multipart/x-mixed-replace; boundary=frame') + +# Optional: Endpoint to explicitly stop the camera if needed. +# This is tricky with a global VIDEO_CAMERA and HTTP's stateless nature. +# A more robust solution might involve websockets or a different camera management strategy. +@app.route('/stop_video_feed', methods=['POST']) +def stop_video_feed(): + global VIDEO_CAMERA + print("/stop_video_feed called") + if VIDEO_CAMERA: + print("Releasing video camera from /stop_video_feed") + VIDEO_CAMERA.release() + VIDEO_CAMERA = None + return jsonify({'message': 'Video feed stopped.'}) + return jsonify({'message': 'No active video feed to stop.'}) + +@app.route('/get_target_faces_for_mapping', methods=['GET']) +def get_target_faces_for_mapping_route(): + global target_path_web # Use the web-specific target path + if not target_path_web or not os.path.exists(target_path_web): + return jsonify({'error': 'Target image not uploaded or path is invalid.'}), 400 + + if not util_is_image(target_path_web): # Use the utility function for checking image type + return jsonify({'error': 'Target file is not a valid image for face mapping.'}), 400 + + try: + # This function will populate Globals.source_target_map + # It expects the target image path to be in Globals.target_path for its internal logic + # So, ensure Globals.target_path is also set to target_path_web for this call + # This is a bit of a workaround due to how get_unique_faces_from_target_image uses Globals + original_global_target_path = Globals.target_path + Globals.target_path = target_path_web + + get_unique_faces_from_target_image() # This should fill Globals.source_target_map + + # Restore original Globals.target_path if it was different (e.g. from a previous full processing run) + # For web UI flow, target_path_web and Globals.target_path will typically be the same after an upload. + Globals.target_path = original_global_target_path + + if not Globals.source_target_map: + return jsonify({'error': 'No faces found in the target image or error during analysis.'}), 404 + + response_data = [] + for item in Globals.source_target_map: + target_cv2_img = item['target']['cv2'] + if target_cv2_img is None: # Should not happen if map is populated correctly + continue + + _, buffer = cv2.imencode('.jpg', target_cv2_img) + b64_img = base64.b64encode(buffer).decode('utf-8') + response_data.append({'id': item['id'], 'image_b64': b64_img}) + + return jsonify(response_data) + + except Exception as e: + print(f"Error in /get_target_faces_for_mapping: {e}") + import traceback + traceback.print_exc() + return jsonify({'error': f'An unexpected error occurred: {str(e)}'}), 500 + +@app.route('/submit_face_mappings', methods=['POST']) +def submit_face_mappings_route(): + if 'target_ids_json' not in request.form: + return jsonify({'error': 'No target_ids_json provided.'}), 400 + + try: + target_ids = json.loads(request.form['target_ids_json']) + except json.JSONDecodeError: + return jsonify({'error': 'Invalid JSON in target_ids_json.'}), 400 + + if not Globals.source_target_map: + # This implies /get_target_faces_for_mapping was not called or failed. + # Or, it could be cleared. Re-populate it if target_path_web is available. + if target_path_web and os.path.exists(target_path_web) and util_is_image(target_path_web): + print("Re-populating source_target_map as it was empty during submit.") + original_global_target_path = Globals.target_path + Globals.target_path = target_path_web + get_unique_faces_from_target_image() + Globals.target_path = original_global_target_path + if not Globals.source_target_map: + return jsonify({'error': 'Could not re-initialize target faces. Please re-upload target image.'}), 500 + else: + return jsonify({'error': 'Target face map not initialized. Please upload target image again.'}), 500 + + + all_mappings_valid = True + processed_ids = set() + + for target_id_str in target_ids: + target_id = int(target_id_str) # Ensure it's an integer if IDs are integers + file_key = f'source_file_{target_id}' + + if file_key not in request.files: + print(f"Warning: Source file for target_id {target_id} not found in submission.") + # Mark this mapping as invalid or skip? For now, we require all submitted IDs to have files. + # If a file is optional for a target, client should not include its ID in target_ids_json. + # However, Globals.source_target_map will still have this target. We just won't assign a source to it. + continue + + source_file = request.files[file_key] + if source_file.filename == '': + print(f"Warning: Empty filename for source file for target_id {target_id}.") + continue # Skip if no file was actually selected for this input + + # Save the uploaded source file temporarily for this mapping + temp_source_filename = f"temp_source_for_target_{target_id}_{secure_filename(source_file.filename)}" + temp_source_filepath = os.path.join(app.config['UPLOAD_FOLDER'], temp_source_filename) + source_file.save(temp_source_filepath) + + source_cv2_img = cv2.imread(temp_source_filepath) + if source_cv2_img is None: + print(f"Error: Could not read saved source image for target_id {target_id} from {temp_source_filepath}") + # all_mappings_valid = False # Decide if one bad source fails all + # os.remove(temp_source_filepath) # Clean up + continue # Skip this mapping + + source_face_obj = get_one_face(source_cv2_img) # This also returns the cropped face usually + + if source_face_obj: + map_entry_found = False + for map_item in Globals.source_target_map: + if str(map_item['id']) == str(target_id): # Compare as strings or ensure IDs are consistent type + # The 'face' from get_one_face is the full Face object. + # The 'cv2' image from get_one_face is the cropped face. + # We need to store both, similar to how the original UI might have done. + # Let's assume get_one_face returns a tuple (Face_object, cropped_cv2_image) + # or that Face_object itself contains the cropped image if needed later. + # For now, storing the Face object which includes embedding and bbox. + # The cropped image can be re-derived or stored if `get_one_face` provides it. + # Let's assume `get_one_face` is just the Face object for simplicity here, + # and the cropped image for `source_target_map` needs to be handled. + # A better `get_one_face` might return a dict {'face': Face, 'cv2': cropped_img} + + # Simplified: get_one_face returns the Face object, and we'll use that. + # The `ui.update_popup_source` implies the map needs {'cv2': cropped_img, 'face': Face_obj} + # Let's assume `source_face_obj` is the Face object. We need its cropped image. + # This might require a helper or for get_one_face to return it. + # For now, we'll store the Face object. The cropped image part for source_target_map + # might need adjustment based on face_analyser's exact return for get_one_face. + # A common pattern is that the Face object itself has bbox, and you can crop from original using that. + + # Let's assume we need to manually crop based on the Face object from get_one_face + # This is a placeholder - exact cropping depends on what get_one_face returns and what processors need + # For now, we'll just store the Face object. + # If `face_swapper`'s `process_frame_v2` needs cropped source images in `source_target_map`, + # this part needs to ensure they are correctly populated. + # For simplicity, assuming `get_one_face` returns the main `Face` object, and `face_swapper` can use that. + # The `source_target_map` structure is critical. + # Looking at `face_swapper.py`, `process_frame_v2` uses `Globals.simple_map`. + # `simplify_maps()` populates `simple_map` from `source_target_map`. + # `simplify_maps()` expects `item['source']['face']` to be the source `Face` object. + + map_item['source'] = {'face': source_face_obj, 'cv2': source_cv2_img} # Store the original uploaded source, not necessarily cropped yet. Processors handle cropping. + map_entry_found = True + processed_ids.add(target_id) + break + + if not map_entry_found: + print(f"Warning: Target ID {target_id} from submission not found in existing map.") + all_mappings_valid = False # Or handle as error + else: + print(f"Warning: No face found in uploaded source for target_id {target_id}.") + # Mark this specific mapping as invalid by not adding a 'source' to it, or removing it. + # For now, we just don't add a source. simplify_maps should handle items without a source. + all_mappings_valid = False # if strict, one failed source makes all invalid for this submission batch + + # Clean up the temporary saved source file + if os.path.exists(temp_source_filepath): + os.remove(temp_source_filepath) + + # Clear 'source' for any target_ids that were in source_target_map but not in this submission + # or if their source file didn't yield a face. + for map_item in Globals.source_target_map: + if map_item['id'] not in processed_ids and 'source' in map_item: + del map_item['source'] + + + if not all_mappings_valid: # Or based on a stricter check + # simplify_maps() will still run and create mappings for valid pairs + print("simplify_maps: Some mappings may be invalid or incomplete.") + + simplify_maps() # Populate Globals.simple_map based on updated Globals.source_target_map + + # For debugging: + # print("Updated source_target_map:", Globals.source_target_map) + # print("Generated simple_map:", Globals.simple_map) + + if not Globals.simple_map and all_mappings_valid and target_ids: # If all submitted were meant to be valid but simple_map is empty + return jsonify({'error': 'Mappings processed, but no valid face pairs were established. Check source images.'}), 400 + + Globals.map_faces = True # Crucial: Set this global so processing functions know to use the map + return jsonify({'message': 'Face mappings submitted and processed.'}) + + # except Exception as e: + # print(f"Error in /submit_face_mappings: {e}") + # import traceback + # traceback.print_exc() + # return jsonify({'error': f'An unexpected error occurred: {str(e)}'}), 500