diff --git a/mmp_logger.py b/mmp_logger.py new file mode 100644 index 0000000..526e262 --- /dev/null +++ b/mmp_logger.py @@ -0,0 +1,473 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import argparse +import datetime as dt +import json +import os +import re +import sqlite3 +import telnetlib +import time +from dataclasses import dataclass +from typing import Dict, List, Optional, Tuple + +PROMPT_RE = re.compile(rb".*> ?$") # Prompt endet mit '>' +NUM_RE = re.compile(r"([-+]?\d+(\.\d+)?)") + +@dataclass +class DeviceCfg: + name: str + host: str + port: int + enabled: bool = True + username: Optional[str] = None + password: Optional[str] = None + +def utc_now_iso() -> str: + return dt.datetime.now(dt.timezone.utc).replace(microsecond=0).isoformat() + +def load_json(path: str) -> dict: + with open(path, "r", encoding="utf-8") as f: + return json.load(f) + +def ensure_dir(path: str) -> None: + d = os.path.dirname(path) + if d: + os.makedirs(d, exist_ok=True) + +def db_connect(db_path: str) -> sqlite3.Connection: + ensure_dir(db_path) + con = sqlite3.connect(db_path) + con.execute("PRAGMA journal_mode=WAL;") + con.execute("PRAGMA synchronous=NORMAL;") + return con + +def db_init(con: sqlite3.Connection) -> None: + con.executescript(""" + CREATE TABLE IF NOT EXISTS device ( + id INTEGER PRIMARY KEY, + name TEXT NOT NULL UNIQUE, + host TEXT NOT NULL, + port INTEGER NOT NULL, + created_at TEXT NOT NULL, + last_seen_at TEXT + ); + + CREATE TABLE IF NOT EXISTS outlet ( + id INTEGER PRIMARY KEY, + device_id INTEGER NOT NULL, + outlet_name TEXT NOT NULL, + cost_code TEXT NOT NULL, + cost_name TEXT NOT NULL, + created_at TEXT NOT NULL, + last_seen_at TEXT, + UNIQUE(device_id, outlet_name), + FOREIGN KEY(device_id) REFERENCES device(id) + ); + + CREATE TABLE IF NOT EXISTS poll_run ( + id INTEGER PRIMARY KEY, + device_id INTEGER NOT NULL, + started_at TEXT NOT NULL, + finished_at TEXT, + ok INTEGER NOT NULL DEFAULT 0, + error TEXT, + outlets_received INTEGER NOT NULL DEFAULT 0, + outlets_filled INTEGER NOT NULL DEFAULT 0, + fields_filled INTEGER NOT NULL DEFAULT 0, + duration_ms INTEGER NOT NULL DEFAULT 0, + FOREIGN KEY(device_id) REFERENCES device(id) + ); + + CREATE TABLE IF NOT EXISTS reading ( + id INTEGER PRIMARY KEY, + poll_run_id INTEGER NOT NULL, + device_id INTEGER NOT NULL, + outlet_id INTEGER NOT NULL, + ts TEXT NOT NULL, -- UTC ISO + current_a REAL, + peak_a REAL, + voltage_v REAL, + power_w REAL, + va REAL, + state TEXT, + filled INTEGER NOT NULL DEFAULT 0, -- 1 wenn (teilweise/komplett) aus letztem Messwert gefüllt + filled_fields INTEGER NOT NULL DEFAULT 0, -- Anzahl gefüllter Felder + FOREIGN KEY(poll_run_id) REFERENCES poll_run(id), + FOREIGN KEY(device_id) REFERENCES device(id), + FOREIGN KEY(outlet_id) REFERENCES outlet(id) + ); + + CREATE INDEX IF NOT EXISTS idx_reading_ts ON reading(ts); + CREATE INDEX IF NOT EXISTS idx_reading_outlet_ts ON reading(outlet_id, ts); + """) + con.commit() + +def get_or_create_device(con: sqlite3.Connection, d: DeviceCfg) -> int: + now = utc_now_iso() + cur = con.execute("SELECT id FROM device WHERE name=?", (d.name,)) + row = cur.fetchone() + if row: + device_id = row[0] + con.execute("UPDATE device SET host=?, port=?, last_seen_at=? WHERE id=?", + (d.host, d.port, now, device_id)) + con.commit() + return device_id + cur = con.execute( + "INSERT INTO device(name,host,port,created_at,last_seen_at) VALUES(?,?,?,?,?)", + (d.name, d.host, d.port, now, None) + ) + con.commit() + return int(cur.lastrowid) + +def cost_center_for(outlet_name: str, cc_map: Dict[str, str]) -> Tuple[str, str]: + outlet_name = outlet_name.strip() + code = outlet_name[:1].upper() if outlet_name else "_" + name = cc_map.get(code) or cc_map.get("_default", "Unbekannt") + return code, name + +def telnet_fetch_ostatus(dev: DeviceCfg, cmd: str, connect_timeout: int, read_timeout: int) -> bytes: + """ + Öffnet Telnet, wartet auf Prompt (endet mit '>'), sendet cmd, liest bis Prompt. + Unterstützt optional simple Login-Prompts (best effort). + """ + tn = telnetlib.Telnet() + tn.open(dev.host, dev.port, timeout=connect_timeout) + + # Erstes Prompt / Banner + buf = tn.read_until(b">", timeout=read_timeout) + + # Best-effort Login, falls vorhanden + # (Viele Geräte zeigen "login:" / "Password:" - nicht garantiert) + if dev.username: + if b"login" in buf.lower() or b"user" in buf.lower(): + tn.write(dev.username.encode("utf-8") + b"\n") + buf += tn.read_until(b":", timeout=read_timeout) + if dev.password and (b"password" in buf.lower()): + tn.write(dev.password.encode("utf-8") + b"\n") + tn.read_until(b">", timeout=read_timeout) + + # Kommando senden + tn.write(cmd.encode("utf-8") + b"\n") + + # Lesen bis nächstes Prompt + out = tn.read_until(b">", timeout=read_timeout) + tn.close() + return out + +def parse_ostatus(text: str) -> List[dict]: + """ + Parst ostatus Tabellenzeilen: + | | 0.0 A | 0.1 A | 230.2 V | 21 W | 26 VA | On | + Rückgabe: Liste dicts + """ + lines = text.splitlines() + rows = [] + for ln in lines: + if "|" not in ln: + continue + # Tabellenheader / Trenner wegfiltern + if "Outlet" in ln and "True RMS" in ln: + continue + if set(ln.strip()) <= set("-| "): + continue + + parts = [p.strip() for p in ln.split("|")] + # typisches Format hat leere Ränder: ['', name, cur, peak, volt, power, va, state, ''] + parts = [p for p in parts if p != ""] + if len(parts) < 7: + continue + + outlet_name = parts[0] + cur_s = parts[1] + peak_s = parts[2] + volt_s = parts[3] + power_s = parts[4] + va_s = parts[5] + state = parts[6] + + def parse_num(s: str) -> Optional[float]: + m = NUM_RE.search(s.replace(",", ".")) + return float(m.group(1)) if m else None + + rows.append({ + "outlet_name": outlet_name, + "current_a": parse_num(cur_s), + "peak_a": parse_num(peak_s), + "voltage_v": parse_num(volt_s), + "power_w": parse_num(power_s), + "va": parse_num(va_s), + "state": state + }) + return rows + +def last_reading_for_outlet(con: sqlite3.Connection, outlet_id: int) -> Optional[dict]: + cur = con.execute(""" + SELECT current_a, peak_a, voltage_v, power_w, va, state + FROM reading + WHERE outlet_id=? + ORDER BY ts DESC + LIMIT 1 + """, (outlet_id,)) + row = cur.fetchone() + if not row: + return None + return { + "current_a": row[0], + "peak_a": row[1], + "voltage_v": row[2], + "power_w": row[3], + "va": row[4], + "state": row[5] + } + +def get_or_create_outlet(con: sqlite3.Connection, device_id: int, outlet_name: str, + cost_code: str, cost_name: str) -> int: + now = utc_now_iso() + cur = con.execute(""" + SELECT id FROM outlet WHERE device_id=? AND outlet_name=? + """, (device_id, outlet_name)) + row = cur.fetchone() + if row: + outlet_id = row[0] + con.execute(""" + UPDATE outlet + SET cost_code=?, cost_name=?, last_seen_at=? + WHERE id=? + """, (cost_code, cost_name, now, outlet_id)) + con.commit() + return int(outlet_id) + + cur = con.execute(""" + INSERT INTO outlet(device_id,outlet_name,cost_code,cost_name,created_at,last_seen_at) + VALUES(?,?,?,?,?,?) + """, (device_id, outlet_name, cost_code, cost_name, now, now)) + con.commit() + return int(cur.lastrowid) + +def apply_fill(row: dict, last: Optional[dict]) -> Tuple[dict, int, int]: + """ + Füllt fehlende Werte (None) aus last. + Returns: (new_row, filled_flag, filled_fields_count) + """ + if not last: + return row, 0, 0 + filled_fields = 0 + for k in ["current_a", "peak_a", "voltage_v", "power_w", "va", "state"]: + if row.get(k) is None and last.get(k) is not None: + row[k] = last[k] + filled_fields += 1 + filled_flag = 1 if filled_fields > 0 else 0 + return row, filled_flag, filled_fields + +def poll_device(con: sqlite3.Connection, dev: DeviceCfg, cc_map: Dict[str, str], + cmd: str, connect_timeout: int, read_timeout: int) -> None: + device_id = get_or_create_device(con, dev) + started = utc_now_iso() + t0 = time.time() + run_id = con.execute(""" + INSERT INTO poll_run(device_id, started_at, ok) + VALUES(?,?,0) + """, (device_id, started)).lastrowid + con.commit() + + outlets_received = 0 + outlets_filled = 0 + fields_filled = 0 + + try: + raw = telnet_fetch_ostatus(dev, cmd, connect_timeout, read_timeout) + text = raw.decode("utf-8", errors="replace") + rows = parse_ostatus(text) + ts = utc_now_iso() + + outlets_received = len(rows) + + for r in rows: + outlet_name = r["outlet_name"].strip() + cost_code, cost_name = cost_center_for(outlet_name, cc_map) + outlet_id = get_or_create_outlet(con, device_id, outlet_name, cost_code, cost_name) + + last = last_reading_for_outlet(con, outlet_id) + r2, filled_flag, filled_fields_cnt = apply_fill(r, last) + + if filled_flag: + outlets_filled += 1 + fields_filled += filled_fields_cnt + + con.execute(""" + INSERT INTO reading( + poll_run_id, device_id, outlet_id, ts, + current_a, peak_a, voltage_v, power_w, va, state, + filled, filled_fields + ) + VALUES(?,?,?,?,?,?,?,?,?,?,?,?) + """, ( + run_id, device_id, outlet_id, ts, + r2.get("current_a"), r2.get("peak_a"), r2.get("voltage_v"), + r2.get("power_w"), r2.get("va"), r2.get("state"), + filled_flag, filled_fields_cnt + )) + + duration_ms = int((time.time() - t0) * 1000) + con.execute(""" + UPDATE poll_run + SET finished_at=?, ok=1, error=NULL, + outlets_received=?, outlets_filled=?, fields_filled=?, duration_ms=? + WHERE id=? + """, (utc_now_iso(), outlets_received, outlets_filled, fields_filled, duration_ms, run_id)) + con.commit() + + except Exception as e: + duration_ms = int((time.time() - t0) * 1000) + con.execute(""" + UPDATE poll_run + SET finished_at=?, ok=0, error=?, outlets_received=?, outlets_filled=?, fields_filled=?, duration_ms=? + WHERE id=? + """, (utc_now_iso(), str(e), outlets_received, outlets_filled, fields_filled, duration_ms, run_id)) + con.commit() + raise + +def parse_period_args(args) -> Tuple[str, str]: + """ + Liefert (from_iso, to_iso) in UTC. + """ + now = dt.datetime.now(dt.timezone.utc).replace(microsecond=0) + if args.last_days is not None: + start = now - dt.timedelta(days=args.last_days) + return start.isoformat(), now.isoformat() + + if args.from_iso and args.to_iso: + return args.from_iso, args.to_iso + + # vordefinierte Perioden + if args.period == "weekly": + start = now - dt.timedelta(days=7) + elif args.period == "monthly": + start = now - dt.timedelta(days=30) + elif args.period == "yearly": + start = now - dt.timedelta(days=365) + else: + start = now - dt.timedelta(days=7) + return start.isoformat(), now.isoformat() + +def report(con: sqlite3.Connection, from_iso: str, to_iso: str, device_name: Optional[str]) -> None: + # Energie grob aus average power * 1h (weil stündlich) -> Wh je Messpunkt + # Wenn du mal nicht exakt stündlich bist, kann man über delta(ts) verbessern; fürs erste: 1 Messung = 1 Stunde. + params = [from_iso, to_iso] + dev_filter = "" + if device_name: + dev_filter = "AND d.name = ?" + params.append(device_name) + + # Kostenstellen-Übersicht + q = f""" + SELECT + d.name AS device, + o.cost_code, + o.cost_name, + COUNT(*) AS samples, + SUM(CASE WHEN r.filled=1 THEN 1 ELSE 0 END) AS samples_with_fill, + ROUND(AVG(r.power_w), 2) AS avg_power_w, + ROUND(SUM(COALESCE(r.power_w,0.0)), 2) AS sum_power_w, + ROUND(SUM(COALESCE(r.power_w,0.0))*1.0, 2) AS approx_energy_wh + FROM reading r + JOIN outlet o ON o.id = r.outlet_id + JOIN device d ON d.id = r.device_id + WHERE r.ts >= ? AND r.ts <= ? + {dev_filter} + GROUP BY d.name, o.cost_code, o.cost_name + ORDER BY d.name, o.cost_code + """ + cur = con.execute(q, params) + rows = cur.fetchall() + + print(f"\nReport UTC: {from_iso} .. {to_iso}") + if device_name: + print(f"Device: {device_name}") + print("\nKostenstellen:") + print("device | code | name | samples | filled | avg_W | approx_Wh") + for device, code, name, samples, filled, avg_w, sum_w, wh in rows: + print(f"{device} | {code} | {name} | {samples} | {filled} | {avg_w} | {wh}") + + # Fehler-/Qualitätsstatistik aus poll_run + q2 = f""" + SELECT + d.name, + COUNT(*) AS runs, + SUM(CASE WHEN pr.ok=1 THEN 1 ELSE 0 END) AS ok_runs, + SUM(CASE WHEN pr.ok=0 THEN 1 ELSE 0 END) AS failed_runs, + SUM(pr.outlets_filled) AS outlets_filled_total, + SUM(pr.fields_filled) AS fields_filled_total, + ROUND(AVG(pr.duration_ms), 1) AS avg_duration_ms + FROM poll_run pr + JOIN device d ON d.id = pr.device_id + WHERE pr.started_at >= ? AND pr.started_at <= ? + {dev_filter} + GROUP BY d.name + ORDER BY d.name + """ + cur = con.execute(q2, params) + rows = cur.fetchall() + + print("\nJob-Statistik:") + print("device | runs | ok | failed | outlets_filled | fields_filled | avg_ms") + for r in rows: + print(" | ".join(str(x) for x in r)) + print() + +def main(): + ap = argparse.ArgumentParser(description="MMP (BayTech) ostatus Logger -> SQLite") + ap.add_argument("--config", required=True, help="Pfad zur config.json") + sub = ap.add_subparsers(dest="cmd", required=True) + + sp_poll = sub.add_parser("poll", help="Pollt alle enabled Devices und schreibt Messwerte") + sp_poll.add_argument("--device", default=None, help="Optional: nur ein Device-Name") + + sp_rep = sub.add_parser("report", help="Auswertung (weekly/monthly/yearly oder von-bis / last-days)") + sp_rep.add_argument("--device", default=None, help="Optional: nur ein Device-Name") + sp_rep.add_argument("--period", choices=["weekly","monthly","yearly"], default="weekly") + sp_rep.add_argument("--last-days", type=int, default=None, help="Letzte N Tage") + sp_rep.add_argument("--from-iso", default=None, help="UTC ISO, z.B. 2026-02-01T00:00:00+00:00") + sp_rep.add_argument("--to-iso", default=None, help="UTC ISO, z.B. 2026-02-08T00:00:00+00:00") + + args = ap.parse_args() + cfg = load_json(args.config) + cc_map = load_json(cfg["cost_center_map"]) + db_path = cfg["db_path"] + + con = db_connect(db_path) + db_init(con) + + tel = cfg.get("telnet", {}) + read_timeout = int(tel.get("read_timeout_sec", 25)) + connect_timeout = int(tel.get("connect_timeout_sec", 10)) + cmd = tel.get("command", "ostatus") + + devices = [] + for d in cfg.get("devices", []): + devices.append(DeviceCfg( + name=d["name"], + host=d["host"], + port=int(d.get("port", 20108)), + enabled=bool(d.get("enabled", True)), + username=d.get("username"), + password=d.get("password"), + )) + + if args.cmd == "poll": + for d in devices: + if not d.enabled: + continue + if args.device and d.name != args.device: + continue + poll_device(con, d, cc_map, cmd, connect_timeout, read_timeout) + print("OK") + + elif args.cmd == "report": + from_iso, to_iso = parse_period_args(args) + report(con, from_iso, to_iso, args.device) + +if __name__ == "__main__": + main() \ No newline at end of file