diff --git a/config.json b/config.json index 3fbc167..5dcf458 100644 --- a/config.json +++ b/config.json @@ -5,7 +5,7 @@ "read_timeout_sec": 25, "connect_timeout_sec": 10, "command": "ostatus", - "prompt_timeout_sec": 25 + "prompt_timeout_sec": 8 }, "report": { "write_markdown": true, diff --git a/data/mmp.sqlite b/data/mmp.sqlite new file mode 100644 index 0000000..9c0b549 Binary files /dev/null and b/data/mmp.sqlite differ diff --git a/mmp_logger.py b/mmp_logger.py index b3609f7..f76add0 100755 --- a/mmp_logger.py +++ b/mmp_logger.py @@ -1,21 +1,40 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- +""" +mmp_logger.py +- Pollt BayTech MMP (seriell->TCP bridge, raw TCP) per Kommando "ostatus" +- Schreibt Messwerte in SQLite +- Fehlende Felder werden mit letztem Messwert ersetzt (filled + filled_fields werden protokolliert) +- Report als Markdown (und optional HTML/PDF) nach ./reports/ + +Projektidee: Alles in EINEM Verzeichnis. +""" + import argparse import datetime as dt import json import os import re -import sqlite3 import socket +import sqlite3 import time from dataclasses import dataclass from pathlib import Path from typing import Dict, List, Optional, Tuple -# Prompt endet mit '>' (laut deiner Beschreibung) +# ------------------------- +# Parsing / Prompt +# ------------------------- + PROMPT_END = b">" NUM_RE = re.compile(r"([-+]?\d+(\.\d+)?)") +FILL_KEYS = ["current_a", "peak_a", "voltage_v", "power_w", "va", "state"] + + +# ------------------------- +# Data classes +# ------------------------- @dataclass class DeviceCfg: @@ -23,7 +42,7 @@ class DeviceCfg: host: str port: int enabled: bool = True - username: Optional[str] = None + username: Optional[str] = None # nicht genutzt (kein Login), bleibt für Zukunft password: Optional[str] = None @@ -111,7 +130,7 @@ def db_init(con: sqlite3.Connection) -> None: poll_run_id INTEGER NOT NULL, device_id INTEGER NOT NULL, outlet_id INTEGER NOT NULL, - ts TEXT NOT NULL, -- UTC ISO + ts TEXT NOT NULL, current_a REAL, peak_a REAL, voltage_v REAL, @@ -135,11 +154,11 @@ def get_or_create_device(con: sqlite3.Connection, d: DeviceCfg) -> int: cur = con.execute("SELECT id FROM device WHERE name=?", (d.name,)) row = cur.fetchone() if row: - device_id = row[0] + device_id = int(row[0]) con.execute("UPDATE device SET host=?, port=?, last_seen_at=? WHERE id=?", (d.host, d.port, now, device_id)) con.commit() - return int(device_id) + return device_id cur = con.execute( "INSERT INTO device(name,host,port,created_at,last_seen_at) VALUES(?,?,?,?,?)", @@ -204,17 +223,15 @@ def cost_center_for(outlet_name: str, cc_map: Dict[str, str]) -> Tuple[str, str] # ------------------------- -# Telnet / Fetch / Parse +# Raw TCP (seriell->TCP bridge) # ------------------------- -PROMPT_END = b">" - -def tcp_read_all(sock: socket.socket, read_timeout: int) -> bytes: +def tcp_read_all(sock: socket.socket, read_timeout: int, stop_on_prompt: bool = True) -> bytes: """ - Liest bis: - - Prompt '>' erkannt wird, ODER - - Gegenseite schließt (recv == b''), ODER - - read_timeout abläuft (socket.timeout) + Liest Daten bis: + - prompt '>' gesehen (optional) + - remote close (recv == b'') + - timeout """ sock.settimeout(read_timeout) buf = bytearray() @@ -224,11 +241,9 @@ def tcp_read_all(sock: socket.socket, read_timeout: int) -> bytes: except socket.timeout: break if not chunk: - # Session geschlossen break buf += chunk - # Prompt irgendwo im Buffer? - if PROMPT_END in buf: + if stop_on_prompt and (PROMPT_END in buf): break return bytes(buf) @@ -242,46 +257,49 @@ def tcp_fetch_ostatus_raw( enter_first: bool = True, prompt_pause_sec: float = 2.0, cmd_pause_sec: float = 0.2, + stop_on_prompt: bool = True ) -> bytes: """ - Raw TCP (seriell->TCP). Ablauf: + Ablauf (weil Session ggf. schließt): 1) connect - 2) optional: ENTER senden, kurz warten - 3) kurz lesen (prompt_timeout) - - falls kein '>' kommt: nochmal ENTER, Pause + 2) ENTER senden (wecken), Pause + 3) kurz lesen (prompt_timeout) -> wenn kein Prompt: nochmal ENTER+Pause+kurz lesen 4) cmd senden, kurze Pause - 5) lesen bis prompt / close / timeout + 5) lesen bis prompt/close/timeout """ with socket.create_connection((host, port), timeout=connect_timeout) as s: - # 1) "Session wecken" + pre = b"" if enter_first: s.sendall(b"\r\n") time.sleep(prompt_pause_sec) - # 2) kurzer Versuch: gibt's schon Output/Prompt? - pre = tcp_read_all(s, read_timeout=prompt_timeout) + pre += tcp_read_all(s, read_timeout=prompt_timeout, stop_on_prompt=stop_on_prompt) - # 3) falls kein Prompt: nochmal Enter + Pause - if PROMPT_END not in pre: + if stop_on_prompt and (PROMPT_END not in pre): + # nochmal wecken s.sendall(b"\r\n") time.sleep(prompt_pause_sec) - pre += tcp_read_all(s, read_timeout=prompt_timeout) + pre += tcp_read_all(s, read_timeout=prompt_timeout, stop_on_prompt=stop_on_prompt) - # 4) Kommando senden + # Kommando s.sendall(cmd.encode("utf-8") + b"\r\n") time.sleep(cmd_pause_sec) - # 5) Antwort lesen (Bridge kann danach schließen) - out = tcp_read_all(s, read_timeout=read_timeout) + out = tcp_read_all(s, read_timeout=read_timeout, stop_on_prompt=stop_on_prompt) return pre + out + +# ------------------------- +# Parse ostatus output +# ------------------------- + def parse_ostatus(text: str) -> List[dict]: """ - Parst Tabellenzeilen mit '|' (dein Format). - Beispielzeile: + Erwartet Tabellenzeilen mit '|'. + Beispiel: | W Power1 | 0.0 A | 0.0 A | 230.3 V | 0 W | 4 VA | On | """ - rows = [] + rows: List[dict] = [] for ln in text.splitlines(): if "|" not in ln: continue @@ -319,8 +337,6 @@ def parse_ostatus(text: str) -> List[dict]: # Fill logic # ------------------------- -FILL_KEYS = ["current_a", "peak_a", "voltage_v", "power_w", "va", "state"] - def apply_fill(row: dict, last: Optional[dict]) -> Tuple[dict, int, int]: if not last: return row, 0, 0 @@ -338,7 +354,11 @@ def apply_fill(row: dict, last: Optional[dict]) -> Tuple[dict, int, int]: # ------------------------- def poll_device(con: sqlite3.Connection, dev: DeviceCfg, cc_map: Dict[str, str], - cmd: str, connect_timeout: int, prompt_timeout: int, read_timeout: int) -> None: + cmd: str, connect_timeout: int, prompt_timeout: int, read_timeout: int, + enter_first: bool, prompt_pause_sec: float, + stop_on_prompt: bool, + debug_dump_raw: bool, + project_root: str) -> None: device_id = get_or_create_device(con, dev) started = utc_now_iso() t0 = time.time() @@ -361,10 +381,22 @@ def poll_device(con: sqlite3.Connection, dev: DeviceCfg, cc_map: Dict[str, str], connect_timeout=connect_timeout, prompt_timeout=prompt_timeout, read_timeout=read_timeout, - enter_first=True, - prompt_pause_sec=2.0 + enter_first=enter_first, + prompt_pause_sec=prompt_pause_sec, + stop_on_prompt=stop_on_prompt ) + text = raw.decode("utf-8", errors="replace") + + if debug_dump_raw: + logs_dir = Path(project_root) / "logs" + logs_dir.mkdir(parents=True, exist_ok=True) + (logs_dir / f"raw_{dev.name}.txt").write_text(text, encoding="utf-8") + + rows = parse_ostatus(text) + ts = utc_now_iso() + outlets_received = len(rows) + for r in rows: outlet_name = r["outlet_name"].strip() cost_code, cost_name = cost_center_for(outlet_name, cc_map) @@ -400,6 +432,9 @@ def poll_device(con: sqlite3.Connection, dev: DeviceCfg, cc_map: Dict[str, str], """, (utc_now_iso(), outlets_received, outlets_filled, fields_filled, duration_ms, run_id)) con.commit() + # kleine Info für CLI + print(f"{dev.name}: OK outlets={outlets_received} filled_outlets={outlets_filled} filled_fields={fields_filled}") + except Exception as e: duration_ms = int((time.time() - t0) * 1000) con.execute(""" @@ -473,7 +508,6 @@ def parse_period_args(args) -> Tuple[str, str, str]: return start.isoformat(), now.isoformat(), f"last{args.last_days}d" if args.from_iso and args.to_iso: - # Name aus from/to (gekürzt) return args.from_iso, args.to_iso, "custom" if args.period == "weekly": @@ -493,8 +527,8 @@ def parse_period_args(args) -> Tuple[str, str, str]: def report(con: sqlite3.Connection, from_iso: str, to_iso: str, device_name: Optional[str]) -> str: """ - Erzeugt Markdown-Report als String (kann gedruckt + in Dateien geschrieben werden). - Energie: grob approx_Wh = Sum(power_w) * 1h (bei stündlichem Poll). + Markdown Report als String. + Energie: approx_Wh = Sum(power_w)*1h (bei stündlichem Poll). """ params = [from_iso, to_iso] dev_filter = "" @@ -540,7 +574,7 @@ def report(con: sqlite3.Connection, from_iso: str, to_iso: str, device_name: Opt """ job_rows = con.execute(q_job, params).fetchall() - md = [] + md: List[str] = [] md.append("# MMP Report") md.append("") md.append(f"- Zeitraum (UTC): **{from_iso}** .. **{to_iso}**") @@ -572,7 +606,7 @@ def report(con: sqlite3.Connection, from_iso: str, to_iso: str, device_name: Opt # ------------------------- def main(): - ap = argparse.ArgumentParser(description="MMP ostatus Logger -> SQLite + Reports (md/html/pdf)") + ap = argparse.ArgumentParser(description="MMP ostatus Logger (raw TCP) -> SQLite + Reports (md/html/pdf)") ap.add_argument("--config", required=True, help="Pfad zur config.json") sub = ap.add_subparsers(dest="cmd", required=True) @@ -591,31 +625,39 @@ def main(): cfg = load_json(args.config) proj_root = project_root_from_config(args.config) - # Projekt-Unterordner sicherstellen (alles im selben Verzeichnis) + # Projekt-Unterordner sicherstellen safe_mkdir(proj_root, "data") safe_mkdir(proj_root, "logs") safe_mkdir(proj_root, "reports") + # Pfade relativ zur config.json db_path = resolve_path(args.config, cfg["db_path"]) cc_path = resolve_path(args.config, cfg["cost_center_map"]) - cc_map = load_json(cc_path) con = db_connect(db_path) db_init(con) - tel = cfg.get("telnet", {}) - read_timeout = int(tel.get("read_timeout_sec", 25)) - connect_timeout = int(tel.get("connect_timeout_sec", 10)) - prompt_timeout = int(tel.get("prompt_timeout_sec", 25)) - cmd = tel.get("command", "ostatus") + # TCP / Timeouts / Verhalten + tcp_cfg = cfg.get("tcp", cfg.get("telnet", {})) # erlaubt "telnet" alt, oder "tcp" neu + read_timeout = int(tcp_cfg.get("read_timeout_sec", 35)) + connect_timeout = int(tcp_cfg.get("connect_timeout_sec", 10)) + prompt_timeout = int(tcp_cfg.get("prompt_timeout_sec", 8)) + cmd = tcp_cfg.get("command", "ostatus") + enter_first = bool(tcp_cfg.get("enter_first", True)) + prompt_pause_sec = float(tcp_cfg.get("prompt_pause_sec", 2.0)) + stop_on_prompt = bool(tcp_cfg.get("stop_on_prompt", True)) + debug_dump_raw = bool(tcp_cfg.get("debug_dump_raw", False)) + + # Report-Ausgabe rep_cfg = cfg.get("report", {}) write_md = bool(rep_cfg.get("write_markdown", True)) write_html = bool(rep_cfg.get("write_html", True)) write_pdf = bool(rep_cfg.get("write_pdf", False)) name_prefix = rep_cfg.get("report_name_prefix", "report") + # Devices devices: List[DeviceCfg] = [] for d in cfg.get("devices", []): devices.append(DeviceCfg( @@ -628,13 +670,24 @@ def main(): )) if args.cmd == "poll": + any_ran = False for d in devices: if not d.enabled: continue if args.device and d.name != args.device: continue - poll_device(con, d, cc_map, cmd, connect_timeout, prompt_timeout, read_timeout) - print("OK") + any_ran = True + poll_device( + con, d, cc_map, + cmd, connect_timeout, prompt_timeout, read_timeout, + enter_first=enter_first, + prompt_pause_sec=prompt_pause_sec, + stop_on_prompt=stop_on_prompt, + debug_dump_raw=debug_dump_raw, + project_root=proj_root + ) + if not any_ran: + print("Hinweis: Kein enabled device gematcht (check config.json / --device).") elif args.cmd == "report": from_iso, to_iso, suffix = parse_period_args(args) @@ -642,7 +695,6 @@ def main(): print(md_text) - # Dateiname: z.B. report_weekly, report_monthly, report_last14d, report_custom name = f"{name_prefix}_{suffix}" if args.device: name = f"{name}_{args.device}" @@ -656,6 +708,6 @@ def main(): write_pdf=write_pdf ) + if __name__ == "__main__": - main() - \ No newline at end of file + main() \ No newline at end of file