Add MP3 fingerprint lookup script

This commit is contained in:
ishenwei
2026-05-22 10:51:18 +08:00
commit 67289cff81
3 changed files with 1049 additions and 0 deletions

143
extract_7z.py Normal file
View File

@@ -0,0 +1,143 @@
#!/usr/bin/env python3
"""Extract 7z archives into a directory with the same filename.
Behavior:
- scans a directory for *.7z files
- extracts album.7z -> album/
- optionally recurses into subdirectories
Examples:
python extract_7z.py ~/Music/inbox
python extract_7z.py ~/Music/inbox --dry-run
python extract_7z.py ~/Music/inbox --no-recursive
"""
from __future__ import annotations
import argparse
import shutil
import subprocess
import sys
from pathlib import Path
try:
import py7zr # type: ignore
except Exception:
py7zr = None
class ToolError(RuntimeError):
    """Raised when an extraction backend is unavailable or an extraction fails."""
def log(msg: str) -> None:
    """Print *msg* on stdout and flush immediately."""
    print(msg, file=sys.stdout, flush=True)
def warn(msg: str) -> None:
    """Print *msg* on stderr with a ``[warn]`` prefix and flush immediately."""
    line = f"[warn] {msg}"
    print(line, file=sys.stderr, flush=True)
def err(msg: str) -> None:
    """Print *msg* on stderr with an ``[error]`` prefix and flush immediately."""
    line = f"[error] {msg}"
    print(line, file=sys.stderr, flush=True)
def parse_args() -> argparse.Namespace:
    """Parse the command line of the 7z extraction script.

    Returns:
        Namespace with ``directory`` (str) plus the boolean flags
        ``no_recursive`` and ``dry_run``.
    """
    ap = argparse.ArgumentParser(
        description="Extract 7z archives into sibling directories named after the archive stem."
    )
    ap.add_argument("directory", help="Root directory to scan")
    # Both switches are plain opt-in flags that default to False.
    switches = (
        ("--no-recursive", "Only scan the top-level directory"),
        ("--dry-run", "Only print planned actions"),
    )
    for flag, description in switches:
        ap.add_argument(flag, action="store_true", help=description)
    return ap.parse_args()
def find_7z_files(root: Path, recursive: bool) -> list[Path]:
    """Return the regular files matching ``*.7z`` under *root*, sorted by path.

    Args:
        root: Directory to scan.
        recursive: Descend into subdirectories when true; otherwise only
            look at the direct children of *root*.
    """
    matches = root.rglob("*.7z") if recursive else root.glob("*.7z")
    archives = [entry for entry in matches if entry.is_file()]
    archives.sort()
    return archives
def find_7z_bin() -> str | None:
    """Return the path of the first of ``7z``, ``7za``, ``7zr`` found on PATH.

    Returns:
        The resolved executable path, or None when none of the names resolve.
    """
    # The generator is lazy, so lookups stop at the first hit.
    located = (shutil.which(name) for name in ("7z", "7za", "7zr"))
    return next((path for path in located if path), None)
def run_extract_cli(archive: Path, dest_dir: Path, seven_z: str, dry_run: bool) -> None:
    """Extract *archive* into *dest_dir* with an external 7-Zip executable.

    Runs ``<seven_z> x -o<dest_dir> -y <archive>``.

    Args:
        archive: Path of the ``.7z`` file to extract.
        dest_dir: Directory receiving the extracted files; created on demand
            for a real run.
        seven_z: Path of the 7-Zip binary to invoke.
        dry_run: When true, only log the command; nothing is created or run.

    Raises:
        ToolError: If the 7-Zip process exits with a non-zero status.
    """
    cmd = [seven_z, "x", f"-o{dest_dir}", "-y", str(archive)]
    if dry_run:
        printable = " ".join(shlex_quote(a) for a in cmd)
        log(f"[dry-run] {printable}")
        return
    # Create the target only for a real run so --dry-run leaves the tree untouched.
    dest_dir.mkdir(parents=True, exist_ok=True)
    proc = subprocess.run(cmd)
    if proc.returncode != 0:
        raise ToolError(f"extraction failed ({proc.returncode}): {archive}")
def run_extract_py7zr(archive: Path, dest_dir: Path, dry_run: bool) -> None:
    """Extract *archive* into *dest_dir* with the optional ``py7zr`` module.

    Args:
        archive: Path of the ``.7z`` file to extract.
        dest_dir: Directory receiving the extracted files; created on demand
            for a real run.
        dry_run: When true, only log the planned extraction.

    Raises:
        ToolError: If ``py7zr`` could not be imported.
    """
    if dry_run:
        log(f"[dry-run] py7zr extract {archive} -> {dest_dir}")
        return
    if py7zr is None:
        raise ToolError("py7zr is not installed")
    # Only touch the filesystem once we know the extraction can actually run.
    dest_dir.mkdir(parents=True, exist_ok=True)
    with py7zr.SevenZipFile(archive, mode="r") as zf:
        zf.extractall(path=dest_dir)
def shlex_quote(text: str) -> str:
    """Return *text* quoted with ``shlex.quote`` for display in a shell command."""
    from shlex import quote

    return quote(text)
def main() -> int:
    """Extract every 7z archive found under the requested directory.

    Each ``<name>.7z`` is extracted into a sibling directory ``<name>``. The
    external 7-Zip binary is preferred; ``py7zr`` is used when no binary is
    found.

    Returns:
        0 when every archive was handled (or none were found), 1 when at
        least one extraction failed, 2 when the directory does not exist or
        no extraction backend is available.
    """
    args = parse_args()
    root = Path(args.directory).expanduser().resolve()
    if not root.is_dir():
        err(f"directory not found: {root}")
        return 2

    archives = find_7z_files(root, recursive=not args.no_recursive)
    if not archives:
        log("no 7z archives found")
        return 0

    seven_z = find_7z_bin()
    if seven_z is None and py7zr is None:
        # Report this like the other fatal conditions instead of letting a
        # ToolError escape as a traceback.
        err("missing required tool: 7z/7za/7zr and python module py7zr")
        return 2

    ok = 0
    failed = 0
    for archive in archives:
        dest_dir = archive.with_suffix("")
        log(f"[archive] {archive}")
        log(f" output: {dest_dir}")
        try:
            if seven_z is not None:
                run_extract_cli(archive, dest_dir, seven_z, dry_run=args.dry_run)
            else:
                run_extract_py7zr(archive, dest_dir, dry_run=args.dry_run)
            ok += 1
        except Exception as exc:
            # One bad archive must not abort the rest of the batch.
            failed += 1
            err(f"{archive}: {exc}")
    log(f"done: {ok} ok, {failed} failed")
    return 0 if failed == 0 else 1


if __name__ == "__main__":
    sys.exit(main())

View File

@@ -0,0 +1,394 @@
#!/usr/bin/env python3
"""Traverse a directory of MP3 files, fingerprint each file, look it up via
AcoustID, then fetch MusicBrainz recording details.
Usage:
python mp3_acoustid_musicbrainz_lookup.py <mp3_dir>
python mp3_acoustid_musicbrainz_lookup.py <mp3_dir> --output result.json
python mp3_acoustid_musicbrainz_lookup.py <mp3_dir> --write-tags
python mp3_acoustid_musicbrainz_lookup.py <mp3_dir> --write-tags --output result.json
What it does for each MP3:
1. Generate duration + fingerprint with acoustid.fingerprint_file()
2. Call AcoustID lookup API
3. Parse MusicBrainz recording MBIDs from the AcoustID response
4. Fetch MusicBrainz recording detail JSON for each MBID
5. Optionally write title/artist/album back into the MP3 tags with mutagen
Output behavior:
- Prints duration & fingerprint for each file
- Prints the AcoustID response for each file
- Prints the MusicBrainz recording details for each file
- If --write-tags is enabled, prints the tag update result
- Writes a final summary JSON to stdout, or to --output if specified
Notes:
- Traversal is recursive by default.
- MusicBrainz requests are rate-limited with a small delay by default.
- Metadata updates use TIT2 (title), TPE1 (artist), and TALB (album).
- If no suitable metadata is found, the file is left unchanged.
"""
from __future__ import annotations
import argparse
import json
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import Any, Iterable
import acoustid
from mutagen.id3 import ID3, TALB, TPE1, TIT2
from mutagen.mp3 import MP3
# Application key sent as the ``client`` parameter of every AcoustID lookup.
ACOUSTID_CLIENT_ID = "JIvtbG79eAg"
ACOUSTID_LOOKUP_URL = "https://api.acoustid.org/v2/lookup"
# ``inc=artist-credits+releases`` asks MusicBrainz to embed the ``artist-credit``
# and ``releases`` fields that extract_mb_metadata() falls back to; without it
# the recording lookup does not return them.
MUSICBRAINZ_RECORDING_URL = (
    "https://musicbrainz.org/ws/2/recording/{mbid}?inc=artist-credits+releases&fmt=json"
)
DEFAULT_USER_AGENT = "moss-mp3-lookup/1.0 (https://musicbrainz.org/doc/MusicBrainz_API)"
@dataclass
class RecordingDetail:
    """Container pairing a recording MBID with fetched data or an error.

    NOTE(review): nothing in the visible part of this file instantiates this
    class; confirm whether it is still needed.
    """

    # Recording identifier.
    mbid: str
    # Decoded JSON payload; defaults to None.
    data: dict[str, Any] | None = None
    # Error text; defaults to None.
    error: str | None = None
@dataclass
class FileResult:
    """Everything collected for one MP3 file; serialised into the summary JSON."""

    # Path of the MP3 file, as a string.
    file: str
    # Duration reported by the fingerprinting step.
    duration: int | None = None
    # Fingerprint string produced by the fingerprinting step.
    fingerprint: str | None = None
    # Raw AcoustID lookup response.
    acoustid: dict[str, Any] | None = None
    # Recording entries flattened out of the AcoustID response.
    recordings: list[dict[str, Any]] | None = None
    # One MusicBrainz detail (or error stub) per recording that had an id.
    recording_details: list[dict[str, Any]] | None = None
    # Description of the step that failed, if any.
    error: str | None = None
def parse_args() -> argparse.Namespace:
    """Parse the command line of the MP3 lookup script.

    Returns:
        Namespace with ``directory``, ``no_recursive``, ``maxlength``,
        ``mb_delay``, ``output`` and ``write_tags``.
    """
    ap = argparse.ArgumentParser(
        description="Fingerprint MP3 files and fetch AcoustID / MusicBrainz metadata."
    )
    ap.add_argument("directory", help="Root directory to scan for MP3 files")
    # (flag, add_argument keyword arguments), registered in this order.
    option_specs = [
        (
            "--no-recursive",
            {"action": "store_true", "help": "Only scan the top-level directory"},
        ),
        (
            "--maxlength",
            {
                "type": int,
                "default": 120,
                "help": "Maximum seconds used by acoustid.fingerprint_file() (default: 120)",
            },
        ),
        (
            "--mb-delay",
            {
                "type": float,
                "default": 1.1,
                "help": "Delay in seconds between MusicBrainz detail requests (default: 1.1)",
            },
        ),
        (
            "--output",
            {"default": None, "help": "Optional output file path. Default: stdout"},
        ),
        (
            "--write-tags",
            {
                "action": "store_true",
                "help": "Write title/artist/album back into each MP3 using mutagen",
            },
        ),
    ]
    for flag, spec in option_specs:
        ap.add_argument(flag, **spec)
    return ap.parse_args()
def iter_mp3_files(root: Path, recursive: bool = True) -> Iterable[Path]:
    """Yield the MP3 files under *root* in sorted path order.

    A file qualifies when its suffix equals ``.mp3`` ignoring case.

    Args:
        root: Directory to scan.
        recursive: Walk the whole tree when true; otherwise only the direct
            children of *root*.
    """
    entries = root.rglob("*") if recursive else root.iterdir()
    for entry in sorted(entries):
        if entry.is_file() and entry.suffix.lower() == ".mp3":
            yield entry
def fingerprint_mp3(path: Path, maxlength: int) -> tuple[int, str]:
    """Fingerprint *path* with ``acoustid.fingerprint_file``.

    Args:
        path: Audio file to fingerprint.
        maxlength: Forwarded as the ``maxlength`` argument of the library call.

    Returns:
        ``(duration, fingerprint)`` with the duration coerced to ``int`` and
        the fingerprint to ``str`` (bytes are decoded as strict ASCII).
    """
    raw_duration, raw_print = acoustid.fingerprint_file(str(path), maxlength=maxlength)
    if isinstance(raw_print, (bytes, bytearray)):
        text = raw_print.decode("ascii", errors="strict")
    else:
        text = str(raw_print)
    return int(raw_duration), text
def http_get_json(url: str, headers: dict[str, str] | None = None, timeout: int = 60) -> dict[str, Any]:
    """GET *url* and return the response body parsed as JSON.

    Args:
        url: Address to fetch.
        headers: Optional request headers.
        timeout: Timeout in seconds passed to ``urlopen``.

    Raises:
        RuntimeError: On an HTTP error status (message carries the status and
            up to 500 characters of the body) or on a network-level failure.
    """
    request = urllib.request.Request(url, headers=headers or {})
    try:
        with urllib.request.urlopen(request, timeout=timeout) as response:
            raw = response.read()
    # HTTPError subclasses URLError, so it has to be handled first.
    except urllib.error.HTTPError as exc:
        if hasattr(exc, "read"):
            detail = exc.read().decode("utf-8", errors="replace")
        else:
            detail = ""
        raise RuntimeError(f"HTTP {exc.code} {exc.reason}: {detail[:500]}") from exc
    except urllib.error.URLError as exc:
        raise RuntimeError(f"network error: {exc}") from exc
    return json.loads(raw.decode("utf-8", errors="replace"))
def acoustid_lookup(duration: int, fingerprint: str) -> dict[str, Any]:
    """Query the AcoustID lookup endpoint for a fingerprint.

    Args:
        duration: Sent as the ``duration`` query parameter.
        fingerprint: Fingerprint string to look up.

    Returns:
        The decoded JSON response.
    """
    params = [
        ("client", ACOUSTID_CLIENT_ID),
        ("meta", "recordings releasegroups compress"),
        ("duration", str(duration)),
        ("fingerprint", fingerprint),
    ]
    url = ACOUSTID_LOOKUP_URL + "?" + urllib.parse.urlencode(params)
    return http_get_json(url, headers={"User-Agent": DEFAULT_USER_AGENT})
def extract_recordings(lookup_json: dict[str, Any]) -> list[dict[str, Any]]:
    """Flatten the recordings of every AcoustID result into one list.

    Each returned entry is a shallow copy of the recording dict with an extra
    ``_match_score`` key holding the ``score`` of the result it came from.
    Non-dict results and recordings are skipped.

    Args:
        lookup_json: Decoded AcoustID lookup response.

    Returns:
        The annotated recordings in response order; empty when ``results`` is
        missing, null, or not a list.
    """
    results = lookup_json.get("results")
    if not isinstance(results, list):
        # A missing or null "results" simply means there is nothing to flatten.
        return []

    flattened: list[dict[str, Any]] = []
    for result in results:
        if not isinstance(result, dict):
            continue
        match_score = result.get("score")
        for rec in result.get("recordings") or []:
            if isinstance(rec, dict):
                flattened.append({**rec, "_match_score": match_score})
    return flattened
def choose_best_recording(recordings: list[dict[str, Any]]) -> dict[str, Any] | None:
    """Return the recording with the highest ``_match_score``.

    Scores that are missing or not convertible to ``float`` rank lowest. On a
    tie the earliest candidate wins. Returns None for an empty list.
    """

    def numeric_score(rec: dict[str, Any]) -> float:
        try:
            return float(rec.get("_match_score"))
        except (TypeError, ValueError):
            return float("-inf")

    winner: dict[str, Any] | None = None
    top = float("-inf")
    for candidate in recordings:
        value = numeric_score(candidate)
        # Strict ">" keeps the first candidate on equal scores.
        if winner is None or value > top:
            winner, top = candidate, value
    return winner
def fetch_musicbrainz_recording(mbid: str) -> dict[str, Any]:
    """Fetch the MusicBrainz recording JSON for *mbid*.

    The MBID is URL-quoted before being substituted into
    ``MUSICBRAINZ_RECORDING_URL``.
    """
    safe_mbid = urllib.parse.quote(mbid)
    return http_get_json(
        MUSICBRAINZ_RECORDING_URL.format(mbid=safe_mbid),
        headers={"User-Agent": DEFAULT_USER_AGENT},
    )
def choose_text(value: Any) -> str | None:
    """Reduce *value* to a single string.

    Lists are unwrapped by repeatedly taking their first element; an empty
    list or None yields None; anything else is passed through ``str``.
    """
    current = value
    while isinstance(current, list):
        if not current:
            return None
        current = current[0]
    return None if current is None else str(current)
def extract_mb_metadata(detail_json: dict[str, Any], fallback_rec: dict[str, Any] | None = None) -> dict[str, str | None]:
    """Extract the title/artist/album values to write back as tags.

    Despite its name, *fallback_rec* (the AcoustID recording entry) is the
    preferred source:

    - ``fallback_rec["title"]`` -> title
    - ``fallback_rec["artists"][0]["name"]`` -> artist
    - ``fallback_rec["releasegroups"][0]["title"]`` -> album

    Whatever is still empty afterwards is filled from the MusicBrainz detail:
    ``title``, the ``artist-credit`` list and the first entry of ``releases``.

    Args:
        detail_json: MusicBrainz recording detail JSON.
        fallback_rec: AcoustID recording entry, or None.

    Returns:
        Dict with the keys ``title``, ``artist`` and ``album``; a value is
        None when neither source provided it.
    """
    title: str | None = None
    artist: str | None = None
    album: str | None = None

    if fallback_rec:
        title = choose_text(fallback_rec.get("title"))
        artists = fallback_rec.get("artists")
        if isinstance(artists, list) and artists and isinstance(artists[0], dict):
            artist = choose_text(artists[0].get("name"))
        releasegroups = fallback_rec.get("releasegroups")
        if isinstance(releasegroups, list) and releasegroups and isinstance(releasegroups[0], dict):
            album = choose_text(releasegroups[0].get("title"))

    if not title:
        title = choose_text(detail_json.get("title"))

    if not artist:
        artist_credit = detail_json.get("artist-credit") or detail_json.get("artist_credit")
        if isinstance(artist_credit, list):
            parts: list[str] = []
            for item in artist_credit:
                if isinstance(item, dict) and item.get("name"):
                    parts.append(str(item["name"]))
                    # Keep the separator that follows this credit (" & ",
                    # " feat. ", ...) so names are not glued together.
                    joinphrase = item.get("joinphrase")
                    if isinstance(joinphrase, str):
                        parts.append(joinphrase)
                elif isinstance(item, str):
                    parts.append(item)
            if parts:
                artist = "".join(parts).strip()

    if not album:
        release_list = detail_json.get("releases")
        if isinstance(release_list, list) and release_list:
            first_release = release_list[0]
            if isinstance(first_release, dict):
                album = choose_text(first_release.get("title"))

    return {"title": title, "artist": artist, "album": album}
def update_metadata(file_path: Path, title: str, artist: str, album: str) -> None:
    """Write title, artist and album into the tags of *file_path* and save.

    The values go into the ``TIT2``, ``TPE1`` and ``TALB`` frames
    respectively, each created with ``encoding=3``.
    """
    audio = MP3(str(file_path), ID3=ID3)
    frames = (
        ("TIT2", TIT2, title),
        ("TPE1", TPE1, artist),
        ("TALB", TALB, album),
    )
    for frame_id, frame_cls, value in frames:
        audio[frame_id] = frame_cls(encoding=3, text=value)
    audio.save()
def print_section(title: str, payload: Any) -> None:
    """Print a ``=== title ===`` banner followed by *payload* as indented JSON."""
    banner = f"\n=== {title} ==="
    print(banner)
    # Serialise after the banner so the banner is shown even if dumping fails.
    rendered = json.dumps(payload, ensure_ascii=False, indent=2)
    print(rendered)
def process_file(
    path: Path,
    maxlength: int,
    mb_delay: float,
    mb_cache: dict[str, dict[str, Any]],
    write_tags: bool,
) -> FileResult:
    """Fingerprint one MP3, look it up and optionally rewrite its tags.

    Steps: fingerprint the file, query AcoustID, fetch the MusicBrainz detail
    of every recording that has an id, then (with *write_tags*) write
    title/artist/album. Each intermediate result is printed with
    ``print_section``.

    Args:
        path: MP3 file to process.
        maxlength: Forwarded to ``fingerprint_mp3``.
        mb_delay: Seconds to sleep after each non-cached MusicBrainz request;
            no sleep when not positive.
        mb_cache: MBID -> detail (or error stub) cache shared between files;
            updated in place.
        write_tags: Whether to write the chosen metadata into the file.

    Returns:
        A ``FileResult``. ``error`` is set when fingerprinting or the AcoustID
        lookup failed; later steps are skipped in that case.
    """
    result = FileResult(file=str(path))

    try:
        duration, fingerprint = fingerprint_mp3(path, maxlength=maxlength)
        result.duration = duration
        result.fingerprint = fingerprint
        print_section(
            f"{path} / duration & fingerprint",
            {"file": str(path), "duration": duration, "fingerprint": fingerprint},
        )
    except Exception as e:
        result.error = f"fingerprint failed: {e}"
        print_section(f"{path} / duration & fingerprint", {"file": str(path), "error": result.error})
        return result

    try:
        lookup_json = acoustid_lookup(result.duration, result.fingerprint)
        result.acoustid = lookup_json
        print_section(f"{path} / AcoustID response", lookup_json)
    except Exception as e:
        result.error = f"acoustid lookup failed: {e}"
        print_section(f"{path} / AcoustID response", {"file": str(path), "error": result.error})
        return result

    recordings = extract_recordings(result.acoustid)
    result.recordings = recordings
    details: list[dict[str, Any]] = []
    chosen_metadata: dict[str, str | None] | None = None
    best_recording = choose_best_recording(recordings)
    best_recording_detail: dict[str, Any] | None = None

    for rec in recordings:
        mbid = rec.get("id")
        if not mbid:
            continue
        if mbid in mb_cache:
            detail_json = mb_cache[mbid]
        else:
            try:
                detail_json = fetch_musicbrainz_recording(mbid)
            except Exception as e:
                # Failures are cached too, so a bad MBID is requested only once.
                detail_json = {"mbid": mbid, "error": str(e)}
            mb_cache[mbid] = detail_json
            if mb_delay > 0:
                time.sleep(mb_delay)
        details.append(detail_json)

        usable = isinstance(detail_json, dict) and "error" not in detail_json
        if usable and best_recording is not None and mbid == best_recording.get("id"):
            best_recording_detail = detail_json
        if usable and chosen_metadata is None:
            # Provisional choice: the first recording with a usable detail.
            chosen_metadata = extract_mb_metadata(detail_json, fallback_rec=rec)

    if best_recording_detail is not None:
        # The best-scored recording wins whenever its detail could be fetched.
        chosen_metadata = extract_mb_metadata(best_recording_detail, fallback_rec=best_recording)

    result.recording_details = details
    print_section(f"{path} / MusicBrainz recording details", details)

    # Leave the file untouched when no title, artist or album was found, so
    # existing tags are not replaced by the file stem and blank strings.
    has_metadata = chosen_metadata is not None and any(chosen_metadata.values())
    if write_tags and has_metadata:
        title = chosen_metadata.get("title") or path.stem
        artist = chosen_metadata.get("artist") or ""
        album = chosen_metadata.get("album") or ""
        try:
            update_metadata(path, title=title, artist=artist, album=album)
            print_section(
                f"{path} / metadata updated",
                {"title": title, "artist": artist, "album": album},
            )
        except Exception as e:
            print_section(f"{path} / metadata update failed", {"file": str(path), "error": str(e)})

    return result
def main() -> int:
    """Process every MP3 under the requested directory and emit a JSON summary.

    The summary (root, item count, per-file results) goes to ``--output`` when
    given, otherwise to stdout.

    Returns:
        0 on completion, 2 when the directory does not exist.
    """
    args = parse_args()
    root = Path(args.directory).expanduser().resolve()
    if not (root.exists() and root.is_dir()):
        print(f"[error] directory not found: {root}", file=sys.stderr)
        return 2

    # One MusicBrainz cache shared by all files of this run.
    mb_cache: dict[str, dict[str, Any]] = {}
    recursive = not args.no_recursive
    items: list[dict[str, Any]] = [
        asdict(
            process_file(
                mp3_path,
                maxlength=args.maxlength,
                mb_delay=args.mb_delay,
                mb_cache=mb_cache,
                write_tags=args.write_tags,
            )
        )
        for mp3_path in iter_mp3_files(root, recursive=recursive)
    ]

    summary = {"root": str(root), "count": len(items), "items": items}
    text = json.dumps(summary, ensure_ascii=False, indent=2)
    if args.output:
        Path(args.output).expanduser().resolve().write_text(text, encoding="utf-8")
    else:
        print(text)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())

512
parse_cues.py Normal file
View File

@@ -0,0 +1,512 @@
#!/usr/bin/env python3
"""Scan a directory for cue-sheet albums and split them into tracks.
Supported album sources:
- cue + flac
- cue + wav
- cue + ape
Strategy:
- flac / wav: split directly when possible
- ape: try direct split first by default, then fall back to transcoding to flac
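Output layout:
- one folder per cue sheet, named after the cue's album TITLE (falling back to
the cue filename stem)
- split tracks are converted to MP3 (see --mp3-bitrate)
- finished folders are moved to <final-root>/<scanned directory name>/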
Examples:
python parse_cues.py ~/Music/album_dir
python parse_cues.py ~/Music/album_dir --ape-policy transcode
python parse_cues.py ~/Music/album_dir --dry-run
"""
from __future__ import annotations
import argparse
import glob
import os
import re
import shutil
import subprocess
import sys
import tempfile
import unicodedata
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable, Optional
# Audio container extensions this script knows how to split.
AUDIO_EXTS = (".flac", ".wav", ".ape")
# FILE "<name>" <type> -- group 1 is the file name without surrounding quotes.
# The trailing ``\s*$`` matters: without it the lazy group stops at the first
# space, so ``FILE "Artist - Album.flac" WAVE`` would capture only ``Artist``.
FILE_LINE_RE = re.compile(r'^\s*FILE\s+"?(.+?)"?\s+\S+\s*$', re.IGNORECASE)
# TRACK <nn> <type> -- group 1 is the track number as written.
TRACK_LINE_RE = re.compile(r'^\s*TRACK\s+(\d+)\s+\S+', re.IGNORECASE)
# TITLE <text> -- group 1 is the rest of the line, quotes included.
TITLE_LINE_RE = re.compile(r'^\s*TITLE\s+(.*)$', re.IGNORECASE)
@dataclass
class AlbumJob:
    """One cue sheet paired with the audio file it describes."""

    # Path of the cue sheet.
    cue: Path
    # Audio file resolved for that cue sheet.
    audio: Path
    # Lower-cased suffix of ``audio``, including the leading dot.
    audio_ext: str
    # Directory that receives the split tracks.
    output_dir: Path
class ToolError(RuntimeError):
    """Raised when a required external tool is missing or a command fails."""
def log(msg: str) -> None:
    """Print *msg* on stdout and flush immediately."""
    print(msg, file=sys.stdout, flush=True)
def warn(msg: str) -> None:
    """Print *msg* on stderr with a ``[warn]`` prefix and flush immediately."""
    line = f"[warn] {msg}"
    print(line, file=sys.stderr, flush=True)
def err(msg: str) -> None:
    """Print *msg* on stderr with an ``[error]`` prefix and flush immediately."""
    line = f"[error] {msg}"
    print(line, file=sys.stderr, flush=True)
def which_or_fail(name: str) -> str:
    """Return the path ``shutil.which`` resolves for *name*.

    Raises:
        ToolError: If *name* cannot be resolved.
    """
    resolved = shutil.which(name)
    if resolved:
        return resolved
    raise ToolError(f"missing required tool: {name}")
def ensure_tools(required: Iterable[str]) -> None:
    """Verify that every name in *required* resolves via ``shutil.which``.

    Raises:
        ToolError: Listing all names that could not be resolved.
    """
    missing: list[str] = []
    for tool in required:
        if shutil.which(tool) is None:
            missing.append(tool)
    if missing:
        raise ToolError("missing required tools: " + ", ".join(missing))
def parse_args() -> argparse.Namespace:
    """Parse the command line of the cue-splitting script.

    Returns:
        Namespace with ``directory``, ``no_recursive``, ``ape_policy``,
        ``output_root``, ``final_root``, ``cue_encoding``, ``mp3_bitrate``
        and ``dry_run``.
    """
    ap = argparse.ArgumentParser(
        description="Traverse a directory, find cue-sheet albums, and split tracks."
    )
    add = ap.add_argument
    add("directory", help="Root directory to scan")
    add("--no-recursive", action="store_true", help="Only scan the top-level directory")
    add(
        "--ape-policy",
        choices=("auto", "direct", "transcode"),
        default="auto",
        help="How to handle APE sources",
    )
    add(
        "--output-root",
        default=None,
        help="Optional root directory for split outputs. Default: beside the cue file",
    )
    add(
        "--final-root",
        default="/home/shenwei/mnt/volume2/navidrome",
        help="Move finished album folders into this root. Default: navidrome mount",
    )
    add("--cue-encoding", default="auto", help="Cue file text encoding. Default: auto")
    add(
        "--mp3-bitrate",
        default="320k",
        help="MP3 target bitrate for conversion (e.g. 192k, 256k, 320k). Default: 320k",
    )
    add("--dry-run", action="store_true", help="Only print planned actions")
    return ap.parse_args()
def read_text_lines(path: Path) -> list[str]:
    """Read *path* as UTF-8 (undecodable bytes replaced) and return its lines."""
    with path.open("r", encoding="utf-8", errors="replace") as handle:
        content = handle.read()
    return content.splitlines()
def read_cue_text(path: Path, encoding: str = "auto") -> str:
    """Return the text of a cue sheet.

    Args:
        path: Cue file to read.
        encoding: Codec name, or ``"auto"`` to try a fixed list of codecs in
            order and keep the first strict decode that contains no U+FFFD.

    Returns:
        The decoded text. With an explicit codec, undecodable bytes are
        replaced. If no codec in the auto list qualifies, the bytes are
        decoded as UTF-8 with replacement.
    """
    if encoding != "auto":
        return path.read_text(encoding=encoding, errors="replace")

    data = path.read_bytes()
    for codec in ("utf-8-sig", "utf-8", "gb18030", "gbk", "big5", "cp936", "cp1252", "latin1"):
        try:
            decoded = data.decode(codec)
        except UnicodeDecodeError:
            continue
        # Prefer a decode that does not contain replacement characters.
        if "\ufffd" not in decoded:
            return decoded
    return data.decode("utf-8", errors="replace")
def cue_lines(path: Path, encoding: str = "auto") -> list[str]:
    """Return the cue sheet at *path* as a list of lines, decoded by ``read_cue_text``."""
    text = read_cue_text(path, encoding=encoding)
    return text.splitlines()
def sanitize_filename(name: str) -> str:
    """Turn *name* into a string usable as a file or folder name.

    Surrounding whitespace and double quotes are removed, each run of
    ``\\ / : * ? " < > |`` becomes a single underscore, and whitespace runs
    collapse to one space. An empty result is replaced by ``"track"``.
    """
    cleaned = name.strip().strip('"')
    cleaned = re.sub(r"[\\/:*?\"<>|]+", "_", cleaned)
    cleaned = re.sub(r"\s+", " ", cleaned).strip()
    if cleaned:
        return cleaned
    return "track"
def parse_cue_track_titles(cue_path: Path, encoding: str = "auto") -> dict[int, str]:
    """Map each track number of a cue sheet to its sanitised TITLE.

    A lightweight line scan: a ``TRACK nn ...`` line selects the current
    track and every following ``TITLE ...`` line is stored for it (a later
    TITLE for the same track overwrites an earlier one). TITLE lines before
    the first TRACK are ignored.
    """
    titles: dict[int, str] = {}
    track_no: Optional[int] = None
    for raw in cue_lines(cue_path, encoding=encoding):
        track_hit = TRACK_LINE_RE.match(raw)
        if track_hit:
            track_no = int(track_hit.group(1))
            continue
        if track_no is None:
            continue
        title_hit = TITLE_LINE_RE.match(raw)
        if not title_hit:
            continue
        text = title_hit.group(1).strip()
        # Drop one pair of surrounding double quotes, if present.
        if len(text) >= 2 and text.startswith('"') and text.endswith('"'):
            text = text[1:-1]
        titles[track_no] = sanitize_filename(text)
    return titles
def parse_cue_album_title(cue_path: Path, encoding: str = "auto") -> str:
    """Return the cue sheet's album title as a sanitised folder name.

    The album title is the first non-empty TITLE that appears before the
    first TRACK line. When there is none, the cue filename stem is used.

    Args:
        cue_path: Cue sheet to inspect.
        encoding: Passed through to ``cue_lines``.
    """
    for line in cue_lines(cue_path, encoding=encoding):
        if TRACK_LINE_RE.match(line):
            # Titles after the first TRACK belong to tracks, not the album.
            break
        title_match = TITLE_LINE_RE.match(line)
        if not title_match:
            continue
        title = title_match.group(1).strip()
        if title.startswith('"') and title.endswith('"') and len(title) >= 2:
            title = title[1:-1]
        # sanitize_filename() substitutes "track" for an empty name, so test
        # for emptiness first; an empty TITLE must fall back to the cue stem.
        if title.strip().strip('"').strip():
            return sanitize_filename(title)
    return sanitize_filename(cue_path.stem)
def detect_audio_from_cue(cue_path: Path, encoding: str = "auto") -> Optional[Path]:
    """Locate the audio file that belongs to a cue sheet.

    The first FILE entry of the cue sheet is tried first (relative names are
    resolved against the cue's directory), followed by files sharing the
    cue's stem with each supported extension in lower and upper case.

    Args:
        cue_path: Cue sheet to inspect.
        encoding: Cue text encoding, passed to ``cue_lines`` so FILE names
            are decoded the same way as titles.

    Returns:
        The first candidate that exists as a regular file, or None.
    """
    file_refs: list[str] = []
    for line in cue_lines(cue_path, encoding=encoding):
        m = FILE_LINE_RE.match(line)
        if m:
            file_refs.append(m.group(1).strip())

    # Multi-file cue sheets need a different strategy; this script handles
    # one-file albums only.
    if len(file_refs) > 1:
        warn(f"{cue_path}: multiple FILE entries found; using the first one only")

    candidates: list[Path] = []
    if file_refs:
        ref_path = Path(file_refs[0])
        if ref_path.is_absolute():
            candidates.append(ref_path)
        else:
            candidates.append((cue_path.parent / ref_path).resolve())

    # Fallback: same stem with supported extensions.
    for ext in AUDIO_EXTS:
        candidates.append(cue_path.with_suffix(ext))
        candidates.append(cue_path.with_suffix(ext.upper()))

    seen: set[Path] = set()
    for candidate in candidates:
        candidate = candidate.resolve() if candidate.exists() else candidate
        if candidate in seen:
            continue
        seen.add(candidate)
        if candidate.exists() and candidate.is_file():
            return candidate
    return None
def find_cue_files(root: Path, recursive: bool) -> list[Path]:
    """Return the regular files matching ``*.cue`` under *root*, sorted by path.

    Args:
        root: Directory to scan.
        recursive: Descend into subdirectories when true.
    """
    walker = root.rglob if recursive else root.glob
    return sorted(entry for entry in walker("*.cue") if entry.is_file())
def collect_album_jobs(root: Path, output_root: Optional[Path], recursive: bool, cue_encoding: str) -> list[AlbumJob]:
    """Build one ``AlbumJob`` per cue sheet that has a usable audio file.

    Cue sheets without a matching audio file, or whose audio has an
    unsupported extension, are skipped with a warning.

    Args:
        root: Directory to scan for cue sheets.
        output_root: Parent of the per-album output folders; when None the
            folder is created beside the cue file.
        recursive: Whether to scan subdirectories.
        cue_encoding: Cue text encoding passed to the cue parsers.
    """
    jobs: list[AlbumJob] = []
    for cue in find_cue_files(root, recursive):
        audio = detect_audio_from_cue(cue, encoding=cue_encoding)
        if audio is None:
            warn(f"skip {cue}: no matching flac/wav/ape audio found")
            continue
        ext = audio.suffix.lower()
        if ext not in AUDIO_EXTS:
            warn(f"skip {cue}: unsupported audio extension {audio.suffix}")
            continue
        album_name = parse_cue_album_title(cue, encoding=cue_encoding)
        parent = output_root if output_root else cue.parent
        jobs.append(
            AlbumJob(cue=cue, audio=audio, audio_ext=ext, output_dir=parent / album_name)
        )
    return jobs
def move_album_dir_to_final_root(album_dir: Path, final_root: Path, top_level_name: str, dry_run: bool) -> Path:
    """Move a finished album folder to ``final_root/top_level_name``.

    If a folder of the same name already exists there, `` (2)``, `` (3)``,
    ... is appended until a free name is found in that same directory.

    Args:
        album_dir: Folder holding the finished tracks.
        final_root: Root of the final library.
        top_level_name: Grouping folder created under *final_root*.
        dry_run: When true, only log the planned move; nothing is created or
            moved.

    Returns:
        The destination path, or *album_dir* itself when it already sits
        directly under ``final_root/top_level_name``.
    """
    final_album_root = final_root / top_level_name
    if album_dir.resolve().parent == final_album_root.resolve():
        # Already under the final root with the desired grouping.
        return album_dir

    dest = final_album_root / album_dir.name
    suffix = 2
    while dest.exists():
        # Stay inside final_album_root so a renamed album keeps its grouping.
        dest = final_album_root / f"{album_dir.name} ({suffix})"
        suffix += 1

    if dry_run:
        log(f"[dry-run] mv -T {album_dir} {dest}")
        return dest

    final_album_root.mkdir(parents=True, exist_ok=True)
    shutil.move(str(album_dir), str(dest))
    return dest
def shell_quote(path: Path) -> str:
    """Return *path* as a string quoted with ``shlex.quote``."""
    from shlex import quote

    return quote(str(path))
def run_cmd(cmd: list[str], cwd: Optional[Path] = None, dry_run: bool = False) -> None:
    """Run an argument-list command, or only log it in dry-run mode.

    Args:
        cmd: Program and arguments.
        cwd: Working directory for the command, if any.
        dry_run: When true, log the command instead of running it.

    Raises:
        ToolError: If the command exits with a non-zero status.
    """
    rendered = " ".join(map(shlex_quote_arg, cmd))
    if cwd:
        rendered = f"(cd {cwd} && {rendered})"
    if dry_run:
        log(f"[dry-run] {rendered}")
        return
    workdir = str(cwd) if cwd else None
    completed = subprocess.run(cmd, cwd=workdir)
    if completed.returncode != 0:
        raise ToolError(f"command failed ({completed.returncode}): {rendered}")
def shlex_quote_arg(arg: str) -> str:
    """Return *arg* quoted with ``shlex.quote``."""
    from shlex import quote

    return quote(arg)
def run_shell(script: str, cwd: Optional[Path] = None, dry_run: bool = False) -> None:
    """Run *script* through ``bash -lc``, or only log it in dry-run mode.

    Args:
        script: Shell source to execute.
        cwd: Working directory for the shell, if any.
        dry_run: When true, log the script instead of running it.

    Raises:
        ToolError: If the shell exits with a non-zero status.
    """
    rendered = f"(cd {cwd} && {script})" if cwd else script
    if dry_run:
        log(f"[dry-run] {rendered}")
        return
    workdir = str(cwd) if cwd else None
    completed = subprocess.run(["bash", "-lc", script], cwd=workdir)
    if completed.returncode != 0:
        raise ToolError(f"command failed ({completed.returncode}): {rendered}")
def tag_output_files(cue: Path, out_dir: Path, ext: str, dry_run: bool) -> None:
    """Run ``cuetag`` with the cue sheet and every ``*.<ext>`` file in *out_dir*.

    Warns and returns when no such file exists.

    Args:
        cue: Cue sheet passed to ``cuetag``.
        out_dir: Directory holding the split tracks.
        ext: Track extension without the leading dot.
        dry_run: When true, the command is only logged.
    """
    # Escape the directory part: album folders such as "Album [2001]" contain
    # glob metacharacters and would otherwise match nothing.
    pattern = os.path.join(glob.escape(str(out_dir)), f"*.{ext}")
    files = sorted(Path(p) for p in glob.glob(pattern))
    if not files:
        warn(f"{cue}: no *.{ext} files found for tagging")
        return
    cmd = ["cuetag", str(cue), *[str(p) for p in files]]
    run_cmd(cmd, dry_run=dry_run)
def rename_split_files_by_cue_titles(cue: Path, out_dir: Path, ext: str, dry_run: bool) -> None:
    """Rename the split tracks in *out_dir* to ``NN - <title>.<ext>``.

    Files are taken in sorted order and numbered from 1; the title of track
    *n* comes from the cue sheet, defaulting to ``track NN``. When the target
    name already exists, `` (2)``, `` (3)``, ... is appended.

    Args:
        cue: Cue sheet providing the track titles.
        out_dir: Directory holding the split tracks.
        ext: Track extension without the leading dot.
        dry_run: When true, renames are only logged.
    """
    titles = parse_cue_track_titles(cue)
    # Escape the directory part: album folders such as "Album [2001]" contain
    # glob metacharacters and would otherwise match nothing.
    pattern = os.path.join(glob.escape(str(out_dir)), f"*.{ext}")
    files = sorted(Path(p) for p in glob.glob(pattern))
    if not files:
        return

    width = max(2, len(str(len(files))))
    for idx, src in enumerate(files, start=1):
        title = titles.get(idx, f"track {idx:0{width}d}")
        prefix = f"{idx:0{width}d} - "
        dest = src.with_name(f"{prefix}{title}{src.suffix.lower()}")
        if dest == src:
            continue
        if dest.exists():
            stem = dest.stem
            suffix = dest.suffix
            n = 2
            while True:
                alt = dest.with_name(f"{stem} ({n}){suffix}")
                if not alt.exists():
                    dest = alt
                    break
                n += 1
        if dry_run:
            log(f"[dry-run] mv {src} -> {dest}")
        else:
            src.rename(dest)
def convert_tracks_to_mp3(out_dir: Path, source_ext: str, bitrate: str, dry_run: bool) -> None:
    """Convert every ``*.<source_ext>`` track in *out_dir* to MP3 with ffmpeg.

    Each source file is deleted after its conversion succeeded; the MP3 keeps
    the source's name with the suffix replaced.

    Args:
        out_dir: Directory holding the split tracks.
        source_ext: Source extension without the leading dot.
        bitrate: Value passed to ffmpeg's ``-b:a`` option.
        dry_run: When true, the ffmpeg and delete steps are only logged.

    Raises:
        ToolError: If ffmpeg is missing or a conversion exits non-zero.
    """
    ensure_tools(["ffmpeg"])
    # Escape the directory part: album folders such as "Album [2001]" contain
    # glob metacharacters and would otherwise match nothing.
    pattern = os.path.join(glob.escape(str(out_dir)), f"*.{source_ext}")
    source_files = sorted(Path(p) for p in glob.glob(pattern))
    if not source_files:
        warn(f"{out_dir}: no *.{source_ext} files found for mp3 conversion")
        return

    for src in source_files:
        dest = src.with_suffix(".mp3")
        cmd = [
            "ffmpeg",
            "-y",
            "-i",
            str(src),
            "-map_metadata",
            "0",
            "-vn",
            "-codec:a",
            "libmp3lame",
            "-b:a",
            bitrate,
            str(dest),
        ]
        printable = " ".join(shlex_quote_arg(x) for x in cmd)
        if dry_run:
            log(f"[dry-run] {printable}")
            log(f"[dry-run] rm {src}")
            continue
        proc = subprocess.run(cmd)
        if proc.returncode != 0:
            raise ToolError(f"mp3 conversion failed ({proc.returncode}): {src}")
        # Remove the source only after ffmpeg reported success.
        src.unlink()
def split_direct(job: AlbumJob, dry_run: bool) -> None:
    """Split the album audio in its own format, then tag and rename the tracks.

    Pipes ``cuebreakpoints <cue>`` into ``shnsplit -o <ext> <audio>`` inside
    the job's output directory.

    Args:
        job: Album to split.
        dry_run: When true, commands are only logged and the output directory
            is not created.

    Raises:
        ToolError: If the split pipeline or tagging fails.
    """
    ext = job.audio_ext.lstrip('.')
    if not dry_run:
        # A dry run must not leave empty album folders behind.
        job.output_dir.mkdir(parents=True, exist_ok=True)
    script = (
        f"set -euo pipefail; "
        f"cd {shell_quote(job.output_dir)}; "
        f"cuebreakpoints {shell_quote(job.cue)} | shnsplit -o {ext} {shell_quote(job.audio)}"
    )
    run_shell(script, dry_run=dry_run)
    tag_output_files(job.cue, job.output_dir, ext, dry_run=dry_run)
    rename_split_files_by_cue_titles(job.cue, job.output_dir, ext, dry_run=dry_run)
def split_via_flac(job: AlbumJob, bitrate: str, dry_run: bool) -> None:
    """Transcode the album to a temporary FLAC, split it, and convert to MP3.

    The temporary file ``.<cue_stem>.transcode.flac`` is written into the
    output directory and removed by the shell script after the split.

    Args:
        job: Album to split.
        bitrate: MP3 bitrate passed to ``convert_tracks_to_mp3``.
        dry_run: When true, commands are only logged and the output directory
            is not created.

    Raises:
        ToolError: If ffmpeg is missing or any step fails.
    """
    ensure_tools(["ffmpeg"])
    if not dry_run:
        # A dry run must not leave empty album folders behind.
        job.output_dir.mkdir(parents=True, exist_ok=True)
    temp_flac = job.output_dir / f".{job.cue.stem}.transcode.flac"
    script = (
        f"set -euo pipefail; "
        f"ffmpeg -y -i {shell_quote(job.audio)} -c:a flac {shell_quote(temp_flac)} >/dev/null 2>&1; "
        f"cd {shell_quote(job.output_dir)}; "
        f"cuebreakpoints {shell_quote(job.cue)} | shnsplit -o flac {shell_quote(temp_flac)}; "
        f"rm -f {shell_quote(temp_flac)}"
    )
    run_shell(script, dry_run=dry_run)
    tag_output_files(job.cue, job.output_dir, "flac", dry_run=dry_run)
    rename_split_files_by_cue_titles(job.cue, job.output_dir, "flac", dry_run=dry_run)
    convert_tracks_to_mp3(job.output_dir, "flac", bitrate, dry_run=dry_run)
def process_job(job: AlbumJob, ape_policy: str, dry_run: bool, cue_encoding: str, mp3_bitrate: str) -> None:
    """Split one album and convert its tracks to MP3.

    FLAC and WAV sources are split directly. APE sources follow *ape_policy*:
    ``direct`` splits directly, ``transcode`` goes through a temporary FLAC,
    and any other value tries the direct split and falls back to the FLAC
    route on failure.

    Args:
        job: Album to process.
        ape_policy: Strategy for ``.ape`` sources.
        dry_run: When true, commands are only logged.
        cue_encoding: Accepted for interface symmetry; not used in this body.
        mp3_bitrate: Bitrate handed to the MP3 conversion.

    Raises:
        ToolError: For an unsupported audio extension or a failed step.
    """
    log(f"[album] {job.cue}")
    log(f" audio: {job.audio}")
    log(f" output: {job.output_dir}")

    ext = job.audio_ext
    if ext not in (".flac", ".wav", ".ape"):
        raise ToolError(f"unsupported audio extension: {job.audio_ext}")

    def direct_then_mp3() -> None:
        split_direct(job, dry_run=dry_run)
        convert_tracks_to_mp3(job.output_dir, ext.lstrip('.'), mp3_bitrate, dry_run=dry_run)

    if ext != ".ape" or ape_policy == "direct":
        direct_then_mp3()
        return
    if ape_policy == "transcode":
        split_via_flac(job, mp3_bitrate, dry_run=dry_run)
        return

    # auto
    try:
        direct_then_mp3()
    except Exception as direct_exc:
        warn(f"{job.cue}: direct APE split failed, falling back to FLAC transcode")
        warn(f"reason: {direct_exc}")
        split_via_flac(job, mp3_bitrate, dry_run=dry_run)
def main() -> int:
    """Split every cue-sheet album under the requested directory.

    Each album is split, converted to MP3 and moved to
    ``<final-root>/<scanned directory name>/``.

    Returns:
        0 when every album succeeded (or none were found), 1 when at least
        one album failed, 2 when the directory does not exist or a required
        tool is missing.
    """
    args = parse_args()
    root = Path(args.directory).expanduser().resolve()
    if not root.is_dir():
        err(f"directory not found: {root}")
        return 2

    try:
        # Every job ends with an MP3 conversion, so ffmpeg is needed
        # regardless of --ape-policy.
        ensure_tools(["cuebreakpoints", "shnsplit", "cuetag", "ffmpeg"])
    except ToolError as exc:
        # Report like the other fatal conditions instead of a traceback.
        err(str(exc))
        return 2

    output_root = Path(args.output_root).expanduser().resolve() if args.output_root else None
    if output_root and not args.dry_run:
        output_root.mkdir(parents=True, exist_ok=True)
    final_root = Path(args.final_root).expanduser().resolve()
    top_level_name = root.name

    jobs = collect_album_jobs(root, output_root, recursive=not args.no_recursive, cue_encoding=args.cue_encoding)
    if not jobs:
        log("no cue-sheet albums found")
        return 0

    ok = 0
    failed = 0
    for job in jobs:
        try:
            process_job(job, ape_policy=args.ape_policy, dry_run=args.dry_run, cue_encoding=args.cue_encoding, mp3_bitrate=args.mp3_bitrate)
            moved = move_album_dir_to_final_root(job.output_dir, final_root, top_level_name, dry_run=args.dry_run)
            log(f" final: {moved}")
            ok += 1
        except Exception as exc:
            # One bad album must not abort the rest of the batch.
            failed += 1
            err(f"{job.cue}: {exc}")
    log(f"done: {ok} ok, {failed} failed")
    return 0 if failed == 0 else 1


if __name__ == "__main__":
    sys.exit(main())