Files
moss/acoustid_album_scan.py

468 lines
17 KiB
Python

#!/usr/bin/env python3
"""Scan a directory of MP3s with AcoustID only.
This tool fingerprints every MP3 in a directory, queries AcoustID for each
track, and prints the per-track identification evidence plus an aggregate
directory-level candidate summary.
It intentionally does not call MusicBrainz. The output is meant to be reviewed
as a batch so a downstream AI or human can decide which release the directory
belongs to.
"""
from __future__ import annotations
import argparse
import json
import sys
import time
from collections import Counter, defaultdict
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple
import requests
ACOUSTID_CLIENT_ID = "JIvtbG79eAg"
ACOUSTID_LOOKUP_URL = "https://api.acoustid.org/v2/lookup"
DEFAULT_META = "recordings releasegroups compress"
DEFAULT_MAXLEN = 120
@dataclass
class Candidate:
recording_id: str
title: str
artists: Tuple[str, ...]
releases: Tuple[str, ...]
score: float
@property
def artist_key(self) -> str:
return " / ".join(self.artists) if self.artists else ""
@property
def release_key(self) -> str:
return " / ".join(self.releases) if self.releases else ""
@property
def pair_key(self) -> Tuple[str, str]:
return (self.artist_key, self.release_key)
@dataclass
class TrackResult:
file: str
duration: Optional[float]
fingerprint: Optional[str]
fingerprint_error: Optional[str]
lookup_error: Optional[str]
candidates: List[Dict[str, Any]]
best_guess: Optional[Dict[str, Any]]
ambiguous: bool
def iter_mp3_files(root: Path, recursive: bool = True) -> List[Path]:
if root.is_file():
return [root] if root.suffix.lower() == ".mp3" else []
if recursive:
return sorted(p for p in root.rglob("*.mp3") if p.is_file())
return sorted(p for p in root.glob("*.mp3") if p.is_file())
def normalize_fingerprint(fp: Any) -> Optional[str]:
if fp is None:
return None
if isinstance(fp, bytes):
return fp.decode("ascii", errors="ignore")
return str(fp)
def fingerprint_mp3(path: Path, maxlength: int) -> Tuple[Optional[float], Optional[str], Optional[str]]:
try:
import acoustid
except Exception as exc: # pragma: no cover - dependency error path
return None, None, f"missing dependency 'acoustid': {exc}"
try:
duration, fingerprint = acoustid.fingerprint_file(str(path), maxlength=maxlength)
return float(duration) if duration is not None else None, normalize_fingerprint(fingerprint), None
except Exception as exc:
return None, None, f"fingerprint failed: {exc}"
def acoustid_lookup(duration: float, fingerprint: str, retries: int = 2, timeout: int = 60) -> Dict[str, Any]:
params = {
"client": ACOUSTID_CLIENT_ID,
"meta": DEFAULT_META,
"duration": int(round(duration)),
"fingerprint": fingerprint,
}
last_error: Optional[Exception] = None
for attempt in range(retries + 1):
try:
resp = requests.get(ACOUSTID_LOOKUP_URL, params=params, timeout=timeout)
resp.raise_for_status()
payload = resp.json()
if payload.get("status") != "ok":
raise RuntimeError(f"AcoustID status not ok: {payload}")
return payload
except Exception as exc:
last_error = exc
if attempt < retries:
time.sleep(1.5 * (attempt + 1))
continue
raise RuntimeError(f"AcoustID lookup failed: {last_error}") from exc
raise RuntimeError(f"AcoustID lookup failed: {last_error}")
def _names_from_people(items: Any) -> List[str]:
names: List[str] = []
if isinstance(items, list):
for item in items:
if isinstance(item, str):
if item.strip():
names.append(item.strip())
elif isinstance(item, dict):
name = item.get("name") or item.get("artist", {}).get("name")
if name and str(name).strip():
names.append(str(name).strip())
elif isinstance(items, dict):
name = items.get("name")
if name:
names.append(str(name).strip())
return names
def _recording_artists(recording: Dict[str, Any]) -> Tuple[str, ...]:
artists = _names_from_people(recording.get("artists"))
if not artists:
artists = _names_from_people(recording.get("artist-credit"))
if not artists:
artist = recording.get("artist", {})
if isinstance(artist, dict) and artist.get("name"):
artists = [str(artist["name"]).strip()]
return tuple(dict.fromkeys(a for a in artists if a))
def _recording_releases(recording: Dict[str, Any]) -> Tuple[str, ...]:
titles: List[str] = []
for key in ("releasegroups", "releasegroups", "release-group", "release-list", "releases"):
values = recording.get(key)
if isinstance(values, list):
for item in values:
if isinstance(item, dict):
title = item.get("title") or item.get("name")
if title and str(title).strip():
titles.append(str(title).strip())
elif isinstance(item, str) and item.strip():
titles.append(item.strip())
elif isinstance(values, dict):
title = values.get("title") or values.get("name")
if title and str(title).strip():
titles.append(str(title).strip())
return tuple(dict.fromkeys(titles))
def extract_candidates(payload: Dict[str, Any]) -> List[Candidate]:
candidates: Dict[Tuple[str, str, Tuple[str, ...], Tuple[str, ...]], Candidate] = {}
for result in payload.get("results", []):
try:
score = float(result.get("score", 0.0))
except Exception:
score = 0.0
for recording in result.get("recordings", []):
if not isinstance(recording, dict):
continue
recording_id = str(recording.get("id") or recording.get("mbid") or "")
title = str(recording.get("title") or "").strip()
artists = _recording_artists(recording)
releases = _recording_releases(recording)
key = (recording_id, title, artists, releases)
existing = candidates.get(key)
if existing is None or score > existing.score:
candidates[key] = Candidate(
recording_id=recording_id,
title=title,
artists=artists,
releases=releases,
score=score,
)
return sorted(candidates.values(), key=lambda c: c.score, reverse=True)
def candidate_to_dict(candidate: Candidate) -> Dict[str, Any]:
return {
"musicbrainz_recording_id": candidate.recording_id,
"title": candidate.title,
"artists": list(candidate.artists),
"releases": list(candidate.releases),
"artist": candidate.artist_key,
"release": candidate.release_key,
"score": candidate.score,
}
def choose_best_guess(candidates: Sequence[Candidate]) -> Optional[Dict[str, Any]]:
if not candidates:
return None
return candidate_to_dict(candidates[0])
def track_is_ambiguous(candidates: Sequence[Candidate]) -> bool:
if len(candidates) <= 1:
return False
artists = {c.artist_key for c in candidates if c.artist_key}
releases = {c.release_key for c in candidates if c.release_key}
return len(artists) > 1 or len(releases) > 1
def summarize_album(tracks: Sequence[TrackResult]) -> Dict[str, Any]:
pair_stats: Dict[Tuple[str, str], Dict[str, Any]] = defaultdict(lambda: {"tracks": 0, "score_sum": 0.0, "files": []})
artist_counter: Counter[str] = Counter()
release_counter: Counter[str] = Counter()
ambiguous_files: List[str] = []
for track in tracks:
if track.ambiguous:
ambiguous_files.append(track.file)
if not track.best_guess:
continue
artist = str(track.best_guess.get("artist") or "")
release = str(track.best_guess.get("release") or "")
score = float(track.best_guess.get("score") or 0.0)
key = (artist, release)
pair_stats[key]["tracks"] += 1
pair_stats[key]["score_sum"] += score
pair_stats[key]["files"].append(track.file)
if artist:
artist_counter[artist] += 1
if release:
release_counter[release] += 1
ranked_pairs = sorted(
(
{
"artist": artist,
"release": release,
"tracks": stats["tracks"],
"score_sum": round(stats["score_sum"], 4),
"files": stats["files"],
}
for (artist, release), stats in pair_stats.items()
),
key=lambda item: (item["tracks"], item["score_sum"]),
reverse=True,
)
majority_pair = ranked_pairs[0] if ranked_pairs else None
majority_track_count = majority_pair["tracks"] if majority_pair else 0
return {
"track_count": len(tracks),
"ambiguous_track_count": len(ambiguous_files),
"ambiguous_files": ambiguous_files,
"top_artist_candidates": artist_counter.most_common(10),
"top_release_candidates": release_counter.most_common(10),
"candidate_pairs": ranked_pairs,
"majority_pair": majority_pair,
"majority_track_count": majority_track_count,
}
def choose_majority_album(summary: Dict[str, Any]) -> Tuple[str, str]:
pair = summary.get("majority_pair") or {}
return str(pair.get("artist") or ""), str(pair.get("release") or "")
def _match_candidate_to_album(track: TrackResult, artist: str, release: str) -> Optional[str]:
for candidate in track.candidates:
if str(candidate.get("artist") or "") == artist and str(candidate.get("release") or "") == release:
return str(candidate.get("musicbrainz_recording_id") or "") or None
return None
def resolve_majority_recording_ids(tracks: Sequence[TrackResult], summary: Dict[str, Any]) -> Dict[str, Any]:
artist, release = choose_majority_album(summary)
selected_tracks: List[Dict[str, Any]] = []
unresolved_tracks: List[str] = []
for track in tracks:
recording_id = _match_candidate_to_album(track, artist, release)
if recording_id is None and track.best_guess:
bg_artist = str(track.best_guess.get("artist") or "")
bg_release = str(track.best_guess.get("release") or "")
if bg_artist == artist and bg_release == release:
recording_id = str(track.best_guess.get("musicbrainz_recording_id") or "") or None
if recording_id:
selected_tracks.append({"file": track.file, "musicbrainz_recording_id": recording_id})
else:
unresolved_tracks.append(track.file)
return {
"artist": artist,
"release": release,
"tracks": selected_tracks,
"track_count": len(selected_tracks),
"unresolved_files": unresolved_tracks,
}
def scan_directory(directory: Path, recursive: bool, maxlength: int, delay: float) -> Dict[str, Any]:
files = iter_mp3_files(directory, recursive=recursive)
results: List[TrackResult] = []
if not files:
return {
"directory": str(directory),
"tracks": [],
"album_summary": {
"track_count": 0,
"ambiguous_track_count": 0,
"ambiguous_files": [],
"top_artist_candidates": [],
"top_release_candidates": [],
"candidate_pairs": [],
"majority_pair": None,
"majority_track_count": 0,
},
"majority_album": {"artist": "", "release": "", "tracks": [], "track_count": 0, "unresolved_files": []},
}
for index, path in enumerate(files, start=1):
print(f"[{index}/{len(files)}] {path}", file=sys.stderr)
duration, fingerprint, fp_error = fingerprint_mp3(path, maxlength=maxlength)
lookup_error = None
candidates: List[Candidate] = []
if fingerprint and duration is not None:
try:
payload = acoustid_lookup(duration=duration, fingerprint=fingerprint)
candidates = extract_candidates(payload)
except Exception as exc:
lookup_error = str(exc)
else:
lookup_error = fp_error or "missing fingerprint"
track = TrackResult(
file=str(path),
duration=duration,
fingerprint=fingerprint,
fingerprint_error=fp_error,
lookup_error=lookup_error,
candidates=[candidate_to_dict(c) for c in candidates],
best_guess=choose_best_guess(candidates),
ambiguous=track_is_ambiguous(candidates),
)
results.append(track)
if delay > 0 and index < len(files):
time.sleep(delay)
summary = summarize_album(results)
majority_album = resolve_majority_recording_ids(results, summary)
return {
"directory": str(directory),
"tracks": [asdict(track) for track in results],
"album_summary": summary,
"majority_album": majority_album,
}
def format_human_report(payload: Dict[str, Any]) -> str:
lines: List[str] = []
lines.append(f"Directory: {payload['directory']}")
lines.append(f"Tracks: {len(payload.get('tracks', []))}")
summary = payload.get("album_summary", {})
lines.append(f"Ambiguous tracks: {summary.get('ambiguous_track_count', 0)}")
majority = payload.get("majority_album", {})
lines.append(
f"Majority album: artist={majority.get('artist') or '-'} | release={majority.get('release') or '-'} | matched_tracks={majority.get('track_count', 0)}"
)
lines.append("")
for i, track in enumerate(payload.get("tracks", []), start=1):
lines.append(f"{i}. {track['file']}")
lines.append(f" duration: {track.get('duration')}")
if track.get("fingerprint_error"):
lines.append(f" fingerprint_error: {track['fingerprint_error']}")
if track.get("lookup_error"):
lines.append(f" lookup_error: {track['lookup_error']}")
if track.get("best_guess"):
bg = track["best_guess"]
lines.append(
f" best_guess: artist={bg.get('artist') or '-'} | release={bg.get('release') or '-'} | title={bg.get('title') or '-'} | score={bg.get('score')}"
)
else:
lines.append(" best_guess: -")
if track.get("ambiguous"):
lines.append(" ambiguous: yes")
if track.get("candidates"):
lines.append(" candidates:")
for cand in track["candidates"]:
lines.append(
f" - score={cand.get('score')} | artist={cand.get('artist') or '-'} | release={cand.get('release') or '-'} | title={cand.get('title') or '-'} | musicbrainz_recording_id={cand.get('musicbrainz_recording_id') or '-'}"
)
lines.append("")
lines.append("Album-level candidate pairs:")
for cand in summary.get("candidate_pairs", []):
lines.append(
f"- tracks={cand['tracks']} | score_sum={cand['score_sum']} | artist={cand['artist'] or '-'} | release={cand['release'] or '-'}"
)
if not summary.get("candidate_pairs"):
lines.append("- none")
lines.append("")
lines.append("Majority album recording IDs:")
majority_tracks = majority.get("tracks", [])
if majority_tracks:
for item in majority_tracks:
lines.append(f"- {item['file']} -> {item['recording_id']}")
else:
lines.append("- none")
unresolved = majority.get("unresolved_files", [])
if unresolved:
lines.append("Unresolved files:")
for file in unresolved:
lines.append(f"- {file}")
return "\n".join(lines)
def main(argv: Optional[Sequence[str]] = None) -> int:
parser = argparse.ArgumentParser(
description="Fingerprint a directory of MP3s with AcoustID and aggregate candidate releases/artists without calling MusicBrainz.",
)
parser.add_argument("path", help="MP3 file or directory to scan")
parser.add_argument("--no-recursive", action="store_true", help="Do not recurse into subdirectories")
parser.add_argument("--maxlength", type=int, default=DEFAULT_MAXLEN, help="Maximum audio length passed to fingerprinting")
parser.add_argument("--delay", type=float, default=0.0, help="Delay in seconds between AcoustID lookups")
parser.add_argument("--json", action="store_true", help="Print JSON output instead of human-readable text")
parser.add_argument("--output", type=str, default="", help="Write JSON output to a file")
args = parser.parse_args(argv)
root = Path(args.path).expanduser().resolve()
if not root.exists():
print(f"Path not found: {root}", file=sys.stderr)
return 2
payload = scan_directory(root, recursive=not args.no_recursive, maxlength=args.maxlength, delay=args.delay)
if args.output:
Path(args.output).expanduser().resolve().write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
if args.json:
print(json.dumps(payload, ensure_ascii=False, indent=2))
else:
print(format_human_report(payload))
return 0
if __name__ == "__main__":
raise SystemExit(main())