Adding Pygrabber as Cam manager
parent
7fb6b54c0b
commit
c72582506d
|
@ -26,7 +26,7 @@ nsfw_filter = False
|
||||||
video_encoder = None
|
video_encoder = None
|
||||||
video_quality = None
|
video_quality = None
|
||||||
live_mirror = False
|
live_mirror = False
|
||||||
live_resizable = False
|
live_resizable = True
|
||||||
max_memory = None
|
max_memory = None
|
||||||
execution_providers: List[str] = []
|
execution_providers: List[str] = []
|
||||||
execution_threads = None
|
execution_threads = None
|
||||||
|
|
|
@ -1,3 +1,3 @@
|
||||||
name = 'Deep Live Cam'
|
name = 'Deep Live Cam'
|
||||||
version = '1.7.0'
|
version = '1.7.5'
|
||||||
edition = 'Portable'
|
edition = 'Portable'
|
||||||
|
|
|
@ -21,7 +21,10 @@ THREAD_LOCK = threading.Lock()
|
||||||
NAME = "DLC.FACE-SWAPPER"
|
NAME = "DLC.FACE-SWAPPER"
|
||||||
|
|
||||||
abs_dir = os.path.dirname(os.path.abspath(__file__))
|
abs_dir = os.path.dirname(os.path.abspath(__file__))
|
||||||
models_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(abs_dir))), 'models')
|
models_dir = os.path.join(
|
||||||
|
os.path.dirname(os.path.dirname(os.path.dirname(abs_dir))), "models"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def pre_check() -> bool:
|
def pre_check() -> bool:
|
||||||
download_directory_path = abs_dir
|
download_directory_path = abs_dir
|
||||||
|
@ -56,7 +59,7 @@ def get_face_swapper() -> Any:
|
||||||
|
|
||||||
with THREAD_LOCK:
|
with THREAD_LOCK:
|
||||||
if FACE_SWAPPER is None:
|
if FACE_SWAPPER is None:
|
||||||
model_path = os.path.join(models_dir, 'inswapper_128_fp16.onnx')
|
model_path = os.path.join(models_dir, "inswapper_128_fp16.onnx")
|
||||||
FACE_SWAPPER = insightface.model_zoo.get_model(
|
FACE_SWAPPER = insightface.model_zoo.get_model(
|
||||||
model_path, providers=modules.globals.execution_providers
|
model_path, providers=modules.globals.execution_providers
|
||||||
)
|
)
|
||||||
|
|
114
modules/ui.py
114
modules/ui.py
|
@ -7,7 +7,7 @@ from cv2_enumerate_cameras import enumerate_cameras # Add this import
|
||||||
from PIL import Image, ImageOps
|
from PIL import Image, ImageOps
|
||||||
import time
|
import time
|
||||||
import json
|
import json
|
||||||
|
from pygrabber.dshow_graph import FilterGraph
|
||||||
import modules.globals
|
import modules.globals
|
||||||
import modules.metadata
|
import modules.metadata
|
||||||
from modules.face_analyser import (
|
from modules.face_analyser import (
|
||||||
|
@ -26,6 +26,7 @@ from modules.utilities import (
|
||||||
resolve_relative_path,
|
resolve_relative_path,
|
||||||
has_image_extension,
|
has_image_extension,
|
||||||
)
|
)
|
||||||
|
from modules.video_capture import VideoCapturer
|
||||||
|
|
||||||
ROOT = None
|
ROOT = None
|
||||||
POPUP = None
|
POPUP = None
|
||||||
|
@ -96,7 +97,7 @@ def save_switch_states():
|
||||||
"fp_ui": modules.globals.fp_ui,
|
"fp_ui": modules.globals.fp_ui,
|
||||||
"show_fps": modules.globals.show_fps,
|
"show_fps": modules.globals.show_fps,
|
||||||
"mouth_mask": modules.globals.mouth_mask,
|
"mouth_mask": modules.globals.mouth_mask,
|
||||||
"show_mouth_mask_box": modules.globals.show_mouth_mask_box
|
"show_mouth_mask_box": modules.globals.show_mouth_mask_box,
|
||||||
}
|
}
|
||||||
with open("switch_states.json", "w") as f:
|
with open("switch_states.json", "w") as f:
|
||||||
json.dump(switch_states, f)
|
json.dump(switch_states, f)
|
||||||
|
@ -118,7 +119,9 @@ def load_switch_states():
|
||||||
modules.globals.fp_ui = switch_states.get("fp_ui", {"face_enhancer": False})
|
modules.globals.fp_ui = switch_states.get("fp_ui", {"face_enhancer": False})
|
||||||
modules.globals.show_fps = switch_states.get("show_fps", False)
|
modules.globals.show_fps = switch_states.get("show_fps", False)
|
||||||
modules.globals.mouth_mask = switch_states.get("mouth_mask", False)
|
modules.globals.mouth_mask = switch_states.get("mouth_mask", False)
|
||||||
modules.globals.show_mouth_mask_box = switch_states.get("show_mouth_mask_box", False)
|
modules.globals.show_mouth_mask_box = switch_states.get(
|
||||||
|
"show_mouth_mask_box", False
|
||||||
|
)
|
||||||
except FileNotFoundError:
|
except FileNotFoundError:
|
||||||
# If the file doesn't exist, use default values
|
# If the file doesn't exist, use default values
|
||||||
pass
|
pass
|
||||||
|
@ -315,18 +318,22 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
|
||||||
camera_label.place(relx=0.1, rely=0.86, relwidth=0.2, relheight=0.05)
|
camera_label.place(relx=0.1, rely=0.86, relwidth=0.2, relheight=0.05)
|
||||||
|
|
||||||
available_cameras = get_available_cameras()
|
available_cameras = get_available_cameras()
|
||||||
# Convert camera indices to strings for CTkOptionMenu
|
camera_indices, camera_names = available_cameras
|
||||||
available_camera_indices, available_camera_strings = available_cameras
|
|
||||||
camera_variable = ctk.StringVar(
|
if not camera_names or camera_names[0] == "No cameras found":
|
||||||
value=(
|
camera_variable = ctk.StringVar(value="No cameras found")
|
||||||
available_camera_strings[0]
|
camera_optionmenu = ctk.CTkOptionMenu(
|
||||||
if available_camera_strings
|
root,
|
||||||
else "No cameras found"
|
variable=camera_variable,
|
||||||
|
values=["No cameras found"],
|
||||||
|
state="disabled",
|
||||||
)
|
)
|
||||||
)
|
else:
|
||||||
camera_optionmenu = ctk.CTkOptionMenu(
|
camera_variable = ctk.StringVar(value=camera_names[0])
|
||||||
root, variable=camera_variable, values=available_camera_strings
|
camera_optionmenu = ctk.CTkOptionMenu(
|
||||||
)
|
root, variable=camera_variable, values=camera_names
|
||||||
|
)
|
||||||
|
|
||||||
camera_optionmenu.place(relx=0.35, rely=0.86, relwidth=0.25, relheight=0.05)
|
camera_optionmenu.place(relx=0.35, rely=0.86, relwidth=0.25, relheight=0.05)
|
||||||
|
|
||||||
live_button = ctk.CTkButton(
|
live_button = ctk.CTkButton(
|
||||||
|
@ -335,9 +342,16 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
|
||||||
cursor="hand2",
|
cursor="hand2",
|
||||||
command=lambda: webcam_preview(
|
command=lambda: webcam_preview(
|
||||||
root,
|
root,
|
||||||
available_camera_indices[
|
(
|
||||||
available_camera_strings.index(camera_variable.get())
|
camera_indices[camera_names.index(camera_variable.get())]
|
||||||
],
|
if camera_names and camera_names[0] != "No cameras found"
|
||||||
|
else None
|
||||||
|
),
|
||||||
|
),
|
||||||
|
state=(
|
||||||
|
"normal"
|
||||||
|
if camera_names and camera_names[0] != "No cameras found"
|
||||||
|
else "disabled"
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
live_button.place(relx=0.65, rely=0.86, relwidth=0.2, relheight=0.05)
|
live_button.place(relx=0.65, rely=0.86, relwidth=0.2, relheight=0.05)
|
||||||
|
@ -745,7 +759,7 @@ def update_preview(frame_number: int = 0) -> None:
|
||||||
def webcam_preview(root: ctk.CTk, camera_index: int):
|
def webcam_preview(root: ctk.CTk, camera_index: int):
|
||||||
if not modules.globals.map_faces:
|
if not modules.globals.map_faces:
|
||||||
if modules.globals.source_path is None:
|
if modules.globals.source_path is None:
|
||||||
# No image selected
|
update_status("Please select a source image first")
|
||||||
return
|
return
|
||||||
create_webcam_preview(camera_index)
|
create_webcam_preview(camera_index)
|
||||||
else:
|
else:
|
||||||
|
@ -757,40 +771,60 @@ def webcam_preview(root: ctk.CTk, camera_index: int):
|
||||||
|
|
||||||
def get_available_cameras():
|
def get_available_cameras():
|
||||||
"""Returns a list of available camera names and indices."""
|
"""Returns a list of available camera names and indices."""
|
||||||
camera_indices = []
|
try:
|
||||||
camera_names = []
|
graph = FilterGraph()
|
||||||
|
devices = graph.get_input_devices()
|
||||||
|
|
||||||
for camera in enumerate_cameras():
|
# Create list of indices and names
|
||||||
cap = cv2.VideoCapture(camera.index)
|
camera_indices = list(range(len(devices)))
|
||||||
if cap.isOpened():
|
camera_names = devices
|
||||||
camera_indices.append(camera.index)
|
|
||||||
camera_names.append(camera.name)
|
# If no cameras found through DirectShow, try OpenCV fallback
|
||||||
cap.release()
|
if not camera_names:
|
||||||
return (camera_indices, camera_names)
|
# Try to open camera with index -1 and 0
|
||||||
|
test_indices = [-1, 0]
|
||||||
|
working_cameras = []
|
||||||
|
|
||||||
|
for idx in test_indices:
|
||||||
|
cap = cv2.VideoCapture(idx)
|
||||||
|
if cap.isOpened():
|
||||||
|
working_cameras.append(f"Camera {idx}")
|
||||||
|
cap.release()
|
||||||
|
|
||||||
|
if working_cameras:
|
||||||
|
return test_indices[: len(working_cameras)], working_cameras
|
||||||
|
|
||||||
|
# If still no cameras found, return empty lists
|
||||||
|
if not camera_names:
|
||||||
|
return [], ["No cameras found"]
|
||||||
|
|
||||||
|
return camera_indices, camera_names
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error detecting cameras: {str(e)}")
|
||||||
|
return [], ["No cameras found"]
|
||||||
|
|
||||||
|
|
||||||
def create_webcam_preview(camera_index: int):
|
def create_webcam_preview(camera_index: int):
|
||||||
global preview_label, PREVIEW
|
global preview_label, PREVIEW
|
||||||
|
|
||||||
camera = cv2.VideoCapture(camera_index)
|
cap = VideoCapturer(camera_index)
|
||||||
camera.set(cv2.CAP_PROP_FRAME_WIDTH, PREVIEW_DEFAULT_WIDTH)
|
if not cap.start(PREVIEW_DEFAULT_WIDTH, PREVIEW_DEFAULT_HEIGHT, 60):
|
||||||
camera.set(cv2.CAP_PROP_FRAME_HEIGHT, PREVIEW_DEFAULT_HEIGHT)
|
update_status("Failed to start camera")
|
||||||
camera.set(cv2.CAP_PROP_FPS, 60)
|
return
|
||||||
|
|
||||||
preview_label.configure(width=PREVIEW_DEFAULT_WIDTH, height=PREVIEW_DEFAULT_HEIGHT)
|
preview_label.configure(width=PREVIEW_DEFAULT_WIDTH, height=PREVIEW_DEFAULT_HEIGHT)
|
||||||
|
|
||||||
PREVIEW.deiconify()
|
PREVIEW.deiconify()
|
||||||
|
|
||||||
frame_processors = get_frame_processors_modules(modules.globals.frame_processors)
|
frame_processors = get_frame_processors_modules(modules.globals.frame_processors)
|
||||||
|
|
||||||
source_image = None
|
source_image = None
|
||||||
prev_time = time.time()
|
prev_time = time.time()
|
||||||
fps_update_interval = 0.5 # Update FPS every 0.5 seconds
|
fps_update_interval = 0.5
|
||||||
frame_count = 0
|
frame_count = 0
|
||||||
fps = 0
|
fps = 0
|
||||||
|
|
||||||
while camera:
|
while True:
|
||||||
ret, frame = camera.read()
|
ret, frame = cap.read()
|
||||||
if not ret:
|
if not ret:
|
||||||
break
|
break
|
||||||
|
|
||||||
|
@ -804,6 +838,11 @@ def create_webcam_preview(camera_index: int):
|
||||||
temp_frame, PREVIEW.winfo_width(), PREVIEW.winfo_height()
|
temp_frame, PREVIEW.winfo_width(), PREVIEW.winfo_height()
|
||||||
)
|
)
|
||||||
|
|
||||||
|
else:
|
||||||
|
temp_frame = fit_image_to_size(
|
||||||
|
temp_frame, PREVIEW.winfo_width(), PREVIEW.winfo_height()
|
||||||
|
)
|
||||||
|
|
||||||
if not modules.globals.map_faces:
|
if not modules.globals.map_faces:
|
||||||
if source_image is None and modules.globals.source_path:
|
if source_image is None and modules.globals.source_path:
|
||||||
source_image = get_one_face(cv2.imread(modules.globals.source_path))
|
source_image = get_one_face(cv2.imread(modules.globals.source_path))
|
||||||
|
@ -816,7 +855,6 @@ def create_webcam_preview(camera_index: int):
|
||||||
temp_frame = frame_processor.process_frame(source_image, temp_frame)
|
temp_frame = frame_processor.process_frame(source_image, temp_frame)
|
||||||
else:
|
else:
|
||||||
modules.globals.target_path = None
|
modules.globals.target_path = None
|
||||||
|
|
||||||
for frame_processor in frame_processors:
|
for frame_processor in frame_processors:
|
||||||
if frame_processor.NAME == "DLC.FACE-ENHANCER":
|
if frame_processor.NAME == "DLC.FACE-ENHANCER":
|
||||||
if modules.globals.fp_ui["face_enhancer"]:
|
if modules.globals.fp_ui["face_enhancer"]:
|
||||||
|
@ -855,7 +893,7 @@ def create_webcam_preview(camera_index: int):
|
||||||
if PREVIEW.state() == "withdrawn":
|
if PREVIEW.state() == "withdrawn":
|
||||||
break
|
break
|
||||||
|
|
||||||
camera.release()
|
cap.release()
|
||||||
PREVIEW.withdraw()
|
PREVIEW.withdraw()
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -12,16 +12,23 @@ from tqdm import tqdm
|
||||||
|
|
||||||
import modules.globals
|
import modules.globals
|
||||||
|
|
||||||
TEMP_FILE = 'temp.mp4'
|
TEMP_FILE = "temp.mp4"
|
||||||
TEMP_DIRECTORY = 'temp'
|
TEMP_DIRECTORY = "temp"
|
||||||
|
|
||||||
# monkey patch ssl for mac
|
# monkey patch ssl for mac
|
||||||
if platform.system().lower() == 'darwin':
|
if platform.system().lower() == "darwin":
|
||||||
ssl._create_default_https_context = ssl._create_unverified_context
|
ssl._create_default_https_context = ssl._create_unverified_context
|
||||||
|
|
||||||
|
|
||||||
def run_ffmpeg(args: List[str]) -> bool:
|
def run_ffmpeg(args: List[str]) -> bool:
|
||||||
commands = ['ffmpeg', '-hide_banner', '-hwaccel', 'auto', '-loglevel', modules.globals.log_level]
|
commands = [
|
||||||
|
"ffmpeg",
|
||||||
|
"-hide_banner",
|
||||||
|
"-hwaccel",
|
||||||
|
"auto",
|
||||||
|
"-loglevel",
|
||||||
|
modules.globals.log_level,
|
||||||
|
]
|
||||||
commands.extend(args)
|
commands.extend(args)
|
||||||
try:
|
try:
|
||||||
subprocess.check_output(commands, stderr=subprocess.STDOUT)
|
subprocess.check_output(commands, stderr=subprocess.STDOUT)
|
||||||
|
@ -32,8 +39,19 @@ def run_ffmpeg(args: List[str]) -> bool:
|
||||||
|
|
||||||
|
|
||||||
def detect_fps(target_path: str) -> float:
|
def detect_fps(target_path: str) -> float:
|
||||||
command = ['ffprobe', '-v', 'error', '-select_streams', 'v:0', '-show_entries', 'stream=r_frame_rate', '-of', 'default=noprint_wrappers=1:nokey=1', target_path]
|
command = [
|
||||||
output = subprocess.check_output(command).decode().strip().split('/')
|
"ffprobe",
|
||||||
|
"-v",
|
||||||
|
"error",
|
||||||
|
"-select_streams",
|
||||||
|
"v:0",
|
||||||
|
"-show_entries",
|
||||||
|
"stream=r_frame_rate",
|
||||||
|
"-of",
|
||||||
|
"default=noprint_wrappers=1:nokey=1",
|
||||||
|
target_path,
|
||||||
|
]
|
||||||
|
output = subprocess.check_output(command).decode().strip().split("/")
|
||||||
try:
|
try:
|
||||||
numerator, denominator = map(int, output)
|
numerator, denominator = map(int, output)
|
||||||
return numerator / denominator
|
return numerator / denominator
|
||||||
|
@ -44,25 +62,65 @@ def detect_fps(target_path: str) -> float:
|
||||||
|
|
||||||
def extract_frames(target_path: str) -> None:
|
def extract_frames(target_path: str) -> None:
|
||||||
temp_directory_path = get_temp_directory_path(target_path)
|
temp_directory_path = get_temp_directory_path(target_path)
|
||||||
run_ffmpeg(['-i', target_path, '-pix_fmt', 'rgb24', os.path.join(temp_directory_path, '%04d.png')])
|
run_ffmpeg(
|
||||||
|
[
|
||||||
|
"-i",
|
||||||
|
target_path,
|
||||||
|
"-pix_fmt",
|
||||||
|
"rgb24",
|
||||||
|
os.path.join(temp_directory_path, "%04d.png"),
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def create_video(target_path: str, fps: float = 30.0) -> None:
|
def create_video(target_path: str, fps: float = 30.0) -> None:
|
||||||
temp_output_path = get_temp_output_path(target_path)
|
temp_output_path = get_temp_output_path(target_path)
|
||||||
temp_directory_path = get_temp_directory_path(target_path)
|
temp_directory_path = get_temp_directory_path(target_path)
|
||||||
run_ffmpeg(['-r', str(fps), '-i', os.path.join(temp_directory_path, '%04d.png'), '-c:v', modules.globals.video_encoder, '-crf', str(modules.globals.video_quality), '-pix_fmt', 'yuv420p', '-vf', 'colorspace=bt709:iall=bt601-6-625:fast=1', '-y', temp_output_path])
|
run_ffmpeg(
|
||||||
|
[
|
||||||
|
"-r",
|
||||||
|
str(fps),
|
||||||
|
"-i",
|
||||||
|
os.path.join(temp_directory_path, "%04d.png"),
|
||||||
|
"-c:v",
|
||||||
|
modules.globals.video_encoder,
|
||||||
|
"-crf",
|
||||||
|
str(modules.globals.video_quality),
|
||||||
|
"-pix_fmt",
|
||||||
|
"yuv420p",
|
||||||
|
"-vf",
|
||||||
|
"colorspace=bt709:iall=bt601-6-625:fast=1",
|
||||||
|
"-y",
|
||||||
|
temp_output_path,
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def restore_audio(target_path: str, output_path: str) -> None:
|
def restore_audio(target_path: str, output_path: str) -> None:
|
||||||
temp_output_path = get_temp_output_path(target_path)
|
temp_output_path = get_temp_output_path(target_path)
|
||||||
done = run_ffmpeg(['-i', temp_output_path, '-i', target_path, '-c:v', 'copy', '-map', '0:v:0', '-map', '1:a:0', '-y', output_path])
|
done = run_ffmpeg(
|
||||||
|
[
|
||||||
|
"-i",
|
||||||
|
temp_output_path,
|
||||||
|
"-i",
|
||||||
|
target_path,
|
||||||
|
"-c:v",
|
||||||
|
"copy",
|
||||||
|
"-map",
|
||||||
|
"0:v:0",
|
||||||
|
"-map",
|
||||||
|
"1:a:0",
|
||||||
|
"-y",
|
||||||
|
output_path,
|
||||||
|
]
|
||||||
|
)
|
||||||
if not done:
|
if not done:
|
||||||
move_temp(target_path, output_path)
|
move_temp(target_path, output_path)
|
||||||
|
|
||||||
|
|
||||||
def get_temp_frame_paths(target_path: str) -> List[str]:
|
def get_temp_frame_paths(target_path: str) -> List[str]:
|
||||||
temp_directory_path = get_temp_directory_path(target_path)
|
temp_directory_path = get_temp_directory_path(target_path)
|
||||||
return glob.glob((os.path.join(glob.escape(temp_directory_path), '*.png')))
|
return glob.glob((os.path.join(glob.escape(temp_directory_path), "*.png")))
|
||||||
|
|
||||||
|
|
||||||
def get_temp_directory_path(target_path: str) -> str:
|
def get_temp_directory_path(target_path: str) -> str:
|
||||||
|
@ -81,7 +139,9 @@ def normalize_output_path(source_path: str, target_path: str, output_path: str)
|
||||||
source_name, _ = os.path.splitext(os.path.basename(source_path))
|
source_name, _ = os.path.splitext(os.path.basename(source_path))
|
||||||
target_name, target_extension = os.path.splitext(os.path.basename(target_path))
|
target_name, target_extension = os.path.splitext(os.path.basename(target_path))
|
||||||
if os.path.isdir(output_path):
|
if os.path.isdir(output_path):
|
||||||
return os.path.join(output_path, source_name + '-' + target_name + target_extension)
|
return os.path.join(
|
||||||
|
output_path, source_name + "-" + target_name + target_extension
|
||||||
|
)
|
||||||
return output_path
|
return output_path
|
||||||
|
|
||||||
|
|
||||||
|
@ -108,20 +168,20 @@ def clean_temp(target_path: str) -> None:
|
||||||
|
|
||||||
|
|
||||||
def has_image_extension(image_path: str) -> bool:
|
def has_image_extension(image_path: str) -> bool:
|
||||||
return image_path.lower().endswith(('png', 'jpg', 'jpeg'))
|
return image_path.lower().endswith(("png", "jpg", "jpeg"))
|
||||||
|
|
||||||
|
|
||||||
def is_image(image_path: str) -> bool:
|
def is_image(image_path: str) -> bool:
|
||||||
if image_path and os.path.isfile(image_path):
|
if image_path and os.path.isfile(image_path):
|
||||||
mimetype, _ = mimetypes.guess_type(image_path)
|
mimetype, _ = mimetypes.guess_type(image_path)
|
||||||
return bool(mimetype and mimetype.startswith('image/'))
|
return bool(mimetype and mimetype.startswith("image/"))
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
def is_video(video_path: str) -> bool:
|
def is_video(video_path: str) -> bool:
|
||||||
if video_path and os.path.isfile(video_path):
|
if video_path and os.path.isfile(video_path):
|
||||||
mimetype, _ = mimetypes.guess_type(video_path)
|
mimetype, _ = mimetypes.guess_type(video_path)
|
||||||
return bool(mimetype and mimetype.startswith('video/'))
|
return bool(mimetype and mimetype.startswith("video/"))
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
@ -129,12 +189,20 @@ def conditional_download(download_directory_path: str, urls: List[str]) -> None:
|
||||||
if not os.path.exists(download_directory_path):
|
if not os.path.exists(download_directory_path):
|
||||||
os.makedirs(download_directory_path)
|
os.makedirs(download_directory_path)
|
||||||
for url in urls:
|
for url in urls:
|
||||||
download_file_path = os.path.join(download_directory_path, os.path.basename(url))
|
download_file_path = os.path.join(
|
||||||
|
download_directory_path, os.path.basename(url)
|
||||||
|
)
|
||||||
if not os.path.exists(download_file_path):
|
if not os.path.exists(download_file_path):
|
||||||
request = urllib.request.urlopen(url) # type: ignore[attr-defined]
|
request = urllib.request.urlopen(url) # type: ignore[attr-defined]
|
||||||
total = int(request.headers.get('Content-Length', 0))
|
total = int(request.headers.get("Content-Length", 0))
|
||||||
with tqdm(total=total, desc='Downloading', unit='B', unit_scale=True, unit_divisor=1024) as progress:
|
with tqdm(
|
||||||
urllib.request.urlretrieve(url, download_file_path, reporthook=lambda count, block_size, total_size: progress.update(block_size)) # type: ignore[attr-defined]
|
total=total,
|
||||||
|
desc="Downloading",
|
||||||
|
unit="B",
|
||||||
|
unit_scale=True,
|
||||||
|
unit_divisor=1024,
|
||||||
|
) as progress:
|
||||||
|
urllib.request.urlretrieve(url, download_file_path, reporthook=lambda count, block_size, total_size: progress.update(block_size)) # type: ignore[attr-defined]
|
||||||
|
|
||||||
|
|
||||||
def resolve_relative_path(path: str) -> str:
|
def resolve_relative_path(path: str) -> str:
|
||||||
|
|
|
@ -0,0 +1,84 @@
|
||||||
|
import cv2
|
||||||
|
import numpy as np
|
||||||
|
from pygrabber.dshow_graph import FilterGraph
|
||||||
|
import threading
|
||||||
|
from typing import Optional, Tuple, Callable
|
||||||
|
|
||||||
|
|
||||||
|
class VideoCapturer:
|
||||||
|
def __init__(self, device_index: int):
|
||||||
|
self.graph = FilterGraph()
|
||||||
|
self.device_index = device_index
|
||||||
|
self.frame_callback = None
|
||||||
|
self._current_frame = None
|
||||||
|
self._frame_ready = threading.Event()
|
||||||
|
self.is_running = False
|
||||||
|
self.cap = None
|
||||||
|
|
||||||
|
# Verify device exists
|
||||||
|
devices = self.graph.get_input_devices()
|
||||||
|
if self.device_index >= len(devices):
|
||||||
|
raise ValueError(
|
||||||
|
f"Invalid device index {device_index}. Available devices: {len(devices)}"
|
||||||
|
)
|
||||||
|
|
||||||
|
def start(self, width: int = 960, height: int = 540, fps: int = 60) -> bool:
|
||||||
|
"""Initialize and start video capture"""
|
||||||
|
try:
|
||||||
|
# Try different capture methods in order
|
||||||
|
capture_methods = [
|
||||||
|
(self.device_index, cv2.CAP_DSHOW), # Try DirectShow first
|
||||||
|
(self.device_index, cv2.CAP_ANY), # Then try default backend
|
||||||
|
(-1, cv2.CAP_ANY), # Try -1 as fallback
|
||||||
|
(0, cv2.CAP_ANY), # Finally try 0 without specific backend
|
||||||
|
]
|
||||||
|
|
||||||
|
for dev_id, backend in capture_methods:
|
||||||
|
try:
|
||||||
|
self.cap = cv2.VideoCapture(dev_id, backend)
|
||||||
|
if self.cap.isOpened():
|
||||||
|
break
|
||||||
|
self.cap.release()
|
||||||
|
except Exception:
|
||||||
|
continue
|
||||||
|
|
||||||
|
if not self.cap or not self.cap.isOpened():
|
||||||
|
raise RuntimeError("Failed to open camera with all available methods")
|
||||||
|
|
||||||
|
# Configure format
|
||||||
|
self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
|
||||||
|
self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
|
||||||
|
self.cap.set(cv2.CAP_PROP_FPS, fps)
|
||||||
|
|
||||||
|
self.is_running = True
|
||||||
|
return True
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Failed to start capture: {str(e)}")
|
||||||
|
if self.cap:
|
||||||
|
self.cap.release()
|
||||||
|
return False
|
||||||
|
|
||||||
|
def read(self) -> Tuple[bool, Optional[np.ndarray]]:
|
||||||
|
"""Read a frame from the camera"""
|
||||||
|
if not self.is_running or self.cap is None:
|
||||||
|
return False, None
|
||||||
|
|
||||||
|
ret, frame = self.cap.read()
|
||||||
|
if ret:
|
||||||
|
self._current_frame = frame
|
||||||
|
if self.frame_callback:
|
||||||
|
self.frame_callback(frame)
|
||||||
|
return True, frame
|
||||||
|
return False, None
|
||||||
|
|
||||||
|
def release(self) -> None:
|
||||||
|
"""Stop capture and release resources"""
|
||||||
|
if self.is_running and self.cap is not None:
|
||||||
|
self.cap.release()
|
||||||
|
self.is_running = False
|
||||||
|
self.cap = None
|
||||||
|
|
||||||
|
def set_frame_callback(self, callback: Callable[[np.ndarray], None]) -> None:
|
||||||
|
"""Set callback for frame processing"""
|
||||||
|
self.frame_callback = callback
|
Loading…
Reference in New Issue