import logging import threading import time import os import requests from hashlib import scrypt from json.decoder import JSONDecodeError from wiegand import Decoder logging.basicConfig( level=logging.DEBUG, format="%(asctime)s %(name)-12s %(levelname)-8s %(message)s", datefmt="%m-%d %H:%M" ) def hash_uid(uid: str, salt: str) -> str: return scrypt(bytes.fromhex(uid), salt=salt.encode(), n=16384, r=8, p=1).hex() class DoorController: def __init__(self, door, api_allowed, api_longpoll, api_swipe, api_key, uid_salt): self.door = door self.api_allowed = api_allowed self.api_longpoll = api_longpoll self.api_swipe = api_swipe self.uid_salt = uid_salt self.uids = {} self.force_sync_now = threading.Event() self.session = requests.Session() self.session.headers.update({"KEY": api_key}) self.sync_cards() logging.info("Running") self.wiegand = Decoder(self.wiegand_callback) self.notify_thread = threading.Thread(target=self.listen_notification, daemon=True) self.notify_thread.start() self.auto_sync_loop() def sync_cards(self): logging.info("Downloading users list") try: r = self.session.get(self.api_allowed, timeout=15) allowed_uids = r.json()["allowed_uids"] except JSONDecodeError: logging.exception("Failed to decode allowed uids json") except (requests.Timeout, requests.ConnectionError): logging.exception("Connection timeout/error in sync cards") except Exception: logging.exception("Some other exception") else: uids = set() for token in allowed_uids: uids.add(token["token"]["uid_hash"].strip()) self.uids = uids def wiegand_callback(self, value): uid_hash = hash_uid(value, self.uid_salt) logging.debug(f"hash {uid_hash}") if uid_hash in self.uids: logging.info(f"Opening door for UID hash trail {uid_hash[-10:]}") self.wiegand.open_door() success = True else: logging.info(f"Access card not in allow list, hash trail {uid_hash[-10:]}") success = False data = { "uid": value, "uid_hash": uid_hash, "door": self.door, "timestamp": time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime()) } if success: data["success"] = success try: requests.post(self.api_swipe, data=data, timeout=15) except (requests.Timeout, requests.ConnectionError): logging.exception("Connection timeout/error in post swipes") except Exception: logging.exception("Some other exception") def listen_notification(self): while 1: try: r = self.session.get(self.api_longpoll, headers={"Connection": "close"}, timeout=60, stream=True) for line in r.iter_lines(1, decode_unicode=True): if not line.strip(): continue logging.debug("Got notification: %s", line) if self.door in line: logging.info("Opening door from notify") self.wiegand.open_door() self.force_sync_now.set() except (requests.Timeout, requests.ConnectionError): logging.debug("notification timeout") except Exception: logging.exception("Some other exception") time.sleep(0.1) def auto_sync_loop(self): while True: try: self.force_sync_now.wait(60*10) # == 10min self.force_sync_now.clear() self.sync_cards() except KeyboardInterrupt: self.wiegand.cancel() break if __name__ == "__main__": DoorController( door=os.environ["KDOORPI_DOOR"], api_allowed=os.environ["KDOORPI_API_ALLOWED"], api_longpoll=os.environ["KDOORPI_API_LONGPOLL"], api_swipe=os.environ["KDOORPI_API_SWIPE"], api_key=os.environ["KDOORPI_API_KEY"], uid_salt=os.environ["KDOORPI_UID_SALT"], )