verlock-door-controller/kdoorpi/__init__.py
2020-11-05 19:35:42 +02:00

131 lines
4.3 KiB
Python

import logging
import threading
import requests
import time
from functools import partial
from json.decoder import JSONDecodeError
try:
from hashlib import scrypt
UID_HASH = partial(scrypt, n=16384, r=8, p=1)
except ImportError:
# Python 3.5 needs pip install scrypt
from scrypt import hash as scrypt
UID_HASH = partial(scrypt, N=16384, r=8, p=1)
from .wiegand import Decoder
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
datefmt='%m-%d %H:%M'
)
def hash_uid(uid: str, salt: str) -> str:
return UID_HASH(bytes.fromhex(uid), salt=salt.encode()).hex()
class Main:
def __init__(self,
door,
api_allowed,
api_longpoll,
api_swipe,
api_key,
uid_hash):
self.door = door
self.api_allowed = api_allowed
self.api_longpoll = api_longpoll
self.api_swipe = api_swipe
self.uid_hash = uid_hash
self.uids = {}
self.force_sync_now = threading.Event()
self.session = requests.Session()
self.session.headers.update({"KEY": api_key})
self.sync_cards()
logging.info("Running")
self.wiegand = Decoder(self.wiegand_callback)
self.notify_thread = threading.Thread(target=self.listen_notification, daemon=True)
self.notify_thread.start()
self.auto_sync_loop()
def sync_cards(self):
logging.info("Downloading users list")
try:
r = self.session.get(self.api_allowed, timeout=15)
allowed_uids = r.json()["allowed_uids"]
except JSONDecodeError as e:
logging.exception("Failed to decode allowed uids json")
except (requests.Timeout, requests.ConnectionError) as e:
logging.exception("Connection timeout/error in sync cards")
except Exception:
logging.exception("Some other exception")
else:
uids = set()
for token in allowed_uids:
uids.add(token["token"]["uid_hash"].strip())
self.uids = uids
def wiegand_callback(self, bits, value):
#print("bits", bits, "value", value)
uid_h = hash_uid(value, self.uid_hash)
logging.debug("hash %s", uid_h)
if uid_h in self.uids:
logging.info("Opening door for UID hash trail %s", uid_h[-10:])
self.wiegand.open_door()
success = True
else:
logging.info("Access card not in allow list, hash trail %s", uid_h[-10:])
success = False
data = {
"uid": value,
"uid_hash": uid_h,
"door": self.door,
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime())
}
if success:
data["success"] = success
try:
requests.post(self.api_swipe, data=data, timeout=15)
except (requests.Timeout, requests.ConnectionError) as e:
logging.exception("Connection timeout/error in post swipes")
except Exception:
logging.exception("Some other exception")
def listen_notification(self):
while 1:
try:
r = self.session.get(self.api_longpoll,
headers={"Connection": "close"},
timeout=60, stream=True)
for line in r.iter_lines(1, decode_unicode=True):
if not line.strip():
continue
logging.debug("Got notification: %s", line)
if self.door in line:
logging.info("Opening door from notify")
self.wiegand.open_door()
self.force_sync_now.set()
except (requests.Timeout,
requests.ConnectionError) as e:
logging.debug("notification timeout")
except Exception:
logging.exception("Some other exception")
time.sleep(0.1)
def auto_sync_loop(self):
while 1:
try:
self.force_sync_now.wait(60*10) # == 10min
self.force_sync_now.clear()
self.sync_cards()
except KeyboardInterrupt as e:
self.wiegand.cancel()
break