Modernize stack

master
Lauri Võsandi 11 months ago committed by Lauri Võsandi
parent ef67faf6f8
commit b7f3f57efd
  1. 20
      .drone.yml
  2. 3
      .flake8
  3. 6
      .gitlint
  4. 11
      .pre-commit-config.yaml
  5. 5
      Dockerfile
  6. 26
      contrib/uidhash.py
  7. 63
      kdoorpi.py
  8. 15
      kdoorpi.service
  9. 30
      kdoorpi/__main__.py
  10. 23
      setup.py
  11. 45
      wiegand.py

@ -0,0 +1,20 @@
---
kind: pipeline
type: kubernetes
name: default
platform:
os: linux
arch: arm/v6
steps:
- name: build
image: plugins/docker
settings:
repo: harbor.k-space.ee/${DRONE_REPO}
registry: harbor.k-space.ee
mtu: 1300
username:
from_secret: docker_username
password:
from_secret: docker_password

@ -0,0 +1,3 @@
[flake8]
inline-quotes = "
indent-size = 4

@ -0,0 +1,6 @@
[general]
ignore=body-is-missing,T3
ignore-stdin=true
[title-match-regex]
regex=[A-Z]

@ -0,0 +1,11 @@
repos:
- repo: https://github.com/PyCQA/flake8
rev: 3.9.2
hooks:
- id: flake8
additional_dependencies: [flake8-typing-imports==1.10.0,flake8-quotes==3.2.0]
- repo: https://github.com/jorisroovers/gitlint
rev: v0.15.1
hooks:
- id: gitlint

@ -0,0 +1,5 @@
FROM python:3-alpine
ADD kdoorpi.py wiegand.py /app/
WORKDIR /app
RUN pip install requests
ENTRYPOINT python kdoorpi.py

@ -1,26 +0,0 @@
import secrets
from functools import partial
try:
from hashlib import scrypt
uid_hash = partial(scrypt, n=16384, r=8, p=1)
except ImportError:
# Python 3.5 has to use external py-scrypt package from pypi
from scrypt import hash as scrypt
uid_hash = partial(scrypt, N=16384, r=8, p=1)
# print(secrets.token_urlsafe())
UID_SALT = "hkRXwLlQKmCJoy5qaahp"
def hash_uid(uid: str, salt: str = UID_SALT) -> str:
return uid_hash(bytes.fromhex(uid), salt=salt.encode()).hex()
if __name__ == "__main__":
UID = "01 23 AD F0"
ch = hash_uid(UID)
h = 'a6d9ba36ecb5f8e6312f40ee260ad59e9cca3c6ce073bf072df3324c0072886196e6823a7c758ab567fc53e91fbbda297a4efe0072e41316c56446ef126a5180'
print("UID:", UID)
print("hash:", ch)
print("correct", ch == h)

@ -1,33 +1,28 @@
import logging
import threading
import requests
import time
from functools import partial
import os
import requests
from hashlib import scrypt
from json.decoder import JSONDecodeError
try:
from hashlib import scrypt
UID_HASH = partial(scrypt, n=16384, r=8, p=1)
except ImportError:
# Python 3.5 needs pip install scrypt
from scrypt import hash as scrypt
UID_HASH = partial(scrypt, N=16384, r=8, p=1)
from .wiegand import Decoder
from wiegand import Decoder
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
datefmt='%m-%d %H:%M'
format="%(asctime)s %(name)-12s %(levelname)-8s %(message)s",
datefmt="%m-%d %H:%M"
)
def hash_uid(uid: str, salt: str) -> str:
return UID_HASH(bytes.fromhex(uid), salt=salt.encode()).hex()
return scrypt(bytes.fromhex(uid),
salt=salt.encode(),
n=16384,
r=8,
p=1).hex()
class Main:
class DoorController:
def __init__(self,
door,
api_allowed,
@ -50,7 +45,8 @@ class Main:
self.sync_cards()
logging.info("Running")
self.wiegand = Decoder(self.wiegand_callback)
self.notify_thread = threading.Thread(target=self.listen_notification, daemon=True)
self.notify_thread = threading.Thread(target=self.listen_notification,
daemon=True)
self.notify_thread.start()
self.auto_sync_loop()
@ -59,9 +55,9 @@ class Main:
try:
r = self.session.get(self.api_allowed, timeout=15)
allowed_uids = r.json()["allowed_uids"]
except JSONDecodeError as e:
except JSONDecodeError:
logging.exception("Failed to decode allowed uids json")
except (requests.Timeout, requests.ConnectionError) as e:
except (requests.Timeout, requests.ConnectionError):
logging.exception("Connection timeout/error in sync cards")
except Exception:
logging.exception("Some other exception")
@ -72,7 +68,6 @@ class Main:
self.uids = uids
def wiegand_callback(self, bits, value):
#print("bits", bits, "value", value)
uid_h = hash_uid(value, self.uid_hash)
logging.debug("hash %s", uid_h)
if uid_h in self.uids:
@ -80,7 +75,8 @@ class Main:
self.wiegand.open_door()
success = True
else:
logging.info("Access card not in allow list, hash trail %s", uid_h[-10:])
logging.info("Access card not in allow list, hash trail %s",
uid_h[-10:])
success = False
data = {
"uid": value,
@ -92,7 +88,7 @@ class Main:
data["success"] = success
try:
requests.post(self.api_swipe, data=data, timeout=15)
except (requests.Timeout, requests.ConnectionError) as e:
except (requests.Timeout, requests.ConnectionError):
logging.exception("Connection timeout/error in post swipes")
except Exception:
logging.exception("Some other exception")
@ -101,8 +97,8 @@ class Main:
while 1:
try:
r = self.session.get(self.api_longpoll,
headers={"Connection": "close"},
timeout=60, stream=True)
headers={"Connection": "close"},
timeout=60, stream=True)
for line in r.iter_lines(1, decode_unicode=True):
if not line.strip():
continue
@ -112,19 +108,28 @@ class Main:
self.wiegand.open_door()
self.force_sync_now.set()
except (requests.Timeout,
requests.ConnectionError) as e:
requests.ConnectionError):
logging.debug("notification timeout")
except Exception:
logging.exception("Some other exception")
time.sleep(0.1)
def auto_sync_loop(self):
while 1:
while True:
try:
self.force_sync_now.wait(60*10) # == 10min
self.force_sync_now.clear()
self.sync_cards()
except KeyboardInterrupt as e:
except KeyboardInterrupt:
self.wiegand.cancel()
break
if __name__ == "__main__":
DoorController(
os.environ["KDOORPI_DOOR"],
os.environ["KDOORPI_API_ALLOWED"],
os.environ["KDOORPI_API_LONGPOLL"],
os.environ["KDOORPI_API_SWIPE"],
os.environ["KDOORPI_API_KEY"],
os.environ["KDOORPI_UID_SALT"])

@ -1,15 +0,0 @@
# /etc/systemd/system/kdoorpi.service
[Unit]
Description=Kdoorpi service
Requires=pigpiod.service
After=pigpiod.service
[Service]
Environment=PYTHONUNBUFFERED=1
ExecStart=/home/pi/kdoorpi/run.sh
WorkingDirectory=/home/pi/kdoorpi
User=pi
Restart=always
[Install]
WantedBy=multi-user.target

@ -1,30 +0,0 @@
import os
from . import Main
"""
if "eesuks" in buf:
door = "front"
elif "valis" in buf:
door = "ground"
elif "taga" in buf:
door = "back"
else:
door = "unknown"
"""
door = os.environ["KDOORPI_DOOR"]
api_allowed = os.environ["KDOORPI_API_ALLOWED"]
api_longpoll = os.environ["KDOORPI_API_LONGPOLL"]
api_swipe = os.environ["KDOORPI_API_SWIPE"]
api_key = os.environ["KDOORPI_API_KEY"]
uid_salt = os.environ["KDOORPI_UID_SALT"]
if __name__ == "__main__":
Main(door,
api_allowed,
api_longpoll,
api_swipe,
api_key,
uid_salt)

@ -1,23 +0,0 @@
from setuptools import find_packages, setup
setup(
name='kdoorpi',
version='0.0.0',
author="Arti Zirk",
author_email="arti@zirk.me",
description="K-Space Door client that talks to the hardware",
packages=find_packages(),
include_package_data=True,
zip_safe=False,
python_requires='>=3.5',
install_requires=["requests"],
extras_require={},
classifiers=[
'License :: OSI Approved :: MIT License',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3 :: Only',
'Topic :: System :: Networking',
'Intended Audience :: System Administrators',
]
)

@ -10,8 +10,8 @@ except Exception:
class Decoder:
"""
A class to read Wiegand codes of an arbitrary length.
"""
A class to read Wiegand codes of an arbitrary length.
"""
def __init__(self, callback, bit_timeout=5):
@ -27,10 +27,10 @@ class Decoder:
else:
self.pi = False
self.gpio_0 = 17 #settings.WIEGAND[0]
self.gpio_1 = 18 #settings.WIEGAND[1]
self.door_pin = 21 # from settings.py
self.button_pin = 13 # from settings.py
self.gpio_0 = 17
self.gpio_1 = 18
self.door_pin = 21
self.button_pin = 13
self.callback = callback
@ -48,9 +48,15 @@ class Decoder:
self.pi.set_pull_up_down(self.gpio_1, pigpio.PUD_UP)
self.pi.set_pull_up_down(self.button_pin, pigpio.PUD_UP)
self.cb_0 = self.pi.callback(self.gpio_0, pigpio.FALLING_EDGE, self._cb)
self.cb_1 = self.pi.callback(self.gpio_1, pigpio.FALLING_EDGE, self._cb)
self.button_cb_h = self.pi.callback(self.button_pin, pigpio.FALLING_EDGE, self._cb)
self.cb_0 = self.pi.callback(self.gpio_0,
pigpio.FALLING_EDGE,
self._cb)
self.cb_1 = self.pi.callback(self.gpio_1,
pigpio.FALLING_EDGE,
self._cb)
self.button_cb_h = self.pi.callback(self.button_pin,
pigpio.FALLING_EDGE,
self._cb)
def cut_empty(self, item):
if item[0:8] == "00000000":
@ -69,14 +75,16 @@ class Decoder:
bits = []
for i in range(len(items), 0, -8):
bits.append(int(items[i - 8:i], 2))
return (" ".join(map(lambda a: "%-0.2X" % ((a + 256) % 256), bits))).rstrip()
return (" ".join(map(lambda a: "%-0.2X" % ((a + 256) % 256),
bits))).rstrip()
except ValueError:
logging.error("Wiegand convert error: bin to hex convertion ended with ValeError. raw: " + str(self.items))
logging.error("Failed to convert binary to hex: %s" % self.items)
return False
except Exception as e:
logging.error("Wiegand convert error: (raw: " + str(self.items) + ") " + str(e))
logging.error("Wiegand convert error (raw: %s): %s" % (
self.items, e))
return False
def _cb(self, gpio_pin, level, tick):
@ -100,10 +108,10 @@ class Decoder:
self.pi.set_watchdog(self.gpio_1, self.bit_timeout)
if gpio_pin == self.gpio_0:
self.code_timeout &= 2 # clear gpio 0 timeout
self.code_timeout &= 2
self.items += "1"
else:
self.code_timeout &= 1 # clear gpio 1 timeout
self.code_timeout &= 1
self.items += "0"
else:
@ -125,13 +133,16 @@ class Decoder:
if hex:
self.callback(self.bits, hex)
else:
logging.error("Wiegand receive error: Expected at least 26 got %i bits. raw: %s" %(self.bits, self.items))
logging.error("Expected 26 bits, but got %i: %s" %
(self.bits, self.items))
except Exception as e:
logging.error("Wiegand callback error: " + str(e))
logging.error("Wiegand callback error: %s" % e)
def button_cb(self, gpio_pin, level, tick):
print("button: gpio_pin:{}, level:{}, tick:{}".format(gpio_pin, level, tick))
print("button: gpio_pin:{}, level:{}, tick:{}".format(gpio_pin,
level,
tick))
def open_door(self):
self.pi.write(self.door_pin, 1)
Loading…
Cancel
Save