513 lines
17 KiB
Python
513 lines
17 KiB
Python
#!/usr/bin/env python3
|
|
"""Scan a directory for cue-sheet albums and split them into tracks.
|
|
|
|
Supported album sources:
|
|
- cue + flac
|
|
- cue + wav
|
|
- cue + ape
|
|
|
|
Strategy:
|
|
- flac / wav: split directly when possible
|
|
- ape: try direct split first by default, then fall back to transcoding to flac
|
|
|
|
Output layout:
|
|
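- one folder per cue sheet, named after the cue's album TITLE (falling back to the cue file stem); finished folders are moved to <final-root>/<scanned directory name>/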
|
|
|
|
Examples:
|
|
python parse_cues.py ~/Music/album_dir
|
|
python parse_cues.py ~/Music/album_dir --ape-policy transcode
|
|
python parse_cues.py ~/Music/album_dir --dry-run
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import glob
|
|
import os
|
|
import re
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
import unicodedata
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Iterable, Optional
|
|
|
|
# Audio file extensions (lowercase, leading dot) accepted as album sources.
AUDIO_EXTS = (".flac", ".wav", ".ape")
# Cue `FILE <name> <type>` line; group 1 is the file reference without quotes.
# The pattern is anchored at end of line: without the anchor the lazy group
# stops at the first space, so `FILE "my album.ape" WAVE` captured only `my`.
FILE_LINE_RE = re.compile(r'^\s*FILE\s+"?(.+?)"?\s+\S+\s*$', re.IGNORECASE)
# Cue `TRACK <nn> <type>` line; group 1 is the track number digits.
TRACK_LINE_RE = re.compile(r'^\s*TRACK\s+(\d+)\s+\S+', re.IGNORECASE)
# Cue `TITLE <text>` line; group 1 is the rest of the line, quotes included.
TITLE_LINE_RE = re.compile(r'^\s*TITLE\s+(.*)$', re.IGNORECASE)
|
|
|
|
|
|
@dataclass
class AlbumJob:
    """One cue sheet paired with the audio file it describes."""

    # Path of the cue sheet.
    cue: Path
    # Path of the audio file to split.
    audio: Path
    # Lowercased suffix of ``audio``, including the leading dot.
    audio_ext: str
    # Directory that receives the split tracks.
    output_dir: Path
|
|
|
|
|
|
class ToolError(RuntimeError):
    """Raised when an external tool is missing or a command exits non-zero."""
|
|
|
|
|
|
def log(msg: str) -> None:
    """Print *msg* to standard output and flush immediately."""
    print(msg, file=sys.stdout, flush=True)
|
|
|
|
|
|
def warn(msg: str) -> None:
    """Print *msg* to standard error with a ``[warn]`` prefix and flush."""
    print("[warn]", msg, file=sys.stderr, flush=True)
|
|
|
|
|
|
def err(msg: str) -> None:
    """Print *msg* to standard error with an ``[error]`` prefix and flush."""
    print("[error]", msg, file=sys.stderr, flush=True)
|
|
|
|
|
|
def which_or_fail(name: str) -> str:
    """Return the path ``shutil.which`` finds for *name*.

    Raises:
        ToolError: if ``shutil.which`` finds nothing.
    """
    resolved = shutil.which(name)
    if resolved:
        return resolved
    raise ToolError(f"missing required tool: {name}")
|
|
|
|
|
|
def ensure_tools(required: Iterable[str]) -> None:
    """Check that every tool in *required* can be found by ``shutil.which``.

    Raises:
        ToolError: naming all tools that were not found, comma-separated.
    """
    absent = []
    for tool in required:
        if shutil.which(tool) is None:
            absent.append(tool)
    if not absent:
        return
    raise ToolError("missing required tools: " + ", ".join(absent))
|
|
|
|
|
|
def parse_args(argv: Optional[list[str]] = None) -> argparse.Namespace:
    """Parse the command-line options of the cue splitter.

    Args:
        argv: Argument list to parse. ``None`` (the default) lets argparse
            read ``sys.argv[1:]``, which is what the parameterless original
            did; passing a list makes the parser usable without touching
            process state.

    Returns:
        Namespace with ``directory``, ``no_recursive``, ``ape_policy``,
        ``output_root``, ``final_root``, ``cue_encoding``, ``mp3_bitrate``
        and ``dry_run``.
    """
    parser = argparse.ArgumentParser(
        description="Traverse a directory, find cue-sheet albums, and split tracks."
    )
    parser.add_argument("directory", help="Root directory to scan")
    parser.add_argument(
        "--no-recursive",
        action="store_true",
        help="Only scan the top-level directory",
    )
    parser.add_argument(
        "--ape-policy",
        choices=("auto", "direct", "transcode"),
        default="auto",
        help="How to handle APE sources",
    )
    parser.add_argument(
        "--output-root",
        default=None,
        help="Optional root directory for split outputs. Default: beside the cue file",
    )
    parser.add_argument(
        "--final-root",
        default="/home/shenwei/mnt/volume2/navidrome",
        help="Move finished album folders into this root. Default: navidrome mount",
    )
    parser.add_argument(
        "--cue-encoding",
        default="auto",
        help="Cue file text encoding. Default: auto",
    )
    parser.add_argument(
        "--mp3-bitrate",
        default="320k",
        help="MP3 target bitrate for conversion (e.g. 192k, 256k, 320k). Default: 320k",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Only print planned actions",
    )
    return parser.parse_args(argv)
|
|
|
|
|
|
def read_text_lines(path: Path) -> list[str]:
    """Read *path* as UTF-8 (undecodable bytes replaced) and return its lines."""
    content = path.read_text(encoding="utf-8", errors="replace")
    return content.splitlines()
|
|
|
|
|
|
def read_cue_text(path: Path, encoding: str = "auto") -> str:
    """Return the decoded text of a cue sheet.

    With an explicit *encoding* the file is read with that codec and
    undecodable bytes are replaced. With ``"auto"`` a fixed list of codecs is
    tried in order and the first strict decode that contains no U+FFFD is
    returned; if none qualifies the bytes are decoded as UTF-8 with
    replacement.
    """
    if encoding == "auto":
        payload = path.read_bytes()
        for codec in ("utf-8-sig", "utf-8", "gb18030", "gbk", "big5", "cp936", "cp1252", "latin1"):
            try:
                decoded = payload.decode(codec)
            except UnicodeDecodeError:
                continue
            # Skip a decode that already carries replacement characters.
            if "\ufffd" in decoded:
                continue
            return decoded
        return payload.decode("utf-8", errors="replace")
    return path.read_text(encoding=encoding, errors="replace")
|
|
|
|
|
|
def cue_lines(path: Path, encoding: str = "auto") -> list[str]:
    """Return the cue sheet at *path*, decoded with *encoding*, as a list of lines."""
    text = read_cue_text(path, encoding=encoding)
    return text.splitlines()
|
|
|
|
|
|
def sanitize_filename(name: str) -> str:
    """Turn *name* into a string usable as a file or directory name.

    Surrounding whitespace and double quotes are trimmed, each run of
    ``\\ / : * ? " < > |`` becomes a single underscore, and whitespace runs
    collapse to one space. An empty result is replaced by ``"track"``.
    """
    trimmed = name.strip().strip('"')
    without_illegal = re.sub(r"[\\/:*?\"<>|]+", "_", trimmed)
    collapsed = re.sub(r"\s+", " ", without_illegal).strip()
    if collapsed:
        return collapsed
    return "track"
|
|
|
|
|
|
def parse_cue_track_titles(cue_path: Path, encoding: str = "auto") -> dict[int, str]:
    """Map each track number in a cue sheet to its sanitized TITLE.

    A deliberately small parser for the usual layout:
      TRACK nn AUDIO
        TITLE "Song Name"

    TITLE lines that appear before the first TRACK are ignored; a later TITLE
    for the same track overwrites the earlier one.
    """
    by_track: dict[int, str] = {}
    track_no: Optional[int] = None
    for raw_line in cue_lines(cue_path, encoding=encoding):
        hit = TRACK_LINE_RE.match(raw_line)
        if hit is not None:
            track_no = int(hit.group(1))
            continue
        if track_no is None:
            continue
        hit = TITLE_LINE_RE.match(raw_line)
        if hit is None:
            continue
        text = hit.group(1).strip()
        # Drop one pair of enclosing double quotes, if present.
        if len(text) >= 2 and text[0] == '"' and text[-1] == '"':
            text = text[1:-1]
        by_track[track_no] = sanitize_filename(text)
    return by_track
|
|
|
|
|
|
def parse_cue_album_title(cue_path: Path, encoding: str = "auto") -> str:
    """Return the cue's top-level album title, sanitized for use as a folder name.

    The first non-empty TITLE that appears before the first TRACK entry is
    used. If there is none, the cue filename stem is used instead.

    Args:
        cue_path: Path of the cue sheet.
        encoding: Text encoding passed to ``cue_lines``.

    Returns:
        The sanitized album title, or the sanitized cue stem as a fallback.
    """
    for line in cue_lines(cue_path, encoding=encoding):
        if TRACK_LINE_RE.match(line):
            # Everything after the first TRACK belongs to individual tracks.
            break
        title_match = TITLE_LINE_RE.match(line)
        if not title_match:
            continue
        title = title_match.group(1).strip()
        if title.startswith('"') and title.endswith('"') and len(title) >= 2:
            title = title[1:-1]
        # sanitize_filename() never returns an empty string (it substitutes
        # "track"), so emptiness must be tested on the raw text; otherwise an
        # empty TITLE would produce a folder called "track" instead of
        # falling back to the cue stem.
        if not title.strip().strip('"').strip():
            continue
        return sanitize_filename(title)
    return sanitize_filename(cue_path.stem)
|
|
|
|
|
|
def detect_audio_from_cue(cue_path: Path, encoding: str = "auto") -> Optional[Path]:
    """Locate the audio file a cue sheet refers to.

    Many cue sheets use FILE "album.ape" WAVE-style lines even if the payload is
    flac/wav/ape. We use the path from the cue first, then fall back to same-stem
    audio candidates in the same directory.

    Args:
        cue_path: Path of the cue sheet.
        encoding: Text encoding used to read the cue sheet. The original
            ignored this argument and always decoded as UTF-8, which garbled
            non-UTF-8 file names in the FILE entry.

    Returns:
        The resolved path of the first candidate that is an existing file, or
        ``None`` when no candidate exists.
    """
    file_refs: list[str] = []
    for line in cue_lines(cue_path, encoding=encoding):
        m = FILE_LINE_RE.match(line)
        if m:
            file_refs.append(m.group(1).strip())

    # Multi-file cue sheets need a different strategy; keep this script focused on
    # one-file albums for now.
    if len(file_refs) > 1:
        warn(f"{cue_path}: multiple FILE entries found; using the first one only")

    candidates: list[Path] = []
    if file_refs:
        ref_path = Path(file_refs[0])
        if ref_path.is_absolute():
            candidates.append(ref_path)
        else:
            candidates.append((cue_path.parent / ref_path).resolve())

    # Fallback: same stem with supported extensions.
    for ext in AUDIO_EXTS:
        candidates.append(cue_path.with_suffix(ext))
        candidates.append(cue_path.with_suffix(ext.upper()))

    # The first candidate that is a regular file wins. The original also kept
    # a "seen" set, but a repeated path can only be one that was already
    # rejected, so dropping the set does not change the result.
    for candidate in candidates:
        if candidate.is_file():
            return candidate.resolve()

    return None
|
|
|
|
|
|
def find_cue_files(root: Path, recursive: bool) -> list[Path]:
    """Return the sorted ``*.cue`` files under *root*.

    With *recursive* the whole tree is searched; otherwise only the direct
    children of *root*. Entries that are not regular files are skipped.
    """
    matcher = root.rglob if recursive else root.glob
    found = [entry for entry in matcher("*.cue") if entry.is_file()]
    found.sort()
    return found
|
|
|
|
|
|
def collect_album_jobs(root: Path, output_root: Optional[Path], recursive: bool, cue_encoding: str) -> list[AlbumJob]:
    """Build one AlbumJob per usable cue sheet found under *root*.

    A cue is skipped with a warning when no audio file can be located for it
    or when the located file's extension is not in AUDIO_EXTS. The output
    directory is named after the cue's album title and placed under
    *output_root* when given, otherwise next to the cue file.
    """
    collected: list[AlbumJob] = []
    for cue_file in find_cue_files(root, recursive):
        source = detect_audio_from_cue(cue_file, encoding=cue_encoding)
        if source is None:
            warn(f"skip {cue_file}: no matching flac/wav/ape audio found")
            continue
        suffix = source.suffix.lower()
        if suffix not in AUDIO_EXTS:
            warn(f"skip {cue_file}: unsupported audio extension {source.suffix}")
            continue
        folder_name = parse_cue_album_title(cue_file, encoding=cue_encoding)
        base_dir = output_root if output_root else cue_file.parent
        collected.append(
            AlbumJob(
                cue=cue_file,
                audio=source,
                audio_ext=suffix,
                output_dir=base_dir / folder_name,
            )
        )
    return collected
|
|
|
|
|
|
def move_album_dir_to_final_root(album_dir: Path, final_root: Path, top_level_name: str, dry_run: bool) -> Path:
    """Move a finished album folder to ``final_root / top_level_name``.

    Args:
        album_dir: Folder holding the finished tracks.
        final_root: Root of the destination library.
        top_level_name: Name of the grouping folder created under *final_root*.
        dry_run: When true, only log the planned move; nothing is created or
            moved.

    Returns:
        The destination path, or *album_dir* itself when it already sits
        directly inside the grouping folder.
    """
    final_album_root = final_root / top_level_name
    # A dry run must not touch the filesystem, so the grouping folder is only
    # created for real runs.
    if not dry_run:
        final_album_root.mkdir(parents=True, exist_ok=True)
    if album_dir.resolve().parent == final_album_root.resolve():
        # Already under final root with the desired top-level grouping.
        return album_dir

    dest = final_album_root / album_dir.name
    suffix = 2
    while dest.exists():
        # Numbered alternatives stay inside the grouping folder. The original
        # built them under final_root, which put colliding albums one level
        # too high.
        dest = final_album_root / f"{album_dir.name} ({suffix})"
        suffix += 1

    if dry_run:
        log(f"[dry-run] mv -T {album_dir} {dest}")
        return dest

    shutil.move(str(album_dir), str(dest))
    return dest
|
|
|
|
|
|
def shell_quote(path: Path) -> str:
    """Return *path* as a string quoted for safe use in a shell command."""
    from shlex import quote

    return quote(str(path))
|
|
|
|
|
|
def run_cmd(cmd: list[str], cwd: Optional[Path] = None, dry_run: bool = False) -> None:
    """Run *cmd* as a subprocess, or just log it when *dry_run* is set.

    Raises:
        ToolError: if the command exits with a non-zero status.
    """
    rendered = " ".join(map(shlex_quote_arg, cmd))
    if cwd:
        rendered = f"(cd {cwd} && {rendered})"

    if dry_run:
        log(f"[dry-run] {rendered}")
        return

    workdir = str(cwd) if cwd else None
    status = subprocess.run(cmd, cwd=workdir).returncode
    if status != 0:
        raise ToolError(f"command failed ({status}): {rendered}")
|
|
|
|
|
|
def shlex_quote_arg(arg: str) -> str:
    """Return *arg* quoted for safe use as a single shell word."""
    from shlex import quote

    return quote(arg)
|
|
|
|
|
|
def run_shell(script: str, cwd: Optional[Path] = None, dry_run: bool = False) -> None:
    """Run *script* through ``bash -lc``, or just log it when *dry_run* is set.

    Raises:
        ToolError: if bash exits with a non-zero status.
    """
    rendered = f"(cd {cwd} && {script})" if cwd else script
    if dry_run:
        log(f"[dry-run] {rendered}")
        return

    workdir = str(cwd) if cwd else None
    status = subprocess.run(["bash", "-lc", script], cwd=workdir).returncode
    if status != 0:
        raise ToolError(f"command failed ({status}): {rendered}")
|
|
|
|
|
|
def tag_output_files(cue: Path, out_dir: Path, ext: str, dry_run: bool) -> None:
    """Tag the split tracks in *out_dir* by running ``cuetag`` with the cue sheet.

    Args:
        cue: Cue sheet passed to ``cuetag``.
        out_dir: Directory containing the split tracks.
        ext: Track extension without the leading dot.
        dry_run: Forwarded to ``run_cmd``; when true the command is only logged.

    A warning is emitted and nothing is run when no matching file exists.
    """
    # Escape the directory part: a folder name containing glob metacharacters
    # such as "[" or "*" would otherwise be read as part of the pattern and
    # match nothing.
    pattern = os.path.join(glob.escape(str(out_dir)), f"*.{ext}")
    files = sorted(Path(p) for p in glob.glob(pattern))
    if not files:
        warn(f"{cue}: no *.{ext} files found for tagging")
        return
    cmd = ["cuetag", str(cue), *[str(p) for p in files]]
    run_cmd(cmd, dry_run=dry_run)
|
|
|
|
|
|
def rename_split_files_by_cue_titles(cue: Path, out_dir: Path, ext: str, dry_run: bool, encoding: str = "auto") -> None:
    """Rename split tracks in *out_dir* to ``NN - <title>.<ext>``.

    Files are taken in sorted order and the n-th file receives the title of
    cue track n; a track without a title is named ``track NN``.

    Args:
        cue: Cue sheet the titles are read from.
        out_dir: Directory containing the split tracks.
        ext: Track extension without the leading dot.
        dry_run: When true, only log the planned renames.
        encoding: Text encoding of the cue sheet. Defaults to ``"auto"``,
            which is what the original always used; the parameter lets a
            caller pass an explicit encoding through.
    """
    titles = parse_cue_track_titles(cue, encoding=encoding)
    # Escape the directory part so glob metacharacters in the folder name
    # (for example "[" or "*") are matched literally.
    pattern = os.path.join(glob.escape(str(out_dir)), f"*.{ext}")
    files = sorted(Path(p) for p in glob.glob(pattern))
    if not files:
        return

    # Zero-pad to at least two digits, more when there are 100+ tracks.
    width = max(2, len(str(len(files))))
    for idx, src in enumerate(files, start=1):
        title = titles.get(idx, f"track {idx:0{width}d}")
        prefix = f"{idx:0{width}d} - "
        dest = src.with_name(f"{prefix}{title}{src.suffix.lower()}")
        if dest == src:
            continue
        if dest.exists():
            # Do not overwrite: append " (n)" until the name is free.
            stem = dest.stem
            suffix = dest.suffix
            n = 2
            while True:
                alt = dest.with_name(f"{stem} ({n}){suffix}")
                if not alt.exists():
                    dest = alt
                    break
                n += 1
        if dry_run:
            log(f"[dry-run] mv {src} -> {dest}")
        else:
            src.rename(dest)
|
|
|
|
|
|
def convert_tracks_to_mp3(out_dir: Path, source_ext: str, bitrate: str, dry_run: bool) -> None:
    """Convert split tracks to mp3 in-place, preserving filenames.

    Each ``*.<source_ext>`` file in *out_dir* is encoded with ffmpeg
    (libmp3lame) to a sibling ``.mp3`` and the source file is deleted after a
    successful conversion.

    Args:
        out_dir: Directory containing the split tracks.
        source_ext: Extension of the source tracks without the leading dot.
        bitrate: Value passed to ffmpeg's ``-b:a``.
        dry_run: When true, only log the ffmpeg and rm commands.

    Raises:
        ToolError: if ffmpeg is missing or exits non-zero for a track.
    """
    ensure_tools(["ffmpeg"])
    # Escape the directory part so glob metacharacters in the folder name
    # (for example "[" or "*") are matched literally.
    pattern = os.path.join(glob.escape(str(out_dir)), f"*.{source_ext}")
    source_files = sorted(Path(p) for p in glob.glob(pattern))
    if not source_files:
        warn(f"{out_dir}: no *.{source_ext} files found for mp3 conversion")
        return

    for src in source_files:
        dest = src.with_suffix(".mp3")
        cmd = [
            "ffmpeg",
            "-y",
            "-i",
            str(src),
            "-map_metadata",
            "0",
            "-vn",
            "-codec:a",
            "libmp3lame",
            "-b:a",
            bitrate,
            str(dest),
        ]
        printable = " ".join(shlex_quote_arg(x) for x in cmd)
        if dry_run:
            log(f"[dry-run] {printable}")
            log(f"[dry-run] rm {src}")
            continue
        proc = subprocess.run(cmd)
        if proc.returncode != 0:
            raise ToolError(f"mp3 conversion failed ({proc.returncode}): {src}")
        # Only remove the source once its mp3 was written successfully.
        src.unlink()
|
|
|
|
|
|
def split_direct(job: AlbumJob, dry_run: bool) -> None:
    """Split the album audio in its native format, then tag and rename tracks.

    Runs ``cuebreakpoints <cue> | shnsplit -o <ext> <audio>`` inside the
    job's output directory, then calls ``tag_output_files`` and
    ``rename_split_files_by_cue_titles`` on the result.

    Args:
        job: Album to split.
        dry_run: When true, commands are only logged and the output directory
            is not created (the original created it even in a dry run).

    Raises:
        ToolError: if the shell pipeline or the tagging command fails.
    """
    ext = job.audio_ext.lstrip('.')
    if dry_run:
        log(f"[dry-run] mkdir -p {job.output_dir}")
    else:
        job.output_dir.mkdir(parents=True, exist_ok=True)
    script = (
        f"set -euo pipefail; "
        f"cd {shell_quote(job.output_dir)}; "
        f"cuebreakpoints {shell_quote(job.cue)} | shnsplit -o {ext} {shell_quote(job.audio)}"
    )
    run_shell(script, dry_run=dry_run)
    tag_output_files(job.cue, job.output_dir, ext, dry_run=dry_run)
    rename_split_files_by_cue_titles(job.cue, job.output_dir, ext, dry_run=dry_run)
|
|
|
|
|
|
def split_via_flac(job: AlbumJob, bitrate: str, dry_run: bool) -> None:
    """Transcode the album to a temporary FLAC, split it, then convert to mp3.

    The temporary file is a hidden ``.<cue stem>.transcode.flac`` inside the
    output directory. After splitting, the tracks are tagged, renamed and
    converted to mp3.

    Args:
        job: Album to split.
        bitrate: mp3 bitrate forwarded to ``convert_tracks_to_mp3``.
        dry_run: When true, commands are only logged and the output directory
            is not created (the original created it even in a dry run).

    Raises:
        ToolError: if ffmpeg is missing or any command fails.
    """
    ensure_tools(["ffmpeg"])
    if dry_run:
        log(f"[dry-run] mkdir -p {job.output_dir}")
    else:
        job.output_dir.mkdir(parents=True, exist_ok=True)
    temp_flac = job.output_dir / f".{job.cue.stem}.transcode.flac"
    # Remove the temporary FLAC from an EXIT trap. With `set -e` a failing
    # ffmpeg or shnsplit aborts the script, so a trailing `rm -f` (as in the
    # original) never ran on failure and left the file behind.
    cleanup = f"rm -f {shell_quote(temp_flac)}"
    script = (
        f"set -euo pipefail; "
        f"trap {shlex_quote_arg(cleanup)} EXIT; "
        f"ffmpeg -y -i {shell_quote(job.audio)} -c:a flac {shell_quote(temp_flac)} >/dev/null 2>&1; "
        f"cd {shell_quote(job.output_dir)}; "
        f"cuebreakpoints {shell_quote(job.cue)} | shnsplit -o flac {shell_quote(temp_flac)}"
    )
    run_shell(script, dry_run=dry_run)
    tag_output_files(job.cue, job.output_dir, "flac", dry_run=dry_run)
    rename_split_files_by_cue_titles(job.cue, job.output_dir, "flac", dry_run=dry_run)
    convert_tracks_to_mp3(job.output_dir, "flac", bitrate, dry_run=dry_run)
|
|
|
|
|
|
def process_job(job: AlbumJob, ape_policy: str, dry_run: bool, cue_encoding: str, mp3_bitrate: str) -> None:
    """Split one album and convert its tracks to mp3.

    flac/wav sources are always split directly. For ape sources *ape_policy*
    selects a direct split, a split through a FLAC transcode, or (any other
    value, i.e. "auto") a direct attempt that falls back to the transcode
    path on any exception. *cue_encoding* is accepted but not used here.

    Raises:
        ToolError: for an audio extension other than .flac, .wav or .ape.
    """
    log(f"[album] {job.cue}")
    log(f" audio: {job.audio}")
    log(f" output: {job.output_dir}")

    def direct_then_mp3() -> None:
        # Split in the source format, then convert the resulting tracks.
        split_direct(job, dry_run=dry_run)
        convert_tracks_to_mp3(job.output_dir, job.audio_ext.lstrip('.'), mp3_bitrate, dry_run=dry_run)

    if job.audio_ext not in (".flac", ".wav", ".ape"):
        raise ToolError(f"unsupported audio extension: {job.audio_ext}")

    if job.audio_ext != ".ape" or ape_policy == "direct":
        direct_then_mp3()
    elif ape_policy == "transcode":
        split_via_flac(job, mp3_bitrate, dry_run=dry_run)
    else:
        # auto
        try:
            direct_then_mp3()
        except Exception as direct_exc:
            warn(f"{job.cue}: direct APE split failed, falling back to FLAC transcode")
            warn(f"reason: {direct_exc}")
            split_via_flac(job, mp3_bitrate, dry_run=dry_run)
|
|
|
|
|
|
def main() -> int:
    """Command-line entry point: scan, split, convert and move albums.

    Returns:
        0 when every album succeeded (or none was found), 1 when at least one
        album failed, 2 when the directory does not exist or a required
        external tool is missing.
    """
    args = parse_args()
    root = Path(args.directory).expanduser().resolve()
    if not root.is_dir():
        err(f"directory not found: {root}")
        return 2

    # ffmpeg is checked up front together with the cue tools: every path in
    # process_job ends in convert_tracks_to_mp3, which requires it. A missing
    # tool is reported as an error message rather than an uncaught traceback.
    try:
        ensure_tools(["cuebreakpoints", "shnsplit", "cuetag", "ffmpeg"])
    except ToolError as exc:
        err(str(exc))
        return 2

    output_root = Path(args.output_root).expanduser().resolve() if args.output_root else None
    # A dry run only prints planned actions, so it must not create directories.
    if output_root and not args.dry_run:
        output_root.mkdir(parents=True, exist_ok=True)
    final_root = Path(args.final_root).expanduser().resolve()

    # Finished albums are grouped under a folder named after the scanned directory.
    top_level_name = root.name

    jobs = collect_album_jobs(root, output_root, recursive=not args.no_recursive, cue_encoding=args.cue_encoding)
    if not jobs:
        log("no cue-sheet albums found")
        return 0

    ok = 0
    failed = 0
    for job in jobs:
        # One failing album must not stop the remaining ones, so every
        # exception is reported and counted at this boundary.
        try:
            process_job(job, ape_policy=args.ape_policy, dry_run=args.dry_run, cue_encoding=args.cue_encoding, mp3_bitrate=args.mp3_bitrate)
            moved = move_album_dir_to_final_root(job.output_dir, final_root, top_level_name, dry_run=args.dry_run)
            log(f" final: {moved}")
            ok += 1
        except Exception as exc:
            failed += 1
            err(f"{job.cue}: {exc}")

    log(f"done: {ok} ok, {failed} failed")
    return 0 if failed == 0 else 1
|
|
|
|
|
|
# Run main() and use its return value as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|