Doorboy proxy exports door access information from MongoDB to door Raspberry Pi controllers
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

66 lines
2.4 KiB

2 years ago
from datetime import datetime
from sanic import Sanic, response
from sanic.response import text, json, stream
from motor.motor_asyncio import AsyncIOMotorClient
import uvloop, asyncio
import pymongo
2 years ago
import os
import const
2 years ago
app = Sanic(__name__)
#mongodb = MongoClient('mongodb://,,', replicaSet="kspace-mongo-set").kspace_accounting
mongodb = AsyncIOMotorClient(const.MONGO_URI)
mongodb = mongodb.get_default_database()
2 years ago
assert len(DOORBOY_SECRET) > 10
async def view_doorboy_uids(request):
2 years ago
if request.headers.get('KEY') != DOORBOY_SECRET:
return text("how about no")
allowed_names = []
async for obj in mongodb.member.find({"enabled": True}):
2 years ago
allowed_uids = []
async for obj in mongodb.inventory.find({"token.uid_hash": {"$exists":True}, "inventory.owner_id": {"$exists":True}, "token.enabled": True}, {"inventory.owner_id": True, "token.uid_hash": True }):
2 years ago
if obj["inventory"].pop("owner_id") in allowed_names:
del obj["_id"]
del obj["inventory"]
2 years ago
return json({"allowed_uids": allowed_uids})
2 years ago
async def view_longpoll(request):
2 years ago
if request.headers.get('KEY') != DOORBOY_SECRET:
return text("how about no")
2 years ago
async def g(response):
await response.write("data: response-generator-started\n\n")
2 years ago
pipeline = [
'$match': {
'operationType': "insert",
# 'component': 'doorboy',
# 'type': 'open-door'
async with as stream:
await response.write("data: watch-stream-opened\n\n")
async for event in stream:
2 years ago
if event["fullDocument"].get("type") == "open-door":
await response.write("data: %s\n\n" % event["fullDocument"]["door"])
2 years ago
except pymongo.errors.PyMongoError:
return stream(g, content_type="text/event-stream")
2 years ago
if __name__ == '__main__':
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) # swap default event loop to uvloop
loop = asyncio.get_event_loop()'::', debug=False, loop=loop)