+"""MappingStore module for Synomap V2. + +This module implements a lightweight SQLite-backed store to record raw +Sonarr/Radarr mapping events, consolidate the latest known mapping state, and +produce diagnostics compatible with the Synomap A4 model. It is intentionally +self-contained, uses only the Python standard library, and avoids any +filesystem or external service side effects. +""" +from __future__ import annotations + +import json +import os +import sqlite3 +from datetime import datetime +from typing import Any, Iterable + + +class MappingStore: + """SQLite-backed store for Synomap mapping events and diagnostics.""" + + REQUIRED_FIELDS = ["infohash", "source", "destination", "type", "timestamp"] + VALID_TYPES = {"tv", "movie"} + + def __init__(self, db_path: str = "synomap.db") -> None: + self.db_path = db_path + self._ensure_directory() + self.conn = sqlite3.connect(self.db_path, check_same_thread=False) + self.conn.row_factory = sqlite3.Row + self._init_db() + + def _ensure_directory(self) -> None: + directory = os.path.dirname(os.path.abspath(self.db_path)) + if directory and not os.path.exists(directory): + os.makedirs(directory, exist_ok=True) + + def _init_db(self) -> None: + cursor = self.conn.cursor() + cursor.execute( + """ + CREATE TABLE IF NOT EXISTS mapping_events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + infohash TEXT NOT NULL, + source_path TEXT, + dest_path TEXT, + type TEXT, + timestamp TEXT, + release_group TEXT, + files TEXT + ) + """ + ) + cursor.execute( + """ + CREATE TABLE IF NOT EXISTS mapping_latest ( + infohash TEXT PRIMARY KEY, + source_path TEXT, + dest_path TEXT, + type TEXT, + diagnostic_status TEXT, + diagnostic_detail TEXT, + candidates TEXT, + updated_at TEXT + ) + """ + ) + cursor.execute( + """ + CREATE TABLE IF NOT EXISTS mapping_diagnostics ( + infohash TEXT PRIMARY KEY, + status TEXT, + detail TEXT, + candidates TEXT, + updated_at TEXT + ) + """ + ) + self.conn.commit() + + def _log(self, level: str, message: str, **extra: Any) -> None: + payload = {"level": level, "message": message} + if extra: + payload.update(extra) + print(json.dumps(payload, ensure_ascii=False)) + + def _parse_timestamp(self, value: str | None) -> datetime | None: + if not value: + return None + normalized = value.replace("Z", "+00:00") + try: + return datetime.fromisoformat(normalized) + except ValueError: + return None + + def _serialize_files(self, files: Any) -> str | None: + if files is None: + return None + try: + return json.dumps(files) + except (TypeError, ValueError): + return None + + def insert_event(self, event: dict) -> None: + infohash = (event.get("infohash") or "").strip() + if not infohash: + raise ValueError("infohash is required") + + event_data = { + "infohash": infohash, + "source": event.get("source"), + "destination": event.get("destination"), + "type": event.get("type"), + "timestamp": event.get("timestamp"), + "release_group": event.get("release_group"), + "files": event.get("files"), + } + + missing_required = [k for k in self.REQUIRED_FIELDS if not event_data.get(k)] + if missing_required: + self._log( + "WARN", + "event missing required fields", + infohash=infohash, + missing_fields=missing_required, + ) + + if event_data["type"] and event_data["type"] not in self.VALID_TYPES: + self._log( + "WARN", + "invalid type provided", + infohash=infohash, + type=event_data["type"], + ) + + cursor = self.conn.cursor() + cursor.execute( + """ + SELECT 1 FROM mapping_events + WHERE infohash = ? AND source_path = ? AND dest_path = ? AND type = ? 
+              AND timestamp IS ?
+            LIMIT 1
+            """,
+            (
+                infohash,
+                event_data["source"],
+                event_data["destination"],
+                event_data["type"],
+                event_data["timestamp"],
+            ),
+        )
+        if cursor.fetchone():
+            self._log("INFO", "duplicate event ignored", infohash=infohash)
+            return
+
+        cursor.execute(
+            """
+            INSERT INTO mapping_events (
+                infohash, source_path, dest_path, type, timestamp, release_group, files
+            ) VALUES (?, ?, ?, ?, ?, ?, ?)
+            """,
+            (
+                infohash,
+                event_data["source"],
+                event_data["destination"],
+                event_data["type"],
+                event_data["timestamp"],
+                event_data["release_group"],
+                self._serialize_files(event_data["files"]),
+            ),
+        )
+        self.conn.commit()
+        self._log("INFO", "event inserted", infohash=infohash)
+        self.rebuild_latest(infohash)
+
+    def list_events(self, infohash: str) -> list[dict]:
+        """Return all recorded events for an infohash, newest first."""
+        cursor = self.conn.cursor()
+        cursor.execute(
+            """
+            SELECT * FROM mapping_events
+            WHERE infohash = ?
+            ORDER BY
+                COALESCE(timestamp, '') DESC,
+                id DESC
+            """,
+            (infohash,),
+        )
+        rows = cursor.fetchall()
+        events = []
+        for row in rows:
+            data = dict(row)
+            files_raw = data.pop("files", None)
+            if files_raw:
+                try:
+                    data["files"] = json.loads(files_raw)
+                except json.JSONDecodeError:
+                    data["files"] = None
+            else:
+                data["files"] = None
+            events.append(
+                {
+                    "infohash": data.get("infohash"),
+                    "source": data.get("source_path"),
+                    "destination": data.get("dest_path"),
+                    "type": data.get("type"),
+                    "timestamp": data.get("timestamp"),
+                    "release_group": data.get("release_group"),
+                    "files": data.get("files"),
+                    "id": data.get("id"),
+                }
+            )
+        return events
+
+    def compute_diagnostic(self, events: list[dict]) -> dict:
+        """Classify an event history as OK, MULTI, PARTIAL, CORRUPT or MISSING."""
+        if not events:
+            return {"status": "MISSING", "detail": "no events recorded", "candidates": []}
+
+        destinations = [e.get("destination") for e in events if e.get("destination")]
+        types = [e.get("type") for e in events if e.get("type")]
+        unique_dest = sorted({d for d in destinations if d})
+        unique_types = sorted({t for t in types if t})
+
+        if len(unique_dest) > 1:
+            return {
+                "status": "MULTI",
+                "detail": "multiple destination candidates",
+                "candidates": unique_dest,
+            }
+
+        if len(unique_types) > 1:
+            return {
+                "status": "MULTI",
+                "detail": "type conflict detected",
+                "candidates": unique_types,
+            }
+
+        latest_event = self._select_latest_event(events)
+        missing_fields = [
+            field
+            for field in ["source", "destination", "type", "timestamp"]
+            if not latest_event.get(field)
+        ]
+        if latest_event.get("type") and latest_event.get("type") not in self.VALID_TYPES:
+            return {
+                "status": "CORRUPT",
+                "detail": f"invalid type {latest_event.get('type')}",
+                "candidates": [],
+            }
+
+        if missing_fields:
+            return {
+                "status": "PARTIAL",
+                "detail": f"missing fields: {', '.join(missing_fields)}",
+                "candidates": [],
+            }
+
+        return {
+            "status": "OK",
+            "detail": "unique mapping consolidated",
+            "candidates": unique_dest or destinations,
+        }
+
+    def _select_latest_event(self, events: Iterable[dict]) -> dict:
+        """Return the event with the most recent parseable timestamp."""
+        latest: dict | None = None
+        latest_ts: datetime | None = None
+        for event in events:
+            ts = self._parse_timestamp(event.get("timestamp"))
+            # Keep the first event as a fallback when nothing has a usable
+            # timestamp; otherwise prefer the newest timestamped event.
+            if ts is None and latest is None:
+                latest = event
+            elif ts is not None:
+                if latest_ts is None or ts > latest_ts:
+                    latest = event
+                    latest_ts = ts
+        return latest or {}
+
+    def rebuild_latest(self, infohash: str) -> None:
+        """Recompute and upsert the consolidated mapping and its diagnostic."""
+        events = self.list_events(infohash)
+        diagnostic = self.compute_diagnostic(events)
+
+        cursor = self.conn.cursor()
+        if not events:
+            cursor.execute("DELETE FROM mapping_latest WHERE infohash = ?", (infohash,))
cursor.execute("DELETE FROM mapping_diagnostics WHERE infohash = ?", (infohash,)) + self.conn.commit() + return + + latest_event = self._select_latest_event(events) + consolidated_dest = ( + None if diagnostic["status"] == "MULTI" else latest_event.get("destination") + ) + consolidated_source = latest_event.get("source") + consolidated_type = None if diagnostic["status"] == "MULTI" else latest_event.get("type") + + now_iso = datetime.utcnow().replace(microsecond=0).isoformat() + "Z" + + cursor.execute( + """ + INSERT INTO mapping_latest ( + infohash, source_path, dest_path, type, diagnostic_status, diagnostic_detail, + candidates, updated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(infohash) DO UPDATE SET + source_path=excluded.source_path, + dest_path=excluded.dest_path, + type=excluded.type, + diagnostic_status=excluded.diagnostic_status, + diagnostic_detail=excluded.diagnostic_detail, + candidates=excluded.candidates, + updated_at=excluded.updated_at + """, + ( + infohash, + consolidated_source, + consolidated_dest, + consolidated_type, + diagnostic.get("status"), + diagnostic.get("detail"), + json.dumps(diagnostic.get("candidates") or []), + now_iso, + ), + ) + + cursor.execute( + """ + INSERT INTO mapping_diagnostics (infohash, status, detail, candidates, updated_at) + VALUES (?, ?, ?, ?, ?) + ON CONFLICT(infohash) DO UPDATE SET + status=excluded.status, + detail=excluded.detail, + candidates=excluded.candidates, + updated_at=excluded.updated_at + """, + ( + infohash, + diagnostic.get("status"), + diagnostic.get("detail"), + json.dumps(diagnostic.get("candidates") or []), + now_iso, + ), + ) + self.conn.commit() + + def get_mapping(self, infohash: str) -> dict | None: + events = self.list_events(infohash) + diagnostic = self.compute_diagnostic(events) + if diagnostic["status"] == "MISSING": + return {"infohash": infohash, "diagnostic": diagnostic, "events": []} + + self.rebuild_latest(infohash) + cursor = self.conn.cursor() + cursor.execute( + "SELECT * FROM mapping_latest WHERE infohash = ?", + (infohash,), + ) + row = cursor.fetchone() + if not row: + return {"infohash": infohash, "diagnostic": diagnostic, "events": events} + + mapping = dict(row) + candidates_raw = mapping.get("candidates") + try: + candidates = json.loads(candidates_raw) if candidates_raw else [] + except json.JSONDecodeError: + candidates = [] + + result = { + "infohash": mapping.get("infohash"), + "source_path": mapping.get("source_path"), + "dest_path": mapping.get("dest_path"), + "type": mapping.get("type"), + "diagnostic": { + "status": mapping.get("diagnostic_status"), + "detail": mapping.get("diagnostic_detail"), + "candidates": candidates, + }, + "events": events, + } + return result + + def import_legacy_file(self, filepath: str) -> None: + if not os.path.exists(filepath): + raise FileNotFoundError(filepath) + + with open(filepath, "r", encoding="utf-8") as handle: + for line_no, line in enumerate(handle, start=1): + stripped = line.strip() + if not stripped: + continue + parts = stripped.split("|") + if len(parts) < 5: + self._log( + "ERROR", + "invalid legacy line", + line_number=line_no, + line_content=stripped, + ) + continue + infohash, source, dest, type_value, timestamp = parts[:5] + event = { + "infohash": infohash.strip(), + "source": source.strip() or None, + "destination": dest.strip() or None, + "type": type_value.strip() or None, + "timestamp": timestamp.strip() or None, + "release_group": None, + "files": None, + } + try: + self.insert_event(event) + except Exception as exc: # 
+                    self._log(
+                        "ERROR",
+                        "failed to import legacy line",
+                        line_number=line_no,
+                        line_content=stripped,
+                        error=str(exc),
+                    )
+
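+
+# ---------------------------------------------------------------------------
+# Minimal usage sketch, illustrative only; it is not part of the MappingStore
+# API described above. The infohash, paths, timestamp and release group are
+# made-up example values, and ":memory:" keeps the demo free of filesystem
+# side effects, matching the module's stated intent.
+# ---------------------------------------------------------------------------
+if __name__ == "__main__":  # pragma: no cover - example usage
+    store = MappingStore(db_path=":memory:")
+    demo_hash = "aabbccddeeff00112233445566778899aabbccdd"
+    store.insert_event(
+        {
+            "infohash": demo_hash,
+            "source": "/downloads/Example.Show.S01E01.1080p.mkv",
+            "destination": "/tv/Example Show/Season 01/Example.Show.S01E01.mkv",
+            "type": "tv",
+            "timestamp": "2024-01-01T12:00:00Z",
+            "release_group": "EXAMPLE",
+            "files": ["Example.Show.S01E01.1080p.mkv"],
+        }
+    )
+    # A single complete event should consolidate with diagnostic status "OK".
+    print(json.dumps(store.get_mapping(demo_hash), indent=2))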