fix: AP direkt abfragen (cAP ax, lokale WiFi-Konfig)

This commit is contained in:
2026-05-20 13:06:14 +02:00
parent 0cb0d209f7
commit 6807115b5d
+61 -138
View File
@@ -1,192 +1,115 @@
#!/usr/bin/env python3
"""
HomeMatic Anwesenheitserkennung via MikroTik
============================================
Prüft ob iPhones im Netzwerk sichtbar sind und setzt HomeMatic-Systemvariablen.
Erkennungsmethoden:
1. WiFi Registration Table neue API (ROS 7.x, /interface/wifi/registration-table)
2. CAPsMAN Registration Table alte API (ROS 6/früh-7, /caps-man/registration-table)
Beide beziehen sich auf den CAPsMAN-Controller (Router), der den AP verwaltet.
3. ARP-Tabelle als Fallback (letzte bekannte IP/MAC-Zuordnung)
HomeMatic Anwesenheitserkennung MikroTik cAP ax (lokale WiFi-Konfig)
======================================================================
Erkennungsreihenfolge:
1. AP WiFi Registration Table (/interface/wifi/registration-table @ 192.168.252.253)
2. ARP-Tabelle am Router (/ip/arp @ 192.168.252.254)
Deployment: ebesch-docker (192.168.252.10), Cronjob jede Minute.
"""
import sys
import logging
import configparser
import argparse
import sys, logging, configparser, argparse
from pathlib import Path
import requests
from requests.auth import HTTPBasicAuth
# ---------------------------------------------------------------------------
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
logging.basicConfig(level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s", datefmt="%Y-%m-%d %H:%M:%S")
log = logging.getLogger("presence")
CONFIG_FILE = Path(__file__).parent / "config.ini"
def load_config(path: Path) -> configparser.ConfigParser:
def load_config(path):
cfg = configparser.ConfigParser()
if not path.exists():
log.error(f"Konfigurationsdatei nicht gefunden: {path}")
sys.exit(1)
cfg.read(path)
return cfg
log.error(f"config.ini nicht gefunden: {path}"); sys.exit(1)
cfg.read(path); return cfg
# ---------------------------------------------------------------------------
class MikroTik:
def __init__(self, host: str, user: str, password: str, timeout: int = 8):
def __init__(self, host, user, password, timeout=8):
self.base = f"http://{host}/rest"
self.auth = HTTPBasicAuth(user, password)
self.timeout = timeout
def _get(self, path: str) -> list:
def _get(self, path):
try:
r = requests.get(
f"{self.base}{path}", auth=self.auth, timeout=self.timeout
)
r.raise_for_status()
return r.json()
except requests.RequestException as e:
log.warning(f"MikroTik API Fehler ({path}): {e}")
return []
r = requests.get(f"{self.base}{path}", auth=self.auth, timeout=self.timeout)
r.raise_for_status(); return r.json()
except Exception as e:
log.warning(f"API Fehler {path}: {e}"); return []
def wifi_clients(self) -> set:
"""
Aktive WLAN-Clients vom CAPsMAN-Controller (Router .254).
Probiert zuerst die neue WiFi-API (ROS 7.x), dann die alte CAPsMAN-API.
"""
# ROS 7.x: neues WiFi-System
def wifi_clients(self):
"""Aktive WLAN-Clients vom AP (lokale Konfig, ROS 7.x WiFi-System)."""
data = self._get("/interface/wifi/registration-table")
if data:
macs = {e["mac-address"].upper() for e in data if "mac-address" in e}
log.debug(f"WiFi (neu): {len(macs)} aktive Clients")
return macs
# Fallback: altes CAPsMAN (ROS 6 / früh-7)
data = self._get("/caps-man/registration-table")
if data:
macs = {e["mac-address"].upper() for e in data if "mac-address" in e}
log.debug(f"CAPsMAN (alt): {len(macs)} aktive Clients")
return macs
log.debug("WiFi/CAPsMAN: keine Clients (oder API leer)")
return set()
def arp_macs(self) -> set:
"""MACs aus der ARP-Tabelle (bridge-Interface) als Fallback."""
data = self._get("/ip/arp")
macs = {
e["mac-address"].upper()
for e in data
if e.get("interface") == "bridge" and "mac-address" in e
}
log.debug(f"ARP: {len(macs)} Einträge")
macs = {e["mac-address"].upper() for e in data if "mac-address" in e}
log.debug(f"AP WiFi: {len(macs)} Clients")
return macs
def is_present(self, mac: str, wifi: set, arp: set) -> bool:
mac = mac.upper()
if mac in wifi:
log.info(f"{mac} im WLAN (aktiv verbunden via CAPsMAN-AP)")
return True
if mac in arp:
log.info(f"{mac} in ARP-Tabelle (kürzlich gesehen)")
return True
return False
def arp_macs(self):
"""ARP-Tabelle am Router als Fallback."""
data = self._get("/ip/arp")
macs = {e["mac-address"].upper() for e in data
if e.get("interface") == "bridge" and "mac-address" in e}
log.debug(f"Router ARP: {len(macs)} Einträge")
return macs
# ---------------------------------------------------------------------------
class HomeMatic:
def __init__(self, host: str, timeout: int = 8):
self.base = f"http://{host}"
self.timeout = timeout
def __init__(self, host, timeout=8):
self.base = f"http://{host}"; self.timeout = timeout
def set_variable(self, ise_id: str, value: bool) -> bool:
"""Setzt eine boolesche Systemvariable per XML-API (kein Login nötig)."""
val_str = "true" if value else "false"
def set_variable(self, ise_id, value):
val = "true" if value else "false"
try:
r = requests.get(
f"{self.base}/config/xmlapi/statechange.cgi",
params={"ise_id": ise_id, "new_val": val_str},
timeout=self.timeout,
)
r.raise_for_status()
log.debug(f" Variable {ise_id}{val_str}")
return True
except requests.RequestException as e:
log.warning(f"CCU2 statechange Fehler (ise_id={ise_id}): {e}")
return False
requests.get(f"{self.base}/config/xmlapi/statechange.cgi",
params={"ise_id": ise_id, "new_val": val}, timeout=self.timeout).raise_for_status()
log.debug(f" ISE {ise_id}{val}")
except Exception as e:
log.warning(f"CCU2 Fehler ise_id={ise_id}: {e}")
# ---------------------------------------------------------------------------
def main():
parser = argparse.ArgumentParser(description="HomeMatic iPhone Anwesenheitserkennung")
parser = argparse.ArgumentParser()
parser.add_argument("--config", type=Path, default=CONFIG_FILE)
parser.add_argument("--dry-run", action="store_true", help="Keine Änderungen schreiben")
parser.add_argument("--verbose", "-v", action="store_true")
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("-v", "--verbose", action="store_true")
args = parser.parse_args()
if args.verbose:
log.setLevel(logging.DEBUG)
if args.verbose: log.setLevel(logging.DEBUG)
cfg = load_config(args.config)
mt = MikroTik(
host=cfg["mikrotik"]["host"],
user=cfg["mikrotik"]["user"],
password=cfg["mikrotik"]["password"],
)
hm = HomeMatic(host=cfg["homematic"]["host"])
ap = MikroTik(cfg["ap"]["host"], cfg["ap"]["user"], cfg["ap"]["password"])
rt = MikroTik(cfg["router"]["host"], cfg["router"]["user"], cfg["router"]["password"])
hm = HomeMatic(cfg["homematic"]["host"])
devices = {
name[len("device."):]: {
"mac": cfg[name]["mac"].upper(),
"ise_id": cfg[name]["ise_id"],
"label": cfg[name].get("label", name[len("device."):]),
}
for name in cfg.sections() if name.startswith("device.")
s[len("device."):]: {"mac": cfg[s]["mac"].upper(),
"ise_id": cfg[s]["ise_id"],
"label": cfg[s].get("label", s)}
for s in cfg.sections() if s.startswith("device.")
}
if not devices:
log.error("Keine Geräte in config.ini definiert!")
sys.exit(1)
if not devices: log.error("Keine Geräte!"); sys.exit(1)
log.info(f"Prüfe {len(devices)} Gerät(e)…")
wifi = mt.wifi_clients()
arp = mt.arp_macs()
wifi = ap.wifi_clients()
arp = rt.arp_macs()
results = {}
for name, dev in devices.items():
present = mt.is_present(dev["mac"], wifi, arp)
mac = dev["mac"]
present = mac in wifi or mac in arp
results[name] = present
status = "✓ zu Hause " if present else "✗ außer Haus"
log.info(f" {dev['label']:25} [{dev['mac']}] {status}")
src = "WLAN" if mac in wifi else ("ARP" if mac in arp else "")
log.info(f" {dev['label']:25} {mac} {'✓ zu Hause ' if present else '✗ außer Haus'} [{src}]")
if not args.dry_run:
for name, dev in devices.items():
hm.set_variable(dev["ise_id"], results[name])
combined_ise_id = cfg.get("homematic", "combined_ise_id", fallback=None)
if combined_ise_id:
anyone_home = any(results.values())
hm.set_variable(combined_ise_id, anyone_home)
status = "✓ jemand zuhause" if anyone_home else "✗ niemand zuhause"
log.info(f" {'Anwesenheit (gesamt)':25} {status}")
comb = cfg.get("homematic", "combined_ise_id", fallback=None)
if comb:
home = any(results.values())
hm.set_variable(comb, home)
log.info(f" {'Anwesenheit':25} {'✓ jemand da' if home else '✗ niemand da'}")
else:
log.info("DRY-RUN: Keine Änderungen geschrieben.")
log.info("DRY-RUN nichts geschrieben.")
log.info("Fertig.")
if __name__ == "__main__":
main()