Files
moss/acoustid_musicbrainz_tagger.py

488 lines
19 KiB
Python

#!/usr/bin/env python3
"""Enrich MP3s using MusicBrainz recording IDs from acoustid_album_scan.py.
Usage overview:
- Default: run acoustid_album_scan.py internally, read its JSON from stdout,
resolve each MP3's musicbrainz_recording_id, fetch MusicBrainz recording
details, and print a JSON report.
- Preview only: do not pass --write-tags. The script will fetch metadata and
show the tag preview without modifying files.
- Write tags: pass --write-tags to write title/artist/album plus Navidrome-friendly mapped tags (album artist/date/track/disc/genre) and embed album cover art into each MP3.
- Use existing scan JSON: pass --scan-json /path/to/scan.json if you already
have a saved acoustid_album_scan.py result and want to skip re-scanning.
- Control recursion: pass --no-recursive to limit processing to the top-level
directory only.
- Control pacing: pass --mb-delay <seconds> to slow down MusicBrainz requests.
- Save report: pass --output /path/to/report.json to write the final JSON to a
file.
- Stdout JSON: the final JSON report is printed to stdout by default; when
--output is given, pass --json to also print it to stdout.
Examples:
- python acoustid_musicbrainz_tagger.py /path/to/album
- python acoustid_musicbrainz_tagger.py /path/to/album --write-tags
- python acoustid_musicbrainz_tagger.py /path/to/album --scan-json /tmp/scan.json --json
- python acoustid_musicbrainz_tagger.py /path/to/album --output report.json
This script is intentionally designed around temporary scan output: if
--scan-json is omitted, it invokes acoustid_album_scan.py directly and keeps the
scan JSON in memory rather than requiring a permanent intermediate file.
"""
from __future__ import annotations
import argparse
import json
import sys
import time
import subprocess
import requests
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence, Tuple
def iter_mp3_files(root: Path, recursive: bool = True) -> List[Path]:
    """Collect the MP3 files at or below *root*, sorted by path.

    Args:
        root: A single file or a directory to search.
        recursive: When *root* is a directory, descend into subdirectories.

    Returns:
        ``[root]`` when *root* is itself a file with an ``.mp3`` suffix
        (compared case-insensitively), an empty list for any other file, and
        otherwise the sorted list of regular files under *root* whose name
        ends in ``.mp3`` in any letter case.
    """
    if root.is_file():
        return [root] if root.suffix.lower() == ".mp3" else []

    # A literal "*.mp3" glob is case-sensitive on case-sensitive filesystems,
    # which would skip "TRACK.MP3" even though the single-file branch above
    # accepts it. Enumerate everything and filter on the lowered name instead.
    candidates = root.rglob("*") if recursive else root.glob("*")
    return sorted(
        p for p in candidates if p.name.lower().endswith(".mp3") and p.is_file()
    )
def run_album_scan(root: Path, recursive: bool = True) -> Dict[str, Any]:
    """Run the sibling ``acoustid_album_scan.py`` and return its parsed JSON.

    The scanner is looked up next to this file and executed with the current
    interpreter, passing *root* and ``--json`` (plus ``--no-recursive`` when
    *recursive* is false). Its stdout is parsed as JSON.

    Raises:
        RuntimeError: if the scanner script is missing, exits with a non-zero
            status, or prints output that cannot be parsed as JSON.
    """
    scanner = Path(__file__).with_name("acoustid_album_scan.py")
    if not scanner.exists():
        raise RuntimeError(f"acoustid_album_scan.py not found next to this script: {scanner}")

    command = [sys.executable, str(scanner), str(root), "--json"]
    if not recursive:
        command += ["--no-recursive"]

    completed = subprocess.run(command, capture_output=True, text=True)
    if completed.returncode != 0:
        raise RuntimeError(
            f"acoustid_album_scan.py failed with exit code {completed.returncode}: {completed.stderr.strip()}"
        )

    try:
        payload = json.loads(completed.stdout)
    except Exception as exc:
        raise RuntimeError(f"failed to parse acoustid_album_scan.py JSON output: {exc}") from exc
    return payload
def build_recording_id_map(scan_payload: Dict[str, Any]) -> Dict[str, str]:
    """Map each scanned file path to a MusicBrainz recording ID.

    Entries under ``majority_album.tracks`` are read first; the per-track
    ``best_guess`` under the top-level ``tracks`` list only fills in files
    that did not already receive an ID. Entries lacking either a file path or
    a recording ID are ignored.
    """
    recording_ids: Dict[str, str] = {}

    majority_album = scan_payload.get("majority_album", {}) or {}
    for entry in majority_album.get("tracks", []) or []:
        path_key = str(entry.get("file") or "")
        mbid = str(entry.get("musicbrainz_recording_id") or "")
        if not (path_key and mbid):
            continue
        recording_ids[path_key] = mbid

    for entry in scan_payload.get("tracks", []) or []:
        path_key = str(entry.get("file") or "")
        guess = entry.get("best_guess") or {}
        mbid = str(guess.get("musicbrainz_recording_id") or "")
        if path_key and mbid:
            # Never override an ID that came from the majority album.
            recording_ids.setdefault(path_key, mbid)

    return recording_ids
@dataclass
class MetadataPreview:
    """Tag values derived from a MusicBrainz recording for one MP3.

    Every field is a string; an empty string means no value was found.
    """

    # Track-level values.
    title: str
    artist: str
    # Release-level values.
    album_artist: str
    album: str
    date: str
    # Positions are kept as strings, exactly as they will be written to tags.
    track_number: str
    disc_number: str
    genre: str
@dataclass
class FileResult:
    """Per-file entry of the final JSON report."""

    # Path of the MP3, as a string.
    file: str
    # Recording ID resolved from the scan, or None when none was found.
    musicbrainz_recording_id: Optional[str]
    # Raw recording data fetched from MusicBrainz, or None.
    musicbrainz_recording: Optional[Dict[str, Any]]
    # Dict form of the tag preview, or None.
    metadata_preview: Optional[Dict[str, str]]
    # Cover-art lookup summary, or None.
    cover_art: Optional[Dict[str, Any]]
    # Outcome of the tag-writing step; carries at least "written" and "error".
    tag_write: Dict[str, Any]
    # Message of an error that aborted processing of this file, or None.
    error: Optional[str]
def _extract_artist_names(recording: Dict[str, Any]) -> List[str]:
names: List[str] = []
artist_credit = recording.get("artist-credit") or recording.get("artist_credit") or []
if isinstance(artist_credit, list):
for part in artist_credit:
if isinstance(part, dict):
artist = part.get("artist") or {}
if isinstance(artist, dict):
name = artist.get("name") or artist.get("sort-name")
if name:
names.append(str(name))
elif isinstance(part, str) and part.strip():
names.append(part.strip())
if not names:
artists = recording.get("artist-list") or recording.get("artists") or []
if isinstance(artists, list):
for artist in artists:
if isinstance(artist, dict):
name = artist.get("name") or artist.get("sort-name")
if name:
names.append(str(name))
# Deduplicate while preserving order.
return list(dict.fromkeys(n for n in names if n))
def _extract_album_title(recording: Dict[str, Any]) -> str:
releases = recording.get("release-list") or recording.get("releases") or []
if isinstance(releases, list):
for release in releases:
if isinstance(release, dict):
title = release.get("title")
if title:
return str(title)
return ""
def _extract_album_artist(recording: Dict[str, Any]) -> str:
release_list = recording.get("release-list") or recording.get("releases") or []
if isinstance(release_list, list):
for release in release_list:
if isinstance(release, dict):
credit = release.get("artist-credit") or release.get("artist_credit") or []
if isinstance(credit, list) and credit:
names: List[str] = []
for part in credit:
if isinstance(part, dict):
artist = part.get("artist") or {}
if isinstance(artist, dict):
name = artist.get("name") or artist.get("sort-name")
if name:
names.append(str(name))
elif isinstance(part, str) and part.strip():
names.append(part.strip())
if names:
return " / ".join(dict.fromkeys(names))
artist_name = release.get("artist-credit-name") or release.get("artist")
if isinstance(artist_name, dict):
artist_name = artist_name.get("name")
if artist_name:
return str(artist_name)
return ""
def _extract_release_info(recording: Dict[str, Any]) -> Dict[str, str]:
release_list = recording.get("release-list") or recording.get("releases") or []
if not isinstance(release_list, list):
return {}
for release in release_list:
if not isinstance(release, dict):
continue
data = {
"album": str(release.get("title") or ""),
"date": str(release.get("date") or release.get("first-release-date") or ""),
"track_number": "",
"disc_number": "",
"genre": "",
}
if release.get("medium-list"):
medium_list = release.get("medium-list") or []
if isinstance(medium_list, list) and medium_list:
medium = medium_list[0]
if isinstance(medium, dict):
data["disc_number"] = str(medium.get("position") or "")
track_list = medium.get("track-list") or []
if isinstance(track_list, list) and track_list:
track = track_list[0]
if isinstance(track, dict):
data["track_number"] = str(track.get("position") or "")
release_group = release.get("release-group") or {}
if isinstance(release_group, dict):
genres = release_group.get("genre-list") or release_group.get("genres") or []
if isinstance(genres, list) and genres:
first = genres[0]
if isinstance(first, dict):
data["genre"] = str(first.get("name") or first.get("genre") or "")
elif isinstance(first, str):
data["genre"] = first
return data
return {}
def build_metadata_preview(recording: Dict[str, Any]) -> MetadataPreview:
    """Assemble the tag preview for one MusicBrainz recording.

    The track artist is the credited names joined with ``" / "``. The album
    artist falls back to the track artist, and the album title falls back to
    the first titled release, when the release information lacks them.
    """
    release_info = _extract_release_info(recording)
    track_artist = " / ".join(_extract_artist_names(recording))
    return MetadataPreview(
        title=str(recording.get("title") or ""),
        artist=track_artist,
        album_artist=_extract_album_artist(recording) or track_artist,
        album=release_info.get("album") or _extract_album_title(recording),
        date=release_info.get("date", ""),
        track_number=release_info.get("track_number", ""),
        disc_number=release_info.get("disc_number", ""),
        genre=release_info.get("genre", ""),
    )
def fetch_recording(recording_id: str) -> Dict[str, Any]:
    """Look up one recording on MusicBrainz via ``musicbrainzngs``.

    Args:
        recording_id: MusicBrainz recording ID to fetch.

    Returns:
        The ``recording`` mapping from the lookup response.

    Raises:
        RuntimeError: if ``musicbrainzngs`` cannot be imported, or if the
            response does not contain a ``recording`` mapping.
    """
    try:
        import musicbrainzngs
    except Exception as exc:  # pragma: no cover - dependency error path
        raise RuntimeError(f"missing dependency 'musicbrainzngs': {exc}") from exc
    musicbrainzngs.set_useragent("moss-acoustid-mb-tagger", "1.0", "shenwei@example.com")
    result = musicbrainzngs.get_recording_by_id(
        recording_id,
        # "media" asks for each release's medium-list/track-list. Without it
        # the releases carry no media, so the track and disc numbers read by
        # the tag preview are always empty.
        includes=["artists", "releases", "media", "artist-credits"],
    )
    recording = result.get("recording") if isinstance(result, dict) else None
    if not isinstance(recording, dict):
        raise RuntimeError(f"unexpected MusicBrainz response for {recording_id}")
    return recording
def fetch_cover_art(release_mbid: str, timeout: int = 60) -> Tuple[Optional[bytes], Optional[str], Optional[str]]:
    """Download the front cover of a release from the Cover Art Archive.

    Args:
        release_mbid: Release ID used to build the ``/front`` URL.
        timeout: Timeout passed to ``requests.get``.

    Returns:
        ``(image_bytes, content_type, None)`` on an HTTP 200 response, and
        ``(None, None, message)`` for any other status or any exception.
        Nothing is raised to the caller.
    """
    front_url = f"https://coverartarchive.org/release/{release_mbid}/front"
    request_headers = {"User-Agent": "moss-acoustid-mb-tagger/1.0"}
    try:
        response = requests.get(front_url, headers=request_headers, timeout=timeout)
        if response.status_code == 200:
            return response.content, response.headers.get("Content-Type", ""), None
        return None, None, f"cover art http {response.status_code}"
    except Exception as exc:
        # Best effort: a cover-art failure is reported, never raised.
        return None, None, str(exc)
def _mime_to_apic_type(mime: str) -> str:
m = (mime or "").lower()
if "png" in m:
return "image/png"
if "jpeg" in m or "jpg" in m:
return "image/jpeg"
if "webp" in m:
return "image/webp"
return "image/jpeg"
def _extract_release_mbid(recording: Dict[str, Any]) -> str:
releases = recording.get("release-list") or recording.get("releases") or []
if isinstance(releases, list):
for release in releases:
if isinstance(release, dict):
mbid = release.get("id") or release.get("mbid")
if mbid:
return str(mbid)
return ""
def _set_text_frame(tag, frame_cls, value: str) -> None:
if value:
tag.add(frame_cls(encoding=3, text=value))
def _set_numeric_frame(tag, frame_cls, value: str) -> None:
if value:
tag.add(frame_cls(encoding=3, text=value))
def write_tags(path: Path, preview: MetadataPreview, cover_art_bytes: Optional[bytes] = None, cover_art_mime: str = "image/jpeg") -> Dict[str, Any]:
    """Write the previewed metadata, and optionally cover art, into an MP3.

    A frame is replaced only when the preview supplies a non-empty value for
    it. Existing frames with no replacement, including existing cover art
    when *cover_art_bytes* is empty, are left untouched, so a failed lookup
    never erases tags the file already had.

    Args:
        path: MP3 file to modify.
        preview: Tag values to write.
        cover_art_bytes: Image data to embed as the front cover, if any.
        cover_art_mime: Content type of *cover_art_bytes*.

    Returns:
        ``{"written": True, "error": None, "cover_art_embedded": bool}`` on
        success, or ``{"written": False, "error": message}`` when ``mutagen``
        is missing or the write fails. Nothing is raised to the caller.
    """
    try:
        from mutagen.id3 import APIC, ID3, TALB, TCON, TDRC, TPE1, TPE2, TIT2, TRCK, TPOS
        from mutagen.mp3 import MP3
    except Exception as exc:  # pragma: no cover - dependency error path
        return {"written": False, "error": f"missing dependency 'mutagen': {exc}"}

    try:
        audio = MP3(str(path), ID3=ID3)
        if audio.tags is None:
            audio.add_tags()
        tags = audio.tags

        text_frames = (
            ("TIT2", TIT2, preview.title),
            ("TPE1", TPE1, preview.artist),
            ("TPE2", TPE2, preview.album_artist),
            ("TALB", TALB, preview.album),
            ("TDRC", TDRC, preview.date),
            ("TRCK", TRCK, preview.track_number),
            ("TPOS", TPOS, preview.disc_number),
            ("TCON", TCON, preview.genre),
        )
        for frame_id, frame_cls, value in text_frames:
            if not value:
                # No replacement available: keep what the file already has.
                continue
            tags.delall(frame_id)
            tags.add(frame_cls(encoding=3, text=value))

        if cover_art_bytes:
            # Drop old pictures only once a new one is in hand.
            tags.delall("APIC")
            tags.add(
                APIC(
                    encoding=3,
                    mime=_mime_to_apic_type(cover_art_mime),
                    type=3,
                    desc="Front Cover",
                    data=cover_art_bytes,
                )
            )

        audio.save()
        return {"written": True, "error": None, "cover_art_embedded": bool(cover_art_bytes)}
    except Exception as exc:
        return {"written": False, "error": f"tag write failed: {exc}"}
def process_file(path: Path, recording_id: Optional[str], write_tags_flag: bool, cache: Dict[str, Dict[str, Any]], cover_cache: Dict[str, Dict[str, Any]]) -> FileResult:
    """Resolve metadata and cover art for one MP3 and optionally tag it.

    Args:
        path: The MP3 file.
        recording_id: MusicBrainz recording ID for the file, if one is known.
        write_tags_flag: When true, write the tags; otherwise only preview.
        cache: Recording lookups keyed by recording ID; filled in on a miss.
        cover_cache: Cover-art lookups keyed by release ID; filled in on a
            miss, including failed lookups.

    Returns:
        A ``FileResult``. A missing recording ID is reported in
        ``tag_write["error"]``; any exception raised while processing is
        reported in ``error`` instead of propagating.
    """
    file_name = str(path)
    if not recording_id:
        return FileResult(
            file=file_name,
            musicbrainz_recording_id=None,
            musicbrainz_recording=None,
            metadata_preview=None,
            cover_art=None,
            tag_write={"written": False, "error": "missing musicbrainz_recording_id"},
            error=None,
        )

    try:
        try:
            recording = cache[recording_id]
        except KeyError:
            recording = cache[recording_id] = fetch_recording(recording_id)

        preview = build_metadata_preview(recording)
        mbid = _extract_release_mbid(recording)

        cover_report = {
            "release_mbid": mbid,
            "embedded": False,
            "mime": None,
            "source": None,
            "error": None,
        }
        image_bytes: Optional[bytes] = None
        image_mime = "image/jpeg"
        if mbid:
            if mbid not in cover_cache:
                blob, mime, err = fetch_cover_art(mbid)
                cover_cache[mbid] = {"blob": blob, "mime": mime, "error": err}
            entry = cover_cache[mbid]
            image_bytes = entry.get("blob")
            image_mime = str(entry.get("mime") or "image/jpeg")
            if not image_bytes:
                cover_report["error"] = entry.get("error") or "cover art not found"
            else:
                cover_report["mime"] = image_mime
                cover_report["source"] = f"https://coverartarchive.org/release/{mbid}/front"

        if write_tags_flag:
            outcome = write_tags(path, preview, cover_art_bytes=image_bytes, cover_art_mime=image_mime)
            if outcome.get("written") and outcome.get("cover_art_embedded"):
                cover_report["embedded"] = True
        else:
            outcome = {"written": False, "error": None}

        return FileResult(
            file=file_name,
            musicbrainz_recording_id=recording_id,
            musicbrainz_recording=recording,
            metadata_preview=asdict(preview),
            cover_art=cover_report,
            tag_write=outcome,
            error=None,
        )
    except Exception as exc:
        # One bad file must not abort the whole run; record the failure.
        return FileResult(
            file=file_name,
            musicbrainz_recording_id=recording_id,
            musicbrainz_recording=None,
            metadata_preview=None,
            cover_art=None,
            tag_write={"written": False, "error": None},
            error=str(exc),
        )
def build_report(scan_payload: Dict[str, Any], results: Sequence[FileResult]) -> Dict[str, Any]:
    """Build the final report dict from the scan payload and file results.

    The report holds the scanned ``directory`` (``""`` when the payload has
    none), the full payload under ``source_scan``, and each result converted
    to a plain dict under ``files``.
    """
    file_entries = [asdict(entry) for entry in results]
    report: Dict[str, Any] = {}
    report["directory"] = scan_payload.get("directory", "")
    report["source_scan"] = scan_payload
    report["files"] = file_entries
    return report
def main(argv: Optional[Sequence[str]] = None) -> int:
    """Command-line entry point.

    Loads scan data (from ``--scan-json`` or by running the scanner),
    processes every MP3 under the given path, then writes and/or prints the
    JSON report.

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.

    Returns:
        ``0`` on success, ``2`` when the target path or the scan JSON file
        does not exist, and ``1`` when the scan data cannot be loaded or is
        not a JSON object.
    """
    parser = argparse.ArgumentParser(
        description="Resolve MusicBrainz recording details for MP3s using acoustid_album_scan.py output and optionally update ID3 tags.",
    )
    parser.add_argument("path", help="Directory containing MP3 files")
    parser.add_argument("--scan-json", help="Optional JSON output produced by acoustid_album_scan.py. If omitted, the script runs acoustid_album_scan.py itself.")
    parser.add_argument("--output", default="", help="Write the JSON report to this file")
    parser.add_argument("--json", action="store_true", help="Print JSON to stdout")
    parser.add_argument("--write-tags", action="store_true", help="Actually write MP3 tags; default is preview-only")
    parser.add_argument("--no-recursive", action="store_true", help="Do not recurse into subdirectories")
    parser.add_argument("--mb-delay", type=float, default=0.0, help="Delay in seconds between MusicBrainz requests")
    args = parser.parse_args(argv)

    recursive = not args.no_recursive
    root = Path(args.path).expanduser().resolve()
    scan_json = Path(args.scan_json).expanduser().resolve() if args.scan_json else None

    if not root.exists():
        print(f"Path not found: {root}", file=sys.stderr)
        return 2
    if scan_json is not None and not scan_json.exists():
        print(f"Scan JSON not found: {scan_json}", file=sys.stderr)
        return 2

    # Report load failures the same way as a missing path: a short message on
    # stderr and a non-zero exit code instead of a traceback.
    try:
        if scan_json is not None:
            scan_payload = json.loads(scan_json.read_text(encoding="utf-8"))
        else:
            scan_payload = run_album_scan(root, recursive=recursive)
    except (OSError, ValueError, RuntimeError) as exc:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        print(f"Failed to load scan data: {exc}", file=sys.stderr)
        return 1
    if not isinstance(scan_payload, dict):
        print("Scan data must be a JSON object", file=sys.stderr)
        return 1

    id_map = build_recording_id_map(scan_payload)
    files = iter_mp3_files(root, recursive=recursive)
    cache: Dict[str, Dict[str, Any]] = {}
    cover_cache: Dict[str, Dict[str, Any]] = {}
    results: List[FileResult] = []
    total = len(files)
    for index, path in enumerate(files, start=1):
        print(f"[{index}/{total}] {path}", file=sys.stderr)
        recording_id = id_map.get(str(path))
        # The delay paces MusicBrainz requests, so it only applies after a
        # file that needed a lookup (not one that was cached or had no ID).
        needs_lookup = bool(recording_id) and recording_id not in cache
        results.append(process_file(path, recording_id, args.write_tags, cache, cover_cache))
        if needs_lookup and args.mb_delay > 0 and index < total:
            time.sleep(args.mb_delay)

    report = build_report(scan_payload, results)
    # Serialise once and reuse the text for both the file and stdout.
    rendered = json.dumps(report, ensure_ascii=False, indent=2)
    if args.output:
        Path(args.output).expanduser().resolve().write_text(rendered, encoding="utf-8")
    if args.json or not args.output:
        print(rendered)
    return 0
# Script entry point: exit the process with main()'s return code.
if __name__ == "__main__":
    sys.exit(main())