This commit is contained in:
2026-02-08 19:31:09 +01:00
parent 0b03fed798
commit e727f37c27
3 changed files with 373 additions and 168 deletions

View File

@ -14,4 +14,53 @@ mmp_logger/
├─ report_weekly.md ├─ report_weekly.md
├─ report_weekly.html ├─ report_weekly.html
└─ report_weekly.pdf (optional) └─ report_weekly.pdf (optional)
Im Projektordner auf dem MAC
```python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
python3 mmp_logger.py --config config.json poll
python3 mmp_logger.py --config config.json report --period weekly
auf dem pi
sudo mkdir -p /opt/mmp_logger
sudo chown -R pi:pi /opt/mmp_logger
übertragen auf Pi
```
rsync -av --delete ./mmp_logger/ pi@raspberrypi:/opt/mmp_logger/
```
Pi: venv + deps:
```
cd /opt/mmp_logger
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
mkdir -p data logs reports
```
Cron (stündlich) im Projekt, Logs im Projekt
crontab -e (User pi):
```
SHELL=/bin/bash
PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
5 * * * * cd /opt/mmp_logger && /opt/mmp_logger/.venv/bin/python /opt/mmp_logger/mmp_logger.py --config /opt/mmp_logger/config.json poll >> /opt/mmp_logger/logs/mmp_logger.log 2>&1
20 0 * * * cd /opt/mmp_logger && /opt/mmp_logger/.venv/bin/python /opt/mmp_logger/mmp_logger.py --config /opt/mmp_logger/config.json report --period weekly >> /opt/mmp_logger/logs/mmp_report.log 2>&1
```
Empfehlung zur Ausgabe
Markdown: gut für Git/Archiv/Lesbarkeit, diffbar
HTML: gut fürs schnelle Öffnen im Browser, ggf. später ins Intranet
PDF: optional per pandoc (am Pi stabil). Install:
```
sudo apt-get update
sudo apt-get install -y pandoc
``` ```

View File

@ -4,10 +4,31 @@
"telnet": { "telnet": {
"read_timeout_sec": 25, "read_timeout_sec": 25,
"connect_timeout_sec": 10, "connect_timeout_sec": 10,
"command": "ostatus" "command": "ostatus",
"prompt_timeout_sec": 25
},
"report": {
"write_markdown": true,
"write_html": true,
"write_pdf": false,
"report_name_prefix": "report"
}, },
"devices": [ "devices": [
{ "name": "mmp17-1", "host": "192.168.252.155", "port": 20108, "enabled": true, "username": null, "password": null }, {
{ "name": "mmp17-2", "host": "192.168.252.156", "port": 20108, "enabled": false, "username": null, "password": null } "name": "mmp17-1",
"host": "192.168.252.155",
"port": 20108,
"enabled": true,
"username": null,
"password": null
},
{
"name": "mmp17-2",
"host": "192.168.252.156",
"port": 20108,
"enabled": false,
"username": null,
"password": null
}
] ]
} }

View File

@ -10,10 +10,11 @@ import sqlite3
import telnetlib import telnetlib
import time import time
from dataclasses import dataclass from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple
from pathlib import Path from pathlib import Path
from typing import Dict, List, Optional, Tuple
PROMPT_RE = re.compile(rb".*> ?$") # Prompt endet mit '>' # Prompt endet mit '>' (laut deiner Beschreibung)
PROMPT_END = b">"
NUM_RE = re.compile(r"([-+]?\d+(\.\d+)?)") NUM_RE = re.compile(r"([-+]?\d+(\.\d+)?)")
@dataclass @dataclass
@ -25,11 +26,10 @@ class DeviceCfg:
username: Optional[str] = None username: Optional[str] = None
password: Optional[str] = None password: Optional[str] = None
def resolve_path(base_file: str, maybe_rel: str) -> str:
p = Path(maybe_rel) # -------------------------
if p.is_absolute(): # Utils / Time / Paths
return str(p) # -------------------------
return str(Path(base_file).resolve().parent / p)
def utc_now_iso() -> str: def utc_now_iso() -> str:
return dt.datetime.now(dt.timezone.utc).replace(microsecond=0).isoformat() return dt.datetime.now(dt.timezone.utc).replace(microsecond=0).isoformat()
@ -43,6 +43,25 @@ def ensure_dir(path: str) -> None:
if d: if d:
os.makedirs(d, exist_ok=True) os.makedirs(d, exist_ok=True)
def resolve_path(config_file: str, maybe_rel: str) -> str:
p = Path(maybe_rel)
if p.is_absolute():
return str(p)
return str(Path(config_file).resolve().parent / p)
def project_root_from_config(config_file: str) -> str:
return str(Path(config_file).resolve().parent)
def safe_mkdir(root: str, sub: str) -> str:
p = Path(root) / sub
p.mkdir(parents=True, exist_ok=True)
return str(p)
# -------------------------
# SQLite
# -------------------------
def db_connect(db_path: str) -> sqlite3.Connection: def db_connect(db_path: str) -> sqlite3.Connection:
ensure_dir(db_path) ensure_dir(db_path)
con = sqlite3.connect(db_path) con = sqlite3.connect(db_path)
@ -99,8 +118,8 @@ def db_init(con: sqlite3.Connection) -> None:
power_w REAL, power_w REAL,
va REAL, va REAL,
state TEXT, state TEXT,
filled INTEGER NOT NULL DEFAULT 0, -- 1 wenn (teilweise/komplett) aus letztem Messwert gefüllt filled INTEGER NOT NULL DEFAULT 0,
filled_fields INTEGER NOT NULL DEFAULT 0, -- Anzahl gefüllter Felder filled_fields INTEGER NOT NULL DEFAULT 0,
FOREIGN KEY(poll_run_id) REFERENCES poll_run(id), FOREIGN KEY(poll_run_id) REFERENCES poll_run(id),
FOREIGN KEY(device_id) REFERENCES device(id), FOREIGN KEY(device_id) REFERENCES device(id),
FOREIGN KEY(outlet_id) REFERENCES outlet(id) FOREIGN KEY(outlet_id) REFERENCES outlet(id)
@ -120,7 +139,8 @@ def get_or_create_device(con: sqlite3.Connection, d: DeviceCfg) -> int:
con.execute("UPDATE device SET host=?, port=?, last_seen_at=? WHERE id=?", con.execute("UPDATE device SET host=?, port=?, last_seen_at=? WHERE id=?",
(d.host, d.port, now, device_id)) (d.host, d.port, now, device_id))
con.commit() con.commit()
return device_id return int(device_id)
cur = con.execute( cur = con.execute(
"INSERT INTO device(name,host,port,created_at,last_seen_at) VALUES(?,?,?,?,?)", "INSERT INTO device(name,host,port,created_at,last_seen_at) VALUES(?,?,?,?,?)",
(d.name, d.host, d.port, now, None) (d.name, d.host, d.port, now, None)
@ -128,86 +148,28 @@ def get_or_create_device(con: sqlite3.Connection, d: DeviceCfg) -> int:
con.commit() con.commit()
return int(cur.lastrowid) return int(cur.lastrowid)
def cost_center_for(outlet_name: str, cc_map: Dict[str, str]) -> Tuple[str, str]: def get_or_create_outlet(con: sqlite3.Connection, device_id: int, outlet_name: str,
outlet_name = outlet_name.strip() cost_code: str, cost_name: str) -> int:
code = outlet_name[:1].upper() if outlet_name else "_" now = utc_now_iso()
name = cc_map.get(code) or cc_map.get("_default", "Unbekannt") cur = con.execute("SELECT id FROM outlet WHERE device_id=? AND outlet_name=?",
return code, name (device_id, outlet_name))
row = cur.fetchone()
if row:
outlet_id = int(row[0])
con.execute("""
UPDATE outlet
SET cost_code=?, cost_name=?, last_seen_at=?
WHERE id=?
""", (cost_code, cost_name, now, outlet_id))
con.commit()
return outlet_id
def telnet_fetch_ostatus(dev: DeviceCfg, cmd: str, connect_timeout: int, read_timeout: int) -> bytes: cur = con.execute("""
""" INSERT INTO outlet(device_id,outlet_name,cost_code,cost_name,created_at,last_seen_at)
Öffnet Telnet, wartet auf Prompt (endet mit '>'), sendet cmd, liest bis Prompt. VALUES(?,?,?,?,?,?)
Unterstützt optional simple Login-Prompts (best effort). """, (device_id, outlet_name, cost_code, cost_name, now, now))
""" con.commit()
tn = telnetlib.Telnet() return int(cur.lastrowid)
tn.open(dev.host, dev.port, timeout=connect_timeout)
# Erstes Prompt / Banner
buf = tn.read_until(b">", timeout=read_timeout)
# Best-effort Login, falls vorhanden
# (Viele Geräte zeigen "login:" / "Password:" - nicht garantiert)
if dev.username:
if b"login" in buf.lower() or b"user" in buf.lower():
tn.write(dev.username.encode("utf-8") + b"\n")
buf += tn.read_until(b":", timeout=read_timeout)
if dev.password and (b"password" in buf.lower()):
tn.write(dev.password.encode("utf-8") + b"\n")
tn.read_until(b">", timeout=read_timeout)
# Kommando senden
tn.write(cmd.encode("utf-8") + b"\n")
# Lesen bis nächstes Prompt
out = tn.read_until(b">", timeout=read_timeout)
tn.close()
return out
def parse_ostatus(text: str) -> List[dict]:
"""
Parst ostatus Tabellenzeilen:
| <Outlet Name> | 0.0 A | 0.1 A | 230.2 V | 21 W | 26 VA | On |
Rückgabe: Liste dicts
"""
lines = text.splitlines()
rows = []
for ln in lines:
if "|" not in ln:
continue
# Tabellenheader / Trenner wegfiltern
if "Outlet" in ln and "True RMS" in ln:
continue
if set(ln.strip()) <= set("-| "):
continue
parts = [p.strip() for p in ln.split("|")]
# typisches Format hat leere Ränder: ['', name, cur, peak, volt, power, va, state, '']
parts = [p for p in parts if p != ""]
if len(parts) < 7:
continue
outlet_name = parts[0]
cur_s = parts[1]
peak_s = parts[2]
volt_s = parts[3]
power_s = parts[4]
va_s = parts[5]
state = parts[6]
def parse_num(s: str) -> Optional[float]:
m = NUM_RE.search(s.replace(",", "."))
return float(m.group(1)) if m else None
rows.append({
"outlet_name": outlet_name,
"current_a": parse_num(cur_s),
"peak_a": parse_num(peak_s),
"voltage_v": parse_num(volt_s),
"power_w": parse_num(power_s),
"va": parse_num(va_s),
"state": state
})
return rows
def last_reading_for_outlet(con: sqlite3.Connection, outlet_id: int) -> Optional[dict]: def last_reading_for_outlet(con: sqlite3.Connection, outlet_id: int) -> Optional[dict]:
cur = con.execute(""" cur = con.execute("""
@ -229,50 +191,121 @@ def last_reading_for_outlet(con: sqlite3.Connection, outlet_id: int) -> Optional
"state": row[5] "state": row[5]
} }
def get_or_create_outlet(con: sqlite3.Connection, device_id: int, outlet_name: str,
cost_code: str, cost_name: str) -> int:
now = utc_now_iso()
cur = con.execute("""
SELECT id FROM outlet WHERE device_id=? AND outlet_name=?
""", (device_id, outlet_name))
row = cur.fetchone()
if row:
outlet_id = row[0]
con.execute("""
UPDATE outlet
SET cost_code=?, cost_name=?, last_seen_at=?
WHERE id=?
""", (cost_code, cost_name, now, outlet_id))
con.commit()
return int(outlet_id)
cur = con.execute(""" # -------------------------
INSERT INTO outlet(device_id,outlet_name,cost_code,cost_name,created_at,last_seen_at) # Cost center mapping
VALUES(?,?,?,?,?,?) # -------------------------
""", (device_id, outlet_name, cost_code, cost_name, now, now))
con.commit() def cost_center_for(outlet_name: str, cc_map: Dict[str, str]) -> Tuple[str, str]:
return int(cur.lastrowid) outlet_name = outlet_name.strip()
code = outlet_name[:1].upper() if outlet_name else "_"
name = cc_map.get(code) or cc_map.get("_default", "Unbekannt")
return code, name
# -------------------------
# Telnet / Fetch / Parse
# -------------------------
def telnet_read_until_prompt(tn: telnetlib.Telnet, timeout: int) -> bytes:
# robust: lesen bis '>' auftaucht, nicht zwingend am Zeilenende
return tn.read_until(PROMPT_END, timeout=timeout)
def telnet_fetch_ostatus(dev: DeviceCfg, cmd: str, connect_timeout: int, prompt_timeout: int, read_timeout: int) -> bytes:
"""
Öffnet Telnet, wartet auf Prompt ('>'), sendet cmd, liest Antwort (bis nächstes Prompt).
Optionales Login best-effort, falls Username/Password gesetzt sind.
"""
tn = telnetlib.Telnet()
tn.open(dev.host, dev.port, timeout=connect_timeout)
buf = telnet_read_until_prompt(tn, timeout=prompt_timeout)
# Best-effort login flow
if dev.username:
low = buf.lower()
if b"login" in low or b"user" in low:
tn.write(dev.username.encode("utf-8") + b"\n")
buf = telnet_read_until_prompt(tn, timeout=prompt_timeout)
if dev.password:
low = buf.lower()
if b"pass" in low:
tn.write(dev.password.encode("utf-8") + b"\n")
buf = telnet_read_until_prompt(tn, timeout=prompt_timeout)
tn.write(cmd.encode("utf-8") + b"\n")
out = tn.read_until(PROMPT_END, timeout=read_timeout)
tn.close()
return out
def parse_ostatus(text: str) -> List[dict]:
"""
Parst Tabellenzeilen mit '|' (dein Format).
Beispielzeile:
| W Power1 | 0.0 A | 0.0 A | 230.3 V | 0 W | 4 VA | On |
"""
rows = []
for ln in text.splitlines():
if "|" not in ln:
continue
# Header / Trenner filtern
if "Outlet" in ln and "True RMS" in ln:
continue
if set(ln.strip()) <= set("-| "):
continue
parts = [p.strip() for p in ln.split("|")]
parts = [p for p in parts if p != ""]
if len(parts) < 7:
continue
outlet_name = parts[0]
cur_s, peak_s, volt_s, power_s, va_s, state = parts[1:7]
def parse_num(s: str) -> Optional[float]:
m = NUM_RE.search(s.replace(",", "."))
return float(m.group(1)) if m else None
rows.append({
"outlet_name": outlet_name,
"current_a": parse_num(cur_s),
"peak_a": parse_num(peak_s),
"voltage_v": parse_num(volt_s),
"power_w": parse_num(power_s),
"va": parse_num(va_s),
"state": state if state else None
})
return rows
# -------------------------
# Fill logic
# -------------------------
FILL_KEYS = ["current_a", "peak_a", "voltage_v", "power_w", "va", "state"]
def apply_fill(row: dict, last: Optional[dict]) -> Tuple[dict, int, int]: def apply_fill(row: dict, last: Optional[dict]) -> Tuple[dict, int, int]:
"""
Füllt fehlende Werte (None) aus last.
Returns: (new_row, filled_flag, filled_fields_count)
"""
if not last: if not last:
return row, 0, 0 return row, 0, 0
filled_fields = 0 filled_fields = 0
for k in ["current_a", "peak_a", "voltage_v", "power_w", "va", "state"]: for k in FILL_KEYS:
if row.get(k) is None and last.get(k) is not None: if row.get(k) is None and last.get(k) is not None:
row[k] = last[k] row[k] = last[k]
filled_fields += 1 filled_fields += 1
filled_flag = 1 if filled_fields > 0 else 0 filled_flag = 1 if filled_fields > 0 else 0
return row, filled_flag, filled_fields return row, filled_flag, filled_fields
# -------------------------
# Polling
# -------------------------
def poll_device(con: sqlite3.Connection, dev: DeviceCfg, cc_map: Dict[str, str], def poll_device(con: sqlite3.Connection, dev: DeviceCfg, cc_map: Dict[str, str],
cmd: str, connect_timeout: int, read_timeout: int) -> None: cmd: str, connect_timeout: int, prompt_timeout: int, read_timeout: int) -> None:
device_id = get_or_create_device(con, dev) device_id = get_or_create_device(con, dev)
started = utc_now_iso() started = utc_now_iso()
t0 = time.time() t0 = time.time()
run_id = con.execute(""" run_id = con.execute("""
INSERT INTO poll_run(device_id, started_at, ok) INSERT INTO poll_run(device_id, started_at, ok)
VALUES(?,?,0) VALUES(?,?,0)
@ -284,11 +317,10 @@ def poll_device(con: sqlite3.Connection, dev: DeviceCfg, cc_map: Dict[str, str],
fields_filled = 0 fields_filled = 0
try: try:
raw = telnet_fetch_ostatus(dev, cmd, connect_timeout, read_timeout) raw = telnet_fetch_ostatus(dev, cmd, connect_timeout, prompt_timeout, read_timeout)
text = raw.decode("utf-8", errors="replace") text = raw.decode("utf-8", errors="replace")
rows = parse_ostatus(text) rows = parse_ostatus(text)
ts = utc_now_iso() ts = utc_now_iso()
outlets_received = len(rows) outlets_received = len(rows)
for r in rows: for r in rows:
@ -330,46 +362,105 @@ def poll_device(con: sqlite3.Connection, dev: DeviceCfg, cc_map: Dict[str, str],
duration_ms = int((time.time() - t0) * 1000) duration_ms = int((time.time() - t0) * 1000)
con.execute(""" con.execute("""
UPDATE poll_run UPDATE poll_run
SET finished_at=?, ok=0, error=?, outlets_received=?, outlets_filled=?, fields_filled=?, duration_ms=? SET finished_at=?, ok=0, error=?,
outlets_received=?, outlets_filled=?, fields_filled=?, duration_ms=?
WHERE id=? WHERE id=?
""", (utc_now_iso(), str(e), outlets_received, outlets_filled, fields_filled, duration_ms, run_id)) """, (utc_now_iso(), str(e), outlets_received, outlets_filled, fields_filled, duration_ms, run_id))
con.commit() con.commit()
raise raise
def parse_period_args(args) -> Tuple[str, str]:
# -------------------------
# Report output (md/html/pdf)
# -------------------------
def write_report_files(project_root: str, name: str, md_text: str, write_md: bool, write_html: bool, write_pdf: bool):
root = Path(project_root)
reports_dir = root / "reports"
reports_dir.mkdir(parents=True, exist_ok=True)
md_path = reports_dir / f"{name}.md"
html_path = reports_dir / f"{name}.html"
pdf_path = reports_dir / f"{name}.pdf"
if write_md:
md_path.write_text(md_text, encoding="utf-8")
if write_html:
try:
import markdown as md
html = md.markdown(md_text, extensions=["tables"])
except Exception:
html = "<pre>" + (md_text
.replace("&", "&amp;")
.replace("<", "&lt;")
.replace(">", "&gt;")) + "</pre>"
html_full = (
"<!doctype html><meta charset='utf-8'>"
"<style>"
"body{font-family:system-ui,Segoe UI,Roboto,Arial,sans-serif;max-width:1100px;margin:24px auto;padding:0 16px;}"
"table{border-collapse:collapse;width:100%;} td,th{border:1px solid #ccc;padding:6px 10px;text-align:left;}"
"code,pre{background:#f6f8fa;padding:2px 4px;border-radius:4px;}"
"h1,h2{margin-top:1.2em;}"
"</style>"
f"<body>{html}</body>"
)
html_path.write_text(html_full, encoding="utf-8")
if write_pdf:
# robust über pandoc (muss installiert sein)
# sudo apt-get install -y pandoc
if not write_md:
md_path.write_text(md_text, encoding="utf-8")
os.system(f"pandoc '{md_path}' -o '{pdf_path}'")
# -------------------------
# Reporting logic
# -------------------------
def parse_period_args(args) -> Tuple[str, str, str]:
""" """
Liefert (from_iso, to_iso) in UTC. Returns (from_iso, to_iso, suffix_name) in UTC.
""" """
now = dt.datetime.now(dt.timezone.utc).replace(microsecond=0) now = dt.datetime.now(dt.timezone.utc).replace(microsecond=0)
if args.last_days is not None: if args.last_days is not None:
start = now - dt.timedelta(days=args.last_days) start = now - dt.timedelta(days=args.last_days)
return start.isoformat(), now.isoformat() return start.isoformat(), now.isoformat(), f"last{args.last_days}d"
if args.from_iso and args.to_iso: if args.from_iso and args.to_iso:
return args.from_iso, args.to_iso # Name aus from/to (gekürzt)
return args.from_iso, args.to_iso, "custom"
# vordefinierte Perioden
if args.period == "weekly": if args.period == "weekly":
start = now - dt.timedelta(days=7) start = now - dt.timedelta(days=7)
suffix = "weekly"
elif args.period == "monthly": elif args.period == "monthly":
start = now - dt.timedelta(days=30) start = now - dt.timedelta(days=30)
suffix = "monthly"
elif args.period == "yearly": elif args.period == "yearly":
start = now - dt.timedelta(days=365) start = now - dt.timedelta(days=365)
suffix = "yearly"
else: else:
start = now - dt.timedelta(days=7) start = now - dt.timedelta(days=7)
return start.isoformat(), now.isoformat() suffix = "weekly"
def report(con: sqlite3.Connection, from_iso: str, to_iso: str, device_name: Optional[str]) -> None: return start.isoformat(), now.isoformat(), suffix
# Energie grob aus average power * 1h (weil stündlich) -> Wh je Messpunkt
# Wenn du mal nicht exakt stündlich bist, kann man über delta(ts) verbessern; fürs erste: 1 Messung = 1 Stunde. def report(con: sqlite3.Connection, from_iso: str, to_iso: str, device_name: Optional[str]) -> str:
"""
Erzeugt Markdown-Report als String (kann gedruckt + in Dateien geschrieben werden).
Energie: grob approx_Wh = Sum(power_w) * 1h (bei stündlichem Poll).
"""
params = [from_iso, to_iso] params = [from_iso, to_iso]
dev_filter = "" dev_filter = ""
if device_name: if device_name:
dev_filter = "AND d.name = ?" dev_filter = "AND d.name = ?"
params.append(device_name) params.append(device_name)
# Kostenstellen-Übersicht q_cost = f"""
q = f"""
SELECT SELECT
d.name AS device, d.name AS device,
o.cost_code, o.cost_code,
@ -377,7 +468,6 @@ def report(con: sqlite3.Connection, from_iso: str, to_iso: str, device_name: Opt
COUNT(*) AS samples, COUNT(*) AS samples,
SUM(CASE WHEN r.filled=1 THEN 1 ELSE 0 END) AS samples_with_fill, SUM(CASE WHEN r.filled=1 THEN 1 ELSE 0 END) AS samples_with_fill,
ROUND(AVG(r.power_w), 2) AS avg_power_w, ROUND(AVG(r.power_w), 2) AS avg_power_w,
ROUND(SUM(COALESCE(r.power_w,0.0)), 2) AS sum_power_w,
ROUND(SUM(COALESCE(r.power_w,0.0))*1.0, 2) AS approx_energy_wh ROUND(SUM(COALESCE(r.power_w,0.0))*1.0, 2) AS approx_energy_wh
FROM reading r FROM reading r
JOIN outlet o ON o.id = r.outlet_id JOIN outlet o ON o.id = r.outlet_id
@ -387,24 +477,15 @@ def report(con: sqlite3.Connection, from_iso: str, to_iso: str, device_name: Opt
GROUP BY d.name, o.cost_code, o.cost_name GROUP BY d.name, o.cost_code, o.cost_name
ORDER BY d.name, o.cost_code ORDER BY d.name, o.cost_code
""" """
cur = con.execute(q, params) cost_rows = con.execute(q_cost, params).fetchall()
rows = cur.fetchall()
print(f"\nReport UTC: {from_iso} .. {to_iso}") q_job = f"""
if device_name:
print(f"Device: {device_name}")
print("\nKostenstellen:")
print("device | code | name | samples | filled | avg_W | approx_Wh")
for device, code, name, samples, filled, avg_w, sum_w, wh in rows:
print(f"{device} | {code} | {name} | {samples} | {filled} | {avg_w} | {wh}")
# Fehler-/Qualitätsstatistik aus poll_run
q2 = f"""
SELECT SELECT
d.name, d.name,
COUNT(*) AS runs, COUNT(*) AS runs,
SUM(CASE WHEN pr.ok=1 THEN 1 ELSE 0 END) AS ok_runs, SUM(CASE WHEN pr.ok=1 THEN 1 ELSE 0 END) AS ok_runs,
SUM(CASE WHEN pr.ok=0 THEN 1 ELSE 0 END) AS failed_runs, SUM(CASE WHEN pr.ok=0 THEN 1 ELSE 0 END) AS failed_runs,
SUM(pr.outlets_received) AS outlets_received_total,
SUM(pr.outlets_filled) AS outlets_filled_total, SUM(pr.outlets_filled) AS outlets_filled_total,
SUM(pr.fields_filled) AS fields_filled_total, SUM(pr.fields_filled) AS fields_filled_total,
ROUND(AVG(pr.duration_ms), 1) AS avg_duration_ms ROUND(AVG(pr.duration_ms), 1) AS avg_duration_ms
@ -415,24 +496,48 @@ def report(con: sqlite3.Connection, from_iso: str, to_iso: str, device_name: Opt
GROUP BY d.name GROUP BY d.name
ORDER BY d.name ORDER BY d.name
""" """
cur = con.execute(q2, params) job_rows = con.execute(q_job, params).fetchall()
rows = cur.fetchall()
print("\nJob-Statistik:") md = []
print("device | runs | ok | failed | outlets_filled | fields_filled | avg_ms") md.append("# MMP Report")
for r in rows: md.append("")
print(" | ".join(str(x) for x in r)) md.append(f"- Zeitraum (UTC): **{from_iso}** .. **{to_iso}**")
print() if device_name:
md.append(f"- Device: **{device_name}**")
md.append("")
md.append("## Kostenstellen")
md.append("")
md.append("| device | code | name | samples | filled_samples | avg_W | approx_Wh |")
md.append("|---|---:|---|---:|---:|---:|---:|")
for device, code, name, samples, filled, avg_w, wh in cost_rows:
md.append(f"| {device} | {code} | {name} | {samples} | {filled} | {avg_w} | {wh} |")
md.append("")
md.append("## Job-Statistik")
md.append("")
md.append("| device | runs | ok | failed | outlets_total | outlets_filled | fields_filled | avg_ms |")
md.append("|---|---:|---:|---:|---:|---:|---:|---:|")
for device, runs, ok_runs, failed_runs, outlets_total, outlets_filled, fields_filled, avg_ms in job_rows:
md.append(f"| {device} | {runs} | {ok_runs} | {failed_runs} | {outlets_total} | {outlets_filled} | {fields_filled} | {avg_ms} |")
md.append("")
return "\n".join(md)
# -------------------------
# CLI
# -------------------------
def main(): def main():
ap = argparse.ArgumentParser(description="MMP (BayTech) ostatus Logger -> SQLite") ap = argparse.ArgumentParser(description="MMP ostatus Logger -> SQLite + Reports (md/html/pdf)")
ap.add_argument("--config", required=True, help="Pfad zur config.json") ap.add_argument("--config", required=True, help="Pfad zur config.json")
sub = ap.add_subparsers(dest="cmd", required=True) sub = ap.add_subparsers(dest="cmd", required=True)
sp_poll = sub.add_parser("poll", help="Pollt alle enabled Devices und schreibt Messwerte") sp_poll = sub.add_parser("poll", help="Pollt alle enabled Devices und schreibt Messwerte")
sp_poll.add_argument("--device", default=None, help="Optional: nur ein Device-Name") sp_poll.add_argument("--device", default=None, help="Optional: nur ein Device-Name")
sp_rep = sub.add_parser("report", help="Auswertung (weekly/monthly/yearly oder von-bis / last-days)") sp_rep = sub.add_parser("report", help="Auswertung (weekly/monthly/yearly oder from/to / last-days)")
sp_rep.add_argument("--device", default=None, help="Optional: nur ein Device-Name") sp_rep.add_argument("--device", default=None, help="Optional: nur ein Device-Name")
sp_rep.add_argument("--period", choices=["weekly", "monthly", "yearly"], default="weekly") sp_rep.add_argument("--period", choices=["weekly", "monthly", "yearly"], default="weekly")
sp_rep.add_argument("--last-days", type=int, default=None, help="Letzte N Tage") sp_rep.add_argument("--last-days", type=int, default=None, help="Letzte N Tage")
@ -440,13 +545,19 @@ def main():
sp_rep.add_argument("--to-iso", default=None, help="UTC ISO, z.B. 2026-02-08T00:00:00+00:00") sp_rep.add_argument("--to-iso", default=None, help="UTC ISO, z.B. 2026-02-08T00:00:00+00:00")
args = ap.parse_args() args = ap.parse_args()
cfg = load_json(args.config) cfg = load_json(args.config)
cfg_path = args.config proj_root = project_root_from_config(args.config)
db_path = resolve_path(cfg_path, cfg["db_path"])
cc_path = resolve_path(cfg_path, cfg["cost_center_map"]) # Projekt-Unterordner sicherstellen (alles im selben Verzeichnis)
safe_mkdir(proj_root, "data")
safe_mkdir(proj_root, "logs")
safe_mkdir(proj_root, "reports")
db_path = resolve_path(args.config, cfg["db_path"])
cc_path = resolve_path(args.config, cfg["cost_center_map"])
cc_map = load_json(cc_path) cc_map = load_json(cc_path)
cc_map = load_json(cfg["cost_center_map"])
db_path = cfg["db_path"]
con = db_connect(db_path) con = db_connect(db_path)
db_init(con) db_init(con)
@ -454,9 +565,16 @@ def main():
tel = cfg.get("telnet", {}) tel = cfg.get("telnet", {})
read_timeout = int(tel.get("read_timeout_sec", 25)) read_timeout = int(tel.get("read_timeout_sec", 25))
connect_timeout = int(tel.get("connect_timeout_sec", 10)) connect_timeout = int(tel.get("connect_timeout_sec", 10))
prompt_timeout = int(tel.get("prompt_timeout_sec", 25))
cmd = tel.get("command", "ostatus") cmd = tel.get("command", "ostatus")
devices = [] rep_cfg = cfg.get("report", {})
write_md = bool(rep_cfg.get("write_markdown", True))
write_html = bool(rep_cfg.get("write_html", True))
write_pdf = bool(rep_cfg.get("write_pdf", False))
name_prefix = rep_cfg.get("report_name_prefix", "report")
devices: List[DeviceCfg] = []
for d in cfg.get("devices", []): for d in cfg.get("devices", []):
devices.append(DeviceCfg( devices.append(DeviceCfg(
name=d["name"], name=d["name"],
@ -473,12 +591,29 @@ def main():
continue continue
if args.device and d.name != args.device: if args.device and d.name != args.device:
continue continue
poll_device(con, d, cc_map, cmd, connect_timeout, read_timeout) poll_device(con, d, cc_map, cmd, connect_timeout, prompt_timeout, read_timeout)
print("OK") print("OK")
elif args.cmd == "report": elif args.cmd == "report":
from_iso, to_iso = parse_period_args(args) from_iso, to_iso, suffix = parse_period_args(args)
report(con, from_iso, to_iso, args.device) md_text = report(con, from_iso, to_iso, args.device)
print(md_text)
# Dateiname: z.B. report_weekly, report_monthly, report_last14d, report_custom
name = f"{name_prefix}_{suffix}"
if args.device:
name = f"{name}_{args.device}"
write_report_files(
project_root=proj_root,
name=name,
md_text=md_text,
write_md=write_md,
write_html=write_html,
write_pdf=write_pdf
)
if __name__ == "__main__": if __name__ == "__main__":
main() main()