feat: add acoustid album scan and navidrome-friendly musicbrainz tagger

This commit is contained in:
ishenwei
2026-05-22 17:12:01 +08:00
parent 67289cff81
commit 974ada1ced
2 changed files with 954 additions and 0 deletions

467
acoustid_album_scan.py Normal file
View File

@@ -0,0 +1,467 @@
#!/usr/bin/env python3
"""Scan a directory of MP3s with AcoustID only.
This tool fingerprints every MP3 in a directory, queries AcoustID for each
track, and prints the per-track identification evidence plus an aggregate
directory-level candidate summary.
It intentionally does not call MusicBrainz. The output is meant to be reviewed
as a batch so a downstream AI or human can decide which release the directory
belongs to.
"""
from __future__ import annotations
import argparse
import json
import sys
import time
from collections import Counter, defaultdict
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple
import requests
ACOUSTID_CLIENT_ID = "JIvtbG79eAg"
ACOUSTID_LOOKUP_URL = "https://api.acoustid.org/v2/lookup"
DEFAULT_META = "recordings releasegroups compress"
DEFAULT_MAXLEN = 120
@dataclass
class Candidate:
recording_id: str
title: str
artists: Tuple[str, ...]
releases: Tuple[str, ...]
score: float
@property
def artist_key(self) -> str:
return " / ".join(self.artists) if self.artists else ""
@property
def release_key(self) -> str:
return " / ".join(self.releases) if self.releases else ""
@property
def pair_key(self) -> Tuple[str, str]:
return (self.artist_key, self.release_key)
@dataclass
class TrackResult:
file: str
duration: Optional[float]
fingerprint: Optional[str]
fingerprint_error: Optional[str]
lookup_error: Optional[str]
candidates: List[Dict[str, Any]]
best_guess: Optional[Dict[str, Any]]
ambiguous: bool
def iter_mp3_files(root: Path, recursive: bool = True) -> List[Path]:
if root.is_file():
return [root] if root.suffix.lower() == ".mp3" else []
if recursive:
return sorted(p for p in root.rglob("*.mp3") if p.is_file())
return sorted(p for p in root.glob("*.mp3") if p.is_file())
def normalize_fingerprint(fp: Any) -> Optional[str]:
if fp is None:
return None
if isinstance(fp, bytes):
return fp.decode("ascii", errors="ignore")
return str(fp)
def fingerprint_mp3(path: Path, maxlength: int) -> Tuple[Optional[float], Optional[str], Optional[str]]:
try:
import acoustid
except Exception as exc: # pragma: no cover - dependency error path
return None, None, f"missing dependency 'acoustid': {exc}"
try:
duration, fingerprint = acoustid.fingerprint_file(str(path), maxlength=maxlength)
return float(duration) if duration is not None else None, normalize_fingerprint(fingerprint), None
except Exception as exc:
return None, None, f"fingerprint failed: {exc}"
def acoustid_lookup(duration: float, fingerprint: str, retries: int = 2, timeout: int = 60) -> Dict[str, Any]:
params = {
"client": ACOUSTID_CLIENT_ID,
"meta": DEFAULT_META,
"duration": int(round(duration)),
"fingerprint": fingerprint,
}
last_error: Optional[Exception] = None
for attempt in range(retries + 1):
try:
resp = requests.get(ACOUSTID_LOOKUP_URL, params=params, timeout=timeout)
resp.raise_for_status()
payload = resp.json()
if payload.get("status") != "ok":
raise RuntimeError(f"AcoustID status not ok: {payload}")
return payload
except Exception as exc:
last_error = exc
if attempt < retries:
time.sleep(1.5 * (attempt + 1))
continue
raise RuntimeError(f"AcoustID lookup failed: {last_error}") from exc
raise RuntimeError(f"AcoustID lookup failed: {last_error}")
def _names_from_people(items: Any) -> List[str]:
names: List[str] = []
if isinstance(items, list):
for item in items:
if isinstance(item, str):
if item.strip():
names.append(item.strip())
elif isinstance(item, dict):
name = item.get("name") or item.get("artist", {}).get("name")
if name and str(name).strip():
names.append(str(name).strip())
elif isinstance(items, dict):
name = items.get("name")
if name:
names.append(str(name).strip())
return names
def _recording_artists(recording: Dict[str, Any]) -> Tuple[str, ...]:
artists = _names_from_people(recording.get("artists"))
if not artists:
artists = _names_from_people(recording.get("artist-credit"))
if not artists:
artist = recording.get("artist", {})
if isinstance(artist, dict) and artist.get("name"):
artists = [str(artist["name"]).strip()]
return tuple(dict.fromkeys(a for a in artists if a))
def _recording_releases(recording: Dict[str, Any]) -> Tuple[str, ...]:
titles: List[str] = []
for key in ("releasegroups", "releasegroups", "release-group", "release-list", "releases"):
values = recording.get(key)
if isinstance(values, list):
for item in values:
if isinstance(item, dict):
title = item.get("title") or item.get("name")
if title and str(title).strip():
titles.append(str(title).strip())
elif isinstance(item, str) and item.strip():
titles.append(item.strip())
elif isinstance(values, dict):
title = values.get("title") or values.get("name")
if title and str(title).strip():
titles.append(str(title).strip())
return tuple(dict.fromkeys(titles))
def extract_candidates(payload: Dict[str, Any]) -> List[Candidate]:
candidates: Dict[Tuple[str, str, Tuple[str, ...], Tuple[str, ...]], Candidate] = {}
for result in payload.get("results", []):
try:
score = float(result.get("score", 0.0))
except Exception:
score = 0.0
for recording in result.get("recordings", []):
if not isinstance(recording, dict):
continue
recording_id = str(recording.get("id") or recording.get("mbid") or "")
title = str(recording.get("title") or "").strip()
artists = _recording_artists(recording)
releases = _recording_releases(recording)
key = (recording_id, title, artists, releases)
existing = candidates.get(key)
if existing is None or score > existing.score:
candidates[key] = Candidate(
recording_id=recording_id,
title=title,
artists=artists,
releases=releases,
score=score,
)
return sorted(candidates.values(), key=lambda c: c.score, reverse=True)
def candidate_to_dict(candidate: Candidate) -> Dict[str, Any]:
return {
"musicbrainz_recording_id": candidate.recording_id,
"title": candidate.title,
"artists": list(candidate.artists),
"releases": list(candidate.releases),
"artist": candidate.artist_key,
"release": candidate.release_key,
"score": candidate.score,
}
def choose_best_guess(candidates: Sequence[Candidate]) -> Optional[Dict[str, Any]]:
if not candidates:
return None
return candidate_to_dict(candidates[0])
def track_is_ambiguous(candidates: Sequence[Candidate]) -> bool:
if len(candidates) <= 1:
return False
artists = {c.artist_key for c in candidates if c.artist_key}
releases = {c.release_key for c in candidates if c.release_key}
return len(artists) > 1 or len(releases) > 1
def summarize_album(tracks: Sequence[TrackResult]) -> Dict[str, Any]:
pair_stats: Dict[Tuple[str, str], Dict[str, Any]] = defaultdict(lambda: {"tracks": 0, "score_sum": 0.0, "files": []})
artist_counter: Counter[str] = Counter()
release_counter: Counter[str] = Counter()
ambiguous_files: List[str] = []
for track in tracks:
if track.ambiguous:
ambiguous_files.append(track.file)
if not track.best_guess:
continue
artist = str(track.best_guess.get("artist") or "")
release = str(track.best_guess.get("release") or "")
score = float(track.best_guess.get("score") or 0.0)
key = (artist, release)
pair_stats[key]["tracks"] += 1
pair_stats[key]["score_sum"] += score
pair_stats[key]["files"].append(track.file)
if artist:
artist_counter[artist] += 1
if release:
release_counter[release] += 1
ranked_pairs = sorted(
(
{
"artist": artist,
"release": release,
"tracks": stats["tracks"],
"score_sum": round(stats["score_sum"], 4),
"files": stats["files"],
}
for (artist, release), stats in pair_stats.items()
),
key=lambda item: (item["tracks"], item["score_sum"]),
reverse=True,
)
majority_pair = ranked_pairs[0] if ranked_pairs else None
majority_track_count = majority_pair["tracks"] if majority_pair else 0
return {
"track_count": len(tracks),
"ambiguous_track_count": len(ambiguous_files),
"ambiguous_files": ambiguous_files,
"top_artist_candidates": artist_counter.most_common(10),
"top_release_candidates": release_counter.most_common(10),
"candidate_pairs": ranked_pairs,
"majority_pair": majority_pair,
"majority_track_count": majority_track_count,
}
def choose_majority_album(summary: Dict[str, Any]) -> Tuple[str, str]:
pair = summary.get("majority_pair") or {}
return str(pair.get("artist") or ""), str(pair.get("release") or "")
def _match_candidate_to_album(track: TrackResult, artist: str, release: str) -> Optional[str]:
for candidate in track.candidates:
if str(candidate.get("artist") or "") == artist and str(candidate.get("release") or "") == release:
return str(candidate.get("musicbrainz_recording_id") or "") or None
return None
def resolve_majority_recording_ids(tracks: Sequence[TrackResult], summary: Dict[str, Any]) -> Dict[str, Any]:
artist, release = choose_majority_album(summary)
selected_tracks: List[Dict[str, Any]] = []
unresolved_tracks: List[str] = []
for track in tracks:
recording_id = _match_candidate_to_album(track, artist, release)
if recording_id is None and track.best_guess:
bg_artist = str(track.best_guess.get("artist") or "")
bg_release = str(track.best_guess.get("release") or "")
if bg_artist == artist and bg_release == release:
recording_id = str(track.best_guess.get("musicbrainz_recording_id") or "") or None
if recording_id:
selected_tracks.append({"file": track.file, "musicbrainz_recording_id": recording_id})
else:
unresolved_tracks.append(track.file)
return {
"artist": artist,
"release": release,
"tracks": selected_tracks,
"track_count": len(selected_tracks),
"unresolved_files": unresolved_tracks,
}
def scan_directory(directory: Path, recursive: bool, maxlength: int, delay: float) -> Dict[str, Any]:
files = iter_mp3_files(directory, recursive=recursive)
results: List[TrackResult] = []
if not files:
return {
"directory": str(directory),
"tracks": [],
"album_summary": {
"track_count": 0,
"ambiguous_track_count": 0,
"ambiguous_files": [],
"top_artist_candidates": [],
"top_release_candidates": [],
"candidate_pairs": [],
"majority_pair": None,
"majority_track_count": 0,
},
"majority_album": {"artist": "", "release": "", "tracks": [], "track_count": 0, "unresolved_files": []},
}
for index, path in enumerate(files, start=1):
print(f"[{index}/{len(files)}] {path}", file=sys.stderr)
duration, fingerprint, fp_error = fingerprint_mp3(path, maxlength=maxlength)
lookup_error = None
candidates: List[Candidate] = []
if fingerprint and duration is not None:
try:
payload = acoustid_lookup(duration=duration, fingerprint=fingerprint)
candidates = extract_candidates(payload)
except Exception as exc:
lookup_error = str(exc)
else:
lookup_error = fp_error or "missing fingerprint"
track = TrackResult(
file=str(path),
duration=duration,
fingerprint=fingerprint,
fingerprint_error=fp_error,
lookup_error=lookup_error,
candidates=[candidate_to_dict(c) for c in candidates],
best_guess=choose_best_guess(candidates),
ambiguous=track_is_ambiguous(candidates),
)
results.append(track)
if delay > 0 and index < len(files):
time.sleep(delay)
summary = summarize_album(results)
majority_album = resolve_majority_recording_ids(results, summary)
return {
"directory": str(directory),
"tracks": [asdict(track) for track in results],
"album_summary": summary,
"majority_album": majority_album,
}
def format_human_report(payload: Dict[str, Any]) -> str:
lines: List[str] = []
lines.append(f"Directory: {payload['directory']}")
lines.append(f"Tracks: {len(payload.get('tracks', []))}")
summary = payload.get("album_summary", {})
lines.append(f"Ambiguous tracks: {summary.get('ambiguous_track_count', 0)}")
majority = payload.get("majority_album", {})
lines.append(
f"Majority album: artist={majority.get('artist') or '-'} | release={majority.get('release') or '-'} | matched_tracks={majority.get('track_count', 0)}"
)
lines.append("")
for i, track in enumerate(payload.get("tracks", []), start=1):
lines.append(f"{i}. {track['file']}")
lines.append(f" duration: {track.get('duration')}")
if track.get("fingerprint_error"):
lines.append(f" fingerprint_error: {track['fingerprint_error']}")
if track.get("lookup_error"):
lines.append(f" lookup_error: {track['lookup_error']}")
if track.get("best_guess"):
bg = track["best_guess"]
lines.append(
f" best_guess: artist={bg.get('artist') or '-'} | release={bg.get('release') or '-'} | title={bg.get('title') or '-'} | score={bg.get('score')}"
)
else:
lines.append(" best_guess: -")
if track.get("ambiguous"):
lines.append(" ambiguous: yes")
if track.get("candidates"):
lines.append(" candidates:")
for cand in track["candidates"]:
lines.append(
f" - score={cand.get('score')} | artist={cand.get('artist') or '-'} | release={cand.get('release') or '-'} | title={cand.get('title') or '-'} | musicbrainz_recording_id={cand.get('musicbrainz_recording_id') or '-'}"
)
lines.append("")
lines.append("Album-level candidate pairs:")
for cand in summary.get("candidate_pairs", []):
lines.append(
f"- tracks={cand['tracks']} | score_sum={cand['score_sum']} | artist={cand['artist'] or '-'} | release={cand['release'] or '-'}"
)
if not summary.get("candidate_pairs"):
lines.append("- none")
lines.append("")
lines.append("Majority album recording IDs:")
majority_tracks = majority.get("tracks", [])
if majority_tracks:
for item in majority_tracks:
lines.append(f"- {item['file']} -> {item['recording_id']}")
else:
lines.append("- none")
unresolved = majority.get("unresolved_files", [])
if unresolved:
lines.append("Unresolved files:")
for file in unresolved:
lines.append(f"- {file}")
return "\n".join(lines)
def main(argv: Optional[Sequence[str]] = None) -> int:
parser = argparse.ArgumentParser(
description="Fingerprint a directory of MP3s with AcoustID and aggregate candidate releases/artists without calling MusicBrainz.",
)
parser.add_argument("path", help="MP3 file or directory to scan")
parser.add_argument("--no-recursive", action="store_true", help="Do not recurse into subdirectories")
parser.add_argument("--maxlength", type=int, default=DEFAULT_MAXLEN, help="Maximum audio length passed to fingerprinting")
parser.add_argument("--delay", type=float, default=0.0, help="Delay in seconds between AcoustID lookups")
parser.add_argument("--json", action="store_true", help="Print JSON output instead of human-readable text")
parser.add_argument("--output", type=str, default="", help="Write JSON output to a file")
args = parser.parse_args(argv)
root = Path(args.path).expanduser().resolve()
if not root.exists():
print(f"Path not found: {root}", file=sys.stderr)
return 2
payload = scan_directory(root, recursive=not args.no_recursive, maxlength=args.maxlength, delay=args.delay)
if args.output:
Path(args.output).expanduser().resolve().write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
if args.json:
print(json.dumps(payload, ensure_ascii=False, indent=2))
else:
print(format_human_report(payload))
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -0,0 +1,487 @@
#!/usr/bin/env python3
"""Enrich MP3s using MusicBrainz recording IDs from acoustid_album_scan.py.
Usage overview:
- Default: run acoustid_album_scan.py internally, read its JSON from stdout,
resolve each MP3's musicbrainz_recording_id, fetch MusicBrainz recording
details, and print a JSON report.
- Preview only: do not pass --write-tags. The script will fetch metadata and
show the tag preview without modifying files.
- Write tags: pass --write-tags to write title/artist/album plus Navidrome-friendly mapped tags (album artist/date/track/disc/genre) and embed album cover art into each MP3.
- Use existing scan JSON: pass --scan-json /path/to/scan.json if you already
have a saved acoustid_album_scan.py result and want to skip re-scanning.
- Control recursion: pass --no-recursive to limit processing to the top-level
directory only.
- Control pacing: pass --mb-delay <seconds> to slow down MusicBrainz requests.
- Save report: pass --output /path/to/report.json to write the final JSON to a
file.
- Stdout JSON: pass --json to print the final JSON report to stdout.
Examples:
- python acoustid_musicbrainz_tagger.py /path/to/album
- python acoustid_musicbrainz_tagger.py /path/to/album --write-tags
- python acoustid_musicbrainz_tagger.py /path/to/album --scan-json /tmp/scan.json --json
- python acoustid_musicbrainz_tagger.py /path/to/album --output report.json
This script is intentionally designed around temporary scan output: if
--scan-json is omitted, it invokes acoustid_album_scan.py directly and keeps the
scan JSON in memory rather than requiring a permanent intermediate file.
"""
from __future__ import annotations
import argparse
import json
import sys
import time
import subprocess
import requests
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence, Tuple
def iter_mp3_files(root: Path, recursive: bool = True) -> List[Path]:
if root.is_file():
return [root] if root.suffix.lower() == ".mp3" else []
if recursive:
return sorted(p for p in root.rglob("*.mp3") if p.is_file())
return sorted(p for p in root.glob("*.mp3") if p.is_file())
def run_album_scan(root: Path, recursive: bool = True) -> Dict[str, Any]:
scan_script = Path(__file__).with_name("acoustid_album_scan.py")
if not scan_script.exists():
raise RuntimeError(f"acoustid_album_scan.py not found next to this script: {scan_script}")
cmd = [sys.executable, str(scan_script), str(root), "--json"]
if not recursive:
cmd.append("--no-recursive")
proc = subprocess.run(cmd, capture_output=True, text=True)
if proc.returncode != 0:
raise RuntimeError(
f"acoustid_album_scan.py failed with exit code {proc.returncode}: {proc.stderr.strip()}"
)
try:
return json.loads(proc.stdout)
except Exception as exc:
raise RuntimeError(f"failed to parse acoustid_album_scan.py JSON output: {exc}") from exc
def build_recording_id_map(scan_payload: Dict[str, Any]) -> Dict[str, str]:
mapping: Dict[str, str] = {}
majority = scan_payload.get("majority_album", {}) or {}
for item in majority.get("tracks", []) or []:
file_path = str(item.get("file") or "")
recording_id = str(item.get("musicbrainz_recording_id") or "")
if file_path and recording_id:
mapping[file_path] = recording_id
for track in scan_payload.get("tracks", []) or []:
file_path = str(track.get("file") or "")
best_guess = track.get("best_guess") or {}
recording_id = str(best_guess.get("musicbrainz_recording_id") or "")
if file_path and recording_id and file_path not in mapping:
mapping[file_path] = recording_id
return mapping
@dataclass
class MetadataPreview:
title: str
artist: str
album_artist: str
album: str
date: str
track_number: str
disc_number: str
genre: str
@dataclass
class FileResult:
file: str
musicbrainz_recording_id: Optional[str]
musicbrainz_recording: Optional[Dict[str, Any]]
metadata_preview: Optional[Dict[str, str]]
cover_art: Optional[Dict[str, Any]]
tag_write: Dict[str, Any]
error: Optional[str]
def _extract_artist_names(recording: Dict[str, Any]) -> List[str]:
names: List[str] = []
artist_credit = recording.get("artist-credit") or recording.get("artist_credit") or []
if isinstance(artist_credit, list):
for part in artist_credit:
if isinstance(part, dict):
artist = part.get("artist") or {}
if isinstance(artist, dict):
name = artist.get("name") or artist.get("sort-name")
if name:
names.append(str(name))
elif isinstance(part, str) and part.strip():
names.append(part.strip())
if not names:
artists = recording.get("artist-list") or recording.get("artists") or []
if isinstance(artists, list):
for artist in artists:
if isinstance(artist, dict):
name = artist.get("name") or artist.get("sort-name")
if name:
names.append(str(name))
# Deduplicate while preserving order.
return list(dict.fromkeys(n for n in names if n))
def _extract_album_title(recording: Dict[str, Any]) -> str:
releases = recording.get("release-list") or recording.get("releases") or []
if isinstance(releases, list):
for release in releases:
if isinstance(release, dict):
title = release.get("title")
if title:
return str(title)
return ""
def _extract_album_artist(recording: Dict[str, Any]) -> str:
release_list = recording.get("release-list") or recording.get("releases") or []
if isinstance(release_list, list):
for release in release_list:
if isinstance(release, dict):
credit = release.get("artist-credit") or release.get("artist_credit") or []
if isinstance(credit, list) and credit:
names: List[str] = []
for part in credit:
if isinstance(part, dict):
artist = part.get("artist") or {}
if isinstance(artist, dict):
name = artist.get("name") or artist.get("sort-name")
if name:
names.append(str(name))
elif isinstance(part, str) and part.strip():
names.append(part.strip())
if names:
return " / ".join(dict.fromkeys(names))
artist_name = release.get("artist-credit-name") or release.get("artist")
if isinstance(artist_name, dict):
artist_name = artist_name.get("name")
if artist_name:
return str(artist_name)
return ""
def _extract_release_info(recording: Dict[str, Any]) -> Dict[str, str]:
release_list = recording.get("release-list") or recording.get("releases") or []
if not isinstance(release_list, list):
return {}
for release in release_list:
if not isinstance(release, dict):
continue
data = {
"album": str(release.get("title") or ""),
"date": str(release.get("date") or release.get("first-release-date") or ""),
"track_number": "",
"disc_number": "",
"genre": "",
}
if release.get("medium-list"):
medium_list = release.get("medium-list") or []
if isinstance(medium_list, list) and medium_list:
medium = medium_list[0]
if isinstance(medium, dict):
data["disc_number"] = str(medium.get("position") or "")
track_list = medium.get("track-list") or []
if isinstance(track_list, list) and track_list:
track = track_list[0]
if isinstance(track, dict):
data["track_number"] = str(track.get("position") or "")
release_group = release.get("release-group") or {}
if isinstance(release_group, dict):
genres = release_group.get("genre-list") or release_group.get("genres") or []
if isinstance(genres, list) and genres:
first = genres[0]
if isinstance(first, dict):
data["genre"] = str(first.get("name") or first.get("genre") or "")
elif isinstance(first, str):
data["genre"] = first
return data
return {}
def build_metadata_preview(recording: Dict[str, Any]) -> MetadataPreview:
title = str(recording.get("title") or "")
artists = _extract_artist_names(recording)
artist = " / ".join(artists)
album_artist = _extract_album_artist(recording)
if not album_artist:
album_artist = artist
release_info = _extract_release_info(recording)
album = release_info.get("album") or _extract_album_title(recording)
return MetadataPreview(
title=title,
artist=artist,
album_artist=album_artist,
album=album,
date=release_info.get("date", ""),
track_number=release_info.get("track_number", ""),
disc_number=release_info.get("disc_number", ""),
genre=release_info.get("genre", ""),
)
def fetch_recording(recording_id: str) -> Dict[str, Any]:
try:
import musicbrainzngs
except Exception as exc: # pragma: no cover - dependency error path
raise RuntimeError(f"missing dependency 'musicbrainzngs': {exc}") from exc
musicbrainzngs.set_useragent("moss-acoustid-mb-tagger", "1.0", "shenwei@example.com")
result = musicbrainzngs.get_recording_by_id(
recording_id,
includes=["artists", "releases", "artist-credits"],
)
recording = result.get("recording") if isinstance(result, dict) else None
if not isinstance(recording, dict):
raise RuntimeError(f"unexpected MusicBrainz response for {recording_id}")
return recording
def fetch_cover_art(release_mbid: str, timeout: int = 60) -> Tuple[Optional[bytes], Optional[str], Optional[str]]:
url = f"https://coverartarchive.org/release/{release_mbid}/front"
headers = {"User-Agent": "moss-acoustid-mb-tagger/1.0"}
try:
resp = requests.get(url, headers=headers, timeout=timeout)
if resp.status_code != 200:
return None, None, f"cover art http {resp.status_code}"
content_type = resp.headers.get("Content-Type", "")
return resp.content, content_type, None
except Exception as exc:
return None, None, str(exc)
def _mime_to_apic_type(mime: str) -> str:
m = (mime or "").lower()
if "png" in m:
return "image/png"
if "jpeg" in m or "jpg" in m:
return "image/jpeg"
if "webp" in m:
return "image/webp"
return "image/jpeg"
def _extract_release_mbid(recording: Dict[str, Any]) -> str:
releases = recording.get("release-list") or recording.get("releases") or []
if isinstance(releases, list):
for release in releases:
if isinstance(release, dict):
mbid = release.get("id") or release.get("mbid")
if mbid:
return str(mbid)
return ""
def _set_text_frame(tag, frame_cls, value: str) -> None:
if value:
tag.add(frame_cls(encoding=3, text=value))
def _set_numeric_frame(tag, frame_cls, value: str) -> None:
if value:
tag.add(frame_cls(encoding=3, text=value))
def write_tags(path: Path, preview: MetadataPreview, cover_art_bytes: Optional[bytes] = None, cover_art_mime: str = "image/jpeg") -> Dict[str, Any]:
try:
from mutagen.id3 import APIC, ID3, TALB, TCON, TDRC, TPE1, TPE2, TIT2, TRCK, TPOS
from mutagen.mp3 import MP3
except Exception as exc: # pragma: no cover - dependency error path
return {"written": False, "error": f"missing dependency 'mutagen': {exc}"}
def _add_text(tag_list, frame):
if frame:
tag_list.add(frame)
try:
audio = MP3(str(path), ID3=ID3)
if audio.tags is None:
audio.add_tags()
audio.tags.delall("TIT2")
audio.tags.delall("TPE1")
audio.tags.delall("TPE2")
audio.tags.delall("TALB")
audio.tags.delall("TDRC")
audio.tags.delall("TRCK")
audio.tags.delall("TPOS")
audio.tags.delall("TCON")
audio.tags.delall("APIC")
if preview.title:
_add_text(audio.tags, TIT2(encoding=3, text=preview.title))
if preview.artist:
_add_text(audio.tags, TPE1(encoding=3, text=preview.artist))
if preview.album_artist:
_add_text(audio.tags, TPE2(encoding=3, text=preview.album_artist))
if preview.album:
_add_text(audio.tags, TALB(encoding=3, text=preview.album))
if preview.date:
_add_text(audio.tags, TDRC(encoding=3, text=preview.date))
if preview.track_number:
_add_text(audio.tags, TRCK(encoding=3, text=preview.track_number))
if preview.disc_number:
_add_text(audio.tags, TPOS(encoding=3, text=preview.disc_number))
if preview.genre:
_add_text(audio.tags, TCON(encoding=3, text=preview.genre))
if cover_art_bytes:
audio.tags.add(
APIC(
encoding=3,
mime=_mime_to_apic_type(cover_art_mime),
type=3,
desc="Front Cover",
data=cover_art_bytes,
)
)
audio.save()
return {"written": True, "error": None, "cover_art_embedded": bool(cover_art_bytes)}
except Exception as exc:
return {"written": False, "error": f"tag write failed: {exc}"}
def process_file(path: Path, recording_id: Optional[str], write_tags_flag: bool, cache: Dict[str, Dict[str, Any]], cover_cache: Dict[str, Dict[str, Any]]) -> FileResult:
if not recording_id:
return FileResult(
file=str(path),
musicbrainz_recording_id=None,
musicbrainz_recording=None,
metadata_preview=None,
cover_art=None,
tag_write={"written": False, "error": "missing musicbrainz_recording_id"},
error=None,
)
try:
if recording_id not in cache:
cache[recording_id] = fetch_recording(recording_id)
recording = cache[recording_id]
preview = build_metadata_preview(recording)
release_mbid = _extract_release_mbid(recording)
cover_art_info = {
"release_mbid": release_mbid,
"embedded": False,
"mime": None,
"source": None,
"error": None,
}
cover_art_bytes: Optional[bytes] = None
cover_art_mime = "image/jpeg"
if release_mbid:
if release_mbid not in cover_cache:
blob, mime, err = fetch_cover_art(release_mbid)
cover_cache[release_mbid] = {"blob": blob, "mime": mime, "error": err}
cached_cover = cover_cache[release_mbid]
cover_art_bytes = cached_cover.get("blob")
cover_art_mime = str(cached_cover.get("mime") or "image/jpeg")
if cover_art_bytes:
cover_art_info["mime"] = cover_art_mime
cover_art_info["source"] = f"https://coverartarchive.org/release/{release_mbid}/front"
else:
cover_art_info["error"] = cached_cover.get("error") or "cover art not found"
tag_write = {"written": False, "error": None}
if write_tags_flag:
tag_write = write_tags(path, preview, cover_art_bytes=cover_art_bytes, cover_art_mime=cover_art_mime)
if tag_write.get("written") and tag_write.get("cover_art_embedded"):
cover_art_info["embedded"] = True
return FileResult(
file=str(path),
musicbrainz_recording_id=recording_id,
musicbrainz_recording=recording,
metadata_preview=asdict(preview),
cover_art=cover_art_info,
tag_write=tag_write,
error=None,
)
except Exception as exc:
return FileResult(
file=str(path),
musicbrainz_recording_id=recording_id,
musicbrainz_recording=None,
metadata_preview=None,
cover_art=None,
tag_write={"written": False, "error": None},
error=str(exc),
)
def build_report(scan_payload: Dict[str, Any], results: Sequence[FileResult]) -> Dict[str, Any]:
return {
"directory": scan_payload.get("directory", ""),
"source_scan": scan_payload,
"files": [asdict(result) for result in results],
}
def main(argv: Optional[Sequence[str]] = None) -> int:
parser = argparse.ArgumentParser(
description="Resolve MusicBrainz recording details for MP3s using acoustid_album_scan.py output and optionally update ID3 tags.",
)
parser.add_argument("path", help="Directory containing MP3 files")
parser.add_argument("--scan-json", help="Optional JSON output produced by acoustid_album_scan.py. If omitted, the script runs acoustid_album_scan.py itself.")
parser.add_argument("--output", default="", help="Write the JSON report to this file")
parser.add_argument("--json", action="store_true", help="Print JSON to stdout")
parser.add_argument("--write-tags", action="store_true", help="Actually write MP3 tags; default is preview-only")
parser.add_argument("--no-recursive", action="store_true", help="Do not recurse into subdirectories")
parser.add_argument("--mb-delay", type=float, default=0.0, help="Delay in seconds between MusicBrainz requests")
args = parser.parse_args(argv)
root = Path(args.path).expanduser().resolve()
scan_json = Path(args.scan_json).expanduser().resolve() if args.scan_json else None
if not root.exists():
print(f"Path not found: {root}", file=sys.stderr)
return 2
if scan_json is not None:
if not scan_json.exists():
print(f"Scan JSON not found: {scan_json}", file=sys.stderr)
return 2
scan_payload = json.loads(scan_json.read_text(encoding="utf-8"))
else:
scan_payload = run_album_scan(root, recursive=not args.no_recursive)
id_map = build_recording_id_map(scan_payload)
files = iter_mp3_files(root, recursive=not args.no_recursive)
cache: Dict[str, Dict[str, Any]] = {}
cover_cache: Dict[str, Dict[str, Any]] = {}
results: List[FileResult] = []
for index, path in enumerate(files, start=1):
print(f"[{index}/{len(files)}] {path}", file=sys.stderr)
recording_id = id_map.get(str(path))
result = process_file(path, recording_id, args.write_tags, cache, cover_cache)
results.append(result)
if args.mb_delay > 0 and index < len(files):
time.sleep(args.mb_delay)
report = build_report(scan_payload, results)
if args.output:
Path(args.output).expanduser().resolve().write_text(json.dumps(report, ensure_ascii=False, indent=2), encoding="utf-8")
rendered = json.dumps(report, ensure_ascii=False, indent=2)
if args.json or not args.output:
print(rendered)
return 0
if __name__ == "__main__":
raise SystemExit(main())