Files
homematic-anwesenheit/presence_check.py
T

193 lines
6.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
HomeMatic presence detection via MikroTik
=========================================
Checks whether iPhones are visible on the network and sets HomeMatic
system variables accordingly.

Detection methods:
  1. WiFi registration table, new API
     (ROS 7.x, /interface/wifi/registration-table)
  2. CAPsMAN registration table, old API
     (ROS 6 / early 7, /caps-man/registration-table)
     Both refer to the CAPsMAN controller (router) that manages the AP.
  3. ARP table as a fallback (last known IP/MAC mapping)

Deployment: ebesch-docker (192.168.252.10), cron job every minute.
"""
import sys
import logging
import configparser
import argparse
from pathlib import Path
import requests
from requests.auth import HTTPBasicAuth
# ---------------------------------------------------------------------------
# Root logging setup: INFO level, timestamped "[LEVEL] message" lines.
logging.basicConfig(
    datefmt="%Y-%m-%d %H:%M:%S",
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)

# Logger shared by every function and class in this script.
log = logging.getLogger("presence")

# Default configuration file: "config.ini" in the same directory as this script.
CONFIG_FILE = Path(__file__).with_name("config.ini")
def load_config(path: Path) -> configparser.ConfigParser:
    """Read the INI configuration file at *path*.

    The file is always decoded as UTF-8 (a leading BOM is tolerated) so that
    non-ASCII values such as device labels load identically regardless of the
    locale the process runs under.

    Args:
        path: Location of the configuration file.

    Returns:
        The populated ``ConfigParser``.

    Exits the process with status 1, after logging an error, when the file
    does not exist or could not be opened. Syntax errors in the file still
    propagate as ``configparser.Error``.
    """
    cfg = configparser.ConfigParser()
    if not path.exists():
        log.error(f"Konfigurationsdatei nicht gefunden: {path}")
        sys.exit(1)
    # ConfigParser.read() silently skips files it cannot open and returns the
    # list of files it actually parsed; an empty list means nothing was loaded.
    if not cfg.read(path, encoding="utf-8-sig"):
        log.error(f"Konfigurationsdatei nicht lesbar: {path}")
        sys.exit(1)
    return cfg
# ---------------------------------------------------------------------------
class MikroTik:
    """Small read-only client for the router's REST interface.

    All requests go to ``http://<host>/rest`` with HTTP basic authentication.
    """

    def __init__(self, host: str, user: str, password: str, timeout: int = 8):
        """Store the connection settings; no request is sent here.

        Args:
            host: Router host name or IP address.
            user: User name for basic authentication.
            password: Password for *user*.
            timeout: Per-request timeout in seconds.
        """
        # NOTE(review): plain HTTP transmits the basic-auth credentials
        # unencrypted - consider HTTPS if the router offers it.
        self.base = f"http://{host}/rest"
        self.auth = HTTPBasicAuth(user, password)
        self.timeout = timeout

    @staticmethod
    def _extract_macs(entries, interface=None) -> set:
        """Return the upper-cased MAC addresses found in *entries*.

        Args:
            entries: Iterable of table rows; anything that is not a dict or
                has no non-empty ``"mac-address"`` value is skipped.
            interface: When given, only rows whose ``"interface"`` value
                equals it are considered.
        """
        macs = set()
        for entry in entries:
            if not isinstance(entry, dict):
                continue
            mac = entry.get("mac-address")
            if not mac:
                continue
            if interface is not None and entry.get("interface") != interface:
                continue
            macs.add(mac.upper())
        return macs

    def _get(self, path: str) -> list:
        """GET *path* below the REST base URL and return the decoded JSON list.

        Best effort: transport errors, HTTP error statuses, undecodable
        bodies and non-list payloads are logged as warnings and reported to
        the caller as an empty list.
        """
        try:
            r = requests.get(
                f"{self.base}{path}", auth=self.auth, timeout=self.timeout
            )
            r.raise_for_status()
            data = r.json()
        except (requests.RequestException, ValueError) as e:
            # ValueError covers JSON decode errors that are not a
            # RequestException (the case in older requests releases).
            log.warning(f"MikroTik API Fehler ({path}): {e}")
            return []
        if not isinstance(data, list):
            # Callers iterate over rows; anything else would crash them.
            log.warning(
                f"MikroTik API Fehler ({path}): "
                f"unerwartete Antwort vom Typ {type(data).__name__}"
            )
            return []
        return data

    def wifi_clients(self) -> set:
        """Return the MACs of the currently registered wireless clients.

        The new WiFi registration table is queried first; if it yields no
        rows, the old CAPsMAN registration table is used instead. An empty
        set is returned when neither table yields rows.
        """
        sources = (
            ("WiFi (neu)", "/interface/wifi/registration-table"),
            ("CAPsMAN (alt)", "/caps-man/registration-table"),
        )
        for label, path in sources:
            data = self._get(path)
            if data:
                macs = self._extract_macs(data)
                log.debug(f"{label}: {len(macs)} aktive Clients")
                return macs
        log.debug("WiFi/CAPsMAN: keine Clients (oder API leer)")
        return set()

    def arp_macs(self, interface: str = "bridge") -> set:
        """Return the MACs from the ARP table, used as a fallback signal.

        Args:
            interface: Only ARP rows on this interface are considered.
        """
        macs = self._extract_macs(self._get("/ip/arp"), interface=interface)
        log.debug(f"ARP: {len(macs)} Einträge")
        return macs

    def is_present(self, mac: str, wifi: set, arp: set) -> bool:
        """Tell whether *mac* appears in the WiFi or the ARP MAC set.

        Args:
            mac: MAC address to look for; compared in upper case.
            wifi: Upper-cased MACs from :meth:`wifi_clients`.
            arp: Upper-cased MACs from :meth:`arp_macs`.

        Returns:
            True if *mac* is in either set (WiFi is checked first).
        """
        mac = mac.upper()
        if mac in wifi:
            log.info(f"{mac} im WLAN (aktiv verbunden via CAPsMAN-AP)")
            return True
        if mac in arp:
            log.info(f"{mac} in ARP-Tabelle (kürzlich gesehen)")
            return True
        return False
# ---------------------------------------------------------------------------
class HomeMatic:
    """Writes system variables on a HomeMatic CCU through its XML-API."""

    def __init__(self, host: str, timeout: int = 8):
        """Store the connection settings; no request is sent here.

        Args:
            host: CCU host name or IP address.
            timeout: Per-request timeout in seconds.
        """
        self.base = f"http://{host}"
        self.timeout = timeout

    def set_variable(self, ise_id: str, value: bool) -> bool:
        """Set a boolean system variable via ``statechange.cgi``.

        Args:
            ise_id: Identifier of the system variable on the CCU.
            value: New state; sent as the literal ``"true"`` or ``"false"``.

        Returns:
            True if the CCU accepted the change. False if the request failed,
            returned an HTTP error status, or the CCU reported the id as
            unknown. Failures are logged as warnings, never raised.
        """
        val_str = "true" if value else "false"
        try:
            r = requests.get(
                f"{self.base}/config/xmlapi/statechange.cgi",
                params={"ise_id": ise_id, "new_val": val_str},
                timeout=self.timeout,
            )
            r.raise_for_status()
        except requests.RequestException as e:
            log.warning(f"CCU2 statechange Fehler (ise_id={ise_id}): {e}")
            return False
        # The XML-API answers HTTP 200 even for an unknown id and flags the
        # problem with a <not_found /> element in the response body.
        if "not_found" in r.text:
            log.warning(
                f"CCU2 statechange Fehler (ise_id={ise_id}): ise_id nicht gefunden"
            )
            return False
        log.debug(f" Variable {ise_id} = {val_str}")
        return True
# ---------------------------------------------------------------------------
def main():
    """Command-line entry point: detect presence and update HomeMatic.

    Reads the configuration, queries the router once for WiFi and ARP MACs,
    decides per configured device whether it is present, and (unless
    ``--dry-run`` is given) writes the per-device variables plus the optional
    combined "anyone home" variable.

    Exits with status 1 when the configuration is incomplete, when no devices
    are configured, when the router returned no data at all, or when at least
    one variable could not be written.
    """
    parser = argparse.ArgumentParser(description="HomeMatic iPhone Anwesenheitserkennung")
    parser.add_argument("--config", type=Path, default=CONFIG_FILE)
    parser.add_argument("--dry-run", action="store_true", help="Keine Änderungen schreiben")
    parser.add_argument("--verbose", "-v", action="store_true")
    args = parser.parse_args()
    if args.verbose:
        log.setLevel(logging.DEBUG)

    cfg = load_config(args.config)
    prefix = "device."
    try:
        mt = MikroTik(
            host=cfg["mikrotik"]["host"],
            user=cfg["mikrotik"]["user"],
            password=cfg["mikrotik"]["password"],
        )
        hm = HomeMatic(host=cfg["homematic"]["host"])
        devices = {
            name[len(prefix):]: {
                "mac": cfg[name]["mac"].upper(),
                "ise_id": cfg[name]["ise_id"],
                "label": cfg[name].get("label", name[len(prefix):]),
            }
            for name in cfg.sections()
            if name.startswith(prefix)
        }
    except KeyError as e:
        # A missing section or option would otherwise end in a raw traceback.
        log.error(f"Fehlender Abschnitt oder Schlüssel in der Konfiguration: {e}")
        sys.exit(1)
    if not devices:
        log.error("Keine Geräte in config.ini definiert!")
        sys.exit(1)

    log.info(f"Prüfe {len(devices)} Gerät(e)…")
    wifi = mt.wifi_clients()
    arp = mt.arp_macs()
    if not wifi and not arp:
        # The router client reports request failures as empty results, so two
        # empty tables most likely mean the API was unreachable. Writing
        # "absent" for everyone in that case would be a false alarm.
        log.error("Keine Daten vom MikroTik erhalten – Systemvariablen bleiben unverändert.")
        sys.exit(1)

    results = {}
    for name, dev in devices.items():
        present = mt.is_present(dev["mac"], wifi, arp)
        results[name] = present
        status = "✓ zu Hause " if present else "✗ außer Haus"
        log.info(f" {dev['label']:25} [{dev['mac']}] {status}")

    failed = 0
    if args.dry_run:
        log.info("DRY-RUN: Keine Änderungen geschrieben.")
    else:
        for name, dev in devices.items():
            if not hm.set_variable(dev["ise_id"], results[name]):
                failed += 1
        combined_ise_id = cfg.get("homematic", "combined_ise_id", fallback=None)
        if combined_ise_id:
            anyone_home = any(results.values())
            if not hm.set_variable(combined_ise_id, anyone_home):
                failed += 1
            status = "✓ jemand zuhause" if anyone_home else "✗ niemand zuhause"
            log.info(f" {'Anwesenheit (gesamt)':25} {status}")

    if failed:
        # Surface write failures through the exit status so cron can report them.
        log.error(f"{failed} Systemvariable(n) konnten nicht gesetzt werden.")
        sys.exit(1)
    log.info("Fertig.")
# Run the presence check only when executed as a script, not on import.
if __name__ == "__main__":
    # main() returns None, so this exits with status 0 unless main() itself
    # terminates the process earlier.
    raise SystemExit(main())