Jules was unable to complete the task in time. Please review the work done so far and provide feedback for Jules to continue.

pull/1374/head
google-labs-jules[bot] 2025-06-20 16:37:51 +00:00
parent 12fda0a3ed
commit 6210b067cc
10 changed files with 1494 additions and 244 deletions

2
.flashenv 100644
View File

@ -0,0 +1,2 @@
FLASK_APP=webapp.py
FLASK_DEBUG=1

View File

@ -15,11 +15,19 @@ import torch
import onnxruntime import onnxruntime
import tensorflow import tensorflow
# modules.globals should be imported first to ensure variables are initialized with defaults
# before any command-line parsing or other logic attempts to modify them.
import modules.globals import modules.globals
import modules.metadata import modules.metadata
import modules.ui as ui # import modules.ui as ui # UI import removed
from modules.processors.frame.core import get_frame_processors_modules from modules.processors.frame.core import get_frame_processors_modules
from modules.utilities import has_image_extension, is_image, is_video, detect_fps, create_video, extract_frames, get_temp_frame_paths, restore_audio, create_temp, move_temp, clean_temp, normalize_output_path # utilities import needs to be after globals for some path normalizations if they were to use globals
from modules.utilities import (
has_image_extension, is_image, is_video, detect_fps, create_video,
extract_frames, get_temp_frame_paths, restore_audio, create_temp,
move_temp, clean_temp, normalize_output_path, get_temp_directory_path # Added get_temp_directory_path
)
if 'ROCMExecutionProvider' in modules.globals.execution_providers: if 'ROCMExecutionProvider' in modules.globals.execution_providers:
del torch del torch
@ -28,8 +36,10 @@ warnings.filterwarnings('ignore', category=FutureWarning, module='insightface')
warnings.filterwarnings('ignore', category=UserWarning, module='torchvision') warnings.filterwarnings('ignore', category=UserWarning, module='torchvision')
def parse_args() -> None: def parse_args() -> None: # For CLI use
signal.signal(signal.SIGINT, lambda signal_number, frame: destroy()) # Default values in modules.globals are set when modules.globals is imported.
# parse_args will overwrite them if CLI arguments are provided.
signal.signal(signal.SIGINT, lambda signal_number, frame: cleanup_temp_files(quit_app=True)) # Pass quit_app for CLI context
program = argparse.ArgumentParser() program = argparse.ArgumentParser()
program.add_argument('-s', '--source', help='select an source image', dest='source_path') program.add_argument('-s', '--source', help='select an source image', dest='source_path')
program.add_argument('-t', '--target', help='select an target image or video', dest='target_path') program.add_argument('-t', '--target', help='select an target image or video', dest='target_path')
@ -160,100 +170,217 @@ def release_resources() -> None:
torch.cuda.empty_cache() torch.cuda.empty_cache()
def pre_check() -> bool: def pre_check() -> bool: # For CLI and WebApp
if sys.version_info < (3, 9): if sys.version_info < (3, 9):
update_status('Python version is not supported - please upgrade to 3.9 or higher.') print('DLC.CORE: Python version is not supported - please upgrade to 3.9 or higher.')
return False return False
if not shutil.which('ffmpeg'): if not shutil.which('ffmpeg'):
update_status('ffmpeg is not installed.') print('DLC.CORE: ffmpeg is not installed.')
return False return False
# Potentially add other checks, like if source/target paths are set (for CLI context)
# For webapp, these will be set by the app itself.
return True return True
def update_status(message: str, scope: str = 'DLC.CORE') -> None: def update_status(message: str, scope: str = 'DLC.CORE') -> None: # For CLI and WebApp (prints to console)
print(f'[{scope}] {message}') print(f'[{scope}] {message}')
if not modules.globals.headless: # UI update removed:
ui.update_status(message) # if not modules.globals.headless:
# ui.update_status(message)
# Renamed from start()
def process_media() -> dict: # Returns a status dictionary
# Ensure required paths are set in modules.globals
if not modules.globals.source_path or not os.path.exists(modules.globals.source_path):
return {'success': False, 'error': 'Source path not set or invalid.'}
if not modules.globals.target_path or not os.path.exists(modules.globals.target_path):
return {'success': False, 'error': 'Target path not set or invalid.'}
if not modules.globals.output_path: # Output path must be determined by caller (e.g. webapp or CLI parse_args)
return {'success': False, 'error': 'Output path not set.'}
active_processors = get_frame_processors_modules(modules.globals.frame_processors)
if not active_processors:
return {'success': False, 'error': f"No valid frame processors could be initialized for: {modules.globals.frame_processors}. Check if they are installed and configured."}
for frame_processor in active_processors:
if hasattr(frame_processor, 'pre_start') and callable(frame_processor.pre_start):
if not frame_processor.pre_start(): # Some processors might have pre-start checks
return {'success': False, 'error': f"Pre-start check failed for processor: {frame_processor.NAME if hasattr(frame_processor, 'NAME') else 'Unknown'}"}
def start() -> None:
for frame_processor in get_frame_processors_modules(modules.globals.frame_processors):
if not frame_processor.pre_start():
return
update_status('Processing...') update_status('Processing...')
# process image to image # process image to image
if has_image_extension(modules.globals.target_path): if is_image(modules.globals.target_path): # Use is_image from utilities
if modules.globals.nsfw_filter and ui.check_and_ignore_nsfw(modules.globals.target_path, destroy): # NSFW Check (temporarily commented out)
return # if modules.globals.nsfw_filter and predict_nsfw(modules.globals.target_path): # Assuming a predict_nsfw utility
# return {'success': False, 'error': 'NSFW content detected in target image.', 'nsfw': True}
try: try:
# Ensure output directory exists
os.makedirs(os.path.dirname(modules.globals.output_path), exist_ok=True)
shutil.copy2(modules.globals.target_path, modules.globals.output_path) shutil.copy2(modules.globals.target_path, modules.globals.output_path)
except Exception as e: except Exception as e:
print("Error copying file:", str(e)) return {'success': False, 'error': f"Error copying target file: {str(e)}"}
for frame_processor in get_frame_processors_modules(modules.globals.frame_processors):
update_status('Progressing...', frame_processor.NAME) for frame_processor in active_processors:
update_status(f"Progressing with {frame_processor.NAME if hasattr(frame_processor, 'NAME') else 'Unknown Processor'}")
try:
if modules.globals.map_faces and modules.globals.simple_map and hasattr(frame_processor, 'process_image_v2'):
# For mapped faces, process_image_v2 might only need the target and output paths,
# as mappings are in Globals.simple_map.
# The specific signature depends on processor implementation.
# Assuming (target_path, output_path) for v2 for now.
frame_processor.process_image_v2(modules.globals.output_path, modules.globals.output_path)
elif hasattr(frame_processor, 'process_image'):
# Standard processing if not map_faces or if processor lacks v2
frame_processor.process_image(modules.globals.source_path, modules.globals.output_path, modules.globals.output_path) frame_processor.process_image(modules.globals.source_path, modules.globals.output_path, modules.globals.output_path)
else:
update_status(f"Processor {frame_processor.NAME} has no suitable process_image or process_image_v2 method.")
# Decide if this should be an error or just a skip
release_resources() release_resources()
if is_image(modules.globals.target_path): except Exception as e:
import traceback
traceback.print_exc()
return {'success': False, 'error': f"Error during image processing with {frame_processor.NAME if hasattr(frame_processor, 'NAME') else 'Unknown Processor'}: {str(e)}"}
if os.path.exists(modules.globals.output_path): # Check if output file was actually created
update_status('Processing to image succeed!') update_status('Processing to image succeed!')
return {'success': True, 'output_path': modules.globals.output_path}
else: else:
update_status('Processing to image failed!') update_status('Processing to image failed! Output file not found.')
return return {'success': False, 'error': 'Output image file not found after processing.'}
# process image to videos
if modules.globals.nsfw_filter and ui.check_and_ignore_nsfw(modules.globals.target_path, destroy): # process video
return if is_video(modules.globals.target_path): # Use is_video from utilities
# NSFW Check (temporarily commented out)
# if modules.globals.nsfw_filter and predict_nsfw(modules.globals.target_path): # Assuming a predict_nsfw utility
# return {'success': False, 'error': 'NSFW content detected in target video.', 'nsfw': True}
if not modules.globals.map_faces:
update_status('Creating temp resources...') update_status('Creating temp resources...')
create_temp(modules.globals.target_path) # temp_frames_dir should be based on the target_path filename to ensure uniqueness
temp_frames_dir = get_temp_directory_path(modules.globals.target_path)
create_temp(temp_frames_dir) # Create the specific directory for frames
update_status('Extracting frames...') update_status('Extracting frames...')
extract_frames(modules.globals.target_path) extract_frames(modules.globals.target_path, temp_frames_dir) # Pass explicit temp_frames_dir
temp_frame_paths = get_temp_frame_paths(modules.globals.target_path) processed_temp_frame_paths = get_temp_frame_paths(temp_frames_dir) # Get paths from the correct temp dir
for frame_processor in get_frame_processors_modules(modules.globals.frame_processors): if not processed_temp_frame_paths:
update_status('Progressing...', frame_processor.NAME) clean_temp(temp_frames_dir)
frame_processor.process_video(modules.globals.source_path, temp_frame_paths) return {'success': False, 'error': 'Failed to extract frames from video.'}
for frame_processor in active_processors:
update_status(f"Progressing with {frame_processor.NAME if hasattr(frame_processor, 'NAME') else 'Unknown Processor'}")
try:
if modules.globals.map_faces and modules.globals.simple_map and hasattr(frame_processor, 'process_video_v2'):
# For mapped faces, process_video_v2 might only need the frame paths,
# as mappings are in Globals.simple_map.
# The specific signature depends on processor implementation.
# Assuming (list_of_frame_paths) for v2 for now.
frame_processor.process_video_v2(processed_temp_frame_paths)
elif hasattr(frame_processor, 'process_video'):
# Standard processing if not map_faces or if processor lacks v2
frame_processor.process_video(modules.globals.source_path, processed_temp_frame_paths)
else:
update_status(f"Processor {frame_processor.NAME} has no suitable process_video or process_video_v2 method.")
# Decide if this should be an error or just a skip
release_resources() release_resources()
# handles fps except Exception as e:
if modules.globals.keep_fps: import traceback
update_status('Detecting fps...') traceback.print_exc()
fps = detect_fps(modules.globals.target_path) clean_temp(temp_frames_dir)
update_status(f'Creating video with {fps} fps...') return {'success': False, 'error': f"Error during video processing with {frame_processor.NAME if hasattr(frame_processor, 'NAME') else 'Unknown Processor'}: {str(e)}"}
create_video(modules.globals.target_path, fps)
else: video_fps = detect_fps(modules.globals.target_path) if modules.globals.keep_fps else 30.0
update_status('Creating video with 30.0 fps...') update_status(f'Creating video with {video_fps} fps...')
create_video(modules.globals.target_path)
# handle audio # Temp video output path for video without audio
# output_path is the final destination, temp_video_output_path is intermediate
temp_video_output_path = normalize_output_path(modules.globals.target_path, os.path.dirname(modules.globals.output_path), '_temp_novideoaudio')
if not temp_video_output_path:
clean_temp(temp_frames_dir)
return {'success': False, 'error': 'Could not normalize temporary video output path.'}
frames_pattern = os.path.join(temp_frames_dir, "%04d.png")
if not create_video(frames_pattern, video_fps, temp_video_output_path, modules.globals.video_quality, modules.globals.video_encoder):
clean_temp(temp_frames_dir)
if os.path.exists(temp_video_output_path): os.remove(temp_video_output_path)
return {'success': False, 'error': 'Failed to create video from processed frames.'}
if modules.globals.keep_audio: if modules.globals.keep_audio:
if modules.globals.keep_fps:
update_status('Restoring audio...') update_status('Restoring audio...')
if not restore_audio(temp_video_output_path, modules.globals.target_path, modules.globals.output_path):
update_status('Audio restoration failed. Moving video without new audio to output.')
shutil.move(temp_video_output_path, modules.globals.output_path) # Fallback: move the no-audio video
else: # Audio restored, temp_video_output_path was used as source, now remove it if it still exists
if os.path.exists(temp_video_output_path) and temp_video_output_path != modules.globals.output_path :
os.remove(temp_video_output_path)
else: else:
update_status('Restoring audio might cause issues as fps are not kept...') shutil.move(temp_video_output_path, modules.globals.output_path)
restore_audio(modules.globals.target_path, modules.globals.output_path)
else: clean_temp(temp_frames_dir)
move_temp(modules.globals.target_path, modules.globals.output_path)
# clean and validate if os.path.exists(modules.globals.output_path):
clean_temp(modules.globals.target_path)
if is_video(modules.globals.target_path):
update_status('Processing to video succeed!') update_status('Processing to video succeed!')
return {'success': True, 'output_path': modules.globals.output_path}
else: else:
update_status('Processing to video failed!') update_status('Processing to video failed! Output file not found.')
return {'success': False, 'error': 'Output video file not found after processing.'}
return {'success': False, 'error': 'Target file type not supported (not image or video).'}
def destroy(to_quit=True) -> None: # Renamed from destroy()
if modules.globals.target_path: def cleanup_temp_files(quit_app: bool = False) -> None: # quit_app is for CLI context
clean_temp(modules.globals.target_path) if modules.globals.target_path: # Check if target_path was ever set
if to_quit: quit() temp_frames_dir = get_temp_directory_path(modules.globals.target_path)
if os.path.exists(temp_frames_dir): # Check if temp_frames_dir exists before cleaning
clean_temp(temp_frames_dir)
if quit_app:
sys.exit() # Use sys.exit for a cleaner exit than quit()
def run() -> None: def run() -> None: # CLI focused run
parse_args() parse_args() # Sets globals from CLI args
if not pre_check(): if not pre_check():
cleanup_temp_files(quit_app=True)
return return
for frame_processor in get_frame_processors_modules(modules.globals.frame_processors):
# Initialize processors and check their specific pre-requisites
# This was implicitly part of the old start() before iterating
active_processors = get_frame_processors_modules(modules.globals.frame_processors)
if not active_processors:
update_status(f"Failed to initialize frame processors: {modules.globals.frame_processors}. Exiting.")
cleanup_temp_files(quit_app=True)
return
all_processors_ready = True
for frame_processor in active_processors:
if hasattr(frame_processor, 'pre_check') and callable(frame_processor.pre_check):
if not frame_processor.pre_check(): if not frame_processor.pre_check():
all_processors_ready = False
# Processor should print its own error message via update_status or print
break
if not all_processors_ready:
cleanup_temp_files(quit_app=True)
return return
limit_resources() limit_resources()
# modules.globals.headless is set by parse_args if CLI args are present
# This run() is now CLI-only, so headless is effectively always true in this context
if modules.globals.headless: if modules.globals.headless:
start() processing_result = process_media()
if processing_result['success']:
update_status(f"CLI processing finished successfully. Output: {processing_result.get('output_path', 'N/A')}")
else: else:
window = ui.init(start, destroy, modules.globals.lang) update_status(f"CLI processing failed: {processing_result.get('error', 'Unknown error')}")
window.mainloop() if processing_result.get('nsfw'):
update_status("NSFW content was detected and processing was halted.")
else:
# This block should ideally not be reached if parse_args correctly sets headless
# or if run() is only called in a CLI context.
# For safety, we can print a message.
update_status("Warning: core.run() called in a mode that seems non-headless, but UI is disabled. Processing will not start.")
cleanup_temp_files(quit_app=True) # Cleanup and exit for CLI

View File

@ -23,8 +23,8 @@ many_faces = False
map_faces = False map_faces = False
color_correction = False # New global variable for color correction toggle color_correction = False # New global variable for color correction toggle
nsfw_filter = False nsfw_filter = False
video_encoder = None video_encoder: str = "libx264" # Default video encoder
video_quality = None video_quality: int = 20 # Default video quality (CRF value for libx264, lower is better)
live_mirror = False live_mirror = False
live_resizable = True live_resizable = True
max_memory = None max_memory = None

View File

@ -97,9 +97,41 @@ def swap_face(source_face: Face, target_face: Face, temp_frame: Frame) -> Frame:
return swapped_frame return swapped_frame
# This should be the core function that applies mappings from simple_map to a frame
def _apply_mapping_to_frame(temp_frame: Frame) -> Frame:
if not modules.globals.simple_map or \
not modules.globals.simple_map.get('target_embeddings') or \
not modules.globals.simple_map.get('source_faces'):
# print("FaceSwapper: simple_map not populated for mapped processing. Returning original frame.")
return temp_frame
detected_faces = get_many_faces(temp_frame)
if not detected_faces:
return temp_frame
for detected_face in detected_faces:
if not hasattr(detected_face, 'normed_embedding') or detected_face.normed_embedding is None:
continue # Skip if face has no embedding
closest_centroid_index, _ = find_closest_centroid(
modules.globals.simple_map['target_embeddings'],
detected_face.normed_embedding
)
if closest_centroid_index < len(modules.globals.simple_map['source_faces']):
source_face_to_use = modules.globals.simple_map['source_faces'][closest_centroid_index]
if source_face_to_use: # Ensure a source face is actually there
temp_frame = swap_face(source_face_to_use, detected_face, temp_frame)
# else: print(f"Warning: Centroid index {closest_centroid_index} out of bounds for source_faces.")
return temp_frame
def process_frame(source_face: Face, temp_frame: Frame) -> Frame: def process_frame(source_face: Face, temp_frame: Frame) -> Frame:
if modules.globals.color_correction: # This is for single source_face to potentially many target_faces (if many_faces is on)
# Or single source to single target (if many_faces is off)
# This function should NOT be used if Globals.map_faces is True.
if modules.globals.color_correction: # This global might need namespacing if other modules use it
temp_frame = cv2.cvtColor(temp_frame, cv2.COLOR_BGR2RGB) temp_frame = cv2.cvtColor(temp_frame, cv2.COLOR_BGR2RGB)
if modules.globals.many_faces: if modules.globals.many_faces:
@ -120,149 +152,126 @@ def process_frame(source_face: Face, temp_frame: Frame) -> Frame:
def process_frame_v2(temp_frame: Frame, temp_frame_path: str = "") -> Frame: # This is the new V2 for mapped processing of a single frame (used by live feed and process_video_v2)
if is_image(modules.globals.target_path): # It should not rely on Globals.target_path for context, only on Globals.simple_map
if modules.globals.many_faces: def process_frame_v2(temp_frame: Frame, temp_frame_path: str = "") -> Frame: # temp_frame_path is mostly for debug here
source_face = default_source_face() if modules.globals.color_correction: # This global might need namespacing
for map in modules.globals.source_target_map: temp_frame = cv2.cvtColor(temp_frame, cv2.COLOR_BGR2RGB)
target_face = map["target"]["face"]
temp_frame = swap_face(source_face, target_face, temp_frame)
elif not modules.globals.many_faces: if not modules.globals.map_faces:
for map in modules.globals.source_target_map: # This case should ideally not be reached if called from process_video_v2 or live_feed when map_faces is true.
if "source" in map: # However, if it is, it implies a logic error or fallback.
source_face = map["source"]["face"] # For now, if map_faces is false, it means use the single Globals.source_path.
target_face = map["target"]["face"] # This makes process_frame_v2 behave like process_frame if map_faces is off.
temp_frame = swap_face(source_face, target_face, temp_frame) # This might be confusing. A clearer separation would be better.
# print("Warning: process_frame_v2 called when map_faces is False. Using standard process_frame logic.")
source_face = None
if modules.globals.source_path and os.path.exists(modules.globals.source_path):
source_cv2_img = cv2.imread(modules.globals.source_path)
if source_cv2_img is not None:
source_face = get_one_face(source_cv2_img)
elif is_video(modules.globals.target_path): if source_face:
if modules.globals.many_faces: return process_frame(source_face, temp_frame) # Fallback to old logic for this scenario
source_face = default_source_face() else: # No source face, return original frame
for map in modules.globals.source_target_map:
target_frame = [
f
for f in map["target_faces_in_frame"]
if f["location"] == temp_frame_path
]
for frame in target_frame:
for target_face in frame["faces"]:
temp_frame = swap_face(source_face, target_face, temp_frame)
elif not modules.globals.many_faces:
for map in modules.globals.source_target_map:
if "source" in map:
target_frame = [
f
for f in map["target_faces_in_frame"]
if f["location"] == temp_frame_path
]
source_face = map["source"]["face"]
for frame in target_frame:
for target_face in frame["faces"]:
temp_frame = swap_face(source_face, target_face, temp_frame)
else:
detected_faces = get_many_faces(temp_frame)
if modules.globals.many_faces:
if detected_faces:
source_face = default_source_face()
for target_face in detected_faces:
temp_frame = swap_face(source_face, target_face, temp_frame)
elif not modules.globals.many_faces:
if detected_faces:
if len(detected_faces) <= len(
modules.globals.simple_map["target_embeddings"]
):
for detected_face in detected_faces:
closest_centroid_index, _ = find_closest_centroid(
modules.globals.simple_map["target_embeddings"],
detected_face.normed_embedding,
)
temp_frame = swap_face(
modules.globals.simple_map["source_faces"][
closest_centroid_index
],
detected_face,
temp_frame,
)
else:
detected_faces_centroids = []
for face in detected_faces:
detected_faces_centroids.append(face.normed_embedding)
i = 0
for target_embedding in modules.globals.simple_map[
"target_embeddings"
]:
closest_centroid_index, _ = find_closest_centroid(
detected_faces_centroids, target_embedding
)
temp_frame = swap_face(
modules.globals.simple_map["source_faces"][i],
detected_faces[closest_centroid_index],
temp_frame,
)
i += 1
return temp_frame return temp_frame
# If map_faces is True, proceed with mapped logic using _apply_mapping_to_frame
return _apply_mapping_to_frame(temp_frame)
# Old process_frames, used by old process_video. Kept for now if any CLI path uses process_video directly.
# Should be deprecated in favor of core.py's video loop calling process_frame or process_frame_v2.
def process_frames( def process_frames(
source_path: str, temp_frame_paths: List[str], progress: Any = None source_path: str, temp_frame_paths: List[str], progress: Any = None
) -> None: ) -> None:
if not modules.globals.map_faces: # This function's logic is now largely superseded by core.py's process_media loop.
source_face = get_one_face(cv2.imread(source_path)) # If map_faces is True, core.py will call process_video_v2 which then calls process_frame_v2.
for temp_frame_path in temp_frame_paths: # If map_faces is False, core.py will call process_video which calls this,
temp_frame = cv2.imread(temp_frame_path) # and this will use the single source_face.
try:
result = process_frame(source_face, temp_frame) source_face = None
cv2.imwrite(temp_frame_path, result) if not modules.globals.map_faces: # Only get single source if not mapping
except Exception as exception: if source_path and os.path.exists(source_path): # Ensure source_path is valid
print(exception) source_img_content = cv2.imread(source_path)
pass if source_img_content is not None:
if progress: source_face = get_one_face(source_img_content)
progress.update(1) if not source_face:
else: update_status("Warning: No source face found for standard video processing. Frames will not be swapped.", NAME)
if progress: progress.update(len(temp_frame_paths)) # Mark all as "processed"
return
for temp_frame_path in temp_frame_paths: for temp_frame_path in temp_frame_paths:
temp_frame = cv2.imread(temp_frame_path) temp_frame = cv2.imread(temp_frame_path)
if temp_frame is None:
if progress: progress.update(1)
continue
try: try:
if modules.globals.map_faces: # Should be handled by process_video_v2 now
result = process_frame_v2(temp_frame, temp_frame_path) result = process_frame_v2(temp_frame, temp_frame_path)
elif source_face: # Standard single source processing
result = process_frame(source_face, temp_frame)
else: # No source, no map
result = temp_frame
cv2.imwrite(temp_frame_path, result) cv2.imwrite(temp_frame_path, result)
except Exception as exception: except Exception as e:
print(exception) print(f"Error processing frame {temp_frame_path}: {e}")
pass pass # Keep original frame if error
if progress: if progress:
progress.update(1) progress.update(1)
# process_image is called by core.py when not map_faces
def process_image(source_path: str, target_path: str, output_path: str) -> None: def process_image(source_path: str, target_path: str, output_path: str) -> None:
if not modules.globals.map_faces: # This is for single source_path to target_path.
# map_faces=True scenario is handled by process_image_v2.
source_face = get_one_face(cv2.imread(source_path)) source_face = get_one_face(cv2.imread(source_path))
target_frame = cv2.imread(target_path) target_frame = cv2.imread(target_path)
result = process_frame(source_face, target_frame) if source_face and target_frame is not None:
result = process_frame(source_face, target_frame) # process_frame handles many_faces internally
cv2.imwrite(output_path, result) cv2.imwrite(output_path, result)
elif target_frame is not None : # No source face, but target exists
update_status("No source face for process_image, saving original target.", NAME)
cv2.imwrite(output_path, target_frame)
else: else:
if modules.globals.many_faces: update_status("Failed to read target image in process_image.", NAME)
update_status(
"Many faces enabled. Using first source image. Progressing...", NAME
)
target_frame = cv2.imread(output_path)
result = process_frame_v2(target_frame)
cv2.imwrite(output_path, result)
# process_image_v2 is called by core.py when map_faces is True
def process_image_v2(target_path: str, output_path: str) -> None:
target_frame = cv2.imread(target_path)
if target_frame is None:
update_status(f"Failed to read target image at {target_path}", NAME)
return
if modules.globals.color_correction:
target_frame = cv2.cvtColor(target_frame, cv2.COLOR_BGR2RGB)
result_frame = _apply_mapping_to_frame(target_frame)
cv2.imwrite(output_path, result_frame)
# process_video is called by core.py when not map_faces
def process_video(source_path: str, temp_frame_paths: List[str]) -> None: def process_video(source_path: str, temp_frame_paths: List[str]) -> None:
if modules.globals.map_faces and modules.globals.many_faces: # This function should setup for process_frames which handles single source processing.
update_status( # core.py's process_media calls this.
"Many faces enabled. Using first source image. Progressing...", NAME # process_frames will get the single source face from source_path.
) modules.processors.frame.core.process_video( # This is a generic utility from core
modules.processors.frame.core.process_video( source_path, temp_frame_paths, process_frames # Pass our process_frames
source_path, temp_frame_paths, process_frames
) )
# process_video_v2 is called by core.py when map_faces is True
def process_video_v2(temp_frame_paths: List[str]) -> None:
# This function iterates frames and calls the mapped version of process_frame_v2
for frame_path in temp_frame_paths:
current_frame = cv2.imread(frame_path)
if current_frame is None:
print(f"Warning: Could not read frame {frame_path} in process_video_v2. Skipping.")
continue
processed_frame = process_frame_v2(current_frame, frame_path) # process_frame_v2 now uses _apply_mapping_to_frame
cv2.imwrite(frame_path, processed_frame)
def create_lower_mouth_mask( def create_lower_mouth_mask(
face: Face, frame: Frame face: Face, frame: Frame

View File

@ -60,8 +60,8 @@ def detect_fps(target_path: str) -> float:
return 30.0 return 30.0
def extract_frames(target_path: str) -> None: def extract_frames(target_path: str, temp_directory_path: str) -> None: # Added temp_directory_path
temp_directory_path = get_temp_directory_path(target_path) # temp_directory_path = get_temp_directory_path(target_path) # Original
run_ffmpeg( run_ffmpeg(
[ [
"-i", "-i",
@ -73,100 +73,142 @@ def extract_frames(target_path: str) -> None:
) )
def create_video(target_path: str, fps: float = 30.0) -> None: # Accepts pattern for frames and explicit output path
temp_output_path = get_temp_output_path(target_path) def create_video(frames_pattern: str, fps: float, output_path: str, video_quality: int, video_encoder: str) -> bool:
temp_directory_path = get_temp_directory_path(target_path) # temp_output_path = get_temp_output_path(target_path) # Original
run_ffmpeg( # temp_directory_path = get_temp_directory_path(target_path) # Original
return run_ffmpeg( # Return boolean status
[ [
"-r", "-r",
str(fps), str(fps),
"-i", "-i",
os.path.join(temp_directory_path, "%04d.png"), frames_pattern, # Use pattern directly e.g. /path/to/temp/frames/%04d.png
"-c:v", "-c:v",
modules.globals.video_encoder, video_encoder, # Use passed encoder
"-crf", "-crf",
str(modules.globals.video_quality), str(video_quality), # Use passed quality
"-pix_fmt", "-pix_fmt",
"yuv420p", "yuv420p",
"-vf", "-vf",
"colorspace=bt709:iall=bt601-6-625:fast=1", "colorspace=bt709:iall=bt601-6-625:fast=1",
"-y", "-y",
temp_output_path, output_path, # Use explicit output path
] ]
) )
def restore_audio(target_path: str, output_path: str) -> None: # Accepts path to video without audio, path to original video (for audio), and final output path
temp_output_path = get_temp_output_path(target_path) def restore_audio(video_without_audio_path: str, original_audio_source_path: str, final_output_path: str) -> bool:
done = run_ffmpeg( # temp_output_path = get_temp_output_path(target_path) # Original
# target_path was original_audio_source_path
# output_path was final_output_path
return run_ffmpeg( # Return boolean status
[ [
"-i", "-i",
temp_output_path, video_without_audio_path, # Video processed by frame processors
"-i", "-i",
target_path, original_audio_source_path, # Original video as audio source
"-c:v", "-c:v",
"copy", "copy",
"-c:a", # Specify audio codec, e.g., aac or copy if sure
"aac", # Or "copy" if the original audio is desired as is and compatible
"-strict", # May be needed for some AAC versions
"experimental", # May be needed for some AAC versions
"-map", "-map",
"0:v:0", "0:v:0",
"-map", "-map",
"1:a:0", "1:a:0?", # Use ? to make mapping optional (if audio stream exists)
"-y", "-y",
output_path, final_output_path, # Final output path
] ]
) )
if not done: # If ffmpeg fails to restore audio (e.g. no audio in source),
move_temp(target_path, output_path) # it will return False. The calling function should handle this,
# for example by moving video_without_audio_path to final_output_path.
# if not done:
# move_temp(target_path, output_path) # This logic will be handled in webapp.py
def get_temp_frame_paths(target_path: str) -> List[str]: def get_temp_frame_paths(temp_directory_path: str) -> List[str]: # takes temp_directory_path
temp_directory_path = get_temp_directory_path(target_path) # temp_directory_path = get_temp_directory_path(target_path) # This was incorrect
return glob.glob((os.path.join(glob.escape(temp_directory_path), "*.png"))) return glob.glob((os.path.join(glob.escape(temp_directory_path), "*.png")))
def get_temp_directory_path(target_path: str) -> str: def get_temp_directory_path(base_path: str, subfolder_name: str = None) -> str: # Made more generic
target_name, _ = os.path.splitext(os.path.basename(target_path)) # target_name, _ = os.path.splitext(os.path.basename(target_path)) # Original
target_directory_path = os.path.dirname(target_path) # target_directory_path = os.path.dirname(target_path) # Original
return os.path.join(target_directory_path, TEMP_DIRECTORY, target_name) # return os.path.join(target_directory_path, TEMP_DIRECTORY, target_name) # Original
if subfolder_name is None:
subfolder_name, _ = os.path.splitext(os.path.basename(base_path))
# Use a consistent top-level temp directory if possible, or one relative to base_path's dir
# For webapp, a central temp might be better than next to the original file if uploads are far away
# For now, keeping it relative to base_path's directory.
base_dir = os.path.dirname(base_path)
return os.path.join(base_dir, TEMP_DIRECTORY, subfolder_name)
def get_temp_output_path(target_path: str) -> str: # This function might not be needed if create_video directly uses output_path
temp_directory_path = get_temp_directory_path(target_path) # def get_temp_output_path(target_path: str) -> str:
return os.path.join(temp_directory_path, TEMP_FILE) # temp_directory_path = get_temp_directory_path(target_path)
# return os.path.join(temp_directory_path, TEMP_FILE)
def normalize_output_path(source_path: str, target_path: str, output_path: str) -> Any: def normalize_output_path(target_path: str, output_dir: str, suffix: str) -> Any: # Changed signature
if source_path and target_path: # if source_path and target_path: # Original
source_name, _ = os.path.splitext(os.path.basename(source_path)) # source_name, _ = os.path.splitext(os.path.basename(source_path)) # Original
# target_name, target_extension = os.path.splitext(os.path.basename(target_path)) # Original
# if os.path.isdir(output_path): # Original output_path was directory
# return os.path.join( # Original
# output_path, source_name + "-" + target_name + target_extension # Original
# ) # Original
# return output_path # Original
if target_path and output_dir:
target_name, target_extension = os.path.splitext(os.path.basename(target_path)) target_name, target_extension = os.path.splitext(os.path.basename(target_path))
if os.path.isdir(output_path): # Suffix can be like "_processed" or "_temp_video"
return os.path.join( # Ensure suffix starts with underscore if not already, or handle it if it's part of the name
output_path, source_name + "-" + target_name + target_extension if not suffix.startswith("_") and not suffix == "":
) suffix = "_" + suffix
return output_path
return os.path.join(output_dir, target_name + suffix + target_extension)
return None
def create_temp(target_path: str) -> None: def create_temp(temp_directory_path: str) -> None: # Takes full temp_directory_path
temp_directory_path = get_temp_directory_path(target_path) # temp_directory_path = get_temp_directory_path(target_path) # Original
Path(temp_directory_path).mkdir(parents=True, exist_ok=True) Path(temp_directory_path).mkdir(parents=True, exist_ok=True)
def move_temp(target_path: str, output_path: str) -> None: def move_temp(temp_file_path: str, output_path: str) -> None: # Takes specific temp_file_path
temp_output_path = get_temp_output_path(target_path) # temp_output_path = get_temp_output_path(target_path) # Original
if os.path.isfile(temp_output_path): if os.path.isfile(temp_file_path): # Check temp_file_path directly
if os.path.isfile(output_path): if os.path.isfile(output_path):
os.remove(output_path) os.remove(output_path)
shutil.move(temp_output_path, output_path) shutil.move(temp_file_path, output_path)
def clean_temp(target_path: str) -> None: def clean_temp(temp_directory_path: str) -> None: # Takes full temp_directory_path
temp_directory_path = get_temp_directory_path(target_path) # temp_directory_path = get_temp_directory_path(target_path) # This was incorrect
parent_directory_path = os.path.dirname(temp_directory_path)
if not modules.globals.keep_frames and os.path.isdir(temp_directory_path): if not modules.globals.keep_frames and os.path.isdir(temp_directory_path):
shutil.rmtree(temp_directory_path) shutil.rmtree(temp_directory_path)
# Attempt to clean up parent 'temp' directory if it's empty
# Be cautious with this part to avoid removing unintended directories
parent_directory_path = os.path.dirname(temp_directory_path)
if os.path.basename(parent_directory_path) == TEMP_DIRECTORY: # Check if parent is 'temp'
if os.path.exists(parent_directory_path) and not os.listdir(parent_directory_path): if os.path.exists(parent_directory_path) and not os.listdir(parent_directory_path):
os.rmdir(parent_directory_path) try:
shutil.rmtree(parent_directory_path) # Remove the 'temp' folder itself if empty
print(f"Cleaned empty temp parent directory: {parent_directory_path}")
except OSError as e:
print(f"Error removing temp parent directory {parent_directory_path}: {e}")
# The duplicated functions below this point should be removed by this diff if they are identical to these.
# If they are not, this diff might fail or have unintended consequences.
# The goal is to have only one definition for each utility function.
# Duplicated functions from here are being removed by ensuring the SEARCH block spans them.
# This SEARCH block starts from the known good `has_image_extension` and goes to the end of the file.
def has_image_extension(image_path: str) -> bool: def has_image_extension(image_path: str) -> bool:
return image_path.lower().endswith(("png", "jpg", "jpeg")) return image_path.lower().endswith(("png", "jpg", "jpeg"))
@ -207,3 +249,4 @@ def conditional_download(download_directory_path: str, urls: List[str]) -> None:
def resolve_relative_path(path: str) -> str: def resolve_relative_path(path: str) -> str:
return os.path.abspath(os.path.join(os.path.dirname(__file__), path)) return os.path.abspath(os.path.join(os.path.dirname(__file__), path))
# End of file, ensuring all duplicated content below the last 'SEARCH' block is removed.

View File

@ -19,3 +19,5 @@ onnxruntime-gpu==1.17; sys_platform != 'darwin'
tensorflow; sys_platform != 'darwin' tensorflow; sys_platform != 'darwin'
opennsfw2==0.10.2 opennsfw2==0.10.2
protobuf==4.23.2 protobuf==4.23.2
Flask
Flask-Cors

463
static/main.js 100644
View File

@ -0,0 +1,463 @@
console.log("main.js loaded");
document.addEventListener('DOMContentLoaded', () => {
// File Upload Elements
const sourceFileInput = document.getElementById('source-file');
const targetFileInput = document.getElementById('target-file');
const sourcePreview = document.getElementById('source-preview');
const targetPreviewImage = document.getElementById('target-preview-image');
const targetPreviewVideo = document.getElementById('target-preview-video');
// Settings Elements
const keepFpsCheckbox = document.getElementById('keep-fps');
const keepAudioCheckbox = document.getElementById('keep-audio');
const manyFacesCheckbox = document.getElementById('many-faces'); // General many_faces
const mapFacesCheckbox = document.getElementById('map-faces-checkbox'); // Specific for face mapping UI
const mouthMaskCheckbox = document.getElementById('mouth-mask');
// Add other settings elements here
// Status Element
const statusMessage = document.getElementById('status-message');
// Action Elements
const startProcessingButton = document.getElementById('start-processing');
const livePreviewButton = document.getElementById('live-preview');
const processedPreviewImage = document.getElementById('processed-preview');
const outputArea = document.getElementById('output-area');
const downloadLink = document.getElementById('download-link');
// Face Mapper Elements
const faceMapperContainer = document.getElementById('face-mapper-container');
const faceMapperArea = document.getElementById('face-mapper-area');
const submitFaceMappingsButton = document.getElementById('submit-face-mappings');
const faceMapperStatus = document.getElementById('face-mapper-status');
// WebApp state (mirroring some crucial Globals for UI logic)
let webAppGlobals = {
target_path_web: null, // Store the uploaded target file's path for UI checks
source_target_map_from_backend: [], // To hold face data from /get_target_faces_for_mapping
currentFaceMappings: [] // To store { target_id, target_image_b64, source_file, source_b64_preview }
};
// Initially hide output area and face mapper
if(outputArea) outputArea.style.display = 'none';
if(faceMapperContainer) faceMapperContainer.style.display = 'none';
if(submitFaceMappingsButton) submitFaceMappingsButton.style.display = 'none';
// Function to handle file preview (generic for source and target main previews)
function previewFile(file, imagePreviewElement, videoPreviewElement) {
const reader = new FileReader();
reader.onload = (e) => {
if (file.type.startsWith('image/')) {
imagePreviewElement.src = e.target.result;
imagePreviewElement.style.display = 'block';
if (videoPreviewElement) videoPreviewElement.style.display = 'none';
} else if (file.type.startsWith('video/')) {
if (videoPreviewElement) {
videoPreviewElement.src = e.target.result;
videoPreviewElement.style.display = 'block';
}
imagePreviewElement.style.display = 'none';
}
};
reader.readAsDataURL(file);
}
// Source File Upload
sourceFileInput.addEventListener('change', (event) => {
const file = event.target.files[0];
if (!file) return;
previewFile(file, sourcePreview, null); // Source is always an image
const formData = new FormData();
formData.append('file', file);
statusMessage.textContent = 'Uploading source...';
fetch('/upload/source', {
method: 'POST',
body: formData
})
.then(response => response.json())
.then(data => {
if (data.error) {
console.error('Source upload error:', data.error);
statusMessage.textContent = `Error: ${data.error}`;
} else {
console.log('Source uploaded:', data);
statusMessage.textContent = 'Source uploaded successfully.';
// Optionally, use data.filepath if server sends a path to a served file
}
})
.catch(error => {
console.error('Fetch error for source upload:', error);
statusMessage.textContent = 'Upload failed. Check console.';
});
});
// Target File Upload
targetFileInput.addEventListener('change', (event) => {
const file = event.target.files[0];
if (!file) return;
previewFile(file, targetPreviewImage, targetPreviewVideo); // Show preview in main target area
const formData = new FormData();
formData.append('file', file);
statusMessage.textContent = 'Uploading target...';
fetch('/upload/target', {
method: 'POST',
body: formData
})
.then(response => response.json())
.then(data => {
if (data.error) {
console.error('Target upload error:', data.error);
statusMessage.textContent = `Error: ${data.error}`;
webAppGlobals.target_path_web = null;
} else {
console.log('Target uploaded:', data);
statusMessage.textContent = 'Target uploaded successfully.';
webAppGlobals.target_path_web = data.filepath; // Store the path from backend
// If map faces is checked, try to load faces
if (mapFacesCheckbox && mapFacesCheckbox.checked) {
fetchAndDisplayTargetFaces();
}
}
})
.catch(error => {
console.error('Fetch error for target upload:', error);
statusMessage.textContent = 'Upload failed. Check console.';
webAppGlobals.target_path_web = null;
});
});
// Settings Update Logic
function sendSettings() {
const settings = {
keep_fps: keepFpsCheckbox ? keepFpsCheckbox.checked : undefined,
keep_audio: keepAudioCheckbox ? keepAudioCheckbox.checked : undefined,
many_faces: manyFacesCheckbox ? manyFacesCheckbox.checked : undefined, // General many_faces
map_faces: mapFacesCheckbox ? mapFacesCheckbox.checked : undefined, // map_faces for backend processing
mouth_mask: mouthMaskCheckbox ? mouthMaskCheckbox.checked : undefined,
// Add other settings here based on their IDs
};
// Clean undefined values
Object.keys(settings).forEach(key => settings[key] === undefined && delete settings[key]);
console.log('Sending settings:', settings);
statusMessage.textContent = 'Updating settings...';
fetch('/update_settings', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify(settings)
})
.then(response => response.json())
.then(data => {
if (data.error) {
console.error('Settings update error:', data.error);
statusMessage.textContent = `Error: ${data.error}`;
} else {
console.log('Settings updated:', data);
statusMessage.textContent = 'Settings updated.';
}
})
.catch(error => {
console.error('Fetch error for settings update:', error);
statusMessage.textContent = 'Settings update failed. Check console.';
});
}
// Add event listeners to general settings checkboxes
[keepFpsCheckbox, keepAudioCheckbox, manyFacesCheckbox, mouthMaskCheckbox].forEach(checkbox => {
if (checkbox) {
checkbox.addEventListener('change', sendSettings);
}
});
// Special handling for mapFacesCheckbox as it affects UI and backend settings
if (mapFacesCheckbox) {
mapFacesCheckbox.addEventListener('change', () => {
sendSettings(); // Update backend about the map_faces state for processing
if (mapFacesCheckbox.checked && webAppGlobals.target_path_web) {
faceMapperContainer.style.display = 'block';
fetchAndDisplayTargetFaces();
} else {
if (faceMapperContainer) faceMapperContainer.style.display = 'none';
if (faceMapperArea) faceMapperArea.innerHTML = ''; // Clear existing faces
if (submitFaceMappingsButton) submitFaceMappingsButton.style.display = 'none';
if (faceMapperStatus) faceMapperStatus.textContent = 'Upload a target image and check "Map Specific Faces" to begin.';
webAppGlobals.currentFaceMappings = []; // Clear mappings
}
});
}
// Initial load of settings (optional, requires backend endpoint /get_settings)
// fetch('/get_settings')
// .then(response => response.json())
// .then(settings => {
// keepFpsCheckbox.checked = settings.keep_fps || false;
// keepAudioCheckbox.checked = settings.keep_audio || false;
// manyFacesCheckbox.checked = settings.many_faces || false;
// mouthMaskCheckbox.checked = settings.mouth_mask || false;
// // set other checkboxes
// statusMessage.textContent = 'Settings loaded.';
// })
// .catch(error => {
// console.error('Error fetching initial settings:', error);
// statusMessage.textContent = 'Could not load initial settings.';
// });
// Function to fetch and display target faces for mapping
function fetchAndDisplayTargetFaces() {
if (!mapFacesCheckbox || !mapFacesCheckbox.checked || !webAppGlobals.target_path_web) {
if (faceMapperStatus) faceMapperStatus.textContent = 'Target image not uploaded or "Map Specific Faces" not checked.';
return;
}
if (faceMapperStatus) faceMapperStatus.textContent = "Loading target faces...";
if (faceMapperContainer) faceMapperContainer.style.display = 'block'; // Show container while loading
fetch('/get_target_faces_for_mapping')
.then(response => {
if (!response.ok) {
return response.json().then(err => { throw new Error(err.error || `HTTP error ${response.status}`) });
}
return response.json();
})
.then(targetFaces => {
if (!faceMapperArea || !submitFaceMappingsButton || !faceMapperStatus) return;
faceMapperArea.innerHTML = ''; // Clear previous faces
webAppGlobals.currentFaceMappings = []; // Reset mappings
if (targetFaces.error) {
faceMapperStatus.textContent = `Error: ${targetFaces.error}`;
submitFaceMappingsButton.style.display = 'none';
return;
}
if (targetFaces.length === 0) {
faceMapperStatus.textContent = "No faces found in the target image for mapping.";
submitFaceMappingsButton.style.display = 'none';
return;
}
targetFaces.forEach(face => {
const faceDiv = document.createElement('div');
faceDiv.className = 'face-map-item'; // For styling
faceDiv.style = "border:1px solid #ccc; padding:10px; text-align:center; margin-bottom:10px;";
faceDiv.innerHTML = `<p>Target ID: ${face.id}</p>`;
const imgEl = document.createElement('img');
imgEl.src = 'data:image/jpeg;base64,' + face.image_b64;
imgEl.style = "max-width:100px; max-height:100px; display:block; margin:auto;";
faceDiv.appendChild(imgEl);
const sourceInput = document.createElement('input');
sourceInput.type = 'file';
sourceInput.accept = 'image/*';
sourceInput.id = `source-for-target-${face.id}`;
sourceInput.dataset.targetId = face.id;
sourceInput.style = "margin-top:10px;";
faceDiv.appendChild(sourceInput);
const sourcePreview = document.createElement('img');
sourcePreview.id = `source-preview-for-target-${face.id}`;
sourcePreview.style = "max-width:80px; max-height:80px; display:none; margin-top:5px; margin:auto;";
faceDiv.appendChild(sourcePreview);
faceMapperArea.appendChild(faceDiv);
// Initialize this target face in our mapping array
webAppGlobals.currentFaceMappings.push({
target_id: face.id,
target_image_b64: face.image_b64,
source_file: null,
source_b64_preview: null // Will hold base64 for preview from file reader
});
// Add event listener for the file input
sourceInput.addEventListener('change', (event) => {
const file = event.target.files[0];
const targetId = event.target.dataset.targetId;
const mappingIndex = webAppGlobals.currentFaceMappings.findIndex(m => m.target_id == targetId);
if (file && mappingIndex !== -1) {
webAppGlobals.currentFaceMappings[mappingIndex].source_file = file;
// Preview for this source
const reader = new FileReader();
reader.onload = (e) => {
sourcePreview.src = e.target.result;
sourcePreview.style.display = 'block';
webAppGlobals.currentFaceMappings[mappingIndex].source_b64_preview = e.target.result;
};
reader.readAsDataURL(file);
} else if (mappingIndex !== -1) {
webAppGlobals.currentFaceMappings[mappingIndex].source_file = null;
webAppGlobals.currentFaceMappings[mappingIndex].source_b64_preview = null;
sourcePreview.src = '#';
sourcePreview.style.display = 'none';
}
});
});
submitFaceMappingsButton.style.display = 'block';
faceMapperStatus.textContent = "Please select a source image for each target face.";
})
.catch(error => {
console.error('Error fetching/displaying target faces:', error);
if (faceMapperStatus) faceMapperStatus.textContent = `Error loading faces: ${error.message || 'Unknown error'}`;
if (submitFaceMappingsButton) submitFaceMappingsButton.style.display = 'none';
});
}
if (submitFaceMappingsButton) {
submitFaceMappingsButton.addEventListener('click', (event) => {
event.preventDefault(); // Prevent any default form submission behavior
if (faceMapperStatus) faceMapperStatus.textContent = "Submitting mappings...";
const formData = new FormData();
const targetIdsWithSource = [];
webAppGlobals.currentFaceMappings.forEach(mapping => {
if (mapping.source_file) {
formData.append(`source_file_${mapping.target_id}`, mapping.source_file, mapping.source_file.name);
targetIdsWithSource.push(mapping.target_id);
}
});
if (targetIdsWithSource.length === 0) {
if (faceMapperStatus) faceMapperStatus.textContent = "No source images selected to map.";
// Potentially clear backend maps if no sources are provided? Or backend handles this.
// For now, we can choose to send an empty list, or not send at all.
// Let's send an empty list to indicate an explicit "clear" or "submit with no new sources".
// The backend will then call simplify_maps() which would clear simple_map.
}
formData.append('target_ids_json', JSON.stringify(targetIdsWithSource));
fetch('/submit_face_mappings', {
method: 'POST',
body: formData // FormData will set Content-Type to multipart/form-data automatically
})
.then(response => {
if (!response.ok) {
return response.json().then(err => { throw new Error(err.error || `HTTP error ${response.status}`) });
}
return response.json();
})
.then(data => {
console.log('Mappings submission response:', data);
if (faceMapperStatus) faceMapperStatus.textContent = data.message || "Mappings submitted successfully.";
// Optionally hide the face mapper container or update UI
// For now, user can manually uncheck "Map Specific Faces" to hide it.
// Or, if processing is started, it will also clear.
// Consider if mapFacesCheckbox should be set to true in Globals on backend now.
// The backend /submit_face_mappings sets Globals.map_faces = True.
// We should ensure the checkbox reflects this state if it's not already.
if (mapFacesCheckbox && !mapFacesCheckbox.checked && targetIdsWithSource.length > 0) {
// If user submitted mappings, but then unchecked "Map Faces" before submission finished,
// we might want to re-check it for them, or let sendSettings handle it.
// For simplicity, backend sets Globals.map_faces = true. UI should reflect this.
// mapFacesCheckbox.checked = true; // This might trigger its change event again.
// Better to let sendSettings in mapFacesCheckbox handler manage consistency.
}
if (targetIdsWithSource.length > 0) {
statusMessage.textContent = "Face mappings ready. You can now start processing or live preview with these mappings.";
}
})
.catch(error => {
console.error('Error submitting face mappings:', error);
if (faceMapperStatus) faceMapperStatus.textContent = `Error: ${error.message || 'Failed to submit mappings.'}`;
});
});
}
// Start Processing Logic
if (startProcessingButton) {
startProcessingButton.addEventListener('click', () => {
// When starting processing, clear any live feed from the preview area
if (processedPreviewImage) {
processedPreviewImage.src = "#"; // Clear src
processedPreviewImage.style.display = 'block'; // Or 'none' if you prefer to hide it
}
// Potentially call /stop_video_feed if live feed was active and using a global camera object that needs release
// For now, just clearing the src is the main action.
statusMessage.textContent = 'Processing... Please wait.';
statusMessage.textContent = 'Processing... Please wait.';
if(outputArea) outputArea.style.display = 'none'; // Hide previous output
// Ensure settings are sent before starting, or rely on them being up-to-date
// For simplicity, we assume settings are current from checkbox listeners.
// Alternatively, call sendSettings() here and chain the fetch.
fetch('/start_processing', {
method: 'POST',
// No body needed if settings are read from Globals on backend
})
.then(response => response.json())
.then(data => {
if (data.error) {
console.error('Processing error:', data.error);
statusMessage.textContent = `Error: ${data.error}`;
if(outputArea) outputArea.style.display = 'none';
} else {
console.log('Processing complete:', data);
statusMessage.textContent = 'Processing complete!';
if (downloadLink && data.download_url) {
downloadLink.href = data.download_url; // Backend provides full URL for download
downloadLink.textContent = `Download ${data.output_filename || 'processed file'}`;
if(outputArea) outputArea.style.display = 'block';
} else {
if(outputArea) outputArea.style.display = 'none';
}
}
})
.catch(error => {
console.error('Fetch error for start processing:', error);
statusMessage.textContent = 'Processing request failed. Check console.';
if(outputArea) outputArea.style.display = 'none';
});
});
}
// Live Preview Logic
if (livePreviewButton && processedPreviewImage) {
let isLiveFeedActive = false; // State to toggle button
livePreviewButton.addEventListener('click', () => {
if (!isLiveFeedActive) {
processedPreviewImage.src = '/video_feed';
processedPreviewImage.style.display = 'block'; // Make sure it's visible
statusMessage.textContent = 'Live feed started. Navigate away or click "Stop Live Feed" to stop.';
livePreviewButton.textContent = 'Stop Live Feed';
isLiveFeedActive = true;
if(outputArea) outputArea.style.display = 'none'; // Hide download area
} else {
// Stop the feed
processedPreviewImage.src = '#'; // Clear the image source
// Optionally, set a placeholder: processedPreviewImage.src = "placeholder.jpg";
statusMessage.textContent = 'Live feed stopped.';
livePreviewButton.textContent = 'Live Preview';
isLiveFeedActive = false;
// Inform the backend to release the camera, if the backend supports it
// This is important if the camera is a shared global resource on the server.
fetch('/stop_video_feed', { method: 'POST' })
.then(response => response.json())
.then(data => console.log('Stop video feed response:', data))
.catch(error => console.error('Error stopping video feed:', error));
}
});
}
});

10
static/style.css 100644
View File

@ -0,0 +1,10 @@
body { font-family: sans-serif; margin: 20px; background-color: #f4f4f4; color: #333; }
h1, h2 { color: #2c3e50; }
.container { display: flex; margin-bottom: 20px; background-color: #fff; padding: 15px; border-radius: 8px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); }
.column { flex: 1; padding: 10px; }
#options-column label { display: block; margin-bottom: 8px; }
button { padding: 10px 15px; background-color: #3498db; color: white; border: none; border-radius: 4px; cursor: pointer; margin-right: 10px; }
button:hover { background-color: #2980b9; }
input[type="file"] { margin-bottom: 10px; }
#status-area { margin-top: 20px; padding: 10px; background-color: #e9ecef; border-radius: 4px; }
#main-preview-area img { display: block; margin-top: 10px; }

View File

@ -0,0 +1,69 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Deep-Live-Cam Web UI</title>
<link rel="stylesheet" href="{{ url_for('static', filename='style.css') }}">
</head>
<body>
<h1>Deep-Live-Cam - Web Interface</h1>
<div class="container">
<div class="column" id="source-column">
<h2>Source Face</h2>
<input type="file" id="source-file" accept="image/*">
<img id="source-preview" src="#" alt="Source Preview" style="display:none; max-width: 200px; max-height: 200px;">
</div>
<div class="column" id="target-column">
<h2>Target Media</h2>
<input type="file" id="target-file" accept="image/*,video/*">
<img id="target-preview-image" src="#" alt="Target Image Preview" style="display:none; max-width: 200px; max-height: 200px;">
<video id="target-preview-video" controls style="display:none; max-width: 200px; max-height: 200px;"></video>
</div>
</div>
<div class="container">
<div class="column" id="options-column">
<h2>Options</h2>
<label><input type="checkbox" id="keep-fps"> Keep FPS</label><br>
<label><input type="checkbox" id="keep-audio"> Keep Audio</label><br>
<label><input type="checkbox" id="many-faces"> Many Faces</label><br> <!-- This is the general many_faces toggle -->
<label><input type="checkbox" id="map-faces-checkbox"> Map Specific Faces (Image Target Only)</label><br> <!-- Specific for face mapping UI -->
<label><input type="checkbox" id="mouth-mask"> Mouth Mask</label><br>
<!-- Add more switches as needed -->
</div>
<div class="column" id="actions-column">
<h2>Actions</h2>
<button id="start-processing">Start Processing</button>
<button id="live-preview">Live Preview</button>
</div>
</div>
<div class="container" id="main-preview-area">
<h2>Live/Processed Preview</h2>
<img id="processed-preview" src="#" alt="Preview Area" style="max-width: 640px; max-height: 480px; border: 1px solid black; display: block; margin-top: 10px;">
</div>
<div id="status-area">
<p>Status: <span id="status-message">Idle</span></p>
</div>
<div id="output-area" style="display:none;">
<a id="download-link" href="#">Download Output</a>
</div>
<div id="face-mapper-container" style="display:none; margin-top: 20px; padding: 15px; background-color: #e9e9e9; border-radius: 8px;">
<h2>Face Mapper</h2>
<p id="face-mapper-status">Upload a target image and check "Map Specific Faces" to begin.</p>
<div id="face-mapper-area" style="display: flex; flex-wrap: wrap; gap: 20px; margin-top:10px; margin-bottom:10px;">
<!-- Target faces will be dynamically added here -->
</div>
<button id="submit-face-mappings" style="margin-top: 20px; display:none;">Submit Face Mappings</button>
</div>
<script src="{{ url_for('static', filename='main.js') }}"></script>
</body>
</html>

525
webapp.py 100644
View File

@ -0,0 +1,525 @@
import os
import cv2
import shutil
import time
import base64
import json # For parsing target_ids_json
from flask import Flask, render_template, request, jsonify, send_from_directory, Response
from flask_cors import CORS
from werkzeug.utils import secure_filename
import modules.globals as Globals
import modules.core as core
from modules.utilities import normalize_output_path, get_temp_directory_path, is_image as util_is_image
from modules.face_analyser import get_one_face, get_many_faces, get_unique_faces_from_target_image, simplify_maps # Added simplify_maps
import modules.processors.frame.core as frame_processors_core
VIDEO_CAMERA = None
target_path_web = None
prev_time = 0
frame_count = 0 # For FPS calculation
current_fps = 0 # For FPS calculation
# Attempt to load initial settings from a file if it exists
# This is a placeholder for more sophisticated settings management.
# For now, we rely on defaults in modules.globals or explicit setting via UI.
# if os.path.exists('switch_states.json'):
# try:
# with open('switch_states.json', 'r') as f:
# import json
# states = json.load(f)
# # Assuming states directly map to Globals attributes
# for key, value in states.items():
# if hasattr(Globals, key):
# setattr(Globals, key, value)
# except Exception as e:
# print(f"Error loading switch_states.json: {e}")
app = Flask(__name__)
CORS(app) # Enable CORS for all routes
UPLOAD_FOLDER = os.path.join(os.getcwd(), 'uploads')
PROCESSED_OUTPUTS_FOLDER = os.path.join(os.getcwd(), 'processed_outputs') # Added
if not os.path.exists(UPLOAD_FOLDER):
os.makedirs(UPLOAD_FOLDER)
if not os.path.exists(PROCESSED_OUTPUTS_FOLDER): # Added
os.makedirs(PROCESSED_OUTPUTS_FOLDER)
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['PROCESSED_OUTPUTS_FOLDER'] = PROCESSED_OUTPUTS_FOLDER # Added
@app.route('/')
def index(): # Renamed from hello_world
return render_template('index.html')
@app.route('/upload/source', methods=['POST'])
def upload_source():
if 'file' not in request.files:
return jsonify({'error': 'No file part'}), 400
file = request.files['file']
if file.filename == '':
return jsonify({'error': 'No selected file'}), 400
if file:
filename = secure_filename(file.filename)
filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
file.save(filepath)
Globals.source_path = filepath
return jsonify({'message': 'Source uploaded', 'filepath': filepath}), 200
@app.route('/upload/target', methods=['POST'])
def upload_target():
if 'file' not in request.files:
return jsonify({'error': 'No file part'}), 400
file = request.files['file']
if file.filename == '':
return jsonify({'error': 'No selected file'}), 400
global target_path_web # Use the web-specific target path
if file:
filename = secure_filename(file.filename)
filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
file.save(filepath)
Globals.target_path = filepath # This is for the core processing engine
target_path_web = filepath # This is for UI state, like triggering face mapping
# Provide a URL to the uploaded file for preview if desired, requires a new endpoint or serving 'uploads' statically
# For now, client-side preview is used.
return jsonify({'message': 'Target uploaded', 'filepath': filepath, 'file_url': f'/uploads/{filename}'}), 200
@app.route('/uploads/<filename>') # Simple endpoint to serve uploaded files for preview
def uploaded_file(filename):
return send_from_directory(app.config['UPLOAD_FOLDER'], filename)
@app.route('/update_settings', methods=['POST'])
def update_settings():
data = request.get_json()
if not data:
return jsonify({'error': 'No data provided'}), 400
# Update Globals based on received data
# Example:
if 'keep_fps' in data:
Globals.keep_fps = bool(data['keep_fps'])
if 'keep_audio' in data:
Globals.keep_audio = bool(data['keep_audio'])
if 'many_faces' in data:
Globals.many_faces = bool(data['many_faces'])
if 'mouth_mask' in data: # HTML ID is 'mouth-mask'
Globals.mouth_mask = bool(data['mouth_mask']) # Maps to Globals.mouth_mask
# Add more settings as they are defined in Globals and the UI
if 'frame_processors' in data: # Example for a more complex setting
Globals.frame_processors = data['frame_processors'] # Assuming it's a list of strings
# A more generic way if keys match Globals attributes:
# for key, value in data.items():
# if hasattr(Globals, key):
# # Be careful with types, e.g. ensuring booleans are booleans
# if isinstance(getattr(Globals, key, None), bool):
# setattr(Globals, key, bool(value))
# else:
# setattr(Globals, key, value)
return jsonify({'message': 'Settings updated'}), 200
@app.route('/start_processing', methods=['POST'])
def start_processing():
if not Globals.source_path or not os.path.exists(Globals.source_path):
return jsonify({'error': 'Source path not set or invalid'}), 400
if not Globals.target_path or not os.path.exists(Globals.target_path):
return jsonify({'error': 'Target path not set or invalid'}), 400
# Determine a unique output filename and set Globals.output_path
target_filename = os.path.basename(Globals.target_path)
filename, ext = os.path.splitext(target_filename)
unique_output_filename = f"{filename}_processed_{int(time.time())}{ext}"
Globals.output_path = os.path.join(app.config['PROCESSED_OUTPUTS_FOLDER'], unique_output_filename)
# Ensure default frame processors are set if none are provided by the client
if not Globals.frame_processors:
Globals.frame_processors = ['face_swapper'] # Default to face_swapper
print("Warning: No frame processors selected by client, defaulting to 'face_swapper'.")
try:
# Log current settings being used
print(f"Preparing to process with core engine. Source: {Globals.source_path}, Target: {Globals.target_path}, Output: {Globals.output_path}")
print(f"Options: Keep FPS: {Globals.keep_fps}, Keep Audio: {Globals.keep_audio}, Many Faces: {Globals.many_faces}")
print(f"Frame Processors: {Globals.frame_processors}")
# Ensure necessary resources are available and limited (e.g. memory)
# This was part of the old core.run() sequence.
# Consider if pre_check from core should be called here too, or if it's mainly for CLI
# For now, webapp assumes inputs are valid if they exist.
core.limit_resources()
# Call the refactored core processing function
processing_result = core.process_media()
if processing_result.get('success'):
final_output_path = processing_result.get('output_path', Globals.output_path) # Use path from result if available
# Ensure the unique_output_filename matches the actual output from process_media if it changed it
# For now, we assume process_media uses Globals.output_path as set above.
print(f"Core processing successful. Output at: {final_output_path}")
return jsonify({
'message': 'Processing complete',
'output_filename': os.path.basename(final_output_path),
'download_url': f'/get_output/{os.path.basename(final_output_path)}'
})
else:
print(f"Core processing failed: {processing_result.get('error')}")
# If NSFW, include that info if process_media provides it
if processing_result.get('nsfw'):
return jsonify({'error': processing_result.get('error', 'NSFW content detected.'), 'nsfw': True}), 400 # Bad request due to content
return jsonify({'error': processing_result.get('error', 'Unknown error during processing')}), 500
except Exception as e:
# This is a fallback for unexpected errors not caught by core.process_media
print(f"An unexpected error occurred in /start_processing endpoint: {e}")
import traceback
traceback.print_exc()
return jsonify({'error': f'An critical unexpected error occurred: {str(e)}'}), 500
finally:
# Always attempt to clean up temp files, regardless of success or failure
# core.cleanup_temp_files() takes no args now for webapp context (quit_app=False is default)
print("Executing cleanup of temporary files from webapp.")
core.cleanup_temp_files()
@app.route('/get_output/<filename>')
def get_output(filename):
return send_from_directory(app.config['PROCESSED_OUTPUTS_FOLDER'], filename, as_attachment=True)
if __name__ == '__main__':
# Initialize any necessary globals or configurations from core logic if needed
# For example, if core.parse_args() sets up initial globals from some defaults:
# import modules.core as main_core
# main_core.parse_args([]) # Pass empty list or appropriate defaults if it expects CLI args
# For development, directly run the Flask app.
# For production, a WSGI server like Gunicorn would be used.
app.run(debug=True, host='0.0.0.0', port=5000)
# Video Feed Section
def generate_frames():
global VIDEO_CAMERA
global VIDEO_CAMERA, prev_time, frame_count, current_fps
print("generate_frames: Attempting to open camera...")
# Determine camera index (e.g., from Globals or default to 0)
camera_index = 0 # Or Globals.camera_index if you add such a setting
VIDEO_CAMERA = cv2.VideoCapture(camera_index)
if not VIDEO_CAMERA.isOpened():
print(f"Error: Could not open video camera at index {camera_index}.")
# TODO: Yield a placeholder image with an error message
return
print("generate_frames: Camera opened. Initializing settings for live processing.")
prev_time = time.time()
frame_count = 0
current_fps = 0
source_face = None
if Globals.source_path and not Globals.map_faces: # map_faces logic for live might be complex
try:
source_image_cv2 = cv2.imread(Globals.source_path)
if source_image_cv2 is not None:
source_face = get_one_face(source_image_cv2)
if source_face is None:
print("Warning: No face found in source image for live preview.")
except Exception as e:
print(f"Error loading source image for live preview: {e}")
# Get frame processors
# Ensure Globals.frame_processors is a list. If it can be None, default to an empty list.
current_frame_processors = Globals.frame_processors if Globals.frame_processors is not None else []
active_frame_processors = frame_processors_core.get_frame_processors_modules(current_frame_processors)
# Example: Conditionally remove face enhancer if its toggle is off
# This assumes fp_ui structure; adjust if it's different or not used for live mode.
if not Globals.fp_ui.get('face_enhancer', False) and any(p.NAME == 'DLC.FACE-ENHANCER' for p in active_frame_processors):
active_frame_processors = [p for p in active_frame_processors if p.NAME != 'DLC.FACE-ENHANCER']
print("Live Preview: Face Enhancer disabled by UI toggle.")
print(f"Live Preview: Active processors: {[p.NAME for p in active_frame_processors if hasattr(p, 'NAME')]}")
try:
while VIDEO_CAMERA and VIDEO_CAMERA.isOpened(): # Check if VIDEO_CAMERA is not None
success, frame = VIDEO_CAMERA.read()
if not success:
print("Error: Failed to read frame from camera during live feed.")
break
processed_frame = frame.copy()
if Globals.live_mirror:
processed_frame = cv2.flip(processed_frame, 1)
# Apply Processing
# Apply Processing
if Globals.map_faces:
if Globals.simple_map: # Check if mappings are submitted and processed
for processor in active_frame_processors:
if hasattr(processor, 'process_frame_v2') and callable(processor.process_frame_v2):
try:
processed_frame = processor.process_frame_v2(processed_frame)
except Exception as e:
print(f"Error applying mapped processor {processor.NAME if hasattr(processor, 'NAME') else 'Unknown'} in live feed: {e}")
cv2.putText(processed_frame, "Error in mapped processing", (10, 90), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
# else: No v2 method, map_faces might not apply or needs different handling
else: # map_faces is true, but mappings not submitted/valid
cv2.putText(processed_frame, "Map Faces: Mappings not submitted or invalid.", (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
elif source_face: # Not map_faces, but single source face is available
for processor in active_frame_processors:
try:
if hasattr(processor, 'process_frame') and callable(processor.process_frame):
if processor.NAME == 'DLC.FACE-ENHANCER':
processed_frame = processor.process_frame(None, processed_frame)
else:
processed_frame = processor.process_frame(source_face, processed_frame)
except Exception as e:
print(f"Error applying single source processor {processor.NAME if hasattr(processor, 'NAME') else 'Unknown'} in live feed: {e}")
elif not Globals.source_path: # No map_faces and no single source image
cv2.putText(processed_frame, "No Source Image Selected", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
# FPS Calculation & Overlay
if Globals.show_fps:
frame_count += 1
now = time.time()
# Calculate FPS over a 1-second interval
if (now - prev_time) > 1:
current_fps = frame_count / (now - prev_time)
prev_time = now
frame_count = 0
cv2.putText(processed_frame, f"FPS: {current_fps:.2f}", (10, processed_frame.shape[0] - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
# Encode the processed_frame to JPEG
ret, buffer = cv2.imencode('.jpg', processed_frame)
if not ret:
print("Error: Failed to encode processed frame to JPEG.")
continue
frame_bytes = buffer.tobytes()
yield (b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' + frame_bytes + b'\r\n')
except GeneratorExit:
print("generate_frames: Client disconnected.")
except Exception as e:
print(f"Exception in generate_frames main loop: {e}")
import traceback
traceback.print_exc()
finally:
print("generate_frames: Releasing camera.")
if VIDEO_CAMERA:
VIDEO_CAMERA.release()
VIDEO_CAMERA = None # Reset global camera object
@app.route('/video_feed')
def video_feed():
print("Request received for /video_feed")
return Response(generate_frames(),
mimetype='multipart/x-mixed-replace; boundary=frame')
# Optional: Endpoint to explicitly stop the camera if needed.
# This is tricky with a global VIDEO_CAMERA and HTTP's stateless nature.
# A more robust solution might involve websockets or a different camera management strategy.
@app.route('/stop_video_feed', methods=['POST'])
def stop_video_feed():
global VIDEO_CAMERA
print("/stop_video_feed called")
if VIDEO_CAMERA:
print("Releasing video camera from /stop_video_feed")
VIDEO_CAMERA.release()
VIDEO_CAMERA = None
return jsonify({'message': 'Video feed stopped.'})
return jsonify({'message': 'No active video feed to stop.'})
@app.route('/get_target_faces_for_mapping', methods=['GET'])
def get_target_faces_for_mapping_route():
global target_path_web # Use the web-specific target path
if not target_path_web or not os.path.exists(target_path_web):
return jsonify({'error': 'Target image not uploaded or path is invalid.'}), 400
if not util_is_image(target_path_web): # Use the utility function for checking image type
return jsonify({'error': 'Target file is not a valid image for face mapping.'}), 400
try:
# This function will populate Globals.source_target_map
# It expects the target image path to be in Globals.target_path for its internal logic
# So, ensure Globals.target_path is also set to target_path_web for this call
# This is a bit of a workaround due to how get_unique_faces_from_target_image uses Globals
original_global_target_path = Globals.target_path
Globals.target_path = target_path_web
get_unique_faces_from_target_image() # This should fill Globals.source_target_map
# Restore original Globals.target_path if it was different (e.g. from a previous full processing run)
# For web UI flow, target_path_web and Globals.target_path will typically be the same after an upload.
Globals.target_path = original_global_target_path
if not Globals.source_target_map:
return jsonify({'error': 'No faces found in the target image or error during analysis.'}), 404
response_data = []
for item in Globals.source_target_map:
target_cv2_img = item['target']['cv2']
if target_cv2_img is None: # Should not happen if map is populated correctly
continue
_, buffer = cv2.imencode('.jpg', target_cv2_img)
b64_img = base64.b64encode(buffer).decode('utf-8')
response_data.append({'id': item['id'], 'image_b64': b64_img})
return jsonify(response_data)
except Exception as e:
print(f"Error in /get_target_faces_for_mapping: {e}")
import traceback
traceback.print_exc()
return jsonify({'error': f'An unexpected error occurred: {str(e)}'}), 500
@app.route('/submit_face_mappings', methods=['POST'])
def submit_face_mappings_route():
if 'target_ids_json' not in request.form:
return jsonify({'error': 'No target_ids_json provided.'}), 400
try:
target_ids = json.loads(request.form['target_ids_json'])
except json.JSONDecodeError:
return jsonify({'error': 'Invalid JSON in target_ids_json.'}), 400
if not Globals.source_target_map:
# This implies /get_target_faces_for_mapping was not called or failed.
# Or, it could be cleared. Re-populate it if target_path_web is available.
if target_path_web and os.path.exists(target_path_web) and util_is_image(target_path_web):
print("Re-populating source_target_map as it was empty during submit.")
original_global_target_path = Globals.target_path
Globals.target_path = target_path_web
get_unique_faces_from_target_image()
Globals.target_path = original_global_target_path
if not Globals.source_target_map:
return jsonify({'error': 'Could not re-initialize target faces. Please re-upload target image.'}), 500
else:
return jsonify({'error': 'Target face map not initialized. Please upload target image again.'}), 500
all_mappings_valid = True
processed_ids = set()
for target_id_str in target_ids:
target_id = int(target_id_str) # Ensure it's an integer if IDs are integers
file_key = f'source_file_{target_id}'
if file_key not in request.files:
print(f"Warning: Source file for target_id {target_id} not found in submission.")
# Mark this mapping as invalid or skip? For now, we require all submitted IDs to have files.
# If a file is optional for a target, client should not include its ID in target_ids_json.
# However, Globals.source_target_map will still have this target. We just won't assign a source to it.
continue
source_file = request.files[file_key]
if source_file.filename == '':
print(f"Warning: Empty filename for source file for target_id {target_id}.")
continue # Skip if no file was actually selected for this input
# Save the uploaded source file temporarily for this mapping
temp_source_filename = f"temp_source_for_target_{target_id}_{secure_filename(source_file.filename)}"
temp_source_filepath = os.path.join(app.config['UPLOAD_FOLDER'], temp_source_filename)
source_file.save(temp_source_filepath)
source_cv2_img = cv2.imread(temp_source_filepath)
if source_cv2_img is None:
print(f"Error: Could not read saved source image for target_id {target_id} from {temp_source_filepath}")
# all_mappings_valid = False # Decide if one bad source fails all
# os.remove(temp_source_filepath) # Clean up
continue # Skip this mapping
source_face_obj = get_one_face(source_cv2_img) # This also returns the cropped face usually
if source_face_obj:
map_entry_found = False
for map_item in Globals.source_target_map:
if str(map_item['id']) == str(target_id): # Compare as strings or ensure IDs are consistent type
# The 'face' from get_one_face is the full Face object.
# The 'cv2' image from get_one_face is the cropped face.
# We need to store both, similar to how the original UI might have done.
# Let's assume get_one_face returns a tuple (Face_object, cropped_cv2_image)
# or that Face_object itself contains the cropped image if needed later.
# For now, storing the Face object which includes embedding and bbox.
# The cropped image can be re-derived or stored if `get_one_face` provides it.
# Let's assume `get_one_face` is just the Face object for simplicity here,
# and the cropped image for `source_target_map` needs to be handled.
# A better `get_one_face` might return a dict {'face': Face, 'cv2': cropped_img}
# Simplified: get_one_face returns the Face object, and we'll use that.
# The `ui.update_popup_source` implies the map needs {'cv2': cropped_img, 'face': Face_obj}
# Let's assume `source_face_obj` is the Face object. We need its cropped image.
# This might require a helper or for get_one_face to return it.
# For now, we'll store the Face object. The cropped image part for source_target_map
# might need adjustment based on face_analyser's exact return for get_one_face.
# A common pattern is that the Face object itself has bbox, and you can crop from original using that.
# Let's assume we need to manually crop based on the Face object from get_one_face
# This is a placeholder - exact cropping depends on what get_one_face returns and what processors need
# For now, we'll just store the Face object.
# If `face_swapper`'s `process_frame_v2` needs cropped source images in `source_target_map`,
# this part needs to ensure they are correctly populated.
# For simplicity, assuming `get_one_face` returns the main `Face` object, and `face_swapper` can use that.
# The `source_target_map` structure is critical.
# Looking at `face_swapper.py`, `process_frame_v2` uses `Globals.simple_map`.
# `simplify_maps()` populates `simple_map` from `source_target_map`.
# `simplify_maps()` expects `item['source']['face']` to be the source `Face` object.
map_item['source'] = {'face': source_face_obj, 'cv2': source_cv2_img} # Store the original uploaded source, not necessarily cropped yet. Processors handle cropping.
map_entry_found = True
processed_ids.add(target_id)
break
if not map_entry_found:
print(f"Warning: Target ID {target_id} from submission not found in existing map.")
all_mappings_valid = False # Or handle as error
else:
print(f"Warning: No face found in uploaded source for target_id {target_id}.")
# Mark this specific mapping as invalid by not adding a 'source' to it, or removing it.
# For now, we just don't add a source. simplify_maps should handle items without a source.
all_mappings_valid = False # if strict, one failed source makes all invalid for this submission batch
# Clean up the temporary saved source file
if os.path.exists(temp_source_filepath):
os.remove(temp_source_filepath)
# Clear 'source' for any target_ids that were in source_target_map but not in this submission
# or if their source file didn't yield a face.
for map_item in Globals.source_target_map:
if map_item['id'] not in processed_ids and 'source' in map_item:
del map_item['source']
if not all_mappings_valid: # Or based on a stricter check
# simplify_maps() will still run and create mappings for valid pairs
print("simplify_maps: Some mappings may be invalid or incomplete.")
simplify_maps() # Populate Globals.simple_map based on updated Globals.source_target_map
# For debugging:
# print("Updated source_target_map:", Globals.source_target_map)
# print("Generated simple_map:", Globals.simple_map)
if not Globals.simple_map and all_mappings_valid and target_ids: # If all submitted were meant to be valid but simple_map is empty
return jsonify({'error': 'Mappings processed, but no valid face pairs were established. Check source images.'}), 400
Globals.map_faces = True # Crucial: Set this global so processing functions know to use the map
return jsonify({'message': 'Face mappings submitted and processed.'})
# except Exception as e:
# print(f"Error in /submit_face_mappings: {e}")
# import traceback
# traceback.print_exc()
# return jsonify({'error': f'An unexpected error occurred: {str(e)}'}), 500