From 2a1849aa194af7806f36af25b434b0b4b8789bbe Mon Sep 17 00:00:00 2001 From: Ralf-Peter Wolff Date: Wed, 20 May 2026 12:59:55 +0200 Subject: [PATCH] feat: Anwesenheitserkennungs-Script via CAPsMAN + ARP --- presence_check.py | 230 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 230 insertions(+) create mode 100644 presence_check.py diff --git a/presence_check.py b/presence_check.py new file mode 100644 index 0000000..0fdc22e --- /dev/null +++ b/presence_check.py @@ -0,0 +1,230 @@ +#!/usr/bin/env python3 +""" +HomeMatic Anwesenheitserkennung via MikroTik CAPsMAN +==================================================== +Prüft ob iPhones im WLAN eingebucht sind und setzt HomeMatic-Systemvariablen. + +Erkennungsmethoden (in dieser Reihenfolge): + 1. CAPsMAN Registration Table (aktiv verbundene WLAN-Clients) – primär + 2. ARP-Tabelle (letzte bekannte IP/MAC-Zuordnung) – Fallback + +Deployment: ebesch-docker (192.168.252.10) als Cronjob jede Minute. +""" + +import sys +import time +import logging +import configparser +import argparse +from pathlib import Path +from datetime import datetime + +import requests +from requests.auth import HTTPBasicAuth + +# --------------------------------------------------------------------------- +# Logging +# --------------------------------------------------------------------------- +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", +) +log = logging.getLogger("presence") + +# --------------------------------------------------------------------------- +# Konfiguration laden +# --------------------------------------------------------------------------- +CONFIG_FILE = Path(__file__).parent / "config.ini" + + +def load_config(path: Path) -> configparser.ConfigParser: + cfg = configparser.ConfigParser() + if not path.exists(): + log.error(f"Konfigurationsdatei nicht gefunden: {path}") + sys.exit(1) + cfg.read(path) + return cfg + + +# --------------------------------------------------------------------------- +# MikroTik REST API +# --------------------------------------------------------------------------- +class MikroTik: + def __init__(self, host: str, user: str, password: str, timeout: int = 8): + self.base = f"http://{host}/rest" + self.auth = HTTPBasicAuth(user, password) + self.timeout = timeout + + def _get(self, path: str) -> list: + try: + r = requests.get( + f"{self.base}{path}", + auth=self.auth, + timeout=self.timeout, + ) + r.raise_for_status() + return r.json() + except requests.RequestException as e: + log.warning(f"MikroTik API Fehler ({path}): {e}") + return [] + + def capsman_clients(self) -> set: + """Gibt alle aktuell per WLAN verbundenen MAC-Adressen zurück (CAPsMAN).""" + data = self._get("/caps-man/registration-table") + macs = {entry["mac-address"].upper() for entry in data if "mac-address" in entry} + log.debug(f"CAPsMAN: {len(macs)} aktive Clients") + return macs + + def arp_macs(self) -> set: + """Gibt alle MACs aus der ARP-Tabelle zurück (bridge-Interface).""" + data = self._get("/ip/arp") + macs = { + entry["mac-address"].upper() + for entry in data + if entry.get("interface") == "bridge" and "mac-address" in entry + } + log.debug(f"ARP: {len(macs)} Einträge") + return macs + + def is_present(self, mac: str, capsman: set, arp: set) -> bool: + """Prüft ob eine MAC-Adresse im Netzwerk sichtbar ist.""" + mac = mac.upper() + if mac in capsman: + log.info(f" → {mac} in CAPsMAN (aktiv verbunden)") + return True + if mac in arp: + log.info(f" → {mac} in ARP-Tabelle (kürzlich gesehen)") + return True + return False + + +# --------------------------------------------------------------------------- +# HomeMatic CCU2 XML-API +# --------------------------------------------------------------------------- +class HomeMatic: + def __init__(self, host: str, timeout: int = 8): + self.base = f"http://{host}" + self.timeout = timeout + self._session: str | None = None + + def _rpc(self, method: str, params: dict, session_id: str = None) -> dict: + """HomeMatic JSON-RPC Call.""" + if session_id: + params["_session_id_"] = session_id + payload = {"method": method, "params": params, "id": 1, "version": "1.1"} + try: + r = requests.post( + f"{self.base}/api/homematic.cgi", + json=payload, + timeout=self.timeout, + ) + r.raise_for_status() + return r.json() + except requests.RequestException as e: + log.warning(f"CCU2 RPC Fehler ({method}): {e}") + return {} + + def login(self, username: str, password: str) -> bool: + resp = self._rpc("Session.login", {"username": username, "password": password}) + if resp.get("result"): + self._session = resp["result"] + log.debug(f"CCU2 Login OK, Session: {self._session}") + return True + log.error(f"CCU2 Login fehlgeschlagen: {resp.get('error')}") + return False + + def logout(self): + if self._session: + self._rpc("Session.logout", {}, session_id=self._session) + self._session = None + + def set_variable(self, ise_id: str, value: bool) -> bool: + """Setzt eine boolesche Systemvariable per XML-API (kein Login nötig).""" + val_str = "true" if value else "false" + try: + r = requests.get( + f"{self.base}/config/xmlapi/statechange.cgi", + params={"ise_id": ise_id, "new_val": val_str}, + timeout=self.timeout, + ) + r.raise_for_status() + log.debug(f" Variable {ise_id} → {val_str} gesetzt") + return True + except requests.RequestException as e: + log.warning(f"CCU2 statechange Fehler (ise_id={ise_id}): {e}") + return False + + +# --------------------------------------------------------------------------- +# Hauptprogramm +# --------------------------------------------------------------------------- +def main(): + parser = argparse.ArgumentParser(description="HomeMatic iPhone Anwesenheitserkennung") + parser.add_argument("--config", type=Path, default=CONFIG_FILE) + parser.add_argument("--dry-run", action="store_true", help="Keine Änderungen schreiben") + parser.add_argument("--verbose", action="store_true") + args = parser.parse_args() + + if args.verbose: + log.setLevel(logging.DEBUG) + + cfg = load_config(args.config) + + # MikroTik + mt = MikroTik( + host=cfg["mikrotik"]["host"], + user=cfg["mikrotik"]["user"], + password=cfg["mikrotik"]["password"], + ) + + # HomeMatic + hm = HomeMatic(host=cfg["homematic"]["host"]) + + # Gerätedefinitionen aus Config + devices = {} + for section in cfg.sections(): + if section.startswith("device."): + name = section[len("device."):] + devices[name] = { + "mac": cfg[section]["mac"].upper(), + "ise_id": cfg[section]["ise_id"], + "label": cfg[section].get("label", name), + } + + if not devices: + log.error("Keine Geräte in config.ini definiert!") + sys.exit(1) + + log.info(f"Prüfe {len(devices)} Gerät(e)...") + + # Netzwerkdaten einmalig holen + capsman = mt.capsman_clients() + arp = mt.arp_macs() + + results = {} + for name, dev in devices.items(): + present = mt.is_present(dev["mac"], capsman, arp) + results[name] = present + status = "✓ zu Hause" if present else "✗ außer Haus" + log.info(f" {dev['label']:25} [{dev['mac']}] → {status}") + + # HomeMatic-Variablen setzen + if not args.dry_run: + for name, dev in devices.items(): + hm.set_variable(dev["ise_id"], results[name]) + + # Kombinierte Anwesenheit (true wenn mind. 1 Person zuhause) + combined_ise_id = cfg.get("homematic", "combined_ise_id", fallback=None) + if combined_ise_id: + anyone_home = any(results.values()) + hm.set_variable(combined_ise_id, anyone_home) + log.info(f" {'Anwesenheit (gesamt)':25} → {'✓ jemand zuhause' if anyone_home else '✗ niemand zuhause'}") + else: + log.info("DRY-RUN: Keine Änderungen geschrieben.") + + log.info("Fertig.") + + +if __name__ == "__main__": + main()