#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ mmp_logger.py - Pollt BayTech MMP (seriell->TCP bridge, raw TCP) per Kommando "ostatus" - Schreibt Messwerte in SQLite - Fehlende Felder werden mit letztem Messwert ersetzt (filled + filled_fields werden protokolliert) - Report als Markdown (und optional HTML/PDF) nach ./reports/ Projektidee: Alles in EINEM Verzeichnis. """ import argparse import datetime as dt import json import os import re import socket import sqlite3 import time from dataclasses import dataclass from pathlib import Path from typing import Dict, List, Optional, Tuple # ------------------------- # Parsing / Prompt # ------------------------- PROMPT_END = b">" NUM_RE = re.compile(r"([-+]?\d+(\.\d+)?)") FILL_KEYS = ["current_a", "peak_a", "voltage_v", "power_w", "va", "state"] # ------------------------- # Data classes # ------------------------- @dataclass class DeviceCfg: name: str host: str port: int enabled: bool = True username: Optional[str] = None # nicht genutzt (kein Login), bleibt für Zukunft password: Optional[str] = None # ------------------------- # Utils / Time / Paths # ------------------------- def utc_now_iso() -> str: return dt.datetime.now(dt.timezone.utc).replace(microsecond=0).isoformat() def load_json(path: str) -> dict: with open(path, "r", encoding="utf-8") as f: return json.load(f) def ensure_dir(path: str) -> None: d = os.path.dirname(path) if d: os.makedirs(d, exist_ok=True) def resolve_path(config_file: str, maybe_rel: str) -> str: p = Path(maybe_rel) if p.is_absolute(): return str(p) return str(Path(config_file).resolve().parent / p) def project_root_from_config(config_file: str) -> str: return str(Path(config_file).resolve().parent) def safe_mkdir(root: str, sub: str) -> str: p = Path(root) / sub p.mkdir(parents=True, exist_ok=True) return str(p) # ------------------------- # SQLite # ------------------------- def db_connect(db_path: str) -> sqlite3.Connection: ensure_dir(db_path) con = sqlite3.connect(db_path) con.execute("PRAGMA journal_mode=WAL;") con.execute("PRAGMA synchronous=NORMAL;") return con def db_init(con: sqlite3.Connection) -> None: con.executescript(""" CREATE TABLE IF NOT EXISTS device ( id INTEGER PRIMARY KEY, name TEXT NOT NULL UNIQUE, host TEXT NOT NULL, port INTEGER NOT NULL, created_at TEXT NOT NULL, last_seen_at TEXT ); CREATE TABLE IF NOT EXISTS outlet ( id INTEGER PRIMARY KEY, device_id INTEGER NOT NULL, outlet_name TEXT NOT NULL, cost_code TEXT NOT NULL, cost_name TEXT NOT NULL, created_at TEXT NOT NULL, last_seen_at TEXT, UNIQUE(device_id, outlet_name), FOREIGN KEY(device_id) REFERENCES device(id) ); CREATE TABLE IF NOT EXISTS poll_run ( id INTEGER PRIMARY KEY, device_id INTEGER NOT NULL, started_at TEXT NOT NULL, finished_at TEXT, ok INTEGER NOT NULL DEFAULT 0, error TEXT, outlets_received INTEGER NOT NULL DEFAULT 0, outlets_filled INTEGER NOT NULL DEFAULT 0, fields_filled INTEGER NOT NULL DEFAULT 0, duration_ms INTEGER NOT NULL DEFAULT 0, FOREIGN KEY(device_id) REFERENCES device(id) ); CREATE TABLE IF NOT EXISTS reading ( id INTEGER PRIMARY KEY, poll_run_id INTEGER NOT NULL, device_id INTEGER NOT NULL, outlet_id INTEGER NOT NULL, ts TEXT NOT NULL, current_a REAL, peak_a REAL, voltage_v REAL, power_w REAL, va REAL, state TEXT, filled INTEGER NOT NULL DEFAULT 0, filled_fields INTEGER NOT NULL DEFAULT 0, FOREIGN KEY(poll_run_id) REFERENCES poll_run(id), FOREIGN KEY(device_id) REFERENCES device(id), FOREIGN KEY(outlet_id) REFERENCES outlet(id) ); CREATE INDEX IF NOT EXISTS idx_reading_ts ON reading(ts); CREATE INDEX IF NOT EXISTS idx_reading_outlet_ts ON reading(outlet_id, ts); """) con.commit() def get_or_create_device(con: sqlite3.Connection, d: DeviceCfg) -> int: now = utc_now_iso() cur = con.execute("SELECT id FROM device WHERE name=?", (d.name,)) row = cur.fetchone() if row: device_id = int(row[0]) con.execute("UPDATE device SET host=?, port=?, last_seen_at=? WHERE id=?", (d.host, d.port, now, device_id)) con.commit() return device_id cur = con.execute( "INSERT INTO device(name,host,port,created_at,last_seen_at) VALUES(?,?,?,?,?)", (d.name, d.host, d.port, now, None) ) con.commit() return int(cur.lastrowid) def get_or_create_outlet(con: sqlite3.Connection, device_id: int, outlet_name: str, cost_code: str, cost_name: str) -> int: now = utc_now_iso() cur = con.execute("SELECT id FROM outlet WHERE device_id=? AND outlet_name=?", (device_id, outlet_name)) row = cur.fetchone() if row: outlet_id = int(row[0]) con.execute(""" UPDATE outlet SET cost_code=?, cost_name=?, last_seen_at=? WHERE id=? """, (cost_code, cost_name, now, outlet_id)) con.commit() return outlet_id cur = con.execute(""" INSERT INTO outlet(device_id,outlet_name,cost_code,cost_name,created_at,last_seen_at) VALUES(?,?,?,?,?,?) """, (device_id, outlet_name, cost_code, cost_name, now, now)) con.commit() return int(cur.lastrowid) def last_reading_for_outlet(con: sqlite3.Connection, outlet_id: int) -> Optional[dict]: cur = con.execute(""" SELECT current_a, peak_a, voltage_v, power_w, va, state FROM reading WHERE outlet_id=? ORDER BY ts DESC LIMIT 1 """, (outlet_id,)) row = cur.fetchone() if not row: return None return { "current_a": row[0], "peak_a": row[1], "voltage_v": row[2], "power_w": row[3], "va": row[4], "state": row[5] } # ------------------------- # Cost center mapping # ------------------------- def cost_center_for(outlet_name: str, cc_map: Dict[str, str]) -> Tuple[str, str]: outlet_name = outlet_name.strip() code = outlet_name[:1].upper() if outlet_name else "_" name = cc_map.get(code) or cc_map.get("_default", "Unbekannt") return code, name # ------------------------- # Raw TCP (seriell->TCP bridge) # ------------------------- def tcp_read_all(sock: socket.socket, read_timeout: int, stop_on_prompt: bool = True) -> bytes: """ Liest Daten bis: - prompt '>' gesehen (optional) - remote close (recv == b'') - timeout """ sock.settimeout(read_timeout) buf = bytearray() while True: try: chunk = sock.recv(4096) except socket.timeout: break if not chunk: break buf += chunk if stop_on_prompt and (PROMPT_END in buf): break return bytes(buf) def tcp_fetch_ostatus_raw( host: str, port: int, cmd: str, connect_timeout: int, prompt_timeout: int, read_timeout: int, enter_first: bool = True, prompt_pause_sec: float = 2.0, cmd_pause_sec: float = 0.2, stop_on_prompt: bool = True ) -> bytes: """ Ablauf (weil Session ggf. schließt): 1) connect 2) ENTER senden (wecken), Pause 3) kurz lesen (prompt_timeout) -> wenn kein Prompt: nochmal ENTER+Pause+kurz lesen 4) cmd senden, kurze Pause 5) lesen bis prompt/close/timeout """ with socket.create_connection((host, port), timeout=connect_timeout) as s: pre = b"" if enter_first: s.sendall(b"\r\n") time.sleep(prompt_pause_sec) pre += tcp_read_all(s, read_timeout=prompt_timeout, stop_on_prompt=stop_on_prompt) if stop_on_prompt and (PROMPT_END not in pre): # nochmal wecken s.sendall(b"\r\n") time.sleep(prompt_pause_sec) pre += tcp_read_all(s, read_timeout=prompt_timeout, stop_on_prompt=stop_on_prompt) # Kommando s.sendall(cmd.encode("utf-8") + b"\r\n") time.sleep(cmd_pause_sec) out = tcp_read_all(s, read_timeout=read_timeout, stop_on_prompt=stop_on_prompt) return pre + out # ------------------------- # Parse ostatus output # ------------------------- def parse_ostatus(text: str) -> List[dict]: """ Erwartet Tabellenzeilen mit '|'. Beispiel: | W Power1 | 0.0 A | 0.0 A | 230.3 V | 0 W | 4 VA | On | """ rows: List[dict] = [] for ln in text.splitlines(): if "|" not in ln: continue # Header / Trenner filtern if "Outlet" in ln and "True RMS" in ln: continue if set(ln.strip()) <= set("-| "): continue parts = [p.strip() for p in ln.split("|")] parts = [p for p in parts if p != ""] if len(parts) < 7: continue outlet_name = parts[0] cur_s, peak_s, volt_s, power_s, va_s, state = parts[1:7] def parse_num(s: str) -> Optional[float]: m = NUM_RE.search(s.replace(",", ".")) return float(m.group(1)) if m else None rows.append({ "outlet_name": outlet_name, "current_a": parse_num(cur_s), "peak_a": parse_num(peak_s), "voltage_v": parse_num(volt_s), "power_w": parse_num(power_s), "va": parse_num(va_s), "state": state if state else None }) return rows # ------------------------- # Fill logic # ------------------------- def apply_fill(row: dict, last: Optional[dict]) -> Tuple[dict, int, int]: if not last: return row, 0, 0 filled_fields = 0 for k in FILL_KEYS: if row.get(k) is None and last.get(k) is not None: row[k] = last[k] filled_fields += 1 filled_flag = 1 if filled_fields > 0 else 0 return row, filled_flag, filled_fields # ------------------------- # Polling # ------------------------- def poll_device(con: sqlite3.Connection, dev: DeviceCfg, cc_map: Dict[str, str], cmd: str, connect_timeout: int, prompt_timeout: int, read_timeout: int, enter_first: bool, prompt_pause_sec: float, stop_on_prompt: bool, debug_dump_raw: bool, project_root: str) -> None: device_id = get_or_create_device(con, dev) started = utc_now_iso() t0 = time.time() run_id = con.execute(""" INSERT INTO poll_run(device_id, started_at, ok) VALUES(?,?,0) """, (device_id, started)).lastrowid con.commit() outlets_received = 0 outlets_filled = 0 fields_filled = 0 try: raw = tcp_fetch_ostatus_raw( host=dev.host, port=dev.port, cmd=cmd, connect_timeout=connect_timeout, prompt_timeout=prompt_timeout, read_timeout=read_timeout, enter_first=enter_first, prompt_pause_sec=prompt_pause_sec, stop_on_prompt=stop_on_prompt ) text = raw.decode("utf-8", errors="replace") if debug_dump_raw: logs_dir = Path(project_root) / "logs" logs_dir.mkdir(parents=True, exist_ok=True) (logs_dir / f"raw_{dev.name}.txt").write_text(text, encoding="utf-8") rows = parse_ostatus(text) ts = utc_now_iso() outlets_received = len(rows) for r in rows: outlet_name = r["outlet_name"].strip() cost_code, cost_name = cost_center_for(outlet_name, cc_map) outlet_id = get_or_create_outlet(con, device_id, outlet_name, cost_code, cost_name) last = last_reading_for_outlet(con, outlet_id) r2, filled_flag, filled_fields_cnt = apply_fill(r, last) if filled_flag: outlets_filled += 1 fields_filled += filled_fields_cnt con.execute(""" INSERT INTO reading( poll_run_id, device_id, outlet_id, ts, current_a, peak_a, voltage_v, power_w, va, state, filled, filled_fields ) VALUES(?,?,?,?,?,?,?,?,?,?,?,?) """, ( run_id, device_id, outlet_id, ts, r2.get("current_a"), r2.get("peak_a"), r2.get("voltage_v"), r2.get("power_w"), r2.get("va"), r2.get("state"), filled_flag, filled_fields_cnt )) duration_ms = int((time.time() - t0) * 1000) con.execute(""" UPDATE poll_run SET finished_at=?, ok=1, error=NULL, outlets_received=?, outlets_filled=?, fields_filled=?, duration_ms=? WHERE id=? """, (utc_now_iso(), outlets_received, outlets_filled, fields_filled, duration_ms, run_id)) con.commit() # kleine Info für CLI print(f"{dev.name}: OK outlets={outlets_received} filled_outlets={outlets_filled} filled_fields={fields_filled}") except Exception as e: duration_ms = int((time.time() - t0) * 1000) con.execute(""" UPDATE poll_run SET finished_at=?, ok=0, error=?, outlets_received=?, outlets_filled=?, fields_filled=?, duration_ms=? WHERE id=? """, (utc_now_iso(), str(e), outlets_received, outlets_filled, fields_filled, duration_ms, run_id)) con.commit() raise # ------------------------- # Report output (md/html/pdf) # ------------------------- def write_report_files(project_root: str, name: str, md_text: str, write_md: bool, write_html: bool, write_pdf: bool): root = Path(project_root) reports_dir = root / "reports" reports_dir.mkdir(parents=True, exist_ok=True) md_path = reports_dir / f"{name}.md" html_path = reports_dir / f"{name}.html" pdf_path = reports_dir / f"{name}.pdf" if write_md: md_path.write_text(md_text, encoding="utf-8") if write_html: try: import markdown as md html = md.markdown(md_text, extensions=["tables"]) except Exception: html = "
" + (md_text
                              .replace("&", "&")
                              .replace("<", "<")
                              .replace(">", ">")) + "
" html_full = ( "" "" f"{html}" ) html_path.write_text(html_full, encoding="utf-8") if write_pdf: # robust über pandoc (muss installiert sein) # sudo apt-get install -y pandoc if not write_md: md_path.write_text(md_text, encoding="utf-8") os.system(f"pandoc '{md_path}' -o '{pdf_path}'") # ------------------------- # Reporting logic # ------------------------- def parse_period_args(args) -> Tuple[str, str, str]: """ Returns (from_iso, to_iso, suffix_name) in UTC. """ now = dt.datetime.now(dt.timezone.utc).replace(microsecond=0) if args.last_days is not None: start = now - dt.timedelta(days=args.last_days) return start.isoformat(), now.isoformat(), f"last{args.last_days}d" if args.from_iso and args.to_iso: return args.from_iso, args.to_iso, "custom" if args.period == "weekly": start = now - dt.timedelta(days=7) suffix = "weekly" elif args.period == "monthly": start = now - dt.timedelta(days=30) suffix = "monthly" elif args.period == "yearly": start = now - dt.timedelta(days=365) suffix = "yearly" else: start = now - dt.timedelta(days=7) suffix = "weekly" return start.isoformat(), now.isoformat(), suffix def report(con: sqlite3.Connection, from_iso: str, to_iso: str, device_name: Optional[str]) -> str: """ Markdown Report als String. Energie: approx_Wh = Sum(power_w)*1h (bei stündlichem Poll). """ params = [from_iso, to_iso] dev_filter = "" if device_name: dev_filter = "AND d.name = ?" params.append(device_name) q_cost = f""" SELECT d.name AS device, o.cost_code, o.cost_name, COUNT(*) AS samples, SUM(CASE WHEN r.filled=1 THEN 1 ELSE 0 END) AS samples_with_fill, ROUND(AVG(r.power_w), 2) AS avg_power_w, ROUND(SUM(COALESCE(r.power_w,0.0))*1.0, 2) AS approx_energy_wh FROM reading r JOIN outlet o ON o.id = r.outlet_id JOIN device d ON d.id = r.device_id WHERE r.ts >= ? AND r.ts <= ? {dev_filter} GROUP BY d.name, o.cost_code, o.cost_name ORDER BY d.name, o.cost_code """ cost_rows = con.execute(q_cost, params).fetchall() q_job = f""" SELECT d.name, COUNT(*) AS runs, SUM(CASE WHEN pr.ok=1 THEN 1 ELSE 0 END) AS ok_runs, SUM(CASE WHEN pr.ok=0 THEN 1 ELSE 0 END) AS failed_runs, SUM(pr.outlets_received) AS outlets_received_total, SUM(pr.outlets_filled) AS outlets_filled_total, SUM(pr.fields_filled) AS fields_filled_total, ROUND(AVG(pr.duration_ms), 1) AS avg_duration_ms FROM poll_run pr JOIN device d ON d.id = pr.device_id WHERE pr.started_at >= ? AND pr.started_at <= ? {dev_filter} GROUP BY d.name ORDER BY d.name """ job_rows = con.execute(q_job, params).fetchall() md: List[str] = [] md.append("# MMP Report") md.append("") md.append(f"- Zeitraum (UTC): **{from_iso}** .. **{to_iso}**") if device_name: md.append(f"- Device: **{device_name}**") md.append("") md.append("## Kostenstellen") md.append("") md.append("| device | code | name | samples | filled_samples | avg_W | approx_Wh |") md.append("|---|---:|---|---:|---:|---:|---:|") for device, code, name, samples, filled, avg_w, wh in cost_rows: md.append(f"| {device} | {code} | {name} | {samples} | {filled} | {avg_w} | {wh} |") md.append("") md.append("## Job-Statistik") md.append("") md.append("| device | runs | ok | failed | outlets_total | outlets_filled | fields_filled | avg_ms |") md.append("|---|---:|---:|---:|---:|---:|---:|---:|") for device, runs, ok_runs, failed_runs, outlets_total, outlets_filled, fields_filled, avg_ms in job_rows: md.append(f"| {device} | {runs} | {ok_runs} | {failed_runs} | {outlets_total} | {outlets_filled} | {fields_filled} | {avg_ms} |") md.append("") return "\n".join(md) # ------------------------- # CLI # ------------------------- def main(): ap = argparse.ArgumentParser(description="MMP ostatus Logger (raw TCP) -> SQLite + Reports (md/html/pdf)") ap.add_argument("--config", required=True, help="Pfad zur config.json") sub = ap.add_subparsers(dest="cmd", required=True) sp_poll = sub.add_parser("poll", help="Pollt alle enabled Devices und schreibt Messwerte") sp_poll.add_argument("--device", default=None, help="Optional: nur ein Device-Name") sp_rep = sub.add_parser("report", help="Auswertung (weekly/monthly/yearly oder from/to / last-days)") sp_rep.add_argument("--device", default=None, help="Optional: nur ein Device-Name") sp_rep.add_argument("--period", choices=["weekly", "monthly", "yearly"], default="weekly") sp_rep.add_argument("--last-days", type=int, default=None, help="Letzte N Tage") sp_rep.add_argument("--from-iso", default=None, help="UTC ISO, z.B. 2026-02-01T00:00:00+00:00") sp_rep.add_argument("--to-iso", default=None, help="UTC ISO, z.B. 2026-02-08T00:00:00+00:00") args = ap.parse_args() cfg = load_json(args.config) proj_root = project_root_from_config(args.config) # Projekt-Unterordner sicherstellen safe_mkdir(proj_root, "data") safe_mkdir(proj_root, "logs") safe_mkdir(proj_root, "reports") # Pfade relativ zur config.json db_path = resolve_path(args.config, cfg["db_path"]) cc_path = resolve_path(args.config, cfg["cost_center_map"]) cc_map = load_json(cc_path) con = db_connect(db_path) db_init(con) # TCP / Timeouts / Verhalten tcp_cfg = cfg.get("tcp", cfg.get("telnet", {})) # erlaubt "telnet" alt, oder "tcp" neu read_timeout = int(tcp_cfg.get("read_timeout_sec", 35)) connect_timeout = int(tcp_cfg.get("connect_timeout_sec", 10)) prompt_timeout = int(tcp_cfg.get("prompt_timeout_sec", 8)) cmd = tcp_cfg.get("command", "ostatus") enter_first = bool(tcp_cfg.get("enter_first", True)) prompt_pause_sec = float(tcp_cfg.get("prompt_pause_sec", 2.0)) stop_on_prompt = bool(tcp_cfg.get("stop_on_prompt", True)) debug_dump_raw = bool(tcp_cfg.get("debug_dump_raw", False)) # Report-Ausgabe rep_cfg = cfg.get("report", {}) write_md = bool(rep_cfg.get("write_markdown", True)) write_html = bool(rep_cfg.get("write_html", True)) write_pdf = bool(rep_cfg.get("write_pdf", False)) name_prefix = rep_cfg.get("report_name_prefix", "report") # Devices devices: List[DeviceCfg] = [] for d in cfg.get("devices", []): devices.append(DeviceCfg( name=d["name"], host=d["host"], port=int(d.get("port", 20108)), enabled=bool(d.get("enabled", True)), username=d.get("username"), password=d.get("password"), )) if args.cmd == "poll": any_ran = False for d in devices: if not d.enabled: continue if args.device and d.name != args.device: continue any_ran = True poll_device( con, d, cc_map, cmd, connect_timeout, prompt_timeout, read_timeout, enter_first=enter_first, prompt_pause_sec=prompt_pause_sec, stop_on_prompt=stop_on_prompt, debug_dump_raw=debug_dump_raw, project_root=proj_root ) if not any_ran: print("Hinweis: Kein enabled device gematcht (check config.json / --device).") elif args.cmd == "report": from_iso, to_iso, suffix = parse_period_args(args) md_text = report(con, from_iso, to_iso, args.device) print(md_text) name = f"{name_prefix}_{suffix}" if args.device: name = f"{name}_{args.device}" write_report_files( project_root=proj_root, name=name, md_text=md_text, write_md=write_md, write_html=write_html, write_pdf=write_pdf ) if __name__ == "__main__": main()