Passed the code in formatter

This commit is contained in:
Fabrice Quenneville 2026-05-30 15:28:52 -04:00
parent 87847859c5
commit c31007498e
3 changed files with 232 additions and 160 deletions

View File

@ -41,15 +41,13 @@ def get_ffmpeg_codecs(codec_type="S"):
"""
try:
# Run ffmpeg -codecs
result = subprocess.run(["ffmpeg", "-codecs"],
capture_output=True,
text=True,
check=True)
result = subprocess.run(
["ffmpeg", "-codecs"], capture_output=True, text=True, check=True
)
output = result.stdout
# Extract codecs using regex
codec_pattern = re.compile(rf"^[ D.E]+{codec_type}[ A-Z.]+ (\S+)",
re.MULTILINE)
codec_pattern = re.compile(rf"^[ D.E]+{codec_type}[ A-Z.]+ (\S+)", re.MULTILINE)
codecs = codec_pattern.findall(output)
return set(codecs) # Return as a set for easy lookup
@ -60,7 +58,7 @@ def get_ffmpeg_codecs(codec_type="S"):
def findfreename(filepath, attempt=0):
'''
"""
Given a filepath, it will try to find a free filename by appending a number to the name.
First, it tries using the filepath passed in the argument, then adds a number to the end. If all fail, it appends (#).
@ -70,10 +68,10 @@ def findfreename(filepath, attempt=0):
Returns:
str: The first free filepath found.
'''
"""
attempt += 1
filename = str(filepath)[:str(filepath).rindex(".")]
extension = str(filepath)[str(filepath).rindex("."):]
filename = str(filepath)[: str(filepath).rindex(".")]
extension = str(filepath)[str(filepath).rindex(".") :]
copynumpath = filename + f"({attempt})" + extension
if not os.path.exists(filepath) and attempt <= 2:
@ -84,13 +82,13 @@ def findfreename(filepath, attempt=0):
def deletefile(filepath):
'''Delete a file, Returns a boolean
"""Delete a file, Returns a boolean
Args:
filepath : A string containing the full filepath
Returns:
Bool : The success of the operation
'''
"""
try:
os.remove(filepath)
except OSError:
@ -102,7 +100,7 @@ def deletefile(filepath):
def ensure_output_path(output_path):
'''
"""
Ensure that the output directory exists. If it doesn't, attempt to create it.
Args:
@ -110,15 +108,14 @@ def ensure_output_path(output_path):
Returns:
None
'''
"""
try:
os.makedirs(output_path, exist_ok=True)
# Attempts to create the output directory if it doesn't exist.
# exist_ok=True ensures that an OSError is not raised if the directory already exists.
except OSError as e:
# If an OSError occurs during directory creation, print an error message and exit the program.
print(f"{cred}Failed to create output directory:{creset} {e}",
file=sys.stderr)
print(f"{cred}Failed to create output directory:{creset} {e}", file=sys.stderr)
sys.exit(1)
@ -140,13 +137,23 @@ def has_supported_subs(video_file, supported_subtitle_codecs, debug=False):
False otherwise or in case of an error.
"""
try:
result = subprocess.run([
'ffprobe', '-v', 'error', '-select_streams', 's', '-show_entries',
'stream=index,codec_name', '-of', 'json', video_file
result = subprocess.run(
[
"ffprobe",
"-v",
"error",
"-select_streams",
"s",
"-show_entries",
"stream=index,codec_name",
"-of",
"json",
video_file,
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True)
text=True,
)
if result.returncode != 0:
if debug:
@ -161,8 +168,8 @@ def has_supported_subs(video_file, supported_subtitle_codecs, debug=False):
return False # Assume no supported subtitles if JSON is malformed
return any(
stream.get("codec_name") in supported_subtitle_codecs
for stream in streams)
stream.get("codec_name") in supported_subtitle_codecs for stream in streams
)
except Exception as e:
if debug:
@ -171,7 +178,7 @@ def has_supported_subs(video_file, supported_subtitle_codecs, debug=False):
def find_videos_to_convert(input_path, max_height=720, debug=False):
'''
"""
Find video files in the specified directory tree that meet conversion criteria.
Args:
@ -181,10 +188,9 @@ def find_videos_to_convert(input_path, max_height=720, debug=False):
Returns:
list: A list of paths to video files that meet the conversion criteria.
'''
"""
# Print a message indicating the start of the scanning process
print(
f"{cgreen}Scanning {input_path} for video files to convert.{creset}\n")
print(f"{cgreen}Scanning {input_path} for video files to convert.{creset}\n")
# Initialize lists to store all video files and valid video files
video_files = []
@ -195,13 +201,13 @@ def find_videos_to_convert(input_path, max_height=720, debug=False):
for file in files:
fullpath = os.path.join(root, file)
# Check if the file is a video file with one of the specified extensions
if (file.lower().endswith(
('.mp4', '.avi', '.mkv', '.mov', '.wmv', '.flv', '.divx'))
and os.path.isfile(fullpath)):
if file.lower().endswith(
(".mp4", ".avi", ".mkv", ".mov", ".wmv", ".flv", ".divx")
) and os.path.isfile(fullpath):
video_files.append(fullpath)
# List of encodings to try for decoding output from ffprobe
ENCODINGS_TO_TRY = ['utf-8', 'latin-1', 'iso-8859-1']
ENCODINGS_TO_TRY = ["utf-8", "latin-1", "iso-8859-1"]
# Iterate through each video file
for video_file in video_files:
@ -209,28 +215,37 @@ def find_videos_to_convert(input_path, max_height=720, debug=False):
# Run ffprobe command to get video resolution
resolution_output = subprocess.check_output(
[
'ffprobe', '-v', 'error', '-select_streams', 'v:0',
'-show_entries', 'stream=width,height', '-of', 'csv=p=0',
video_file
"ffprobe",
"-v",
"error",
"-select_streams",
"v:0",
"-show_entries",
"stream=width,height",
"-of",
"csv=p=0",
video_file,
],
stderr=subprocess.STDOUT)
stderr=subprocess.STDOUT,
)
# Attempt to decode the output using different encodings
dimensions_str = None
for encoding in ENCODINGS_TO_TRY:
try:
dimensions_str = resolution_output.decode(
encoding).strip().rstrip(',')
dimensions_str = (
resolution_output.decode(encoding).strip().rstrip(",")
)
dimensions = [
int(d) for d in dimensions_str.split(',')
if d.strip().isdigit()
int(d) for d in dimensions_str.split(",") if d.strip().isdigit()
]
if len(dimensions) >= 2:
width, height = dimensions[:2]
else:
raise ValueError(
f"Unexpected dimensions format: {dimensions_str}")
f"Unexpected dimensions format: {dimensions_str}"
)
break # Stop trying encodings once successful decoding and conversion
except (UnicodeDecodeError, ValueError):
continue # Try the next encoding
@ -240,7 +255,8 @@ def find_videos_to_convert(input_path, max_height=720, debug=False):
if debug:
print(
f"{cred}Error processing {video_file}: Unable to decode output from ffprobe using any of the specified encodings.{creset}",
file=sys.stderr)
file=sys.stderr,
)
continue # Skip this video and continue with the next one
# Ignore vertical videos
@ -258,17 +274,20 @@ def find_videos_to_convert(input_path, max_height=720, debug=False):
decoded_error = e.output.decode("utf-8")
print(
f"{cred}Error processing {video_file}: {decoded_error}{creset}\n",
file=sys.stderr)
file=sys.stderr,
)
else:
print(
f"{cred}Error processing {video_file}: No output from subprocess{creset}\n",
file=sys.stderr)
file=sys.stderr,
)
except ValueError as e:
if debug:
# Print error message if value error occurs during decoding
print(
f"{cred}Error processing {video_file}: ValueError decoding the file{creset}\n",
file=sys.stderr)
file=sys.stderr,
)
valid_videos_files.sort()
@ -276,7 +295,7 @@ def find_videos_to_convert(input_path, max_height=720, debug=False):
def convert_videos(input_path, output_path, max_height=720, debug=False):
'''
"""
Convert videos taller than a specified height to 720p resolution with x265 encoding and MKV container.
Args:
@ -287,7 +306,7 @@ def convert_videos(input_path, output_path, max_height=720, debug=False):
Returns:
bool: True if conversion succeeds, False otherwise.
'''
"""
# Get supported subtitle codecs
supported_subtitle_codecs = get_ffmpeg_codecs("S")
@ -326,22 +345,30 @@ def convert_videos(input_path, output_path, max_height=720, debug=False):
# Generate output file path and name
output_file = findfreename(
os.path.join(
output_path,
os.path.splitext(os.path.basename(video_file))[0] + '.mkv'))
output_path, os.path.splitext(os.path.basename(video_file))[0] + ".mkv"
)
)
try:
# Get original video width and height
video_info = subprocess.run([
'ffprobe', '-v', 'error', '-select_streams', 'v:0',
'-show_entries', 'stream=width,height', '-of', 'csv=p=0',
video_file
video_info = subprocess.run(
[
"ffprobe",
"-v",
"error",
"-select_streams",
"v:0",
"-show_entries",
"stream=width,height",
"-of",
"csv=p=0",
video_file,
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True)
width, height = map(
int,
video_info.stdout.strip().rstrip(',').split(','))
text=True,
)
width, height = map(int, video_info.stdout.strip().rstrip(",").split(","))
# Calculate the scaled width maintaining aspect ratio
scaled_width = int(max_height * (width / height))
@ -353,38 +380,55 @@ def convert_videos(input_path, output_path, max_height=720, debug=False):
current_file = output_file
# Check if the video has supported subtitles
include_subs = has_supported_subs(video_file,
supported_subtitle_codecs, debug)
include_subs = has_supported_subs(
video_file, supported_subtitle_codecs, debug
)
# Construct the FFmpeg command dynamically
ffmpeg_command = [
'ffmpeg', '-n', '-i', video_file, '-map', '0:v', '-c:v',
'libx265', '-vf', f'scale={scaled_width}:{max_height}',
'-preset', 'medium', '-crf', '28', '-map', '0:a?', '-c:a',
'copy'
"ffmpeg",
"-n",
"-i",
video_file,
"-map",
"0:v",
"-c:v",
"libx265",
"-vf",
f"scale={scaled_width}:{max_height}",
"-preset",
"medium",
"-crf",
"28",
"-map",
"0:a?",
"-c:a",
"copy",
]
# Only include subtitles if they are supported
if include_subs:
ffmpeg_command.extend(['-map', '0:s?', '-c:s', 'copy'])
ffmpeg_command.extend(["-map", "0:s?", "-c:s", "copy"])
ffmpeg_command.append(output_file)
# Print the FFmpeg command as a single string
if (debug):
print("FFmpeg command: " + ' '.join(ffmpeg_command))
if debug:
print("FFmpeg command: " + " ".join(ffmpeg_command))
# Run the FFmpeg command
process = subprocess.run(ffmpeg_command,
process = subprocess.run(
ffmpeg_command,
check=True,
stdout=subprocess.DEVNULL,
stderr=subprocess.STDOUT)
stderr=subprocess.STDOUT,
)
# Check if ffmpeg process returned successfully
if process.returncode != 0:
raise subprocess.CalledProcessError(process.returncode,
process.args,
output=process.stderr)
raise subprocess.CalledProcessError(
process.returncode, process.args, output=process.stderr
)
except subprocess.CalledProcessError as e:
# If we have a partial video converted, delete it due to the failure
@ -397,11 +441,13 @@ def convert_videos(input_path, output_path, max_height=720, debug=False):
decoded_error = e.output.decode("utf-8")
print(
f"{cred}Error processing {video_file}: {decoded_error}{creset}\n",
file=sys.stderr)
file=sys.stderr,
)
else:
print(
f"{cred}Error processing {video_file}: No output from subprocess{creset}\n",
file=sys.stderr)
file=sys.stderr,
)
except KeyboardInterrupt:
print(f"{cyellow}Conversions cancelled, cleaning up...{creset}")
@ -422,13 +468,12 @@ def convert_videos(input_path, output_path, max_height=720, debug=False):
if debug:
print(
f"{cred}Error processing {video_file}: {str(e)}{creset}\n",
file=sys.stderr)
file=sys.stderr,
)
else:
# Print success message if conversion is successful
print(
f"{ccyan}Successfully converted video to output file:{creset}")
print(f"{ccyan}Successfully converted video to output file:{creset}")
print(f"{output_file}\n")
try:
@ -443,39 +488,40 @@ def convert_videos(input_path, output_path, max_height=720, debug=False):
def main():
'''
"""
Main function to parse command line arguments and initiate video conversion.
'''
"""
# Create argument parser
parser = argparse.ArgumentParser(
description=
'Convert videos taller than a certain height to 720p resolution with x265 encoding and MKV container.'
description="Convert videos taller than a certain height to 720p resolution with x265 encoding and MKV container."
)
# Define command line arguments
parser.add_argument(
'input_path',
nargs='?',
"input_path",
nargs="?",
default=os.getcwd(),
help=
'directory path to search for video files (default: current directory)'
help="directory path to search for video files (default: current directory)",
)
parser.add_argument(
'-o',
'--output-path',
default=os.path.join(os.getcwd(), 'converted'),
help='directory path to save converted videos (default: ./converted)')
"-o",
"--output-path",
default=os.path.join(os.getcwd(), "converted"),
help="directory path to save converted videos (default: ./converted)",
)
parser.add_argument(
'-mh',
'--max-height',
"-mh",
"--max-height",
type=int,
default=720,
help='maximum height of videos to be converted (default: 720)')
help="maximum height of videos to be converted (default: 720)",
)
parser.add_argument(
'--debug',
action='store_true',
help='enable debug mode for printing additional error messages')
"--debug",
action="store_true",
help="enable debug mode for printing additional error messages",
)
# Enable autocomplete for argparse
argcomplete.autocomplete(parser)
@ -484,8 +530,7 @@ def main():
args = parser.parse_args()
# Convert videos
convert_videos(args.input_path, args.output_path, args.max_height,
args.debug)
convert_videos(args.input_path, args.output_path, args.max_height, args.debug)
if __name__ == "__main__":

View File

@ -17,7 +17,7 @@ cred = colorama.Fore.RED
def autorename(input_path, max_height=720, debug=False):
'''
"""
Rename video files and folders in the specified directory tree that contain a resolution
in their name to the specified max_height resolution.
@ -25,17 +25,17 @@ def autorename(input_path, max_height=720, debug=False):
input_path (str): The path of the directory to search for video files and folders.
max_height (int, optional): The maximum height (in pixels) of the video to consider for conversion. Default is 720.
debug (bool, optional): If True, print debug messages. Default is False.
'''
"""
# Define patterns to search for various resolutions
resolution_patterns = {
2160: r'(4096x2160|3840x2160|2880p|2160p|2160|1440p|4k)',
1080: r'(1920x1080|1080p|1080)',
720: r'(1280x720|720p|720)',
2160: r"(4096x2160|3840x2160|2880p|2160p|2160|1440p|4k)",
1080: r"(1920x1080|1080p|1080)",
720: r"(1280x720|720p|720)",
}
# Wrap each pattern to only match when isolated by non-alphanumeric chars or string boundaries
resolution_patterns = {
res: rf'(?<![a-zA-Z0-9]){pat}(?![a-zA-Z0-9])'
res: rf"(?<![a-zA-Z0-9]){pat}(?![a-zA-Z0-9])"
for res, pat in resolution_patterns.items()
}
@ -48,12 +48,12 @@ def autorename(input_path, max_height=720, debug=False):
for resolution, pattern in resolution_patterns.items():
# Only process resolutions greater than max_height
if resolution > max_height and re.search(
pattern, filename, re.IGNORECASE):
pattern, filename, re.IGNORECASE
):
old_file = os.path.join(dirpath, filename)
new_filename = re.sub(pattern,
f'{max_height}p',
filename,
flags=re.IGNORECASE)
new_filename = re.sub(
pattern, f"{max_height}p", filename, flags=re.IGNORECASE
)
new_file = os.path.join(dirpath, new_filename)
if old_file != new_file:
files_to_rename.append((old_file, new_file))
@ -64,12 +64,12 @@ def autorename(input_path, max_height=720, debug=False):
for resolution, pattern in resolution_patterns.items():
# Only process resolutions greater than max_height
if resolution > max_height and re.search(
pattern, dirname, re.IGNORECASE):
pattern, dirname, re.IGNORECASE
):
old_dir = os.path.join(dirpath, dirname)
new_dirname = re.sub(pattern,
f'{max_height}p',
dirname,
flags=re.IGNORECASE)
new_dirname = re.sub(
pattern, f"{max_height}p", dirname, flags=re.IGNORECASE
)
new_dir = os.path.join(dirpath, new_dirname)
if old_dir != new_dir:
dirs_to_rename.append((old_dir, new_dir))
@ -82,7 +82,7 @@ def autorename(input_path, max_height=720, debug=False):
continue
os.rename(old_file, new_file)
if debug:
print(f'Renamed file: {old_file} -> {new_file}')
print(f"Renamed file: {old_file} -> {new_file}")
# Rename directories after
for old_dir, new_dir in sorted(dirs_to_rename, key=lambda x: -len(x[0])):
@ -91,38 +91,38 @@ def autorename(input_path, max_height=720, debug=False):
continue
os.rename(old_dir, new_dir)
if debug:
print(f'Renamed directory: {old_dir} -> {new_dir}')
print(f"Renamed directory: {old_dir} -> {new_dir}")
def main():
'''
"""
Main function to parse command line arguments and initiate file renamings.
'''
"""
# Create argument parser
parser = argparse.ArgumentParser(
description=
'Rename video files containing resolutions in their filenames to a specified max_height resolution.'
description="Rename video files containing resolutions in their filenames to a specified max_height resolution."
)
# Define command line arguments
parser.add_argument(
'input_path',
nargs='?',
"input_path",
nargs="?",
default=os.getcwd(),
help=
'directory path to search for video files (default: current directory)'
help="directory path to search for video files (default: current directory)",
)
parser.add_argument(
'-mh',
'--max-height',
"-mh",
"--max-height",
type=int,
default=720,
help='maximum height of videos to be converted (default: 720)')
help="maximum height of videos to be converted (default: 720)",
)
parser.add_argument(
'--debug',
action='store_true',
help='enable debug mode for printing additional messages')
"--debug",
action="store_true",
help="enable debug mode for printing additional messages",
)
# Enable autocomplete for argparse
argcomplete.autocomplete(parser)

View File

@ -32,20 +32,48 @@ def process_subtitles(file_path, track, command):
if command == "remove":
# Remove only the specified subtitle track while keeping all other streams
ffmpeg_command = [
"ffmpeg", "-i", file_path, "-map", "0", "-map", f"-0:s:{track}",
"-c", "copy", output_file
"ffmpeg",
"-i",
file_path,
"-map",
"0",
"-map",
f"-0:s:{track}",
"-c",
"copy",
output_file,
]
elif command == "keep":
# Keep only the specified subtitle track while preserving video, audio, and metadata
ffmpeg_command = [
"ffmpeg", "-i", file_path, "-map", "0:v", "-map", "0:a", "-map",
f"0:s:{track}", "-map", "0:t?", "-c", "copy", output_file
"ffmpeg",
"-i",
file_path,
"-map",
"0:v",
"-map",
"0:a",
"-map",
f"0:s:{track}",
"-map",
"0:t?",
"-c",
"copy",
output_file,
]
elif command == "none":
# Remove all subtitle tracks
ffmpeg_command = [
"ffmpeg", "-i", file_path, "-map", "0", "-map", "-0:s", "-c",
"copy", output_file
"ffmpeg",
"-i",
file_path,
"-map",
"0",
"-map",
"-0:s",
"-c",
"copy",
output_file,
]
else:
print(f"{cred}Invalid command: {command}{creset}")
@ -53,10 +81,9 @@ def process_subtitles(file_path, track, command):
try:
# Execute the ffmpeg command and capture output
result = subprocess.run(ffmpeg_command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True)
result = subprocess.run(
ffmpeg_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
)
if result.returncode == 0:
print(
@ -91,13 +118,16 @@ def main():
# Create argument parser
parser = argparse.ArgumentParser(
description="Manage subtitle tracks in video files.")
description="Manage subtitle tracks in video files."
)
# Define command line arguments
# Add a positional argument for the command
parser.add_argument('command',
choices=['remove', 'keep', 'none'],
help='Command to run (remove, keep, or none)')
parser.add_argument(
"command",
choices=["remove", "keep", "none"],
help="Command to run (remove, keep, or none)",
)
# Add other arguments with both short and long options, including defaults
parser.add_argument(
@ -105,19 +135,16 @@ def main():
"--track",
type=int,
default=0,
help=
"Subtitle track index (default is 0). Use 'none' to remove all subtitles."
help="Subtitle track index (default is 0). Use 'none' to remove all subtitles.",
)
parser.add_argument("-f",
"--file",
type=str,
help="Path to a specific video file.")
parser.add_argument("-f", "--file", type=str, help="Path to a specific video file.")
parser.add_argument(
"-d",
"--dir",
type=str,
default=os.getcwd(),
help="Directory to process (default is current directory).")
help="Directory to process (default is current directory).",
)
# Enable autocomplete for command-line arguments
argcomplete.autocomplete(parser)