Revert "Merge pull request #566 from pereiraroland26/main"

This reverts commit 5d450b4352.
experimental
Kenneth Estanislao 2024-10-04 15:57:48 +08:00
parent 5d450b4352
commit 61dae91439
8 changed files with 323 additions and 973 deletions

2
.gitignore vendored
View File

@ -20,5 +20,3 @@ models/inswapper_128.onnx
models/GFPGANv1.4.pth models/GFPGANv1.4.pth
*.onnx *.onnx
models/DMDNet.pth models/DMDNet.pth
faceswap/
.vscode/

219
README.md
View File

@ -1,4 +1,3 @@
![demo-gif](demo.gif) ![demo-gif](demo.gif)
@ -7,7 +6,7 @@ This software is meant to be a productive contribution to the rapidly growing AI
The developers of this software are aware of its possible unethical applications and are committed to take preventative measures against them. It has a built-in check which prevents the program from working on inappropriate media including but not limited to nudity, graphic content, sensitive material such as war footage etc. We will continue to develop this project in the positive direction while adhering to law and ethics. This project may be shut down or include watermarks on the output if requested by law. The developers of this software are aware of its possible unethical applications and are committed to take preventative measures against them. It has a built-in check which prevents the program from working on inappropriate media including but not limited to nudity, graphic content, sensitive material such as war footage etc. We will continue to develop this project in the positive direction while adhering to law and ethics. This project may be shut down or include watermarks on the output if requested by law.
Users of this software are expected to use this software responsibly while abiding by local laws. If the face of a real person is being used, users are required to get consent from the concerned person and clearly mention that it is a deepfake when posting content online. Developers of this software will not be responsible for actions of end-users. Users of this software are expected to use this software responsibly while abiding the local law. If face of a real person is being used, users are suggested to get consent from the concerned person and clearly mention that it is a deepfake when posting content online. Developers of this software will not be responsible for actions of end-users.
## How do I install it? ## How do I install it?
@ -25,7 +24,7 @@ Users of this software are expected to use this software responsibly while abidi
#### 3. Download Models #### 3. Download Models
1. [GFPGANv1.4](https://huggingface.co/hacksider/deep-live-cam/resolve/main/GFPGANv1.4.pth) 1. [GFPGANv1.4](https://huggingface.co/hacksider/deep-live-cam/resolve/main/GFPGANv1.4.pth)
2. [inswapper_128_fp16.onnx](https://huggingface.co/hacksider/deep-live-cam/resolve/main/inswapper_128_fp16.onnx) *(Note: Use this [replacement version](https://github.com/facefusion/facefusion-assets/releases/download/models/inswapper_128_fp16.onnx) if an issue occurs on your computer)* 2. [inswapper_128_fp16.onnx](https://huggingface.co/hacksider/deep-live-cam/resolve/main/inswapper_128.onnx)
Then put those 2 files on the "**models**" folder Then put those 2 files on the "**models**" folder
@ -34,29 +33,27 @@ We highly recommend to work with a `venv` to avoid issues.
``` ```
pip install -r requirements.txt pip install -r requirements.txt
``` ```
For MAC OS, You have to install or upgrade python-tk package: ##### DONE!!! If you dont have any GPU, You should be able to run roop using `python run.py` command. Keep in mind that while running the program for first time, it will download some models which can take time depending on your network connection.
```
brew install python-tk@3.10
```
##### DONE!!! If you don't have any GPU, You should be able to run roop using `python run.py` command. Keep in mind that while running the program for first time, it will download some models which can take time depending on your network connection.
#### 5. Proceed if you want to use GPU acceleration (optional)
<details>
<summary>Click to see the details</summary>
### *Proceed if you want to use GPU Acceleration
### CUDA Execution Provider (Nvidia)* ### CUDA Execution Provider (Nvidia)*
1. Install [CUDA Toolkit 11.8](https://developer.nvidia.com/cuda-11-8-0-download-archive) 1. Install [CUDA Toolkit 11.8](https://developer.nvidia.com/cuda-11-8-0-download-archive)
2. Install dependencies: 2. Install dependencies:
``` ```
pip uninstall onnxruntime onnxruntime-gpu pip uninstall onnxruntime onnxruntime-gpu
pip install onnxruntime-gpu==1.16.3 pip install onnxruntime-gpu==1.16.3
``` ```
3. Usage in case the provider is available: 3. Usage in case the provider is available:
``` ```
python run.py --execution-provider cuda python run.py --execution-provider cuda
``` ```
### [](https://github.com/s0md3v/roop/wiki/2.-Acceleration#coreml-execution-provider-apple-silicon)CoreML Execution Provider (Apple Silicon) ### [](https://github.com/s0md3v/roop/wiki/2.-Acceleration#coreml-execution-provider-apple-silicon)CoreML Execution Provider (Apple Silicon)
@ -66,52 +63,65 @@ python run.py --execution-provider cuda
``` ```
pip uninstall onnxruntime onnxruntime-silicon pip uninstall onnxruntime onnxruntime-silicon
pip install onnxruntime-silicon==1.13.1 pip install onnxruntime-silicon==1.13.1
``` ```
2. Usage in case the provider is available: 2. Usage in case the provider is available:
``` ```
python run.py --execution-provider coreml python run.py --execution-provider coreml
``` ```
### [](https://github.com/s0md3v/roop/wiki/2.-Acceleration#coreml-execution-provider-apple-legacy)CoreML Execution Provider (Apple Legacy) ### [](https://github.com/s0md3v/roop/wiki/2.-Acceleration#coreml-execution-provider-apple-legacy)CoreML Execution Provider (Apple Legacy)
1. Install dependencies: 1. Install dependencies:
``` ```
pip uninstall onnxruntime onnxruntime-coreml pip uninstall onnxruntime onnxruntime-coreml
pip install onnxruntime-coreml==1.13.1 pip install onnxruntime-coreml==1.13.1
``` ```
2. Usage in case the provider is available: 2. Usage in case the provider is available:
``` ```
python run.py --execution-provider coreml python run.py --execution-provider coreml
``` ```
### [](https://github.com/s0md3v/roop/wiki/2.-Acceleration#directml-execution-provider-windows)DirectML Execution Provider (Windows) ### [](https://github.com/s0md3v/roop/wiki/2.-Acceleration#directml-execution-provider-windows)DirectML Execution Provider (Windows)
1. Install dependencies: 1. Install dependencies:
``` ```
pip uninstall onnxruntime onnxruntime-directml pip uninstall onnxruntime onnxruntime-directml
pip install onnxruntime-directml==1.15.1 pip install onnxruntime-directml==1.15.1
``` ```
2. Usage in case the provider is available: 2. Usage in case the provider is available:
``` ```
python run.py --execution-provider directml python run.py --execution-provider directml
``` ```
### [](https://github.com/s0md3v/roop/wiki/2.-Acceleration#openvino-execution-provider-intel)OpenVINO™ Execution Provider (Intel) ### [](https://github.com/s0md3v/roop/wiki/2.-Acceleration#openvino-execution-provider-intel)OpenVINO™ Execution Provider (Intel)
1. Install dependencies: 1. Install dependencies:
``` ```
pip uninstall onnxruntime onnxruntime-openvino pip uninstall onnxruntime onnxruntime-openvino
pip install onnxruntime-openvino==1.15.0 pip install onnxruntime-openvino==1.15.0
``` ```
2. Usage in case the provider is available: 2. Usage in case the provider is available:
``` ```
python run.py --execution-provider openvino python run.py --execution-provider openvino
``` ```
</details>
## How do I use it? ## How do I use it?
> Note: When you run this program for the first time, it will download some models ~300MB in size. > Note: When you run this program for the first time, it will download some models ~300MB in size.
@ -125,29 +135,29 @@ Choose a face (image with desired face) and the target image/video (image/video
Just follow the clicks on the screenshot Just follow the clicks on the screenshot
1. Select a face 1. Select a face
2. Click live 2. Click live
3. Wait for a few seconds (it takes a longer time, usually 10 to 30 seconds before the preview shows up) 3. Wait for a few second (it takes a longer time, usually 10 to 30 seconds before the preview shows up)
![demo-gif](demo.gif) ![demo-gif](demo.gif)
Just use your favorite screencapture to stream like OBS Just use your favorite screencapture to stream like OBS
> Note: In case you want to change your face, just select another picture, the preview mode will then restart (so just wait a bit). > Note: In case you want to change your face, just select another picture, the preview mode will then restart (so just wait a bit).
You can now use the virtual camera output (uses pyvirtualcam) by turning on the `Virtual Cam Output (OBS)` toggle which should output to the OBS Virtual Camera. Note: this may not work on macOS. You will get a preview as before, but now you will also have a virtual camera output which can be used in applications like Zoom.
Additional command line arguments are given below. To learn out what they do, check [this guide](https://github.com/s0md3v/roop/wiki/Advanced-Options). Additional command line arguments are given below. To learn out what they do, check [this guide](https://github.com/s0md3v/roop/wiki/Advanced-Options).
``` ```
options: options:
-h, --help show this help message and exit -h, --help show this help message and exit
-s SOURCE_PATH, --source SOURCE_PATH select a source image -s SOURCE_PATH, --source SOURCE_PATH select an source image
-t TARGET_PATH, --target TARGET_PATH select a target image or video -t TARGET_PATH, --target TARGET_PATH select an target image or video
-o OUTPUT_PATH, --output OUTPUT_PATH select output file or directory -o OUTPUT_PATH, --output OUTPUT_PATH select output file or directory
--frame-processor FRAME_PROCESSOR [FRAME_PROCESSOR ...] frame processors (choices: face_swapper, face_enhancer, ...) --frame-processor FRAME_PROCESSOR [FRAME_PROCESSOR ...] frame processors (choices: face_swapper, face_enhancer, super_resolution...)
--keep-fps keep original fps --keep-fps keep original fps
--keep-audio keep original audio --keep-audio keep original audio
--keep-frames keep temporary frames --keep-frames keep temporary frames
--many-faces process every face --many-faces process every face
--map-faces map source target faces
--nsfw-filter filter the NSFW image or video
--video-encoder {libx264,libx265,libvpx-vp9} adjust output video encoder --video-encoder {libx264,libx265,libvpx-vp9} adjust output video encoder
--video-quality [0-51] adjust output video quality --video-quality [0-51] adjust output video quality
--live-mirror the live camera display as you see it in the front-facing camera frame --live-mirror the live camera display as you see it in the front-facing camera frame
@ -155,175 +165,24 @@ options:
--max-memory MAX_MEMORY maximum amount of RAM in GB --max-memory MAX_MEMORY maximum amount of RAM in GB
--execution-provider {cpu} [{cpu} ...] available execution provider (choices: cpu, ...) --execution-provider {cpu} [{cpu} ...] available execution provider (choices: cpu, ...)
--execution-threads EXECUTION_THREADS number of execution threads --execution-threads EXECUTION_THREADS number of execution threads
--headless run in headless mode
--enhancer-upscale-factor Sets the upscale factor for the enhancer. Only applies if `face_enhancer` is set as a frame-processor
--source-image-scaling-factor Set the upscale factor for source images. Only applies if `face_swapper` is set as a frame-processor
-r SCALE, --super-resolution-scale-factor SCALE Super resolution scale factor, choices are 2, 3, 4
-v, --version show program's version number and exit -v, --version show program's version number and exit
``` ```
Looking for a CLI mode? Using the -s/--source argument will make the run program in cli mode. Looking for a CLI mode? Using the -s/--source argument will make the run program in cli mode.
### Webcam mode on Windows 11 using WSL2 Ubuntu (optional) To improve the video quality, you can use the `super_resolution` frame processor after swapping the faces. It will enhance the video quality by 2x, 3x or 4x. You can set the upscale factor using the `-r` or `--super-resolution-scale-factor` argument.
Processing time will increase with the upscale factor, but it's quite quick.
<details>
<summary>Click to see the details</summary>
If you want to use WSL2 on Windows 11 you will notice, that Ubuntu WSL2 doesn't come with USB-Webcam support in the Kernel. You need to do two things: Compile the Kernel with the right modules integrated and forward your USB Webcam from Windows to Ubuntu with the usbipd app. Here are detailed Steps:
This tutorial will guide you through the process of setting up WSL2 Ubuntu with USB webcam support, rebuilding the kernel, and preparing the environment for the Deep-Live-Cam project.
#### 1. Install WSL2 Ubuntu
Install WSL2 Ubuntu from the Microsoft Store or using PowerShell:
#### 2. Enable USB Support in WSL2
1. Install the USB/IP tool for Windows:
[https://learn.microsoft.com/en-us/windows/wsl/connect-usb](https://learn.microsoft.com/en-us/windows/wsl/connect-usb)
2. In Windows PowerShell (as Administrator), connect your webcam to WSL:
```powershell
usbipd list
usbipd bind --busid x-x # Replace x-x with your webcam's bus ID
usbipd attach --wsl --busid x-x # Replace x-x with your webcam's bus ID
```
You need to redo the above every time you reboot wsl or re-connect your webcam/usb device.
#### 3. Rebuild WSL2 Ubuntu Kernel with USB and Webcam Modules
Follow these steps to rebuild the kernel:
1. Start with this guide: [https://github.com/PINTO0309/wsl2_linux_kernel_usbcam_enable_conf](https://github.com/PINTO0309/wsl2_linux_kernel_usbcam_enable_conf)
2. When you reach the `sudo wget [github.com](http://github.com/)...PINTO0309` step, which won't work for newer kernel versions, follow this video instead or alternatively follow the video tutorial from the beginning:
[https://www.youtube.com/watch?v=t_YnACEPmrM](https://www.youtube.com/watch?v=t_YnACEPmrM)
Additional info: [https://askubuntu.com/questions/1413377/camera-not-working-in-cheese-in-wsl2](https://askubuntu.com/questions/1413377/camera-not-working-in-cheese-in-wsl2)
3. After rebuilding, restart WSL with the new kernel.
#### 4. Set Up Deep-Live-Cam Project
Within Ubuntu:
1. Clone the repository:
```bash
git clone [https://github.com/hacksider/Deep-Live-Cam](https://github.com/hacksider/Deep-Live-Cam)
```
2. Follow the installation instructions in the repository, including cuda toolkit 11.8, make 100% sure it's not cuda toolkit 12.x.
#### 5. Verify and Load Kernel Modules
1. Check if USB and webcam modules are built into the kernel:
```bash
zcat /proc/config.gz | grep -i "CONFIG_USB_VIDEO_CLASS"
```
2. If modules are loadable (m), not built-in (y), check if the file exists:
```bash
ls /lib/modules/$(uname -r)/kernel/drivers/media/usb/uvc/
```
3. Load the module and check for errors (optional if built-in):
```bash
sudo modprobe uvcvideo
dmesg | tail
```
4. Verify video devices:
```bash
sudo ls -al /dev/video*
```
#### 6. Set Up Permissions
1. Add user to video group and set permissions:
```bash
sudo usermod -a -G video $USER
sudo chgrp video /dev/video0 /dev/video1
sudo chmod 660 /dev/video0 /dev/video1
```
2. Create a udev rule for permanent permissions:
```bash
sudo nano /etc/udev/rules.d/81-webcam.rules
```
Add this content:
``` ```
KERNEL=="video[0-9]*", GROUP="video", MODE="0660"
```
3. Reload udev rules:
```bash
sudo udevadm control --reload-rules && sudo udevadm trigger
```
4. Log out and log back into your WSL session.
5. Start Deep-Live-Cam with `python run.py --execution-provider cuda --max-memory 8` where 8 can be changed to the number of GB VRAM of your GPU has, minus 1-2GB. If you have a RTX3080 with 10GB I suggest adding 8GB. Leave some left for Windows.
#### Final Notes
- Steps 6 and 7 may be optional if the modules are built into the kernel and permissions are already set correctly.
- Always ensure you're using compatible versions of CUDA, ONNX, and other dependencies.
- If issues persist, consider checking the Deep-Live-Cam project's specific requirements and troubleshooting steps.
By following these steps, you should have a WSL2 Ubuntu environment with USB webcam support ready for the Deep-Live-Cam project. If you encounter any issues, refer back to the specific error messages and troubleshooting steps provided.
#### Troubleshooting CUDA Issues
If you encounter this error:
```
[ONNXRuntimeError] : 1 : FAIL : Failed to load library [libonnxruntime_providers_cuda.so](http://libonnxruntime_providers_cuda.so/) with error: libcufft.so.10: cannot open shared object file: No such file or directory
```
Follow these steps:
1. Install CUDA Toolkit 11.8 (ONNX 1.16.3 requires CUDA 11.x, not 12.x):
[https://developer.nvidia.com/cuda-11-8-0-download-archive](https://developer.nvidia.com/cuda-11-8-0-download-archive)
select: Linux, x86_64, WSL-Ubuntu, 2.0, deb (local)
2. Check CUDA version:
```bash
/usr/local/cuda/bin/nvcc --version
```
3. If the wrong version is installed, remove it completely:
[https://askubuntu.com/questions/530043/removing-nvidia-cuda-toolkit-and-installing-new-one](https://askubuntu.com/questions/530043/removing-nvidia-cuda-toolkit-and-installing-new-one)
4. Install CUDA Toolkit 11.8 again [https://developer.nvidia.com/cuda-11-8-0-download-archive](https://developer.nvidia.com/cuda-11-8-0-download-archive), select: Linux, x86_64, WSL-Ubuntu, 2.0, deb (local)
```bash
sudo apt-get -y install cuda-toolkit-11-8
```
</details>
## Want the Next Update Now?
If you want the latest and greatest build, or want to see some new great features, go to our [experimental branch](https://github.com/hacksider/Deep-Live-Cam/tree/experimental) and experience what the contributors have given.
## TODO
- [ ] Support multiple faces feature
- [ ] Develop a version for web app/service
- [ ] UI/UX enhancements for desktop app
- [ ] Speed up model loading
- [ ] Speed up real-time face swapping
*Note: This is an open-source project, and were working on it in our free time. Therefore, features, replies, bug fixes, etc., might be delayed. We hope you understand. Thanks.*
## Credits ## Credits
- [henryruhs](https://github.com/henryruhs): for being an irreplaceable contributor to the project
- [ffmpeg](https://ffmpeg.org/): for making video related operations easy - [ffmpeg](https://ffmpeg.org/): for making video related operations easy
- [deepinsight](https://github.com/deepinsight): for their [insightface](https://github.com/deepinsight/insightface) project which provided a well-made library and models. Please be reminded that the [use of the model is for non-commercial research purposes only](https://github.com/deepinsight/insightface?tab=readme-ov-file#license). - [deepinsight](https://github.com/deepinsight): for their [insightface](https://github.com/deepinsight/insightface) project which provided a well-made library and models.
- [havok2-htwo](https://github.com/havok2-htwo) : for sharing the code for webcam - [havok2-htwo](https://github.com/havok2-htwo) : for sharing the code for webcam
- [GosuDRM](https://github.com/GosuDRM/nsfw-roop) : for uncensoring roop - [GosuDRM](https://github.com/GosuDRM/nsfw-roop) : for uncensoring roop
- [vic4key](https://github.com/vic4key) : For supporting/contributing on this project - and all developers behind libraries used in this project.
- and [all developers](https://github.com/hacksider/Deep-Live-Cam/graphs/contributors) behind libraries used in this project.
- Foot Note: [This is originally roop-cam, see the full history of the code here.](https://github.com/hacksider/roop-cam) Please be informed that the base author of the code is [s0md3v](https://github.com/s0md3v/roop)

View File

@ -1,32 +0,0 @@
import numpy as np
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score
from typing import Any
def find_cluster_centroids(embeddings, max_k=10) -> Any:
inertia = []
cluster_centroids = []
K = range(1, max_k+1)
for k in K:
kmeans = KMeans(n_clusters=k, random_state=0)
kmeans.fit(embeddings)
inertia.append(kmeans.inertia_)
cluster_centroids.append({"k": k, "centroids": kmeans.cluster_centers_})
diffs = [inertia[i] - inertia[i+1] for i in range(len(inertia)-1)]
optimal_centroids = cluster_centroids[diffs.index(max(diffs)) + 1]['centroids']
return optimal_centroids
def find_closest_centroid(centroids: list, normed_face_embedding) -> list:
try:
centroids = np.array(centroids)
normed_face_embedding = np.array(normed_face_embedding)
similarities = np.dot(centroids, normed_face_embedding)
closest_centroid_index = np.argmax(similarities)
return closest_centroid_index, centroids[closest_centroid_index]
except ValueError:
return None

View File

@ -1,16 +1,17 @@
import os import os
import sys import sys
# single thread doubles cuda performance - needs to be set before torch import
if any(arg.startswith('--execution-provider') for arg in sys.argv):
os.environ['OMP_NUM_THREADS'] = '1'
# reduce tensorflow log level
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
import warnings import warnings
from typing import List
import platform import platform
import signal import signal
import shutil import shutil
import argparse import argparse
from typing import List
# Set environment variables for CUDA performance and TensorFlow logging
if any(arg.startswith('--execution-provider') for arg in sys.argv):
os.environ['OMP_NUM_THREADS'] = '1'
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
import torch import torch
import onnxruntime import onnxruntime
import tensorflow import tensorflow
@ -19,38 +20,73 @@ import modules.globals
import modules.metadata import modules.metadata
import modules.ui as ui import modules.ui as ui
from modules.processors.frame.core import get_frame_processors_modules from modules.processors.frame.core import get_frame_processors_modules
from modules.utilities import has_image_extension, is_image, is_video, detect_fps, create_video, extract_frames, get_temp_frame_paths, restore_audio, create_temp, move_temp, clean_temp, normalize_output_path from modules.utilities import (
has_image_extension,
if 'ROCMExecutionProvider' in modules.globals.execution_providers: is_image,
del torch is_video,
detect_fps,
create_video,
extract_frames,
get_temp_frame_paths,
restore_audio,
create_temp,
move_temp,
clean_temp,
normalize_output_path
)
# Filter warnings
warnings.filterwarnings('ignore', category=FutureWarning, module='insightface') warnings.filterwarnings('ignore', category=FutureWarning, module='insightface')
warnings.filterwarnings('ignore', category=UserWarning, module='torchvision') warnings.filterwarnings('ignore', category=UserWarning, module='torchvision')
# Cross-platform resource management
if platform.system() == 'Darwin' and 'ROCMExecutionProvider' in modules.globals.execution_providers:
del torch
def parse_args() -> None: def parse_args() -> None:
signal.signal(signal.SIGINT, lambda signal_number, frame: destroy()) signal.signal(signal.SIGINT, lambda signal_number, frame: destroy())
program = argparse.ArgumentParser() program = argparse.ArgumentParser()
program.add_argument('-s', '--source', help='select an source image', dest='source_path') program.add_argument('-s', '--source', help='Select a source image', dest='source_path')
program.add_argument('-t', '--target', help='select an target image or video', dest='target_path') program.add_argument('-t', '--target', help='Select a target image or video', dest='target_path')
program.add_argument('-o', '--output', help='select output file or directory', dest='output_path') program.add_argument('-o', '--output', help='Select output file or directory', dest='output_path')
program.add_argument('--frame-processor', help='pipeline of frame processors', dest='frame_processor', default=['face_swapper'], choices=['face_swapper', 'face_enhancer'], nargs='+') program.add_argument('--frame-processor', help='Pipeline of frame processors', dest='frame_processor',
program.add_argument('--keep-fps', help='keep original fps', dest='keep_fps', action='store_true', default=False) default=['face_swapper'], choices=['face_swapper', 'face_enhancer', 'super_resolution'],
program.add_argument('--keep-audio', help='keep original audio', dest='keep_audio', action='store_true', default=True) nargs='+')
program.add_argument('--keep-frames', help='keep temporary frames', dest='keep_frames', action='store_true', default=False) program.add_argument('--keep-fps', help='Keep original fps', dest='keep_fps', action='store_true', default=False)
program.add_argument('--many-faces', help='process every face', dest='many_faces', action='store_true', default=False) program.add_argument('--keep-audio', help='Keep original audio', dest='keep_audio', action='store_true',
program.add_argument('--nsfw-filter', help='filter the NSFW image or video', dest='nsfw_filter', action='store_true', default=False) default=True)
program.add_argument('--map-faces', help='map source target faces', dest='map_faces', action='store_true', default=False) program.add_argument('--keep-frames', help='Keep temporary frames', dest='keep_frames', action='store_true',
program.add_argument('--video-encoder', help='adjust output video encoder', dest='video_encoder', default='libx264', choices=['libx264', 'libx265', 'libvpx-vp9']) default=False)
program.add_argument('--video-quality', help='adjust output video quality', dest='video_quality', type=int, default=18, choices=range(52), metavar='[0-51]') program.add_argument('--many-faces', help='Process every face', dest='many_faces', action='store_true',
program.add_argument('--live-mirror', help='The live camera display as you see it in the front-facing camera frame', dest='live_mirror', action='store_true', default=False) default=False)
program.add_argument('--live-resizable', help='The live camera frame is resizable', dest='live_resizable', action='store_true', default=False) program.add_argument('--video-encoder', help='Adjust output video encoder', dest='video_encoder', default='libx264',
program.add_argument('--max-memory', help='maximum amount of RAM in GB', dest='max_memory', type=int, default=suggest_max_memory()) choices=['libx264', 'libx265', 'libvpx-vp9'])
program.add_argument('--execution-provider', help='execution provider', dest='execution_provider', default=['cpu'], choices=suggest_execution_providers(), nargs='+') program.add_argument('--video-quality', help='Adjust output video quality', dest='video_quality', type=int,
program.add_argument('--execution-threads', help='number of execution threads', dest='execution_threads', type=int, default=suggest_execution_threads()) default=18,
program.add_argument('-v', '--version', action='version', version=f'{modules.metadata.name} {modules.metadata.version}') choices=range(52), metavar='[0-51]')
program.add_argument('--live-mirror', help='The live camera display as you see it in the front-facing camera frame',
dest='live_mirror', action='store_true', default=False)
program.add_argument('--live-resizable', help='The live camera frame is resizable',
dest='live_resizable', action='store_true', default=False)
program.add_argument('--max-memory', help='Maximum amount of RAM in GB', dest='max_memory', type=int,
default=suggest_max_memory())
program.add_argument('--execution-provider', help='Execution provider', dest='execution_provider', default=['cpu'],
choices=suggest_execution_providers(), nargs='+')
program.add_argument('--execution-threads', help='Number of execution threads', dest='execution_threads', type=int,
default=suggest_execution_threads())
program.add_argument('--headless', help='Run in headless mode', dest='headless', default=False, action='store_true')
program.add_argument('--enhancer-upscale-factor',
help='Sets the upscale factor for the enhancer. Only applies if `face_enhancer` is set as a frame-processor',
dest='enhancer_upscale_factor', type=int, default=1)
program.add_argument('--source-image-scaling-factor', help='Set the upscale factor for source images',
dest='source_image_scaling_factor', default=2, type=int)
program.add_argument('-r', '--super-resolution-scale-factor', dest='super_resolution_scale_factor',
help='Set the upscale factor for super resolution', default=4, choices=[2, 3, 4], type=int)
program.add_argument('-v', '--version', action='version',
version=f'{modules.metadata.name} {modules.metadata.version}')
# register deprecated args # Register deprecated args
program.add_argument('-f', '--face', help=argparse.SUPPRESS, dest='source_path_deprecated') program.add_argument('-f', '--face', help=argparse.SUPPRESS, dest='source_path_deprecated')
program.add_argument('--cpu-cores', help=argparse.SUPPRESS, dest='cpu_cores_deprecated', type=int) program.add_argument('--cpu-cores', help=argparse.SUPPRESS, dest='cpu_cores_deprecated', type=int)
program.add_argument('--gpu-vendor', help=argparse.SUPPRESS, dest='gpu_vendor_deprecated') program.add_argument('--gpu-vendor', help=argparse.SUPPRESS, dest='gpu_vendor_deprecated')
@ -60,15 +96,14 @@ def parse_args() -> None:
modules.globals.source_path = args.source_path modules.globals.source_path = args.source_path
modules.globals.target_path = args.target_path modules.globals.target_path = args.target_path
modules.globals.output_path = normalize_output_path(modules.globals.source_path, modules.globals.target_path, args.output_path) modules.globals.output_path = normalize_output_path(modules.globals.source_path, modules.globals.target_path,
args.output_path)
modules.globals.frame_processors = args.frame_processor modules.globals.frame_processors = args.frame_processor
modules.globals.headless = args.source_path or args.target_path or args.output_path modules.globals.headless = args.source_path or args.target_path or args.output_path
modules.globals.keep_fps = args.keep_fps modules.globals.keep_fps = args.keep_fps
modules.globals.keep_audio = args.keep_audio modules.globals.keep_audio = args.keep_audio
modules.globals.keep_frames = args.keep_frames modules.globals.keep_frames = args.keep_frames
modules.globals.many_faces = args.many_faces modules.globals.many_faces = args.many_faces
modules.globals.nsfw_filter = args.nsfw_filter
modules.globals.map_faces = args.map_faces
modules.globals.video_encoder = args.video_encoder modules.globals.video_encoder = args.video_encoder
modules.globals.video_quality = args.video_quality modules.globals.video_quality = args.video_quality
modules.globals.live_mirror = args.live_mirror modules.globals.live_mirror = args.live_mirror
@ -76,18 +111,26 @@ def parse_args() -> None:
modules.globals.max_memory = args.max_memory modules.globals.max_memory = args.max_memory
modules.globals.execution_providers = decode_execution_providers(args.execution_provider) modules.globals.execution_providers = decode_execution_providers(args.execution_provider)
modules.globals.execution_threads = args.execution_threads modules.globals.execution_threads = args.execution_threads
modules.globals.headless = args.headless
modules.globals.enhancer_upscale_factor = args.enhancer_upscale_factor
modules.globals.source_image_scaling_factor = args.source_image_scaling_factor
modules.globals.sr_scale_factor = args.super_resolution_scale_factor
# Handle face enhancer tumbler
modules.globals.fp_ui['face_enhancer'] = 'face_enhancer' in args.frame_processor
#for ENHANCER tumbler: modules.globals.nsfw = False
if 'face_enhancer' in args.frame_processor:
modules.globals.fp_ui['face_enhancer'] = True
else:
modules.globals.fp_ui['face_enhancer'] = False
# translate deprecated args # Handle deprecated arguments
handle_deprecated_args(args)
def handle_deprecated_args(args) -> None:
"""Handle deprecated arguments by translating them to the new format."""
if args.source_path_deprecated: if args.source_path_deprecated:
print('\033[33mArgument -f and --face are deprecated. Use -s and --source instead.\033[0m') print('\033[33mArgument -f and --face are deprecated. Use -s and --source instead.\033[0m')
modules.globals.source_path = args.source_path_deprecated modules.globals.source_path = args.source_path_deprecated
modules.globals.output_path = normalize_output_path(args.source_path_deprecated, modules.globals.target_path, args.output_path) modules.globals.output_path = normalize_output_path(args.source_path_deprecated, modules.globals.target_path,
args.output_path)
if args.cpu_cores_deprecated: if args.cpu_cores_deprecated:
print('\033[33mArgument --cpu-cores is deprecated. Use --execution-threads instead.\033[0m') print('\033[33mArgument --cpu-cores is deprecated. Use --execution-threads instead.\033[0m')
modules.globals.execution_threads = args.cpu_cores_deprecated modules.globals.execution_threads = args.cpu_cores_deprecated
@ -98,7 +141,7 @@ def parse_args() -> None:
print('\033[33mArgument --gpu-vendor nvidia is deprecated. Use --execution-provider cuda instead.\033[0m') print('\033[33mArgument --gpu-vendor nvidia is deprecated. Use --execution-provider cuda instead.\033[0m')
modules.globals.execution_providers = decode_execution_providers(['cuda']) modules.globals.execution_providers = decode_execution_providers(['cuda'])
if args.gpu_vendor_deprecated == 'amd': if args.gpu_vendor_deprecated == 'amd':
print('\033[33mArgument --gpu-vendor amd is deprecated. Use --execution-provider cuda instead.\033[0m') print('\033[33mArgument --gpu-vendor amd is deprecated. Use --execution-provider rocm instead.\033[0m')
modules.globals.execution_providers = decode_execution_providers(['rocm']) modules.globals.execution_providers = decode_execution_providers(['rocm'])
if args.gpu_threads_deprecated: if args.gpu_threads_deprecated:
print('\033[33mArgument --gpu-threads is deprecated. Use --execution-threads instead.\033[0m') print('\033[33mArgument --gpu-threads is deprecated. Use --execution-threads instead.\033[0m')
@ -106,18 +149,22 @@ def parse_args() -> None:
def encode_execution_providers(execution_providers: List[str]) -> List[str]: def encode_execution_providers(execution_providers: List[str]) -> List[str]:
return [execution_provider.replace('ExecutionProvider', '').lower() for execution_provider in execution_providers] return [provider.replace('ExecutionProvider', '').lower() for provider in execution_providers]
def decode_execution_providers(execution_providers: List[str]) -> List[str]: def decode_execution_providers(execution_providers: List[str]) -> List[str]:
return [provider for provider, encoded_execution_provider in zip(onnxruntime.get_available_providers(), encode_execution_providers(onnxruntime.get_available_providers())) available_providers = onnxruntime.get_available_providers()
if any(execution_provider in encoded_execution_provider for execution_provider in execution_providers)] encoded_providers = encode_execution_providers(available_providers)
selected_providers = [available_providers[encoded_providers.index(req)] for req in execution_providers
if req in encoded_providers]
# Default to CPU if no suitable providers are found
return selected_providers if selected_providers else ['CPUExecutionProvider']
def suggest_max_memory() -> int: def suggest_max_memory() -> int:
if platform.system().lower() == 'darwin': return 4 if platform.system().lower() == 'darwin' else 16
return 4
return 16
def suggest_execution_providers() -> List[str]: def suggest_execution_providers() -> List[str]:
@ -125,34 +172,43 @@ def suggest_execution_providers() -> List[str]:
def suggest_execution_threads() -> int: def suggest_execution_threads() -> int:
if 'DmlExecutionProvider' in modules.globals.execution_providers: if 'dml' in modules.globals.execution_providers:
return 1 return 1
if 'ROCMExecutionProvider' in modules.globals.execution_providers: if 'rocm' in modules.globals.execution_providers:
return 1 return 1
return 8 return 8
def limit_resources() -> None: def limit_resources() -> None:
# prevent tensorflow memory leak # Prevent TensorFlow memory leak
gpus = tensorflow.config.experimental.list_physical_devices('GPU') gpus = tensorflow.config.experimental.list_physical_devices('GPU')
for gpu in gpus: for gpu in gpus:
tensorflow.config.experimental.set_memory_growth(gpu, True) tensorflow.config.experimental.set_memory_growth(gpu, True)
# limit memory usage
# Limit memory usage
if modules.globals.max_memory: if modules.globals.max_memory:
memory = modules.globals.max_memory * 1024 ** 3 memory = modules.globals.max_memory * 1024 ** 3
if platform.system().lower() == 'darwin': if platform.system().lower() == 'darwin':
memory = modules.globals.max_memory * 1024 ** 6 memory = modules.globals.max_memory * 1024 ** 3
if platform.system().lower() == 'windows': elif platform.system().lower() == 'windows':
import ctypes import ctypes
kernel32 = ctypes.windll.kernel32 kernel32 = ctypes.windll.kernel32
kernel32.SetProcessWorkingSetSize(-1, ctypes.c_size_t(memory), ctypes.c_size_t(memory)) kernel32.SetProcessWorkingSetSize(-1, ctypes.c_size_t(memory), ctypes.c_size_t(memory))
else: else:
import resource import resource
resource.setrlimit(resource.RLIMIT_DATA, (memory, memory)) try:
soft, hard = resource.getrlimit(resource.RLIMIT_DATA)
if memory > hard:
print(
f"Warning: Requested memory limit {memory / (1024 ** 3)} GB exceeds system's hard limit. Setting to maximum allowed {hard / (1024 ** 3)} GB.")
memory = hard
resource.setrlimit(resource.RLIMIT_DATA, (memory, memory))
except ValueError as e:
print(f"Warning: Could not set memory limit: {e}. Continuing with default limits.")
def release_resources() -> None: def release_resources() -> None:
if 'CUDAExecutionProvider' in modules.globals.execution_providers: if 'cuda' in modules.globals.execution_providers:
torch.cuda.empty_cache() torch.cuda.empty_cache()
@ -163,52 +219,86 @@ def pre_check() -> bool:
if not shutil.which('ffmpeg'): if not shutil.which('ffmpeg'):
update_status('ffmpeg is not installed.') update_status('ffmpeg is not installed.')
return False return False
if 'cuda' in modules.globals.execution_providers and not torch.cuda.is_available():
update_status('CUDA is not available. Please check your GPU or CUDA installation.')
return False
return True return True
def update_status(message: str, scope: str = 'DLC.CORE') -> None: def update_status(message: str, scope: str = 'DLC.CORE') -> None:
print(f'[{scope}] {message}') print(f'[{scope}] {message}')
if not modules.globals.headless: if not modules.globals.headless and ui.status_label:
ui.update_status(message) ui.update_status(message)
def start() -> None: def start() -> None:
for frame_processor in get_frame_processors_modules(modules.globals.frame_processors): for frame_processor in get_frame_processors_modules(modules.globals.frame_processors):
if not frame_processor.pre_start(): if not frame_processor.pre_start():
return return
update_status('Processing...')
# process image to image # Process image to image
if has_image_extension(modules.globals.target_path): if has_image_extension(modules.globals.target_path):
if modules.globals.nsfw_filter and ui.check_and_ignore_nsfw(modules.globals.target_path, destroy): process_image_to_image()
return
# Process image to video
process_image_to_video()
def process_image_to_image() -> None:
if modules.globals.nsfw:
from modules.predicter import predict_image
if predict_image(modules.globals.target_path):
destroy(to_quit=False)
update_status('Processing to image ignored!')
return return
try:
shutil.copy2(modules.globals.target_path, modules.globals.output_path)
except Exception as e:
print("Error copying file:", str(e))
for frame_processor in get_frame_processors_modules(modules.globals.frame_processors):
update_status('Progressing...', frame_processor.NAME)
frame_processor.process_image(modules.globals.source_path, modules.globals.output_path, modules.globals.output_path)
release_resources()
if is_image(modules.globals.target_path):
update_status('Processing to image succeed!')
else:
update_status('Processing to image failed!')
return
# process image to videos
if modules.globals.nsfw_filter and ui.check_and_ignore_nsfw(modules.globals.target_path, destroy):
return
if not modules.globals.map_faces: try:
update_status('Creating temp resources...') shutil.copy2(modules.globals.target_path, modules.globals.output_path)
create_temp(modules.globals.target_path) except Exception as e:
update_status('Extracting frames...') print("Error copying file:", str(e))
extract_frames(modules.globals.target_path)
for frame_processor in get_frame_processors_modules(modules.globals.frame_processors):
update_status('Processing...', frame_processor.NAME)
frame_processor.process_image(modules.globals.source_path, modules.globals.output_path,
modules.globals.output_path)
release_resources()
if is_image(modules.globals.target_path):
update_status('Processing to image succeeded!')
else:
update_status('Processing to image failed!')
def process_image_to_video() -> None:
if modules.globals.nsfw:
from modules.predicter import predict_video
if predict_video(modules.globals.target_path):
destroy(to_quit=False)
update_status('Processing to video ignored!')
return
update_status('Creating temporary resources...')
create_temp(modules.globals.target_path)
update_status('Extracting frames...')
extract_frames(modules.globals.target_path)
temp_frame_paths = get_temp_frame_paths(modules.globals.target_path) temp_frame_paths = get_temp_frame_paths(modules.globals.target_path)
for frame_processor in get_frame_processors_modules(modules.globals.frame_processors): for frame_processor in get_frame_processors_modules(modules.globals.frame_processors):
update_status('Progressing...', frame_processor.NAME) update_status('Processing...', frame_processor.NAME)
frame_processor.process_video(modules.globals.source_path, temp_frame_paths) frame_processor.process_video(modules.globals.source_path, temp_frame_paths)
release_resources() release_resources()
# handles fps
handle_video_fps()
handle_video_audio()
clean_temp(modules.globals.target_path)
if is_video(modules.globals.target_path):
update_status('Processing to video succeeded!')
else:
update_status('Processing to video failed!')
def handle_video_fps() -> None:
if modules.globals.keep_fps: if modules.globals.keep_fps:
update_status('Detecting fps...') update_status('Detecting fps...')
fps = detect_fps(modules.globals.target_path) fps = detect_fps(modules.globals.target_path)
@ -217,7 +307,9 @@ def start() -> None:
else: else:
update_status('Creating video with 30.0 fps...') update_status('Creating video with 30.0 fps...')
create_video(modules.globals.target_path) create_video(modules.globals.target_path)
# handle audio
def handle_video_audio() -> None:
if modules.globals.keep_audio: if modules.globals.keep_audio:
if modules.globals.keep_fps: if modules.globals.keep_fps:
update_status('Restoring audio...') update_status('Restoring audio...')
@ -226,12 +318,6 @@ def start() -> None:
restore_audio(modules.globals.target_path, modules.globals.output_path) restore_audio(modules.globals.target_path, modules.globals.output_path)
else: else:
move_temp(modules.globals.target_path, modules.globals.output_path) move_temp(modules.globals.target_path, modules.globals.output_path)
# clean and validate
clean_temp(modules.globals.target_path)
if is_video(modules.globals.target_path):
update_status('Processing to video succeed!')
else:
update_status('Processing to video failed!')
def destroy(to_quit=True) -> None: def destroy(to_quit=True) -> None:
@ -241,15 +327,20 @@ def destroy(to_quit=True) -> None:
def run() -> None: def run() -> None:
parse_args() try:
if not pre_check(): parse_args()
return if not pre_check():
for frame_processor in get_frame_processors_modules(modules.globals.frame_processors):
if not frame_processor.pre_check():
return return
limit_resources() for frame_processor in get_frame_processors_modules(modules.globals.frame_processors):
if modules.globals.headless: if not frame_processor.pre_check():
start() return
else: limit_resources()
window = ui.init(start, destroy) if modules.globals.headless:
window.mainloop() start()
else:
window = ui.init(start, destroy)
window.mainloop()
except Exception as e:
print(f"UI initialization failed: {str(e)}")
update_status(f"UI initialization failed: {str(e)}")
destroy() # Ensure any resources are cleaned up on failure

View File

@ -1,189 +1,27 @@
import os from typing import Any, Optional
import shutil
from typing import Any
import insightface import insightface
import cv2
import numpy as np
import modules.globals import modules.globals
from tqdm import tqdm
from modules.typing import Frame from modules.typing import Frame
from modules.cluster_analysis import find_cluster_centroids, find_closest_centroid
from modules.utilities import get_temp_directory_path, create_temp, extract_frames, clean_temp, get_temp_frame_paths
from pathlib import Path
FACE_ANALYSER = None FACE_ANALYSER: Optional[insightface.app.FaceAnalysis] = None
def get_face_analyser() -> insightface.app.FaceAnalysis:
def get_face_analyser() -> Any:
global FACE_ANALYSER global FACE_ANALYSER
if FACE_ANALYSER is None: if FACE_ANALYSER is None:
FACE_ANALYSER = insightface.app.FaceAnalysis(name='buffalo_l', providers=modules.globals.execution_providers) FACE_ANALYSER = insightface.app.FaceAnalysis(
name='buffalo_l',
providers=modules.globals.execution_providers
)
FACE_ANALYSER.prepare(ctx_id=0, det_size=(640, 640)) FACE_ANALYSER.prepare(ctx_id=0, det_size=(640, 640))
return FACE_ANALYSER return FACE_ANALYSER
def get_one_face(frame: Frame) -> Optional[Any]:
faces = get_face_analyser().get(frame)
return min(faces, key=lambda x: x.bbox[0], default=None)
def get_one_face(frame: Frame) -> Any: def get_many_faces(frame: Frame) -> Optional[Any]:
face = get_face_analyser().get(frame) faces = get_face_analyser().get(frame)
try: return faces if faces else None
return min(face, key=lambda x: x.bbox[0])
except ValueError:
return None
def get_many_faces(frame: Frame) -> Any:
try:
return get_face_analyser().get(frame)
except IndexError:
return None
def has_valid_map() -> bool:
for map in modules.globals.souce_target_map:
if "source" in map and "target" in map:
return True
return False
def default_source_face() -> Any:
for map in modules.globals.souce_target_map:
if "source" in map:
return map['source']['face']
return None
def simplify_maps() -> Any:
centroids = []
faces = []
for map in modules.globals.souce_target_map:
if "source" in map and "target" in map:
centroids.append(map['target']['face'].normed_embedding)
faces.append(map['source']['face'])
modules.globals.simple_map = {'source_faces': faces, 'target_embeddings': centroids}
return None
def add_blank_map() -> Any:
try:
max_id = -1
if len(modules.globals.souce_target_map) > 0:
max_id = max(modules.globals.souce_target_map, key=lambda x: x['id'])['id']
modules.globals.souce_target_map.append({
'id' : max_id + 1
})
except ValueError:
return None
def get_unique_faces_from_target_image() -> Any:
try:
modules.globals.souce_target_map = []
target_frame = cv2.imread(modules.globals.target_path)
many_faces = get_many_faces(target_frame)
i = 0
for face in many_faces:
x_min, y_min, x_max, y_max = face['bbox']
modules.globals.souce_target_map.append({
'id' : i,
'target' : {
'cv2' : target_frame[int(y_min):int(y_max), int(x_min):int(x_max)],
'face' : face
}
})
i = i + 1
except ValueError:
return None
def get_unique_faces_from_target_video() -> Any:
try:
modules.globals.souce_target_map = []
frame_face_embeddings = []
face_embeddings = []
print('Creating temp resources...')
clean_temp(modules.globals.target_path)
create_temp(modules.globals.target_path)
print('Extracting frames...')
extract_frames(modules.globals.target_path)
temp_frame_paths = get_temp_frame_paths(modules.globals.target_path)
i = 0
for temp_frame_path in tqdm(temp_frame_paths, desc="Extracting face embeddings from frames"):
temp_frame = cv2.imread(temp_frame_path)
many_faces = get_many_faces(temp_frame)
for face in many_faces:
face_embeddings.append(face.normed_embedding)
frame_face_embeddings.append({'frame': i, 'faces': many_faces, 'location': temp_frame_path})
i += 1
centroids = find_cluster_centroids(face_embeddings)
for frame in frame_face_embeddings:
for face in frame['faces']:
closest_centroid_index, _ = find_closest_centroid(centroids, face.normed_embedding)
face['target_centroid'] = closest_centroid_index
for i in range(len(centroids)):
modules.globals.souce_target_map.append({
'id' : i
})
temp = []
for frame in tqdm(frame_face_embeddings, desc=f"Mapping frame embeddings to centroids-{i}"):
temp.append({'frame': frame['frame'], 'faces': [face for face in frame['faces'] if face['target_centroid'] == i], 'location': frame['location']})
modules.globals.souce_target_map[i]['target_faces_in_frame'] = temp
# dump_faces(centroids, frame_face_embeddings)
default_target_face()
except ValueError:
return None
def default_target_face():
for map in modules.globals.souce_target_map:
best_face = None
best_frame = None
for frame in map['target_faces_in_frame']:
if len(frame['faces']) > 0:
best_face = frame['faces'][0]
best_frame = frame
break
for frame in map['target_faces_in_frame']:
for face in frame['faces']:
if face['det_score'] > best_face['det_score']:
best_face = face
best_frame = frame
x_min, y_min, x_max, y_max = best_face['bbox']
target_frame = cv2.imread(best_frame['location'])
map['target'] = {
'cv2' : target_frame[int(y_min):int(y_max), int(x_min):int(x_max)],
'face' : best_face
}
def dump_faces(centroids: Any, frame_face_embeddings: list):
temp_directory_path = get_temp_directory_path(modules.globals.target_path)
for i in range(len(centroids)):
if os.path.exists(temp_directory_path + f"/{i}") and os.path.isdir(temp_directory_path + f"/{i}"):
shutil.rmtree(temp_directory_path + f"/{i}")
Path(temp_directory_path + f"/{i}").mkdir(parents=True, exist_ok=True)
for frame in tqdm(frame_face_embeddings, desc=f"Copying faces to temp/./{i}"):
temp_frame = cv2.imread(frame['location'])
j = 0
for face in frame['faces']:
if face['target_centroid'] == i:
x_min, y_min, x_max, y_max = face['bbox']
if temp_frame[int(y_min):int(y_max), int(x_min):int(x_max)].size > 0:
cv2.imwrite(temp_directory_path + f"/{i}/{frame['frame']}_{j}.png", temp_frame[int(y_min):int(y_max), int(x_min):int(x_max)])
j += 1

View File

@ -1,5 +1,5 @@
import os import os
from typing import List, Dict, Any from typing import List, Dict
ROOT_DIR = os.path.dirname(os.path.abspath(__file__)) ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
WORKFLOW_DIR = os.path.join(ROOT_DIR, 'workflow') WORKFLOW_DIR = os.path.join(ROOT_DIR, 'workflow')
@ -9,9 +9,6 @@ file_types = [
('Video', ('*.mp4','*.mkv')) ('Video', ('*.mp4','*.mkv'))
] ]
souce_target_map = []
simple_map = {}
source_path = None source_path = None
target_path = None target_path = None
output_path = None output_path = None
@ -20,9 +17,6 @@ keep_fps = None
keep_audio = None keep_audio = None
keep_frames = None keep_frames = None
many_faces = None many_faces = None
map_faces = None
color_correction = None # New global variable for color correction toggle
nsfw_filter = None
video_encoder = None video_encoder = None
video_quality = None video_quality = None
live_mirror = None live_mirror = None
@ -33,5 +27,9 @@ execution_threads = None
headless = None headless = None
log_level = 'error' log_level = 'error'
fp_ui: Dict[str, bool] = {} fp_ui: Dict[str, bool] = {}
nsfw = None
camera_input_combobox = None camera_input_combobox = None
webcam_preview_running = False webcam_preview_running = False
enhancer_upscale_factor = 1
source_image_scaling_factor = 2
sr_scale_factor = 4

View File

@ -2,58 +2,68 @@ from typing import Any, List
import cv2 import cv2
import insightface import insightface
import threading import threading
import os
import modules.globals import modules.globals
import modules.processors.frame.core import modules.processors.frame.core
from modules.core import update_status from modules.core import update_status
from modules.face_analyser import get_one_face, get_many_faces, default_source_face from modules.face_analyser import get_one_face, get_many_faces
from modules.typing import Face, Frame from modules.typing import Face, Frame
from modules.utilities import conditional_download, resolve_relative_path, is_image, is_video from modules.utilities import conditional_download, resolve_relative_path, is_image, is_video
from modules.cluster_analysis import find_closest_centroid import numpy as np
FACE_SWAPPER = None FACE_SWAPPER = None
THREAD_LOCK = threading.Lock() THREAD_LOCK = threading.Lock()
NAME = 'DLC.FACE-SWAPPER' NAME = 'DLC.FACE-SWAPPER'
def pre_check() -> bool: def pre_check() -> bool:
download_directory_path = resolve_relative_path('../models') download_directory_path = resolve_relative_path('../models')
conditional_download(download_directory_path, ['https://huggingface.co/hacksider/deep-live-cam/blob/main/inswapper_128_fp16.onnx']) conditional_download(download_directory_path, [
'https://huggingface.co/hacksider/deep-live-cam/blob/main/inswapper_128.onnx'
])
return True return True
def pre_start() -> bool: def pre_start() -> bool:
if not modules.globals.map_faces and not is_image(modules.globals.source_path): if not is_image(modules.globals.source_path):
update_status('Select an image for source path.', NAME) update_status('Select an image for source path.', NAME)
return False return False
elif not modules.globals.map_faces and not get_one_face(cv2.imread(modules.globals.source_path)): elif not get_one_face(cv2.imread(modules.globals.source_path)):
update_status('No face in source path detected.', NAME) update_status('No face detected in the source path.', NAME)
return False return False
if not is_image(modules.globals.target_path) and not is_video(modules.globals.target_path): if not is_image(modules.globals.target_path) and not is_video(modules.globals.target_path):
update_status('Select an image or video for target path.', NAME) update_status('Select an image or video for target path.', NAME)
return False return False
return True return True
def get_face_swapper() -> Any: def get_face_swapper() -> Any:
global FACE_SWAPPER global FACE_SWAPPER
with THREAD_LOCK: with THREAD_LOCK:
if FACE_SWAPPER is None: if FACE_SWAPPER is None:
model_path = resolve_relative_path('../models/inswapper_128_fp16.onnx') model_path = resolve_relative_path('../models/inswapper_128.onnx')
FACE_SWAPPER = insightface.model_zoo.get_model(model_path, providers=modules.globals.execution_providers) FACE_SWAPPER = insightface.model_zoo.get_model(model_path, providers=modules.globals.execution_providers)
return FACE_SWAPPER return FACE_SWAPPER
def upscale_image(image: np.ndarray, scaling_factor: int = modules.globals.source_image_scaling_factor) -> np.ndarray:
"""
Upscales the given image by the specified scaling factor.
Args:
image (np.ndarray): The input image to upscale.
scaling_factor (int): The factor by which to upscale the image.
Returns:
np.ndarray: The upscaled image.
"""
height, width = image.shape[:2]
new_size = (width * scaling_factor, height * scaling_factor)
upscaled_image = cv2.resize(image, new_size, interpolation=cv2.INTER_CUBIC)
return upscaled_image
def swap_face(source_face: Face, target_face: Face, temp_frame: Frame) -> Frame: def swap_face(source_face: Face, target_face: Face, temp_frame: Frame) -> Frame:
return get_face_swapper().get(temp_frame, target_face, source_face, paste_back=True) return get_face_swapper().get(temp_frame, target_face, source_face, paste_back=True)
def process_frame(source_face: Face, temp_frame: Frame) -> Frame: def process_frame(source_face: Face, temp_frame: Frame) -> Frame:
# Ensure the frame is in RGB format if color correction is enabled
if modules.globals.color_correction:
temp_frame = cv2.cvtColor(temp_frame, cv2.COLOR_BGR2RGB)
if modules.globals.many_faces: if modules.globals.many_faces:
many_faces = get_many_faces(temp_frame) many_faces = get_many_faces(temp_frame)
if many_faces: if many_faces:
@ -65,99 +75,30 @@ def process_frame(source_face: Face, temp_frame: Frame) -> Frame:
temp_frame = swap_face(source_face, target_face, temp_frame) temp_frame = swap_face(source_face, target_face, temp_frame)
return temp_frame return temp_frame
def process_frame_v2(temp_frame: Frame, temp_frame_path: str = "") -> Frame:
if is_image(modules.globals.target_path):
if modules.globals.many_faces:
source_face = default_source_face()
for map in modules.globals.souce_target_map:
target_face = map['target']['face']
temp_frame = swap_face(source_face, target_face, temp_frame)
elif not modules.globals.many_faces:
for map in modules.globals.souce_target_map:
if "source" in map:
source_face = map['source']['face']
target_face = map['target']['face']
temp_frame = swap_face(source_face, target_face, temp_frame)
elif is_video(modules.globals.target_path):
if modules.globals.many_faces:
source_face = default_source_face()
for map in modules.globals.souce_target_map:
target_frame = [f for f in map['target_faces_in_frame'] if f['location'] == temp_frame_path]
for frame in target_frame:
for target_face in frame['faces']:
temp_frame = swap_face(source_face, target_face, temp_frame)
elif not modules.globals.many_faces:
for map in modules.globals.souce_target_map:
if "source" in map:
target_frame = [f for f in map['target_faces_in_frame'] if f['location'] == temp_frame_path]
source_face = map['source']['face']
for frame in target_frame:
for target_face in frame['faces']:
temp_frame = swap_face(source_face, target_face, temp_frame)
else:
many_faces = get_many_faces(temp_frame)
if modules.globals.many_faces:
source_face = default_source_face()
if many_faces:
for target_face in many_faces:
temp_frame = swap_face(source_face, target_face, temp_frame)
elif not modules.globals.many_faces:
if many_faces:
for target_face in many_faces:
closest_centroid_index, _ = find_closest_centroid(modules.globals.simple_map['target_embeddings'], target_face.normed_embedding)
temp_frame = swap_face(modules.globals.simple_map['source_faces'][closest_centroid_index], target_face, temp_frame)
return temp_frame
def process_frames(source_path: str, temp_frame_paths: List[str], progress: Any = None) -> None: def process_frames(source_path: str, temp_frame_paths: List[str], progress: Any = None) -> None:
if not modules.globals.map_faces: source_image = cv2.imread(source_path)
source_face = get_one_face(cv2.imread(source_path)) if source_image is None:
for temp_frame_path in temp_frame_paths: print(f"Failed to load source image from {source_path}")
temp_frame = cv2.imread(temp_frame_path) return
try: # Upscale the source image for better quality
result = process_frame(source_face, temp_frame) source_image_upscaled = upscale_image(source_image, scaling_factor=2)
cv2.imwrite(temp_frame_path, result) source_face = get_one_face(source_image_upscaled)
except Exception as exception:
print(exception)
pass
if progress:
progress.update(1)
else:
for temp_frame_path in temp_frame_paths:
temp_frame = cv2.imread(temp_frame_path)
try:
result = process_frame_v2(temp_frame, temp_frame_path)
cv2.imwrite(temp_frame_path, result)
except Exception as exception:
print(exception)
pass
if progress:
progress.update(1)
for temp_frame_path in temp_frame_paths:
temp_frame = cv2.imread(temp_frame_path)
try:
result = process_frame(source_face, temp_frame)
cv2.imwrite(temp_frame_path, result)
except Exception as exception:
print(f"Error processing frame {temp_frame_path}: {exception}")
if progress:
progress.update(1)
def process_image(source_path: str, target_path: str, output_path: str) -> None: def process_image(source_path: str, target_path: str, output_path: str) -> None:
if not modules.globals.map_faces: source_face = get_one_face(cv2.imread(source_path))
source_face = get_one_face(cv2.imread(source_path)) target_frame = cv2.imread(target_path)
target_frame = cv2.imread(target_path) result = process_frame(source_face, target_frame)
result = process_frame(source_face, target_frame) cv2.imwrite(output_path, result)
cv2.imwrite(output_path, result)
else:
if modules.globals.many_faces:
update_status('Many faces enabled. Using first source image. Progressing...', NAME)
target_frame = cv2.imread(output_path)
result = process_frame_v2(target_frame)
cv2.imwrite(output_path, result)
def process_video(source_path: str, temp_frame_paths: List[str]) -> None: def process_video(source_path: str, temp_frame_paths: List[str]) -> None:
if modules.globals.map_faces and modules.globals.many_faces:
update_status('Many faces enabled. Using first source image. Progressing...', NAME)
modules.processors.frame.core.process_video(source_path, temp_frame_paths, process_frames) modules.processors.frame.core.process_video(source_path, temp_frame_paths, process_frames)

View File

@ -19,7 +19,7 @@ if platform.system() == 'Windows' or platform.system() == 'Linux': # Windows or
import modules.globals import modules.globals
import modules.metadata import modules.metadata
from modules.face_analyser import get_one_face, get_unique_faces_from_target_image, get_unique_faces_from_target_video, add_blank_map, has_valid_map, simplify_maps from modules.face_analyser import get_one_face
from modules.capturer import get_video_frame, get_video_frame_total from modules.capturer import get_video_frame, get_video_frame_total
from modules.processors.frame.core import get_frame_processors_modules from modules.processors.frame.core import get_frame_processors_modules
from modules.utilities import is_image, is_video, resolve_relative_path from modules.utilities import is_image, is_video, resolve_relative_path
@ -34,22 +34,6 @@ PREVIEW_MAX_WIDTH = 1200
PREVIEW_DEFAULT_WIDTH = 960 PREVIEW_DEFAULT_WIDTH = 960
PREVIEW_DEFAULT_HEIGHT = 540 PREVIEW_DEFAULT_HEIGHT = 540
POPUP_WIDTH = 750
POPUP_HEIGHT = 810
POPUP_SCROLL_WIDTH = 740,
POPUP_SCROLL_HEIGHT = 700
POPUP_LIVE_WIDTH = 900
POPUP_LIVE_HEIGHT = 820
POPUP_LIVE_SCROLL_WIDTH = 890,
POPUP_LIVE_SCROLL_HEIGHT = 700
MAPPER_PREVIEW_MAX_HEIGHT = 100
MAPPER_PREVIEW_MAX_WIDTH = 100
DEFAULT_BUTTON_WIDTH = 200
DEFAULT_BUTTON_HEIGHT = 40
RECENT_DIRECTORY_SOURCE = None RECENT_DIRECTORY_SOURCE = None
RECENT_DIRECTORY_TARGET = None RECENT_DIRECTORY_TARGET = None
RECENT_DIRECTORY_OUTPUT = None RECENT_DIRECTORY_OUTPUT = None
@ -59,11 +43,6 @@ preview_slider = None
source_label = None source_label = None
target_label = None target_label = None
status_label = None status_label = None
popup_status_label = None
popup_status_label_live = None
source_label_dict = {}
source_label_dict_live = {}
target_label_dict_live = {}
img_ft, vid_ft = modules.globals.file_types img_ft, vid_ft = modules.globals.file_types
@ -169,12 +148,8 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
nsfw_switch = ctk.CTkSwitch(root, text='NSFW', variable=nsfw_value, cursor='hand2', command=lambda: setattr(modules.globals, 'nsfw', nsfw_value.get())) nsfw_switch = ctk.CTkSwitch(root, text='NSFW', variable=nsfw_value, cursor='hand2', command=lambda: setattr(modules.globals, 'nsfw', nsfw_value.get()))
nsfw_switch.place(relx=0.6, rely=0.6125) nsfw_switch.place(relx=0.6, rely=0.6125)
map_faces = ctk.BooleanVar(value=modules.globals.map_faces) start_button = ctk.CTkButton(root, text='Start', cursor='hand2', command=lambda: select_output_path(start))
map_faces_switch = ctk.CTkSwitch(root, text='Map faces', variable=map_faces, cursor='hand2', command=lambda: setattr(modules.globals, 'map_faces', map_faces.get())) start_button.place(relx=0.15, rely=0.7, relwidth=0.2, relheight=0.05)
map_faces_switch.place(relx=0.1, rely=0.75)
start_button = ctk.CTkButton(root, text='Start', cursor='hand2', command=lambda: analyze_target(start, root))
start_button.place(relx=0.15, rely=0.80, relwidth=0.2, relheight=0.05)
stop_button = ctk.CTkButton(root, text='Destroy', cursor='hand2', command=destroy) stop_button = ctk.CTkButton(root, text='Destroy', cursor='hand2', command=destroy)
stop_button.place(relx=0.4, rely=0.7, relwidth=0.2, relheight=0.05) stop_button.place(relx=0.4, rely=0.7, relwidth=0.2, relheight=0.05)
@ -182,8 +157,22 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
preview_button = ctk.CTkButton(root, text='Preview', cursor='hand2', command=toggle_preview) preview_button = ctk.CTkButton(root, text='Preview', cursor='hand2', command=toggle_preview)
preview_button.place(relx=0.65, rely=0.7, relwidth=0.2, relheight=0.05) preview_button.place(relx=0.65, rely=0.7, relwidth=0.2, relheight=0.05)
live_button = ctk.CTkButton(root, text='Live', cursor='hand2', command=lambda: webcam_preview(root)) camera_label = ctk.CTkLabel(root, text="Select Camera:")
live_button.place(relx=0.40, rely=0.86, relwidth=0.2, relheight=0.05) camera_label.place(relx=0.4, rely=0.7525, relwidth=0.2, relheight=0.05)
available_cameras = get_available_cameras()
available_camera_strings = [str(cam) for cam in available_cameras]
camera_variable = ctk.StringVar(value=available_camera_strings[0] if available_camera_strings else "No cameras found")
camera_optionmenu = ctk.CTkOptionMenu(root, variable=camera_variable, values=available_camera_strings)
camera_optionmenu.place(relx=0.65, rely=0.7525, relwidth=0.2, relheight=0.05)
virtual_cam_out_value = ctk.BooleanVar(value=False)
virtual_cam_out_switch = ctk.CTkSwitch(root, text='Virtual Cam Output (OBS)', variable=virtual_cam_out_value, cursor='hand2')
virtual_cam_out_switch.place(relx=0.4, rely=0.805)
live_button = ctk.CTkButton(root, text='Live', cursor='hand2', command=lambda: webcam_preview(camera_variable.get(), virtual_cam_out_value.get()))
live_button.place(relx=0.15, rely=0.7525, relwidth=0.2, relheight=0.05)
status_label = ctk.CTkLabel(root, text=None, justify='center') status_label = ctk.CTkLabel(root, text=None, justify='center')
status_label.place(relx=0.1, relwidth=0.8, rely=0.875) status_label.place(relx=0.1, relwidth=0.8, rely=0.875)
@ -195,109 +184,6 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
return root return root
def analyze_target(start: Callable[[], None], root: ctk.CTk):
if POPUP != None and POPUP.winfo_exists():
update_status("Please complete pop-up or close it.")
return
if modules.globals.map_faces:
modules.globals.souce_target_map = []
if is_image(modules.globals.target_path):
update_status('Getting unique faces')
get_unique_faces_from_target_image()
elif is_video(modules.globals.target_path):
update_status('Getting unique faces')
get_unique_faces_from_target_video()
if len(modules.globals.souce_target_map) > 0:
create_source_target_popup(start, root, modules.globals.souce_target_map)
else:
update_status("No faces found in target")
else:
select_output_path(start)
def create_source_target_popup(start: Callable[[], None], root: ctk.CTk, map: list) -> None:
global POPUP, popup_status_label
POPUP = ctk.CTkToplevel(root)
POPUP.title("Source x Target Mapper")
POPUP.geometry(f"{POPUP_WIDTH}x{POPUP_HEIGHT}")
POPUP.focus()
def on_submit_click(start):
if has_valid_map():
POPUP.destroy()
select_output_path(start)
else:
update_pop_status("Atleast 1 source with target is required!")
scrollable_frame = ctk.CTkScrollableFrame(POPUP, width=POPUP_SCROLL_WIDTH, height=POPUP_SCROLL_HEIGHT)
scrollable_frame.grid(row=0, column=0, padx=0, pady=0, sticky='nsew')
def on_button_click(map, button_num):
map = update_popup_source(scrollable_frame, map, button_num)
for item in map:
id = item['id']
button = ctk.CTkButton(scrollable_frame, text="Select source image", command=lambda id=id: on_button_click(map, id), width=DEFAULT_BUTTON_WIDTH, height=DEFAULT_BUTTON_HEIGHT)
button.grid(row=id, column=0, padx=50, pady=10)
x_label = ctk.CTkLabel(scrollable_frame, text=f"X", width=MAPPER_PREVIEW_MAX_WIDTH, height=MAPPER_PREVIEW_MAX_HEIGHT)
x_label.grid(row=id, column=2, padx=10, pady=10)
image = Image.fromarray(cv2.cvtColor(item['target']['cv2'], cv2.COLOR_BGR2RGB))
image = image.resize((MAPPER_PREVIEW_MAX_WIDTH, MAPPER_PREVIEW_MAX_HEIGHT), Image.LANCZOS)
tk_image = ctk.CTkImage(image, size=image.size)
target_image = ctk.CTkLabel(scrollable_frame, text=f"T-{id}", width=MAPPER_PREVIEW_MAX_WIDTH, height=MAPPER_PREVIEW_MAX_HEIGHT)
target_image.grid(row=id, column=3, padx=10, pady=10)
target_image.configure(image=tk_image)
popup_status_label = ctk.CTkLabel(POPUP, text=None, justify='center')
popup_status_label.grid(row=1, column=0, pady=15)
close_button = ctk.CTkButton(POPUP, text="Submit", command=lambda: on_submit_click(start))
close_button.grid(row=2, column=0, pady=10)
def update_popup_source(scrollable_frame: ctk.CTkScrollableFrame, map: list, button_num: int) -> list:
global source_label_dict
source_path = ctk.filedialog.askopenfilename(title='select an source image', initialdir=RECENT_DIRECTORY_SOURCE, filetypes=[img_ft])
if "source" in map[button_num]:
map[button_num].pop("source")
source_label_dict[button_num].destroy()
del source_label_dict[button_num]
if source_path == "":
return map
else:
cv2_img = cv2.imread(source_path)
face = get_one_face(cv2_img)
if face:
x_min, y_min, x_max, y_max = face['bbox']
map[button_num]['source'] = {
'cv2' : cv2_img[int(y_min):int(y_max), int(x_min):int(x_max)],
'face' : face
}
image = Image.fromarray(cv2.cvtColor(map[button_num]['source']['cv2'], cv2.COLOR_BGR2RGB))
image = image.resize((MAPPER_PREVIEW_MAX_WIDTH, MAPPER_PREVIEW_MAX_HEIGHT), Image.LANCZOS)
tk_image = ctk.CTkImage(image, size=image.size)
source_image = ctk.CTkLabel(scrollable_frame, text=f"S-{button_num}", width=MAPPER_PREVIEW_MAX_WIDTH, height=MAPPER_PREVIEW_MAX_HEIGHT)
source_image.grid(row=button_num, column=1, padx=10, pady=10)
source_image.configure(image=tk_image)
source_label_dict[button_num] = source_image
else:
update_pop_status("Face could not be detected in last upload!")
return map
def create_preview(parent: ctk.CTk) -> ctk.CTkToplevel: def create_preview(parent: ctk.CTk) -> ctk.CTkToplevel:
global preview_label, preview_slider global preview_label, preview_slider
@ -320,11 +206,6 @@ def update_status(text: str) -> None:
status_label.configure(text=text) status_label.configure(text=text)
ROOT.update() ROOT.update()
def update_pop_status(text: str) -> None:
popup_status_label.configure(text=text)
def update_pop_live_status(text: str) -> None:
popup_status_label_live.configure(text=text)
def update_tumbler(var: str, value: bool) -> None: def update_tumbler(var: str, value: bool) -> None:
modules.globals.fp_ui[var] = value modules.globals.fp_ui[var] = value
@ -513,75 +394,10 @@ def fit_image_to_size(image, width: int, height: int):
new_size = (int(ratio * w), int(ratio * h)) new_size = (int(ratio * w), int(ratio * h))
return cv2.resize(image, dsize=new_size) return cv2.resize(image, dsize=new_size)
def webcam_preview(camera_name: str, virtual_cam_output: bool):
if modules.globals.source_path is None:
return
def render_image_preview(image_path: str, size: Tuple[int, int]) -> ctk.CTkImage:
image = Image.open(image_path)
if size:
image = ImageOps.fit(image, size, Image.LANCZOS)
return ctk.CTkImage(image, size=image.size)
def render_video_preview(video_path: str, size: Tuple[int, int], frame_number: int = 0) -> ctk.CTkImage:
capture = cv2.VideoCapture(video_path)
if frame_number:
capture.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
has_frame, frame = capture.read()
if has_frame:
image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
if size:
image = ImageOps.fit(image, size, Image.LANCZOS)
return ctk.CTkImage(image, size=image.size)
capture.release()
cv2.destroyAllWindows()
def toggle_preview() -> None:
if PREVIEW.state() == 'normal':
PREVIEW.withdraw()
elif modules.globals.source_path and modules.globals.target_path:
init_preview()
update_preview()
def init_preview() -> None:
if is_image(modules.globals.target_path):
preview_slider.pack_forget()
if is_video(modules.globals.target_path):
video_frame_total = get_video_frame_total(modules.globals.target_path)
preview_slider.configure(to=video_frame_total)
preview_slider.pack(fill='x')
preview_slider.set(0)
def update_preview(frame_number: int = 0) -> None:
if modules.globals.source_path and modules.globals.target_path:
update_status('Processing...')
temp_frame = get_video_frame(modules.globals.target_path, frame_number)
if modules.globals.nsfw_filter and check_and_ignore_nsfw(temp_frame):
return
for frame_processor in get_frame_processors_modules(modules.globals.frame_processors):
temp_frame = frame_processor.process_frame(
get_one_face(cv2.imread(modules.globals.source_path)),
temp_frame
)
image = Image.fromarray(cv2.cvtColor(temp_frame, cv2.COLOR_BGR2RGB))
image = ImageOps.contain(image, (PREVIEW_MAX_WIDTH, PREVIEW_MAX_HEIGHT), Image.LANCZOS)
image = ctk.CTkImage(image, size=image.size)
preview_label.configure(image=image)
update_status('Processing succeed!')
PREVIEW.deiconify()
def webcam_preview(root: ctk.CTk):
if not modules.globals.map_faces:
if modules.globals.source_path is None:
# No image selected
return
create_webcam_preview()
else:
modules.globals.souce_target_map = []
create_source_target_popup_for_webcam(root, modules.globals.souce_target_map)
def create_webcam_preview():
global preview_label, PREVIEW global preview_label, PREVIEW
WIDTH = 960 WIDTH = 960
@ -624,7 +440,11 @@ def create_webcam_preview():
while preview_running: while preview_running:
preview_running = webcam_preview_loop(camera, source_image, frame_processors, virtual_cam) preview_running = webcam_preview_loop(camera, source_image, frame_processors, virtual_cam)
temp_frame = frame.copy() #Create a copy of the frame while preview_running:
preview_running = webcam_preview_loop(camera, source_image, frame_processors)
if camera: camera.release()
PREVIEW.withdraw()
def get_camera_index_by_name(camera_name: str) -> int: def get_camera_index_by_name(camera_name: str) -> int:
@ -639,18 +459,6 @@ def get_camera_index_by_name(camera_name: str) -> int:
return get_available_cameras().index(camera_name) return get_available_cameras().index(camera_name)
return -1 return -1
if not modules.globals.map_faces:
# Select and save face image only once
if source_image is None and modules.globals.source_path:
source_image = get_one_face(cv2.imread(modules.globals.source_path))
for frame_processor in frame_processors:
temp_frame = frame_processor.process_frame(source_image, temp_frame)
else:
modules.globals.target_path = None
for frame_processor in frame_processors:
temp_frame = frame_processor.process_frame_v2(temp_frame)
def get_available_cameras(): def get_available_cameras():
"""Get available camera names (cross-platform).""" """Get available camera names (cross-platform)."""
@ -682,156 +490,5 @@ def get_available_cameras():
cap.release() cap.release()
index += 1 index += 1
if PREVIEW.state() == 'withdrawn': available_cameras = devices
break return available_cameras
camera.release()
PREVIEW.withdraw() # Close preview window when loop is finished
def create_source_target_popup_for_webcam(root: ctk.CTk, map: list) -> None:
global POPUP_LIVE, popup_status_label_live
POPUP_LIVE = ctk.CTkToplevel(root)
POPUP_LIVE.title("Source x Target Mapper")
POPUP_LIVE.geometry(f"{POPUP_LIVE_WIDTH}x{POPUP_LIVE_HEIGHT}")
POPUP_LIVE.focus()
def on_submit_click():
if has_valid_map():
POPUP_LIVE.destroy()
simplify_maps()
create_webcam_preview()
else:
update_pop_live_status("Atleast 1 source with target is required!")
def on_add_click():
add_blank_map()
refresh_data(map)
update_pop_live_status("Please provide mapping!")
popup_status_label_live = ctk.CTkLabel(POPUP_LIVE, text=None, justify='center')
popup_status_label_live.grid(row=1, column=0, pady=15)
add_button = ctk.CTkButton(POPUP_LIVE, text="Add", command=lambda: on_add_click())
add_button.place(relx=0.2, rely=0.92, relwidth=0.2, relheight=0.05)
close_button = ctk.CTkButton(POPUP_LIVE, text="Submit", command=lambda: on_submit_click())
close_button.place(relx=0.6, rely=0.92, relwidth=0.2, relheight=0.05)
def refresh_data(map: list):
global POPUP_LIVE
scrollable_frame = ctk.CTkScrollableFrame(POPUP_LIVE, width=POPUP_LIVE_SCROLL_WIDTH, height=POPUP_LIVE_SCROLL_HEIGHT)
scrollable_frame.grid(row=0, column=0, padx=0, pady=0, sticky='nsew')
def on_sbutton_click(map, button_num):
map = update_webcam_source(scrollable_frame, map, button_num)
def on_tbutton_click(map, button_num):
map = update_webcam_target(scrollable_frame, map, button_num)
for item in map:
id = item['id']
button = ctk.CTkButton(scrollable_frame, text="Select source image", command=lambda id=id: on_sbutton_click(map, id), width=DEFAULT_BUTTON_WIDTH, height=DEFAULT_BUTTON_HEIGHT)
button.grid(row=id, column=0, padx=30, pady=10)
x_label = ctk.CTkLabel(scrollable_frame, text=f"X", width=MAPPER_PREVIEW_MAX_WIDTH, height=MAPPER_PREVIEW_MAX_HEIGHT)
x_label.grid(row=id, column=2, padx=10, pady=10)
button = ctk.CTkButton(scrollable_frame, text="Select target image", command=lambda id=id: on_tbutton_click(map, id), width=DEFAULT_BUTTON_WIDTH, height=DEFAULT_BUTTON_HEIGHT)
button.grid(row=id, column=3, padx=20, pady=10)
if "source" in item:
image = Image.fromarray(cv2.cvtColor(item['source']['cv2'], cv2.COLOR_BGR2RGB))
image = image.resize((MAPPER_PREVIEW_MAX_WIDTH, MAPPER_PREVIEW_MAX_HEIGHT), Image.LANCZOS)
tk_image = ctk.CTkImage(image, size=image.size)
source_image = ctk.CTkLabel(scrollable_frame, text=f"S-{id}", width=MAPPER_PREVIEW_MAX_WIDTH, height=MAPPER_PREVIEW_MAX_HEIGHT)
source_image.grid(row=id, column=1, padx=10, pady=10)
source_image.configure(image=tk_image)
if "target" in item:
image = Image.fromarray(cv2.cvtColor(item['target']['cv2'], cv2.COLOR_BGR2RGB))
image = image.resize((MAPPER_PREVIEW_MAX_WIDTH, MAPPER_PREVIEW_MAX_HEIGHT), Image.LANCZOS)
tk_image = ctk.CTkImage(image, size=image.size)
target_image = ctk.CTkLabel(scrollable_frame, text=f"T-{id}", width=MAPPER_PREVIEW_MAX_WIDTH, height=MAPPER_PREVIEW_MAX_HEIGHT)
target_image.grid(row=id, column=4, padx=20, pady=10)
target_image.configure(image=tk_image)
def update_webcam_source(scrollable_frame: ctk.CTkScrollableFrame, map: list, button_num: int) -> list:
global source_label_dict_live
source_path = ctk.filedialog.askopenfilename(title='select an source image', initialdir=RECENT_DIRECTORY_SOURCE, filetypes=[img_ft])
if "source" in map[button_num]:
map[button_num].pop("source")
source_label_dict_live[button_num].destroy()
del source_label_dict_live[button_num]
if source_path == "":
return map
else:
cv2_img = cv2.imread(source_path)
face = get_one_face(cv2_img)
if face:
x_min, y_min, x_max, y_max = face['bbox']
map[button_num]['source'] = {
'cv2' : cv2_img[int(y_min):int(y_max), int(x_min):int(x_max)],
'face' : face
}
image = Image.fromarray(cv2.cvtColor(map[button_num]['source']['cv2'], cv2.COLOR_BGR2RGB))
image = image.resize((MAPPER_PREVIEW_MAX_WIDTH, MAPPER_PREVIEW_MAX_HEIGHT), Image.LANCZOS)
tk_image = ctk.CTkImage(image, size=image.size)
source_image = ctk.CTkLabel(scrollable_frame, text=f"S-{button_num}", width=MAPPER_PREVIEW_MAX_WIDTH, height=MAPPER_PREVIEW_MAX_HEIGHT)
source_image.grid(row=button_num, column=1, padx=10, pady=10)
source_image.configure(image=tk_image)
source_label_dict_live[button_num] = source_image
else:
update_pop_live_status("Face could not be detected in last upload!")
return map
def update_webcam_target(scrollable_frame: ctk.CTkScrollableFrame, map: list, button_num: int) -> list:
global target_label_dict_live
target_path = ctk.filedialog.askopenfilename(title='select an target image', initialdir=RECENT_DIRECTORY_SOURCE, filetypes=[img_ft])
if "target" in map[button_num]:
map[button_num].pop("target")
target_label_dict_live[button_num].destroy()
del target_label_dict_live[button_num]
if target_path == "":
return map
else:
cv2_img = cv2.imread(target_path)
face = get_one_face(cv2_img)
if face:
x_min, y_min, x_max, y_max = face['bbox']
map[button_num]['target'] = {
'cv2' : cv2_img[int(y_min):int(y_max), int(x_min):int(x_max)],
'face' : face
}
image = Image.fromarray(cv2.cvtColor(map[button_num]['target']['cv2'], cv2.COLOR_BGR2RGB))
image = image.resize((MAPPER_PREVIEW_MAX_WIDTH, MAPPER_PREVIEW_MAX_HEIGHT), Image.LANCZOS)
tk_image = ctk.CTkImage(image, size=image.size)
target_image = ctk.CTkLabel(scrollable_frame, text=f"T-{button_num}", width=MAPPER_PREVIEW_MAX_WIDTH, height=MAPPER_PREVIEW_MAX_HEIGHT)
target_image.grid(row=button_num, column=4, padx=20, pady=10)
target_image.configure(image=tk_image)
target_label_dict_live[button_num] = target_image
else:
update_pop_live_status("Face could not be detected in last upload!")
return map