Door controller software that runs on the Raspberry Pi
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

135 lines
4.5 KiB

import logging
import threading
import time
import os
import requests
from hashlib import scrypt
from json.decoder import JSONDecodeError
from wiegand import Decoder
# Configure the root logger once at import time: everything from DEBUG up,
# each record prefixed with a "month-day hour:minute" timestamp, the logger
# name and the level.
logging.basicConfig(
    datefmt="%m-%d %H:%M",
    format="%(asctime)s %(name)-12s %(levelname)-8s %(message)s",
    level=logging.DEBUG,
)
def hash_uid(uid: str, salt: str) -> str:
    """Return the hex-encoded scrypt digest of a card UID.

    Args:
        uid: Card UID as a hexadecimal string; it is converted to raw bytes
            with ``bytes.fromhex`` before hashing.
        salt: Salt text; it is encoded with ``str.encode`` before use.

    Returns:
        The derived key as a lowercase hexadecimal string.

    Raises:
        ValueError: If ``uid`` is not valid hexadecimal.
    """
    uid_bytes = bytes.fromhex(uid)
    salt_bytes = salt.encode()
    # scrypt cost parameters: n = CPU/memory cost, r = block size,
    # p = parallelisation factor.
    derived_key = scrypt(uid_bytes, salt=salt_bytes, n=16384, r=8, p=1)
    return derived_key.hex()
class DoorController:
    """Door controller that checks card swipes against a server-provided list.

    On construction the controller downloads the list of allowed UID hashes,
    creates a ``wiegand.Decoder`` with ``wiegand_callback`` as its callback,
    starts a daemon thread that listens on a long-poll endpoint, and then
    blocks in ``auto_sync_loop`` until interrupted.

    NOTE(review): ``Decoder`` is defined outside this file; it is presumably
    what invokes ``wiegand_callback`` for each card read and what physically
    drives the door via ``open_door()`` -- confirm against the wiegand module.
    """

    # Seconds to wait between periodic allow-list refreshes (10 minutes).
    SYNC_INTERVAL_SECONDS = 60 * 10
    # Timeout, in seconds, for the allow-list download and the swipe upload.
    REQUEST_TIMEOUT_SECONDS = 15
    # Timeout, in seconds, passed to the long-poll notification request.
    LONGPOLL_TIMEOUT_SECONDS = 60
    # Pause, in seconds, between consecutive long-poll requests.
    LONGPOLL_RETRY_DELAY_SECONDS = 0.1

    def __init__(self,
                 door,
                 api_allowed,
                 api_longpoll,
                 api_swipe,
                 api_key,
                 uid_salt):
        """Set up the controller and run it; this call does not return.

        Args:
            door: Name of this door; reported with every swipe and searched
                for in incoming notification lines.
            api_allowed: URL that returns the allowed UID hashes as JSON.
            api_longpoll: URL of the streaming notification endpoint.
            api_swipe: URL that swipe events are posted to.
            api_key: Value sent in the ``KEY`` header of every session request.
            uid_salt: Salt handed to ``hash_uid`` when hashing card UIDs.
        """
        self.door = door
        self.api_allowed = api_allowed
        self.api_longpoll = api_longpoll
        self.api_swipe = api_swipe
        self.uid_salt = uid_salt
        # Always a set of hash strings; sync_cards() rebinds it wholesale.
        self.uids = set()
        self.force_sync_now = threading.Event()

        self.session = requests.Session()
        self.session.headers.update({"KEY": api_key})

        self.sync_cards()
        logging.info("Running")
        self.wiegand = Decoder(self.wiegand_callback)
        self.notify_thread = threading.Thread(target=self.listen_notification,
                                              daemon=True)
        self.notify_thread.start()
        self.auto_sync_loop()

    @staticmethod
    def _parse_allowed_uids(payload):
        """Extract the set of stripped UID hashes from the decoded response.

        Raises KeyError, TypeError or AttributeError when the payload does
        not have the ``{"allowed_uids": [{"token": {"uid_hash": str}}]}``
        shape.
        """
        return {
            entry["token"]["uid_hash"].strip()
            for entry in payload["allowed_uids"]
        }

    def sync_cards(self):
        """Download the allow list and replace ``self.uids`` with it.

        Every failure (network error, HTTP error status, undecodable or
        malformed JSON) is logged and leaves the previously loaded list in
        place, so a bad response never empties the list or stops the caller.
        """
        logging.info("Downloading users list")
        try:
            response = self.session.get(self.api_allowed,
                                        timeout=self.REQUEST_TIMEOUT_SECONDS)
            # Do not try to interpret the body of an error response.
            response.raise_for_status()
            payload = response.json()
        except JSONDecodeError:
            logging.exception("Failed to decode allowed uids json")
            return
        except (requests.Timeout, requests.ConnectionError):
            logging.exception("Connection timeout/error in sync cards")
            return
        except Exception:
            logging.exception("Some other exception")
            return

        try:
            uids = self._parse_allowed_uids(payload)
        except (KeyError, TypeError, AttributeError):
            # A single malformed entry must not propagate out of the sync
            # loop; keep serving the last good list instead.
            logging.exception("Malformed allowed uids response")
            return
        self.uids = uids

    def wiegand_callback(self, value):
        """Handle one card read: decide on access and report the swipe.

        Args:
            value: Card UID as a hexadecimal string (it is passed to
                ``hash_uid``, which parses it with ``bytes.fromhex``).
        """
        uid_hash = hash_uid(value, self.uid_salt)
        logging.debug("hash %s", uid_hash)

        success = uid_hash in self.uids
        if success:
            logging.info("Opening door for UID hash trail %s", uid_hash[-10:])
            self.wiegand.open_door()
        else:
            logging.info("Access card not in allow list, hash trail %s",
                         uid_hash[-10:])

        data = {
            "uid": value,
            "uid_hash": uid_hash,
            "door": self.door,
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime())
        }
        # The key is only present for granted swipes; a denied swipe omits it
        # rather than sending a false value.
        if success:
            data["success"] = success

        try:
            # Post through the shared session so the KEY header is included,
            # like every other request this controller makes.
            self.session.post(self.api_swipe, data=data,
                              timeout=self.REQUEST_TIMEOUT_SECONDS)
        except (requests.Timeout, requests.ConnectionError):
            logging.exception("Connection timeout/error in post swipes")
        except Exception:
            logging.exception("Some other exception")

    def listen_notification(self):
        """Stream the long-poll endpoint forever and act on each line.

        A non-blank line containing this door's name opens the door. Every
        non-blank line also requests an immediate allow-list sync.
        NOTE(review): the sync trigger is assumed to apply to all
        notifications, not only those naming this door -- confirm.
        """
        while True:
            try:
                # The context manager closes the streamed response even when
                # iteration stops early or raises.
                with self.session.get(self.api_longpoll,
                                      headers={"Connection": "close"},
                                      timeout=self.LONGPOLL_TIMEOUT_SECONDS,
                                      stream=True) as response:
                    # An error page must never be parsed as notifications.
                    response.raise_for_status()
                    # Without a declared charset iter_lines() yields bytes
                    # despite decode_unicode=True, which would break the
                    # str-in-line test below.
                    if response.encoding is None:
                        response.encoding = "utf-8"
                    # Chunk size 1 so each line is handled as it arrives.
                    for line in response.iter_lines(1, decode_unicode=True):
                        if not line.strip():
                            continue
                        logging.debug("Got notification: %s", line)
                        if self.door in line:
                            logging.info("Opening door from notify")
                            self.wiegand.open_door()
                        self.force_sync_now.set()
            except (requests.Timeout,
                    requests.ConnectionError):
                logging.debug("notification timeout")
            except Exception:
                logging.exception("Some other exception")
            time.sleep(self.LONGPOLL_RETRY_DELAY_SECONDS)

    def auto_sync_loop(self):
        """Re-sync the allow list every interval or when a sync is requested.

        Runs until ``KeyboardInterrupt``, at which point the decoder is
        cancelled and the loop exits.
        """
        while True:
            try:
                # Returns early if listen_notification() sets the event.
                self.force_sync_now.wait(self.SYNC_INTERVAL_SECONDS)
                self.force_sync_now.clear()
                self.sync_cards()
            except KeyboardInterrupt:
                self.wiegand.cancel()
                break
if __name__ == "__main__":
    # Every setting is read from a KDOORPI_<NAME> environment variable; a
    # missing variable raises KeyError before the controller is created.
    setting_names = (
        "door",
        "api_allowed",
        "api_longpoll",
        "api_swipe",
        "api_key",
        "uid_salt",
    )
    settings = {
        name: os.environ[f"KDOORPI_{name.upper()}"]
        for name in setting_names
    }
    # Constructing the controller starts it; the call blocks until interrupted.
    DoorController(**settings)