136 lines
4.5 KiB
Python
136 lines
4.5 KiB
Python
import logging
|
|
import threading
|
|
import time
|
|
import os
|
|
import requests
|
|
from hashlib import scrypt
|
|
from json.decoder import JSONDecodeError
|
|
from wiegand import Decoder
|
|
|
|
logging.basicConfig(
|
|
level=logging.DEBUG,
|
|
format="%(asctime)s %(name)-12s %(levelname)-8s %(message)s",
|
|
datefmt="%m-%d %H:%M"
|
|
)
|
|
|
|
|
|
def hash_uid(uid: str, salt: str) -> str:
|
|
return scrypt(bytes.fromhex(uid),
|
|
salt=salt.encode(),
|
|
n=16384,
|
|
r=8,
|
|
p=1).hex()
|
|
|
|
|
|
class DoorController:
|
|
def __init__(self,
|
|
door,
|
|
api_allowed,
|
|
api_longpoll,
|
|
api_swipe,
|
|
api_key,
|
|
uid_salt):
|
|
self.door = door
|
|
self.api_allowed = api_allowed
|
|
self.api_longpoll = api_longpoll
|
|
self.api_swipe = api_swipe
|
|
self.uid_salt = uid_salt
|
|
|
|
self.uids = {}
|
|
|
|
self.force_sync_now = threading.Event()
|
|
self.session = requests.Session()
|
|
self.session.headers.update({"KEY": api_key})
|
|
|
|
self.sync_cards()
|
|
logging.info("Running")
|
|
self.wiegand = Decoder(self.wiegand_callback)
|
|
self.notify_thread = threading.Thread(target=self.listen_notification,
|
|
daemon=True)
|
|
self.notify_thread.start()
|
|
self.auto_sync_loop()
|
|
|
|
def sync_cards(self):
|
|
logging.info("Downloading users list")
|
|
try:
|
|
r = self.session.get(self.api_allowed, timeout=15)
|
|
allowed_uids = r.json()["allowed_uids"]
|
|
except JSONDecodeError:
|
|
logging.exception("Failed to decode allowed uids json")
|
|
except (requests.Timeout, requests.ConnectionError):
|
|
logging.exception("Connection timeout/error in sync cards")
|
|
except Exception:
|
|
logging.exception("Some other exception")
|
|
else:
|
|
uids = set()
|
|
for token in allowed_uids:
|
|
uids.add(token["token"]["uid_hash"].strip())
|
|
self.uids = uids
|
|
|
|
def wiegand_callback(self, value):
|
|
uid_hash = hash_uid(value, self.uid_salt)
|
|
logging.debug(f"hash {uid_hash}")
|
|
if uid_hash in self.uids:
|
|
logging.info(f"Opening door for UID hash trail {uid_hash[-10:]}")
|
|
self.wiegand.open_door()
|
|
success = True
|
|
else:
|
|
logging.info(f"Access card not in allow list, hash trail {uid_hash[-10:]}")
|
|
success = False
|
|
data = {
|
|
"uid": value,
|
|
"uid_hash": uid_hash,
|
|
"door": self.door,
|
|
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime())
|
|
}
|
|
if success:
|
|
data["success"] = success
|
|
try:
|
|
requests.post(self.api_swipe, data=data, timeout=15)
|
|
except (requests.Timeout, requests.ConnectionError):
|
|
logging.exception("Connection timeout/error in post swipes")
|
|
except Exception:
|
|
logging.exception("Some other exception")
|
|
|
|
def listen_notification(self):
|
|
while 1:
|
|
try:
|
|
r = self.session.get(self.api_longpoll,
|
|
headers={"Connection": "close"},
|
|
timeout=60, stream=True)
|
|
for line in r.iter_lines(1, decode_unicode=True):
|
|
if not line.strip():
|
|
continue
|
|
logging.debug("Got notification: %s", line)
|
|
if self.door in line:
|
|
logging.info("Opening door from notify")
|
|
self.wiegand.open_door()
|
|
self.force_sync_now.set()
|
|
except (requests.Timeout,
|
|
requests.ConnectionError):
|
|
logging.debug("notification timeout")
|
|
except Exception:
|
|
logging.exception("Some other exception")
|
|
time.sleep(0.1)
|
|
|
|
def auto_sync_loop(self):
|
|
while True:
|
|
try:
|
|
self.force_sync_now.wait(60*10) # == 10min
|
|
self.force_sync_now.clear()
|
|
self.sync_cards()
|
|
except KeyboardInterrupt:
|
|
self.wiegand.cancel()
|
|
break
|
|
|
|
|
|
if __name__ == "__main__":
|
|
DoorController(
|
|
door=os.environ["KDOORPI_DOOR"],
|
|
api_allowed=os.environ["KDOORPI_API_ALLOWED"],
|
|
api_longpoll=os.environ["KDOORPI_API_LONGPOLL"],
|
|
api_swipe=os.environ["KDOORPI_API_SWIPE"],
|
|
api_key=os.environ["KDOORPI_API_KEY"],
|
|
uid_salt=os.environ["KDOORPI_UID_SALT"],
|
|
)
|