""" SYNOMAP V2 - MappingDB_v1 Module : mapping_store.py Rôle : Gère la base de données SQLite 'synomap.db' qui agit comme Source-of-Truth pour les mappings entre les torrents (Infohash) et le système de fichiers. Remplace l'ancien fichier mapping_entries.txt. Responsabilités : 1. Stocker les événements d'import bruts (Sonarr/Radarr). 2. Consolider l'état courant (Latest) par Infohash. 3. Diagnostiquer les incohérences (MULTI, MISSING, PARTIAL). 4. Fournir une API compatible avec le schéma A4 (Identity, Paths, Diagnostic). Auteur : IA Développeuse (d'après DEV_BRIEF MappingDB_v1) Version : 1.0.0 Python : 3.10+ """ import sqlite3 import json import logging import os from datetime import datetime from typing import Optional, Dict, List, Any # Configuration du logging par défaut (peut être surchargé par l'appelant) logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger("MappingStore") class MappingStore: """ Classe principale d'interaction avec la base de données de mapping Synomap. """ DB_SCHEMA = [ """ CREATE TABLE IF NOT EXISTS mapping_events ( id INTEGER PRIMARY KEY AUTOINCREMENT, infohash TEXT NOT NULL, source TEXT NOT NULL, destination TEXT, type TEXT NOT NULL, timestamp TEXT NOT NULL, release_group TEXT, files_json TEXT, -- Stockage JSON de la liste des fichiers origin TEXT DEFAULT 'event' -- 'event' (api) ou 'legacy' (import) ); """, """ CREATE INDEX IF NOT EXISTS idx_events_infohash ON mapping_events(infohash); """, """ CREATE TABLE IF NOT EXISTS mapping_latest ( infohash TEXT PRIMARY KEY, source_path TEXT, dest_path TEXT, type TEXT, diagnostic_status TEXT NOT NULL, -- OK, MULTI, PARTIAL, MISSING, CORRUPT diagnostic_json TEXT, -- Détails complets du diagnostic au format JSON last_updated TEXT NOT NULL ); """ ] def __init__(self, db_path: str = "synomap.db"): """ Initialise la connexion à la DB et crée le schéma si nécessaire. """ self.db_path = db_path self._init_db() def _get_connection(self) -> sqlite3.Connection: """Crée une connexion avec gestion des lignes en mode dictionnaire.""" try: conn = sqlite3.connect(self.db_path, timeout=10.0) conn.row_factory = sqlite3.Row return conn except sqlite3.Error as e: logger.error(f"DB_CONNECTION_ERROR: {e}") raise def _init_db(self) -> None: """Crée les tables et index si la DB est nouvelle.""" conn = self._get_connection() cursor = conn.cursor() try: for statement in self.DB_SCHEMA: cursor.execute(statement) conn.commit() logger.debug(f"Database initialized at {self.db_path}") except sqlite3.Error as e: logger.critical(f"DB_INIT_FAILED: {e}") conn.rollback() raise finally: conn.close() def insert_event(self, event: Dict[str, Any], origin: str = 'event') -> None: """ Insère un événement brut (Sonarr/Radarr) et déclenche la consolidation. Args: event (dict): Doit contenir infohash, source, destination, type, timestamp. origin (str): 'event' par défaut, ou 'legacy' pour les imports. """ # 1. Validation basique required_keys = ["infohash", "source", "destination", "type", "timestamp"] for key in required_keys: if key not in event or not event[key]: logger.warning(f"INVALID_EVENT: Missing mandatory field '{key}' in {event}") return # On ignore l'événement invalide mais on ne crash pas infohash = event["infohash"].lower().strip() files_json = json.dumps(event.get("files", [])) conn = self._get_connection() cursor = conn.cursor() try: # 2. 
    def list_events(self, infohash: str) -> List[Dict[str, Any]]:
        """Returns the raw list of events for a given infohash (debug/analysis)."""
        conn = self._get_connection()
        cursor = conn.cursor()
        try:
            cursor.execute(
                "SELECT * FROM mapping_events WHERE infohash = ? ORDER BY timestamp ASC",
                (infohash.lower(),)
            )
            rows = cursor.fetchall()

            events = []
            for row in rows:
                evt = dict(row)
                # Parse the JSON file list back into a Python list
                try:
                    evt['files'] = json.loads(evt['files_json']) if evt['files_json'] else []
                except json.JSONDecodeError:
                    evt['files'] = []
                del evt['files_json']
                events.append(evt)
            return events
        finally:
            conn.close()

    def compute_diagnostic(self, events: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Analyzes a list of events to determine the state of the mapping.

        A4 State Minimal logic:
        - MISSING: No recorded events.
        - PARTIAL: Events exist but no destination path.
        - MULTI: Several distinct destinations, or a conflict on the 'type'
          field (tv vs movie), reported with a TYPE_CONFLICT detail.
        - OK: A single, consistent destination path.
        """
        if not events:
            return {
                "status": "MISSING",
                "detail": "No recorded events for this infohash",
                "candidates": []
            }

        # Collect the unique candidates (destination and type)
        dest_candidates = set()
        type_candidates = set()
        valid_dest_events = []

        for e in events:
            d = e.get("destination")
            t = e.get("type")
            if t:
                type_candidates.add(t)
            if d:
                # Simple normalization to avoid false positives on trailing slashes
                d_norm = d.rstrip("/\\")
                dest_candidates.add(d_norm)
                valid_dest_events.append(e)

        # TYPE conflict analysis
        if len(type_candidates) > 1:
            return {
                "status": "MULTI",
                "detail": f"TYPE_CONFLICT: Detected types {list(type_candidates)}",
                "candidates": list(dest_candidates)
            }

        # Destination analysis
        if len(dest_candidates) == 0:
            return {
                "status": "PARTIAL",
                "detail": "Events exist but contain no destination path",
                "candidates": []
            }

        if len(dest_candidates) > 1:
            return {
                "status": "MULTI",
                "detail": "Multiple divergent destination paths detected",
                "candidates": list(dest_candidates)
            }

        # Reaching this point means a single type and a single destination
        return {
            "status": "OK",
            "detail": "Unique consolidated mapping",
            "candidates": list(dest_candidates)
        }
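    # Illustrative sketch of the diagnostic logic above, with hypothetical
    # events and a MappingStore instance `store`: two divergent destinations
    # for the same infohash yield MULTI, and the trailing slash is normalized
    # away before comparison.
    #
    #   events = [
    #       {"destination": "/media/Movies/Film (2025)", "type": "movie"},
    #       {"destination": "/media/Movies/Film_REPACK (2025)/", "type": "movie"},
    #   ]
    #   diag = store.compute_diagnostic(events)
    #   # diag["status"] == "MULTI"; diag["candidates"] holds both normalized paths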
    def rebuild_latest(self, infohash: str, conn: Optional[sqlite3.Connection] = None) -> None:
        """
        Recomputes the 'mapping_latest' state for an infohash.
        Can reuse an existing connection (to stay within the same transaction).
        """
        should_close = False
        if conn is None:
            conn = self._get_connection()
            should_close = True

        try:
            # 1. Fetch all events via direct SQL (avoids opening another connection)
            cursor = conn.cursor()
            cursor.execute(
                "SELECT * FROM mapping_events WHERE infohash = ? ORDER BY timestamp ASC",
                (infohash,)
            )
            rows = cursor.fetchall()
            events = [dict(row) for row in rows]  # Plain dicts for compute_diagnostic

            # 2. Compute the diagnostic
            diag = self.compute_diagnostic(events)

            # 3. Determine the consolidated values
            source_path = None
            dest_path = None
            type_ = None

            # Use the most recent event to fill the identity fields
            if events:
                last_event = events[-1]
                source_path = last_event['source']
                type_ = last_event['type']

            # For the destination: if OK, take the single candidate
            if diag['status'] == 'OK' and diag['candidates']:
                dest_path = diag['candidates'][0]
            elif diag['status'] == 'MULTI':
                # Spec requires a stable dest_path; with MULTI no stable path is
                # guaranteed, so keep it NULL rather than an "AMBIGUOUS" marker.
                dest_path = None

            # 4. Upsert into mapping_latest
            cursor.execute("""
                INSERT INTO mapping_latest
                    (infohash, source_path, dest_path, type, diagnostic_status, diagnostic_json, last_updated)
                VALUES (?, ?, ?, ?, ?, ?, ?)
                ON CONFLICT(infohash) DO UPDATE SET
                    source_path=excluded.source_path,
                    dest_path=excluded.dest_path,
                    type=excluded.type,
                    diagnostic_status=excluded.diagnostic_status,
                    diagnostic_json=excluded.diagnostic_json,
                    last_updated=excluded.last_updated
            """, (
                infohash,
                source_path,
                dest_path,
                type_,
                diag['status'],
                json.dumps(diag),
                datetime.utcnow().isoformat() + "Z"
            ))

            # Commit only if we opened the connection ourselves;
            # otherwise the caller owns the transaction.
            if should_close:
                conn.commit()
        except Exception as e:
            logger.error(f"REBUILD_ERROR for {infohash}: {e}")
            if should_close:
                conn.rollback()
            raise
        finally:
            if should_close:
                conn.close()

    def get_mapping(self, infohash: str) -> Optional[Dict[str, Any]]:
        """
        Returns the consolidated mapping object (partial A4 format).
        Returns None if the infohash is unknown.
        """
        conn = self._get_connection()
        cursor = conn.cursor()
        try:
            cursor.execute("SELECT * FROM mapping_latest WHERE infohash = ?", (infohash.lower(),))
            row = cursor.fetchone()
            if not row:
                return None

            data = dict(row)

            # Rebuild the output structure defined by the spec.
            # Raw events enrich the output (optional but required by Spec 5).
            raw_events = self.list_events(infohash)

            return {
                "infohash": data["infohash"],
                "source_path": data["source_path"],
                "dest_path": data["dest_path"],
                "type": data["type"],
                "events": raw_events,
                "diagnostic": json.loads(data["diagnostic_json"]) if data["diagnostic_json"] else {}
            }
        finally:
            conn.close()
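    # Illustrative sketch of the consolidated object returned by get_mapping()
    # for a healthy (OK) mapping; the field values are hypothetical:
    #
    #   {
    #       "infohash": "abcdef0123456789abcdef0123456789abcdef01",
    #       "source_path": "/downloads/Some.Release.1080p",
    #       "dest_path": "/media/Movies/Some Release (2025)",
    #       "type": "movie",
    #       "events": [...],  # raw history from list_events()
    #       "diagnostic": {"status": "OK", "detail": "...", "candidates": [...]}
    #   }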
    def import_legacy_file(self, filepath: str) -> None:
        """
        Imports a legacy text file (mapping_entries.txt).
        Expected format per line:
            INFOHASH|SRC_PATH|DEST_PATH|TYPE|TIMESTAMP
        (Note: format errors are handled gracefully.)
        """
        if not os.path.exists(filepath):
            logger.error(f"LEGACY_IMPORT: File not found {filepath}")
            return

        logger.info(f"Starting legacy import from {filepath}")
        count_ok = 0
        count_err = 0

        with open(filepath, 'r', encoding='utf-8') as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue

                parts = line.split('|')
                # Robustness: parse according to the spec
                # Spec: INFOHASH|SRC_PATH|DEST_PATH|TYPE|TIMESTAMP
                if len(parts) >= 5:
                    event = {
                        "infohash": parts[0],
                        "source": parts[1],
                        "destination": parts[2],
                        "type": parts[3],
                        "timestamp": parts[4],
                        "release_group": None,
                        "files": []
                    }
                    try:
                        # No duplicate check here: insert_event appends to the history
                        # and rebuild_latest consolidates the result. The spec asks for
                        # an idempotent migration; a stricter implementation could skip
                        # identical legacy events, at the cost of extra lookups.
                        self.insert_event(event, origin='legacy')
                        count_ok += 1
                    except Exception as e:
                        logger.warning(f"LEGACY_FAIL: Error inserting line '{line}': {e}")
                        count_err += 1
                else:
                    logger.warning(f"LEGACY_INVALID_FORMAT: Line skipped (expected 5 parts): {line}")
                    count_err += 1

        logger.info(f"Legacy import finished. OK={count_ok}, ERR={count_err}")


# --- Demonstration / manual test block ---
if __name__ == "__main__":
    # Simple demo script
    # Clean old db for test
    if os.path.exists("synomap_test.db"):
        os.remove("synomap_test.db")

    print("--- DEMO MappingDB_v1 ---")
    store = MappingStore("synomap_test.db")

    # 1. Nominal event insertion
    hash1 = "abc1234567890abcdef1234567890abcdef12"
    print(f"\n1. Inserting nominal event for {hash1}")
    store.insert_event({
        "infohash": hash1,
        "source": "/mnt/dl/MyMovie",
        "destination": "/mnt/media/Movies/MyMovie (2025)",
        "type": "movie",
        "timestamp": datetime.utcnow().isoformat(),
        "release_group": "TEST"
    })
    mapping = store.get_mapping(hash1)
    print(f" -> Result: Status={mapping['diagnostic']['status']}, Dest={mapping['dest_path']}")

    # 2. Multi case (conflict)
    print(f"\n2. Inserting conflicting event for {hash1}")
    store.insert_event({
        "infohash": hash1,
        "source": "/mnt/dl/MyMovie",
        "destination": "/mnt/media/Movies/MyMovie_REPACK (2025)",  # Different path
        "type": "movie",
        "timestamp": datetime.utcnow().isoformat()
    })
    mapping = store.get_mapping(hash1)
    print(f" -> Result: Status={mapping['diagnostic']['status']}")
    print(f" -> Candidates: {mapping['diagnostic']['candidates']}")

    # 3. Missing case
    hash_missing = "deadbeef000000000000000000000000000000"
    print(f"\n3. Querying missing hash {hash_missing}")
    res = store.get_mapping(hash_missing)
    print(f" -> Result: {res}")

    # 4. Legacy import simulation
    # Create a dummy file
    with open("dummy_legacy.txt", "w") as f:
        f.write("legacyhash123|/src/leg|/dst/leg|tv|2020-01-01T12:00:00\n")
        f.write("legacyhash123|/src/leg|/dst/leg_v2|tv|2020-01-02T12:00:00\n")  # Will produce a MULTI

    print("\n4. Importing Legacy file")
    store.import_legacy_file("dummy_legacy.txt")
    m_leg = store.get_mapping("legacyhash123")
    print(f" -> Legacy Hash Status: {m_leg['diagnostic']['status']}")

    # Cleanup
    if os.path.exists("dummy_legacy.txt"):
        os.remove("dummy_legacy.txt")
    if os.path.exists("synomap_test.db"):
        os.remove("synomap_test.db")

    print("\n--- END DEMO ---")
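# Migration sketch (the paths below are placeholders, to be adapted to the
# actual deployment): seed the database once from the legacy text file, then
# serve all lookups from the consolidated mapping_latest table.
#
#   store = MappingStore("synomap.db")
#   store.import_legacy_file("mapping_entries.txt")
#   print(store.get_mapping("some_known_infohash"))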