verlock-door-controller/kdoorpi.py

136 lines
4.5 KiB
Python

import logging
import threading
import time
import os
import requests
from hashlib import scrypt
from json.decoder import JSONDecodeError
from wiegand import Decoder
# Configure the root logger once at import time: DEBUG and above, with a
# "month-day hour:minute" timestamp, the logger name and the level padded
# into fixed-width columns ahead of the message.
logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s %(name)-12s %(levelname)-8s %(message)s",
    datefmt="%m-%d %H:%M"
)
def hash_uid(uid: str, salt: str) -> str:
    """Return the hex-encoded scrypt digest of a card UID.

    Args:
        uid: The card UID as a hexadecimal string; it is converted to raw
            bytes with ``bytes.fromhex`` and used as the scrypt password.
        salt: Salt text, encoded to bytes with ``str.encode()`` defaults.

    Returns:
        The derived key as a lowercase hexadecimal string.

    Raises:
        ValueError: If ``uid`` is not valid hexadecimal.
    """
    uid_bytes = bytes.fromhex(uid)
    salt_bytes = salt.encode()
    # scrypt cost parameters: n = CPU/memory cost, r = block size,
    # p = parallelisation factor.
    derived_key = scrypt(uid_bytes, salt=salt_bytes, n=16384, r=8, p=1)
    return derived_key.hex()
class DoorController:
    """Door controller that checks swiped cards against a remote allow list.

    The controller keeps an in-memory set of allowed UID hashes, refreshed
    from ``api_allowed`` every ten minutes or as soon as a long-poll
    notification arrives. Card reads are delivered through
    ``wiegand_callback`` and every swipe is reported to ``api_swipe``.

    NOTE: constructing an instance does not return until the sync loop is
    interrupted, because ``__init__`` ends by calling ``auto_sync_loop()``.
    """

    def __init__(self,
                 door,
                 api_allowed,
                 api_longpoll,
                 api_swipe,
                 api_key,
                 uid_salt):
        """Set up the HTTP session, start the reader and run the sync loop.

        Args:
            door: Name of this door; reported with each swipe and matched
                against long-poll notification lines.
            api_allowed: URL returning the JSON allow list.
            api_longpoll: URL of the streaming notification endpoint.
            api_swipe: URL that swipe events are POSTed to.
            api_key: Value sent in the ``KEY`` header of session requests.
            uid_salt: Salt passed to ``hash_uid`` for every card read.
        """
        self.door = door
        self.api_allowed = api_allowed
        self.api_longpoll = api_longpoll
        self.api_swipe = api_swipe
        self.uid_salt = uid_salt
        # Start with an empty *set*: sync_cards() replaces it with a set, so
        # the attribute keeps one type even if the first sync fails.
        self.uids = set()
        self.force_sync_now = threading.Event()
        self.session = requests.Session()
        self.session.headers.update({"KEY": api_key})
        # Populate the allow list before any card can be read.
        self.sync_cards()
        logging.info("Running")
        # Decoder comes from the local wiegand module; presumably it invokes
        # the callback for each card read -- confirm against that module.
        self.wiegand = Decoder(self.wiegand_callback)
        self.notify_thread = threading.Thread(target=self.listen_notification,
                                              daemon=True)
        self.notify_thread.start()
        # Blocks until KeyboardInterrupt.
        self.auto_sync_loop()

    def sync_cards(self):
        """Download the allow list and replace ``self.uids`` with it.

        Any failure (network, invalid JSON, malformed payload) is logged and
        leaves the previously synced set untouched; nothing is raised.
        """
        logging.info("Downloading users list")
        try:
            r = self.session.get(self.api_allowed, timeout=15)
            allowed_uids = r.json()["allowed_uids"]
        except JSONDecodeError:
            logging.exception("Failed to decode allowed uids json")
            return
        except (requests.Timeout, requests.ConnectionError):
            logging.exception("Connection timeout/error in sync cards")
            return
        except Exception:
            logging.exception("Some other exception")
            return

        # Build the complete new set before assigning it, so a bad entry can
        # neither crash the sync loop nor leave a half-populated allow list.
        try:
            uids = {token["token"]["uid_hash"].strip()
                    for token in allowed_uids}
        except (KeyError, TypeError, AttributeError):
            logging.exception("Malformed entry in allowed uids list")
            return
        self.uids = uids

    def wiegand_callback(self, value):
        """Handle one card read: open the door if allowed, then report it.

        Args:
            value: Card UID as a hexadecimal string (it is passed straight
                to ``hash_uid``).
        """
        uid_hash = hash_uid(value, self.uid_salt)
        logging.debug("hash %s", uid_hash)
        success = uid_hash in self.uids
        if success:
            logging.info("Opening door for UID hash trail %s", uid_hash[-10:])
            self.wiegand.open_door()
        else:
            logging.info("Access card not in allow list, hash trail %s",
                         uid_hash[-10:])
        data = {
            "uid": value,
            "uid_hash": uid_hash,
            "door": self.door,
            # UTC timestamp of the swipe.
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime())
        }
        # The "success" field is only present for granted swipes; a denied
        # swipe omits it (presumably so a form-encoded "False" string is not
        # mistaken for truthy server-side -- TODO confirm).
        if success:
            data["success"] = success
        try:
            # Use the shared session so the KEY header is sent, matching the
            # allow-list and long-poll requests.
            self.session.post(self.api_swipe, data=data, timeout=15)
        except (requests.Timeout, requests.ConnectionError):
            logging.exception("Connection timeout/error in post swipes")
        except Exception:
            logging.exception("Some other exception")

    def listen_notification(self):
        """Long-poll the notification endpoint forever (daemon thread body).

        Every non-blank line triggers an immediate allow-list sync; a line
        containing this door's name additionally opens the door.
        """
        while True:
            try:
                # The context manager closes the streamed response when the
                # stream ends or an error interrupts it.
                with self.session.get(self.api_longpoll,
                                      headers={"Connection": "close"},
                                      timeout=60, stream=True) as r:
                    # Without a declared encoding, iter_lines() would yield
                    # bytes despite decode_unicode=True and the substring
                    # test against self.door would raise TypeError.
                    if r.encoding is None:
                        r.encoding = "utf-8"
                    for line in r.iter_lines(1, decode_unicode=True):
                        if not line.strip():
                            continue
                        logging.debug("Got notification: %s", line)
                        if self.door in line:
                            logging.info("Opening door from notify")
                            self.wiegand.open_door()
                        self.force_sync_now.set()
            except (requests.Timeout,
                    requests.ConnectionError):
                logging.debug("notification timeout")
            except Exception:
                logging.exception("Some other exception")
            # Brief pause so a persistently failing endpoint is not hammered.
            time.sleep(0.1)

    def auto_sync_loop(self):
        """Resync the allow list every 10 minutes or when signalled.

        Runs until ``KeyboardInterrupt``, at which point the Wiegand decoder
        is cancelled and the loop exits.
        """
        while True:
            try:
                self.force_sync_now.wait(60*10)  # == 10min
                self.force_sync_now.clear()
                self.sync_cards()
            except KeyboardInterrupt:
                self.wiegand.cancel()
                break
if __name__ == "__main__":
    # Script entry point: every setting is read from a KDOORPI_* environment
    # variable. A missing variable raises KeyError before the controller is
    # constructed. Constructing DoorController runs it until interrupted.
    env = os.environ
    settings = {
        "door": env["KDOORPI_DOOR"],
        "api_allowed": env["KDOORPI_API_ALLOWED"],
        "api_longpoll": env["KDOORPI_API_LONGPOLL"],
        "api_swipe": env["KDOORPI_API_SWIPE"],
        "api_key": env["KDOORPI_API_KEY"],
        "uid_salt": env["KDOORPI_UID_SALT"],
    }
    DoorController(**settings)