Files
moss/parse_cues.py
2026-05-22 10:51:18 +08:00

513 lines
17 KiB
Python

#!/usr/bin/env python3
"""Scan a directory for cue-sheet albums and split them into tracks.
Supported album sources:
- cue + flac
- cue + wav
- cue + ape
Strategy:
- flac / wav: split directly when possible
- ape: try direct split first by default, then fall back to transcoding to flac
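Output layout:
- one folder per cue sheet, named after the cue's album TITLE (falling back to
  the cue file stem); split tracks are converted to MP3 and the folder is then
  moved under <final-root>/<scanned directory name>/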
Examples:
python parse_cues.py ~/Music/album_dir
python parse_cues.py ~/Music/album_dir --ape-policy transcode
python parse_cues.py ~/Music/album_dir --dry-run
"""
from __future__ import annotations
import argparse
import glob
import os
import re
import shutil
import subprocess
import sys
import tempfile
import unicodedata
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable, Optional
# Audio file suffixes (lower-case, leading dot) that may accompany a cue sheet.
AUDIO_EXTS = (".flac", ".wav", ".ape")
# Cue `FILE "name.ext" TYPE` line; group 1 is the file reference without the
# optional surrounding quotes.
FILE_LINE_RE = re.compile(r'^\s*FILE\s+"?(.+?)"?\s+\S+', re.IGNORECASE)
# Cue `TRACK nn TYPE` line; group 1 is the track number digits.
TRACK_LINE_RE = re.compile(r'^\s*TRACK\s+(\d+)\s+\S+', re.IGNORECASE)
# Cue `TITLE ...` line; group 1 is the remainder of the line (quotes included).
TITLE_LINE_RE = re.compile(r'^\s*TITLE\s+(.*)$', re.IGNORECASE)
@dataclass
class AlbumJob:
    """One cue sheet paired with the single audio file it will be split from."""

    # Path of the cue sheet.
    cue: Path
    # Path of the audio file located for that cue sheet.
    audio: Path
    # Lower-cased suffix of `audio`, including the leading dot (e.g. ".flac").
    audio_ext: str
    # Directory that receives the split tracks.
    output_dir: Path
class ToolError(RuntimeError):
    """Raised when an external tool is missing or a command exits non-zero."""
def log(msg: str) -> None:
    """Print *msg* on standard output, flushing right away."""
    print(msg, file=sys.stdout, flush=True)
def warn(msg: str) -> None:
    """Print *msg* on standard error with a ``[warn]`` prefix, flushing right away."""
    print("[warn]", msg, file=sys.stderr, flush=True)
def err(msg: str) -> None:
    """Print *msg* on standard error with an ``[error]`` prefix, flushing right away."""
    print("[error]", msg, file=sys.stderr, flush=True)
def which_or_fail(name: str) -> str:
    """Return the resolved location of executable *name*.

    Raises:
        ToolError: if ``shutil.which`` cannot find *name*.
    """
    resolved = shutil.which(name)
    if resolved:
        return resolved
    raise ToolError(f"missing required tool: {name}")
def ensure_tools(required: Iterable[str]) -> None:
    """Check that every executable named in *required* can be found.

    Raises:
        ToolError: listing all names that ``shutil.which`` could not find.
    """
    absent = []
    for tool in required:
        if shutil.which(tool) is None:
            absent.append(tool)
    if absent:
        raise ToolError("missing required tools: " + ", ".join(absent))
def parse_args() -> argparse.Namespace:
    """Define the command-line interface and parse the process arguments."""
    cli = argparse.ArgumentParser(
        description="Traverse a directory, find cue-sheet albums, and split tracks."
    )

    # Positional argument.
    cli.add_argument("directory", help="Root directory to scan")

    # Options, declared in the order they appear in --help.
    cli.add_argument("--no-recursive", action="store_true", help="Only scan the top-level directory")
    cli.add_argument(
        "--ape-policy",
        default="auto",
        choices=("auto", "direct", "transcode"),
        help="How to handle APE sources",
    )
    cli.add_argument(
        "--output-root",
        default=None,
        help="Optional root directory for split outputs. Default: beside the cue file",
    )
    cli.add_argument(
        "--final-root",
        default="/home/shenwei/mnt/volume2/navidrome",
        help="Move finished album folders into this root. Default: navidrome mount",
    )
    cli.add_argument("--cue-encoding", default="auto", help="Cue file text encoding. Default: auto")
    cli.add_argument(
        "--mp3-bitrate",
        default="320k",
        help="MP3 target bitrate for conversion (e.g. 192k, 256k, 320k). Default: 320k",
    )
    cli.add_argument("--dry-run", action="store_true", help="Only print planned actions")

    return cli.parse_args()
def read_text_lines(path: Path) -> list[str]:
    """Read *path* as UTF-8 (undecodable bytes are replaced) and return its lines."""
    with path.open("r", encoding="utf-8", errors="replace") as handle:
        content = handle.read()
    return content.splitlines()
def read_cue_text(path: Path, encoding: str = "auto") -> str:
    """Return the decoded text of a cue sheet.

    Args:
        path: cue sheet to read.
        encoding: codec name to use, or ``"auto"`` to guess. With an explicit
            codec, undecodable bytes are replaced rather than raising.

    Returns:
        The decoded file content.

    In ``"auto"`` mode a UTF-16 byte-order mark is honoured first, then a
    fixed list of codecs is tried strictly in order; the first clean decode
    wins. If nothing decodes, UTF-8 with replacement characters is returned.
    """
    if encoding != "auto":
        return path.read_text(encoding=encoding, errors="replace")

    raw = path.read_bytes()

    # A UTF-16 BOM (FF FE / FE FF) is not valid in UTF-8 or the GB/Big5
    # codecs, so without this check such files would only decode through the
    # single-byte fallbacks and come out as NUL-interleaved mojibake.
    if raw.startswith((b"\xff\xfe", b"\xfe\xff")):
        try:
            return raw.decode("utf-16")
        except UnicodeDecodeError:
            pass  # not really UTF-16; fall through to the generic candidates

    candidates = ["utf-8-sig", "utf-8", "gb18030", "gbk", "big5", "cp936", "cp1252", "latin1"]
    for enc in candidates:
        try:
            text = raw.decode(enc)
        except UnicodeDecodeError:
            continue
        # Strict decoding never inserts U+FFFD itself; a decode that contains
        # one is skipped in favour of the next candidate.
        if "\ufffd" not in text:
            return text
    return raw.decode("utf-8", errors="replace")
def cue_lines(path: Path, encoding: str = "auto") -> list[str]:
    """Decode the cue sheet at *path* via ``read_cue_text`` and split it into lines."""
    decoded = read_cue_text(path, encoding=encoding)
    return decoded.splitlines()
def sanitize_filename(name: str) -> str:
    """Turn arbitrary cue text into a string usable as one path component.

    Args:
        name: raw text, possibly wrapped in double quotes.

    Returns:
        The cleaned name, or ``"track"`` when nothing usable remains.

    Steps: strip outer whitespace and double quotes, replace runs of
    ``\\ / : * ? " < > |`` with ``_``, drop non-whitespace control characters
    (e.g. NUL, which the OS rejects in file names), and collapse whitespace
    runs to single spaces.
    """
    name = name.strip().strip('"')
    name = re.sub(r"[\\/:*?\"<>|]+", "_", name)
    # Whitespace controls (\t, \n, ...) are kept here so the next step can
    # turn them into spaces; every other control character is removed.
    name = "".join(
        ch for ch in name if ch.isspace() or unicodedata.category(ch) != "Cc"
    )
    name = re.sub(r"\s+", " ", name).strip()
    # "." and ".." are directory references, not names: joining them onto a
    # parent path would point at the parent itself or escape it.
    if name in ("", ".", ".."):
        return "track"
    return name
def parse_cue_track_titles(cue_path: Path, encoding: str = "auto") -> dict[int, str]:
    """Map each track number in a cue sheet to its sanitized TITLE.

    Only the common layout is recognised:
    TRACK nn AUDIO
    TITLE "Song Name"

    TITLE lines seen before the first TRACK line are ignored. If a track has
    several TITLE lines, the last one is kept.
    """
    titles_by_track: dict[int, str] = {}
    active_track: Optional[int] = None

    for raw_line in cue_lines(cue_path, encoding=encoding):
        hit = TRACK_LINE_RE.match(raw_line)
        if hit is not None:
            active_track = int(hit.group(1))
            continue

        if active_track is None:
            continue

        hit = TITLE_LINE_RE.match(raw_line)
        if hit is None:
            continue

        text = hit.group(1).strip()
        # Remove one pair of enclosing double quotes, if present.
        if len(text) >= 2 and text[0] == '"' and text[-1] == '"':
            text = text[1:-1]
        titles_by_track[active_track] = sanitize_filename(text)

    return titles_by_track
def parse_cue_album_title(cue_path: Path, encoding: str = "auto") -> str:
    """Return the cue sheet's album-level title as a sanitized name.

    Args:
        cue_path: cue sheet to inspect.
        encoding: passed through to ``cue_lines``.

    Returns:
        The first non-empty TITLE found before the first TRACK line, or the
        sanitized cue file stem when there is none.
    """
    for line in cue_lines(cue_path, encoding=encoding):
        if TRACK_LINE_RE.match(line):
            # Album-level fields end at the first TRACK; later TITLE lines
            # belong to individual tracks.
            break
        title_match = TITLE_LINE_RE.match(line)
        if not title_match:
            continue
        title = title_match.group(1).strip()
        if title.startswith('"') and title.endswith('"') and len(title) >= 2:
            title = title[1:-1]
        # Test emptiness BEFORE sanitizing: sanitize_filename substitutes a
        # placeholder for empty input, which would otherwise hide an empty
        # `TITLE ""` and defeat the cue-stem fallback below.
        if title.strip(' \t"'):
            return sanitize_filename(title)
    return sanitize_filename(cue_path.stem)
def detect_audio_from_cue(cue_path: Path, encoding: str = "auto") -> Optional[Path]:
    """Locate the audio file that belongs to a cue sheet.

    Args:
        cue_path: cue sheet to inspect.
        encoding: cue text encoding, passed to ``cue_lines``. This matters
            because non-ASCII FILE names must be decoded correctly to match
            the name on disk.

    Returns:
        The first existing regular file among, in order: the first FILE
        reference in the cue (relative references resolve against the cue's
        directory), then ``<cue stem><ext>`` for each supported extension in
        lower and upper case. ``None`` when none of them exists.

    Many cue sheets use FILE "album.ape" WAVE-style lines even if the payload
    is flac/wav/ape, so the referenced path is trusted over the type keyword.
    """
    file_refs: list[str] = []
    for line in cue_lines(cue_path, encoding=encoding):
        m = FILE_LINE_RE.match(line)
        if m:
            file_refs.append(m.group(1).strip())

    # Multi-file cue sheets need a different strategy; keep this script
    # focused on one-file albums for now.
    if len(file_refs) > 1:
        warn(f"{cue_path}: multiple FILE entries found; using the first one only")

    candidates: list[Path] = []
    if file_refs:
        ref_path = Path(file_refs[0])
        if not ref_path.is_absolute():
            candidates.append((cue_path.parent / ref_path).resolve())
        else:
            candidates.append(ref_path)

    # Fallback: same stem as the cue with each supported extension.
    for ext in AUDIO_EXTS:
        candidates.append(cue_path.with_suffix(ext))
        candidates.append(cue_path.with_suffix(ext.upper()))

    seen: set[Path] = set()
    for candidate in candidates:
        candidate = candidate.resolve() if candidate.exists() else candidate
        if candidate in seen:
            continue
        seen.add(candidate)
        if candidate.exists() and candidate.is_file():
            return candidate
    return None
def find_cue_files(root: Path, recursive: bool) -> list[Path]:
    """Return the cue sheets under *root*, sorted by path.

    Args:
        root: directory to scan.
        recursive: descend into subdirectories when true; otherwise only look
            at the direct children of *root*.

    Returns:
        Sorted paths of regular files whose name ends in ``.cue``, compared
        case-insensitively so that ``ALBUM.CUE`` is found as well.
    """
    entries = root.rglob("*") if recursive else root.glob("*")
    return sorted(
        p for p in entries if p.name.lower().endswith(".cue") and p.is_file()
    )
def collect_album_jobs(root: Path, output_root: Optional[Path], recursive: bool, cue_encoding: str) -> list[AlbumJob]:
    """Build one AlbumJob for every cue sheet under *root* that has usable audio.

    Cue sheets with no detectable audio file, or whose audio suffix is not in
    AUDIO_EXTS, are skipped with a warning. Each job's output directory is
    named after the cue's album title and lives under *output_root* when it is
    given, otherwise next to the cue sheet.
    """
    collected: list[AlbumJob] = []
    for cue_file in find_cue_files(root, recursive):
        source = detect_audio_from_cue(cue_file, encoding=cue_encoding)
        if source is None:
            warn(f"skip {cue_file}: no matching flac/wav/ape audio found")
            continue

        ext = source.suffix.lower()
        if ext not in AUDIO_EXTS:
            warn(f"skip {cue_file}: unsupported audio extension {source.suffix}")
            continue

        folder_name = parse_cue_album_title(cue_file, encoding=cue_encoding)
        base_dir = output_root if output_root else cue_file.parent
        collected.append(
            AlbumJob(
                cue=cue_file,
                audio=source,
                audio_ext=ext,
                output_dir=base_dir / folder_name,
            )
        )
    return collected
def move_album_dir_to_final_root(album_dir: Path, final_root: Path, top_level_name: str, dry_run: bool) -> Path:
    """Move a finished album folder to ``final_root/top_level_name/``.

    Args:
        album_dir: folder holding the finished tracks.
        final_root: library root.
        top_level_name: grouping folder created under *final_root*.
        dry_run: when true, only log the planned move and touch nothing.

    Returns:
        The destination path, or *album_dir* unchanged when it already sits
        directly inside the grouping folder.

    If the destination name is taken, `` (2)``, `` (3)``, ... is appended
    until a free name is found inside the same grouping folder.
    """
    final_album_root = final_root / top_level_name
    if album_dir.resolve().parent == final_album_root.resolve():
        # Already under final root with the desired top-level grouping.
        return album_dir

    dest = final_album_root / album_dir.name
    suffix = 2
    while dest.exists():
        # Keep alternates inside the grouping folder, next to the original.
        dest = final_album_root / f"{album_dir.name} ({suffix})"
        suffix += 1

    if dry_run:
        log(f"[dry-run] mv -T {album_dir} {dest}")
        return dest

    # Create the grouping folder only when a move really happens, so that a
    # dry run leaves the filesystem untouched.
    final_album_root.mkdir(parents=True, exist_ok=True)
    shutil.move(str(album_dir), str(dest))
    return dest
def shell_quote(path: Path) -> str:
    """Return *path* as a string quoted for safe use in a shell command line."""
    from shlex import quote

    return quote(str(path))
def run_cmd(cmd: list[str], cwd: Optional[Path] = None, dry_run: bool = False) -> None:
    """Run an argument-list command, or just log it when *dry_run* is set.

    Args:
        cmd: program and arguments, executed without a shell.
        cwd: optional working directory for the child process.
        dry_run: log the command instead of running it.

    Raises:
        ToolError: if the command exits with a non-zero status.
    """
    rendered = " ".join(map(shlex_quote_arg, cmd))
    if cwd:
        rendered = f"(cd {cwd} && {rendered})"

    if dry_run:
        log(f"[dry-run] {rendered}")
        return

    workdir = str(cwd) if cwd else None
    status = subprocess.run(cmd, cwd=workdir).returncode
    if status != 0:
        raise ToolError(f"command failed ({status}): {rendered}")
def shlex_quote_arg(arg: str) -> str:
    """Return *arg* quoted for safe display or use in a shell command line."""
    from shlex import quote

    return quote(arg)
def run_shell(script: str, cwd: Optional[Path] = None, dry_run: bool = False) -> None:
    """Run *script* through ``bash -lc``, or just log it when *dry_run* is set.

    Args:
        script: shell source text handed to bash as a single argument.
        cwd: optional working directory for the bash process.
        dry_run: log the script instead of running it.

    Raises:
        ToolError: if bash exits with a non-zero status.
    """
    rendered = f"(cd {cwd} && {script})" if cwd else script

    if dry_run:
        log(f"[dry-run] {rendered}")
        return

    workdir = str(cwd) if cwd else None
    status = subprocess.run(["bash", "-lc", script], cwd=workdir).returncode
    if status != 0:
        raise ToolError(f"command failed ({status}): {rendered}")
def tag_output_files(cue: Path, out_dir: Path, ext: str, dry_run: bool) -> None:
    """Tag the split tracks in *out_dir* from the cue sheet using ``cuetag``.

    Args:
        cue: cue sheet supplying the metadata.
        out_dir: directory containing the split tracks.
        ext: track extension without the leading dot.
        dry_run: log the cuetag command instead of running it.

    Files are passed to cuetag in sorted order. When no ``*.ext`` file is
    present a warning is printed and nothing is run.
    """
    # Escape the directory part: album folders often contain "[" or "]"
    # (e.g. "Album [FLAC]"), which glob would otherwise read as a character
    # class and silently match nothing.
    pattern = os.path.join(glob.escape(str(out_dir)), f"*.{ext}")
    files = sorted(Path(p) for p in glob.glob(pattern))
    if not files:
        warn(f"{cue}: no *.{ext} files found for tagging")
        return
    cmd = ["cuetag", str(cue), *[str(p) for p in files]]
    run_cmd(cmd, dry_run=dry_run)
def rename_split_files_by_cue_titles(cue: Path, out_dir: Path, ext: str, dry_run: bool) -> None:
    """Rename split tracks in *out_dir* to ``NN - <title>.<ext>``.

    Args:
        cue: cue sheet supplying the track titles (decoded with encoding
            auto-detection).
        out_dir: directory containing the split tracks.
        ext: track extension without the leading dot.
        dry_run: log the planned renames instead of performing them.

    Files are numbered by their position in sorted order, starting at 1, and
    that number is used to look up the cue title; tracks without a title get
    ``track NN``. An existing destination is never overwritten: `` (2)``,
    `` (3)``, ... is appended to the stem until a free name is found.
    """
    titles = parse_cue_track_titles(cue)
    # Escape the directory part so folder names containing glob
    # metacharacters such as "[" or "]" still match their own contents.
    pattern = os.path.join(glob.escape(str(out_dir)), f"*.{ext}")
    files = sorted(Path(p) for p in glob.glob(pattern))
    if not files:
        return

    # Zero-pad to at least two digits, more when there are 100+ tracks.
    width = max(2, len(str(len(files))))
    for idx, src in enumerate(files, start=1):
        title = titles.get(idx, f"track {idx:0{width}d}")
        prefix = f"{idx:0{width}d} - "
        dest = src.with_name(f"{prefix}{title}{src.suffix.lower()}")
        if dest == src:
            continue
        if dest.exists():
            stem = dest.stem
            suffix = dest.suffix
            n = 2
            while True:
                alt = dest.with_name(f"{stem} ({n}){suffix}")
                if not alt.exists():
                    dest = alt
                    break
                n += 1
        if dry_run:
            log(f"[dry-run] mv {src} -> {dest}")
        else:
            src.rename(dest)
def convert_tracks_to_mp3(out_dir: Path, source_ext: str, bitrate: str, dry_run: bool) -> None:
    """Convert every ``*.source_ext`` track in *out_dir* to MP3 and delete the source.

    Args:
        out_dir: directory containing the split tracks.
        source_ext: extension of the tracks to convert, without the dot.
        bitrate: value handed to ffmpeg's ``-b:a`` option (e.g. ``"320k"``).
        dry_run: log the ffmpeg and rm steps instead of performing them.

    Raises:
        ToolError: if ffmpeg is not installed or a conversion fails. The
            failing track's source file is kept.

    Each MP3 keeps the source file's stem; metadata is copied with
    ``-map_metadata 0``.
    """
    ensure_tools(["ffmpeg"])
    # Escape the directory part so folder names containing glob
    # metacharacters such as "[" or "]" still match their own contents.
    pattern = os.path.join(glob.escape(str(out_dir)), f"*.{source_ext}")
    source_files = sorted(Path(p) for p in glob.glob(pattern))
    if not source_files:
        warn(f"{out_dir}: no *.{source_ext} files found for mp3 conversion")
        return

    for src in source_files:
        dest = src.with_suffix(".mp3")
        cmd = [
            "ffmpeg",
            "-y",
            "-i",
            str(src),
            "-map_metadata",
            "0",
            "-vn",
            "-codec:a",
            "libmp3lame",
            "-b:a",
            bitrate,
            str(dest),
        ]
        printable = " ".join(shlex_quote_arg(x) for x in cmd)
        if dry_run:
            log(f"[dry-run] {printable}")
            log(f"[dry-run] rm {src}")
            continue
        proc = subprocess.run(cmd)
        if proc.returncode != 0:
            # Best-effort removal of a truncated MP3 so a failed run does not
            # leave something that looks like a finished track.
            try:
                dest.unlink()
            except OSError:
                pass
            raise ToolError(f"mp3 conversion failed ({proc.returncode}): {src}")
        # Delete the lossless source only after its MP3 was written.
        src.unlink()
def split_direct(job: AlbumJob, dry_run: bool) -> None:
    """Split the job's audio in its original format, then tag and rename the tracks.

    Args:
        job: album to split; tracks are written to ``job.output_dir`` with
            the same extension as the source audio.
        dry_run: log the planned commands instead of running them; the output
            directory is not created in that case.

    Raises:
        ToolError: if the split pipeline or cuetag exits non-zero.
    """
    track_ext = job.audio_ext.lstrip('.')
    if not dry_run:
        # A dry run must not touch the filesystem.
        job.output_dir.mkdir(parents=True, exist_ok=True)
    script = (
        f"set -euo pipefail; "
        f"cd {shell_quote(job.output_dir)}; "
        f"cuebreakpoints {shell_quote(job.cue)} | shnsplit -o {track_ext} {shell_quote(job.audio)}"
    )
    run_shell(script, dry_run=dry_run)
    tag_output_files(job.cue, job.output_dir, track_ext, dry_run=dry_run)
    rename_split_files_by_cue_titles(job.cue, job.output_dir, track_ext, dry_run=dry_run)
def split_via_flac(job: AlbumJob, bitrate: str, dry_run: bool) -> None:
    """Transcode the job's audio to a temporary FLAC, split that, and convert to MP3.

    Args:
        job: album to split; tracks are written to ``job.output_dir``.
        bitrate: MP3 bitrate passed on to ``convert_tracks_to_mp3``.
        dry_run: log the planned commands instead of running them; the output
            directory is not created in that case.

    Raises:
        ToolError: if ffmpeg is missing or any external command fails.
    """
    ensure_tools(["ffmpeg"])
    if not dry_run:
        # A dry run must not touch the filesystem.
        job.output_dir.mkdir(parents=True, exist_ok=True)
    # The intermediate file is dot-prefixed, so the "*.flac" globs used for
    # tagging and renaming do not match it.
    temp_flac = job.output_dir / f".{job.cue.stem}.transcode.flac"
    script = (
        f"set -euo pipefail; "
        f"ffmpeg -y -i {shell_quote(job.audio)} -c:a flac {shell_quote(temp_flac)} >/dev/null 2>&1; "
        f"cd {shell_quote(job.output_dir)}; "
        f"cuebreakpoints {shell_quote(job.cue)} | shnsplit -o flac {shell_quote(temp_flac)}; "
        f"rm -f {shell_quote(temp_flac)}"
    )
    try:
        run_shell(script, dry_run=dry_run)
    finally:
        # With `set -e` a failing step aborts the script before its own
        # `rm -f`, so remove the intermediate file here as well.
        if not dry_run:
            try:
                temp_flac.unlink()
            except FileNotFoundError:
                pass
    tag_output_files(job.cue, job.output_dir, "flac", dry_run=dry_run)
    rename_split_files_by_cue_titles(job.cue, job.output_dir, "flac", dry_run=dry_run)
    convert_tracks_to_mp3(job.output_dir, "flac", bitrate, dry_run=dry_run)
def process_job(job: AlbumJob, ape_policy: str, dry_run: bool, cue_encoding: str, mp3_bitrate: str) -> None:
    """Split one album and convert its tracks to MP3.

    FLAC and WAV sources are always split directly. APE sources follow
    *ape_policy*: ``direct`` splits the APE itself, ``transcode`` goes through
    a temporary FLAC, and any other value tries the direct route first and
    falls back to transcoding if that raises.

    Note: *cue_encoding* is accepted but not consulted in this function.

    Raises:
        ToolError: for an audio extension other than .flac/.wav/.ape, or when
            a step fails.
    """
    for line in (f"[album] {job.cue}", f" audio: {job.audio}", f" output: {job.output_dir}"):
        log(line)

    def direct_then_mp3() -> None:
        # Split in the source format, then convert the resulting tracks.
        split_direct(job, dry_run=dry_run)
        convert_tracks_to_mp3(job.output_dir, job.audio_ext.lstrip('.'), mp3_bitrate, dry_run=dry_run)

    if job.audio_ext in (".flac", ".wav"):
        direct_then_mp3()
        return

    if job.audio_ext != ".ape":
        raise ToolError(f"unsupported audio extension: {job.audio_ext}")

    if ape_policy == "direct":
        direct_then_mp3()
        return

    if ape_policy == "transcode":
        split_via_flac(job, mp3_bitrate, dry_run=dry_run)
        return

    # auto: prefer the direct split, transcode only if it fails.
    try:
        direct_then_mp3()
    except Exception as direct_exc:
        warn(f"{job.cue}: direct APE split failed, falling back to FLAC transcode")
        warn(f"reason: {direct_exc}")
        split_via_flac(job, mp3_bitrate, dry_run=dry_run)
def main() -> int:
    """Command-line entry point.

    Returns:
        0 when every album succeeded or none was found, 1 when at least one
        album failed, 2 when the directory is missing or a required external
        tool is not installed.
    """
    args = parse_args()
    root = Path(args.directory).expanduser().resolve()
    if not root.is_dir():
        err(f"directory not found: {root}")
        return 2

    # ffmpeg is checked up front too: every processing path ends in an MP3
    # conversion, so without it each album would be split and then fail.
    try:
        ensure_tools(["cuebreakpoints", "shnsplit", "cuetag", "ffmpeg"])
    except ToolError as exc:
        err(str(exc))
        return 2

    output_root = Path(args.output_root).expanduser().resolve() if args.output_root else None
    if output_root and not args.dry_run:
        # A dry run must not touch the filesystem.
        output_root.mkdir(parents=True, exist_ok=True)
    final_root = Path(args.final_root).expanduser().resolve()
    # Finished albums are grouped under a folder named after the scanned directory.
    top_level_name = root.name

    jobs = collect_album_jobs(root, output_root, recursive=not args.no_recursive, cue_encoding=args.cue_encoding)
    if not jobs:
        log("no cue-sheet albums found")
        return 0

    ok = 0
    failed = 0
    for job in jobs:
        # One broken album must not stop the batch: report it and continue.
        try:
            process_job(job, ape_policy=args.ape_policy, dry_run=args.dry_run, cue_encoding=args.cue_encoding, mp3_bitrate=args.mp3_bitrate)
            moved = move_album_dir_to_final_root(job.output_dir, final_root, top_level_name, dry_run=args.dry_run)
            log(f" final: {moved}")
            ok += 1
        except Exception as exc:
            failed += 1
            err(f"{job.cue}: {exc}")

    log(f"done: {ok} ok, {failed} failed")
    return 0 if failed == 0 else 1
# Run the CLI only when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":
    sys.exit(main())