#!/usr/bin/env python3
"""Traverse a directory of MP3 files, fingerprint each file, look it up via
AcoustID, then fetch MusicBrainz recording details.

Usage:
    python mp3_acoustid_musicbrainz_lookup.py DIRECTORY
    python mp3_acoustid_musicbrainz_lookup.py DIRECTORY --output result.json
    python mp3_acoustid_musicbrainz_lookup.py DIRECTORY --write-tags
    python mp3_acoustid_musicbrainz_lookup.py DIRECTORY --write-tags --output result.json

What it does for each MP3:
    1. Generate duration + fingerprint with acoustid.fingerprint_file()
    2. Call AcoustID lookup API
    3. Parse MusicBrainz recording MBIDs from the AcoustID response
    4. Fetch MusicBrainz recording detail JSON for each MBID
    5. Optionally write title/artist/album back into the MP3 tags with mutagen

Output behavior:
    - Prints duration & fingerprint for each file
    - Prints the AcoustID response for each file
    - Prints the MusicBrainz recording details for each file
    - If --write-tags is enabled, prints the tag update result
    - Writes a final summary JSON to stdout, or to --output if specified

Notes:
    - Traversal is recursive by default.
    - MusicBrainz requests are rate-limited with a small delay by default.
    - Metadata updates use TIT2 (title), TPE1 (artist), and TALB (album).
    - Only the fields that were actually found are written; if no suitable
      metadata is found, the file is left unchanged.
"""

from __future__ import annotations

import argparse
import json
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import Any, Iterable

try:
    import acoustid
except ImportError:
    # Reported by main() with a readable message instead of an import traceback.
    acoustid = None

try:
    from mutagen.id3 import ID3, TALB, TPE1, TIT2
    from mutagen.mp3 import MP3
except ImportError:
    # mutagen is only needed when --write-tags is used.
    ID3 = TALB = TPE1 = TIT2 = MP3 = None

ACOUSTID_CLIENT_ID = "JIvtbG79eAg"
ACOUSTID_LOOKUP_URL = "https://api.acoustid.org/v2/lookup"
# inc=artist-credits+releases makes the response carry the "artist-credit" and
# "releases" keys that extract_mb_metadata() falls back to.
MUSICBRAINZ_RECORDING_URL = (
    "https://musicbrainz.org/ws/2/recording/{mbid}?fmt=json&inc=artist-credits+releases"
)
DEFAULT_USER_AGENT = "moss-mp3-lookup/1.0 (https://musicbrainz.org/doc/MusicBrainz_API)"


@dataclass
class RecordingDetail:
    # MusicBrainz recording id, its detail JSON (if fetched), or the fetch error.
    mbid: str
    data: dict[str, Any] | None = None
    error: str | None = None


@dataclass
class FileResult:
    # Per-file record that ends up in the final summary JSON.
    file: str
    duration: int | None = None
    fingerprint: str | None = None
    acoustid: dict[str, Any] | None = None
    recordings: list[dict[str, Any]] | None = None
    recording_details: list[dict[str, Any]] | None = None
    error: str | None = None


def parse_args() -> argparse.Namespace:
    """Parse the command-line arguments of the script."""
    parser = argparse.ArgumentParser(
        description="Fingerprint MP3 files and fetch AcoustID / MusicBrainz metadata."
    )
    parser.add_argument("directory", help="Root directory to scan for MP3 files")
    parser.add_argument(
        "--no-recursive",
        action="store_true",
        help="Only scan the top-level directory",
    )
    parser.add_argument(
        "--maxlength",
        type=int,
        default=120,
        help="Maximum seconds used by acoustid.fingerprint_file() (default: 120)",
    )
    parser.add_argument(
        "--mb-delay",
        type=float,
        default=1.1,
        help="Delay in seconds between MusicBrainz detail requests (default: 1.1)",
    )
    parser.add_argument(
        "--output",
        default=None,
        help="Optional output file path. Default: stdout",
    )
    parser.add_argument(
        "--write-tags",
        action="store_true",
        help="Write title/artist/album back into each MP3 using mutagen",
    )
    return parser.parse_args()


def iter_mp3_files(root: Path, recursive: bool = True) -> Iterable[Path]:
    """Yield the ``.mp3`` files under *root* in sorted order.

    The suffix match is case-insensitive. With ``recursive=False`` only the
    direct children of *root* are considered.
    """
    candidates = root.rglob("*") if recursive else root.iterdir()
    for path in sorted(candidates):
        if path.is_file() and path.suffix.lower() == ".mp3":
            yield path


def fingerprint_mp3(path: Path, maxlength: int) -> tuple[int, str]:
    """Return ``(duration, fingerprint)`` for *path* via acoustid.fingerprint_file().

    Raises:
        RuntimeError: if the ``acoustid`` module could not be imported.
    """
    if acoustid is None:
        raise RuntimeError("the 'pyacoustid' package is not installed")
    duration, fingerprint = acoustid.fingerprint_file(str(path), maxlength=maxlength)
    if isinstance(fingerprint, (bytes, bytearray)):
        fingerprint = fingerprint.decode("ascii", errors="strict")
    return int(duration), str(fingerprint)


def http_get_json(url: str, headers: dict[str, str] | None = None, timeout: int = 60) -> dict[str, Any]:
    """GET *url* and return the decoded JSON body.

    Raises:
        RuntimeError: on an HTTP error status, any network-level failure
            (including timeouts while reading), or a body that is not JSON.
    """
    req = urllib.request.Request(url, headers=headers or {})
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            payload = resp.read().decode("utf-8", errors="replace")
    except urllib.error.HTTPError as e:
        body = e.read().decode("utf-8", errors="replace") if hasattr(e, "read") else ""
        raise RuntimeError(f"HTTP {e.code} {e.reason}: {body[:500]}") from e
    except OSError as e:
        # URLError is an OSError; this also covers socket timeouts raised by read().
        raise RuntimeError(f"network error: {e}") from e
    try:
        return json.loads(payload)
    except ValueError as e:
        raise RuntimeError(f"invalid JSON response: {payload[:500]}") from e


def acoustid_lookup(duration: int, fingerprint: str) -> dict[str, Any]:
    """Query the AcoustID lookup endpoint for a duration/fingerprint pair."""
    query = urllib.parse.urlencode(
        {
            "client": ACOUSTID_CLIENT_ID,
            "meta": "recordings releasegroups compress",
            "duration": str(duration),
            "fingerprint": fingerprint,
        }
    )
    url = f"{ACOUSTID_LOOKUP_URL}?{query}"
    return http_get_json(url, headers={"User-Agent": DEFAULT_USER_AGENT})


def extract_recordings(lookup_json: dict[str, Any]) -> list[dict[str, Any]]:
    """Flatten the recordings of every AcoustID result into one list.

    Each returned dict is a shallow copy of the recording entry with an extra
    ``_match_score`` key holding the score of the result it came from.
    Non-dict results or recordings are skipped.
    """
    recordings: list[dict[str, Any]] = []
    for result in lookup_json.get("results") or []:
        if not isinstance(result, dict):
            continue
        match_score = result.get("score")
        for rec in result.get("recordings", []) or []:
            if isinstance(rec, dict):
                rec_copy = dict(rec)
                rec_copy["_match_score"] = match_score
                recordings.append(rec_copy)
    return recordings


def choose_best_recording(recordings: list[dict[str, Any]]) -> dict[str, Any] | None:
    """Choose the most trustworthy recording.

    Preference order:
        1. Highest AcoustID result score
        2. If tied or missing, keep the first candidate encountered

    Returns None when *recordings* is empty.
    """
    best: dict[str, Any] | None = None
    best_score = float("-inf")
    for rec in recordings:
        score = rec.get("_match_score")
        try:
            score_f = float(score)
        except (TypeError, ValueError):
            score_f = float("-inf")
        # Strict ">" keeps the earliest candidate on ties.
        if best is None or score_f > best_score:
            best = rec
            best_score = score_f
    return best


def fetch_musicbrainz_recording(mbid: str) -> dict[str, Any]:
    """Fetch the MusicBrainz recording detail JSON for *mbid*."""
    url = MUSICBRAINZ_RECORDING_URL.format(mbid=urllib.parse.quote(mbid))
    return http_get_json(url, headers={"User-Agent": DEFAULT_USER_AGENT})


def choose_text(value: Any) -> str | None:
    """Return *value* as text; for a list use its first element; None if absent."""
    if isinstance(value, list):
        if not value:
            return None
        return choose_text(value[0])
    if value is None:
        return None
    return str(value)


def extract_mb_metadata(detail_json: dict[str, Any], fallback_rec: dict[str, Any] | None = None) -> dict[str, str | None]:
    """Extract the tag values we want to write back.

    Priority:
        1. Use the chosen recording from AcoustID (highest score)
        2. Prefer these fields from that recording entry:
            - recordings[0].title -> song title
            - recordings[0].artists[0].name -> artist
            - recordings[0].releasegroups[0].title -> album
        3. If any of those are missing, fall back to MusicBrainz recording
           detail fields ("title", "artist-credit", first entry of "releases").

    Returns a dict with the keys "title", "artist" and "album"; a value is
    None when neither source provides it.
    """
    title: str | None = None
    artist: str | None = None
    album: str | None = None

    if fallback_rec:
        title = choose_text(fallback_rec.get("title"))
        artists = fallback_rec.get("artists")
        if isinstance(artists, list) and artists:
            first_artist = artists[0]
            if isinstance(first_artist, dict):
                artist = choose_text(first_artist.get("name"))
        releasegroups = fallback_rec.get("releasegroups")
        if isinstance(releasegroups, list) and releasegroups:
            first_rg = releasegroups[0]
            if isinstance(first_rg, dict):
                album = choose_text(first_rg.get("title"))

    if not title:
        title = choose_text(detail_json.get("title"))

    if not artist:
        artist_credit = detail_json.get("artist-credit") or detail_json.get("artist_credit")
        if isinstance(artist_credit, list):
            parts: list[str] = []
            for item in artist_credit:
                if isinstance(item, dict) and item.get("name"):
                    # The join phrase (" & ", " feat. ", ...) separates this
                    # credit from the next; without it the names run together.
                    parts.append(str(item["name"]) + str(item.get("joinphrase") or ""))
                elif isinstance(item, str):
                    parts.append(item)
            if parts:
                artist = "".join(parts).strip()

    if not album:
        release_list = detail_json.get("releases")
        if isinstance(release_list, list) and release_list:
            first_release = release_list[0]
            if isinstance(first_release, dict):
                album = choose_text(first_release.get("title"))

    return {"title": title, "artist": artist, "album": album}


def update_metadata(file_path: Path, title: str | None, artist: str | None, album: str | None) -> None:
    """Write the given title/artist/album into the ID3 tags of *file_path*.

    Empty or None values are skipped so existing tags are not overwritten with
    blanks; when all three are empty the file is not touched at all.

    Raises:
        RuntimeError: if mutagen could not be imported.
    """
    if MP3 is None:
        raise RuntimeError("the 'mutagen' package is not installed")
    frames: dict[str, Any] = {}
    if title:
        frames["TIT2"] = TIT2(encoding=3, text=title)
    if artist:
        frames["TPE1"] = TPE1(encoding=3, text=artist)
    if album:
        frames["TALB"] = TALB(encoding=3, text=album)
    if not frames:
        return
    audio = MP3(str(file_path), ID3=ID3)
    for frame_id, frame in frames.items():
        audio[frame_id] = frame
    audio.save()


def print_section(title: str, payload: Any) -> None:
    """Print a ``=== title ===`` header followed by *payload* as indented JSON."""
    print(f"\n=== {title} ===")
    print(json.dumps(payload, ensure_ascii=False, indent=2))


def _get_recording_detail(mbid: str, mb_delay: float, mb_cache: dict[str, dict[str, Any]]) -> dict[str, Any]:
    """Return the detail JSON for *mbid*, consulting and filling *mb_cache*.

    A failed fetch is stored (and returned) as ``{"mbid": ..., "error": ...}``.
    The rate-limit delay is applied after every real request, successful or not.
    """
    if mbid in mb_cache:
        return mb_cache[mbid]
    try:
        detail_json = fetch_musicbrainz_recording(mbid)
    except Exception as e:
        detail_json = {"mbid": mbid, "error": str(e)}
    mb_cache[mbid] = detail_json
    if mb_delay > 0:
        time.sleep(mb_delay)
    return detail_json


def process_file(
    path: Path,
    maxlength: int,
    mb_delay: float,
    mb_cache: dict[str, dict[str, Any]],
    write_tags: bool,
) -> FileResult:
    """Run the fingerprint -> AcoustID -> MusicBrainz pipeline for one file.

    Each stage prints its result. A failure while fingerprinting or during the
    AcoustID lookup is recorded in ``result.error`` and ends processing of the
    file. With *write_tags*, the metadata of the best-scoring recording (or,
    failing that, the first recording whose detail fetch succeeded) is written
    back to the file.
    """
    result = FileResult(file=str(path))

    try:
        duration, fingerprint = fingerprint_mp3(path, maxlength=maxlength)
        result.duration = duration
        result.fingerprint = fingerprint
        print_section(
            f"{path} / duration & fingerprint",
            {"file": str(path), "duration": duration, "fingerprint": fingerprint},
        )
    except Exception as e:
        result.error = f"fingerprint failed: {e}"
        print_section(f"{path} / duration & fingerprint", {"file": str(path), "error": result.error})
        return result

    try:
        lookup_json = acoustid_lookup(result.duration, result.fingerprint)
        result.acoustid = lookup_json
        print_section(f"{path} / AcoustID response", lookup_json)
    except Exception as e:
        result.error = f"acoustid lookup failed: {e}"
        print_section(f"{path} / AcoustID response", {"file": str(path), "error": result.error})
        return result

    recordings = extract_recordings(result.acoustid)
    result.recordings = recordings

    details: list[dict[str, Any]] = []
    chosen_metadata: dict[str, str | None] | None = None
    best_recording = choose_best_recording(recordings)
    best_recording_detail: dict[str, Any] | None = None

    for rec in recordings:
        mbid = rec.get("id")
        if not mbid:
            continue
        detail_json = _get_recording_detail(mbid, mb_delay, mb_cache)
        details.append(detail_json)

        usable = isinstance(detail_json, dict) and "error" not in detail_json
        if usable and best_recording is not None and mbid == best_recording.get("id"):
            best_recording_detail = detail_json
        if usable and chosen_metadata is None:
            # Provisional choice, used only if the best recording's fetch failed.
            chosen_metadata = extract_mb_metadata(detail_json, fallback_rec=rec)

    if best_recording_detail is not None:
        chosen_metadata = extract_mb_metadata(best_recording_detail, fallback_rec=best_recording)

    result.recording_details = details
    print_section(f"{path} / MusicBrainz recording details", details)

    if write_tags and chosen_metadata:
        found = {key: value for key, value in chosen_metadata.items() if value}
        if not found:
            # Nothing usable: leave the file exactly as it is.
            print_section(
                f"{path} / metadata update skipped",
                {"file": str(path), "reason": "no title/artist/album found"},
            )
        else:
            try:
                update_metadata(
                    path,
                    title=found.get("title"),
                    artist=found.get("artist"),
                    album=found.get("album"),
                )
                print_section(f"{path} / metadata updated", found)
            except Exception as e:
                print_section(f"{path} / metadata update failed", {"file": str(path), "error": str(e)})

    return result


def main() -> int:
    """Script entry point; returns the process exit code (0 ok, 2 on setup errors)."""
    args = parse_args()

    if acoustid is None:
        print("[error] the 'pyacoustid' package is required (pip install pyacoustid)", file=sys.stderr)
        return 2
    if args.write_tags and MP3 is None:
        print("[error] --write-tags requires the 'mutagen' package (pip install mutagen)", file=sys.stderr)
        return 2

    root = Path(args.directory).expanduser().resolve()
    if not root.exists() or not root.is_dir():
        print(f"[error] directory not found: {root}", file=sys.stderr)
        return 2

    mb_cache: dict[str, dict[str, Any]] = {}
    items: list[dict[str, Any]] = []
    for mp3_path in iter_mp3_files(root, recursive=not args.no_recursive):
        item = process_file(
            mp3_path,
            maxlength=args.maxlength,
            mb_delay=args.mb_delay,
            mb_cache=mb_cache,
            write_tags=args.write_tags,
        )
        items.append(asdict(item))

    output = {
        "root": str(root),
        "count": len(items),
        "items": items,
    }
    text = json.dumps(output, ensure_ascii=False, indent=2)

    if args.output:
        out_path = Path(args.output).expanduser().resolve()
        # Do not lose a long, rate-limited run because the target folder is missing.
        out_path.parent.mkdir(parents=True, exist_ok=True)
        out_path.write_text(text, encoding="utf-8")
    else:
        print(text)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())