#!/usr/bin/env python3
"""Traverse a directory of MP3 files, fingerprint each file, look it up via
AcoustID, then fetch MusicBrainz recording details.

Usage:
    python mp3_acoustid_musicbrainz_lookup.py <mp3_dir>
    python mp3_acoustid_musicbrainz_lookup.py <mp3_dir> --output result.json
    python mp3_acoustid_musicbrainz_lookup.py <mp3_dir> --write-tags
    python mp3_acoustid_musicbrainz_lookup.py <mp3_dir> --write-tags --output result.json

What it does for each MP3:
    1. Generate duration + fingerprint with acoustid.fingerprint_file()
    2. Call AcoustID lookup API
    3. Parse MusicBrainz recording MBIDs from the AcoustID response
    4. Fetch MusicBrainz recording detail JSON for each MBID
    5. Optionally write title/artist/album back into the MP3 tags with mutagen

Output behavior:
    - Prints duration & fingerprint for each file
    - Prints the AcoustID response for each file
    - Prints the MusicBrainz recording details for each file
    - If --write-tags is enabled, prints the tag update result
    - Writes a final summary JSON to stdout, or to --output if specified

Notes:
    - Traversal is recursive by default.
    - MusicBrainz requests are rate-limited with a small delay by default.
    - Metadata updates use TIT2 (title), TPE1 (artist), and TALB (album).
    - If no suitable metadata is found, the file is left unchanged.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import sys
|
|
import time
|
|
import urllib.error
|
|
import urllib.parse
|
|
import urllib.request
|
|
from dataclasses import dataclass, asdict
|
|
from pathlib import Path
|
|
from typing import Any, Iterable
|
|
|
|
import acoustid
|
|
from mutagen.id3 import ID3, TALB, TPE1, TIT2
|
|
from mutagen.mp3 import MP3
|
|
|
|
# Application key sent as the "client" parameter of every AcoustID lookup.
ACOUSTID_CLIENT_ID = "JIvtbG79eAg"
# AcoustID lookup endpoint; acoustid_lookup() appends the query string.
ACOUSTID_LOOKUP_URL = "https://api.acoustid.org/v2/lookup"
# URL template for one MusicBrainz recording in JSON form; "{mbid}" is filled
# in by fetch_musicbrainz_recording().
MUSICBRAINZ_RECORDING_URL = "https://musicbrainz.org/ws/2/recording/{mbid}?fmt=json"
# User-Agent header value sent with both the AcoustID and MusicBrainz requests.
DEFAULT_USER_AGENT = "moss-mp3-lookup/1.0 (https://musicbrainz.org/doc/MusicBrainz_API)"
|
|
|
|
|
|
@dataclass
class RecordingDetail:
    """Container pairing a MusicBrainz recording ID with fetched data or an error.

    NOTE(review): nothing in the visible part of this file instantiates this
    class; the field notes below are inferred from the names - confirm before
    relying on them.
    """

    # MusicBrainz recording ID (required; the only field without a default).
    mbid: str
    # Presumably the decoded recording JSON when the fetch succeeded.
    data: dict[str, Any] | None = None
    # Presumably a human-readable message when the fetch failed.
    error: str | None = None
|
|
|
|
|
|
@dataclass
class FileResult:
    """Everything collected for one MP3 file by process_file().

    Instances are converted with dataclasses.asdict() in main() and emitted as
    the entries of the final JSON summary. Fields keep their ``None`` default
    when the step that fills them was not reached.
    """

    # Path of the MP3 file, as a string.
    file: str
    # Duration returned by fingerprint_mp3() (presumably seconds - confirm).
    duration: int | None = None
    # Fingerprint text returned by fingerprint_mp3().
    fingerprint: str | None = None
    # Decoded AcoustID lookup response, stored unmodified.
    acoustid: dict[str, Any] | None = None
    # Flattened recording entries produced by extract_recordings().
    recordings: list[dict[str, Any]] | None = None
    # One entry per recording that has an "id": either the MusicBrainz JSON or
    # a {"mbid": ..., "error": ...} dict when the fetch failed.
    recording_details: list[dict[str, Any]] | None = None
    # Set when fingerprinting or the AcoustID lookup failed.
    error: str | None = None
|
|
|
|
|
|
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse the command-line options of the lookup script.

    Args:
        argv: Argument strings to parse, without the program name. ``None``
            (the default) lets argparse read ``sys.argv[1:]``, which is what
            the script entry point relies on; passing a list allows the parser
            to be driven programmatically.

    Returns:
        Namespace with ``directory``, ``no_recursive``, ``maxlength``,
        ``mb_delay``, ``output`` and ``write_tags``.
    """
    parser = argparse.ArgumentParser(
        description="Fingerprint MP3 files and fetch AcoustID / MusicBrainz metadata."
    )
    parser.add_argument("directory", help="Root directory to scan for MP3 files")
    parser.add_argument(
        "--no-recursive",
        action="store_true",
        help="Only scan the top-level directory",
    )
    parser.add_argument(
        "--maxlength",
        type=int,
        default=120,
        help="Maximum seconds used by acoustid.fingerprint_file() (default: 120)",
    )
    parser.add_argument(
        "--mb-delay",
        type=float,
        default=1.1,
        help="Delay in seconds between MusicBrainz detail requests (default: 1.1)",
    )
    parser.add_argument(
        "--output",
        default=None,
        help="Optional output file path. Default: stdout",
    )
    parser.add_argument(
        "--write-tags",
        action="store_true",
        help="Write title/artist/album back into each MP3 using mutagen",
    )
    return parser.parse_args(argv)
|
|
|
|
|
|
def iter_mp3_files(root: Path, recursive: bool = True) -> Iterable[Path]:
    """Yield the MP3 files found under *root*, in sorted path order.

    Args:
        root: Directory to scan.
        recursive: When true, descend into sub-directories; otherwise only the
            direct children of *root* are considered.

    Yields:
        Paths of regular files whose suffix is ``.mp3`` (case-insensitive).
    """
    entries = root.rglob("*") if recursive else root.iterdir()
    for candidate in sorted(entries):
        if not candidate.is_file():
            continue
        if candidate.suffix.lower() == ".mp3":
            yield candidate
|
|
|
|
|
|
def fingerprint_mp3(path: Path, maxlength: int) -> tuple[int, str]:
    """Fingerprint one file with ``acoustid.fingerprint_file``.

    Args:
        path: File to fingerprint.
        maxlength: Passed through as the ``maxlength`` argument of
            ``acoustid.fingerprint_file``.

    Returns:
        ``(duration, fingerprint)`` with the duration converted to ``int`` and
        the fingerprint as ``str``.
    """
    seconds, raw_print = acoustid.fingerprint_file(str(path), maxlength=maxlength)
    if isinstance(raw_print, (bytes, bytearray)):
        # Byte fingerprints are decoded strictly as ASCII.
        text_print = raw_print.decode("ascii", errors="strict")
    else:
        text_print = str(raw_print)
    return int(seconds), text_print
|
|
|
|
|
|
def http_get_json(url: str, headers: dict[str, str] | None = None, timeout: int = 60) -> dict[str, Any]:
    """Fetch *url* with a GET request and return the decoded JSON body.

    Args:
        url: Full URL to request.
        headers: Optional request headers.
        timeout: Timeout in seconds handed to ``urllib.request.urlopen``.

    Returns:
        The parsed JSON document.

    Raises:
        RuntimeError: For every failure mode, so callers get one uniform error
            type with a readable message: an HTTP error status (message holds
            the status, reason and the first 500 characters of the body), any
            other network failure including timeouts, or a body that is not
            valid JSON.
    """
    req = urllib.request.Request(url, headers=headers or {})
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            payload = resp.read().decode("utf-8", errors="replace")
    except urllib.error.HTTPError as e:
        body = e.read().decode("utf-8", errors="replace") if hasattr(e, "read") else ""
        raise RuntimeError(f"HTTP {e.code} {e.reason}: {body[:500]}") from e
    except OSError as e:
        # URLError is an OSError subclass, so this clause keeps the original
        # handling and additionally covers failures raised while reading the
        # body (e.g. a socket timeout), which are not wrapped in URLError.
        raise RuntimeError(f"network error: {e}") from e

    try:
        return json.loads(payload)
    except json.JSONDecodeError as e:
        # A non-JSON body would otherwise surface as a bare parser message
        # such as "Expecting value: line 1 column 1".
        raise RuntimeError(f"invalid JSON response: {e}") from e
|
|
|
|
|
|
def acoustid_lookup(duration: int, fingerprint: str) -> dict[str, Any]:
    """Query the AcoustID lookup endpoint for one fingerprint.

    Args:
        duration: Track duration sent as the ``duration`` parameter.
        fingerprint: Fingerprint text sent as the ``fingerprint`` parameter.

    Returns:
        The decoded JSON response, as returned by ``http_get_json``.
    """
    params = {
        "client": ACOUSTID_CLIENT_ID,
        "meta": "recordings releasegroups compress",
        "duration": str(duration),
        "fingerprint": fingerprint,
    }
    url = ACOUSTID_LOOKUP_URL + "?" + urllib.parse.urlencode(params)
    request_headers = {"User-Agent": DEFAULT_USER_AGENT}
    return http_get_json(url, headers=request_headers)
|
|
|
|
|
|
def extract_recordings(lookup_json: dict[str, Any]) -> list[dict[str, Any]]:
    """Flatten the recordings of an AcoustID lookup response into one list.

    Each returned entry is a shallow copy of a recording dict with an extra
    ``"_match_score"`` key holding the ``"score"`` of the result it came from
    (``None`` when the result has no score). Non-dict results and recordings
    are skipped; the input is not modified.

    Args:
        lookup_json: Decoded AcoustID response.

    Returns:
        Recording entries in response order; empty when there are none.
    """
    recordings: list[dict[str, Any]] = []
    # "or []" also covers an explicit null, which .get(..., []) does not.
    for result in lookup_json.get("results") or []:
        if not isinstance(result, dict):
            continue
        match_score = result.get("score")
        recordings.extend(
            {**rec, "_match_score": match_score}
            for rec in result.get("recordings") or []
            if isinstance(rec, dict)
        )
    return recordings
|
|
|
|
|
|
def choose_best_recording(recordings: list[dict[str, Any]]) -> dict[str, Any] | None:
    """Pick the recording with the highest ``"_match_score"``.

    A score that is missing or cannot be converted to ``float`` ranks lowest.
    When several candidates share the top score the earliest one wins.

    Returns:
        The selected recording dict (the same object, not a copy), or ``None``
        for an empty list.
    """

    def numeric_score(candidate: dict[str, Any]) -> float:
        try:
            return float(candidate.get("_match_score"))
        except (TypeError, ValueError):
            return float("-inf")

    # max() returns the first of several maximal items.
    return max(recordings, key=numeric_score, default=None)
|
|
|
|
|
|
def fetch_musicbrainz_recording(mbid: str) -> dict[str, Any]:
    """Fetch the MusicBrainz JSON for one recording, with artists and releases.

    A recording lookup without an ``inc`` parameter returns only the core
    recording fields, so the "artist-credit" and "releases" keys that
    extract_mb_metadata() falls back on would never be present. The request
    therefore asks for ``inc=artists+releases``.

    Args:
        mbid: MusicBrainz recording ID.

    Returns:
        The decoded JSON document, as returned by ``http_get_json``.

    Raises:
        RuntimeError: Propagated from ``http_get_json`` on request failure.
    """
    # safe="" so that a stray "/" in the ID cannot alter the request path.
    url = MUSICBRAINZ_RECORDING_URL.format(mbid=urllib.parse.quote(mbid, safe=""))
    if "inc=" not in url:
        separator = "&" if "?" in url else "?"
        url = f"{url}{separator}inc=artists+releases"
    return http_get_json(url, headers={"User-Agent": DEFAULT_USER_AGENT})
|
|
|
|
|
|
def choose_text(value: Any) -> str | None:
    """Reduce a JSON-ish value to a single string.

    Lists are unwrapped by repeatedly taking their first element; an empty
    list or ``None`` yields ``None``; anything else is converted with ``str``.
    """
    current = value
    while isinstance(current, list):
        if not current:
            return None
        current = current[0]
    return None if current is None else str(current)
|
|
|
|
|
|
def _first_entry_text(entries: Any, key: str) -> str | None:
    """Return ``choose_text(entries[0][key])`` when *entries* is a non-empty list whose first item is a dict."""
    if isinstance(entries, list) and entries and isinstance(entries[0], dict):
        return choose_text(entries[0].get(key))
    return None


def extract_mb_metadata(detail_json: dict[str, Any], fallback_rec: dict[str, Any] | None = None) -> dict[str, str | None]:
    """Extract the title / artist / album values to write back as tags.

    Despite its name, *fallback_rec* (the AcoustID recording entry) is
    consulted FIRST; *detail_json* (the MusicBrainz recording detail) only
    fills in what the AcoustID entry does not provide:

    1. From the AcoustID entry: ``title``, ``artists[0].name``,
       ``releasegroups[0].title``.
    2. From the MusicBrainz detail, for whatever is still missing: ``title``,
       the full artist credit, ``releases[0].title``.

    The MusicBrainz artist credit is assembled from each entry's ``name``
    followed by its ``joinphrase`` (e.g. " & " or " feat. "), so multi-artist
    credits read "A & B" rather than "AB". Plain string entries in the credit
    list are kept verbatim.

    Args:
        detail_json: MusicBrainz recording detail.
        fallback_rec: AcoustID recording entry, or ``None``.

    Returns:
        Dict with the keys ``"title"``, ``"artist"`` and ``"album"``; a value
        is ``None`` when it was not found.
    """
    title: str | None = None
    artist: str | None = None
    album: str | None = None

    if fallback_rec:
        title = choose_text(fallback_rec.get("title"))
        artist = _first_entry_text(fallback_rec.get("artists"), "name")
        album = _first_entry_text(fallback_rec.get("releasegroups"), "title")

    if not title:
        title = choose_text(detail_json.get("title"))

    if not artist:
        artist_credit = detail_json.get("artist-credit") or detail_json.get("artist_credit")
        if isinstance(artist_credit, list):
            parts: list[str] = []
            for item in artist_credit:
                if isinstance(item, dict) and item.get("name"):
                    parts.append(str(item["name"]))
                    joinphrase = item.get("joinphrase")
                    if isinstance(joinphrase, str):
                        parts.append(joinphrase)
                elif isinstance(item, str):
                    parts.append(item)
            if parts:
                # strip() drops a trailing join phrase left on the last credit.
                artist = "".join(parts).strip()

    if not album:
        album = _first_entry_text(detail_json.get("releases"), "title")

    return {"title": title, "artist": artist, "album": album}
|
|
|
|
|
|
def update_metadata(file_path: Path, title: str, artist: str, album: str) -> None:
    """Write title, artist and album into the ID3 tags of an MP3 file.

    Empty values are skipped, so a tag already present in the file is never
    replaced by a blank frame merely because no value was found for it. When
    all three values are empty the file is not saved at all.

    Args:
        file_path: MP3 file to modify in place.
        title: Text for the TIT2 frame.
        artist: Text for the TPE1 frame.
        album: Text for the TALB frame.
    """
    audio = MP3(str(file_path), ID3=ID3)
    frames = (
        ("TIT2", TIT2, title),
        ("TPE1", TPE1, artist),
        ("TALB", TALB, album),
    )
    changed = False
    for frame_id, frame_cls, text in frames:
        if not text:
            continue
        # encoding=3 selects UTF-8 text encoding for the frame.
        audio[frame_id] = frame_cls(encoding=3, text=text)
        changed = True
    if changed:
        audio.save()
|
|
|
|
|
|
def print_section(title: str, payload: Any) -> None:
    """Print a ``=== title ===`` banner followed by *payload* as indented JSON.

    The banner is preceded by a blank line. Non-ASCII characters in the
    payload are emitted as-is rather than escaped.
    """
    banner = f"\n=== {title} ==="
    print(banner)
    rendered = json.dumps(payload, indent=2, ensure_ascii=False)
    print(rendered)
|
|
|
|
|
|
def _get_recording_detail(
    mbid: str,
    mb_delay: float,
    mb_cache: dict[str, dict[str, Any]],
) -> dict[str, Any]:
    """Return the MusicBrainz detail for *mbid*, consulting and filling *mb_cache*.

    A failed fetch is returned - and cached, so it is not retried for later
    files - as ``{"mbid": ..., "error": ...}``. The delay is applied after
    every real request, successful or not; cache hits do not sleep.
    """
    if mbid in mb_cache:
        return mb_cache[mbid]
    try:
        detail = fetch_musicbrainz_recording(mbid)
    except Exception as e:  # best-effort: one bad MBID must not abort the file
        detail = {"mbid": mbid, "error": str(e)}
    mb_cache[mbid] = detail
    if mb_delay > 0:
        time.sleep(mb_delay)
    return detail


def process_file(
    path: Path,
    maxlength: int,
    mb_delay: float,
    mb_cache: dict[str, dict[str, Any]],
    write_tags: bool,
) -> FileResult:
    """Fingerprint one MP3, look it up, fetch recording details, optionally tag it.

    Every step prints its outcome with print_section(). A failure while
    fingerprinting or during the AcoustID lookup is recorded in
    ``result.error`` and ends processing of the file; a failed MusicBrainz
    fetch only produces an error entry in ``recording_details``.

    Args:
        path: MP3 file to process.
        maxlength: Forwarded to fingerprint_mp3().
        mb_delay: Seconds to sleep after each MusicBrainz request (<= 0
            disables the delay).
        mb_cache: MBID -> detail cache shared across files; updated in place.
        write_tags: When true, write the chosen title/artist/album to the file.

    Returns:
        The populated FileResult.
    """
    result = FileResult(file=str(path))
    try:
        duration, fingerprint = fingerprint_mp3(path, maxlength=maxlength)
        result.duration = duration
        result.fingerprint = fingerprint
        print_section(
            f"{path} / duration & fingerprint",
            {"file": str(path), "duration": duration, "fingerprint": fingerprint},
        )
    except Exception as e:
        result.error = f"fingerprint failed: {e}"
        print_section(f"{path} / duration & fingerprint", {"file": str(path), "error": result.error})
        return result

    try:
        lookup_json = acoustid_lookup(result.duration, result.fingerprint)
        result.acoustid = lookup_json
        print_section(f"{path} / AcoustID response", lookup_json)
    except Exception as e:
        result.error = f"acoustid lookup failed: {e}"
        print_section(f"{path} / AcoustID response", {"file": str(path), "error": result.error})
        return result

    recordings = extract_recordings(result.acoustid)
    result.recordings = recordings

    details: list[dict[str, Any]] = []
    chosen_metadata: dict[str, str | None] | None = None
    best_recording = choose_best_recording(recordings)
    best_recording_detail: dict[str, Any] | None = None

    for rec in recordings:
        mbid = rec.get("id")
        if not mbid:
            continue

        detail_json = _get_recording_detail(mbid, mb_delay, mb_cache)
        details.append(detail_json)

        if not isinstance(detail_json, dict) or "error" in detail_json:
            continue
        if best_recording is not None and mbid == best_recording.get("id"):
            best_recording_detail = detail_json
        # Provisional choice: the first recording whose detail was fetched.
        if chosen_metadata is None:
            chosen_metadata = extract_mb_metadata(detail_json, fallback_rec=rec)

    # The best-scoring recording wins whenever its detail is available.
    if best_recording_detail is not None:
        chosen_metadata = extract_mb_metadata(best_recording_detail, fallback_rec=best_recording)

    result.recording_details = details
    print_section(f"{path} / MusicBrainz recording details", details)

    # The metadata dict always has three keys and is therefore always truthy;
    # require at least one real value so that a file for which nothing was
    # found is left unchanged instead of being retagged with its file name.
    if write_tags and chosen_metadata and any(chosen_metadata.values()):
        title = chosen_metadata.get("title") or path.stem
        artist = chosen_metadata.get("artist") or ""
        album = chosen_metadata.get("album") or ""
        try:
            update_metadata(path, title=title, artist=artist, album=album)
            print_section(
                f"{path} / metadata updated",
                {"title": title, "artist": artist, "album": album},
            )
        except Exception as e:
            print_section(f"{path} / metadata update failed", {"file": str(path), "error": str(e)})

    return result
|
|
|
|
|
|
def main() -> int:
    """Run the script: scan the directory, process each MP3, emit the summary.

    Returns:
        Process exit status: ``2`` when the given directory does not exist or
        is not a directory, ``0`` otherwise.
    """
    args = parse_args()
    root = Path(args.directory).expanduser().resolve()
    if not (root.exists() and root.is_dir()):
        print(f"[error] directory not found: {root}", file=sys.stderr)
        return 2

    # Shared across files so each MusicBrainz ID is requested at most once.
    mb_cache: dict[str, dict[str, Any]] = {}
    items: list[dict[str, Any]] = [
        asdict(
            process_file(
                mp3_path,
                maxlength=args.maxlength,
                mb_delay=args.mb_delay,
                mb_cache=mb_cache,
                write_tags=args.write_tags,
            )
        )
        for mp3_path in iter_mp3_files(root, recursive=not args.no_recursive)
    ]

    summary = {"root": str(root), "count": len(items), "items": items}
    text = json.dumps(summary, ensure_ascii=False, indent=2)

    if not args.output:
        print(text)
        return 0

    destination = Path(args.output).expanduser().resolve()
    destination.write_text(text, encoding="utf-8")
    return 0
|
|
|
|
|
|
# Script entry point: the return value of main() becomes the exit status.
if __name__ == "__main__":
    sys.exit(main())
|