From 0cb0d209f7d3e858905cd0276b407e3e066cf3f5 Mon Sep 17 00:00:00 2001 From: Ralf-Peter Wolff Date: Wed, 20 May 2026 13:02:26 +0200 Subject: [PATCH] fix: beide WiFi-API-Pfade (ROS 7.x neu + CAPsMAN-Fallback) --- presence_check.py | 150 +++++++++++++++++----------------------------- 1 file changed, 56 insertions(+), 94 deletions(-) diff --git a/presence_check.py b/presence_check.py index 0fdc22e..40b05a8 100644 --- a/presence_check.py +++ b/presence_check.py @@ -1,29 +1,27 @@ #!/usr/bin/env python3 """ -HomeMatic Anwesenheitserkennung via MikroTik CAPsMAN -==================================================== -Prüft ob iPhones im WLAN eingebucht sind und setzt HomeMatic-Systemvariablen. +HomeMatic Anwesenheitserkennung via MikroTik +============================================ +Prüft ob iPhones im Netzwerk sichtbar sind und setzt HomeMatic-Systemvariablen. -Erkennungsmethoden (in dieser Reihenfolge): - 1. CAPsMAN Registration Table (aktiv verbundene WLAN-Clients) – primär - 2. ARP-Tabelle (letzte bekannte IP/MAC-Zuordnung) – Fallback +Erkennungsmethoden: + 1. WiFi Registration Table – neue API (ROS 7.x, /interface/wifi/registration-table) + 2. CAPsMAN Registration Table – alte API (ROS 6/früh-7, /caps-man/registration-table) + Beide beziehen sich auf den CAPsMAN-Controller (Router), der den AP verwaltet. + 3. ARP-Tabelle als Fallback (letzte bekannte IP/MAC-Zuordnung) -Deployment: ebesch-docker (192.168.252.10) als Cronjob jede Minute. +Deployment: ebesch-docker (192.168.252.10), Cronjob jede Minute. """ import sys -import time import logging import configparser import argparse from pathlib import Path -from datetime import datetime import requests from requests.auth import HTTPBasicAuth -# --------------------------------------------------------------------------- -# Logging # --------------------------------------------------------------------------- logging.basicConfig( level=logging.INFO, @@ -32,9 +30,6 @@ logging.basicConfig( ) log = logging.getLogger("presence") -# --------------------------------------------------------------------------- -# Konfiguration laden -# --------------------------------------------------------------------------- CONFIG_FILE = Path(__file__).parent / "config.ini" @@ -47,8 +42,6 @@ def load_config(path: Path) -> configparser.ConfigParser: return cfg -# --------------------------------------------------------------------------- -# MikroTik REST API # --------------------------------------------------------------------------- class MikroTik: def __init__(self, host: str, user: str, password: str, timeout: int = 8): @@ -59,9 +52,7 @@ class MikroTik: def _get(self, path: str) -> list: try: r = requests.get( - f"{self.base}{path}", - auth=self.auth, - timeout=self.timeout, + f"{self.base}{path}", auth=self.auth, timeout=self.timeout ) r.raise_for_status() return r.json() @@ -69,75 +60,55 @@ class MikroTik: log.warning(f"MikroTik API Fehler ({path}): {e}") return [] - def capsman_clients(self) -> set: - """Gibt alle aktuell per WLAN verbundenen MAC-Adressen zurück (CAPsMAN).""" + def wifi_clients(self) -> set: + """ + Aktive WLAN-Clients vom CAPsMAN-Controller (Router .254). + Probiert zuerst die neue WiFi-API (ROS 7.x), dann die alte CAPsMAN-API. + """ + # ROS 7.x: neues WiFi-System + data = self._get("/interface/wifi/registration-table") + if data: + macs = {e["mac-address"].upper() for e in data if "mac-address" in e} + log.debug(f"WiFi (neu): {len(macs)} aktive Clients") + return macs + + # Fallback: altes CAPsMAN (ROS 6 / früh-7) data = self._get("/caps-man/registration-table") - macs = {entry["mac-address"].upper() for entry in data if "mac-address" in entry} - log.debug(f"CAPsMAN: {len(macs)} aktive Clients") - return macs + if data: + macs = {e["mac-address"].upper() for e in data if "mac-address" in e} + log.debug(f"CAPsMAN (alt): {len(macs)} aktive Clients") + return macs + + log.debug("WiFi/CAPsMAN: keine Clients (oder API leer)") + return set() def arp_macs(self) -> set: - """Gibt alle MACs aus der ARP-Tabelle zurück (bridge-Interface).""" + """MACs aus der ARP-Tabelle (bridge-Interface) als Fallback.""" data = self._get("/ip/arp") macs = { - entry["mac-address"].upper() - for entry in data - if entry.get("interface") == "bridge" and "mac-address" in entry + e["mac-address"].upper() + for e in data + if e.get("interface") == "bridge" and "mac-address" in e } log.debug(f"ARP: {len(macs)} Einträge") return macs - def is_present(self, mac: str, capsman: set, arp: set) -> bool: - """Prüft ob eine MAC-Adresse im Netzwerk sichtbar ist.""" + def is_present(self, mac: str, wifi: set, arp: set) -> bool: mac = mac.upper() - if mac in capsman: - log.info(f" → {mac} in CAPsMAN (aktiv verbunden)") + if mac in wifi: + log.info(f" → {mac} im WLAN (aktiv verbunden via CAPsMAN-AP)") return True if mac in arp: - log.info(f" → {mac} in ARP-Tabelle (kürzlich gesehen)") + log.info(f" → {mac} in ARP-Tabelle (kürzlich gesehen)") return True return False -# --------------------------------------------------------------------------- -# HomeMatic CCU2 XML-API # --------------------------------------------------------------------------- class HomeMatic: def __init__(self, host: str, timeout: int = 8): self.base = f"http://{host}" self.timeout = timeout - self._session: str | None = None - - def _rpc(self, method: str, params: dict, session_id: str = None) -> dict: - """HomeMatic JSON-RPC Call.""" - if session_id: - params["_session_id_"] = session_id - payload = {"method": method, "params": params, "id": 1, "version": "1.1"} - try: - r = requests.post( - f"{self.base}/api/homematic.cgi", - json=payload, - timeout=self.timeout, - ) - r.raise_for_status() - return r.json() - except requests.RequestException as e: - log.warning(f"CCU2 RPC Fehler ({method}): {e}") - return {} - - def login(self, username: str, password: str) -> bool: - resp = self._rpc("Session.login", {"username": username, "password": password}) - if resp.get("result"): - self._session = resp["result"] - log.debug(f"CCU2 Login OK, Session: {self._session}") - return True - log.error(f"CCU2 Login fehlgeschlagen: {resp.get('error')}") - return False - - def logout(self): - if self._session: - self._rpc("Session.logout", {}, session_id=self._session) - self._session = None def set_variable(self, ise_id: str, value: bool) -> bool: """Setzt eine boolesche Systemvariable per XML-API (kein Login nötig).""" @@ -149,21 +120,19 @@ class HomeMatic: timeout=self.timeout, ) r.raise_for_status() - log.debug(f" Variable {ise_id} → {val_str} gesetzt") + log.debug(f" Variable {ise_id} → {val_str}") return True except requests.RequestException as e: log.warning(f"CCU2 statechange Fehler (ise_id={ise_id}): {e}") return False -# --------------------------------------------------------------------------- -# Hauptprogramm # --------------------------------------------------------------------------- def main(): parser = argparse.ArgumentParser(description="HomeMatic iPhone Anwesenheitserkennung") parser.add_argument("--config", type=Path, default=CONFIG_FILE) parser.add_argument("--dry-run", action="store_true", help="Keine Änderungen schreiben") - parser.add_argument("--verbose", action="store_true") + parser.add_argument("--verbose", "-v", action="store_true") args = parser.parse_args() if args.verbose: @@ -171,55 +140,48 @@ def main(): cfg = load_config(args.config) - # MikroTik mt = MikroTik( host=cfg["mikrotik"]["host"], user=cfg["mikrotik"]["user"], password=cfg["mikrotik"]["password"], ) - - # HomeMatic hm = HomeMatic(host=cfg["homematic"]["host"]) - # Gerätedefinitionen aus Config - devices = {} - for section in cfg.sections(): - if section.startswith("device."): - name = section[len("device."):] - devices[name] = { - "mac": cfg[section]["mac"].upper(), - "ise_id": cfg[section]["ise_id"], - "label": cfg[section].get("label", name), - } + devices = { + name[len("device."):]: { + "mac": cfg[name]["mac"].upper(), + "ise_id": cfg[name]["ise_id"], + "label": cfg[name].get("label", name[len("device."):]), + } + for name in cfg.sections() if name.startswith("device.") + } if not devices: log.error("Keine Geräte in config.ini definiert!") sys.exit(1) - log.info(f"Prüfe {len(devices)} Gerät(e)...") + log.info(f"Prüfe {len(devices)} Gerät(e)…") - # Netzwerkdaten einmalig holen - capsman = mt.capsman_clients() - arp = mt.arp_macs() + wifi = mt.wifi_clients() + arp = mt.arp_macs() results = {} for name, dev in devices.items(): - present = mt.is_present(dev["mac"], capsman, arp) + present = mt.is_present(dev["mac"], wifi, arp) results[name] = present - status = "✓ zu Hause" if present else "✗ außer Haus" - log.info(f" {dev['label']:25} [{dev['mac']}] → {status}") + status = "✓ zu Hause " if present else "✗ außer Haus" + log.info(f" {dev['label']:25} [{dev['mac']}] {status}") - # HomeMatic-Variablen setzen if not args.dry_run: for name, dev in devices.items(): hm.set_variable(dev["ise_id"], results[name]) - # Kombinierte Anwesenheit (true wenn mind. 1 Person zuhause) combined_ise_id = cfg.get("homematic", "combined_ise_id", fallback=None) if combined_ise_id: anyone_home = any(results.values()) hm.set_variable(combined_ise_id, anyone_home) - log.info(f" {'Anwesenheit (gesamt)':25} → {'✓ jemand zuhause' if anyone_home else '✗ niemand zuhause'}") + status = "✓ jemand zuhause" if anyone_home else "✗ niemand zuhause" + log.info(f" {'Anwesenheit (gesamt)':25} {status}") else: log.info("DRY-RUN: Keine Änderungen geschrieben.")