1
0
forked from arti/doors

Compare commits

..

6 Commits

Author SHA1 Message Date
Arti Zirk
d1d673bf7c Merge branch 'implementAddingNewKeycards' of dos4dev/doors into master
LGTM
2020-10-15 16:05:40 +00:00
dos4dev
eb16e01f79 Add post "/info/<user_id>" endpoint to save new card key. 2020-10-11 22:31:30 +03:00
Arti Zirk
3551611c85 Add API auth for doors 2020-10-08 23:11:35 +03:00
Arti Zirk
7878cadb30 Teach DB to create schema in existing connection 2020-10-08 20:44:56 +03:00
Arti Zirk
ea3d617e9c Add tests and more docs 2020-10-08 19:47:53 +03:00
Arti Zirk
7fa907699d Add db init instructions 2020-10-08 19:18:10 +03:00
5 changed files with 77 additions and 16 deletions

View File

@ -9,14 +9,23 @@
## Initialize database ## Initialize database
source venv/bin/activate source venv/bin/activate
curl http://172.21.43.1:5000/user > ad.json
python3 -m kdoorweb initdb
## Run dev server ## Run dev server
source venv/bin/activate source venv/bin/activate
python -m kdoorweb
Or
bottle.py --debug --reload kdoorweb:application bottle.py --debug --reload kdoorweb:application
To set the TCP port number add it as the last argument
python -m kdoorweb 8888
## Run unittests ## Run unittests
source venv/bin/activate source venv/bin/activate

View File

@ -1,29 +1,33 @@
from bottle import Bottle, request, response import hashlib
from functools import partial
from bottle import Bottle, request, response, HTTPError
api = Bottle() api = Bottle()
scrypt = partial(hashlib.scrypt, n=16384, r=8, p=1)
# FIXME: Fix door api auth
def check_api_auth(callback): def check_api_auth(callback):
def wrapper(*args, **kwargs): def wrapper(*args, **kwargs):
print("check api auth") print("check api auth")
user, api_key = request.auth or (None, None)
if "db" not in kwargs: if "db" not in kwargs:
request.current_user = None return "Auth not possible, db not available"
return callback(*args, **kwargs) user = kwargs["db"].get_door_by_name_and_api_key(user) or {}
user = None stored_hash, salt = dict(user).get("api_key", ":").split(":")
request.current_user = user api_hash = scrypt(api_key, salt=salt)
if user: if user and api_hash == api_key:
print(f"logged in as {user['user']}") request.current_door = user
print(request.current_user) print(f"logged in as {user['name']}")
print(user)
return callback(*args, **kwargs) return callback(*args, **kwargs)
else: else:
print("not logged in") print("not logged in")
return "Invalid authentication" return "Invalid authentication"
return wrapper return wrapper
# FIXME: db plugin not available yet
api.install(check_api_auth)
@api.route("/") @api.route("/")
def index(): def index():

View File

@ -51,7 +51,10 @@ class DB:
@staticmethod @staticmethod
def create_db(dbfile): def create_db(dbfile):
db = sqlite3.connect(dbfile) if type(dbfile) is DB:
db = dbfile.db
else:
db = sqlite3.connect(dbfile)
db.executescript(""" db.executescript("""
create table versions ( create table versions (
version integer, version integer,
@ -80,7 +83,7 @@ class DB:
); );
create table doors ( create table doors (
id integer primary key, id integer primary key,
name text, name text unique,
note text, note text,
api_key text, api_key text,
created text, created text,
@ -177,6 +180,24 @@ class DB:
""") """)
return cur.fetchall() return cur.fetchall()
def add_door(self, name, note, api_key):
self.add_doors([name, note, api_key, ])
def add_doors(self, doors):
self.db.executemany("""
insert into doors(name, note, api_key, created, disabled)
values(?, ?, ?, ?, ?)
""", doors)
self.db.commit()
def get_door(self, door_id):
cur = self.db.execute("select id, name, note, api_key, created, disabled from doors where id = ?", (door_id,))
return cur.fetchone()
def get_door_by_name(self, name):
cur = self.db.execute("select id, name, note, api_key, created, disabled from doors where name = ?", (name,))
return cur.fetchone()
@staticmethod @staticmethod
def import_ad(json_file): def import_ad(json_file):
with open(json_file) as fp: with open(json_file) as fp:
@ -238,7 +259,7 @@ def initdb():
pass pass
DB.create_db(dbfile) DB.create_db(dbfile)
db = DB(dbfile) db = DB(dbfile)
db.add_users(db.import_ad("../ad.json")) db.add_users(db.import_ad("ad.json"))
users = db.list_users() users = db.list_users()
for user in users: for user in users:
print(dict(user)) print(dict(user))

View File

@ -5,7 +5,7 @@ from bottle import Bottle, view, TEMPLATE_PATH, static_file, \
request, redirect, response, HTTPError request, redirect, response, HTTPError
from .db import SQLitePlugin from .db import SQLitePlugin
from .api import api from .api import api, check_api_auth
application = app = Bottle() application = app = Bottle()
@ -56,6 +56,7 @@ app.install(db_plugin)
app.install(check_auth) app.install(check_auth)
api.install(db_plugin) api.install(db_plugin)
api.install(check_api_auth)
app.mount("/api/v1", api) app.mount("/api/v1", api)
@ -138,6 +139,14 @@ def info(db, user_id):
return {**user, "keycards": keycards} return {**user, "keycards": keycards}
@app.post("/info/<user_id>")
def add_key_card(db, user_id):
card_name = request.forms.get("name")
card_uid = request.forms.get("uid")
db.add_keycard(user_id, card_uid, card_name)
redirect("/info/" + user_id)
@app.route("/log") @app.route("/log")
@view("log.html") @view("log.html")
def log(db): def log(db):

18
kdoorweb/tests/test_db.py Normal file
View File

@ -0,0 +1,18 @@
import unittest
from kdoorweb.db import DB
class TestDB(unittest.TestCase):
def setUp(self) -> None:
self.db = DB()
def test_create_db_in_memory(self):
DB.create_db(dbfile=":memory:")
def test_create_db_in_connection(self):
DB.create_db(dbfile=self.db)
if __name__ == '__main__':
unittest.main()