diff --git a/presence_check.py b/presence_check.py index 40b05a8..fac78ac 100644 --- a/presence_check.py +++ b/presence_check.py @@ -1,192 +1,115 @@ #!/usr/bin/env python3 """ -HomeMatic Anwesenheitserkennung via MikroTik -============================================ -Prüft ob iPhones im Netzwerk sichtbar sind und setzt HomeMatic-Systemvariablen. - -Erkennungsmethoden: - 1. WiFi Registration Table – neue API (ROS 7.x, /interface/wifi/registration-table) - 2. CAPsMAN Registration Table – alte API (ROS 6/früh-7, /caps-man/registration-table) - Beide beziehen sich auf den CAPsMAN-Controller (Router), der den AP verwaltet. - 3. ARP-Tabelle als Fallback (letzte bekannte IP/MAC-Zuordnung) +HomeMatic Anwesenheitserkennung – MikroTik cAP ax (lokale WiFi-Konfig) +====================================================================== +Erkennungsreihenfolge: + 1. AP WiFi Registration Table (/interface/wifi/registration-table @ 192.168.252.253) + 2. ARP-Tabelle am Router (/ip/arp @ 192.168.252.254) Deployment: ebesch-docker (192.168.252.10), Cronjob jede Minute. """ - -import sys -import logging -import configparser -import argparse +import sys, logging, configparser, argparse from pathlib import Path - import requests from requests.auth import HTTPBasicAuth -# --------------------------------------------------------------------------- -logging.basicConfig( - level=logging.INFO, - format="%(asctime)s [%(levelname)s] %(message)s", - datefmt="%Y-%m-%d %H:%M:%S", -) +logging.basicConfig(level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(message)s", datefmt="%Y-%m-%d %H:%M:%S") log = logging.getLogger("presence") - CONFIG_FILE = Path(__file__).parent / "config.ini" - -def load_config(path: Path) -> configparser.ConfigParser: +def load_config(path): cfg = configparser.ConfigParser() if not path.exists(): - log.error(f"Konfigurationsdatei nicht gefunden: {path}") - sys.exit(1) - cfg.read(path) - return cfg + log.error(f"config.ini nicht gefunden: {path}"); sys.exit(1) + cfg.read(path); return cfg - -# --------------------------------------------------------------------------- class MikroTik: - def __init__(self, host: str, user: str, password: str, timeout: int = 8): + def __init__(self, host, user, password, timeout=8): self.base = f"http://{host}/rest" self.auth = HTTPBasicAuth(user, password) self.timeout = timeout - def _get(self, path: str) -> list: + def _get(self, path): try: - r = requests.get( - f"{self.base}{path}", auth=self.auth, timeout=self.timeout - ) - r.raise_for_status() - return r.json() - except requests.RequestException as e: - log.warning(f"MikroTik API Fehler ({path}): {e}") - return [] + r = requests.get(f"{self.base}{path}", auth=self.auth, timeout=self.timeout) + r.raise_for_status(); return r.json() + except Exception as e: + log.warning(f"API Fehler {path}: {e}"); return [] - def wifi_clients(self) -> set: - """ - Aktive WLAN-Clients vom CAPsMAN-Controller (Router .254). - Probiert zuerst die neue WiFi-API (ROS 7.x), dann die alte CAPsMAN-API. - """ - # ROS 7.x: neues WiFi-System + def wifi_clients(self): + """Aktive WLAN-Clients vom AP (lokale Konfig, ROS 7.x WiFi-System).""" data = self._get("/interface/wifi/registration-table") - if data: - macs = {e["mac-address"].upper() for e in data if "mac-address" in e} - log.debug(f"WiFi (neu): {len(macs)} aktive Clients") - return macs - - # Fallback: altes CAPsMAN (ROS 6 / früh-7) - data = self._get("/caps-man/registration-table") - if data: - macs = {e["mac-address"].upper() for e in data if "mac-address" in e} - log.debug(f"CAPsMAN (alt): {len(macs)} aktive Clients") - return macs - - log.debug("WiFi/CAPsMAN: keine Clients (oder API leer)") - return set() - - def arp_macs(self) -> set: - """MACs aus der ARP-Tabelle (bridge-Interface) als Fallback.""" - data = self._get("/ip/arp") - macs = { - e["mac-address"].upper() - for e in data - if e.get("interface") == "bridge" and "mac-address" in e - } - log.debug(f"ARP: {len(macs)} Einträge") + macs = {e["mac-address"].upper() for e in data if "mac-address" in e} + log.debug(f"AP WiFi: {len(macs)} Clients") return macs - def is_present(self, mac: str, wifi: set, arp: set) -> bool: - mac = mac.upper() - if mac in wifi: - log.info(f" → {mac} im WLAN (aktiv verbunden via CAPsMAN-AP)") - return True - if mac in arp: - log.info(f" → {mac} in ARP-Tabelle (kürzlich gesehen)") - return True - return False + def arp_macs(self): + """ARP-Tabelle am Router als Fallback.""" + data = self._get("/ip/arp") + macs = {e["mac-address"].upper() for e in data + if e.get("interface") == "bridge" and "mac-address" in e} + log.debug(f"Router ARP: {len(macs)} Einträge") + return macs - -# --------------------------------------------------------------------------- class HomeMatic: - def __init__(self, host: str, timeout: int = 8): - self.base = f"http://{host}" - self.timeout = timeout + def __init__(self, host, timeout=8): + self.base = f"http://{host}"; self.timeout = timeout - def set_variable(self, ise_id: str, value: bool) -> bool: - """Setzt eine boolesche Systemvariable per XML-API (kein Login nötig).""" - val_str = "true" if value else "false" + def set_variable(self, ise_id, value): + val = "true" if value else "false" try: - r = requests.get( - f"{self.base}/config/xmlapi/statechange.cgi", - params={"ise_id": ise_id, "new_val": val_str}, - timeout=self.timeout, - ) - r.raise_for_status() - log.debug(f" Variable {ise_id} → {val_str}") - return True - except requests.RequestException as e: - log.warning(f"CCU2 statechange Fehler (ise_id={ise_id}): {e}") - return False + requests.get(f"{self.base}/config/xmlapi/statechange.cgi", + params={"ise_id": ise_id, "new_val": val}, timeout=self.timeout).raise_for_status() + log.debug(f" ISE {ise_id} → {val}") + except Exception as e: + log.warning(f"CCU2 Fehler ise_id={ise_id}: {e}") - -# --------------------------------------------------------------------------- def main(): - parser = argparse.ArgumentParser(description="HomeMatic iPhone Anwesenheitserkennung") + parser = argparse.ArgumentParser() parser.add_argument("--config", type=Path, default=CONFIG_FILE) - parser.add_argument("--dry-run", action="store_true", help="Keine Änderungen schreiben") - parser.add_argument("--verbose", "-v", action="store_true") + parser.add_argument("--dry-run", action="store_true") + parser.add_argument("-v", "--verbose", action="store_true") args = parser.parse_args() - - if args.verbose: - log.setLevel(logging.DEBUG) + if args.verbose: log.setLevel(logging.DEBUG) cfg = load_config(args.config) - mt = MikroTik( - host=cfg["mikrotik"]["host"], - user=cfg["mikrotik"]["user"], - password=cfg["mikrotik"]["password"], - ) - hm = HomeMatic(host=cfg["homematic"]["host"]) + ap = MikroTik(cfg["ap"]["host"], cfg["ap"]["user"], cfg["ap"]["password"]) + rt = MikroTik(cfg["router"]["host"], cfg["router"]["user"], cfg["router"]["password"]) + hm = HomeMatic(cfg["homematic"]["host"]) devices = { - name[len("device."):]: { - "mac": cfg[name]["mac"].upper(), - "ise_id": cfg[name]["ise_id"], - "label": cfg[name].get("label", name[len("device."):]), - } - for name in cfg.sections() if name.startswith("device.") + s[len("device."):]: {"mac": cfg[s]["mac"].upper(), + "ise_id": cfg[s]["ise_id"], + "label": cfg[s].get("label", s)} + for s in cfg.sections() if s.startswith("device.") } - - if not devices: - log.error("Keine Geräte in config.ini definiert!") - sys.exit(1) + if not devices: log.error("Keine Geräte!"); sys.exit(1) log.info(f"Prüfe {len(devices)} Gerät(e)…") - - wifi = mt.wifi_clients() - arp = mt.arp_macs() + wifi = ap.wifi_clients() + arp = rt.arp_macs() results = {} for name, dev in devices.items(): - present = mt.is_present(dev["mac"], wifi, arp) + mac = dev["mac"] + present = mac in wifi or mac in arp results[name] = present - status = "✓ zu Hause " if present else "✗ außer Haus" - log.info(f" {dev['label']:25} [{dev['mac']}] {status}") + src = "WLAN" if mac in wifi else ("ARP" if mac in arp else "–") + log.info(f" {dev['label']:25} {mac} {'✓ zu Hause ' if present else '✗ außer Haus'} [{src}]") if not args.dry_run: for name, dev in devices.items(): hm.set_variable(dev["ise_id"], results[name]) - - combined_ise_id = cfg.get("homematic", "combined_ise_id", fallback=None) - if combined_ise_id: - anyone_home = any(results.values()) - hm.set_variable(combined_ise_id, anyone_home) - status = "✓ jemand zuhause" if anyone_home else "✗ niemand zuhause" - log.info(f" {'Anwesenheit (gesamt)':25} {status}") + comb = cfg.get("homematic", "combined_ise_id", fallback=None) + if comb: + home = any(results.values()) + hm.set_variable(comb, home) + log.info(f" {'Anwesenheit':25} {'✓ jemand da' if home else '✗ niemand da'}") else: - log.info("DRY-RUN: Keine Änderungen geschrieben.") - + log.info("DRY-RUN – nichts geschrieben.") log.info("Fertig.") - if __name__ == "__main__": main()