116 lines
4.3 KiB
Python
116 lines
4.3 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
HomeMatic presence detection – MikroTik cAP ax (local WiFi configuration)
=========================================================================

Detection order:
  1. AP WiFi registration table (/interface/wifi/registration-table @ 192.168.252.253)
  2. ARP table on the router (/ip/arp @ 192.168.252.254)

Deployment: ebesch-docker (192.168.252.10), cron job every minute.
|
||
"""
|
||
import sys, logging, configparser, argparse
|
||
from pathlib import Path
|
||
import requests
|
||
from requests.auth import HTTPBasicAuth
|
||
|
||
# Root logger: INFO level, timestamped lines. The script itself logs through
# the named "presence" logger so its level can be adjusted independently.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
log = logging.getLogger("presence")

# Default configuration file: config.ini in the same directory as this script.
CONFIG_FILE = Path(__file__).with_name("config.ini")
|
||
|
||
def load_config(path):
    """Load the INI configuration at *path* or terminate the process.

    Args:
        path: ``pathlib.Path`` of the configuration file.

    Returns:
        The populated ``configparser.ConfigParser``.

    Exits with status 1 (after logging an error) when the file does not
    exist or cannot be read.
    """
    if not path.exists():
        log.error(f"config.ini nicht gefunden: {path}")
        sys.exit(1)

    cfg = configparser.ConfigParser()
    # Decode explicitly as UTF-8 so non-ASCII values (e.g. device labels) do
    # not depend on the locale of the environment the script runs in.
    # read() skips files it cannot open and returns the list of files it
    # actually parsed, so an empty result means the path was unreadable.
    if not cfg.read(path, encoding="utf-8"):
        log.error(f"config.ini nicht lesbar: {path}")
        sys.exit(1)
    return cfg
|
||
|
||
class MikroTik:
    """Small read-only client for the RouterOS REST API of one MikroTik device.

    Requests go to ``http://<host>/rest`` with HTTP basic authentication.
    NOTE(review): plain HTTP sends the credentials unencrypted — presumably
    acceptable on the management LAN; confirm.
    """

    def __init__(self, host, user, password, timeout=8):
        """Store the connection settings; no request is made here.

        Args:
            host: Host name or IP address of the device.
            user: REST API user name.
            password: Password for *user*.
            timeout: Timeout in seconds applied to every request.
        """
        self.base = f"http://{host}/rest"
        self.auth = HTTPBasicAuth(user, password)
        self.timeout = timeout

    def _get(self, path):
        """GET *path* below the REST root and return the decoded JSON body.

        Any failure (connection error, timeout, HTTP error status, invalid
        JSON) is logged as a warning and reported as an empty list, so
        callers cannot tell a failed request from an empty table.
        """
        try:
            r = requests.get(f"{self.base}{path}", auth=self.auth, timeout=self.timeout)
            r.raise_for_status()
            return r.json()
        except Exception as e:
            log.warning(f"API Fehler {path}: {e}")
            return []

    @staticmethod
    def _macs(entries, interface=None):
        """Collect the upper-cased MAC addresses found in *entries*.

        Args:
            entries: Decoded REST response; anything that is not a list of
                dicts is ignored instead of raising.
            interface: When not ``None``, only rows whose ``interface``
                field equals this value are considered.

        Returns:
            A set of MAC address strings in upper case.
        """
        if not isinstance(entries, list):
            return set()
        found = set()
        for entry in entries:
            if not isinstance(entry, dict):
                continue
            if interface is not None and entry.get("interface") != interface:
                continue
            mac = entry.get("mac-address")
            # Skip rows without a usable MAC (missing, empty or non-string).
            if isinstance(mac, str) and mac:
                found.add(mac.upper())
        return found

    def wifi_clients(self):
        """Return the MACs of the WiFi clients currently registered at the AP.

        Reads ``/interface/wifi/registration-table`` (local configuration,
        RouterOS 7.x WiFi system).
        """
        macs = self._macs(self._get("/interface/wifi/registration-table"))
        log.debug(f"AP WiFi: {len(macs)} Clients")
        return macs

    def arp_macs(self, interface="bridge"):
        """Return the MACs from the router's ARP table (fallback source).

        Args:
            interface: Only ARP rows on this interface are used; pass
                ``None`` to accept rows from every interface.
        """
        macs = self._macs(self._get("/ip/arp"), interface=interface)
        log.debug(f"Router ARP: {len(macs)} Einträge")
        return macs
|
||
|
||
class HomeMatic:
    """Writes boolean values to a HomeMatic CCU through its XML-API add-on."""

    def __init__(self, host, timeout=8):
        """Store the CCU address; no request is made here.

        Args:
            host: Host name or IP address of the CCU.
            timeout: Timeout in seconds applied to every request.
        """
        self.base = f"http://{host}"
        self.timeout = timeout

    @staticmethod
    def _statechange_params(ise_id, value):
        """Build the query parameters for ``statechange.cgi``.

        The XML-API add-on documents the value parameter as ``new_value``;
        the previously sent ``new_val`` is not a documented parameter.
        NOTE(review): confirm against the XML-API version installed on the CCU.
        """
        return {"ise_id": ise_id, "new_value": "true" if value else "false"}

    def set_variable(self, ise_id, value):
        """Set the object *ise_id* to the truthiness of *value*.

        Failures are logged as warnings and never raised, so one failing
        write does not abort the remaining ones.

        Args:
            ise_id: ISE id of the object to change.
            value: Any value; it is sent as ``"true"`` or ``"false"``.

        Returns:
            ``True`` if the request completed with a success status,
            ``False`` otherwise.
        """
        params = self._statechange_params(ise_id, value)
        try:
            r = requests.get(f"{self.base}/config/xmlapi/statechange.cgi",
                             params=params, timeout=self.timeout)
            r.raise_for_status()
        except Exception as e:
            log.warning(f"CCU2 Fehler ise_id={ise_id}: {e}")
            return False
        log.debug(f" ISE {ise_id} → {params['new_value']}")
        return True
|
||
|
||
def _normalize_mac(mac):
    """Return *mac* upper-cased, trimmed and with ':' as separator."""
    return mac.strip().upper().replace("-", ":")


def _load_devices(cfg):
    """Build the device table from all ``[device.<name>]`` config sections.

    Returns a dict mapping ``<name>`` to a dict with the keys ``mac``,
    ``ise_id`` and ``label`` (the label defaults to the section name).
    Raises ``KeyError`` if a section lacks ``mac`` or ``ise_id``.
    """
    prefix = "device."
    devices = {}
    for section in cfg.sections():
        if not section.startswith(prefix):
            continue
        entry = cfg[section]
        devices[section[len(prefix):]] = {
            "mac": _normalize_mac(entry["mac"]),
            "ise_id": entry["ise_id"],
            "label": entry.get("label", section),
        }
    return devices


def main():
    """Check which configured devices are present and report it to the CCU.

    Command line options:
        --config PATH   configuration file (default: ``CONFIG_FILE``)
        --dry-run       only log the result, write nothing to the CCU
        -v, --verbose   enable debug logging

    A device counts as present when its MAC is in the AP's WiFi registration
    table or in the router's ARP table. Unless ``--dry-run`` is given, the
    result is written per device and, if ``combined_ise_id`` is configured
    in the ``homematic`` section, as one combined "anyone home" value.

    Exits with status 1 when the configuration is incomplete or defines no
    devices.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--config", type=Path, default=CONFIG_FILE)
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("-v", "--verbose", action="store_true")
    args = parser.parse_args()
    if args.verbose:
        log.setLevel(logging.DEBUG)

    cfg = load_config(args.config)

    # A missing section or option raises KeyError; report it as a
    # configuration error instead of dying with a traceback.
    try:
        ap = MikroTik(cfg["ap"]["host"], cfg["ap"]["user"], cfg["ap"]["password"])
        rt = MikroTik(cfg["router"]["host"], cfg["router"]["user"], cfg["router"]["password"])
        hm = HomeMatic(cfg["homematic"]["host"])
        devices = _load_devices(cfg)
    except KeyError as e:
        log.error(f"Konfiguration unvollständig, fehlender Eintrag: {e}")
        sys.exit(1)
    if not devices:
        log.error("Keine Geräte!")
        sys.exit(1)

    log.info(f"Prüfe {len(devices)} Gerät(e)…")
    wifi = ap.wifi_clients()
    arp = rt.arp_macs()

    results = {}
    for name, dev in devices.items():
        mac = dev["mac"]
        # The WiFi table is checked first; ARP is only the fallback source.
        if mac in wifi:
            src = "WLAN"
        elif mac in arp:
            src = "ARP"
        else:
            src = "–"
        present = src != "–"
        results[name] = present
        log.info(f" {dev['label']:25} {mac} {'✓ zu Hause ' if present else '✗ außer Haus'} [{src}]")

    if args.dry_run:
        log.info("DRY-RUN – nichts geschrieben.")
    else:
        for name, dev in devices.items():
            hm.set_variable(dev["ise_id"], results[name])
        comb = cfg.get("homematic", "combined_ise_id", fallback=None)
        if comb:
            home = any(results.values())
            hm.set_variable(comb, home)
            log.info(f" {'Anwesenheit':25} {'✓ jemand da' if home else '✗ niemand da'}")
    log.info("Fertig.")
|
||
|
||
# Run the presence check only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|