commit 67289cff8117f0a5a2741fb616d67aa3bc7c2084 Author: ishenwei Date: Fri May 22 10:51:18 2026 +0800 Add MP3 fingerprint lookup script diff --git a/extract_7z.py b/extract_7z.py new file mode 100644 index 0000000..ffb2baa --- /dev/null +++ b/extract_7z.py @@ -0,0 +1,143 @@ +#!/usr/bin/env python3 +"""Extract 7z archives into a directory with the same filename. + +Behavior: +- scans a directory for *.7z files +- extracts album.7z -> album/ +- optionally recurses into subdirectories + +Examples: + python extract_7z.py ~/Music/inbox + python extract_7z.py ~/Music/inbox --dry-run + python extract_7z.py ~/Music/inbox --no-recursive +""" + +from __future__ import annotations + +import argparse +import shutil +import subprocess +import sys +from pathlib import Path + +try: + import py7zr # type: ignore +except Exception: + py7zr = None + + +class ToolError(RuntimeError): + pass + + +def log(msg: str) -> None: + print(msg, flush=True) + + +def warn(msg: str) -> None: + print(f"[warn] {msg}", file=sys.stderr, flush=True) + + +def err(msg: str) -> None: + print(f"[error] {msg}", file=sys.stderr, flush=True) + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Extract 7z archives into sibling directories named after the archive stem." 
def run_extract_cli(archive: Path, dest_dir: Path, seven_z: str, dry_run: bool) -> None:
    """Extract *archive* into *dest_dir* using an external 7-Zip executable.

    Args:
        archive: The ``.7z`` file to unpack.
        dest_dir: Directory that receives the extracted files.
        seven_z: Name or path of the 7-Zip binary to invoke.
        dry_run: When true, only log the command line; nothing is created
            or extracted.

    Raises:
        ToolError: If the 7-Zip process exits with a non-zero status.
    """
    cmd = [seven_z, "x", f"-o{dest_dir}", "-y", str(archive)]
    if dry_run:
        # A dry run must leave the filesystem untouched, so the destination
        # directory is intentionally not created on this path.
        printable = " ".join(shlex_quote(a) for a in cmd)
        log(f"[dry-run] {printable}")
        return
    dest_dir.mkdir(parents=True, exist_ok=True)
    proc = subprocess.run(cmd)
    if proc.returncode != 0:
        raise ToolError(f"extraction failed ({proc.returncode}): {archive}")


def run_extract_py7zr(archive: Path, dest_dir: Path, dry_run: bool) -> None:
    """Extract *archive* into *dest_dir* using the ``py7zr`` Python module.

    Args:
        archive: The ``.7z`` file to unpack.
        dest_dir: Directory that receives the extracted files.
        dry_run: When true, only log the planned action; nothing is created
            or extracted.

    Raises:
        ToolError: If ``py7zr`` could not be imported.
    """
    if dry_run:
        # Same rule as the CLI path: no directory creation during a dry run.
        log(f"[dry-run] py7zr extract {archive} -> {dest_dir}")
        return
    if py7zr is None:
        raise ToolError("py7zr is not installed")
    dest_dir.mkdir(parents=True, exist_ok=True)
    with py7zr.SevenZipFile(archive, mode="r") as zf:
        zf.extractall(path=dest_dir)
module py7zr") + + ok = 0 + failed = 0 + for archive in archives: + dest_dir = archive.with_suffix("") + log(f"[archive] {archive}") + log(f" output: {dest_dir}") + try: + if seven_z is not None: + run_extract_cli(archive, dest_dir, seven_z, dry_run=args.dry_run) + else: + run_extract_py7zr(archive, dest_dir, dry_run=args.dry_run) + ok += 1 + except Exception as exc: + failed += 1 + err(f"{archive}: {exc}") + + log(f"done: {ok} ok, {failed} failed") + return 0 if failed == 0 else 1 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/mp3_acoustid_musicbrainz_lookup.py b/mp3_acoustid_musicbrainz_lookup.py new file mode 100644 index 0000000..da52c4b --- /dev/null +++ b/mp3_acoustid_musicbrainz_lookup.py @@ -0,0 +1,394 @@ +#!/usr/bin/env python3 +"""Traverse a directory of MP3 files, fingerprint each file, look it up via +AcoustID, then fetch MusicBrainz recording details. + +Usage: + python mp3_acoustid_musicbrainz_lookup.py + python mp3_acoustid_musicbrainz_lookup.py --output result.json + python mp3_acoustid_musicbrainz_lookup.py --write-tags + python mp3_acoustid_musicbrainz_lookup.py --write-tags --output result.json + +What it does for each MP3: +1. Generate duration + fingerprint with acoustid.fingerprint_file() +2. Call AcoustID lookup API +3. Parse MusicBrainz recording MBIDs from the AcoustID response +4. Fetch MusicBrainz recording detail JSON for each MBID +5. Optionally write title/artist/album back into the MP3 tags with mutagen + +Output behavior: +- Prints duration & fingerprint for each file +- Prints the AcoustID response for each file +- Prints the MusicBrainz recording details for each file +- If --write-tags is enabled, prints the tag update result +- Writes a final summary JSON to stdout, or to --output if specified + +Notes: +- Traversal is recursive by default. +- MusicBrainz requests are rate-limited with a small delay by default. +- Metadata updates use TIT2 (title), TPE1 (artist), and TALB (album). 
+- If no suitable metadata is found, the file is left unchanged. +""" + +from __future__ import annotations + +import argparse +import json +import sys +import time +import urllib.error +import urllib.parse +import urllib.request +from dataclasses import dataclass, asdict +from pathlib import Path +from typing import Any, Iterable + +import acoustid +from mutagen.id3 import ID3, TALB, TPE1, TIT2 +from mutagen.mp3 import MP3 + +ACOUSTID_CLIENT_ID = "JIvtbG79eAg" +ACOUSTID_LOOKUP_URL = "https://api.acoustid.org/v2/lookup" +MUSICBRAINZ_RECORDING_URL = "https://musicbrainz.org/ws/2/recording/{mbid}?fmt=json" +DEFAULT_USER_AGENT = "moss-mp3-lookup/1.0 (https://musicbrainz.org/doc/MusicBrainz_API)" + + +@dataclass +class RecordingDetail: + mbid: str + data: dict[str, Any] | None = None + error: str | None = None + + +@dataclass +class FileResult: + file: str + duration: int | None = None + fingerprint: str | None = None + acoustid: dict[str, Any] | None = None + recordings: list[dict[str, Any]] | None = None + recording_details: list[dict[str, Any]] | None = None + error: str | None = None + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Fingerprint MP3 files and fetch AcoustID / MusicBrainz metadata." + ) + parser.add_argument("directory", help="Root directory to scan for MP3 files") + parser.add_argument( + "--no-recursive", + action="store_true", + help="Only scan the top-level directory", + ) + parser.add_argument( + "--maxlength", + type=int, + default=120, + help="Maximum seconds used by acoustid.fingerprint_file() (default: 120)", + ) + parser.add_argument( + "--mb-delay", + type=float, + default=1.1, + help="Delay in seconds between MusicBrainz detail requests (default: 1.1)", + ) + parser.add_argument( + "--output", + default=None, + help="Optional output file path. 
def http_get_json(url: str, headers: dict[str, str] | None = None, timeout: int = 60) -> dict[str, Any]:
    """Fetch *url* and return the response body parsed as a JSON object.

    Args:
        url: Address to request.
        headers: Optional request headers.
        timeout: Timeout in seconds passed to ``urlopen``.

    Returns:
        The decoded JSON object.

    Raises:
        RuntimeError: On an HTTP error status, any network/OS level failure
            (including timeouts while reading the body), a body that is not
            valid JSON, or a JSON document whose top level is not an object.
    """
    req = urllib.request.Request(url, headers=headers or {})
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            payload = resp.read().decode("utf-8", errors="replace")
    except urllib.error.HTTPError as e:
        body = e.read().decode("utf-8", errors="replace") if hasattr(e, "read") else ""
        raise RuntimeError(f"HTTP {e.code} {e.reason}: {body[:500]}") from e
    except OSError as e:
        # URLError is an OSError subclass; catching the base class also covers
        # socket timeouts raised while the body is being read.
        raise RuntimeError(f"network error: {e}") from e

    try:
        data = json.loads(payload)
    except json.JSONDecodeError as e:
        raise RuntimeError(f"invalid JSON response: {e}") from e
    if not isinstance(data, dict):
        # Callers use dict methods on the result without further checks.
        raise RuntimeError(f"unexpected JSON payload type: {type(data).__name__}")
    return data
def extract_mb_metadata(detail_json: dict[str, Any], fallback_rec: dict[str, Any] | None = None) -> dict[str, str | None]:
    """Extract the title/artist/album values to write back into the tags.

    Despite its name, ``fallback_rec`` (the recording entry taken from the
    AcoustID response) is consulted first; ``detail_json`` (the MusicBrainz
    recording detail) only fills in fields the AcoustID entry did not supply.

    Field sources, in order of preference:
        title:  ``fallback_rec["title"]``, then ``detail_json["title"]``
        artist: ``fallback_rec["artists"][0]["name"]``, then the full
                ``detail_json["artist-credit"]`` rendered with its join phrases
        album:  ``fallback_rec["releasegroups"][0]["title"]``, then
                ``detail_json["releases"][0]["title"]``

    Args:
        detail_json: MusicBrainz recording detail document.
        fallback_rec: Optional AcoustID recording entry.

    Returns:
        A dict with the keys ``title``, ``artist`` and ``album``; a value is
        ``None`` when neither source provides it.
    """
    title: str | None = None
    artist: str | None = None
    album: str | None = None

    if fallback_rec:
        title = choose_text(fallback_rec.get("title"))

        artists = fallback_rec.get("artists")
        if isinstance(artists, list) and artists:
            first_artist = artists[0]
            if isinstance(first_artist, dict):
                artist = choose_text(first_artist.get("name"))

        releasegroups = fallback_rec.get("releasegroups")
        if isinstance(releasegroups, list) and releasegroups:
            first_rg = releasegroups[0]
            if isinstance(first_rg, dict):
                album = choose_text(first_rg.get("title"))

    if not title:
        title = choose_text(detail_json.get("title"))

    if not artist:
        artist_credit = detail_json.get("artist-credit") or detail_json.get("artist_credit")
        if isinstance(artist_credit, list):
            parts: list[str] = []
            for item in artist_credit:
                if isinstance(item, dict) and item.get("name"):
                    parts.append(str(item["name"]))
                    # Each credit carries the text that separates it from the
                    # next one (" & ", " feat. ", ...). Dropping it glues the
                    # artist names together.
                    joinphrase = item.get("joinphrase")
                    if isinstance(joinphrase, str):
                        parts.append(joinphrase)
                elif isinstance(item, str):
                    # Plain strings in the list are kept verbatim.
                    parts.append(item)
            if parts:
                artist = "".join(parts).strip() or None

    if not album:
        release_list = detail_json.get("releases")
        if isinstance(release_list, list) and release_list:
            first_release = release_list[0]
            if isinstance(first_release, dict):
                album = choose_text(first_release.get("title"))

    return {"title": title, "artist": artist, "album": album}
maxlength=maxlength) + result.duration = duration + result.fingerprint = fingerprint + print_section( + f"{path} / duration & fingerprint", + {"file": str(path), "duration": duration, "fingerprint": fingerprint}, + ) + except Exception as e: + result.error = f"fingerprint failed: {e}" + print_section(f"{path} / duration & fingerprint", {"file": str(path), "error": result.error}) + return result + + try: + lookup_json = acoustid_lookup(result.duration, result.fingerprint) + result.acoustid = lookup_json + print_section(f"{path} / AcoustID response", lookup_json) + except Exception as e: + result.error = f"acoustid lookup failed: {e}" + print_section(f"{path} / AcoustID response", {"file": str(path), "error": result.error}) + return result + + recordings = extract_recordings(result.acoustid) + result.recordings = recordings + + details: list[dict[str, Any]] = [] + chosen_metadata: dict[str, str | None] | None = None + best_recording = choose_best_recording(recordings) + best_recording_detail: dict[str, Any] | None = None + + for rec in recordings: + mbid = rec.get("id") + if not mbid: + continue + + if mbid in mb_cache: + detail_json = mb_cache[mbid] + else: + try: + detail_json = fetch_musicbrainz_recording(mbid) + mb_cache[mbid] = detail_json + if mb_delay > 0: + time.sleep(mb_delay) + except Exception as e: + detail_json = {"mbid": mbid, "error": str(e)} + mb_cache[mbid] = detail_json + if mb_delay > 0: + time.sleep(mb_delay) + + details.append(detail_json) + if best_recording is not None and mbid == best_recording.get("id") and isinstance(detail_json, dict) and "error" not in detail_json: + best_recording_detail = detail_json + + if chosen_metadata is None and isinstance(detail_json, dict) and "error" not in detail_json: + chosen_metadata = extract_mb_metadata(detail_json, fallback_rec=rec) + + if best_recording_detail is not None: + chosen_metadata = extract_mb_metadata(best_recording_detail, fallback_rec=best_recording) + + result.recording_details = details + 
def main() -> int:
    """Run the lookup over every MP3 under the requested directory.

    Returns:
        ``0`` on completion, ``2`` when the input directory is missing or the
        directory for ``--output`` cannot be created.
    """
    args = parse_args()
    root = Path(args.directory).expanduser().resolve()
    if not root.is_dir():
        print(f"[error] directory not found: {root}", file=sys.stderr)
        return 2

    # Prepare the output location before the slow, rate-limited lookups so an
    # unusable --output path fails immediately instead of after all the work.
    out_path: Path | None = None
    if args.output:
        out_path = Path(args.output).expanduser().resolve()
        try:
            out_path.parent.mkdir(parents=True, exist_ok=True)
        except OSError as exc:
            print(f"[error] cannot create output directory {out_path.parent}: {exc}", file=sys.stderr)
            return 2

    # MusicBrainz details are cached by MBID and shared across all files.
    mb_cache: dict[str, dict[str, Any]] = {}
    items: list[dict[str, Any]] = []

    for mp3_path in iter_mp3_files(root, recursive=not args.no_recursive):
        item = process_file(
            mp3_path,
            maxlength=args.maxlength,
            mb_delay=args.mb_delay,
            mb_cache=mb_cache,
            write_tags=args.write_tags,
        )
        items.append(asdict(item))

    output = {
        "root": str(root),
        "count": len(items),
        "items": items,
    }
    text = json.dumps(output, ensure_ascii=False, indent=2)

    if out_path is not None:
        out_path.write_text(text, encoding="utf-8")
    else:
        print(text)

    return 0
def ensure_tools(required: Iterable[str]) -> None:
    """Check that every executable named in *required* can be located.

    Args:
        required: Executable names to look up with ``shutil.which``.

    Raises:
        ToolError: If one or more names cannot be resolved; the message
            lists all of them.
    """
    absent: list[str] = []
    for tool in required:
        if shutil.which(tool) is None:
            absent.append(tool)
    if not absent:
        return
    raise ToolError("missing required tools: " + ", ".join(absent))
+ ) + parser.add_argument("directory", help="Root directory to scan") + parser.add_argument( + "--no-recursive", + action="store_true", + help="Only scan the top-level directory", + ) + parser.add_argument( + "--ape-policy", + choices=("auto", "direct", "transcode"), + default="auto", + help="How to handle APE sources", + ) + parser.add_argument( + "--output-root", + default=None, + help="Optional root directory for split outputs. Default: beside the cue file", + ) + parser.add_argument( + "--final-root", + default="/home/shenwei/mnt/volume2/navidrome", + help="Move finished album folders into this root. Default: navidrome mount", + ) + parser.add_argument( + "--cue-encoding", + default="auto", + help="Cue file text encoding. Default: auto", + ) + parser.add_argument( + "--mp3-bitrate", + default="320k", + help="MP3 target bitrate for conversion (e.g. 192k, 256k, 320k). Default: 320k", + ) + parser.add_argument( + "--dry-run", + action="store_true", + help="Only print planned actions", + ) + return parser.parse_args() + + +def read_text_lines(path: Path) -> list[str]: + return path.read_text(encoding="utf-8", errors="replace").splitlines() + + +def read_cue_text(path: Path, encoding: str = "auto") -> str: + if encoding != "auto": + return path.read_text(encoding=encoding, errors="replace") + + raw = path.read_bytes() + candidates = ["utf-8-sig", "utf-8", "gb18030", "gbk", "big5", "cp936", "cp1252", "latin1"] + for enc in candidates: + try: + text = raw.decode(enc) + # Prefer a decode that doesn't introduce lots of replacement chars. 
def sanitize_filename(name: str) -> str:
    """Turn a cue-sheet title into a string usable as a file or folder name.

    Steps, in order:
      * surrounding whitespace and double quotes are stripped;
      * each run of ``\\ / : * ? " < > |`` becomes a single ``_``;
      * non-whitespace control characters (NUL, DEL, ...) are removed, since
        path functions reject or mangle them;
      * each run of whitespace collapses to one space.

    Args:
        name: Raw title text.

    Returns:
        The cleaned name, or ``"track"`` when nothing usable remains. The
        results ``"."`` and ``".."`` are also replaced by ``"track"`` because
        they would refer to a directory rather than name a new entry.
    """
    cleaned = name.strip().strip('"')
    cleaned = re.sub(r"[\\/:*?\"<>|]+", "_", cleaned)
    # Whitespace-class control characters (\t, \n, \x1c-\x1f, ...) are left
    # for the next step so they still collapse into a space.
    cleaned = re.sub(r"[\x00-\x08\x0e-\x1b\x7f]", "", cleaned)
    cleaned = re.sub(r"\s+", " ", cleaned).strip()
    if cleaned in ("", ".", ".."):
        return "track"
    return cleaned
+ """ + current_track_seen = False + for line in cue_lines(cue_path, encoding=encoding): + if TRACK_LINE_RE.match(line): + current_track_seen = True + continue + if current_track_seen: + continue + title_match = TITLE_LINE_RE.match(line) + if title_match: + title = title_match.group(1).strip() + if title.startswith('"') and title.endswith('"') and len(title) >= 2: + title = title[1:-1] + title = sanitize_filename(title) + if title: + return title + return sanitize_filename(cue_path.stem) + + +def detect_audio_from_cue(cue_path: Path, encoding: str = "auto") -> Optional[Path]: + """Prefer the FILE entry inside the cue sheet. + + Many cue sheets use FILE "album.ape" WAVE-style lines even if the payload is + flac/wav/ape. We use the path from the cue first, then fall back to same-stem + audio candidates in the same directory. + """ + + lines = read_text_lines(cue_path) + file_refs: list[str] = [] + for line in lines: + m = FILE_LINE_RE.match(line) + if m: + file_refs.append(m.group(1).strip()) + + # Multi-file cue sheets need a different strategy; keep this script focused on + # one-file albums for now. + if len(file_refs) > 1: + warn(f"{cue_path}: multiple FILE entries found; using the first one only") + + candidates: list[Path] = [] + if file_refs: + ref = file_refs[0] + ref_path = Path(ref) + if not ref_path.is_absolute(): + candidates.append((cue_path.parent / ref_path).resolve()) + else: + candidates.append(ref_path) + + # Fallback: same stem with supported extensions. 
def move_album_dir_to_final_root(album_dir: Path, final_root: Path, top_level_name: str, dry_run: bool) -> Path:
    """Move a finished album folder to ``final_root / top_level_name``.

    If a folder with the same name already exists at the destination, the
    album is stored next to it as ``"<name> (2)"``, ``"<name> (3)"``, ... —
    always inside the same ``top_level_name`` grouping.

    Args:
        album_dir: The album folder to move.
        final_root: Root of the final library.
        top_level_name: Sub-folder of ``final_root`` that groups the albums.
        dry_run: When true, only log the planned move; no directory is
            created and nothing is moved.

    Returns:
        The path where the album ends up (or would end up in a dry run).
        ``album_dir`` itself is returned when it is already located directly
        inside the grouping folder.
    """
    final_album_root = final_root / top_level_name
    if album_dir.resolve().parent == final_album_root.resolve():
        # Already under final root with the desired top-level grouping.
        return album_dir

    dest = final_album_root / album_dir.name
    suffix = 2
    while dest.exists():
        # Stay inside the grouping folder when picking an alternative name.
        dest = final_album_root / f"{album_dir.name} ({suffix})"
        suffix += 1

    if dry_run:
        log(f"[dry-run] mv -T {album_dir} {dest}")
        return dest

    final_album_root.mkdir(parents=True, exist_ok=True)
    shutil.move(str(album_dir), str(dest))
    return dest
def convert_tracks_to_mp3(out_dir: Path, source_ext: str, bitrate: str, dry_run: bool) -> None:
    """Convert the split tracks in *out_dir* to MP3, keeping their file stems.

    Each ``*.<source_ext>`` file is encoded with ffmpeg (libmp3lame) to a
    sibling ``.mp3`` file and the source file is deleted after a successful
    conversion.

    Args:
        out_dir: Folder containing the split tracks.
        source_ext: Extension of the source tracks, without the leading dot.
        bitrate: Target audio bitrate passed to ffmpeg's ``-b:a`` option.
        dry_run: When true, only log the ffmpeg and delete actions.

    Raises:
        ToolError: If ffmpeg is not available or a conversion fails.
    """
    ensure_tools(["ffmpeg"])
    # Escape the directory part so folder names containing glob
    # metacharacters such as "[2003] Album" are matched literally.
    pattern = os.path.join(glob.escape(str(out_dir)), f"*.{source_ext}")
    source_files = sorted(Path(p) for p in glob.glob(pattern))
    if not source_files:
        warn(f"{out_dir}: no *.{source_ext} files found for mp3 conversion")
        return

    for src in source_files:
        dest = src.with_suffix(".mp3")
        if dest == src:
            # Encoding a file onto itself and then unlinking the "source"
            # would delete the track.
            continue
        cmd = [
            "ffmpeg",
            "-y",
            "-i",
            str(src),
            "-map_metadata",
            "0",
            "-vn",
            "-codec:a",
            "libmp3lame",
            "-b:a",
            bitrate,
            str(dest),
        ]
        if dry_run:
            printable = " ".join(shlex_quote_arg(x) for x in cmd)
            log(f"[dry-run] {printable}")
            log(f"[dry-run] rm {src}")
            continue
        proc = subprocess.run(cmd)
        if proc.returncode != 0:
            # Do not leave a truncated MP3 behind that looks like a real track.
            dest.unlink(missing_ok=True)
            raise ToolError(f"mp3 conversion failed ({proc.returncode}): {src}")
        src.unlink()
def main() -> int:
    """Find cue-sheet albums under the given directory and process each one.

    Returns:
        ``0`` when every album succeeded (or none was found), ``1`` when at
        least one album failed, ``2`` when the input directory does not exist
        or a required external tool is missing.
    """
    args = parse_args()
    root = Path(args.directory).expanduser().resolve()
    if not root.is_dir():
        err(f"directory not found: {root}")
        return 2

    try:
        # ffmpeg is checked up front as well: every processing path ends with
        # an MP3 conversion, so discovering it missing after the first album
        # was already split would leave half-finished output behind.
        ensure_tools(["cuebreakpoints", "shnsplit", "cuetag", "ffmpeg"])
    except ToolError as exc:
        err(str(exc))
        return 2

    output_root = Path(args.output_root).expanduser().resolve() if args.output_root else None
    if output_root and not args.dry_run:
        output_root.mkdir(parents=True, exist_ok=True)
    final_root = Path(args.final_root).expanduser().resolve()

    # Finished albums are grouped under a folder named after the scanned root.
    top_level_name = root.name

    jobs = collect_album_jobs(root, output_root, recursive=not args.no_recursive, cue_encoding=args.cue_encoding)
    if not jobs:
        log("no cue-sheet albums found")
        return 0

    ok = 0
    failed = 0
    for job in jobs:
        try:
            process_job(job, ape_policy=args.ape_policy, dry_run=args.dry_run, cue_encoding=args.cue_encoding, mp3_bitrate=args.mp3_bitrate)
            moved = move_album_dir_to_final_root(job.output_dir, final_root, top_level_name, dry_run=args.dry_run)
            log(f" final: {moved}")
            ok += 1
        except Exception as exc:
            # One broken album must not stop the remaining ones.
            failed += 1
            err(f"{job.cue}: {exc}")

    log(f"done: {ok} ok, {failed} failed")
    return 0 if failed == 0 else 1