#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
mmp_logger.py

- Polls BayTech MMP units (serial->TCP bridge, raw TCP) with the "ostatus" command
- Writes the measurements to SQLite
- Missing fields are replaced with the last measured value (filled + filled_fields are recorded)
- Report as Markdown (and optionally HTML/PDF) written to ./reports/
- Outlets appear in the report in physical order (port_index 1..N, as delivered by the MMP)
- Cost centers are aggregated across devices; only codes from cost_centers.json (without _default)
"""

import argparse
import datetime as dt
import html
import json
import os
import re
import socket
import sqlite3
import subprocess
import sys
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Optional, Tuple


# -------------------------
# Parsing / Prompt
# -------------------------

# Byte that marks the end of the device prompt; tcp_read_all stops reading
# once it appears in the received data (when stop_on_prompt is set).
PROMPT_END = b">"

# First signed integer or decimal number inside a table cell, e.g. "1.5 Amps".
NUM_RE = re.compile(r"([-+]?\d+(\.\d+)?)")

# Reading columns that apply_fill may back-fill from the previous reading.
FILL_KEYS = ["current_a", "peak_a", "voltage_v", "power_w", "va", "state"]


# -------------------------
# Data classes
# -------------------------

@dataclass
class DeviceCfg:
    """Connection settings for one device entry of the configuration file."""

    # Unique device name; used as the lookup key in the device table.
    name: str
    # Host name or IP address of the serial->TCP bridge.
    host: str
    # TCP port of the bridge.
    port: int
    # Devices with enabled=False are skipped by the poll command.
    enabled: bool = True


# -------------------------
# Utils / Time / Paths
# -------------------------

def utc_now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string with second resolution."""
    return dt.datetime.now(dt.timezone.utc).replace(microsecond=0).isoformat()


def load_json(path: str) -> dict:
    """Read the UTF-8 encoded JSON file at *path* and return the parsed content."""
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)


def ensure_dir(path: str) -> None:
    """Create the parent directory of the file path *path* if it is missing.

    A bare file name (no directory component) needs nothing created.
    """
    d = os.path.dirname(path)
    if d:
        os.makedirs(d, exist_ok=True)


def resolve_path(config_file: str, maybe_rel: str) -> str:
    """Resolve *maybe_rel* relative to the directory that holds *config_file*.

    Absolute paths are returned unchanged.
    """
    p = Path(maybe_rel)
    if p.is_absolute():
        return str(p)
    return str(Path(config_file).resolve().parent / p)


def project_root_from_config(config_file: str) -> str:
    """Return the resolved directory containing *config_file* (the project root)."""
    return str(Path(config_file).resolve().parent)


def safe_mkdir(root: str, sub: str) -> str:
    """Create ``root/sub`` (including parents) if needed and return it as a string."""
    p = Path(root) / sub
    p.mkdir(parents=True, exist_ok=True)
    return str(p)


# -------------------------
# SQLite
# -------------------------

def db_connect(db_path: str) -> sqlite3.Connection:
    """Open the SQLite database at *db_path*, creating its directory first.

    The connection is switched to WAL journaling with synchronous=NORMAL.
    """
    ensure_dir(db_path)
    con = sqlite3.connect(db_path)
    con.execute("PRAGMA journal_mode=WAL;")
    con.execute("PRAGMA synchronous=NORMAL;")
    return con


def db_init(con: sqlite3.Connection) -> None:
    """Create all tables and indexes if they do not exist yet, then commit.

    Note: existing databases are not migrated. If the schema changed, an old
    database file has to be removed (or migrated manually) first.
    """
    con.executescript("""
        CREATE TABLE IF NOT EXISTS device (
            id INTEGER PRIMARY KEY,
            name TEXT NOT NULL UNIQUE,
            host TEXT NOT NULL,
            port INTEGER NOT NULL,
            created_at TEXT NOT NULL,
            last_seen_at TEXT
        );

        CREATE TABLE IF NOT EXISTS outlet (
            id INTEGER PRIMARY KEY,
            device_id INTEGER NOT NULL,
            port_index INTEGER NOT NULL,
            outlet_name TEXT NOT NULL,
            cost_code TEXT NOT NULL,
            cost_name TEXT NOT NULL,
            created_at TEXT NOT NULL,
            last_seen_at TEXT,
            UNIQUE(device_id, port_index),
            FOREIGN KEY(device_id) REFERENCES device(id)
        );

        CREATE TABLE IF NOT EXISTS poll_run (
            id INTEGER PRIMARY KEY,
            device_id INTEGER NOT NULL,
            started_at TEXT NOT NULL,
            finished_at TEXT,
            ok INTEGER NOT NULL DEFAULT 0,
            error TEXT,
            outlets_received INTEGER NOT NULL DEFAULT 0,
            outlets_filled INTEGER NOT NULL DEFAULT 0,
            fields_filled INTEGER NOT NULL DEFAULT 0,
            duration_ms INTEGER NOT NULL DEFAULT 0,
            FOREIGN KEY(device_id) REFERENCES device(id)
        );

        CREATE TABLE IF NOT EXISTS reading (
            id INTEGER PRIMARY KEY,
            poll_run_id INTEGER NOT NULL,
            device_id INTEGER NOT NULL,
            outlet_id INTEGER NOT NULL,
            ts TEXT NOT NULL,
            current_a REAL,
            peak_a REAL,
            voltage_v REAL,
            power_w REAL,
            va REAL,
            state TEXT,
            filled INTEGER NOT NULL DEFAULT 0,
            filled_fields INTEGER NOT NULL DEFAULT 0,
            FOREIGN KEY(poll_run_id) REFERENCES poll_run(id),
            FOREIGN KEY(device_id) REFERENCES device(id),
            FOREIGN KEY(outlet_id) REFERENCES outlet(id)
        );

        CREATE INDEX IF NOT EXISTS idx_reading_ts ON reading(ts);
        CREATE INDEX IF NOT EXISTS idx_reading_outlet_ts ON reading(outlet_id, ts);
        CREATE INDEX IF NOT EXISTS idx_outlet_device_port ON outlet(device_id, port_index);
    """)
    con.commit()


def get_or_create_device(con: sqlite3.Connection, d: DeviceCfg) -> int:
    """Return the id of the device row named ``d.name``, creating it if needed.

    An existing row gets host, port and last_seen_at refreshed. A new row is
    inserted with last_seen_at left NULL. Both paths commit.
    """
    now = utc_now_iso()
    row = con.execute("SELECT id FROM device WHERE name=?", (d.name,)).fetchone()
    if row:
        device_id = int(row[0])
        con.execute(
            "UPDATE device SET host=?, port=?, last_seen_at=? WHERE id=?",
            (d.host, d.port, now, device_id),
        )
        con.commit()
        return device_id

    cur = con.execute(
        "INSERT INTO device(name,host,port,created_at,last_seen_at) VALUES(?,?,?,?,?)",
        (d.name, d.host, d.port, now, None),
    )
    con.commit()
    return int(cur.lastrowid)


def get_or_create_outlet(con: sqlite3.Connection, device_id: int, port_index: int, outlet_name: str,
                         cost_code: str, cost_name: str) -> int:
    """Return the id of the outlet at (*device_id*, *port_index*), creating it if needed.

    An existing row always gets its name, cost center and last_seen_at
    refreshed, because an outlet may be renamed on the device. Both paths
    commit.

    Raises:
        ValueError: if *port_index* is not a positive number.
    """
    now = utc_now_iso()
    if port_index <= 0:
        raise ValueError(f"port_index invalid: {port_index} for outlet_name={outlet_name!r}")

    row = con.execute(
        "SELECT id FROM outlet WHERE device_id=? AND port_index=?",
        (device_id, port_index),
    ).fetchone()
    if row:
        outlet_id = int(row[0])
        # The name may change -> always update.
        con.execute("""
            UPDATE outlet
            SET outlet_name=?, cost_code=?, cost_name=?, last_seen_at=?
            WHERE id=?
        """, (outlet_name, cost_code, cost_name, now, outlet_id))
        con.commit()
        return outlet_id

    cur = con.execute("""
        INSERT INTO outlet(device_id,port_index,outlet_name,cost_code,cost_name,created_at,last_seen_at)
        VALUES(?,?,?,?,?,?,?)
    """, (device_id, port_index, outlet_name, cost_code, cost_name, now, now))
    con.commit()
    return int(cur.lastrowid)


def last_reading_for_outlet(con: sqlite3.Connection, outlet_id: int) -> Optional[dict]:
    """Return the measurement columns of the newest reading of *outlet_id*.

    Returns None when the outlet has no reading yet. Otherwise the result is
    a dict with the keys current_a, peak_a, voltage_v, power_w, va and state.
    """
    columns = ("current_a", "peak_a", "voltage_v", "power_w", "va", "state")
    # ts only has second resolution, so the row id breaks ties between
    # readings stored with the same timestamp (the later insert wins).
    row = con.execute("""
        SELECT current_a, peak_a, voltage_v, power_w, va, state
        FROM reading
        WHERE outlet_id=?
        ORDER BY ts DESC, id DESC
        LIMIT 1
    """, (outlet_id,)).fetchone()
    if not row:
        return None
    return dict(zip(columns, row))


# -------------------------
# Cost center mapping
# -------------------------

def cost_center_for(outlet_name: str, cc_map: Dict[str, str]) -> Tuple[str, str]:
    """Derive the cost center of an outlet from the first character of its name.

    Returns ``(code, name)``. *code* is the upper-cased first character of the
    stripped outlet name, or "_" for an empty name. *name* is looked up in
    *cc_map*; a missing or empty entry falls back to the "_default" entry and
    finally to "Unbekannt".
    """
    outlet_name = outlet_name.strip()
    code = outlet_name[:1].upper() if outlet_name else "_"
    name = cc_map.get(code) or cc_map.get("_default", "Unbekannt")
    return code, name


# -------------------------
# Raw TCP (serial->TCP bridge)
# -------------------------

def tcp_read_all(sock: socket.socket, read_timeout: int, stop_on_prompt: bool = True) -> bytes:
    """Read from *sock* until a timeout, the peer closing, or the prompt.

    Args:
        sock: connected socket; its timeout is set to *read_timeout*.
        read_timeout: seconds to wait for each recv() before giving up.
        stop_on_prompt: stop as soon as PROMPT_END appears in the data
            received so far.

    Returns:
        Everything received up to the stop condition (possibly empty).
    """
    sock.settimeout(read_timeout)
    buf = bytearray()
    while True:
        try:
            chunk = sock.recv(4096)
        except socket.timeout:
            # No more data within the timeout: treat as end of output.
            break
        if not chunk:
            # recv() == b"" means the peer closed the session.
            break
        buf += chunk
        if stop_on_prompt and PROMPT_END in buf:
            break
    return bytes(buf)


def tcp_fetch_ostatus_raw(
    host: str,
    port: int,
    cmd: str,
    connect_timeout: int,
    prompt_timeout: int,
    read_timeout: int,
    enter_first: bool = True,
    prompt_pause_sec: float = 2.0,
    cmd_pause_sec: float = 0.2,
    stop_on_prompt: bool = True
) -> bytes:
    """Connect to the bridge, send *cmd* and return all raw bytes received.

    Bridge behaviour this function accounts for:
    - the session may already be closed -> recv() == b'' ends a read
    - there is no login
    - send ENTER, optionally pause, optionally send ENTER again

    Args:
        host, port: address of the serial->TCP bridge.
        cmd: command to send (terminated with CR LF by this function).
        connect_timeout: seconds allowed for establishing the connection.
        prompt_timeout: per-recv timeout while waiting for the prompt.
        read_timeout: per-recv timeout while reading the command output.
        enter_first: send an initial ENTER before waiting for the prompt.
        prompt_pause_sec: pause after each ENTER sent to wake the prompt.
        cmd_pause_sec: pause after sending the command.
        stop_on_prompt: stop each read once PROMPT_END has been received.

    Returns:
        The bytes read before the command concatenated with the command output.
    """
    with socket.create_connection((host, port), timeout=connect_timeout) as s:
        pre = b""

        if enter_first:
            s.sendall(b"\r\n")
            time.sleep(prompt_pause_sec)

        pre += tcp_read_all(s, read_timeout=prompt_timeout, stop_on_prompt=stop_on_prompt)

        # No prompt yet: nudge the device with one more ENTER and read again.
        if stop_on_prompt and (PROMPT_END not in pre):
            s.sendall(b"\r\n")
            time.sleep(prompt_pause_sec)
            pre += tcp_read_all(s, read_timeout=prompt_timeout, stop_on_prompt=stop_on_prompt)

        s.sendall(cmd.encode("utf-8") + b"\r\n")
        time.sleep(cmd_pause_sec)

        out = tcp_read_all(s, read_timeout=read_timeout, stop_on_prompt=stop_on_prompt)
        return pre + out


# -------------------------
# Parse ostatus output (physical order)
# -------------------------

def parse_ostatus(text: str) -> List[dict]:
    """Parse the table printed by the ostatus command into one dict per outlet.

    Only lines containing '|' are considered. Header and separator lines are
    skipped, as are lines with fewer than seven non-empty cells.

    Important: port_index is assigned as a running counter (starting at 1) in
    table order, so it reflects the order in which the device lists its
    outlets.

    Returns:
        Dicts with the keys port_index, outlet_name, current_a, peak_a,
        voltage_v, power_w, va (floats, or None when a cell holds no number)
        and state.
    """

    def parse_num(val: str) -> Optional[float]:
        # Accept a decimal comma and ignore trailing units ("1,5 Amps").
        m = NUM_RE.search(val.replace(",", "."))
        return float(m.group(1)) if m else None

    rows: List[dict] = []
    port_index = 0

    for ln in text.splitlines():
        if "|" not in ln:
            continue

        s = ln.strip()

        # Filter header/separator lines (important: the second header line!).
        if "Outlet" in ln and "True RMS" in ln:
            continue
        if "Name" in ln and "Current" in ln:
            continue
        if set(s) <= set("-| "):
            continue

        parts = [p.strip() for p in ln.split("|")]
        parts = [p for p in parts if p != ""]
        if len(parts) < 7:
            continue

        # Safety net in case header residue slips through anyway.
        if parts[0].strip().lower() in ("outlet", "name"):
            continue

        outlet_name = parts[0]
        cur_s, peak_s, volt_s, power_s, va_s, state = parts[1:7]

        port_index += 1
        rows.append({
            "port_index": port_index,
            "outlet_name": outlet_name,
            "current_a": parse_num(cur_s),
            "peak_a": parse_num(peak_s),
            "voltage_v": parse_num(volt_s),
            "power_w": parse_num(power_s),
            "va": parse_num(va_s),
            "state": state if state else None,
        })

    return rows


# -------------------------
# Fill logic
# -------------------------

def apply_fill(row: dict, last: Optional[dict]) -> Tuple[dict, int, int]:
    """Replace missing values in *row* with the values of the previous reading.

    For every key in FILL_KEYS that is None in *row* but set in *last*, the
    value from *last* is copied into *row* (the dict is modified in place).

    Returns:
        ``(row, filled_flag, filled_fields)``: the same row object, 1 or 0
        depending on whether anything was filled, and the number of fields
        filled.
    """
    if not last:
        return row, 0, 0

    filled_fields = 0
    for k in FILL_KEYS:
        if row.get(k) is None and last.get(k) is not None:
            row[k] = last[k]
            filled_fields += 1

    filled_flag = 1 if filled_fields > 0 else 0
    return row, filled_flag, filled_fields


# -------------------------
# Polling
# -------------------------

def _finish_poll_run(con: sqlite3.Connection, run_id: int, ok: int, error: Optional[str],
                     outlets_received: int, outlets_filled: int, fields_filled: int,
                     duration_ms: int) -> None:
    """Store the outcome and counters of a poll run and commit."""
    con.execute("""
        UPDATE poll_run
        SET finished_at=?, ok=?, error=?,
            outlets_received=?, outlets_filled=?, fields_filled=?, duration_ms=?
        WHERE id=?
    """, (utc_now_iso(), ok, error, outlets_received, outlets_filled, fields_filled,
          duration_ms, run_id))
    con.commit()


def poll_device(con: sqlite3.Connection, dev: DeviceCfg, cc_map: Dict[str, str],
                cmd: str, connect_timeout: int, prompt_timeout: int, read_timeout: int,
                enter_first: bool, prompt_pause_sec: float,
                stop_on_prompt: bool,
                debug_dump_raw: bool,
                project_root: str) -> None:
    """Poll one device and store one reading per outlet.

    A poll_run row is created up front with ok=0 and completed at the end with
    the counters and duration. On success a summary line is printed. On any
    exception the run is stored as failed with the error text and the
    exception is re-raised.

    Args:
        con: open database connection.
        dev: device to poll.
        cc_map: cost center map (code -> name).
        cmd, connect_timeout, prompt_timeout, read_timeout, enter_first,
            prompt_pause_sec, stop_on_prompt: passed to tcp_fetch_ostatus_raw.
        debug_dump_raw: write the decoded device output to
            ``<project_root>/logs/raw_<device name>.txt``.
        project_root: base directory for the logs directory.
    """
    device_id = get_or_create_device(con, dev)
    started = utc_now_iso()
    # Monotonic clock: the measured duration is immune to wall-clock changes.
    t0 = time.monotonic()

    run_id = con.execute("""
        INSERT INTO poll_run(device_id, started_at, ok)
        VALUES(?,?,0)
    """, (device_id, started)).lastrowid
    con.commit()

    outlets_received = 0
    outlets_filled = 0
    fields_filled = 0

    try:
        raw = tcp_fetch_ostatus_raw(
            host=dev.host,
            port=dev.port,
            cmd=cmd,
            connect_timeout=connect_timeout,
            prompt_timeout=prompt_timeout,
            read_timeout=read_timeout,
            enter_first=enter_first,
            prompt_pause_sec=prompt_pause_sec,
            stop_on_prompt=stop_on_prompt
        )

        text = raw.decode("utf-8", errors="replace")

        if debug_dump_raw:
            logs_dir = Path(project_root) / "logs"
            logs_dir.mkdir(parents=True, exist_ok=True)
            (logs_dir / f"raw_{dev.name}.txt").write_text(text, encoding="utf-8")

        rows = parse_ostatus(text)
        # All readings of one poll share the same timestamp.
        ts = utc_now_iso()
        outlets_received = len(rows)

        for r in rows:
            outlet_name = r["outlet_name"].strip()
            port_index = int(r.get("port_index") or 0)

            cost_code, cost_name = cost_center_for(outlet_name, cc_map)
            outlet_id = get_or_create_outlet(con, device_id, port_index, outlet_name, cost_code, cost_name)

            last = last_reading_for_outlet(con, outlet_id)
            r2, filled_flag, filled_fields_cnt = apply_fill(r, last)

            if filled_flag:
                outlets_filled += 1
                fields_filled += filled_fields_cnt

            con.execute("""
                INSERT INTO reading(
                    poll_run_id, device_id, outlet_id, ts,
                    current_a, peak_a, voltage_v, power_w, va, state,
                    filled, filled_fields
                )
                VALUES(?,?,?,?,?,?,?,?,?,?,?,?)
            """, (
                run_id, device_id, outlet_id, ts,
                r2.get("current_a"), r2.get("peak_a"), r2.get("voltage_v"),
                r2.get("power_w"), r2.get("va"), r2.get("state"),
                filled_flag, filled_fields_cnt
            ))

        duration_ms = int((time.monotonic() - t0) * 1000)
        _finish_poll_run(con, run_id, 1, None,
                         outlets_received, outlets_filled, fields_filled, duration_ms)

        print(f"{dev.name}: OK outlets={outlets_received} filled_outlets={outlets_filled} filled_fields={fields_filled}")

    except Exception as e:
        # Record the failure for the job statistics, then let the caller decide.
        duration_ms = int((time.monotonic() - t0) * 1000)
        _finish_poll_run(con, run_id, 0, str(e),
                         outlets_received, outlets_filled, fields_filled, duration_ms)
        raise


# -------------------------
# Report output (md/html/pdf)
# -------------------------

def write_report_files(project_root: str, name: str, md_text: str, write_md: bool, write_html: bool, write_pdf: bool):
    """Write the report to ``<project_root>/reports/<name>.{md,html,pdf}``.

    Args:
        project_root: base directory; the reports directory is created in it.
        name: file name without extension.
        md_text: report content as Markdown.
        write_md: write the Markdown file.
        write_html: write an HTML page. The optional ``markdown`` package is
            used when it works; otherwise the escaped Markdown text is
            embedded in a ``<pre>`` element.
        write_pdf: convert the Markdown file to PDF with the external
            ``pandoc`` program (the Markdown file is written if needed).
            A missing or failing pandoc only produces a printed hint.
    """
    reports_dir = Path(project_root) / "reports"
    reports_dir.mkdir(parents=True, exist_ok=True)

    md_path = reports_dir / f"{name}.md"
    html_path = reports_dir / f"{name}.html"
    pdf_path = reports_dir / f"{name}.pdf"

    if write_md:
        md_path.write_text(md_text, encoding="utf-8")

    if write_html:
        try:
            import markdown as md
            body_html = md.markdown(md_text, extensions=["tables"])
        except Exception:
            # Deliberate best effort: without a working markdown package the
            # raw text is shown instead. It must be escaped, otherwise '<',
            # '>' and '&' in the report would be interpreted as HTML markup.
            body_html = "<pre>" + html.escape(md_text, quote=False) + "</pre>"

        html_full = (
            "<!doctype html><meta charset='utf-8'>"
            "<style>"
            "body{font-family:system-ui,Segoe UI,Roboto,Arial,sans-serif;max-width:1100px;margin:24px auto;padding:0 16px;}"
            "table{border-collapse:collapse;width:100%;} td,th{border:1px solid #ccc;padding:6px 10px;text-align:left;}"
            "code,pre{background:#f6f8fa;padding:2px 4px;border-radius:4px;}"
            "h1,h2{margin-top:1.2em;}"
            "</style>"
            f"<body>{body_html}</body>"
        )
        html_path.write_text(html_full, encoding="utf-8")

    if write_pdf:
        if not write_md:
            # pandoc needs the Markdown file as its input.
            md_path.write_text(md_text, encoding="utf-8")
        # Argument list instead of a shell string: the file name contains
        # configuration/CLI input and must never be interpreted by a shell.
        try:
            result = subprocess.run(["pandoc", str(md_path), "-o", str(pdf_path)], check=False)
        except OSError as exc:
            print(f"Hinweis: pandoc konnte nicht gestartet werden ({exc}); kein PDF erzeugt.")
        else:
            if result.returncode != 0:
                print(f"Hinweis: pandoc beendete sich mit Code {result.returncode}; PDF evtl. unvollständig.")


# -------------------------
# Reporting logic
# -------------------------

def parse_period_args(args) -> Tuple[str, str, str]:
    """Turn the report CLI arguments into ``(from_iso, to_iso, name_suffix)``.

    Precedence:
    1. ``args.last_days``: the last N days up to now, suffix ``last<N>d``.
    2. ``args.from_iso`` and ``args.to_iso`` (both set): used verbatim,
       suffix ``custom``. If only one of them is set, both are ignored.
    3. ``args.period``: weekly = 7, monthly = 30, yearly = 365 days up to now;
       any other value is treated as weekly.
    """
    now = dt.datetime.now(dt.timezone.utc).replace(microsecond=0)

    if args.last_days is not None:
        start = now - dt.timedelta(days=args.last_days)
        return start.isoformat(), now.isoformat(), f"last{args.last_days}d"

    if args.from_iso and args.to_iso:
        return args.from_iso, args.to_iso, "custom"

    period_days = {"weekly": 7, "monthly": 30, "yearly": 365}
    suffix = args.period if args.period in period_days else "weekly"
    start = now - dt.timedelta(days=period_days[suffix])
    return start.isoformat(), now.isoformat(), suffix


def report(con: sqlite3.Connection, from_iso: str, to_iso: str, device_name: Optional[str], cc_map: Dict[str, str]) -> str:
    """Build the Markdown report for readings with from_iso <= ts <= to_iso.

    Layout:
    - Outlets overall: ordered by port_index ascending; no device and no
      cost_code column.
    - Cost centers: aggregated across devices; only codes that are keys of
      *cc_map* (without "_default").
    - Job statistics per device, based on the poll_run table.

    Args:
        con: open database connection.
        from_iso, to_iso: inclusive bounds; compared as strings with the
            stored timestamps.
        device_name: restrict all sections to this device when set.
        cc_map: cost center map (code -> name).

    Returns:
        The report as one Markdown string.

    NOTE(review): the "approx_Wh" figures are the plain sum of the sampled
    power values, which presumably assumes one sample per hour; confirm
    against the polling interval.
    """
    params = [from_iso, to_iso]
    # dev_filter is one of two fixed strings; all values go through parameters.
    dev_filter = ""
    if device_name:
        dev_filter = "AND d.name = ?"
        params.append(device_name)

    # ---- Outlets ----
    q_outlets = f"""
        SELECT
            o.port_index,
            o.outlet_name,
            o.cost_name,
            COUNT(*) AS samples,
            SUM(CASE WHEN r.filled=1 THEN 1 ELSE 0 END) AS filled_samples,
            ROUND(AVG(COALESCE(r.power_w,0.0)), 2) AS avg_power_w,
            ROUND(SUM(COALESCE(r.power_w,0.0))*1.0, 2) AS approx_energy_wh
        FROM reading r
        JOIN outlet o ON o.id = r.outlet_id
        JOIN device d ON d.id = r.device_id
        WHERE r.ts >= ? AND r.ts <= ?
        {dev_filter}
        GROUP BY o.id, o.port_index, o.outlet_name, o.cost_name
        ORDER BY o.port_index ASC
    """
    outlet_rows = con.execute(q_outlets, params).fetchall()

    q_totals = f"""
        SELECT
            COUNT(DISTINCT o.id) AS outlets_count,
            COUNT(*) AS samples_count,
            SUM(CASE WHEN r.filled=1 THEN 1 ELSE 0 END) AS filled_samples_count,
            ROUND(SUM(COALESCE(r.power_w,0.0))*1.0, 2) AS approx_energy_wh_total
        FROM reading r
        JOIN outlet o ON o.id = r.outlet_id
        JOIN device d ON d.id = r.device_id
        WHERE r.ts >= ? AND r.ts <= ?
        {dev_filter}
    """
    outlets_count, samples_count, filled_samples_count, approx_wh_total = \
        con.execute(q_totals, params).fetchone()
    # SUM() over zero rows is NULL; show 0 instead of "None" for an empty period.
    filled_samples_count = filled_samples_count or 0
    approx_wh_total = approx_wh_total or 0.0

    # ---- Cost centers (across devices) ----
    valid_codes = sorted({c.upper() for c in cc_map if c != "_default"})

    cost_rows = []
    if valid_codes:
        placeholders = ",".join(["?"] * len(valid_codes))
        q_cost = f"""
            SELECT
                o.cost_name,
                COUNT(DISTINCT o.id) AS outlets_in_costcenter,
                COUNT(*) AS samples,
                SUM(CASE WHEN r.filled=1 THEN 1 ELSE 0 END) AS filled_samples,
                ROUND(AVG(COALESCE(r.power_w,0.0)), 2) AS avg_power_w,
                ROUND(SUM(COALESCE(r.power_w,0.0))*1.0, 2) AS approx_energy_wh
            FROM reading r
            JOIN outlet o ON o.id = r.outlet_id
            JOIN device d ON d.id = r.device_id
            WHERE r.ts >= ? AND r.ts <= ?
            {dev_filter}
            AND o.cost_code IN ({placeholders})
            GROUP BY o.cost_name
            ORDER BY approx_energy_wh DESC, o.cost_name
        """
        cost_rows = con.execute(q_cost, params + valid_codes).fetchall()

    # ---- Job statistics ----
    q_job = f"""
        SELECT
            d.name,
            COUNT(*) AS runs,
            SUM(CASE WHEN pr.ok=1 THEN 1 ELSE 0 END) AS ok_runs,
            SUM(CASE WHEN pr.ok=0 THEN 1 ELSE 0 END) AS failed_runs,
            SUM(pr.outlets_received) AS outlets_received_total,
            SUM(pr.outlets_filled) AS outlets_filled_total,
            SUM(pr.fields_filled) AS fields_filled_total,
            ROUND(AVG(pr.duration_ms), 1) AS avg_duration_ms
        FROM poll_run pr
        JOIN device d ON d.id = pr.device_id
        WHERE pr.started_at >= ? AND pr.started_at <= ?
        {dev_filter}
        GROUP BY d.name
        ORDER BY d.name
    """
    job_rows = con.execute(q_job, params).fetchall()

    # ---- Markdown ----
    md: List[str] = []
    md.append("# MMP Report")
    md.append("")
    md.append(f"- Zeitraum (UTC): **{from_iso}** .. **{to_iso}**")
    if device_name:
        md.append(f"- Device-Filter: **{device_name}**")
    md.append("")

    md.append("## Outlets gesamt")
    md.append("")
    md.append(f"- Ports (distinct): **{outlets_count}**")
    md.append(f"- Samples: **{samples_count}** (filled: **{filled_samples_count}**)")
    md.append(f"- Kumuliert (approx): **{approx_wh_total} Wh**")
    md.append("")
    md.append("| port | outlet | cost_name | samples | filled | avg_W | approx_Wh |")
    md.append("|---:|---|---|---:|---:|---:|---:|")
    for port_index, outlet_name, cost_name, samples, filled, avg_w, wh in outlet_rows:
        md.append(f"| {port_index} | {outlet_name} | {cost_name} | {samples} | {filled} | {avg_w} | {wh} |")
    md.append("")

    md.append("## Kostenstellen")
    md.append("")
    if not cost_rows:
        md.append("_Keine Kostenstellen aus cost_centers.json im Zeitraum gefunden._")
        md.append("")
    else:
        md.append("| name | outlets | samples | filled | avg_W | approx_Wh |")
        md.append("|---|---:|---:|---:|---:|---:|")
        for name, outlets_cc, samples, filled, avg_w, wh in cost_rows:
            md.append(f"| {name} | {outlets_cc} | {samples} | {filled} | {avg_w} | {wh} |")
        md.append("")

    md.append("## Job-Statistik")
    md.append("")
    md.append("| device | runs | ok | failed | outlets_total | outlets_filled | fields_filled | avg_ms |")
    md.append("|---|---:|---:|---:|---:|---:|---:|---:|")
    for device, runs, ok_runs, failed_runs, outlets_total, outlets_filled, fields_filled, avg_ms in job_rows:
        md.append(f"| {device} | {runs} | {ok_runs} | {failed_runs} | {outlets_total} | {outlets_filled} | {fields_filled} | {avg_ms} |")
    md.append("")

    return "\n".join(md)


# -------------------------
# CLI
# -------------------------

def main():
    """Command line entry point: ``poll`` devices or write a ``report``.

    Reads the configuration given with --config, prepares the data/logs/reports
    directories next to it and opens the database.

    poll: every enabled device (optionally only --device) is polled. A failing
    device does not stop the remaining ones; its error is printed to stderr
    and the process exits with status 1 once all devices were attempted.

    report: prints the Markdown report and writes the configured output files.
    """
    ap = argparse.ArgumentParser(description="MMP ostatus Logger (raw TCP) -> SQLite + Reports (md/html/pdf)")
    ap.add_argument("--config", required=True, help="Pfad zur config.json")
    sub = ap.add_subparsers(dest="cmd", required=True)

    sp_poll = sub.add_parser("poll", help="Pollt alle enabled Devices und schreibt Messwerte")
    sp_poll.add_argument("--device", default=None, help="Optional: nur ein Device-Name")

    sp_rep = sub.add_parser("report", help="Auswertung (weekly/monthly/yearly oder from/to / last-days)")
    sp_rep.add_argument("--device", default=None, help="Optional: nur ein Device-Name")
    sp_rep.add_argument("--period", choices=["weekly", "monthly", "yearly"], default="weekly")
    sp_rep.add_argument("--last-days", type=int, default=None, help="Letzte N Tage")
    sp_rep.add_argument("--from-iso", default=None, help="UTC ISO, z.B. 2026-02-01T00:00:00+00:00")
    sp_rep.add_argument("--to-iso", default=None, help="UTC ISO, z.B. 2026-02-08T00:00:00+00:00")

    args = ap.parse_args()

    cfg = load_json(args.config)
    proj_root = project_root_from_config(args.config)

    for sub_dir in ("data", "logs", "reports"):
        safe_mkdir(proj_root, sub_dir)

    db_path = resolve_path(args.config, cfg["db_path"])
    cc_path = resolve_path(args.config, cfg["cost_center_map"])
    cc_map = load_json(cc_path)

    # TCP / timeouts / behaviour
    tcp_cfg = cfg.get("tcp", cfg.get("telnet", {}))  # accepts "tcp" or (legacy) "telnet"
    read_timeout = int(tcp_cfg.get("read_timeout_sec", 35))
    connect_timeout = int(tcp_cfg.get("connect_timeout_sec", 10))
    prompt_timeout = int(tcp_cfg.get("prompt_timeout_sec", 8))
    cmd = tcp_cfg.get("command", "ostatus")

    enter_first = bool(tcp_cfg.get("enter_first", True))
    prompt_pause_sec = float(tcp_cfg.get("prompt_pause_sec", 2.0))
    stop_on_prompt = bool(tcp_cfg.get("stop_on_prompt", True))
    debug_dump_raw = bool(tcp_cfg.get("debug_dump_raw", False))

    devices: List[DeviceCfg] = [
        DeviceCfg(
            name=d["name"],
            host=d["host"],
            port=int(d.get("port", 20108)),
            enabled=bool(d.get("enabled", True)),
        )
        for d in cfg.get("devices", [])
    ]

    # Report output
    rep_cfg = cfg.get("report", {})
    write_md = bool(rep_cfg.get("write_markdown", True))
    write_html = bool(rep_cfg.get("write_html", True))
    write_pdf = bool(rep_cfg.get("write_pdf", False))
    name_prefix = rep_cfg.get("report_name_prefix", "report")

    con = db_connect(db_path)
    try:
        db_init(con)

        if args.cmd == "poll":
            any_ran = False
            failed: List[str] = []
            for d in devices:
                if not d.enabled:
                    continue
                if args.device and d.name != args.device:
                    continue
                any_ran = True
                try:
                    poll_device(
                        con, d, cc_map,
                        cmd, connect_timeout, prompt_timeout, read_timeout,
                        enter_first=enter_first,
                        prompt_pause_sec=prompt_pause_sec,
                        stop_on_prompt=stop_on_prompt,
                        debug_dump_raw=debug_dump_raw,
                        project_root=proj_root
                    )
                except Exception as exc:
                    # Top-level boundary: poll_device already stored the error
                    # in poll_run; keep going so one unreachable device does
                    # not prevent the others from being polled.
                    failed.append(d.name)
                    print(f"{d.name}: FAILED {exc}", file=sys.stderr)
            if not any_ran:
                print("Hinweis: Kein enabled device gematcht (check config.json / --device).")
            if failed:
                raise SystemExit(1)

        elif args.cmd == "report":
            from_iso, to_iso, suffix = parse_period_args(args)
            md_text = report(con, from_iso, to_iso, args.device, cc_map)

            print(md_text)

            name = f"{name_prefix}_{suffix}"
            if args.device:
                name = f"{name}_{args.device}"

            write_report_files(
                project_root=proj_root,
                name=name,
                md_text=md_text,
                write_md=write_md,
                write_html=write_html,
                write_pdf=write_pdf
            )
    finally:
        # Always release the database, also on errors and SystemExit.
        con.close()


# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()