142 lines
		
	
	
		
			5.4 KiB
		
	
	
	
		
			Python
		
	
			
		
		
	
	
			142 lines
		
	
	
		
			5.4 KiB
		
	
	
	
		
			Python
		
	
import glob
 | 
						|
import mimetypes
 | 
						|
import os
 | 
						|
import platform
 | 
						|
import shutil
 | 
						|
import ssl
 | 
						|
import subprocess
 | 
						|
import urllib
 | 
						|
from pathlib import Path
 | 
						|
from typing import List, Any
 | 
						|
from tqdm import tqdm
 | 
						|
 | 
						|
import modules.globals
 | 
						|
 | 
						|
TEMP_FILE = 'temp.mp4'
 | 
						|
TEMP_DIRECTORY = 'temp'
 | 
						|
 | 
						|
# monkey patch ssl for mac
 | 
						|
if platform.system().lower() == 'darwin':
 | 
						|
    ssl._create_default_https_context = ssl._create_unverified_context
 | 
						|
 | 
						|
 | 
						|
def run_ffmpeg(args: List[str]) -> bool:
 | 
						|
    commands = ['ffmpeg', '-hide_banner', '-hwaccel', 'auto', '-loglevel', modules.globals.log_level]
 | 
						|
    commands.extend(args)
 | 
						|
    try:
 | 
						|
        subprocess.check_output(commands, stderr=subprocess.STDOUT)
 | 
						|
        return True
 | 
						|
    except Exception:
 | 
						|
        pass
 | 
						|
    return False
 | 
						|
 | 
						|
 | 
						|
def detect_fps(target_path: str) -> float:
 | 
						|
    command = ['ffprobe', '-v', 'error', '-select_streams', 'v:0', '-show_entries', 'stream=r_frame_rate', '-of', 'default=noprint_wrappers=1:nokey=1', target_path]
 | 
						|
    output = subprocess.check_output(command).decode().strip().split('/')
 | 
						|
    try:
 | 
						|
        numerator, denominator = map(int, output)
 | 
						|
        return numerator / denominator
 | 
						|
    except Exception:
 | 
						|
        pass
 | 
						|
    return 30.0
 | 
						|
 | 
						|
 | 
						|
def extract_frames(target_path: str) -> None:
 | 
						|
    temp_directory_path = get_temp_directory_path(target_path)
 | 
						|
    run_ffmpeg(['-i', target_path, '-pix_fmt', 'rgb24', os.path.join(temp_directory_path, '%04d.png')])
 | 
						|
 | 
						|
 | 
						|
def create_video(target_path: str, fps: float = 30.0) -> None:
 | 
						|
    temp_output_path = get_temp_output_path(target_path)
 | 
						|
    temp_directory_path = get_temp_directory_path(target_path)
 | 
						|
    run_ffmpeg(['-r', str(fps), '-i', os.path.join(temp_directory_path, '%04d.png'), '-c:v', modules.globals.video_encoder, '-crf', str(modules.globals.video_quality), '-pix_fmt', 'yuv420p', '-vf', 'colorspace=bt709:iall=bt601-6-625:fast=1', '-y', temp_output_path])
 | 
						|
 | 
						|
 | 
						|
def restore_audio(target_path: str, output_path: str) -> None:
 | 
						|
    temp_output_path = get_temp_output_path(target_path)
 | 
						|
    done = run_ffmpeg(['-i', temp_output_path, '-i', target_path, '-c:v', 'copy', '-map', '0:v:0', '-map', '1:a:0', '-y', output_path])
 | 
						|
    if not done:
 | 
						|
        move_temp(target_path, output_path)
 | 
						|
 | 
						|
 | 
						|
def get_temp_frame_paths(target_path: str) -> List[str]:
 | 
						|
    temp_directory_path = get_temp_directory_path(target_path)
 | 
						|
    return glob.glob((os.path.join(glob.escape(temp_directory_path), '*.png')))
 | 
						|
 | 
						|
 | 
						|
def get_temp_directory_path(target_path: str) -> str:
 | 
						|
    target_name, _ = os.path.splitext(os.path.basename(target_path))
 | 
						|
    target_directory_path = os.path.dirname(target_path)
 | 
						|
    return os.path.join(target_directory_path, TEMP_DIRECTORY, target_name)
 | 
						|
 | 
						|
 | 
						|
def get_temp_output_path(target_path: str) -> str:
 | 
						|
    temp_directory_path = get_temp_directory_path(target_path)
 | 
						|
    return os.path.join(temp_directory_path, TEMP_FILE)
 | 
						|
 | 
						|
 | 
						|
def normalize_output_path(source_path: str, target_path: str, output_path: str) -> Any:
 | 
						|
    if source_path and target_path:
 | 
						|
        source_name, _ = os.path.splitext(os.path.basename(source_path))
 | 
						|
        target_name, target_extension = os.path.splitext(os.path.basename(target_path))
 | 
						|
        if os.path.isdir(output_path):
 | 
						|
            return os.path.join(output_path, source_name + '-' + target_name + target_extension)
 | 
						|
    return output_path
 | 
						|
 | 
						|
 | 
						|
def create_temp(target_path: str) -> None:
 | 
						|
    temp_directory_path = get_temp_directory_path(target_path)
 | 
						|
    Path(temp_directory_path).mkdir(parents=True, exist_ok=True)
 | 
						|
 | 
						|
 | 
						|
def move_temp(target_path: str, output_path: str) -> None:
 | 
						|
    temp_output_path = get_temp_output_path(target_path)
 | 
						|
    if os.path.isfile(temp_output_path):
 | 
						|
        if os.path.isfile(output_path):
 | 
						|
            os.remove(output_path)
 | 
						|
        shutil.move(temp_output_path, output_path)
 | 
						|
 | 
						|
 | 
						|
def clean_temp(target_path: str) -> None:
 | 
						|
    temp_directory_path = get_temp_directory_path(target_path)
 | 
						|
    parent_directory_path = os.path.dirname(temp_directory_path)
 | 
						|
    if not modules.globals.keep_frames and os.path.isdir(temp_directory_path):
 | 
						|
        shutil.rmtree(temp_directory_path)
 | 
						|
    if os.path.exists(parent_directory_path) and not os.listdir(parent_directory_path):
 | 
						|
        os.rmdir(parent_directory_path)
 | 
						|
 | 
						|
 | 
						|
def has_image_extension(image_path: str) -> bool:
 | 
						|
    return image_path.lower().endswith(('png', 'jpg', 'jpeg'))
 | 
						|
 | 
						|
 | 
						|
def is_image(image_path: str) -> bool:
 | 
						|
    if image_path and os.path.isfile(image_path):
 | 
						|
        mimetype, _ = mimetypes.guess_type(image_path)
 | 
						|
        return bool(mimetype and mimetype.startswith('image/'))
 | 
						|
    return False
 | 
						|
 | 
						|
 | 
						|
def is_video(video_path: str) -> bool:
 | 
						|
    if video_path and os.path.isfile(video_path):
 | 
						|
        mimetype, _ = mimetypes.guess_type(video_path)
 | 
						|
        return bool(mimetype and mimetype.startswith('video/'))
 | 
						|
    return False
 | 
						|
 | 
						|
 | 
						|
def conditional_download(download_directory_path: str, urls: List[str]) -> None:
 | 
						|
    if not os.path.exists(download_directory_path):
 | 
						|
        os.makedirs(download_directory_path)
 | 
						|
    for url in urls:
 | 
						|
        download_file_path = os.path.join(download_directory_path, os.path.basename(url))
 | 
						|
        if not os.path.exists(download_file_path):
 | 
						|
            request = urllib.request.urlopen(url) # type: ignore[attr-defined]
 | 
						|
            total = int(request.headers.get('Content-Length', 0))
 | 
						|
            with tqdm(total=total, desc='Downloading', unit='B', unit_scale=True, unit_divisor=1024) as progress:
 | 
						|
                urllib.request.urlretrieve(url, download_file_path, reporthook=lambda count, block_size, total_size: progress.update(block_size)) # type: ignore[attr-defined]
 | 
						|
 | 
						|
 | 
						|
def resolve_relative_path(path: str) -> str:
 | 
						|
    return os.path.abspath(os.path.join(os.path.dirname(__file__), path))
 |