""" SYNOMAP V2 - MappingDB_v1 Module: mapping_store.py Role: Manages the 'synomap.db' SQLite database, acting as the Source-of-Truth for torrent-to-filesystem mappings. Replaces 'mapping_entries.txt'. Features: - Stores raw import events (Sonarr/Radarr) and legacy entries. - Consolidates the latest state per infohash (mapping_latest). - Diagnostics (MISSING, MULTI, PARTIAL, OK). - Strict schema compliance (A4) and transactional safety. Design: - Fusion of Code A and Code B based on QC Round 3 Audit. - No external dependencies (Standard Library only). - Thread-safe ephemeral connections. - JSON Logging on stdout. Author: AI Developer (Fused Version) Version: 1.0.0 (FUSION_FINAL) Python: 3.10+ """ import sqlite3 import json import os import sys from datetime import datetime from typing import Optional, Dict, List, Any class MappingStore: """ Main class for interacting with the Synomap SQLite database. Strictly follows FUSION_PLAN specifications. """ DB_SCHEMA = [ """ CREATE TABLE IF NOT EXISTS mapping_events ( id INTEGER PRIMARY KEY AUTOINCREMENT, infohash TEXT NOT NULL, source TEXT, destination TEXT, type TEXT, timestamp TEXT, release_group TEXT, files_json TEXT, origin TEXT DEFAULT 'event' ); """, """ CREATE INDEX IF NOT EXISTS idx_events_infohash ON mapping_events(infohash); """, """ CREATE TABLE IF NOT EXISTS mapping_latest ( infohash TEXT PRIMARY KEY, source_path TEXT, dest_path TEXT, type TEXT, diagnostic_status TEXT, diagnostic_json TEXT, last_updated TEXT ); """ ] def __init__(self, db_path: str = "synomap.db"): """ Initializes the database path and ensures the environment is ready. Args: db_path (str): Path to the SQLite file. """ self.db_path = db_path self._ensure_directory() self._init_db() # ========================================================================= # PRIVATE HELPERS (Strictly per FUSION_PLAN) # ========================================================================= def _ensure_directory(self) -> None: """Ensures the parent directory of the database exists (Issue I2).""" dirname = os.path.dirname(self.db_path) if dirname and not os.path.exists(dirname): try: os.makedirs(dirname, exist_ok=True) except OSError as e: self._log("CRITICAL", f"Failed to create directory {dirname}: {e}") raise def _get_connection(self) -> sqlite3.Connection: """Factory for ephemeral database connections (Issue I4).""" try: conn = sqlite3.connect(self.db_path, timeout=10.0) conn.row_factory = sqlite3.Row return conn except sqlite3.Error as e: self._log("ERROR", f"DB Connection failed: {e}") raise def _log(self, level: str, msg: str, **kwargs) -> None: """Logs structured JSON to stdout (Issue I1).""" entry = { "timestamp": datetime.utcnow().isoformat() + "Z", "level": level, "message": msg, **kwargs } print(json.dumps(entry), file=sys.stdout) def _normalize_hash(self, infohash: str) -> str: """Standardizes infohash to lowercase and stripped (Issue I8).""" if not infohash: return "" return infohash.strip().lower() def _normalize_path(self, path: Optional[str]) -> Optional[str]: """Standardizes paths (strip whitespace and trailing slashes) (Issue I9).""" if path is None: return None cleaned = path.strip() if not cleaned: return None return cleaned.rstrip("/\\") def _init_db(self) -> None: """Creates tables and indexes if they do not exist.""" conn = self._get_connection() try: cursor = conn.cursor() for statement in self.DB_SCHEMA: cursor.execute(statement) conn.commit() except sqlite3.Error as e: self._log("CRITICAL", f"DB Init failed: {e}") conn.rollback() raise finally: conn.close() def _parse_timestamp(self, ts_str: str) -> datetime: """Robust ISO8601 parsing (handles Z for Python < 3.11).""" try: # Handle 'Z' manually for Python 3.10 compatibility clean_ts = ts_str.replace("Z", "+00:00") return datetime.fromisoformat(clean_ts) except (ValueError, TypeError): # Fallback for very old/broken timestamps: return min date return datetime.min # ========================================================================= # PUBLIC API # ========================================================================= def insert_event(self, event: Dict[str, Any], origin: str = 'event') -> None: """ Inserts a raw event and triggers consolidation. Args: event: Dict containing infohash, source, destination, type, timestamp. origin: 'event' (default) or 'legacy'. """ # 1. Validation and Normalization required = ["infohash", "source", "destination", "type", "timestamp"] for field in required: if field not in event or event[field] is None: self._log("WARN", f"Skipping event missing field: {field}", event=event) return infohash = self._normalize_hash(event["infohash"]) source = self._normalize_path(event["source"]) destination = self._normalize_path(event["destination"]) type_ = event["type"] timestamp = event["timestamp"] # Assumed ISO8601 (Issue I6) # Files handling files = event.get("files", []) files_json = json.dumps(files) # Type validation (Logging only, as per FUSION_PLAN) if type_ not in ('tv', 'movie'): self._log("WARN", f"Unknown media type: {type_}", infohash=infohash) conn = self._get_connection() try: cursor = conn.cursor() cursor.execute(""" INSERT INTO mapping_events (infohash, source, destination, type, timestamp, release_group, files_json, origin) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, ( infohash, source, destination, type_, timestamp, event.get("release_group"), files_json, origin )) conn.commit() self._log("INFO", f"Event inserted for {infohash}", origin=origin) except sqlite3.Error as e: self._log("ERROR", f"Insert failed: {e}", infohash=infohash) conn.rollback() raise finally: conn.close() # 2. Trigger Rebuild (Separate transaction as per design) self.rebuild_latest(infohash) def list_events(self, infohash: str) -> List[Dict[str, Any]]: """Returns raw events for an infohash.""" infohash = self._normalize_hash(infohash) conn = self._get_connection() try: cursor = conn.cursor() cursor.execute( "SELECT * FROM mapping_events WHERE infohash = ?", (infohash,) ) rows = cursor.fetchall() results = [] for row in rows: d = dict(row) d['files'] = json.loads(d['files_json']) if d['files_json'] else [] del d['files_json'] results.append(d) return results except sqlite3.Error as e: self._log("ERROR", f"List events failed: {e}", infohash=infohash) return [] finally: conn.close() def compute_diagnostic(self, events: List[Dict[str, Any]]) -> Dict[str, Any]: """ Analyzes events to determine diagnostic status (OK, MULTI, PARTIAL, MISSING). Performs analysis on Normalized paths. """ if not events: return { "status": "MISSING", "detail": "No events found", "candidates": [] } dest_candidates = set() type_candidates = set() for e in events: # Re-normalize just in case, though DB should be clean d = self._normalize_path(e.get("destination")) t = e.get("type") if t: type_candidates.add(t) # Issue I11: Consider PARTIAL if None or empty string if d: dest_candidates.add(d) candidates_list = sorted(list(dest_candidates)) # Check for Type Conflict if len(type_candidates) > 1: return { "status": "MULTI", "detail": f"TYPE_CONFLICT: {list(type_candidates)}", "candidates": candidates_list } # Check Destinations if len(dest_candidates) == 0: return { "status": "PARTIAL", "detail": "Events exist but no valid destination path", "candidates": [] } if len(dest_candidates) > 1: return { "status": "MULTI", "detail": "Multiple divergent destination paths", "candidates": candidates_list } return { "status": "OK", "detail": "Unique consolidated mapping", "candidates": candidates_list } def rebuild_latest(self, infohash: str) -> None: """ Consolidates events into the 'mapping_latest' table. Logic strictly follows FUSION_PLAN Point 4. """ infohash = self._normalize_hash(infohash) # 1. Fetch all events (Raw) events = self.list_events(infohash) conn = self._get_connection() try: cursor = conn.cursor() # 2. Case: Empty list (Issue I7) if not events: cursor.execute(""" INSERT INTO mapping_latest (infohash, source_path, dest_path, type, diagnostic_status, diagnostic_json, last_updated) VALUES (?, NULL, NULL, NULL, 'MISSING', ?, ?) ON CONFLICT(infohash) DO UPDATE SET source_path=NULL, dest_path=NULL, type=NULL, diagnostic_status='MISSING', diagnostic_json=excluded.diagnostic_json, last_updated=excluded.last_updated """, ( infohash, json.dumps({"status": "MISSING"}), datetime.utcnow().isoformat() + "Z" )) conn.commit() return # 3. Case: Events exist # Parse dates and sort in Python (Issue I10) for e in events: e['_dt'] = self._parse_timestamp(e['timestamp']) # Sort by date ASC events.sort(key=lambda x: x['_dt']) # Compute diagnostic diag = self.compute_diagnostic(events) # Identify Winning Event (Last one) winner = events[-1] # Determine consolidated fields # If MULTI, dest_path is ambiguous (None), otherwise takes candidate final_dest = None if diag['status'] == 'OK' and diag['candidates']: final_dest = diag['candidates'][0] # Upsert cursor.execute(""" INSERT INTO mapping_latest (infohash, source_path, dest_path, type, diagnostic_status, diagnostic_json, last_updated) VALUES (?, ?, ?, ?, ?, ?, ?) ON CONFLICT(infohash) DO UPDATE SET source_path=excluded.source_path, dest_path=excluded.dest_path, type=excluded.type, diagnostic_status=excluded.diagnostic_status, diagnostic_json=excluded.diagnostic_json, last_updated=excluded.last_updated """, ( infohash, winner.get('source'), final_dest, winner.get('type'), diag['status'], json.dumps(diag), datetime.utcnow().isoformat() + "Z" )) conn.commit() except sqlite3.Error as e: self._log("ERROR", f"Rebuild failed: {e}", infohash=infohash) conn.rollback() raise finally: conn.close() def get_mapping(self, infohash: str) -> Optional[Dict[str, Any]]: """ Retrieves the consolidated mapping for an infohash. Returns strict A4 structure. """ infohash = self._normalize_hash(infohash) conn = self._get_connection() try: cursor = conn.cursor() cursor.execute("SELECT * FROM mapping_latest WHERE infohash = ?", (infohash,)) row = cursor.fetchone() if not row: return None data = dict(row) diag_data = json.loads(data['diagnostic_json']) if data['diagnostic_json'] else {} # Return strict A4 compatible partial structure return { "infohash": data['infohash'], "source_path": data['source_path'], "dest_path": data['dest_path'], "type": data['type'], "events": self.list_events(infohash), # Optional but requested in Spec 5 "diagnostic": diag_data } except sqlite3.Error as e: self._log("ERROR", f"Get mapping failed: {e}", infohash=infohash) return None finally: conn.close() def import_legacy_file(self, filepath: str) -> None: """ Imports legacy 'mapping_entries.txt'. Format: INFOHASH|SRC|DST|TYPE|TIMESTAMP """ if not os.path.exists(filepath): self._log("ERROR", f"Legacy file not found: {filepath}") return self._log("INFO", f"Starting legacy import from {filepath}") count = 0 errors = 0 try: with open(filepath, 'r', encoding='utf-8') as f: for line in f: line = line.strip() if not line: continue parts = line.split('|') if len(parts) < 5: self._log("WARN", "Invalid legacy line format", line=line) errors += 1 continue # Construct event dict # Normalize timestamp to ISO-Z (Issue I6) roughly # Assuming legacy format is close enough or simply stored as string # Ideally, we should ensure it looks like a timestamp. ts = parts[4].strip() if not ts.endswith("Z") and "T" in ts: ts += "Z" # Basic fix attempt event = { "infohash": parts[0], "source": parts[1], "destination": parts[2], "type": parts[3], "timestamp": ts, "release_group": None, "files": [] } try: self.insert_event(event, origin='legacy') count += 1 except Exception as e: self._log("ERROR", f"Failed to insert legacy event: {e}", line=line) errors += 1 self._log("INFO", "Legacy import completed", imported=count, errors=errors) except Exception as e: self._log("CRITICAL", f"Legacy import crashed: {e}") raise