#!/usr/bin/env python3
# File: acoustid_album_scan.py
"""Scan a directory of MP3s with AcoustID only.

This tool fingerprints every MP3 in a directory, queries AcoustID for each
track, and prints the per-track identification evidence plus an aggregate
directory-level candidate summary.

It intentionally does not call MusicBrainz. The output is meant to be reviewed
as a batch so a downstream AI or human can decide which release the directory
belongs to.
"""

from __future__ import annotations

import argparse
import json
import sys
import time
from collections import Counter, defaultdict
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple

ACOUSTID_CLIENT_ID = "JIvtbG79eAg"
ACOUSTID_LOOKUP_URL = "https://api.acoustid.org/v2/lookup"
DEFAULT_META = "recordings releasegroups compress"
DEFAULT_MAXLEN = 120


@dataclass
class Candidate:
    """One recording candidate extracted from an AcoustID lookup response."""

    recording_id: str
    title: str
    artists: Tuple[str, ...]
    releases: Tuple[str, ...]
    score: float

    @property
    def artist_key(self) -> str:
        """Artist names joined with " / " (empty string when there are none)."""
        return " / ".join(self.artists)

    @property
    def release_key(self) -> str:
        """Release titles joined with " / " (empty string when there are none)."""
        return " / ".join(self.releases)

    @property
    def pair_key(self) -> Tuple[str, str]:
        """The ``(artist_key, release_key)`` pair."""
        return (self.artist_key, self.release_key)


@dataclass
class TrackResult:
    """Per-file scan outcome; serialised with ``dataclasses.asdict``."""

    file: str
    duration: Optional[float]
    fingerprint: Optional[str]
    fingerprint_error: Optional[str]
    lookup_error: Optional[str]
    candidates: List[Dict[str, Any]]
    best_guess: Optional[Dict[str, Any]]
    ambiguous: bool


def iter_mp3_files(root: Path, recursive: bool = True) -> List[Path]:
    """Return the MP3 files at or below *root*, sorted by path.

    The ``.mp3`` suffix is matched case-insensitively for directories as well
    as for a single file, so ``TRACK.MP3`` is found on case-sensitive
    filesystems too.

    Args:
        root: An MP3 file or a directory to search.
        recursive: Descend into subdirectories when *root* is a directory.

    Returns:
        A sorted list of matching paths; empty when nothing matches.
    """
    if root.is_file():
        return [root] if root.suffix.lower() == ".mp3" else []
    entries = root.rglob("*") if recursive else root.glob("*")
    return sorted(p for p in entries if p.suffix.lower() == ".mp3" and p.is_file())


def normalize_fingerprint(fp: Any) -> Optional[str]:
    """Convert a fingerprint to ``str``; ``None`` is passed through."""
    if fp is None:
        return None
    if isinstance(fp, bytes):
        return fp.decode("ascii", errors="ignore")
    return str(fp)


def fingerprint_mp3(path: Path, maxlength: int) -> Tuple[Optional[float], Optional[str], Optional[str]]:
    """Fingerprint *path* with the ``acoustid`` package.

    Returns:
        ``(duration, fingerprint, error)``. On success ``error`` is ``None``;
        on failure (including a missing ``acoustid`` package) ``duration`` and
        ``fingerprint`` are ``None`` and ``error`` describes the problem.
    """
    try:
        import acoustid
    except Exception as exc:  # pragma: no cover - dependency error path
        return None, None, f"missing dependency 'acoustid': {exc}"

    try:
        duration, fingerprint = acoustid.fingerprint_file(str(path), maxlength=maxlength)
        return float(duration) if duration is not None else None, normalize_fingerprint(fingerprint), None
    except Exception as exc:
        return None, None, f"fingerprint failed: {exc}"


def acoustid_lookup(duration: float, fingerprint: str, retries: int = 2, timeout: int = 60) -> Dict[str, Any]:
    """Query the AcoustID lookup endpoint for one fingerprint.

    Args:
        duration: Track duration in seconds (rounded to an integer for the API).
        fingerprint: Fingerprint string for the track.
        retries: Number of extra attempts after the first failure. Negative
            values are treated as 0, so at least one attempt is always made.
        timeout: Per-request timeout in seconds.

    Returns:
        The decoded JSON payload whose ``status`` is ``"ok"``.

    Raises:
        RuntimeError: If ``requests`` is not installed, or if every attempt
            failed (the last underlying exception is chained).
    """
    # Imported lazily so a missing package is reported per track, in the same
    # way a missing 'acoustid' package is.
    try:
        import requests
    except Exception as exc:  # pragma: no cover - dependency error path
        raise RuntimeError(f"missing dependency 'requests': {exc}") from exc

    params = {
        "client": ACOUSTID_CLIENT_ID,
        "meta": DEFAULT_META,
        "duration": int(round(duration)),
        "fingerprint": fingerprint,
    }

    attempts = max(0, retries) + 1
    for attempt in range(attempts):
        try:
            resp = requests.get(ACOUSTID_LOOKUP_URL, params=params, timeout=timeout)
            resp.raise_for_status()
            payload = resp.json()
            if payload.get("status") != "ok":
                raise RuntimeError(f"AcoustID status not ok: {payload}")
            return payload
        except Exception as exc:
            if attempt + 1 < attempts:
                # Linear back-off: 1.5s, 3.0s, ...
                time.sleep(1.5 * (attempt + 1))
                continue
            raise RuntimeError(f"AcoustID lookup failed: {exc}") from exc

    # Not reachable (the loop always returns or raises); kept so the function
    # has no implicit ``None`` return path.
    raise RuntimeError("AcoustID lookup failed: no attempt was made")


def _names_from_people(items: Any) -> List[str]:
    """Collect non-empty, stripped names from a list (or single dict) of artists.

    List items may be plain strings or dicts carrying ``name`` or a nested
    ``artist`` dict with ``name``. Anything else is ignored rather than
    raising, because the payload comes from an external service.
    """
    if isinstance(items, dict):
        items = [items]
    names: List[str] = []
    if not isinstance(items, list):
        return names
    for item in items:
        if isinstance(item, str):
            name: Any = item
        elif isinstance(item, dict):
            name = item.get("name")
            if not name:
                nested = item.get("artist")
                # "artist" may be a string or null; only a dict can hold a name.
                name = nested.get("name") if isinstance(nested, dict) else None
        else:
            continue
        text = str(name).strip() if name else ""
        if text:
            names.append(text)
    return names


def _recording_artists(recording: Dict[str, Any]) -> Tuple[str, ...]:
    """Return the de-duplicated artist names of a recording, in first-seen order.

    The first of ``artists``, ``artist-credit`` and ``artist`` that yields any
    name wins.
    """
    for key in ("artists", "artist-credit", "artist"):
        artists = _names_from_people(recording.get(key))
        if artists:
            return tuple(dict.fromkeys(artists))
    return ()


def _recording_releases(recording: Dict[str, Any]) -> Tuple[str, ...]:
    """Return the de-duplicated release/release-group titles of a recording."""

    def _title_of(entry: Any) -> str:
        if isinstance(entry, dict):
            value = entry.get("title") or entry.get("name")
            return str(value).strip() if value else ""
        if isinstance(entry, str):
            return entry.strip()
        return ""

    titles: List[str] = []
    for key in ("releasegroups", "release-group", "release-list", "releases"):
        values = recording.get(key)
        if isinstance(values, dict):
            values = [values]
        if not isinstance(values, list):
            continue
        for entry in values:
            # A bare string is only accepted as a list item, never as the
            # value of the key itself (handled by the list check above).
            title = _title_of(entry)
            if title:
                titles.append(title)
    return tuple(dict.fromkeys(titles))


def extract_candidates(payload: Dict[str, Any]) -> List[Candidate]:
    """Flatten an AcoustID payload into candidates sorted by descending score.

    Recordings that share the same id, title, artists and releases are merged,
    keeping the highest score. Missing/``null`` ``results`` or ``recordings``
    and non-dict entries are skipped instead of raising.
    """
    candidates: Dict[Tuple[str, str, Tuple[str, ...], Tuple[str, ...]], Candidate] = {}
    for result in payload.get("results") or []:
        if not isinstance(result, dict):
            continue
        try:
            score = float(result.get("score", 0.0))
        except (TypeError, ValueError):
            score = 0.0
        for recording in result.get("recordings") or []:
            if not isinstance(recording, dict):
                continue
            recording_id = str(recording.get("id") or recording.get("mbid") or "")
            title = str(recording.get("title") or "").strip()
            artists = _recording_artists(recording)
            releases = _recording_releases(recording)
            key = (recording_id, title, artists, releases)
            existing = candidates.get(key)
            if existing is None or score > existing.score:
                candidates[key] = Candidate(
                    recording_id=recording_id,
                    title=title,
                    artists=artists,
                    releases=releases,
                    score=score,
                )
    return sorted(candidates.values(), key=lambda c: c.score, reverse=True)


def candidate_to_dict(candidate: Candidate) -> Dict[str, Any]:
    """Serialise a candidate to the JSON-friendly dict used in reports."""
    return {
        "musicbrainz_recording_id": candidate.recording_id,
        "title": candidate.title,
        "artists": list(candidate.artists),
        "releases": list(candidate.releases),
        "artist": candidate.artist_key,
        "release": candidate.release_key,
        "score": candidate.score,
    }


def choose_best_guess(candidates: Sequence[Candidate]) -> Optional[Dict[str, Any]]:
    """Return the first (highest-ranked) candidate as a dict, or ``None``."""
    if not candidates:
        return None
    return candidate_to_dict(candidates[0])


def track_is_ambiguous(candidates: Sequence[Candidate]) -> bool:
    """Return True when candidates disagree on the artist or the release key."""
    if len(candidates) <= 1:
        return False
    artists = {c.artist_key for c in candidates if c.artist_key}
    releases = {c.release_key for c in candidates if c.release_key}
    return len(artists) > 1 or len(releases) > 1


def summarize_album(tracks: Sequence[TrackResult]) -> Dict[str, Any]:
    """Aggregate per-track best guesses into directory-level statistics.

    ``candidate_pairs`` lists every (artist, release) pair seen as a best
    guess, ranked by number of tracks and then by summed score;
    ``majority_pair`` is the top entry, or ``None`` when no track has a best
    guess.
    """
    pair_stats: Dict[Tuple[str, str], Dict[str, Any]] = defaultdict(
        lambda: {"tracks": 0, "score_sum": 0.0, "files": []}
    )
    artist_counter: Counter[str] = Counter()
    release_counter: Counter[str] = Counter()
    ambiguous_files: List[str] = []

    for track in tracks:
        if track.ambiguous:
            ambiguous_files.append(track.file)
        if not track.best_guess:
            continue
        artist = str(track.best_guess.get("artist") or "")
        release = str(track.best_guess.get("release") or "")
        score = float(track.best_guess.get("score") or 0.0)
        stats = pair_stats[(artist, release)]
        stats["tracks"] += 1
        stats["score_sum"] += score
        stats["files"].append(track.file)
        if artist:
            artist_counter[artist] += 1
        if release:
            release_counter[release] += 1

    ranked_pairs = sorted(
        (
            {
                "artist": artist,
                "release": release,
                "tracks": stats["tracks"],
                "score_sum": round(stats["score_sum"], 4),
                "files": stats["files"],
            }
            for (artist, release), stats in pair_stats.items()
        ),
        key=lambda item: (item["tracks"], item["score_sum"]),
        reverse=True,
    )

    majority_pair = ranked_pairs[0] if ranked_pairs else None
    majority_track_count = majority_pair["tracks"] if majority_pair else 0

    return {
        "track_count": len(tracks),
        "ambiguous_track_count": len(ambiguous_files),
        "ambiguous_files": ambiguous_files,
        "top_artist_candidates": artist_counter.most_common(10),
        "top_release_candidates": release_counter.most_common(10),
        "candidate_pairs": ranked_pairs,
        "majority_pair": majority_pair,
        "majority_track_count": majority_track_count,
    }


def choose_majority_album(summary: Dict[str, Any]) -> Tuple[str, str]:
    """Return ``(artist, release)`` of the majority pair, or ``("", "")``."""
    pair = summary.get("majority_pair") or {}
    return str(pair.get("artist") or ""), str(pair.get("release") or "")


def _match_candidate_to_album(track: TrackResult, artist: str, release: str) -> Optional[str]:
    """Return the recording id of the first candidate matching the album pair.

    Returns ``None`` when no candidate matches, or when the first matching
    candidate has an empty recording id.
    """
    for candidate in track.candidates:
        if str(candidate.get("artist") or "") == artist and str(candidate.get("release") or "") == release:
            return str(candidate.get("musicbrainz_recording_id") or "") or None
    return None


def resolve_majority_recording_ids(tracks: Sequence[TrackResult], summary: Dict[str, Any]) -> Dict[str, Any]:
    """Pick, per track, the recording id that belongs to the majority album.

    Tracks without a candidate (or best guess) matching the majority
    (artist, release) pair are listed under ``unresolved_files``.
    """
    artist, release = choose_majority_album(summary)
    selected_tracks: List[Dict[str, Any]] = []
    unresolved_tracks: List[str] = []

    for track in tracks:
        recording_id = _match_candidate_to_album(track, artist, release)
        if recording_id is None and track.best_guess:
            bg_artist = str(track.best_guess.get("artist") or "")
            bg_release = str(track.best_guess.get("release") or "")
            if bg_artist == artist and bg_release == release:
                recording_id = str(track.best_guess.get("musicbrainz_recording_id") or "") or None
        if recording_id:
            selected_tracks.append({"file": track.file, "musicbrainz_recording_id": recording_id})
        else:
            unresolved_tracks.append(track.file)

    return {
        "artist": artist,
        "release": release,
        "tracks": selected_tracks,
        "track_count": len(selected_tracks),
        "unresolved_files": unresolved_tracks,
    }


def scan_directory(directory: Path, recursive: bool, maxlength: int, delay: float) -> Dict[str, Any]:
    """Fingerprint and look up every MP3 under *directory*.

    Progress lines are written to stderr. Fingerprint and lookup failures are
    recorded on the affected track and do not abort the scan.

    Args:
        directory: MP3 file or directory to scan.
        recursive: Whether to descend into subdirectories.
        maxlength: Maximum audio length passed to the fingerprinter.
        delay: Seconds to sleep between tracks (skipped after the last one).

    Returns:
        A dict with ``directory``, ``tracks``, ``album_summary`` and
        ``majority_album``. With no MP3 files the same structure is returned
        with empty/zero values.
    """
    files = iter_mp3_files(directory, recursive=recursive)
    results: List[TrackResult] = []

    for index, path in enumerate(files, start=1):
        print(f"[{index}/{len(files)}] {path}", file=sys.stderr)
        duration, fingerprint, fp_error = fingerprint_mp3(path, maxlength=maxlength)
        lookup_error = None
        candidates: List[Candidate] = []

        if fingerprint and duration is not None:
            try:
                payload = acoustid_lookup(duration=duration, fingerprint=fingerprint)
                candidates = extract_candidates(payload)
            except Exception as exc:
                lookup_error = str(exc)
        else:
            lookup_error = fp_error or "missing fingerprint"

        results.append(
            TrackResult(
                file=str(path),
                duration=duration,
                fingerprint=fingerprint,
                fingerprint_error=fp_error,
                lookup_error=lookup_error,
                candidates=[candidate_to_dict(c) for c in candidates],
                best_guess=choose_best_guess(candidates),
                ambiguous=track_is_ambiguous(candidates),
            )
        )

        if delay > 0 and index < len(files):
            time.sleep(delay)

    # For an empty file list these two calls yield the all-empty summary, so
    # no separate early-return structure is needed.
    summary = summarize_album(results)
    majority_album = resolve_majority_recording_ids(results, summary)

    return {
        "directory": str(directory),
        "tracks": [asdict(track) for track in results],
        "album_summary": summary,
        "majority_album": majority_album,
    }


def format_human_report(payload: Dict[str, Any]) -> str:
    """Render a scan payload (as returned by ``scan_directory``) as text."""
    lines: List[str] = []
    lines.append(f"Directory: {payload['directory']}")
    lines.append(f"Tracks: {len(payload.get('tracks', []))}")
    summary = payload.get("album_summary", {})
    lines.append(f"Ambiguous tracks: {summary.get('ambiguous_track_count', 0)}")
    majority = payload.get("majority_album", {})
    lines.append(
        f"Majority album: artist={majority.get('artist') or '-'}"
        f" | release={majority.get('release') or '-'}"
        f" | matched_tracks={majority.get('track_count', 0)}"
    )
    lines.append("")

    for i, track in enumerate(payload.get("tracks", []), start=1):
        lines.append(f"{i}. {track['file']}")
        lines.append(f"   duration: {track.get('duration')}")
        if track.get("fingerprint_error"):
            lines.append(f"   fingerprint_error: {track['fingerprint_error']}")
        if track.get("lookup_error"):
            lines.append(f"   lookup_error: {track['lookup_error']}")
        if track.get("best_guess"):
            bg = track["best_guess"]
            lines.append(
                f"   best_guess: artist={bg.get('artist') or '-'}"
                f" | release={bg.get('release') or '-'}"
                f" | title={bg.get('title') or '-'}"
                f" | score={bg.get('score')}"
            )
        else:
            lines.append("   best_guess: -")
        if track.get("ambiguous"):
            lines.append("   ambiguous: yes")
        if track.get("candidates"):
            lines.append("   candidates:")
            for cand in track["candidates"]:
                lines.append(
                    f"     - score={cand.get('score')}"
                    f" | artist={cand.get('artist') or '-'}"
                    f" | release={cand.get('release') or '-'}"
                    f" | title={cand.get('title') or '-'}"
                    f" | musicbrainz_recording_id={cand.get('musicbrainz_recording_id') or '-'}"
                )
        lines.append("")

    lines.append("Album-level candidate pairs:")
    for cand in summary.get("candidate_pairs", []):
        lines.append(
            f"- tracks={cand['tracks']} | score_sum={cand['score_sum']}"
            f" | artist={cand['artist'] or '-'} | release={cand['release'] or '-'}"
        )
    if not summary.get("candidate_pairs"):
        lines.append("- none")

    lines.append("")
    lines.append("Majority album recording IDs:")
    majority_tracks = majority.get("tracks", [])
    if majority_tracks:
        for item in majority_tracks:
            # The entries are built by resolve_majority_recording_ids, which
            # stores the id under "musicbrainz_recording_id".
            lines.append(f"- {item['file']} -> {item.get('musicbrainz_recording_id') or '-'}")
    else:
        lines.append("- none")
    unresolved = majority.get("unresolved_files", [])
    if unresolved:
        lines.append("Unresolved files:")
        for file in unresolved:
            lines.append(f"- {file}")

    return "\n".join(lines)


def main(argv: Optional[Sequence[str]] = None) -> int:
    """Command-line entry point.

    Returns:
        0 on success, 2 when the given path does not exist.
    """
    parser = argparse.ArgumentParser(
        description="Fingerprint a directory of MP3s with AcoustID and aggregate candidate releases/artists without calling MusicBrainz.",
    )
    parser.add_argument("path", help="MP3 file or directory to scan")
    parser.add_argument("--no-recursive", action="store_true", help="Do not recurse into subdirectories")
    parser.add_argument("--maxlength", type=int, default=DEFAULT_MAXLEN, help="Maximum audio length passed to fingerprinting")
    parser.add_argument("--delay", type=float, default=0.0, help="Delay in seconds between AcoustID lookups")
    parser.add_argument("--json", action="store_true", help="Print JSON output instead of human-readable text")
    parser.add_argument("--output", type=str, default="", help="Write JSON output to a file")
    args = parser.parse_args(argv)

    root = Path(args.path).expanduser().resolve()
    if not root.exists():
        print(f"Path not found: {root}", file=sys.stderr)
        return 2

    payload = scan_directory(root, recursive=not args.no_recursive, maxlength=args.maxlength, delay=args.delay)

    if args.output:
        Path(args.output).expanduser().resolve().write_text(
            json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8"
        )

    if args.json:
        print(json.dumps(payload, ensure_ascii=False, indent=2))
    else:
        print(format_human_report(payload))

    return 0


if __name__ == "__main__":
    raise SystemExit(main())
#!/usr/bin/env python3
# File: acoustid_musicbrainz_tagger.py
"""Enrich MP3s using MusicBrainz recording IDs from acoustid_album_scan.py.

Usage overview:
- Default: run acoustid_album_scan.py internally, read its JSON from stdout,
  resolve each MP3's musicbrainz_recording_id, fetch MusicBrainz recording
  details, and print a JSON report.
- Preview only: do not pass --write-tags. The script will fetch metadata and
  show the tag preview without modifying files.
- Write tags: pass --write-tags to write title/artist/album plus
  Navidrome-friendly mapped tags (album artist/date/track/disc/genre) and
  embed album cover art into each MP3. A tag (or the cover art) is only
  replaced when a new value is available; existing values are otherwise kept.
- Use existing scan JSON: pass --scan-json /path/to/scan.json if you already
  have a saved acoustid_album_scan.py result and want to skip re-scanning.
- Control recursion: pass --no-recursive to limit processing to the top-level
  directory only.
- Control pacing: pass --mb-delay to slow down MusicBrainz requests.
- Save report: pass --output /path/to/report.json to write the final JSON to a
  file.
- Stdout JSON: pass --json to print the final JSON report to stdout.

Examples:
- python acoustid_musicbrainz_tagger.py /path/to/album
- python acoustid_musicbrainz_tagger.py /path/to/album --write-tags
- python acoustid_musicbrainz_tagger.py /path/to/album --scan-json /tmp/scan.json --json
- python acoustid_musicbrainz_tagger.py /path/to/album --output report.json

This script is intentionally designed around temporary scan output: if
--scan-json is omitted, it invokes acoustid_album_scan.py directly and keeps the
scan JSON in memory rather than requiring a permanent intermediate file.
"""

from __future__ import annotations

import argparse
import json
import sys
import time
import subprocess
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence, Tuple


def iter_mp3_files(root: Path, recursive: bool = True) -> List[Path]:
    """Return the MP3 files at or below *root*, sorted by path.

    The ``.mp3`` suffix is matched case-insensitively for directories as well
    as for a single file, so ``TRACK.MP3`` is found on case-sensitive
    filesystems too.
    """
    if root.is_file():
        return [root] if root.suffix.lower() == ".mp3" else []
    entries = root.rglob("*") if recursive else root.glob("*")
    return sorted(p for p in entries if p.suffix.lower() == ".mp3" and p.is_file())


def run_album_scan(root: Path, recursive: bool = True) -> Dict[str, Any]:
    """Run acoustid_album_scan.py (located next to this file) and parse its JSON.

    Raises:
        RuntimeError: If the scan script is missing, exits non-zero, or prints
            something that is not valid JSON.
    """
    scan_script = Path(__file__).with_name("acoustid_album_scan.py")
    if not scan_script.exists():
        raise RuntimeError(f"acoustid_album_scan.py not found next to this script: {scan_script}")

    cmd = [sys.executable, str(scan_script), str(root), "--json"]
    if not recursive:
        cmd.append("--no-recursive")

    proc = subprocess.run(cmd, capture_output=True, text=True)
    if proc.returncode != 0:
        raise RuntimeError(
            f"acoustid_album_scan.py failed with exit code {proc.returncode}: {proc.stderr.strip()}"
        )
    try:
        return json.loads(proc.stdout)
    except Exception as exc:
        raise RuntimeError(f"failed to parse acoustid_album_scan.py JSON output: {exc}") from exc


def build_recording_id_map(scan_payload: Dict[str, Any]) -> Dict[str, str]:
    """Map each scanned file path to a recording id.

    Ids selected for the majority album take precedence; any other file falls
    back to its per-track ``best_guess``. Files without an id are omitted.
    """
    mapping: Dict[str, str] = {}

    majority = scan_payload.get("majority_album", {}) or {}
    for item in majority.get("tracks", []) or []:
        file_path = str(item.get("file") or "")
        recording_id = str(item.get("musicbrainz_recording_id") or "")
        if file_path and recording_id:
            mapping[file_path] = recording_id

    for track in scan_payload.get("tracks", []) or []:
        file_path = str(track.get("file") or "")
        best_guess = track.get("best_guess") or {}
        recording_id = str(best_guess.get("musicbrainz_recording_id") or "")
        if file_path and recording_id and file_path not in mapping:
            mapping[file_path] = recording_id

    return mapping


@dataclass
class MetadataPreview:
    """Tag values derived from a MusicBrainz recording; "" means unknown."""

    title: str
    artist: str
    album_artist: str
    album: str
    date: str
    track_number: str
    disc_number: str
    genre: str


@dataclass
class FileResult:
    """Per-file entry of the final report."""

    file: str
    musicbrainz_recording_id: Optional[str]
    musicbrainz_recording: Optional[Dict[str, Any]]
    metadata_preview: Optional[Dict[str, str]]
    cover_art: Optional[Dict[str, Any]]
    tag_write: Dict[str, Any]
    error: Optional[str]


def _names_from_artist_credit(credit: Any) -> List[str]:
    """Return artist names from an artist-credit list, in order.

    musicbrainzngs interleaves join phrases (" & ", " feat. ", ...) as plain
    strings between the name-credit dicts. They are separators, not artists,
    so every non-dict entry is skipped.
    """
    names: List[str] = []
    if not isinstance(credit, list):
        return names
    for part in credit:
        if not isinstance(part, dict):
            continue
        artist = part.get("artist") or {}
        if isinstance(artist, dict):
            name = artist.get("name") or artist.get("sort-name")
            if name:
                names.append(str(name))
    return names


def _extract_artist_names(recording: Dict[str, Any]) -> List[str]:
    """Return the recording's de-duplicated artist names, in credit order."""
    names = _names_from_artist_credit(
        recording.get("artist-credit") or recording.get("artist_credit") or []
    )

    if not names:
        artists = recording.get("artist-list") or recording.get("artists") or []
        if isinstance(artists, list):
            for artist in artists:
                if isinstance(artist, dict):
                    name = artist.get("name") or artist.get("sort-name")
                    if name:
                        names.append(str(name))

    # Deduplicate while preserving order.
    return list(dict.fromkeys(n for n in names if n))


def _extract_album_title(recording: Dict[str, Any]) -> str:
    """Return the title of the first listed release that has one, else ""."""
    releases = recording.get("release-list") or recording.get("releases") or []
    if isinstance(releases, list):
        for release in releases:
            if isinstance(release, dict):
                title = release.get("title")
                if title:
                    return str(title)
    return ""


def _extract_album_artist(recording: Dict[str, Any]) -> str:
    """Return the album artist of the first release that provides one, else ""."""
    release_list = recording.get("release-list") or recording.get("releases") or []
    if not isinstance(release_list, list):
        return ""
    for release in release_list:
        if not isinstance(release, dict):
            continue
        names = _names_from_artist_credit(
            release.get("artist-credit") or release.get("artist_credit") or []
        )
        if names:
            return " / ".join(dict.fromkeys(names))
        artist_name = release.get("artist-credit-name") or release.get("artist")
        if isinstance(artist_name, dict):
            artist_name = artist_name.get("name")
        if artist_name:
            return str(artist_name)
    return ""


def _extract_release_info(recording: Dict[str, Any]) -> Dict[str, str]:
    """Return album/date/track/disc/genre taken from the first listed release.

    Only the first dict entry of the release list is considered. Track and
    disc positions come from the first track of the first medium, when the
    release carries a ``medium-list``. Returns ``{}`` without a usable release.
    """
    release_list = recording.get("release-list") or recording.get("releases") or []
    if not isinstance(release_list, list):
        return {}
    for release in release_list:
        if not isinstance(release, dict):
            continue
        data = {
            "album": str(release.get("title") or ""),
            "date": str(release.get("date") or release.get("first-release-date") or ""),
            "track_number": "",
            "disc_number": "",
            "genre": "",
        }
        medium_list = release.get("medium-list") or []
        if isinstance(medium_list, list) and medium_list:
            medium = medium_list[0]
            if isinstance(medium, dict):
                data["disc_number"] = str(medium.get("position") or "")
                track_list = medium.get("track-list") or []
                if isinstance(track_list, list) and track_list:
                    track = track_list[0]
                    if isinstance(track, dict):
                        data["track_number"] = str(track.get("position") or "")
        release_group = release.get("release-group") or {}
        if isinstance(release_group, dict):
            genres = release_group.get("genre-list") or release_group.get("genres") or []
            if isinstance(genres, list) and genres:
                first = genres[0]
                if isinstance(first, dict):
                    data["genre"] = str(first.get("name") or first.get("genre") or "")
                elif isinstance(first, str):
                    data["genre"] = first
        return data
    return {}


def build_metadata_preview(recording: Dict[str, Any]) -> MetadataPreview:
    """Build the tag preview for a MusicBrainz recording dict.

    The album artist falls back to the track artist when the release does not
    provide one.
    """
    title = str(recording.get("title") or "")
    artist = " / ".join(_extract_artist_names(recording))
    album_artist = _extract_album_artist(recording) or artist
    release_info = _extract_release_info(recording)
    album = release_info.get("album") or _extract_album_title(recording)
    return MetadataPreview(
        title=title,
        artist=artist,
        album_artist=album_artist,
        album=album,
        date=release_info.get("date", ""),
        track_number=release_info.get("track_number", ""),
        disc_number=release_info.get("disc_number", ""),
        genre=release_info.get("genre", ""),
    )


def fetch_recording(recording_id: str) -> Dict[str, Any]:
    """Fetch one recording from MusicBrainz via ``musicbrainzngs``.

    Raises:
        RuntimeError: If ``musicbrainzngs`` is missing or the response has no
            ``recording`` dict. Errors raised by ``musicbrainzngs`` itself
            propagate unchanged.
    """
    try:
        import musicbrainzngs
    except Exception as exc:  # pragma: no cover - dependency error path
        raise RuntimeError(f"missing dependency 'musicbrainzngs': {exc}") from exc

    musicbrainzngs.set_useragent("moss-acoustid-mb-tagger", "1.0", "shenwei@example.com")
    result = musicbrainzngs.get_recording_by_id(
        recording_id,
        # "media" is requested so releases can carry the medium/track
        # positions that _extract_release_info reads for TRCK/TPOS.
        includes=["artists", "releases", "artist-credits", "media"],
    )

    recording = result.get("recording") if isinstance(result, dict) else None
    if not isinstance(recording, dict):
        raise RuntimeError(f"unexpected MusicBrainz response for {recording_id}")
    return recording


def fetch_cover_art(release_mbid: str, timeout: int = 60) -> Tuple[Optional[bytes], Optional[str], Optional[str]]:
    """Download the front cover of a release from the Cover Art Archive.

    Returns:
        ``(image_bytes, content_type, error)``. On any failure (missing
        ``requests``, non-200 status, network error) the first two items are
        ``None`` and ``error`` describes the problem; this never raises.
    """
    url = f"https://coverartarchive.org/release/{release_mbid}/front"
    headers = {"User-Agent": "moss-acoustid-mb-tagger/1.0"}
    try:
        # Imported lazily, like the other optional third-party dependencies.
        import requests

        resp = requests.get(url, headers=headers, timeout=timeout)
        if resp.status_code != 200:
            return None, None, f"cover art http {resp.status_code}"
        content_type = resp.headers.get("Content-Type", "")
        return resp.content, content_type, None
    except Exception as exc:
        return None, None, str(exc)


def _mime_to_apic_type(mime: str) -> str:
    """Normalise a Content-Type value to an image MIME type (default JPEG)."""
    m = (mime or "").lower()
    if "png" in m:
        return "image/png"
    if "webp" in m:
        return "image/webp"
    # "jpeg"/"jpg" and anything unrecognised map to JPEG.
    return "image/jpeg"


def _extract_release_mbid(recording: Dict[str, Any]) -> str:
    """Return the id of the first listed release that has one, else ""."""
    releases = recording.get("release-list") or recording.get("releases") or []
    if isinstance(releases, list):
        for release in releases:
            if isinstance(release, dict):
                mbid = release.get("id") or release.get("mbid")
                if mbid:
                    return str(mbid)
    return ""


def _set_text_frame(tag, frame_cls, value: str) -> None:
    """Replace the *frame_cls* frame in *tag* with *value*; no-op when empty.

    Existing frames of that type are removed first so the new value replaces
    the old one; an empty *value* leaves the existing frame untouched.
    """
    if value:
        tag.delall(frame_cls.__name__)
        tag.add(frame_cls(encoding=3, text=value))


def _set_numeric_frame(tag, frame_cls, value: str) -> None:
    """Same as ``_set_text_frame``; kept as a separate name for TRCK/TPOS."""
    _set_text_frame(tag, frame_cls, value)


def write_tags(path: Path, preview: MetadataPreview, cover_art_bytes: Optional[bytes] = None, cover_art_mime: str = "image/jpeg") -> Dict[str, Any]:
    """Write the preview (and optional cover art) into the MP3's ID3 tags.

    A frame is only replaced when the preview has a non-empty value for it,
    and existing pictures are only removed when new cover art is supplied, so
    a sparse MusicBrainz result does not erase tags already in the file.

    Returns:
        ``{"written": True, "error": None, "cover_art_embedded": bool}`` on
        success, otherwise ``{"written": False, "error": <message>}``. This
        never raises.
    """
    try:
        from mutagen.id3 import APIC, ID3, TALB, TCON, TDRC, TPE1, TPE2, TIT2, TRCK, TPOS
        from mutagen.mp3 import MP3
    except Exception as exc:  # pragma: no cover - dependency error path
        return {"written": False, "error": f"missing dependency 'mutagen': {exc}"}

    try:
        audio = MP3(str(path), ID3=ID3)
        if audio.tags is None:
            audio.add_tags()
        tags = audio.tags

        text_frames = (
            (TIT2, preview.title),
            (TPE1, preview.artist),
            (TPE2, preview.album_artist),
            (TALB, preview.album),
            (TDRC, preview.date),
            (TCON, preview.genre),
        )
        for frame_cls, value in text_frames:
            _set_text_frame(tags, frame_cls, value)
        _set_numeric_frame(tags, TRCK, preview.track_number)
        _set_numeric_frame(tags, TPOS, preview.disc_number)

        if cover_art_bytes:
            # Drop old pictures only now that a replacement is in hand.
            tags.delall("APIC")
            tags.add(
                APIC(
                    encoding=3,
                    mime=_mime_to_apic_type(cover_art_mime),
                    type=3,
                    desc="Front Cover",
                    data=cover_art_bytes,
                )
            )
        audio.save()
        return {"written": True, "error": None, "cover_art_embedded": bool(cover_art_bytes)}
    except Exception as exc:
        return {"written": False, "error": f"tag write failed: {exc}"}


def process_file(path: Path, recording_id: Optional[str], write_tags_flag: bool, cache: Dict[str, Dict[str, Any]], cover_cache: Dict[str, Dict[str, Any]]) -> FileResult:
    """Resolve metadata (and optionally write tags) for one MP3.

    Args:
        path: The MP3 file.
        recording_id: MusicBrainz recording id, or ``None`` when unresolved.
        write_tags_flag: Write tags when true; otherwise preview only.
        cache: Recording-id -> recording dict cache, filled in place.
        cover_cache: Release-id -> cover download result cache, filled in place.

    Returns:
        A ``FileResult``. Failures are reported in its ``error`` / ``tag_write``
        fields; this function does not raise.
    """
    if not recording_id:
        return FileResult(
            file=str(path),
            musicbrainz_recording_id=None,
            musicbrainz_recording=None,
            metadata_preview=None,
            cover_art=None,
            tag_write={"written": False, "error": "missing musicbrainz_recording_id"},
            error=None,
        )

    try:
        if recording_id not in cache:
            cache[recording_id] = fetch_recording(recording_id)
        recording = cache[recording_id]
        preview = build_metadata_preview(recording)

        release_mbid = _extract_release_mbid(recording)
        cover_art_info = {
            "release_mbid": release_mbid,
            "embedded": False,
            "mime": None,
            "source": None,
            "error": None,
        }
        cover_art_bytes: Optional[bytes] = None
        cover_art_mime = "image/jpeg"
        if release_mbid:
            if release_mbid not in cover_cache:
                blob, mime, err = fetch_cover_art(release_mbid)
                cover_cache[release_mbid] = {"blob": blob, "mime": mime, "error": err}
            cached_cover = cover_cache[release_mbid]
            cover_art_bytes = cached_cover.get("blob")
            cover_art_mime = str(cached_cover.get("mime") or "image/jpeg")
            if cover_art_bytes:
                cover_art_info["mime"] = cover_art_mime
                cover_art_info["source"] = f"https://coverartarchive.org/release/{release_mbid}/front"
            else:
                cover_art_info["error"] = cached_cover.get("error") or "cover art not found"

        tag_write = {"written": False, "error": None}
        if write_tags_flag:
            tag_write = write_tags(path, preview, cover_art_bytes=cover_art_bytes, cover_art_mime=cover_art_mime)
            if tag_write.get("written") and tag_write.get("cover_art_embedded"):
                cover_art_info["embedded"] = True

        return FileResult(
            file=str(path),
            musicbrainz_recording_id=recording_id,
            musicbrainz_recording=recording,
            metadata_preview=asdict(preview),
            cover_art=cover_art_info,
            tag_write=tag_write,
            error=None,
        )
    except Exception as exc:
        return FileResult(
            file=str(path),
            musicbrainz_recording_id=recording_id,
            musicbrainz_recording=None,
            metadata_preview=None,
            cover_art=None,
            tag_write={"written": False, "error": None},
            error=str(exc),
        )


def build_report(scan_payload: Dict[str, Any], results: Sequence[FileResult]) -> Dict[str, Any]:
    """Assemble the final JSON-serialisable report."""
    return {
        "directory": scan_payload.get("directory", ""),
        "source_scan": scan_payload,
        "files": [asdict(result) for result in results],
    }


def main(argv: Optional[Sequence[str]] = None) -> int:
    """Command-line entry point.

    Returns:
        0 on success, 2 when the input path or ``--scan-json`` file does not
        exist, 1 when the scan data cannot be obtained or parsed.
    """
    parser = argparse.ArgumentParser(
        description="Resolve MusicBrainz recording details for MP3s using acoustid_album_scan.py output and optionally update ID3 tags.",
    )
    parser.add_argument("path", help="Directory containing MP3 files")
    parser.add_argument("--scan-json", help="Optional JSON output produced by acoustid_album_scan.py. If omitted, the script runs acoustid_album_scan.py itself.")
    parser.add_argument("--output", default="", help="Write the JSON report to this file")
    parser.add_argument("--json", action="store_true", help="Print JSON to stdout")
    parser.add_argument("--write-tags", action="store_true", help="Actually write MP3 tags; default is preview-only")
    parser.add_argument("--no-recursive", action="store_true", help="Do not recurse into subdirectories")
    parser.add_argument("--mb-delay", type=float, default=0.0, help="Delay in seconds between MusicBrainz requests")
    args = parser.parse_args(argv)

    root = Path(args.path).expanduser().resolve()
    scan_json = Path(args.scan_json).expanduser().resolve() if args.scan_json else None
    if not root.exists():
        print(f"Path not found: {root}", file=sys.stderr)
        return 2
    if scan_json is not None and not scan_json.exists():
        print(f"Scan JSON not found: {scan_json}", file=sys.stderr)
        return 2

    try:
        if scan_json is not None:
            scan_payload = json.loads(scan_json.read_text(encoding="utf-8"))
        else:
            scan_payload = run_album_scan(root, recursive=not args.no_recursive)
    except (OSError, ValueError, RuntimeError) as exc:
        # json.JSONDecodeError is a ValueError; run_album_scan raises RuntimeError.
        print(f"Failed to load scan data: {exc}", file=sys.stderr)
        return 1
    if not isinstance(scan_payload, dict):
        print("Scan data must be a JSON object", file=sys.stderr)
        return 1

    id_map = build_recording_id_map(scan_payload)
    files = iter_mp3_files(root, recursive=not args.no_recursive)

    cache: Dict[str, Dict[str, Any]] = {}
    cover_cache: Dict[str, Dict[str, Any]] = {}
    results: List[FileResult] = []

    for index, path in enumerate(files, start=1):
        print(f"[{index}/{len(files)}] {path}", file=sys.stderr)
        recording_id = id_map.get(str(path))
        # Only pace when this file triggers a MusicBrainz request; cache hits
        # and files without an id make no request.
        will_fetch = bool(recording_id) and recording_id not in cache
        results.append(process_file(path, recording_id, args.write_tags, cache, cover_cache))
        if will_fetch and args.mb_delay > 0 and index < len(files):
            time.sleep(args.mb_delay)

    report = build_report(scan_payload, results)
    rendered = json.dumps(report, ensure_ascii=False, indent=2)
    if args.output:
        Path(args.output).expanduser().resolve().write_text(rendered, encoding="utf-8")

    if args.json or not args.output:
        print(rendered)

    return 0


if __name__ == "__main__":
    raise SystemExit(main())