Files
moss/mp3_acoustid_musicbrainz_lookup.py
2026-05-22 10:51:18 +08:00

395 lines
13 KiB
Python

#!/usr/bin/env python3
"""Traverse a directory of MP3 files, fingerprint each file, look it up via
AcoustID, then fetch MusicBrainz recording details.
Usage:
python mp3_acoustid_musicbrainz_lookup.py <mp3_dir>
python mp3_acoustid_musicbrainz_lookup.py <mp3_dir> --output result.json
python mp3_acoustid_musicbrainz_lookup.py <mp3_dir> --write-tags
python mp3_acoustid_musicbrainz_lookup.py <mp3_dir> --write-tags --output result.json
What it does for each MP3:
1. Generate duration + fingerprint with acoustid.fingerprint_file()
2. Call AcoustID lookup API
3. Parse MusicBrainz recording MBIDs from the AcoustID response
4. Fetch MusicBrainz recording detail JSON for each MBID
5. Optionally write title/artist/album back into the MP3 tags with mutagen
Output behavior:
- Prints duration & fingerprint for each file
- Prints the AcoustID response for each file
- Prints the MusicBrainz recording details for each file
- If --write-tags is enabled, prints the tag update result
- Writes a final summary JSON to stdout, or to --output if specified
Notes:
- Traversal is recursive by default.
- MusicBrainz requests are rate-limited with a small delay by default.
- Metadata updates use TIT2 (title), TPE1 (artist), and TALB (album).
- If no suitable metadata is found, the file is left unchanged.
"""
from __future__ import annotations
import argparse
import json
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import Any, Iterable
import acoustid
from mutagen.id3 import ID3, TALB, TPE1, TIT2
from mutagen.mp3 import MP3
# Application API key sent as the "client" parameter of every AcoustID lookup.
ACOUSTID_CLIENT_ID = "JIvtbG79eAg"
# AcoustID lookup endpoint; acoustid_lookup() appends the query string.
ACOUSTID_LOOKUP_URL = "https://api.acoustid.org/v2/lookup"
# MusicBrainz recording lookup template; {mbid} is substituted per request.
# inc=artists+releases asks MusicBrainz to embed the "artist-credit" and
# "releases" sub-resources. extract_mb_metadata() reads both as fallbacks, and
# a bare recording lookup does not include them.
MUSICBRAINZ_RECORDING_URL = (
    "https://musicbrainz.org/ws/2/recording/{mbid}?fmt=json&inc=artists+releases"
)
# User-Agent header sent with both the AcoustID and the MusicBrainz requests.
DEFAULT_USER_AGENT = "moss-mp3-lookup/1.0 (https://musicbrainz.org/doc/MusicBrainz_API)"
@dataclass
class RecordingDetail:
    """Container pairing a recording MBID with its detail payload or an error.

    NOTE(review): nothing in the visible part of this file instantiates this
    class; confirm whether it is still needed.
    """

    # Recording identifier (the only required field).
    mbid: str
    # Detail payload for the recording, when one is available.
    data: dict[str, Any] | None = None
    # Error text, when one is recorded instead of a payload.
    error: str | None = None
@dataclass
class FileResult:
    """Everything collected for one MP3 file; serialised with ``asdict``.

    Only ``file`` is required. The remaining fields stay ``None`` until the
    corresponding processing step has filled them in.
    """

    # Path of the MP3 file, stored as a string.
    file: str
    # Duration reported by the fingerprinting step.
    duration: int | None = None
    # Fingerprint string produced by the fingerprinting step.
    fingerprint: str | None = None
    # Raw JSON document returned by the AcoustID lookup.
    acoustid: dict[str, Any] | None = None
    # Recording entries extracted from the AcoustID response.
    recordings: list[dict[str, Any]] | None = None
    # One detail (or error) dict per looked-up recording.
    recording_details: list[dict[str, Any]] | None = None
    # Message describing the step that failed, if any.
    error: str | None = None
def parse_args() -> argparse.Namespace:
    """Parse the command line (``sys.argv``) into a namespace.

    Returns:
        Namespace with ``directory``, ``no_recursive``, ``maxlength``,
        ``mb_delay``, ``output`` and ``write_tags`` attributes.
    """
    cli = argparse.ArgumentParser(
        description="Fingerprint MP3 files and fetch AcoustID / MusicBrainz metadata."
    )
    add = cli.add_argument

    # Options are registered in the order they should appear in --help.
    add("directory", help="Root directory to scan for MP3 files")
    add(
        "--no-recursive",
        action="store_true",
        help="Only scan the top-level directory",
    )
    add(
        "--maxlength",
        type=int,
        default=120,
        help="Maximum seconds used by acoustid.fingerprint_file() (default: 120)",
    )
    add(
        "--mb-delay",
        type=float,
        default=1.1,
        help="Delay in seconds between MusicBrainz detail requests (default: 1.1)",
    )
    add(
        "--output",
        default=None,
        help="Optional output file path. Default: stdout",
    )
    add(
        "--write-tags",
        action="store_true",
        help="Write title/artist/album back into each MP3 using mutagen",
    )

    return cli.parse_args()
def iter_mp3_files(root: Path, recursive: bool = True) -> Iterable[Path]:
    """Yield the MP3 files under *root* in sorted path order.

    Args:
        root: Directory to scan.
        recursive: When true, descend into sub-directories; otherwise only
            look at the direct children of *root*.

    Yields:
        Paths of regular files whose suffix is ``.mp3`` (case-insensitive).
    """
    # Pick the traversal once; the filtering below is identical for both.
    candidates = root.rglob("*") if recursive else root.iterdir()
    yield from (
        entry
        for entry in sorted(candidates)
        if entry.is_file() and entry.suffix.lower() == ".mp3"
    )
def fingerprint_mp3(path: Path, maxlength: int) -> tuple[int, str]:
    """Fingerprint one file with ``acoustid.fingerprint_file``.

    Args:
        path: Audio file to fingerprint.
        maxlength: Passed through as the ``maxlength`` keyword argument.

    Returns:
        ``(duration, fingerprint)`` with the duration converted via ``int``
        and the fingerprint as ``str``.
    """
    duration, raw_fingerprint = acoustid.fingerprint_file(str(path), maxlength=maxlength)
    # The fingerprint may come back as bytes; decode it strictly as ASCII so
    # it can be used as text.
    if isinstance(raw_fingerprint, (bytes, bytearray)):
        text = raw_fingerprint.decode("ascii", errors="strict")
    else:
        text = raw_fingerprint
    return int(duration), str(text)
def http_get_json(url: str, headers: dict[str, str] | None = None, timeout: int = 60) -> dict[str, Any]:
    """GET *url* and return the response body parsed as a JSON object.

    Args:
        url: Fully built request URL.
        headers: Optional request headers.
        timeout: Timeout in seconds passed to ``urllib.request.urlopen``.

    Returns:
        The decoded JSON object.

    Raises:
        RuntimeError: On an HTTP error status, any network-level failure
            (including timeouts while reading the body), a body that is not
            valid JSON, or a JSON document that is not an object.
    """
    req = urllib.request.Request(url, headers=headers or {})
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            payload = resp.read().decode("utf-8", errors="replace")
    except urllib.error.HTTPError as e:
        body = e.read().decode("utf-8", errors="replace") if hasattr(e, "read") else ""
        raise RuntimeError(f"HTTP {e.code} {e.reason}: {body[:500]}") from e
    except urllib.error.URLError as e:
        raise RuntimeError(f"network error: {e}") from e
    except OSError as e:
        # Timeouts / connection resets raised while reading the body are plain
        # OSError subclasses that urllib does not wrap in URLError.
        raise RuntimeError(f"network error: {e}") from e

    try:
        data = json.loads(payload)
    except json.JSONDecodeError as e:
        raise RuntimeError(f"invalid JSON response: {e}") from e
    # Callers use dict methods on the result, so anything else is an error.
    if not isinstance(data, dict):
        raise RuntimeError(f"unexpected JSON response type: {type(data).__name__}")
    return data
def acoustid_lookup(duration: int, fingerprint: str) -> dict[str, Any]:
    """Query the AcoustID lookup endpoint for one fingerprint.

    Args:
        duration: Track duration; sent as a string.
        fingerprint: Fingerprint text of the track.

    Returns:
        The JSON document returned by ``http_get_json`` for the lookup URL.
    """
    params = {
        "client": ACOUSTID_CLIENT_ID,
        # Requested metadata sections, space-separated in a single value.
        "meta": "recordings releasegroups compress",
        "duration": str(duration),
        "fingerprint": fingerprint,
    }
    lookup_url = ACOUSTID_LOOKUP_URL + "?" + urllib.parse.urlencode(params)
    request_headers = {"User-Agent": DEFAULT_USER_AGENT}
    return http_get_json(lookup_url, headers=request_headers)
def extract_recordings(lookup_json: dict[str, Any]) -> list[dict[str, Any]]:
    """Flatten the recordings of every AcoustID result into one list.

    Each returned entry is a shallow copy of the recording dict with an extra
    ``"_match_score"`` key holding the ``"score"`` of the result it came from.
    Results or recordings that are not dicts are skipped, and a missing or
    ``null`` ``"results"`` / ``"recordings"`` value is treated as empty.

    Args:
        lookup_json: Parsed AcoustID lookup response.

    Returns:
        Recording copies in response order; the input is not modified.
    """
    recordings: list[dict[str, Any]] = []
    # `or []` also covers an explicit JSON null, same as the inner loop.
    for result in lookup_json.get("results") or []:
        if not isinstance(result, dict):
            continue
        match_score = result.get("score")
        for rec in result.get("recordings") or []:
            if isinstance(rec, dict):
                # Copy so the annotation does not leak into the raw response.
                recordings.append({**rec, "_match_score": match_score})
    return recordings
def choose_best_recording(recordings: list[dict[str, Any]]) -> dict[str, Any] | None:
    """Return the recording with the highest ``"_match_score"``.

    A score that is missing or cannot be converted to ``float`` ranks lowest.
    On a tie the earliest candidate wins.

    Args:
        recordings: Candidate recording dicts.

    Returns:
        The winning element of *recordings* (the same object, not a copy), or
        ``None`` when there are no candidates.
    """

    def numeric_score(candidate: dict[str, Any]) -> float:
        try:
            return float(candidate.get("_match_score"))
        except (TypeError, ValueError):
            return float("-inf")

    # max() keeps the first of equally ranked items, so ties favour the
    # earliest candidate.
    return max(recordings, key=numeric_score, default=None)
def fetch_musicbrainz_recording(mbid: str) -> dict[str, Any]:
    """Fetch the MusicBrainz recording JSON for *mbid*.

    Args:
        mbid: Recording identifier; URL-quoted before being put in the path.

    Returns:
        The JSON document returned by ``http_get_json``.
    """
    quoted_mbid = urllib.parse.quote(mbid)
    detail_url = MUSICBRAINZ_RECORDING_URL.format(mbid=quoted_mbid)
    request_headers = {"User-Agent": DEFAULT_USER_AGENT}
    return http_get_json(detail_url, headers=request_headers)
def choose_text(value: Any) -> str | None:
    """Reduce *value* to a single string.

    A list is represented by its first element (recursively); an empty list
    and ``None`` give ``None``; anything else is converted with ``str``.
    """
    if value is None:
        return None
    if not isinstance(value, list):
        return str(value)
    return choose_text(value[0]) if value else None


def _first_dict(items: Any) -> dict[str, Any] | None:
    """Return ``items[0]`` when *items* is a non-empty list led by a dict."""
    if isinstance(items, list) and items and isinstance(items[0], dict):
        return items[0]
    return None


def extract_mb_metadata(detail_json: dict[str, Any], fallback_rec: dict[str, Any] | None = None) -> dict[str, str | None]:
    """Extract the title / artist / album values to write back as tags.

    Despite its name, *fallback_rec* (the AcoustID recording entry) is the
    preferred source:

    - ``fallback_rec["title"]`` -> title
    - ``fallback_rec["artists"][0]["name"]`` -> artist
    - ``fallback_rec["releasegroups"][0]["title"]`` -> album

    Any value still missing or empty is then taken from the MusicBrainz
    recording detail *detail_json*: ``"title"``, the concatenated
    ``"artist-credit"`` (or ``"artist_credit"``) entries, and the title of
    the first entry in ``"releases"``.

    Args:
        detail_json: MusicBrainz recording detail document.
        fallback_rec: AcoustID recording entry, if available.

    Returns:
        Dict with the keys ``"title"``, ``"artist"`` and ``"album"``; a value
        is ``None`` when neither source provides it.
    """
    title: str | None = None
    artist: str | None = None
    album: str | None = None

    if fallback_rec:
        title = choose_text(fallback_rec.get("title"))
        first_artist = _first_dict(fallback_rec.get("artists"))
        if first_artist is not None:
            artist = choose_text(first_artist.get("name"))
        first_rg = _first_dict(fallback_rec.get("releasegroups"))
        if first_rg is not None:
            album = choose_text(first_rg.get("title"))

    if not title:
        title = choose_text(detail_json.get("title"))

    if not artist:
        artist_credit = detail_json.get("artist-credit") or detail_json.get("artist_credit")
        if isinstance(artist_credit, list):
            parts: list[str] = []
            for item in artist_credit:
                if isinstance(item, dict) and item.get("name"):
                    parts.append(str(item["name"]))
                    # The join phrase (" & ", " feat. ", ...) separates this
                    # credit from the next one; without it the names run
                    # together ("AB" instead of "A & B").
                    joinphrase = item.get("joinphrase")
                    if isinstance(joinphrase, str):
                        parts.append(joinphrase)
                elif isinstance(item, str):
                    parts.append(item)
            if parts:
                artist = "".join(parts).strip()

    if not album:
        first_release = _first_dict(detail_json.get("releases"))
        if first_release is not None:
            album = choose_text(first_release.get("title"))

    return {"title": title, "artist": artist, "album": album}
def update_metadata(file_path: Path, title: str, artist: str, album: str) -> None:
    """Write title / artist / album into the ID3 tags of *file_path*.

    Only non-empty values are written. An empty string means "unknown" to the
    caller in this file, and writing it would replace an existing tag with a
    blank one. When all three values are empty the file is not opened at all.

    Args:
        file_path: MP3 file to update in place.
        title: Text for the TIT2 frame.
        artist: Text for the TPE1 frame.
        album: Text for the TALB frame.
    """
    pending = [
        (key, frame_cls, value)
        for key, frame_cls, value in (
            ("TIT2", TIT2, title),
            ("TPE1", TPE1, artist),
            ("TALB", TALB, album),
        )
        if value
    ]
    if not pending:
        return

    audio = MP3(str(file_path), ID3=ID3)
    for key, frame_cls, value in pending:
        # encoding=3 selects UTF-8 for the text frame.
        audio[key] = frame_cls(encoding=3, text=[value])
    audio.save()
def print_section(title: str, payload: Any) -> None:
    """Print a ``=== title ===`` banner followed by *payload* as indented JSON.

    Args:
        title: Text placed inside the banner.
        payload: JSON-serialisable value; non-ASCII characters are kept as-is.
    """
    banner = "\n=== {} ===".format(title)
    print(banner)
    rendered = json.dumps(payload, ensure_ascii=False, indent=2)
    print(rendered)
def _cached_recording_detail(
    mbid: str,
    mb_delay: float,
    mb_cache: dict[str, dict[str, Any]],
) -> dict[str, Any]:
    """Return the MusicBrainz detail for *mbid*, fetching it at most once.

    A failed fetch is stored (and returned) as ``{"mbid": ..., "error": ...}``.
    After every real request, successful or not, the function sleeps for
    *mb_delay* seconds when that value is positive.
    """
    if mbid in mb_cache:
        return mb_cache[mbid]
    try:
        fetched = fetch_musicbrainz_recording(mbid)
    except Exception as exc:
        fetched = {"mbid": mbid, "error": str(exc)}
    mb_cache[mbid] = fetched
    if mb_delay > 0:
        time.sleep(mb_delay)
    return fetched


def process_file(
    path: Path,
    maxlength: int,
    mb_delay: float,
    mb_cache: dict[str, dict[str, Any]],
    write_tags: bool,
) -> FileResult:
    """Run the full fingerprint -> AcoustID -> MusicBrainz pipeline for one file.

    Every step prints its outcome with ``print_section``. A failure in the
    fingerprint or AcoustID step is recorded in ``FileResult.error`` and ends
    processing for this file; failures of individual MusicBrainz requests or
    of the tag update are reported but do not set ``error``.

    Args:
        path: MP3 file to process.
        maxlength: Forwarded to ``fingerprint_mp3``.
        mb_delay: Seconds to sleep after each MusicBrainz request.
        mb_cache: MBID -> detail cache shared between calls; updated in place.
        write_tags: When true, write the chosen metadata back into the file.

    Returns:
        The populated ``FileResult`` for *path*.
    """
    outcome = FileResult(file=str(path))
    file_name = str(path)

    # Step 1: fingerprint.
    fingerprint_heading = f"{path} / duration & fingerprint"
    try:
        duration, fingerprint = fingerprint_mp3(path, maxlength=maxlength)
        outcome.duration = duration
        outcome.fingerprint = fingerprint
        print_section(
            fingerprint_heading,
            {"file": file_name, "duration": duration, "fingerprint": fingerprint},
        )
    except Exception as exc:
        outcome.error = f"fingerprint failed: {exc}"
        print_section(fingerprint_heading, {"file": file_name, "error": outcome.error})
        return outcome

    # Step 2: AcoustID lookup.
    acoustid_heading = f"{path} / AcoustID response"
    try:
        lookup_json = acoustid_lookup(outcome.duration, outcome.fingerprint)
        outcome.acoustid = lookup_json
        print_section(acoustid_heading, lookup_json)
    except Exception as exc:
        outcome.error = f"acoustid lookup failed: {exc}"
        print_section(acoustid_heading, {"file": file_name, "error": outcome.error})
        return outcome

    # Step 3: MusicBrainz detail for every recording that carries an id.
    recordings = extract_recordings(outcome.acoustid)
    outcome.recordings = recordings
    best_recording = choose_best_recording(recordings)

    details: list[dict[str, Any]] = []
    first_usable_metadata: dict[str, str | None] | None = None
    best_recording_detail: dict[str, Any] | None = None

    for rec in recordings:
        mbid = rec.get("id")
        if not mbid:
            continue
        detail = _cached_recording_detail(mbid, mb_delay, mb_cache)
        details.append(detail)

        usable = isinstance(detail, dict) and "error" not in detail
        if not usable:
            continue
        if best_recording is not None and mbid == best_recording.get("id"):
            best_recording_detail = detail
        if first_usable_metadata is None:
            first_usable_metadata = extract_mb_metadata(detail, fallback_rec=rec)

    # Metadata of the best-scoring recording wins; otherwise fall back to the
    # first recording whose detail request succeeded.
    if best_recording_detail is not None:
        chosen_metadata = extract_mb_metadata(best_recording_detail, fallback_rec=best_recording)
    else:
        chosen_metadata = first_usable_metadata

    outcome.recording_details = details
    print_section(f"{path} / MusicBrainz recording details", details)

    # Step 4: optional tag update.
    if not (write_tags and chosen_metadata):
        return outcome

    title = chosen_metadata.get("title") or path.stem
    artist = chosen_metadata.get("artist") or ""
    album = chosen_metadata.get("album") or ""
    try:
        update_metadata(path, title=title, artist=artist, album=album)
        print_section(
            f"{path} / metadata updated",
            {"title": title, "artist": artist, "album": album},
        )
    except Exception as exc:
        print_section(f"{path} / metadata update failed", {"file": file_name, "error": str(exc)})
    return outcome
def main() -> int:
    """Command-line entry point.

    Processes every MP3 found under the requested directory and emits a
    summary JSON document, either to the ``--output`` file or to stdout.

    Returns:
        ``0`` on completion, ``2`` when the directory argument does not name
        an existing directory.
    """
    args = parse_args()

    root = Path(args.directory).expanduser().resolve()
    if not (root.exists() and root.is_dir()):
        print(f"[error] directory not found: {root}", file=sys.stderr)
        return 2

    # One cache for the whole run so an MBID shared by several files is only
    # requested once.
    mb_cache: dict[str, dict[str, Any]] = {}
    items: list[dict[str, Any]] = [
        asdict(
            process_file(
                mp3_path,
                maxlength=args.maxlength,
                mb_delay=args.mb_delay,
                mb_cache=mb_cache,
                write_tags=args.write_tags,
            )
        )
        for mp3_path in iter_mp3_files(root, recursive=not args.no_recursive)
    ]

    summary = {"root": str(root), "count": len(items), "items": items}
    text = json.dumps(summary, ensure_ascii=False, indent=2)

    if not args.output:
        print(text)
        return 0

    out_path = Path(args.output).expanduser().resolve()
    out_path.write_text(text, encoding="utf-8")
    return 0


if __name__ == "__main__":
    sys.exit(main())