#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Poll BayTech MMP power strips via Telnet ``ostatus`` and log readings to SQLite.

Two sub-commands are provided:

* ``poll``   - query every enabled device and store one reading per outlet.
* ``report`` - print per-cost-center and per-device job statistics for a period.
"""

import argparse
import datetime as dt
import json
import os
import re
import sqlite3
import sys
import time
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple

try:
    import telnetlib
except ImportError:
    # telnetlib was removed from the standard library in Python 3.13.
    # Keep the module importable so that "report" still works there;
    # telnet_fetch_ostatus() raises a clear error if polling is attempted.
    telnetlib = None

PROMPT_RE = re.compile(rb".*> ?$")  # prompt ends with '>'
NUM_RE = re.compile(r"([-+]?\d+(\.\d+)?)")

# Length in days of the predefined report periods.
_PERIOD_DAYS = {"weekly": 7, "monthly": 30, "yearly": 365}

# Measurement columns that apply_fill() may carry over from the last reading.
_FILLABLE_FIELDS = ("current_a", "peak_a", "voltage_v", "power_w", "va", "state")


@dataclass
class DeviceCfg:
    """Connection settings of one device as read from the config file."""

    name: str
    host: str
    port: int
    enabled: bool = True
    username: Optional[str] = None
    password: Optional[str] = None


def utc_now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string with second resolution."""
    return dt.datetime.now(dt.timezone.utc).replace(microsecond=0).isoformat()


def load_json(path: str) -> dict:
    """Read and parse the UTF-8 encoded JSON file at *path*."""
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)


def ensure_dir(path: str) -> None:
    """Create the parent directory of *path* if it has one and it is missing."""
    d = os.path.dirname(path)
    if d:
        os.makedirs(d, exist_ok=True)


def db_connect(db_path: str) -> sqlite3.Connection:
    """Open the SQLite database at *db_path*, creating its directory if needed.

    WAL journaling and ``synchronous=NORMAL`` are enabled, and foreign-key
    enforcement is switched on because SQLite does not enforce the
    constraints declared in the schema by default.
    """
    ensure_dir(db_path)
    con = sqlite3.connect(db_path)
    con.execute("PRAGMA journal_mode=WAL;")
    con.execute("PRAGMA synchronous=NORMAL;")
    con.execute("PRAGMA foreign_keys=ON;")
    return con


def db_init(con: sqlite3.Connection) -> None:
    """Create the tables and indexes used by this script if they do not exist."""
    con.executescript("""
    CREATE TABLE IF NOT EXISTS device (
        id INTEGER PRIMARY KEY,
        name TEXT NOT NULL UNIQUE,
        host TEXT NOT NULL,
        port INTEGER NOT NULL,
        created_at TEXT NOT NULL,
        last_seen_at TEXT
    );

    CREATE TABLE IF NOT EXISTS outlet (
        id INTEGER PRIMARY KEY,
        device_id INTEGER NOT NULL,
        outlet_name TEXT NOT NULL,
        cost_code TEXT NOT NULL,
        cost_name TEXT NOT NULL,
        created_at TEXT NOT NULL,
        last_seen_at TEXT,
        UNIQUE(device_id, outlet_name),
        FOREIGN KEY(device_id) REFERENCES device(id)
    );

    CREATE TABLE IF NOT EXISTS poll_run (
        id INTEGER PRIMARY KEY,
        device_id INTEGER NOT NULL,
        started_at TEXT NOT NULL,
        finished_at TEXT,
        ok INTEGER NOT NULL DEFAULT 0,
        error TEXT,
        outlets_received INTEGER NOT NULL DEFAULT 0,
        outlets_filled INTEGER NOT NULL DEFAULT 0,
        fields_filled INTEGER NOT NULL DEFAULT 0,
        duration_ms INTEGER NOT NULL DEFAULT 0,
        FOREIGN KEY(device_id) REFERENCES device(id)
    );

    CREATE TABLE IF NOT EXISTS reading (
        id INTEGER PRIMARY KEY,
        poll_run_id INTEGER NOT NULL,
        device_id INTEGER NOT NULL,
        outlet_id INTEGER NOT NULL,
        ts TEXT NOT NULL, -- UTC ISO
        current_a REAL,
        peak_a REAL,
        voltage_v REAL,
        power_w REAL,
        va REAL,
        state TEXT,
        filled INTEGER NOT NULL DEFAULT 0, -- 1 wenn (teilweise/komplett) aus letztem Messwert gefüllt
        filled_fields INTEGER NOT NULL DEFAULT 0, -- Anzahl gefüllter Felder
        FOREIGN KEY(poll_run_id) REFERENCES poll_run(id),
        FOREIGN KEY(device_id) REFERENCES device(id),
        FOREIGN KEY(outlet_id) REFERENCES outlet(id)
    );

    CREATE INDEX IF NOT EXISTS idx_reading_ts ON reading(ts);
    CREATE INDEX IF NOT EXISTS idx_reading_outlet_ts ON reading(outlet_id, ts);
    """)
    con.commit()


def get_or_create_device(con: sqlite3.Connection, d: DeviceCfg) -> int:
    """Return the id of the ``device`` row named ``d.name``, creating it if needed.

    An existing row gets its host, port and ``last_seen_at`` refreshed. A new
    row is stamped with ``last_seen_at`` as well, matching what
    get_or_create_outlet() does for new outlets. The change is committed.
    """
    now = utc_now_iso()
    row = con.execute("SELECT id FROM device WHERE name=?", (d.name,)).fetchone()
    if row:
        device_id = row[0]
        con.execute(
            "UPDATE device SET host=?, port=?, last_seen_at=? WHERE id=?",
            (d.host, d.port, now, device_id),
        )
        con.commit()
        return device_id

    cur = con.execute(
        "INSERT INTO device(name,host,port,created_at,last_seen_at) VALUES(?,?,?,?,?)",
        (d.name, d.host, d.port, now, now),
    )
    con.commit()
    return int(cur.lastrowid)


def cost_center_for(outlet_name: str, cc_map: Dict[str, str]) -> Tuple[str, str]:
    """Derive ``(cost_code, cost_name)`` from the first character of an outlet name.

    The code is the upper-cased first character of the stripped name, or
    ``"_"`` for an empty name. The name is looked up in *cc_map*, falling
    back to its ``"_default"`` entry and finally to ``"Unbekannt"``.
    """
    outlet_name = outlet_name.strip()
    code = outlet_name[:1].upper() if outlet_name else "_"
    name = cc_map.get(code) or cc_map.get("_default", "Unbekannt")
    return code, name


def telnet_fetch_ostatus(dev: DeviceCfg, cmd: str, connect_timeout: int, read_timeout: int) -> bytes:
    """Run *cmd* on the device over Telnet and return the raw bytes of the reply.

    Opens the connection, reads up to the first ``>``, optionally performs a
    best-effort login (only if the banner looks like a login/password
    prompt), sends *cmd* and reads up to the next ``>``.

    Raises:
        RuntimeError: if ``telnetlib`` is not available in this interpreter.
        OSError: on connection problems (propagated from ``telnetlib``).
    """
    if telnetlib is None:
        raise RuntimeError(
            "telnetlib is not available in this Python version (removed in Python 3.13)"
        )

    tn = telnetlib.Telnet()
    try:
        tn.open(dev.host, dev.port, timeout=connect_timeout)

        # First prompt / banner. On timeout read_until() returns what arrived so far.
        buf = tn.read_until(b">", timeout=read_timeout)

        # Best-effort login: many devices show "login:" / "Password:",
        # but that is not guaranteed.
        if dev.username:
            banner = buf.lower()
            if b"login" in banner or b"user" in banner:
                tn.write(dev.username.encode("utf-8") + b"\n")
                buf += tn.read_until(b":", timeout=read_timeout)
        if dev.password and b"password" in buf.lower():
            tn.write(dev.password.encode("utf-8") + b"\n")
            tn.read_until(b">", timeout=read_timeout)

        tn.write(cmd.encode("utf-8") + b"\n")
        return tn.read_until(b">", timeout=read_timeout)
    finally:
        # Always release the socket, also when a read/write above fails.
        tn.close()


def _parse_num(s: str) -> Optional[float]:
    """Return the first number found in *s* (decimal comma accepted), else None."""
    m = NUM_RE.search(s.replace(",", "."))
    return float(m.group(1)) if m else None


def parse_ostatus(text: str) -> List[dict]:
    """Parse the table rows of an ``ostatus`` reply.

    A data row looks like::

        | <outlet> | 0.0 A | 0.1 A | 230.2 V | 21 W | 26 VA | On |

    Lines without ``|``, the header line (containing both "Outlet" and
    "True RMS"), separator lines and rows with fewer than seven non-empty
    cells are skipped.

    Returns:
        One dict per row with the keys ``outlet_name``, ``current_a``,
        ``peak_a``, ``voltage_v``, ``power_w``, ``va`` (floats or None when
        no number could be read) and ``state`` (the raw cell text).
    """
    rows = []
    for ln in text.splitlines():
        if "|" not in ln:
            continue
        # Filter out the table header and separator lines.
        if "Outlet" in ln and "True RMS" in ln:
            continue
        if set(ln.strip()) <= set("-| "):
            continue

        # The border pipes produce empty cells; all empty cells are dropped.
        parts = [p for p in (cell.strip() for cell in ln.split("|")) if p != ""]
        if len(parts) < 7:
            continue

        outlet_name, cur_s, peak_s, volt_s, power_s, va_s, state = parts[:7]
        rows.append({
            "outlet_name": outlet_name,
            "current_a": _parse_num(cur_s),
            "peak_a": _parse_num(peak_s),
            "voltage_v": _parse_num(volt_s),
            "power_w": _parse_num(power_s),
            "va": _parse_num(va_s),
            "state": state,
        })
    return rows


def last_reading_for_outlet(con: sqlite3.Connection, outlet_id: int) -> Optional[dict]:
    """Return the most recent stored reading of an outlet, or None if there is none.

    Timestamps only have second resolution, so the row id is used as a
    tie-breaker to make "most recent" deterministic.
    """
    row = con.execute("""
        SELECT current_a, peak_a, voltage_v, power_w, va, state
        FROM reading
        WHERE outlet_id=?
        ORDER BY ts DESC, id DESC
        LIMIT 1
    """, (outlet_id,)).fetchone()
    if not row:
        return None
    return {
        "current_a": row[0],
        "peak_a": row[1],
        "voltage_v": row[2],
        "power_w": row[3],
        "va": row[4],
        "state": row[5],
    }


def get_or_create_outlet(con: sqlite3.Connection, device_id: int, outlet_name: str,
                         cost_code: str, cost_name: str, *, commit: bool = True) -> int:
    """Return the id of the outlet ``(device_id, outlet_name)``, creating it if needed.

    An existing row gets its cost center and ``last_seen_at`` refreshed.

    Args:
        commit: commit immediately (default). Pass False to leave the change
            in the caller's open transaction.
    """
    now = utc_now_iso()
    row = con.execute("""
        SELECT id FROM outlet
        WHERE device_id=? AND outlet_name=?
    """, (device_id, outlet_name)).fetchone()

    if row:
        outlet_id = int(row[0])
        con.execute("""
            UPDATE outlet
            SET cost_code=?, cost_name=?, last_seen_at=?
            WHERE id=?
        """, (cost_code, cost_name, now, outlet_id))
    else:
        cur = con.execute("""
            INSERT INTO outlet(device_id,outlet_name,cost_code,cost_name,created_at,last_seen_at)
            VALUES(?,?,?,?,?,?)
        """, (device_id, outlet_name, cost_code, cost_name, now, now))
        outlet_id = int(cur.lastrowid)

    if commit:
        con.commit()
    return outlet_id


def apply_fill(row: dict, last: Optional[dict]) -> Tuple[dict, int, int]:
    """Fill missing (None) measurement values of *row* from *last*.

    The input dict is not modified; a filled copy is returned.

    Returns:
        ``(new_row, filled_flag, filled_fields_count)`` where ``filled_flag``
        is 1 if at least one field was filled and 0 otherwise.
    """
    new_row = dict(row)
    if not last:
        return new_row, 0, 0

    filled_fields = 0
    for k in _FILLABLE_FIELDS:
        if new_row.get(k) is None and last.get(k) is not None:
            new_row[k] = last[k]
            filled_fields += 1

    return new_row, (1 if filled_fields > 0 else 0), filled_fields


def poll_device(con: sqlite3.Connection, dev: DeviceCfg, cc_map: Dict[str, str],
                cmd: str, connect_timeout: int, read_timeout: int) -> None:
    """Poll one device and store a ``poll_run`` row plus one reading per outlet.

    The ``poll_run`` row is committed up front. Outlet updates and readings
    of the run are written in a single transaction: on any error they are
    rolled back, the run is marked as failed with the error text, and the
    exception is re-raised.
    """
    device_id = get_or_create_device(con, dev)
    started = utc_now_iso()
    t0 = time.monotonic()

    run_id = con.execute("""
        INSERT INTO poll_run(device_id, started_at, ok)
        VALUES(?,?,0)
    """, (device_id, started)).lastrowid
    con.commit()

    outlets_received = 0
    outlets_filled = 0
    fields_filled = 0

    try:
        raw = telnet_fetch_ostatus(dev, cmd, connect_timeout, read_timeout)
        text = raw.decode("utf-8", errors="replace")
        rows = parse_ostatus(text)

        ts = utc_now_iso()
        outlets_received = len(rows)

        for r in rows:
            outlet_name = r["outlet_name"].strip()
            cost_code, cost_name = cost_center_for(outlet_name, cc_map)
            # commit=False keeps all writes of this poll in one transaction.
            outlet_id = get_or_create_outlet(
                con, device_id, outlet_name, cost_code, cost_name, commit=False
            )

            last = last_reading_for_outlet(con, outlet_id)
            r2, filled_flag, filled_fields_cnt = apply_fill(r, last)

            if filled_flag:
                outlets_filled += 1
                fields_filled += filled_fields_cnt

            con.execute("""
                INSERT INTO reading(
                    poll_run_id, device_id, outlet_id, ts,
                    current_a, peak_a, voltage_v, power_w, va, state,
                    filled, filled_fields
                ) VALUES(?,?,?,?,?,?,?,?,?,?,?,?)
            """, (
                run_id, device_id, outlet_id, ts,
                r2.get("current_a"), r2.get("peak_a"), r2.get("voltage_v"),
                r2.get("power_w"), r2.get("va"), r2.get("state"),
                filled_flag, filled_fields_cnt,
            ))

        duration_ms = int((time.monotonic() - t0) * 1000)
        con.execute("""
            UPDATE poll_run
            SET finished_at=?, ok=1, error=NULL,
                outlets_received=?, outlets_filled=?, fields_filled=?, duration_ms=?
            WHERE id=?
        """, (utc_now_iso(), outlets_received, outlets_filled, fields_filled,
              duration_ms, run_id))
        con.commit()

    except Exception as e:
        # Discard partially written readings so a failed run leaves no data behind.
        con.rollback()
        duration_ms = int((time.monotonic() - t0) * 1000)
        con.execute("""
            UPDATE poll_run
            SET finished_at=?, ok=0, error=?,
                outlets_received=?, outlets_filled=?, fields_filled=?, duration_ms=?
            WHERE id=?
        """, (utc_now_iso(), str(e), outlets_received, outlets_filled, fields_filled,
              duration_ms, run_id))
        con.commit()
        raise


def parse_period_args(args) -> Tuple[str, str]:
    """Return the report window ``(from_iso, to_iso)`` in UTC.

    Precedence: ``last_days``, then ``from_iso`` (with ``to_iso`` defaulting
    to now), then the predefined ``period`` (weekly=7, monthly=30,
    yearly=365 days; anything else counts as 7 days).

    Raises:
        ValueError: if ``to_iso`` is given without ``from_iso``.
    """
    now = dt.datetime.now(dt.timezone.utc).replace(microsecond=0)

    if args.last_days is not None:
        start = now - dt.timedelta(days=args.last_days)
        return start.isoformat(), now.isoformat()

    if args.from_iso:
        return args.from_iso, args.to_iso or now.isoformat()
    if args.to_iso:
        raise ValueError("--to-iso requires --from-iso")

    # Predefined periods.
    start = now - dt.timedelta(days=_PERIOD_DAYS.get(args.period, 7))
    return start.isoformat(), now.isoformat()


def report(con: sqlite3.Connection, from_iso: str, to_iso: str, device_name: Optional[str]) -> None:
    """Print the cost-center overview and the poll-job statistics for a period.

    Energy is a rough estimate: every sample is counted as one hour at its
    measured power (the script is meant to run hourly), so Wh is simply the
    sum of the power values. If polling is not exactly hourly this could be
    refined using the time delta between samples.
    """
    params = [from_iso, to_iso]
    dev_filter = ""
    if device_name:
        dev_filter = "AND d.name = ?"
        params.append(device_name)

    # Cost-center overview. dev_filter is a fixed SQL fragment; values are bound.
    q = f"""
    SELECT
        d.name AS device,
        o.cost_code,
        o.cost_name,
        COUNT(*) AS samples,
        SUM(CASE WHEN r.filled=1 THEN 1 ELSE 0 END) AS samples_with_fill,
        ROUND(AVG(r.power_w), 2) AS avg_power_w,
        ROUND(SUM(COALESCE(r.power_w,0.0)), 2) AS sum_power_w,
        ROUND(SUM(COALESCE(r.power_w,0.0))*1.0, 2) AS approx_energy_wh
    FROM reading r
    JOIN outlet o ON o.id = r.outlet_id
    JOIN device d ON d.id = r.device_id
    WHERE r.ts >= ? AND r.ts <= ?
    {dev_filter}
    GROUP BY d.name, o.cost_code, o.cost_name
    ORDER BY d.name, o.cost_code
    """
    rows = con.execute(q, params).fetchall()

    print(f"\nReport UTC: {from_iso} .. {to_iso}")
    if device_name:
        print(f"Device: {device_name}")
    print("\nKostenstellen:")
    print("device | code | name | samples | filled | avg_W | approx_Wh")
    for device, code, name, samples, filled, avg_w, _sum_w, wh in rows:
        print(f"{device} | {code} | {name} | {samples} | {filled} | {avg_w} | {wh}")

    # Error / quality statistics from poll_run.
    q2 = f"""
    SELECT
        d.name,
        COUNT(*) AS runs,
        SUM(CASE WHEN pr.ok=1 THEN 1 ELSE 0 END) AS ok_runs,
        SUM(CASE WHEN pr.ok=0 THEN 1 ELSE 0 END) AS failed_runs,
        SUM(pr.outlets_filled) AS outlets_filled_total,
        SUM(pr.fields_filled) AS fields_filled_total,
        ROUND(AVG(pr.duration_ms), 1) AS avg_duration_ms
    FROM poll_run pr
    JOIN device d ON d.id = pr.device_id
    WHERE pr.started_at >= ? AND pr.started_at <= ?
    {dev_filter}
    GROUP BY d.name
    ORDER BY d.name
    """
    rows = con.execute(q2, params).fetchall()

    print("\nJob-Statistik:")
    print("device | runs | ok | failed | outlets_filled | fields_filled | avg_ms")
    for r in rows:
        print(" | ".join(str(x) for x in r))
    print()


def main():
    """Command-line entry point for the ``poll`` and ``report`` sub-commands.

    When polling, a failing device does not stop the remaining devices: the
    error is printed to stderr (it is also stored in ``poll_run``) and the
    process exits with status 1 once all devices have been tried.
    """
    ap = argparse.ArgumentParser(description="MMP (BayTech) ostatus Logger -> SQLite")
    ap.add_argument("--config", required=True, help="Pfad zur config.json")

    sub = ap.add_subparsers(dest="cmd", required=True)

    sp_poll = sub.add_parser("poll", help="Pollt alle enabled Devices und schreibt Messwerte")
    sp_poll.add_argument("--device", default=None, help="Optional: nur ein Device-Name")

    sp_rep = sub.add_parser("report", help="Auswertung (weekly/monthly/yearly oder von-bis / last-days)")
    sp_rep.add_argument("--device", default=None, help="Optional: nur ein Device-Name")
    sp_rep.add_argument("--period", choices=["weekly", "monthly", "yearly"], default="weekly")
    sp_rep.add_argument("--last-days", type=int, default=None, help="Letzte N Tage")
    sp_rep.add_argument("--from-iso", default=None, help="UTC ISO, z.B. 2026-02-01T00:00:00+00:00")
    sp_rep.add_argument("--to-iso", default=None, help="UTC ISO, z.B. 2026-02-08T00:00:00+00:00")

    args = ap.parse_args()

    cfg = load_json(args.config)
    cc_map = load_json(cfg["cost_center_map"])

    con = db_connect(cfg["db_path"])
    try:
        db_init(con)

        tel = cfg.get("telnet", {})
        read_timeout = int(tel.get("read_timeout_sec", 25))
        connect_timeout = int(tel.get("connect_timeout_sec", 10))
        cmd = tel.get("command", "ostatus")

        devices = [
            DeviceCfg(
                name=d["name"],
                host=d["host"],
                port=int(d.get("port", 20108)),
                enabled=bool(d.get("enabled", True)),
                username=d.get("username"),
                password=d.get("password"),
            )
            for d in cfg.get("devices", [])
        ]

        if args.cmd == "poll":
            failed = []
            for d in devices:
                if not d.enabled:
                    continue
                if args.device and d.name != args.device:
                    continue
                try:
                    poll_device(con, d, cc_map, cmd, connect_timeout, read_timeout)
                except Exception as e:
                    # Top-level boundary: report and continue with the next device.
                    failed.append(d.name)
                    print(f"ERROR {d.name}: {e}", file=sys.stderr)
            if failed:
                sys.exit(1)
            print("OK")

        elif args.cmd == "report":
            try:
                from_iso, to_iso = parse_period_args(args)
            except ValueError as e:
                ap.error(str(e))
            report(con, from_iso, to_iso, args.device)
    finally:
        con.close()


if __name__ == "__main__":
    main()