feat: Anwesenheitserkennungs-Script via CAPsMAN + ARP

This commit is contained in:
2026-05-20 12:59:55 +02:00
parent c6ffd9c26a
commit 2a1849aa19
+230
View File
@@ -0,0 +1,230 @@
#!/usr/bin/env python3
"""
HomeMatic Anwesenheitserkennung via MikroTik CAPsMAN
====================================================
Prüft ob iPhones im WLAN eingebucht sind und setzt HomeMatic-Systemvariablen.
Erkennungsmethoden (in dieser Reihenfolge):
1. CAPsMAN Registration Table (aktiv verbundene WLAN-Clients) primär
2. ARP-Tabelle (letzte bekannte IP/MAC-Zuordnung) Fallback
Deployment: ebesch-docker (192.168.252.10) als Cronjob jede Minute.
"""
import sys
import time
import logging
import configparser
import argparse
from pathlib import Path
from datetime import datetime
import requests
from requests.auth import HTTPBasicAuth
# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
log = logging.getLogger("presence")
# ---------------------------------------------------------------------------
# Konfiguration laden
# ---------------------------------------------------------------------------
CONFIG_FILE = Path(__file__).parent / "config.ini"
def load_config(path: Path) -> configparser.ConfigParser:
cfg = configparser.ConfigParser()
if not path.exists():
log.error(f"Konfigurationsdatei nicht gefunden: {path}")
sys.exit(1)
cfg.read(path)
return cfg
# ---------------------------------------------------------------------------
# MikroTik REST API
# ---------------------------------------------------------------------------
class MikroTik:
def __init__(self, host: str, user: str, password: str, timeout: int = 8):
self.base = f"http://{host}/rest"
self.auth = HTTPBasicAuth(user, password)
self.timeout = timeout
def _get(self, path: str) -> list:
try:
r = requests.get(
f"{self.base}{path}",
auth=self.auth,
timeout=self.timeout,
)
r.raise_for_status()
return r.json()
except requests.RequestException as e:
log.warning(f"MikroTik API Fehler ({path}): {e}")
return []
def capsman_clients(self) -> set:
"""Gibt alle aktuell per WLAN verbundenen MAC-Adressen zurück (CAPsMAN)."""
data = self._get("/caps-man/registration-table")
macs = {entry["mac-address"].upper() for entry in data if "mac-address" in entry}
log.debug(f"CAPsMAN: {len(macs)} aktive Clients")
return macs
def arp_macs(self) -> set:
"""Gibt alle MACs aus der ARP-Tabelle zurück (bridge-Interface)."""
data = self._get("/ip/arp")
macs = {
entry["mac-address"].upper()
for entry in data
if entry.get("interface") == "bridge" and "mac-address" in entry
}
log.debug(f"ARP: {len(macs)} Einträge")
return macs
def is_present(self, mac: str, capsman: set, arp: set) -> bool:
"""Prüft ob eine MAC-Adresse im Netzwerk sichtbar ist."""
mac = mac.upper()
if mac in capsman:
log.info(f"{mac} in CAPsMAN (aktiv verbunden)")
return True
if mac in arp:
log.info(f"{mac} in ARP-Tabelle (kürzlich gesehen)")
return True
return False
# ---------------------------------------------------------------------------
# HomeMatic CCU2 XML-API
# ---------------------------------------------------------------------------
class HomeMatic:
def __init__(self, host: str, timeout: int = 8):
self.base = f"http://{host}"
self.timeout = timeout
self._session: str | None = None
def _rpc(self, method: str, params: dict, session_id: str = None) -> dict:
"""HomeMatic JSON-RPC Call."""
if session_id:
params["_session_id_"] = session_id
payload = {"method": method, "params": params, "id": 1, "version": "1.1"}
try:
r = requests.post(
f"{self.base}/api/homematic.cgi",
json=payload,
timeout=self.timeout,
)
r.raise_for_status()
return r.json()
except requests.RequestException as e:
log.warning(f"CCU2 RPC Fehler ({method}): {e}")
return {}
def login(self, username: str, password: str) -> bool:
resp = self._rpc("Session.login", {"username": username, "password": password})
if resp.get("result"):
self._session = resp["result"]
log.debug(f"CCU2 Login OK, Session: {self._session}")
return True
log.error(f"CCU2 Login fehlgeschlagen: {resp.get('error')}")
return False
def logout(self):
if self._session:
self._rpc("Session.logout", {}, session_id=self._session)
self._session = None
def set_variable(self, ise_id: str, value: bool) -> bool:
"""Setzt eine boolesche Systemvariable per XML-API (kein Login nötig)."""
val_str = "true" if value else "false"
try:
r = requests.get(
f"{self.base}/config/xmlapi/statechange.cgi",
params={"ise_id": ise_id, "new_val": val_str},
timeout=self.timeout,
)
r.raise_for_status()
log.debug(f" Variable {ise_id}{val_str} gesetzt")
return True
except requests.RequestException as e:
log.warning(f"CCU2 statechange Fehler (ise_id={ise_id}): {e}")
return False
# ---------------------------------------------------------------------------
# Hauptprogramm
# ---------------------------------------------------------------------------
def main():
parser = argparse.ArgumentParser(description="HomeMatic iPhone Anwesenheitserkennung")
parser.add_argument("--config", type=Path, default=CONFIG_FILE)
parser.add_argument("--dry-run", action="store_true", help="Keine Änderungen schreiben")
parser.add_argument("--verbose", action="store_true")
args = parser.parse_args()
if args.verbose:
log.setLevel(logging.DEBUG)
cfg = load_config(args.config)
# MikroTik
mt = MikroTik(
host=cfg["mikrotik"]["host"],
user=cfg["mikrotik"]["user"],
password=cfg["mikrotik"]["password"],
)
# HomeMatic
hm = HomeMatic(host=cfg["homematic"]["host"])
# Gerätedefinitionen aus Config
devices = {}
for section in cfg.sections():
if section.startswith("device."):
name = section[len("device."):]
devices[name] = {
"mac": cfg[section]["mac"].upper(),
"ise_id": cfg[section]["ise_id"],
"label": cfg[section].get("label", name),
}
if not devices:
log.error("Keine Geräte in config.ini definiert!")
sys.exit(1)
log.info(f"Prüfe {len(devices)} Gerät(e)...")
# Netzwerkdaten einmalig holen
capsman = mt.capsman_clients()
arp = mt.arp_macs()
results = {}
for name, dev in devices.items():
present = mt.is_present(dev["mac"], capsman, arp)
results[name] = present
status = "✓ zu Hause" if present else "✗ außer Haus"
log.info(f" {dev['label']:25} [{dev['mac']}] → {status}")
# HomeMatic-Variablen setzen
if not args.dry_run:
for name, dev in devices.items():
hm.set_variable(dev["ise_id"], results[name])
# Kombinierte Anwesenheit (true wenn mind. 1 Person zuhause)
combined_ise_id = cfg.get("homematic", "combined_ise_id", fallback=None)
if combined_ise_id:
anyone_home = any(results.values())
hm.set_variable(combined_ise_id, anyone_home)
log.info(f" {'Anwesenheit (gesamt)':25}{'✓ jemand zuhause' if anyone_home else '✗ niemand zuhause'}")
else:
log.info("DRY-RUN: Keine Änderungen geschrieben.")
log.info("Fertig.")
if __name__ == "__main__":
main()