#!/usr/bin/env python3 import re import shlex import subprocess import sys from pathlib import Path from typing import Dict, List, Tuple USAGE = """\ ffcompare -cq Examples: ffcompare -cq 18-22 "./ffmpeg -i '/path/Video.mkv' -cq {cq} -y '/out/{Movie name}/{Movie name}-{cq}.{ext}'" ffcompare -cq 18-22 -- ./ffmpeg -i "/path/Video.mkv" -cq {cq} -y "/out/{Movie name}/{Movie name}-{cq}.{ext}" Notes: - Use {cq} in the ffmpeg command where the CQ value should be substituted. - {Movie name} and {ext} are derived from the -i input filename. - If your ffmpeg command contains options like -cq, prefer quoting the whole command or use -- to separate ffcompare options from the ffmpeg command. """ def eprint(msg: str) -> None: print(msg, file=sys.stderr) def parse_cli(argv: List[str]) -> Tuple[str, List[str]]: if not argv or "-h" in argv or "--help" in argv: print(USAGE) sys.exit(0 if argv else 1) cq_value = None cmd_parts: List[str] = [] skip_next = False for i, arg in enumerate(argv): if skip_next: skip_next = False continue if arg == "--": cmd_parts.extend(argv[i + 1 :]) break if arg in ("-cq", "--cq"): if i + 1 >= len(argv): eprint("error: -cq requires a value") print(USAGE) sys.exit(1) cq_value = argv[i + 1] skip_next = True continue cmd_parts.append(arg) if not cq_value: eprint("error: -cq is required") print(USAGE) sys.exit(1) if not cmd_parts: eprint("error: ffmpeg command is required") print(USAGE) sys.exit(1) return cq_value, cmd_parts def parse_cq_values(spec: str) -> List[int]: values: List[int] = [] for part in spec.split(","): part = part.strip() if not part: continue if "-" in part: start_s, end_s = part.split("-", 1) if start_s == "" or end_s == "": raise ValueError(f"invalid range: {part}") start = int(start_s) end = int(end_s) step = 1 if end >= start else -1 for v in range(start, end + step, step): values.append(v) else: values.append(int(part)) unique: List[int] = [] seen = set() for v in values: if v not in seen: unique.append(v) seen.add(v) return sorted(unique) def split_command(parts: List[str]) -> List[str]: if len(parts) == 1: return shlex.split(parts[0]) return parts def find_input_path(tokens: List[str]) -> str: for i, tok in enumerate(tokens): if tok == "-i" and i + 1 < len(tokens): return tokens[i + 1] return "" def find_output_path(tokens: List[str]) -> str: return tokens[-1] if tokens else "" def slugify_filename(name: str) -> str: cleaned = re.sub(r"[\\/:*?\"<>|]", "-", name) cleaned = re.sub(r"\s+", "-", cleaned.strip()) cleaned = re.sub(r"[^A-Za-z0-9._-]", "-", cleaned) cleaned = re.sub(r"-+", "-", cleaned) return cleaned or "movie" def substitute_tokens(tokens: List[str], replacements: Dict[str, str]) -> List[str]: out = [] for tok in tokens: new_tok = tok for key, val in replacements.items(): new_tok = new_tok.replace(key, val) out.append(new_tok) return out def run_command(tokens: List[str], label: str) -> None: cmd = shlex.join(tokens) print(f"{label}: {cmd}") result = subprocess.run(tokens) if result.returncode != 0: eprint(f"error: command failed with exit code {result.returncode}") sys.exit(result.returncode) def ffprobe_duration(ffprobe_bin: str, path: str) -> float: cmd = [ ffprobe_bin, "-v", "error", "-show_entries", "format=duration", "-of", "default=nk=1:nw=1", path, ] try: out = subprocess.check_output(cmd, stderr=subprocess.STDOUT).decode().strip() return float(out) if out else 0.0 except subprocess.CalledProcessError as exc: eprint(f"error: ffprobe failed for {path}: {exc.output.decode(errors='ignore')}") sys.exit(1) except ValueError: eprint(f"error: could not parse ffprobe duration for {path}") sys.exit(1) def format_mbps(mbps: float) -> str: if mbps >= 10: return f"{mbps:.0f}" return f"{mbps:.1f}".rstrip("0").rstrip(".") def format_gbh(gbh: float) -> str: return f"{gbh:.1f}".rstrip("0").rstrip(".") def format_mb(mb: float) -> str: if mb >= 10: return f"{mb:.0f}" return f"{mb:.1f}".rstrip("0").rstrip(".") def format_gb(gb: float) -> str: return f"{gb:.1f}".rstrip("0").rstrip(".") def build_info(label: str, size_bytes: int, duration: float, full_duration: float) -> str: if duration <= 0: return f"{label}, 0 Mbps, 0GB/h, 0MB/0Gb" bytes_per_sec = size_bytes / duration mbps = (bytes_per_sec * 8) / 1e6 gbh = bytes_per_sec * 3600 / 1e9 file_mb = size_bytes / 1e6 projected_full_bytes = bytes_per_sec * full_duration if full_duration > 0 else 0 full_gb = projected_full_bytes / 1e9 return ( f"{label}, {format_mbps(mbps)} Mbps, {format_gbh(gbh)}GB/h, " f"{format_mb(file_mb)}MB/{format_gb(full_gb)}Gb" ) def build_original_info( label: str, size_bytes: int, full_duration: float, trimmed_duration: float, ) -> str: if full_duration <= 0: return f"{label}, 0 Mbps, 0GB/h, 0MB/0Gb" bytes_per_sec = size_bytes / full_duration mbps = (bytes_per_sec * 8) / 1e6 gbh = bytes_per_sec * 3600 / 1e9 trimmed_bytes = size_bytes * (trimmed_duration / full_duration) if trimmed_duration > 0 else 0 file_mb = trimmed_bytes / 1e6 full_gb = size_bytes / 1e9 return ( f"{label}, {format_mbps(mbps)} Mbps, {format_gbh(gbh)}GB/h, " f"{format_mb(file_mb)}MB/{format_gb(full_gb)}Gb" ) def format_timestamp(seconds: int) -> str: hours = seconds // 3600 minutes = (seconds % 3600) // 60 secs = seconds % 60 return f"{hours:02d}:{minutes:02d}:{secs:02d}" def format_timestamp_label(seconds: int) -> str: minutes = seconds // 60 secs = seconds % 60 return f"{minutes:02d}-{secs:02d}" def take_screenshot( ffmpeg_bin: str, video_path: str, timestamp: int, output_path: Path, image_format: str, ) -> None: ts = format_timestamp(timestamp) cmd = [ ffmpeg_bin, "-hide_banner", "-loglevel", "error", "-ss", ts, "-i", video_path, "-frames:v", "1", "-map", "0:v:0", ] if image_format == "png": cmd += ["-c:v", "png"] elif image_format == "jpg": cmd += ["-c:v", "mjpeg", "-q:v", "2", "-qmin", "2", "-qmax", "2"] else: eprint(f"error: unsupported screenshot format: {image_format}") sys.exit(1) cmd += ["-y", str(output_path)] result = subprocess.run(cmd) if result.returncode != 0: eprint(f"error: screenshot failed for {video_path} at {ts}") sys.exit(result.returncode) def html_escape(value: str) -> str: return ( value.replace("&", "&") .replace("<", "<") .replace(">", ">") .replace('"', """) ) def build_html( movie_name: str, cq_values: List[int], times: List[int], screenshots: Dict[str, Dict[str, str]], info_map: Dict[str, str], output_dir: Path, command_text: str, ) -> None: ref_keys = ["original"] + [f"cq-{cq}" for cq in cq_values] formats = ["jpg", "png"] options_html = [""] for cq in cq_values: options_html.append(f"") left_default = f"cq-{cq_values[0]}" if cq_values else "original" right_default = "original" rows = [] for t in times: label = format_timestamp_label(t) left_src = screenshots[left_default][label]["jpg"] right_src = screenshots[right_default][label]["jpg"] meta_attrs = [] img_attrs = [] for key in ref_keys: meta_attrs.append( f"data-ref-{key}=\"{html_escape(info_map[key])}\"" ) for fmt in formats: img_attrs.append( f"data-ref-{key}-{fmt}=\"{html_escape(screenshots[key][label][fmt])}\"" ) meta_attr_str = " ".join(meta_attrs) img_attr_str = " ".join(img_attrs) row_html = f"""
{html_escape(info_map[left_default])}
\"left
{html_escape(info_map[right_default])}
\"right
""" rows.append(row_html) html = f""" {html_escape(movie_name)} - ffcompare

{html_escape(movie_name)}

ffmpeg command
{html_escape(command_text)}
{"".join(rows)}
""" (output_dir / "index.html").write_text(html, encoding="utf-8") def main() -> None: cq_spec, cmd_parts = parse_cli(sys.argv[1:]) try: cq_values = parse_cq_values(cq_spec) except ValueError as exc: eprint(f"error: {exc}") sys.exit(1) if not cq_values: eprint("error: no CQ values parsed") sys.exit(1) template_tokens = split_command(cmd_parts) if not template_tokens: eprint("error: ffmpeg command is empty") sys.exit(1) if len(cmd_parts) == 1: raw_command = cmd_parts[0] else: raw_command = shlex.join(cmd_parts) if "{cq}" not in " ".join(template_tokens): eprint("error: ffmpeg command must include {cq} placeholder") sys.exit(1) input_path = find_input_path(template_tokens) if not input_path: eprint("error: could not find -i input file in ffmpeg command") sys.exit(1) input_path_obj = Path(input_path) movie_name = input_path_obj.stem ext = input_path_obj.suffix.lstrip(".") movie_slug = slugify_filename(movie_name) ffmpeg_bin = template_tokens[0] ffprobe_bin = "ffprobe" ffmpeg_path = Path(ffmpeg_bin) if ffmpeg_path.exists(): candidate = ffmpeg_path.with_name("ffprobe") if candidate.exists(): ffprobe_bin = str(candidate) output_paths: Dict[int, str] = {} commands: List[Tuple[int, List[str], str]] = [] seen_outputs: Dict[str, int] = {} for cq in cq_values: replacements = { "{cq}": str(cq), "{Movie name}": movie_name, "{Movie-name}": movie_slug, "{ext}": ext, } cmd_tokens = substitute_tokens(template_tokens, replacements) output_path = find_output_path(cmd_tokens) if not output_path: eprint("error: could not determine output path from ffmpeg command") sys.exit(1) if output_path in seen_outputs: eprint( "error: output path collision between CQ values " f"{seen_outputs[output_path]} and {cq}: {output_path}" ) sys.exit(1) seen_outputs[output_path] = cq output_paths[cq] = output_path commands.append((cq, cmd_tokens, output_path)) output_dir = Path(output_paths[cq_values[0]]).parent for cq, path in output_paths.items(): if Path(path).parent != output_dir: eprint("error: output paths must be in the same directory") sys.exit(1) output_dir.mkdir(parents=True, exist_ok=True) for cq, cmd_tokens, output_path in commands: Path(output_path).parent.mkdir(parents=True, exist_ok=True) run_command(cmd_tokens, f"ffmpeg cq={cq}") full_duration = ffprobe_duration(ffprobe_bin, input_path) output_durations: Dict[int, float] = {} for cq, path in output_paths.items(): output_durations[cq] = ffprobe_duration(ffprobe_bin, path) trimmed_duration = min(output_durations.values()) if output_durations else 0 if full_duration > 0: trimmed_duration = min(trimmed_duration, full_duration) max_time = max(0, int(trimmed_duration) - 1) times = list(range(60, max_time + 1, 60)) if not times: eprint("warning: trimmed duration is under 1 minute; no screenshots will be generated") screenshot_dir = output_dir / "screenshots" screenshot_dir.mkdir(parents=True, exist_ok=True) screenshots: Dict[str, Dict[str, Dict[str, str]]] = {"original": {}} for cq in cq_values: screenshots[f"cq-{cq}"] = {} for t in times: label = format_timestamp_label(t) original_base = f"{movie_slug}-original-{label}" original_png = screenshot_dir / f"{original_base}.png" original_jpg = screenshot_dir / f"{original_base}.jpg" take_screenshot(ffmpeg_bin, input_path, t, original_png, "png") take_screenshot(ffmpeg_bin, input_path, t, original_jpg, "jpg") screenshots["original"][label] = { "png": str(Path("screenshots") / original_png.name), "jpg": str(Path("screenshots") / original_jpg.name), } for cq in cq_values: output_path = output_paths[cq] base = f"{movie_slug}-{cq}-{label}" shot_png = screenshot_dir / f"{base}.png" shot_jpg = screenshot_dir / f"{base}.jpg" take_screenshot(ffmpeg_bin, output_path, t, shot_png, "png") take_screenshot(ffmpeg_bin, output_path, t, shot_jpg, "jpg") screenshots[f"cq-{cq}"][label] = { "png": str(Path("screenshots") / shot_png.name), "jpg": str(Path("screenshots") / shot_jpg.name), } info_map: Dict[str, str] = {} for cq in cq_values: output_path = Path(output_paths[cq]) size_bytes = output_path.stat().st_size if output_path.exists() else 0 duration = output_durations.get(cq, 0) info_map[f"cq-{cq}"] = build_info( str(cq), size_bytes, duration, full_duration ) input_size = Path(input_path).stat().st_size if Path(input_path).exists() else 0 info_map["original"] = build_original_info( "original", input_size, full_duration, trimmed_duration ) build_html( movie_name, cq_values, times, screenshots, info_map, output_dir, raw_command ) print(f"Done. Outputs in {output_dir}") if __name__ == "__main__": main()