663 lines
23 KiB
Python
Executable File
663 lines
23 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
# video_autoreduce.py
|
|
|
|
import argparse
|
|
import argcomplete
|
|
import colorama
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
import json
|
|
from pathlib import Path
|
|
|
|
# The `scripts.library` import below is resolved relative to the directory
# two levels above this file. Put that directory on sys.path (once) so the
# import also works when this script is executed directly.
project_root = str(Path(__file__).resolve().parent.parent)
if project_root not in sys.path:
    sys.path.append(project_root)
|
|
|
|
# === Local import ===
|
|
from scripts.library import (
|
|
deletefile,
|
|
findfreename,
|
|
apply_resolution_rename,
|
|
get_intermediate_dirs,
|
|
)
|
|
|
|
# Initialise colorama before any coloured text is printed.
colorama.init()

# Foreground colour codes used to decorate console messages.
# `creset` restores the default foreground colour.
cred = colorama.Fore.RED
cgreen = colorama.Fore.GREEN
cyellow = colorama.Fore.YELLOW
ccyan = colorama.Fore.CYAN
creset = colorama.Fore.RESET

# File name suffixes (compared against the lower-cased name) that are
# treated as video files.
VIDEO_EXTENSIONS = (
    ".mp4",
    ".avi",
    ".mkv",
    ".mov",
    ".wmv",
    ".flv",
    ".divx",
)
|
|
|
|
|
|
def get_ffmpeg_codecs(codec_type="S"):
    """
    Retrieve the names of the codecs of one type known to the local ffmpeg.

    Runs `ffmpeg -codecs` and parses the listing. Every codec line starts
    with a flag column (for example ` DEV.LS h264 ...`) in which the first
    character is the decode flag, the second the encode flag and the third
    the codec type. Only lines whose type flag equals `codec_type` are kept.

    Args:
        codec_type (str): The type of codecs to retrieve.
            - "S" for subtitles (default)
            - "V" for video
            - "A" for audio
            - "D" for data
            - "T" for attachment

    Returns:
        set: A set of codec names of the specified type.
             If ffmpeg cannot be started, exits with an error, or lists no
             matching codecs, an empty set is returned.
    """
    try:
        result = subprocess.run(
            ["ffmpeg", "-codecs"], capture_output=True, text=True, check=True
        )
    except (subprocess.CalledProcessError, OSError) as e:
        # OSError covers ffmpeg being missing or not executable, which would
        # otherwise escape as an unhandled FileNotFoundError/PermissionError.
        print("Error running ffmpeg:", e)
        return set()

    # Anchor the flag column position by position so that, e.g., the decode
    # flag "D" of a video codec is never mistaken for the data type flag "D".
    # The requested type is escaped because it is caller-supplied text.
    codec_pattern = re.compile(
        rf"^[ \t]*[D.][E.]{re.escape(codec_type)}[A-Z.]*[ \t]+(\S+)",
        re.MULTILINE,
    )

    # The legend at the top of the listing (" ..S... = Subtitle codec") has
    # the same shape as a codec line; its "name" is the "=" sign, so drop it.
    return {name for name in codec_pattern.findall(result.stdout) if name != "="}
|
|
|
|
|
|
def user_confirm(message, color="yellow"):
    """
    Ask the user a yes/no question on the console.

    Args:
        message (str): Text of the question to show.
        color (str): Name of the prompt colour: 'yellow', 'red' or 'green'.
            Any other value falls back to yellow. Default is 'yellow'.

    Returns:
        bool: True only when the reply, after trimming whitespace and
        ignoring case, is exactly 'Y'; False for anything else.
    """
    palette = dict(yellow=cyellow, red=cred, green=cgreen)
    prefix = palette.get(color, cyellow)

    # The trailing space separates the prompt from what the user types.
    reply = input(f"{prefix}{message}{creset} ")
    return reply.strip().upper() == "Y"
|
|
|
|
|
|
def ensure_output_path(output_path):
    """
    Make sure the output directory exists, creating it when necessary.

    If the directory cannot be created, an error is written to stderr and
    the program terminates with exit status 1.

    Args:
        output_path (str): Directory that must exist after this call.

    Returns:
        None
    """
    try:
        # exist_ok=True turns this into a no-op for an existing directory.
        os.makedirs(output_path, exist_ok=True)
    except OSError as err:
        failure = f"{cred}Failed to create output directory:{creset} {err}"
        print(failure, file=sys.stderr)
        sys.exit(1)
|
|
|
|
|
|
def has_supported_subs(video_file, supported_subtitle_codecs, debug=False):
    """
    Report whether a video carries at least one subtitle stream in a supported codec.

    `ffprobe` is asked for the subtitle streams of the file as JSON; the
    `codec_name` of each stream is then looked up in
    `supported_subtitle_codecs`.

    Args:
        video_file (str): Path to the video file to inspect.
        supported_subtitle_codecs (set): Subtitle codec names considered supported.
        debug (bool): If True, print diagnostic messages on failure. Defaults to False.

    Returns:
        bool: True if any subtitle stream uses a supported codec. False when
        there is none, when ffprobe fails, when its output is not valid JSON,
        or when any other error occurs.
    """
    probe_command = [
        "ffprobe",
        "-v",
        "error",
        "-select_streams",
        "s",
        "-show_entries",
        "stream=index,codec_name",
        "-of",
        "json",
        video_file,
    ]

    try:
        probe = subprocess.run(
            probe_command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )

        # A failing ffprobe is treated as "no supported subtitles".
        if probe.returncode != 0:
            if debug:
                print(f"ffprobe error: {probe.stderr.strip()}")
            return False

        try:
            payload = json.loads(probe.stdout)
        except json.JSONDecodeError as err:
            # Malformed JSON is likewise treated as "no supported subtitles".
            if debug:
                print(f"JSON parsing error: {err}")
            return False

        for stream in payload.get("streams", []):
            if stream.get("codec_name") in supported_subtitle_codecs:
                return True
        return False

    except Exception as err:
        # Best effort: any unexpected problem means subtitles are not copied.
        if debug:
            print(f"Unexpected error: {err}")
        return False
|
|
|
|
|
|
def _probe_video_dimensions(video_file, debug=False):
    """
    Return (width, height) of the first video stream of `video_file`, or None.

    None is returned when ffprobe exits with an error or when its output does
    not contain two integers. Diagnostics are printed to stderr only when
    `debug` is True.
    """
    # stdout and stderr are captured separately: only stdout carries the
    # "width,height" answer, so ffprobe diagnostics can never corrupt parsing.
    probe = subprocess.run(
        [
            "ffprobe",
            "-v",
            "error",
            "-select_streams",
            "v:0",
            "-show_entries",
            "stream=width,height",
            "-of",
            "csv=p=0",
            video_file,
        ],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )

    if probe.returncode != 0:
        if debug:
            # errors="replace" so that reporting an error can never itself
            # fail on undecodable bytes.
            detail = probe.stderr.decode("utf-8", errors="replace").strip()
            if detail:
                print(
                    f"{cred}Error processing {video_file}: {detail}{creset}\n",
                    file=sys.stderr,
                )
            else:
                print(
                    f"{cred}Error processing {video_file}: No output from subprocess{creset}\n",
                    file=sys.stderr,
                )
        return None

    # The numbers are plain ASCII digits, so a single lenient decode suffices.
    text = probe.stdout.decode("utf-8", errors="replace")
    numbers = [
        int(token)
        for token in re.split(r"[,\s]+", text)
        if re.fullmatch(r"[0-9]+", token)
    ]
    if len(numbers) < 2:
        if debug:
            print(
                f"{cred}Error processing {video_file}: Unable to decode dimensions from ffprobe output.{creset}",
                file=sys.stderr,
            )
        return None

    return numbers[0], numbers[1]


def find_videos_to_convert(input_path, max_height=720, debug=False):
    """
    Find video files in the specified directory tree that meet conversion criteria.

    A file qualifies when its name ends with one of VIDEO_EXTENSIONS, ffprobe
    can report the size of its first video stream, it is not vertical
    (height greater than width), and its height exceeds `max_height`.

    Args:
        input_path (str): The path of the directory to search for video files.
        max_height (int, optional): Videos taller than this many pixels are selected. Default is 720.
        debug (bool, optional): If True, print debug messages. Default is False.

    Returns:
        list: Sorted list of paths to video files that meet the conversion criteria.
    """
    print(f"{cgreen}Scanning {input_path} for video files to convert.{creset}\n")

    # Collect every candidate file first, then probe each one.
    video_files = [
        os.path.join(root, name)
        for root, _dirs, names in os.walk(input_path)
        for name in names
        if name.lower().endswith(VIDEO_EXTENSIONS)
        and os.path.isfile(os.path.join(root, name))
    ]

    valid_videos_files = []
    for video_file in video_files:
        dimensions = _probe_video_dimensions(video_file, debug)
        if dimensions is None:
            continue
        width, height = dimensions

        # Ignore vertical videos
        if height > width:
            continue

        if height > max_height:
            valid_videos_files.append(video_file)

    valid_videos_files.sort()
    return valid_videos_files
|
|
|
|
|
|
def convert_videos(
    input_path,
    output_path=None,
    max_height=720,
    delete=False,
    rename=False,
    debug=False,
):
    """
    Re-encode every video under `input_path` that is taller than `max_height`.

    Each selected file is scaled to `max_height` pixels high (the width
    follows the aspect ratio and is bumped to an even number), encoded with
    libx265 (preset medium, CRF 28) and written as an MKV file. Audio streams
    are copied; subtitle streams are copied only when has_supported_subs()
    reports a supported codec.

    Args:
        input_path (str): The path of the directory containing input video files.
        output_path (str, optional): The path of the directory where converted video files will be saved.
            If None, output files are written alongside the originals (in-place).
        max_height (int, optional): Videos taller than this are converted, and this is the target height. Default is 720.
        delete (bool, optional): If True, delete the original file after successful conversion. Default is False.
        rename (bool, optional): If True, rename files and intermediate folders to reflect the target resolution. Default is False.
        debug (bool, optional): If True, print debug messages. Default is False.

    Returns:
        bool: False if no video files to convert were found, True otherwise.
        Failures of individual conversions do not change the return value
        and are only reported when `debug` is True.
    """
    # Get supported subtitle codecs
    supported_subtitle_codecs = get_ffmpeg_codecs("S")

    # Find video files to convert in the input directory
    video_files = find_videos_to_convert(input_path, max_height, debug)

    if not video_files:
        print("No video files to convert found.\n")
        return False

    # Ensure the fixed output directory exists (only when output_path is set)
    if output_path:
        ensure_output_path(output_path)

    total = len(video_files)
    print(
        f"Converting {total} videos taller than {max_height} pixels to {max_height}p resolution with x265 encoding and MKV container...\n"
    )

    # Real paths of originals and outputs of successful conversions
    # (used by the folder rename pass below)
    converted_files = set()

    for counter, video_file in enumerate(video_files, start=1):
        # Output path of the conversion in progress. Reset for every video so
        # that a failure never triggers cleanup of a path left over from an
        # earlier iteration.
        current_file = None

        print(
            f"{cgreen}****** Starting conversion {counter} of {total}: '{os.path.basename(video_file)}'...{creset}"
        )
        print(f"{ccyan}Original file:{creset}")
        print(video_file)

        # Determine output directory: fixed path, or same dir as the source file
        effective_output_path = (
            output_path if output_path else os.path.dirname(video_file)
        )

        # Compute base output filename, applying resolution rename if requested
        base_name = os.path.splitext(os.path.basename(video_file))[0]
        if rename:
            renamed_base = apply_resolution_rename(base_name, max_height)
            if renamed_base == base_name and f"{max_height}p" not in renamed_base:
                # No resolution found in filename: append .{max_height}p before extension
                renamed_base = f"{base_name}.{max_height}p"
            base_name = renamed_base

        # Generate output file path, using findfreename to avoid conflicts
        output_file = findfreename(
            os.path.join(effective_output_path, base_name + ".mkv")
        )

        try:
            # Get original video width and height
            video_info = subprocess.run(
                [
                    "ffprobe",
                    "-v",
                    "error",
                    "-select_streams",
                    "v:0",
                    "-show_entries",
                    "stream=width,height",
                    "-of",
                    "csv=p=0",
                    video_file,
                ],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
            )
            # Parse leniently (same rule as the scan that selected this file):
            # take the first two integer tokens, ignoring trailing commas or
            # repeated lines in the ffprobe output.
            dimensions = [
                int(token)
                for token in re.split(r"[,\s]+", video_info.stdout)
                if re.fullmatch(r"[0-9]+", token)
            ]
            if len(dimensions) < 2:
                raise ValueError(
                    f"Unexpected dimensions format: {video_info.stdout.strip()}"
                )
            width, height = dimensions[:2]

            # Calculate the scaled width maintaining aspect ratio
            scaled_width = int(max_height * (width / height))
            # Ensure the width is even
            if scaled_width % 2 != 0:
                scaled_width += 1

            # From here on a failure may leave a partial output file behind
            current_file = output_file

            # Check if the video has supported subtitles
            include_subs = has_supported_subs(
                video_file, supported_subtitle_codecs, debug
            )

            # Construct the FFmpeg command dynamically
            ffmpeg_command = [
                "ffmpeg",
                "-n",
                "-i",
                video_file,
                "-map",
                "0:v",
                "-c:v",
                "libx265",
                "-vf",
                f"scale={scaled_width}:{max_height}",
                "-preset",
                "medium",
                "-crf",
                "28",
                "-map",
                "0:a?",
                "-c:a",
                "copy",
            ]

            # Only include subtitles if they are supported
            if include_subs:
                ffmpeg_command.extend(["-map", "0:s?", "-c:s", "copy"])

            ffmpeg_command.append(output_file)

            if debug:
                print("FFmpeg command: " + " ".join(ffmpeg_command))

            # check=True raises CalledProcessError on a non-zero exit status,
            # so no separate return-code test is needed afterwards.
            subprocess.run(
                ffmpeg_command,
                check=True,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.STDOUT,
            )

        except KeyboardInterrupt:
            print(f"{cyellow}Conversions cancelled, cleaning up...{creset}")

            # If we have a partial video converted, delete it due to the failure
            if current_file:
                deletefile(current_file)

            sys.exit()

        except Exception as e:
            # Covers ffmpeg failures (CalledProcessError) as well as probing
            # and parsing problems. ffmpeg's own output is discarded above,
            # so the exception text (which includes the exit status) is the
            # most informative message available.
            if current_file:
                deletefile(current_file)

            if debug:
                print(
                    f"{cred}Error processing {video_file}: {str(e)}{creset}\n",
                    file=sys.stderr,
                )

        else:
            print(f"{ccyan}Successfully converted video to output file:{creset}")
            print(f"{output_file}\n")

            try:
                os.chmod(output_file, 0o777)
            except PermissionError:
                print(f"{cred}PermissionError on: '{output_file}'{creset}")

            # Delete the original file if requested
            if delete:
                deletefile(video_file)

            # Track this original and its converted output as successfully converted
            converted_files.add(os.path.realpath(video_file))
            converted_files.add(os.path.realpath(output_file))

    # Folder rename second pass
    if rename and converted_files:
        # Collect all intermediate dirs touched by successful conversions
        dirs_to_consider = {}
        for converted_path in converted_files:
            for d in get_intermediate_dirs(input_path, converted_path):
                dirs_to_consider[d] = True

        # Longest paths first, so nested directories come before their parents
        dirs_to_rename = []
        for d in sorted(dirs_to_consider, key=len, reverse=True):
            # Only rename a directory when ALL video files under it were converted
            all_converted = all(
                os.path.realpath(os.path.join(root, f)) in converted_files
                for root, _, files in os.walk(d)
                for f in files
                if f.lower().endswith(VIDEO_EXTENSIONS)
            )
            if not all_converted:
                continue

            new_dirname = apply_resolution_rename(os.path.basename(d), max_height)
            if new_dirname != os.path.basename(d):
                new_dir = os.path.join(os.path.dirname(d), new_dirname)
                dirs_to_rename.append((d, new_dir))

        # Rename dirs deepest first to avoid breaking paths
        for old_dir, new_dir in dirs_to_rename:
            if os.path.exists(new_dir):
                print(
                    f"{cred}Error: Target directory {new_dir} already exists, skipping rename.{creset}"
                )
                continue
            try:
                os.rename(old_dir, new_dir)
            except OSError as e:
                # A failed rename must not abort the run after all the
                # conversions have already completed.
                print(
                    f"{cred}Error: Could not rename directory {old_dir}: {e}{creset}"
                )
                continue
            if debug:
                print(f"Renamed directory: {old_dir} -> {new_dir}")

    return True
|
|
|
|
|
|
def parse_args():
    """
    Parse command line arguments.

    After parsing, two adjustments are made:
      * when neither --output-path nor --delete is given, the output path
        defaults to a "converted" directory inside the current working
        directory;
      * when --delete is given, a warning is printed and, unless --yes is
        also given, the user must confirm; declining terminates the program.

    Returns:
        argparse.Namespace: Parsed arguments.
    """
    parser = argparse.ArgumentParser(
        description="Convert videos taller than a certain height to 720p resolution with x265 encoding and MKV container."
    )

    parser.add_argument(
        "input_path",
        nargs="?",
        default=os.getcwd(),
        help="directory path to search for video files (default: current directory)",
    )
    parser.add_argument(
        "-o",
        "--output-path",
        default=None,
        help="directory path to save converted videos (default: ./converted, or alongside originals with --delete)",
    )
    parser.add_argument(
        "-mh",
        "--max-height",
        type=int,
        default=720,
        help="maximum height of videos to be converted (default: 720)",
    )
    parser.add_argument(
        "-del",
        "--delete",
        action="store_true",
        help="delete original files after successful conversion",
    )
    parser.add_argument(
        "-y",
        "--yes",
        action="store_true",
        help="skip confirmation prompts",
    )
    parser.add_argument(
        "-r",
        "--rename",
        action="store_true",
        help="rename converted files and intermediate folders to reflect the target resolution",
    )
    parser.add_argument(
        "--debug",
        action="store_true",
        help="enable debug mode for printing additional error messages",
    )

    # Enable autocomplete for argparse
    argcomplete.autocomplete(parser)

    args = parser.parse_args()

    # Without --delete the originals are kept, so the converted files go to a
    # separate directory by default instead of next to the originals.
    if args.output_path is None and not args.delete:
        args.output_path = os.path.join(os.getcwd(), "converted")

    # Warn and confirm if --delete is active
    if args.delete:
        print(f"{cyellow}WARNING: Delete option selected!{creset}")
        if not args.yes and not user_confirm(
            "Are you sure you wish to delete original files after successful conversion? [Y/N] ?",
            color="yellow",
        ):
            print(f"{cgreen}Exiting!{creset}")
            # sys.exit() rather than the builtin exit(): the latter is
            # injected by the `site` module for interactive use and is not
            # guaranteed to exist (e.g. under `python -S`).
            sys.exit()

    return args
|
|
|
|
|
|
def main():
    """
    Script entry point: read the command line options and run the conversion.
    """
    options = parse_args()

    # Hand every parsed option to the converter by name.
    convert_videos(
        input_path=options.input_path,
        output_path=options.output_path,
        max_height=options.max_height,
        delete=options.delete,
        rename=options.rename,
        debug=options.debug,
    )
|
|
|
|
|
|
if __name__ == "__main__":
    # Run the command line interface only when this file is executed as a
    # script, not when it is imported as a module.
    main()
|