Files
mmp_logger/mmp_logger.py
2026-02-08 19:03:00 +01:00

484 lines
16 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import datetime as dt
import json
import os
import re
import sqlite3
import telnetlib
import time
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple
from pathlib import Path
# Bytes pattern for a device prompt; not referenced by the code visible in
# this part of the file (the telnet reader waits for a literal b">" instead).
PROMPT_RE = re.compile(rb".*> ?$") # prompt ends with '>'
# First signed decimal number in a string (integer or with a fractional part);
# group 1 holds the complete number.
NUM_RE = re.compile(r"([-+]?\d+(\.\d+)?)")
@dataclass
class DeviceCfg:
    """Connection settings for one polled device, as read from the config."""

    # Name used to look the device up in the database and to match --device.
    name: str
    # Telnet endpoint.
    host: str
    port: int
    # Devices with enabled=False are skipped by the poll command.
    enabled: bool = True
    # Optional credentials for the best-effort telnet login.
    username: Optional[str] = None
    password: Optional[str] = None
def resolve_path(base_file: str, maybe_rel: str) -> str:
    """Return *maybe_rel* as a path string, anchored at *base_file*'s directory.

    An absolute *maybe_rel* is returned unchanged; a relative one is joined
    onto the directory that contains the (resolved) *base_file*.
    """
    candidate = Path(maybe_rel)
    if not candidate.is_absolute():
        candidate = Path(base_file).resolve().parent / candidate
    return str(candidate)
def utc_now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string with second precision."""
    now = dt.datetime.now(dt.timezone.utc)
    return now.replace(microsecond=0).isoformat()
def load_json(path: str) -> dict:
    """Read the UTF-8 encoded file at *path* and return its parsed JSON content."""
    with open(path, "r", encoding="utf-8") as fh:
        text = fh.read()
    return json.loads(text)
def ensure_dir(path: str) -> None:
    """Create the parent directory of *path* (and any missing ancestors).

    Does nothing when *path* has no directory component.
    """
    parent = os.path.dirname(path)
    if not parent:
        return
    os.makedirs(parent, exist_ok=True)
def db_connect(db_path: str) -> sqlite3.Connection:
    """Open the SQLite database at *db_path*, creating its directory if needed.

    The connection is switched to WAL journaling with synchronous=NORMAL
    before it is returned.
    """
    ensure_dir(db_path)
    con = sqlite3.connect(db_path)
    for pragma in ("PRAGMA journal_mode=WAL;", "PRAGMA synchronous=NORMAL;"):
        con.execute(pragma)
    return con
def db_init(con: sqlite3.Connection) -> None:
    """Create the logger schema on *con* if it does not exist yet, then commit.

    Tables: device, outlet, poll_run and reading, plus two indexes on
    reading. Every statement uses IF NOT EXISTS, so calling this repeatedly
    is harmless.
    """
    # The script text is kept verbatim; SQLite stores it in sqlite_master.
    schema = """
CREATE TABLE IF NOT EXISTS device (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL UNIQUE,
host TEXT NOT NULL,
port INTEGER NOT NULL,
created_at TEXT NOT NULL,
last_seen_at TEXT
);
CREATE TABLE IF NOT EXISTS outlet (
id INTEGER PRIMARY KEY,
device_id INTEGER NOT NULL,
outlet_name TEXT NOT NULL,
cost_code TEXT NOT NULL,
cost_name TEXT NOT NULL,
created_at TEXT NOT NULL,
last_seen_at TEXT,
UNIQUE(device_id, outlet_name),
FOREIGN KEY(device_id) REFERENCES device(id)
);
CREATE TABLE IF NOT EXISTS poll_run (
id INTEGER PRIMARY KEY,
device_id INTEGER NOT NULL,
started_at TEXT NOT NULL,
finished_at TEXT,
ok INTEGER NOT NULL DEFAULT 0,
error TEXT,
outlets_received INTEGER NOT NULL DEFAULT 0,
outlets_filled INTEGER NOT NULL DEFAULT 0,
fields_filled INTEGER NOT NULL DEFAULT 0,
duration_ms INTEGER NOT NULL DEFAULT 0,
FOREIGN KEY(device_id) REFERENCES device(id)
);
CREATE TABLE IF NOT EXISTS reading (
id INTEGER PRIMARY KEY,
poll_run_id INTEGER NOT NULL,
device_id INTEGER NOT NULL,
outlet_id INTEGER NOT NULL,
ts TEXT NOT NULL, -- UTC ISO
current_a REAL,
peak_a REAL,
voltage_v REAL,
power_w REAL,
va REAL,
state TEXT,
filled INTEGER NOT NULL DEFAULT 0, -- 1 wenn (teilweise/komplett) aus letztem Messwert gefüllt
filled_fields INTEGER NOT NULL DEFAULT 0, -- Anzahl gefüllter Felder
FOREIGN KEY(poll_run_id) REFERENCES poll_run(id),
FOREIGN KEY(device_id) REFERENCES device(id),
FOREIGN KEY(outlet_id) REFERENCES outlet(id)
);
CREATE INDEX IF NOT EXISTS idx_reading_ts ON reading(ts);
CREATE INDEX IF NOT EXISTS idx_reading_outlet_ts ON reading(outlet_id, ts);
"""
    con.executescript(schema)
    con.commit()
def get_or_create_device(con: sqlite3.Connection, d: DeviceCfg) -> int:
    """Return the database id of device *d*, inserting a row if it is new.

    Devices are looked up by name. For an existing device, host, port and
    last_seen_at are refreshed; a new device is stored with last_seen_at
    left NULL. Both paths commit.
    """
    now = utc_now_iso()
    existing = con.execute("SELECT id FROM device WHERE name=?", (d.name,)).fetchone()
    if not existing:
        inserted = con.execute(
            "INSERT INTO device(name,host,port,created_at,last_seen_at) VALUES(?,?,?,?,?)",
            (d.name, d.host, d.port, now, None)
        )
        con.commit()
        return int(inserted.lastrowid)
    device_id = existing[0]
    con.execute("UPDATE device SET host=?, port=?, last_seen_at=? WHERE id=?",
                (d.host, d.port, now, device_id))
    con.commit()
    return device_id
def cost_center_for(outlet_name: str, cc_map: Dict[str, str]) -> Tuple[str, str]:
    """Map an outlet name to its cost-center ``(code, name)`` pair.

    The code is the upper-cased first character of the stripped outlet name,
    or "_" when the name is empty. The name comes from ``cc_map[code]``;
    when that entry is missing or empty, ``cc_map["_default"]`` is used, and
    "Unbekannt" if there is no default either.
    """
    trimmed = outlet_name.strip()
    if trimmed:
        code = trimmed[0].upper()
    else:
        code = "_"
    label = cc_map.get(code)
    if not label:
        label = cc_map.get("_default", "Unbekannt")
    return code, label
def telnet_fetch_ostatus(dev: DeviceCfg, cmd: str, connect_timeout: int, read_timeout: int) -> bytes:
    """Run *cmd* on the device over telnet and return the raw reply bytes.

    Opens a telnet session, waits for the first prompt (a '>' character),
    sends *cmd* and reads up to the next '>'. Simple login prompts are
    answered on a best-effort basis when credentials are configured.

    Args:
        dev: Device to contact (host, port, optional username/password).
        cmd: Command line to send, without the trailing newline.
        connect_timeout: Timeout in seconds for opening the connection.
        read_timeout: Timeout in seconds for each wait on the device.

    Returns:
        The bytes received after sending *cmd*. ``read_until`` returns
        whatever has arrived when the timeout expires, so the result may be
        incomplete if the prompt never shows up.

    Raises:
        OSError / EOFError: propagated from telnetlib when connecting or
            reading fails.
    """
    tn = telnetlib.Telnet()
    try:
        tn.open(dev.host, dev.port, timeout=connect_timeout)

        # First prompt / banner.
        buf = tn.read_until(b">", timeout=read_timeout)

        # Best-effort login: many devices show "login:" / "Password:", but
        # that is not guaranteed, so only answer prompts actually seen.
        if dev.username:
            banner = buf.lower()
            if b"login" in banner or b"user" in banner:
                tn.write(dev.username.encode("utf-8") + b"\n")
                buf += tn.read_until(b":", timeout=read_timeout)
        # NOTE(review): the password step is treated as independent of the
        # username step (password-only devices) - confirm against the device.
        if dev.password and b"password" in buf.lower():
            tn.write(dev.password.encode("utf-8") + b"\n")
            tn.read_until(b">", timeout=read_timeout)

        # Send the command and read up to the next prompt.
        tn.write(cmd.encode("utf-8") + b"\n")
        return tn.read_until(b">", timeout=read_timeout)
    finally:
        # Always release the socket, also when a read or write raised.
        tn.close()
def parse_ostatus(text: str) -> List[dict]:
    """Parse the table rows of an ``ostatus`` reply.

    Expected row layout::

        | <Outlet Name> | 0.0 A | 0.1 A | 230.2 V | 21 W | 26 VA | On |

    Lines without '|', the header line (contains both "Outlet" and
    "True RMS") and separator lines made only of '-', '|' and spaces are
    skipped, as are rows with fewer than seven cells.

    Only the empty cells produced by the table borders are discarded. An
    empty cell *inside* a row keeps its position and yields ``None`` for that
    value, so a missing measurement no longer shifts the remaining columns
    or drops the whole row.

    Args:
        text: Decoded device output.

    Returns:
        One dict per outlet with the keys ``outlet_name``, ``current_a``,
        ``peak_a``, ``voltage_v``, ``power_w``, ``va`` (floats or ``None``)
        and ``state`` (str).
    """

    def parse_num(cell: str) -> Optional[float]:
        # Accept a decimal comma as well as a decimal point.
        m = NUM_RE.search(cell.replace(",", "."))
        return float(m.group(1)) if m else None

    separator_chars = set("-| ")
    rows: List[dict] = []
    for ln in text.splitlines():
        if "|" not in ln:
            continue
        # Filter out the table header and separator lines.
        if "Outlet" in ln and "True RMS" in ln:
            continue
        if set(ln.strip()) <= separator_chars:
            continue

        cells = [c.strip() for c in ln.split("|")]
        # Typical split result: ['', name, cur, peak, volt, power, va, state, ''].
        # Trim the empty border cells at both ends, keep interior ones.
        first, last = 0, len(cells)
        while first < last and cells[first] == "":
            first += 1
        while last > first and cells[last - 1] == "":
            last -= 1
        cells = cells[first:last]
        if len(cells) < 7:
            continue

        outlet_name, cur_s, peak_s, volt_s, power_s, va_s, state = cells[:7]
        rows.append({
            "outlet_name": outlet_name,
            "current_a": parse_num(cur_s),
            "peak_a": parse_num(peak_s),
            "voltage_v": parse_num(volt_s),
            "power_w": parse_num(power_s),
            "va": parse_num(va_s),
            "state": state,
        })
    return rows
def last_reading_for_outlet(con: sqlite3.Connection, outlet_id: int) -> Optional[dict]:
    """Return the most recent stored reading of an outlet, or ``None``.

    Readings are ordered by timestamp; because timestamps only have second
    resolution, the row id is used as a tie-breaker so that the latest
    inserted row wins deterministically.

    Args:
        con: Open database connection.
        outlet_id: Id of the outlet in the ``outlet`` table.

    Returns:
        A dict with the keys ``current_a``, ``peak_a``, ``voltage_v``,
        ``power_w``, ``va`` and ``state``, or ``None`` when the outlet has
        no readings yet.
    """
    columns = ("current_a", "peak_a", "voltage_v", "power_w", "va", "state")
    row = con.execute(
        """
        SELECT current_a, peak_a, voltage_v, power_w, va, state
        FROM reading
        WHERE outlet_id=?
        ORDER BY ts DESC, id DESC
        LIMIT 1
        """,
        (outlet_id,),
    ).fetchone()
    if row is None:
        return None
    return dict(zip(columns, row))
def get_or_create_outlet(con: sqlite3.Connection, device_id: int, outlet_name: str,
                         cost_code: str, cost_name: str) -> int:
    """Return the id of the outlet ``(device_id, outlet_name)``, creating it if new.

    An existing outlet gets its cost-center columns and last_seen_at
    refreshed; a new one is inserted with created_at and last_seen_at set to
    the current time. Both paths commit.
    """
    now = utc_now_iso()
    found = con.execute("""
SELECT id FROM outlet WHERE device_id=? AND outlet_name=?
""", (device_id, outlet_name)).fetchone()
    if not found:
        inserted = con.execute("""
INSERT INTO outlet(device_id,outlet_name,cost_code,cost_name,created_at,last_seen_at)
VALUES(?,?,?,?,?,?)
""", (device_id, outlet_name, cost_code, cost_name, now, now))
        con.commit()
        return int(inserted.lastrowid)
    outlet_id = found[0]
    con.execute("""
UPDATE outlet
SET cost_code=?, cost_name=?, last_seen_at=?
WHERE id=?
""", (cost_code, cost_name, now, outlet_id))
    con.commit()
    return int(outlet_id)
def apply_fill(row: dict, last: Optional[dict]) -> Tuple[dict, int, int]:
    """Fill missing (``None``) measurement values of *row* from *last*.

    *row* is updated in place and also returned.

    Returns:
        ``(row, filled_flag, filled_fields)`` where ``filled_flag`` is 1 if
        at least one field was taken from *last* (else 0) and
        ``filled_fields`` is the number of such fields.
    """
    if not last:
        return row, 0, 0
    fillable = ("current_a", "peak_a", "voltage_v", "power_w", "va", "state")
    # A field is patched only when the new value is missing and the previous
    # reading actually has one.
    patched = [k for k in fillable if row.get(k) is None and last.get(k) is not None]
    for key in patched:
        row[key] = last[key]
    count = len(patched)
    return row, int(count > 0), count
def _finish_poll_run(con: sqlite3.Connection, run_id: int, ok: bool, error: Optional[str],
                     outlets_received: int, outlets_filled: int, fields_filled: int,
                     duration_ms: int) -> None:
    """Store the final status and counters of a poll_run row and commit."""
    con.execute(
        """
        UPDATE poll_run
        SET finished_at=?, ok=?, error=?,
            outlets_received=?, outlets_filled=?, fields_filled=?, duration_ms=?
        WHERE id=?
        """,
        (utc_now_iso(), 1 if ok else 0, error,
         outlets_received, outlets_filled, fields_filled, duration_ms, run_id),
    )
    con.commit()


def poll_device(con: sqlite3.Connection, dev: DeviceCfg, cc_map: Dict[str, str],
                cmd: str, connect_timeout: int, read_timeout: int) -> None:
    """Poll one device and store one reading per outlet.

    A poll_run row is created first (ok=0) and finalized afterwards with the
    outcome, counters and duration. Missing values of a reading are filled
    from the outlet's previous reading; such rows are flagged via
    ``filled`` / ``filled_fields``.

    Args:
        con: Open database connection with the logger schema.
        dev: Device to poll.
        cc_map: Cost-center map passed on to ``cost_center_for``.
        cmd: Telnet command to send.
        connect_timeout: Telnet connect timeout in seconds.
        read_timeout: Telnet read timeout in seconds.

    Raises:
        Exception: any error during fetch, parse or insert is recorded in the
            poll_run row (ok=0, error text) and then re-raised.
    """
    device_id = get_or_create_device(con, dev)
    started = utc_now_iso()
    # Monotonic clock: the duration must not be affected by wall-clock jumps.
    t0 = time.monotonic()
    run_id = con.execute(
        """
        INSERT INTO poll_run(device_id, started_at, ok)
        VALUES(?,?,0)
        """,
        (device_id, started),
    ).lastrowid
    con.commit()

    outlets_received = 0
    outlets_filled = 0
    fields_filled = 0
    try:
        raw = telnet_fetch_ostatus(dev, cmd, connect_timeout, read_timeout)
        rows = parse_ostatus(raw.decode("utf-8", errors="replace"))
        # One common timestamp for all readings of this run.
        ts = utc_now_iso()
        outlets_received = len(rows)
        for r in rows:
            outlet_name = r["outlet_name"].strip()
            cost_code, cost_name = cost_center_for(outlet_name, cc_map)
            # NOTE: get_or_create_outlet commits, so readings inserted before
            # a mid-run failure remain stored.
            outlet_id = get_or_create_outlet(con, device_id, outlet_name, cost_code, cost_name)
            last = last_reading_for_outlet(con, outlet_id)
            r2, filled_flag, filled_fields_cnt = apply_fill(r, last)
            if filled_flag:
                outlets_filled += 1
                fields_filled += filled_fields_cnt
            con.execute(
                """
                INSERT INTO reading(
                    poll_run_id, device_id, outlet_id, ts,
                    current_a, peak_a, voltage_v, power_w, va, state,
                    filled, filled_fields
                )
                VALUES(?,?,?,?,?,?,?,?,?,?,?,?)
                """,
                (
                    run_id, device_id, outlet_id, ts,
                    r2.get("current_a"), r2.get("peak_a"), r2.get("voltage_v"),
                    r2.get("power_w"), r2.get("va"), r2.get("state"),
                    filled_flag, filled_fields_cnt,
                ),
            )
        duration_ms = int((time.monotonic() - t0) * 1000)
        _finish_poll_run(con, run_id, True, None,
                         outlets_received, outlets_filled, fields_filled, duration_ms)
    except Exception as e:
        duration_ms = int((time.monotonic() - t0) * 1000)
        # Some exceptions carry no message; store the type name then so the
        # error column is never an empty string.
        error_text = str(e) or type(e).__name__
        _finish_poll_run(con, run_id, False, error_text,
                         outlets_received, outlets_filled, fields_filled, duration_ms)
        raise
def parse_period_args(args) -> Tuple[str, str]:
    """Derive the report window ``(from_iso, to_iso)`` from the parsed CLI args.

    Precedence: ``last_days`` first, then an explicit ``from_iso``/``to_iso``
    pair (returned unchanged, only when both are given), otherwise the named
    period (weekly = 7, monthly = 30, yearly = 365 days; anything else 7).
    Computed windows end at the current UTC time.
    """
    now = dt.datetime.now(dt.timezone.utc).replace(microsecond=0)
    if args.last_days is not None:
        span_days = args.last_days
    elif args.from_iso and args.to_iso:
        return args.from_iso, args.to_iso
    else:
        # Predefined periods; unknown values fall back to one week.
        span_days = {"weekly": 7, "monthly": 30, "yearly": 365}.get(args.period, 7)
    start = now - dt.timedelta(days=span_days)
    return start.isoformat(), now.isoformat()
def report(con: sqlite3.Connection, from_iso: str, to_iso: str, device_name: Optional[str]) -> None:
    """Print a cost-center summary and poll-job statistics for a time window.

    Energy is approximated as "one sample = one hour", i.e. the sum of the
    sampled power values is reported as Wh. This is only accurate for hourly
    polling; using the time delta between samples would be more precise.

    Args:
        con: Open database connection.
        from_iso: Inclusive lower bound, compared as text against the stored
            timestamps.
        to_iso: Inclusive upper bound, compared the same way.
        device_name: When given, restrict both queries to this device.
    """
    # The optional device filter is a fixed SQL fragment; the value itself is
    # always bound as a parameter.
    dev_filter = "AND d.name = ?" if device_name else ""
    params = [from_iso, to_iso] + ([device_name] if device_name else [])

    # Cost-center overview.
    q = f"""
SELECT
d.name AS device,
o.cost_code,
o.cost_name,
COUNT(*) AS samples,
SUM(CASE WHEN r.filled=1 THEN 1 ELSE 0 END) AS samples_with_fill,
ROUND(AVG(r.power_w), 2) AS avg_power_w,
ROUND(SUM(COALESCE(r.power_w,0.0)), 2) AS sum_power_w,
ROUND(SUM(COALESCE(r.power_w,0.0))*1.0, 2) AS approx_energy_wh
FROM reading r
JOIN outlet o ON o.id = r.outlet_id
JOIN device d ON d.id = r.device_id
WHERE r.ts >= ? AND r.ts <= ?
{dev_filter}
GROUP BY d.name, o.cost_code, o.cost_name
ORDER BY d.name, o.cost_code
"""
    cost_rows = con.execute(q, params).fetchall()

    print(f"\nReport UTC: {from_iso} .. {to_iso}")
    if device_name:
        print(f"Device: {device_name}")
    print("\nKostenstellen:")
    print("device | code | name | samples | filled | avg_W | approx_Wh")
    for device, code, name, samples, filled, avg_w, _sum_w, wh in cost_rows:
        # sum_power_w is selected but not part of the printed columns.
        shown = (device, code, name, samples, filled, avg_w, wh)
        print(" | ".join(str(value) for value in shown))

    # Error / quality statistics from poll_run.
    q2 = f"""
SELECT
d.name,
COUNT(*) AS runs,
SUM(CASE WHEN pr.ok=1 THEN 1 ELSE 0 END) AS ok_runs,
SUM(CASE WHEN pr.ok=0 THEN 1 ELSE 0 END) AS failed_runs,
SUM(pr.outlets_filled) AS outlets_filled_total,
SUM(pr.fields_filled) AS fields_filled_total,
ROUND(AVG(pr.duration_ms), 1) AS avg_duration_ms
FROM poll_run pr
JOIN device d ON d.id = pr.device_id
WHERE pr.started_at >= ? AND pr.started_at <= ?
{dev_filter}
GROUP BY d.name
ORDER BY d.name
"""
    job_rows = con.execute(q2, params).fetchall()

    print("\nJob-Statistik:")
    print("device | runs | ok | failed | outlets_filled | fields_filled | avg_ms")
    for job in job_rows:
        print(" | ".join(map(str, job)))
    print()
def main():
    """Command-line entry point: ``poll`` devices or print a ``report``.

    ``db_path`` and ``cost_center_map`` from the config are resolved relative
    to the directory of the config file, so the tool behaves the same no
    matter which working directory it is started from.

    In ``poll`` mode every enabled (and selected) device is attempted; a
    failing device no longer prevents the remaining ones from being polled.
    If at least one device failed, the process exits with a non-zero status
    and a summary on stderr instead of printing "OK".
    """
    ap = argparse.ArgumentParser(description="MMP (BayTech) ostatus Logger -> SQLite")
    ap.add_argument("--config", required=True, help="Pfad zur config.json")
    sub = ap.add_subparsers(dest="cmd", required=True)

    sp_poll = sub.add_parser("poll", help="Pollt alle enabled Devices und schreibt Messwerte")
    sp_poll.add_argument("--device", default=None, help="Optional: nur ein Device-Name")

    sp_rep = sub.add_parser("report", help="Auswertung (weekly/monthly/yearly oder von-bis / last-days)")
    sp_rep.add_argument("--device", default=None, help="Optional: nur ein Device-Name")
    sp_rep.add_argument("--period", choices=["weekly", "monthly", "yearly"], default="weekly")
    sp_rep.add_argument("--last-days", type=int, default=None, help="Letzte N Tage")
    sp_rep.add_argument("--from-iso", default=None, help="UTC ISO, z.B. 2026-02-01T00:00:00+00:00")
    sp_rep.add_argument("--to-iso", default=None, help="UTC ISO, z.B. 2026-02-08T00:00:00+00:00")
    args = ap.parse_args()

    cfg_path = args.config
    cfg = load_json(cfg_path)
    # Resolve both paths against the config file location and use exactly
    # these resolved values (they must not be overwritten by the raw ones).
    db_path = resolve_path(cfg_path, cfg["db_path"])
    cc_map = load_json(resolve_path(cfg_path, cfg["cost_center_map"]))

    tel = cfg.get("telnet", {})
    read_timeout = int(tel.get("read_timeout_sec", 25))
    connect_timeout = int(tel.get("connect_timeout_sec", 10))
    cmd = tel.get("command", "ostatus")

    devices = [
        DeviceCfg(
            name=d["name"],
            host=d["host"],
            port=int(d.get("port", 20108)),
            enabled=bool(d.get("enabled", True)),
            username=d.get("username"),
            password=d.get("password"),
        )
        for d in cfg.get("devices", [])
    ]

    con = db_connect(db_path)
    try:
        db_init(con)
        if args.cmd == "poll":
            failures = []
            for d in devices:
                if not d.enabled:
                    continue
                if args.device and d.name != args.device:
                    continue
                try:
                    poll_device(con, d, cc_map, cmd, connect_timeout, read_timeout)
                except Exception as exc:
                    # Top-level boundary: poll_device has already recorded the
                    # failure in poll_run; keep going with the other devices.
                    failures.append(f"{d.name}: {exc}")
            if failures:
                # SystemExit with a string prints it to stderr and exits 1.
                raise SystemExit("FAILED: " + "; ".join(failures))
            print("OK")
        elif args.cmd == "report":
            from_iso, to_iso = parse_period_args(args)
            report(con, from_iso, to_iso, args.device)
    finally:
        con.close()


if __name__ == "__main__":
    main()