#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
sonarr_fallback.py — v5.0 stable
Parses .torrent files, stores them in SQLite, and builds the JSON payload
for Sonarr's /api/v3/release/push endpoint.
"""
import argparse
import datetime as dt
import hashlib
import io
import json
import os
import re
import sqlite3
import sys
from typing import Any, Dict

ISO = "%Y-%m-%dT%H:%M:%S"


def log(msg: str):
    if os.getenv("SONARR_DEBUG"):
        print(f"[DBG] {msg}", file=sys.stderr)


# ---------- Mini bdecode ----------
def bdecode(data: bytes):
    s = io.BytesIO(data)
    return _dec(s)


def _dec(s):
    c = s.read(1)
    if not c:
        raise ValueError("EOF")
    if c == b"i":  # integer: i<digits>e
        num = b""
        while True:
            b = s.read(1)
            if b == b"e":
                return int(num)
            num += b
    if c == b"l":  # list: l<items>e
        lst = []
        while True:
            p = s.read(1)
            if p == b"e":
                return lst
            s.seek(-1, 1)
            lst.append(_dec(s))
    if c == b"d":  # dict: d<key><value>...e
        d = {}
        while True:
            p = s.read(1)
            if p == b"e":
                return d
            s.seek(-1, 1)
            k = _dec(s)
            d[k] = _dec(s)
    if c.isdigit():  # byte string: <length>:<bytes>
        l = c
        while True:
            b = s.read(1)
            if b == b":":
                break
            l += b
        return s.read(int(l))
    raise ValueError("Invalid")


# ---------- Helpers ----------
def compute_infohash(path: str) -> tuple[str, str]:
    """Return (sha1 hex of the bencoded info dict, torrent name)."""
    with open(path, "rb") as f:
        data = f.read()
    d = bdecode(data)
    info = d.get(b"info")
    if not isinstance(info, dict):
        raise ValueError("torrent has no info dict")
    raw = bencode(info)  # round-trips canonical bencoding (keys sorted per spec)
    h = hashlib.sha1(raw).hexdigest()
    name = info.get(b"name", b"").decode(errors="ignore")
    return h, name


def bencode(x):
    if isinstance(x, int):
        return b"i" + str(x).encode() + b"e"
    if isinstance(x, bytes):
        return str(len(x)).encode() + b":" + x
    if isinstance(x, str):
        return bencode(x.encode())
    if isinstance(x, list):
        return b"l" + b"".join(bencode(i) for i in x) + b"e"
    if isinstance(x, dict):
        out = []
        for k in sorted(x.keys()):  # bencoded dict keys must be sorted
            kb = k if isinstance(k, bytes) else str(k).encode()
            out.append(bencode(kb) + bencode(x[k]))
        return b"d" + b"".join(out) + b"e"
    raise TypeError


RE_Sxe = re.compile(r"(?i)\bS(?P<s>\d{1,2})E(?P<e>\d{1,3})\b")
RE_RES = re.compile(r"(?i)\b(2160p|1080p|720p|480p)\b")


def extract_meta(name: str) -> Dict[str, Any]:
    season = episode = None
    m = RE_Sxe.search(name)
    if m:
        season, episode = int(m.group("s")), int(m.group("e"))
    m = RE_RES.search(name)
    resolution = m.group(1) if m else None
    return {"season": season, "episode": episode, "resolution": resolution}


# ---------- SQLite ----------
DDL = """
CREATE TABLE IF NOT EXISTS torrents(
  id INTEGER PRIMARY KEY,
  infohash TEXT UNIQUE,
  name TEXT,
  release_name TEXT,
  season INTEGER,
  episode INTEGER,
  resolution TEXT,
  size_bytes INTEGER,
  indexer TEXT,
  download_url TEXT,
  first_seen TEXT,
  last_seen TEXT
);
"""


def db_init(db):
    c = sqlite3.connect(db)
    c.executescript(DDL)
    c.commit()
    return c


# ---------- Build JSON ----------
def build_payload(r: Dict[str, Any]) -> Dict[str, Any]:
    pub = dt.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")
    guid = f"torrent:{r['infohash']}"
    return {
        "title": r["release_name"],
        "downloadUrl": r["download_url"],
        "protocol": "torrent",
        "indexer": r["indexer"],
        "size": r.get("size_bytes", 0) or 0,
        "publishDate": pub,
        "guid": guid,
    }
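# Illustrative shape of the payload build_payload emits (values made up,
# field names mirror the function above):
#   {"title": "Show.S01E02.1080p.WEB",
#    "downloadUrl": "http://localhost/torrents/sonarr/Show.S01E02.1080p.WEB.torrent",
#    "protocol": "torrent", "indexer": "autobrr", "size": 1234567890,
#    "publishDate": "2024-01-01T00:00:00Z", "guid": "torrent:<40-char sha1 hex>"}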
# ---------- Main ----------
def main():
    p = argparse.ArgumentParser()
    p.add_argument("--db", required=True)
    p.add_argument("--torrent-file", default="")
    p.add_argument("--torrent-url", default="")
    p.add_argument("--release-name", default="")
    p.add_argument("--indexer", default="autobrr")
    p.add_argument("--size", default="")
    p.add_argument("--info-url", default="")
    args = p.parse_args()

    db = db_init(args.db)

    infohash = name = None
    if args.torrent_file and os.path.exists(args.torrent_file):
        try:
            infohash, name = compute_infohash(args.torrent_file)
            log(f"Parsed torrent {args.torrent_file} -> {infohash}")
        except Exception as e:
            log(f"Parse error: {e}")

    rel = args.release_name or name or "unknown"
    meta = extract_meta(rel)
    size = int(args.size) if args.size.isdigit() else 0

    base = os.getenv("AUTOBRR_WEB_URL", "http://localhost")
    cat = "sonarr"
    # Avoid a double ".torrent" suffix when --torrent-file already carries one.
    fname = os.path.basename(args.torrent_file or rel)
    if not fname.endswith(".torrent"):
        fname += ".torrent"
    durl = f"{base}/torrents/{cat}/{fname}"

    row = {
        "infohash": infohash or "",
        "release_name": rel,
        "indexer": args.indexer,
        "size_bytes": size,
        "download_url": durl,
        "season": meta["season"],
        "episode": meta["episode"],
        "resolution": meta["resolution"],
    }

    now = dt.datetime.utcnow().strftime(ISO)
    # Upsert so a re-announce refreshes last_seen without clobbering first_seen
    # (INSERT OR REPLACE would rewrite the whole row and reset it).
    db.execute(
        """INSERT INTO torrents
           (infohash,release_name,season,episode,resolution,size_bytes,indexer,download_url,first_seen,last_seen)
           VALUES(?,?,?,?,?,?,?,?,?,?)
           ON CONFLICT(infohash) DO UPDATE SET
             release_name=excluded.release_name,
             season=excluded.season,
             episode=excluded.episode,
             resolution=excluded.resolution,
             size_bytes=excluded.size_bytes,
             indexer=excluded.indexer,
             download_url=excluded.download_url,
             last_seen=excluded.last_seen""",
        (row["infohash"], row["release_name"], row["season"], row["episode"],
         row["resolution"], row["size_bytes"], row["indexer"], row["download_url"],
         now, now),
    )
    db.commit()

    print(json.dumps(build_payload(row), ensure_ascii=False))


if __name__ == "__main__":
    main()
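# Usage sketch (paths and values are illustrative, not part of the script):
#   SONARR_DEBUG=1 ./sonarr_fallback.py \
#       --db /var/lib/autobrr/fallback.db \
#       --torrent-file /downloads/Show.S01E02.1080p.WEB.torrent \
#       --indexer autobrr --size 1234567890
# Prints the release/push JSON on stdout; it can then be POSTed to Sonarr's
# /api/v3/release/push endpoint, e.g. with curl.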