import logging import threading import requests import time from functools import partial from json.decoder import JSONDecodeError try: from hashlib import scrypt UID_HASH = partial(scrypt, n=16384, r=8, p=1) except ImportError: # Python 3.5 needs pip install scrypt from scrypt import hash as scrypt UID_HASH = partial(scrypt, N=16384, r=8, p=1) from .wiegand import Decoder logging.basicConfig(level=logging.DEBUG) def hash_uid(uid: str, salt: str) -> str: return UID_HASH(bytes.fromhex(uid), salt=salt.encode()).hex() class Main: def __init__(self, door, api_allowed, api_longpoll, api_swipe, api_key, uid_hash): self.door = door self.api_allowed = api_allowed self.api_longpoll = api_longpoll self.api_swipe = api_swipe self.uid_hash = uid_hash self.uids = {} self.force_sync_now = threading.Event() self.session = requests.Session() self.session.headers.update({"KEY": api_key}) self.sync_cards() logging.info("Running") self.wiegand = Decoder(self.wiegand_callback) self.notify_thread = threading.Thread(target=self.listen_notification, daemon=True) self.notify_thread.start() self.auto_sync_loop() def sync_cards(self): logging.info("Downloading users list") r = self.session.get(self.api_allowed) try: allowed_uids = r.json()["allowed_uids"] except JSONDecodeError as e: logging.exception("Failed to decode allowed uids json") return uids = set() for token in allowed_uids: uids.add(token["token"]["uid_hash"].strip()) self.uids = uids def wiegand_callback(self, bits, value): #print("bits", bits, "value", value) uid_h = hash_uid(value, self.uid_hash) logging.debug("hash %s", uid_h) if uid_h in self.uids: logging.info("Opening door for UID hash trail %s", uid_h[-10:]) self.wiegand.open_door() success = True else: logging.info("Access card not in allow list, hash trail %s", uid_h[-10:]) success = False data = { "uid": value, "door": self.door, "success": success, "timestamp": time.strftime("%Y-%m-%d %H:%M:%S") } print(data) requests.post(self.api_swipe, data=data) def listen_notification(self): while 1: try: r = self.session.get(self.api_longpoll, headers={"Connection": "close"}, timeout=60, stream=True) for line in r.iter_lines(1, decode_unicode=True): if not line.strip(): continue logging.debug("Got notification: %s", line) if self.door in line: logging.info("Opening door from notify") self.wiegand.open_door() self.force_sync_now.set() except (requests.Timeout, requests.ConnectionError) as e: logging.debug("notification timeout") time.sleep(0.1) def auto_sync_loop(self): while 1: try: self.force_sync_now.wait(60*10) # == 10min self.force_sync_now.clear() self.sync_cards() except KeyboardInterrupt as e: self.wiegand.cancel() break