diff --git a/mapping_store.py b/mapping_store.py
new file mode 100644
index 0000000000000000000000000000000000000000..7ed6897c0e885295ec68dc3a35ec77666454b67d
--- /dev/null
+++ b/mapping_store.py
@@ -0,0 +1,364 @@
+import json
+import os
+import sqlite3
+from datetime import datetime, timezone
+from typing import Any, Dict, List, Optional
+
+
+class MappingStore:
+    """SQLite-backed store for torrent-to-library mappings.
+
+    Every observed move is appended to ``mapping_events``; ``mapping_latest``
+    is a per-infohash projection rebuilt from those events, together with a
+    diagnostic describing how consistent the event history is.
+    """
+
+    def __init__(self, db_path: str = "synomap.db"):
+        self.db_path = db_path
+        self._ensure_directory()
+        self._init_db()
+
+    def _ensure_directory(self) -> None:
+        directory = os.path.dirname(os.path.abspath(self.db_path))
+        if directory and not os.path.exists(directory):
+            os.makedirs(directory, exist_ok=True)
+
+    def _get_connection(self) -> sqlite3.Connection:
+        conn = sqlite3.connect(self.db_path)
+        conn.row_factory = sqlite3.Row
+        return conn
+
+    def _log(self, level: str, msg: str, **kwargs: Any) -> None:
+        payload = {"level": level, "msg": msg}
+        if kwargs:
+            payload.update(kwargs)
+        try:
+            print(json.dumps(payload))
+        except Exception:
+            print(json.dumps({"level": level, "msg": msg, "error": "log_failure"}))
+
+    def _normalize_hash(self, infohash: Optional[str]) -> Optional[str]:
+        return infohash.strip().lower() if infohash is not None else None
+
+    def _normalize_path(self, path: Optional[str]) -> Optional[str]:
+        if path is None:
+            return None
+        return path.strip().rstrip("/\\")
+
+    def _format_timestamp(self, ts: Optional[str] = None) -> str:
+        # Always emit UTC ISO-8601 with a trailing "Z"; unparseable input
+        # falls back to the current time.
+        if ts is None:
+            return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
+        parsed = self._parse_timestamp(ts)
+        dt = parsed if parsed is not None else datetime.now(timezone.utc)
+        if dt.tzinfo is None:
+            dt = dt.replace(tzinfo=timezone.utc)
+        return dt.astimezone(timezone.utc).isoformat().replace("+00:00", "Z")
+
+    def _parse_timestamp(self, ts: Optional[str]) -> Optional[datetime]:
+        if not ts:
+            return None
+        try:
+            if ts.endswith("Z"):
+                ts = ts[:-1] + "+00:00"
+            parsed = datetime.fromisoformat(ts)
+            if parsed.tzinfo is None:
+                parsed = parsed.replace(tzinfo=timezone.utc)
+            return parsed
+        except Exception:
+            return None
+
+    def _init_db(self) -> None:
+        conn = self._get_connection()
+        try:
+            cursor = conn.cursor()
+            cursor.execute(
+                """
+                CREATE TABLE IF NOT EXISTS mapping_events (
+                    id INTEGER PRIMARY KEY AUTOINCREMENT,
+                    infohash TEXT NOT NULL,
+                    source TEXT,
+                    destination TEXT,
+                    type TEXT,
+                    timestamp TEXT,
+                    release_group TEXT,
+                    files_json TEXT,
+                    origin TEXT DEFAULT 'event'
+                )
+                """
+            )
+            cursor.execute(
+                "CREATE INDEX IF NOT EXISTS idx_mapping_events_infohash ON mapping_events(infohash)"
+            )
+            cursor.execute(
+                """
+                CREATE TABLE IF NOT EXISTS mapping_latest (
+                    infohash TEXT PRIMARY KEY,
+                    source_path TEXT,
+                    dest_path TEXT,
+                    type TEXT,
+                    diagnostic_status TEXT,
+                    diagnostic_json TEXT,
+                    last_updated TEXT
+                )
+                """
+            )
+            conn.commit()
+        except Exception as exc:
+            conn.rollback()
+            self._log("error", "init_db_failed", error=str(exc))
+            raise
+        finally:
+            conn.close()
+
+    def insert_event(self, event: Dict[str, Any]) -> None:
+        """Append one mapping event and rebuild the projection for its hash."""
+        infohash = self._normalize_hash(event.get("infohash"))
+        if not infohash:
+            raise ValueError("infohash is required")
+
+        source = self._normalize_path(event.get("source"))
+        destination = self._normalize_path(event.get("destination"))
+        event_type = event.get("type")
+        timestamp = self._format_timestamp(event.get("timestamp"))
+        release_group = event.get("release_group")
+        files = event.get("files")
+        origin = event.get("origin", "event")
+
+        # Unknown types are stored anyway; compute_diagnostic flags them later.
+        if event_type not in {"tv", "movie"}:
+            self._log("warn", "invalid_type", infohash=infohash, type=event_type)
+
+        files_json = None
+        if files is not None:
+            try:
+                files_json = json.dumps(files)
+            except Exception:
+                self._log("warn", "files_dump_failed", infohash=infohash)
+
+        conn = self._get_connection()
+        try:
+            cursor = conn.cursor()
+            cursor.execute(
+                """
+                INSERT INTO mapping_events (infohash, source, destination, type, timestamp, release_group, files_json, origin)
+                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
+                """,
+                (infohash, source, destination, event_type, timestamp, release_group, files_json, origin),
+            )
+            conn.commit()
+        except Exception as exc:
+            conn.rollback()
+            self._log("error", "insert_event_failed", infohash=infohash, error=str(exc))
+            raise
+        finally:
+            conn.close()
+
+        self.rebuild_latest(infohash)
+
+    def list_events(self, infohash: str) -> List[Dict[str, Any]]:
+        """Return all stored events for an infohash in insertion order."""
+        normalized_hash = self._normalize_hash(infohash)
+        conn = self._get_connection()
+        try:
+            cursor = conn.cursor()
+            cursor.execute(
+                "SELECT id, infohash, source, destination, type, timestamp, release_group, files_json, origin "
+                "FROM mapping_events WHERE infohash = ? ORDER BY id",
+                (normalized_hash,),
+            )
+            rows = cursor.fetchall()
+        finally:
+            conn.close()
+
+        events: List[Dict[str, Any]] = []
+        for row in rows:
+            files_value = row[7]
+            try:
+                files_data = json.loads(files_value) if files_value is not None else None
+            except Exception:
+                files_data = None
+            events.append(
+                {
+                    "id": row[0],
+                    "infohash": row[1],
+                    "source": self._normalize_path(row[2]),
+                    "destination": self._normalize_path(row[3]),
+                    "type": row[4],
+                    "timestamp": row[5],
+                    "release_group": row[6],
+                    "files": files_data,
+                    "origin": row[8],
+                }
+            )
+        return events
+
+    def compute_diagnostic(self, events: List[Dict[str, Any]]) -> Dict[str, Any]:
+        """Classify an event history as OK, MISSING, PARTIAL, MULTI or CORRUPT."""
+        if not events:
+            return {"status": "MISSING", "detail": "no events"}
+
+        normalized_dests: List[str] = []
+        status = "OK"
+        detail = "ok"
+        candidates: List[str] = []
+        corrupt_reasons: List[str] = []
+
+        for event in events:
+            dest_norm = self._normalize_path(event.get("destination"))
+            if dest_norm:
+                normalized_dests.append(dest_norm)
+            event_type = event.get("type")
+            if event_type not in {"tv", "movie"}:
+                corrupt_reasons.append(f"invalid type: {event_type}")
+
+        unique_dests = set(normalized_dests)
+
+        if len(unique_dests) > 1:
+            status = "MULTI"
+            detail = "multiple destinations"
+            candidates = sorted(unique_dests)
+        elif not unique_dests:
+            # No event carried a non-empty destination.
+            status = "PARTIAL"
+            detail = "missing destination"
+        # Exactly one non-empty destination leaves the status at OK.
+
+        if corrupt_reasons and status == "OK":
+            status = "CORRUPT"
+            detail = "; ".join(corrupt_reasons)
+        elif corrupt_reasons:
+            detail = f"{detail}; corrupt: {'; '.join(corrupt_reasons)}"
+
+        diagnostic = {"status": status, "detail": detail}
+        if candidates:
+            diagnostic["candidates"] = candidates
+        return diagnostic
+
+    def rebuild_latest(self, infohash: str) -> None:
+        """Recompute the mapping_latest row for an infohash from its events."""
+        normalized_hash = self._normalize_hash(infohash)
+        events = self.list_events(normalized_hash)
+
+        diagnostic = self.compute_diagnostic(events)
+        now_ts = self._format_timestamp()
+
+        source_path: Optional[str] = None
+        dest_path: Optional[str] = None
+        event_type: Optional[str] = None
+        if events:
+            # The event with the newest parseable timestamp wins; events with
+            # unparseable timestamps sort first, and ties keep insertion order.
+            fallback = datetime.min.replace(tzinfo=timezone.utc)
+            ordered = sorted(
+                events,
+                key=lambda e: self._parse_timestamp(e.get("timestamp")) or fallback,
+            )
+            winner = ordered[-1]
+            source_path = self._normalize_path(winner.get("source"))
+            dest_path = self._normalize_path(winner.get("destination"))
+            event_type = winner.get("type")
+
+        conn = self._get_connection()
+        try:
+            cursor = conn.cursor()
+            cursor.execute(
+                """
+                INSERT INTO mapping_latest (infohash, source_path, dest_path, type, diagnostic_status, diagnostic_json, last_updated)
+                VALUES (?, ?, ?, ?, ?, ?, ?)
+                ON CONFLICT(infohash) DO UPDATE SET
+                    source_path=excluded.source_path,
+                    dest_path=excluded.dest_path,
+                    type=excluded.type,
+                    diagnostic_status=excluded.diagnostic_status,
+                    diagnostic_json=excluded.diagnostic_json,
+                    last_updated=excluded.last_updated
+                """,
+                (
+                    normalized_hash,
+                    source_path,
+                    dest_path,
+                    event_type,
+                    diagnostic.get("status"),
+                    json.dumps(diagnostic),
+                    now_ts,
+                ),
+            )
+            conn.commit()
+        except Exception as exc:
+            conn.rollback()
+            self._log("error", "rebuild_latest_failed", infohash=normalized_hash, error=str(exc))
+            raise
+        finally:
+            conn.close()
+
+    def get_mapping(self, infohash: str) -> Optional[Dict[str, Any]]:
+        """Return the latest mapping plus its event history, or None if unknown."""
+        normalized_hash = self._normalize_hash(infohash)
+        conn = self._get_connection()
+        try:
+            cursor = conn.cursor()
+            cursor.execute(
+                "SELECT infohash, source_path, dest_path, type, diagnostic_status, diagnostic_json, last_updated "
+                "FROM mapping_latest WHERE infohash = ?",
+                (normalized_hash,),
+            )
+            row = cursor.fetchone()
+        finally:
+            conn.close()
+
+        if row is None:
+            return None
+
+        diagnostic_data = None
+        if row[5]:
+            try:
+                diagnostic_data = json.loads(row[5])
+            except Exception:
+                diagnostic_data = {"status": row[4], "detail": "diagnostic_parse_error"}
+
+        mapping: Dict[str, Any] = {
+            "infohash": row[0],
+            "source_path": self._normalize_path(row[1]),
+            "dest_path": self._normalize_path(row[2]),
+            "type": row[3],
+            "diagnostic": diagnostic_data or {"status": row[4]},
+            "last_updated": row[6],
+            "events": self.list_events(normalized_hash),
+        }
+
+        # A MISSING mapping carries no useful paths or events, so reduce it to
+        # the hash and the diagnostic.
+        if mapping["diagnostic"].get("status") == "MISSING":
+            mapping = {"infohash": row[0], "diagnostic": mapping["diagnostic"]}
+
+        return mapping
+
+    def import_legacy_file(self, filepath: str) -> None:
+        """Import pipe-delimited legacy lines: infohash|source|destination|type|timestamp."""
+        if not os.path.isfile(filepath):
+            raise FileNotFoundError(filepath)
+
+        with open(filepath, "r", encoding="utf-8") as legacy_file:
+            for line in legacy_file:
+                stripped = line.strip()
+                if not stripped or stripped.startswith("#"):
+                    continue
+                parts = stripped.split("|")
+                if len(parts) < 5:
+                    self._log("warn", "legacy_line_invalid", line=stripped)
+                    continue
+                legacy_event = {
+                    "infohash": parts[0],
+                    "source": parts[1],
+                    "destination": parts[2],
+                    "type": parts[3],
+                    "timestamp": parts[4],
+                    "origin": "legacy",
+                }
+                self.insert_event(legacy_event)
+
+
+__all__ = ["MappingStore"]
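
A minimal usage sketch of the MappingStore API introduced above; the database path, infohash, and media paths are illustrative placeholders, not values taken from this change:

    from mapping_store import MappingStore

    # Hypothetical database location; the directory is created if missing.
    store = MappingStore("data/synomap.db")

    # Each insert appends to mapping_events and rebuilds mapping_latest.
    store.insert_event(
        {
            "infohash": "ABCDEF0123456789ABCDEF0123456789ABCDEF01",
            "source": "/downloads/Some.Show.S01E01.1080p",
            "destination": "/media/tv/Some Show/Season 01",
            "type": "tv",
            "timestamp": "2024-01-01T12:00:00Z",
        }
    )

    # Lookups are case-insensitive because infohashes are normalized to lowercase.
    mapping = store.get_mapping("abcdef0123456789abcdef0123456789abcdef01")
    print(mapping["diagnostic"]["status"])  # "OK" for a single consistent destination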