#!/usr/bin/env python3 import aioboto3 import aiohttp import asyncio import cv2 import hashlib import io import json import numpy as np import os import signal import socket import sys from datetime import datetime, timedelta from jpeg2dct.numpy import loads from motor.motor_asyncio import AsyncIOMotorClient from prometheus_client import Counter, Gauge from sanic import Sanic, response from sanic.response import stream from sanic_prometheus import monitor _, url = sys.argv AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID", "camdetect") AWS_SECRET_ACCESS_KEY = os.environ["AWS_SECRET_ACCESS_KEY"] S3_ENDPOINT_URL = os.environ["S3_ENDPOINT_URL"] S3_BUCKET_NAME = os.getenv("S3_BUCKET_NAME", "camdetect") MONGO_URI = os.getenv("MONGO_URI", "mongodb://127.0.0.1:27017/default") MONGO_COLLECTION = os.getenv("MONGO_COLLETION", "eventlog") SOURCE_NAME = os.environ["SOURCE_NAME"] SLIDE_WINDOW = 2 DCT_BLOCK_SIZE = 8 # Percentage of blocks active to consider movement in whole frame THRESHOLD_RATIO = int(os.getenv("THRESHOLD_RATIO", "5")) CHUNK_BOUNDARY = b"\n--frame\nContent-Type: image/jpeg\n\n" counter_dropped_bytes = Counter( "camdetect_dropped_bytes", "Bytes that were not not handled or part of actual JPEG frames") counter_rx_bytes = Counter( "camdetect_rx_bytes", "Bytes received over HTTP stream") counter_tx_bytes = Counter( "camdetect_tx_bytes", "Bytes transmitted over HTTP streams") counter_rx_frames = Counter( "camdetect_rx_frames", "Frames received") counter_tx_frames = Counter( "camdetect_tx_frames", "Frames transmitted") counter_tx_events = Counter( "camdetect_tx_events", "Events emitted") counter_rx_chunks = Counter( "camdetect_rx_chunks", "HTTP chunks received") counter_errors = Counter( "camdetect_errors", "Upstream connection errors", ["exception"]) counter_movement_frames = Counter( "camdetect_movement_frames", "Frames with movement detected in them") counter_uploaded_frames = Counter( "camdetect_uploaded_frames", "Frames uploaded via S3") counter_upload_errors = Counter( "camdetect_upload_errors", "Frames upload errors related to S3") counter_upload_dropped_frames = Counter( "camdetect_upload_dropped_frames", "Frames that were dropped due to S3 upload queue being full") counter_download_dropped_frames = Counter( "camdetect_download_dropped_frames", "Frames that were downloaded from camera, but not processed") gauge_last_frame = Gauge( "camdetect_last_frame", "Timestamp of last frame") gauge_frame_motion_detected = Gauge( "camdetect_frame_motion_detected", "Motion detected in frame") gauge_event_active = Gauge( "camdetect_event_active", "Motion event in progress") gauge_total_blocks = Gauge( "camdetect_total_blocks", "Total DCT blocks") gauge_active_blocks = Gauge( "camdetect_active_blocks", "Total active, threshold exceeding DCT blocks") gauge_upload_queue_size = Gauge( "camdetect_upload_queue_size", "Number of frames awaiting to be uploaded via S3") gauge_download_queue_size = Gauge( "camdetect_download_queue_size", "Number of frames awaiting to be processed by motion detection loop") # Reset some gauges gauge_frame_motion_detected.set(0) gauge_upload_queue_size.set(0) gauge_download_queue_size.set(0) assert SLIDE_WINDOW <= 8 # This is 256 frames which should be enough async def upload(bucket, blob: bytes, event_id): """ Upload single JPEG blob to S3 bucket """ # Generate S3 path based on the JPEG blob SHA512 digest fp = hashlib.sha512(blob).hexdigest() path = "%s/%s/%s/%s.jpg" % (fp[:4], fp[4:8], fp[8:12], fp[12:]) await bucket.upload_fileobj(io.BytesIO(blob), path) # Add screenshot path to the event app.ctx.coll.update_one({ "_id": event_id }, { "$addToSet": { "screenshots": path, } }) # TODO: Handle 16MB maximum document size async def uploader(queue): """ Uploader task grabs JPEG blobs from upload queue and uploads them to S3 """ session = aioboto3.Session( aws_access_key_id=AWS_ACCESS_KEY_ID, aws_secret_access_key=AWS_SECRET_ACCESS_KEY) async with session.resource("s3", endpoint_url=S3_ENDPOINT_URL) as s3: bucket = await s3.Bucket(S3_BUCKET_NAME) while True: blob, event_id = await queue.get() await upload(bucket, blob, event_id) counter_uploaded_frames.inc() gauge_upload_queue_size.set(queue.qsize()) class ReferenceFrame(): """ ReferenceFrame keeps last 2 ^ size frames to infer the background scene compared to which motion is detected This is pretty much what background subtractor does in OpenCV, only difference is that we want have better performance instead of accuracy """ class NotEnoughFrames(Exception): pass def __init__(self, size=SLIDE_WINDOW): self.y = [] self.cumulative = None self.size = size def put(self, y): if self.cumulative is None: self.cumulative = np.copy(y) else: self.cumulative += y self.y.append(y) if len(self.y) > 2 ** self.size: self.cumulative -= self.y[0] self.y = self.y[1:] def get(self): if len(self.y) == 2 ** self.size: return self.cumulative >> SLIDE_WINDOW else: raise self.NotEnoughFrames() async def motion_detector(reference_frame, download_queue, upload_queue): """ Motion detector grabs JPEG blobs and Y channel coefficients from download queue, performs motion detection and pushes relevant JPEG blobs to upload queue going to S3 """ event_id = None differing_blocks = [] while True: dt, blob, y = await download_queue.get() app.ctx.last_frame = blob # Signal /bypass and /debug handlers about new frame app.ctx.event_frame.set() app.ctx.event_frame.clear() # Update metrics gauge_total_blocks.set(y.shape[0] * y.shape[1]) gauge_last_frame.set(dt.timestamp()) reference_frame.put(y) try: app.ctx.mask = cv2.inRange(cv2.absdiff(y, reference_frame.get()), 25, 65535) except ReferenceFrame.NotEnoughFrames: app.ctx.mask = None motion_detected = False else: # Implement dumb Kalman filter active_blocks = np.count_nonzero(app.ctx.mask) differing_blocks.append(active_blocks) differing_blocks[:] = differing_blocks[-10:] total_blocks = app.ctx.mask.shape[0] * app.ctx.mask.shape[1] threshold_blocks = THRESHOLD_RATIO * total_blocks / 100 average_blocks = sum(differing_blocks) / len(differing_blocks) motion_detected = average_blocks > threshold_blocks # Update metrics gauge_active_blocks.set(active_blocks) gauge_total_blocks.set(total_blocks) # Propagate SIGUSR1 signal handler if app.ctx.manual_trigger: print("Manually triggering event via SIGUSR1") motion_detected = True app.ctx.manual_trigger = False # Handle event start if motion_detected and not event_id: result = await app.ctx.coll.insert_one({ "timestamp": dt, "event": "motion-detected", "started": dt, "finished": dt + timedelta(minutes=2), "component": "camdetect", "source": SOURCE_NAME, "screenshots": [], "action": "event", }) app.ctx.event_id = event_id = result.inserted_id gauge_event_active.set(1) # Handle image upload if motion_detected and event_id: counter_movement_frames.inc() try: # Push JPEG blob into upload queue upload_queue.put_nowait((blob, event_id)) except asyncio.QueueFull: counter_upload_dropped_frames.inc() gauge_upload_queue_size.set(upload_queue.qsize()) # Handle event end if not motion_detected and event_id: app.ctx.coll.update_one({ "_id": event_id }, { "$set": { "finished": dt, } }) app.ctx.event_id = event_id = None gauge_event_active.set(0) async def download(resp, queue): """ This coroutine iterates over HTTP connection chunks assembling the original JPEG blobs and decodes the DCT coefficients of the frames """ buf = b"" print("Upstream connection opened with status:", resp.status) async for data, end_of_http_chunk in resp.content.iter_chunks(): counter_rx_bytes.inc(len(data)) if end_of_http_chunk: counter_rx_chunks.inc() if buf: # seek end marker = data.find(b"\xff\xd9") if marker < 0: buf += data continue else: # Assemble JPEG blob blob = buf + data[:marker+2] # Parse DCT coeffs and keep DCT coeffs only for Y channel y, _, _ = loads(blob) try: # Convert Y component to 16 bit for easier handling queue.put_nowait(( datetime.utcnow(), blob, np.int16(y[:, :, 0]))) except asyncio.QueueFull: counter_download_dropped_frames.inc() data = data[marker+2:] buf = b"" counter_rx_frames.inc() # seek begin marker = data.find(b"\xff\xd8") if marker >= 0: buf = data[marker:] else: counter_dropped_bytes.inc(len(data)) async def downloader(queue: asyncio.Queue): """ Downloader task connects to MJPEG source and pushes the JPEG frames to a queue """ while True: to = aiohttp.ClientTimeout(connect=5, sock_read=2) async with aiohttp.ClientSession(timeout=to) as session: print("Opening upstream connection to %s" % url) try: async with session.get(url) as resp: await download(resp, queue) except (aiohttp.ClientError, asyncio.exceptions.TimeoutError) as e: j = "%s.%s" % (e.__class__.__module__, e.__class__.__name__) print("Caught exception %s" % j) counter_errors.labels(exception=j).inc() await asyncio.sleep(1) app = Sanic("camdetect") @app.route("/bypass") async def bypass_stream_wrapper(request): async def stream_camera(response): while True: await app.ctx.event_frame.wait() data = CHUNK_BOUNDARY + app.ctx.last_frame await response.write(data) counter_tx_bytes.inc(len(data)) counter_tx_frames.inc() return response.stream( stream_camera, content_type="multipart/x-mixed-replace; boundary=frame") @app.route("/debug") async def stream_wrapper(request): async def stream_camera(response): while True: await app.ctx.event_frame.wait() # Parse JPEG blob arr = np.frombuffer(app.ctx.last_frame, dtype=np.uint8) img = cv2.imdecode(arr, cv2.IMREAD_UNCHANGED) # Highlight green or red channel depending on whether # motion event is in progress or not channel = 2 if app.ctx.event_id else 1 if app.ctx.mask is not None: for y in range(0, app.ctx.mask.shape[0]): for x in range(0, app.ctx.mask.shape[1]): if app.ctx.mask[y][x] > 0: img[y*DCT_BLOCK_SIZE:(y+1)*DCT_BLOCK_SIZE, x*DCT_BLOCK_SIZE:(x+1)*DCT_BLOCK_SIZE, channel] = 255 # Compress modified frame as JPEG frame _, jpeg = cv2.imencode(".jpg", img, (cv2.IMWRITE_JPEG_QUALITY, 80)) data = CHUNK_BOUNDARY + jpeg.tobytes() await response.write(data) counter_tx_bytes.inc(len(data)) counter_tx_frames.inc() # Transmit as chunked MJPEG stream return response.stream( stream_camera, content_type="multipart/x-mixed-replace; boundary=frame" ) @app.route("/readyz") async def ready_check(request): if app.ctx.mask is not None: return response.text("OK") return response.text("Not enough frames", status=503) @app.route("/event") async def wrapper_stream_event(request): async def stream_event(response): while True: await app.ctx.event_frame.wait() if app.ctx.mask is not None: continue s = "data: " + json.dumps(app.ctx.mask.tolist()) + "\r\n\r\n" await response.write(s.encode()) counter_tx_events.inc() return stream(stream_event, content_type="text/event-stream") def handler(signum, frame): # SIGUSR1 handler for manually triggering an event app.ctx.manual_trigger = True @app.listener("before_server_start") async def setup_db(app, loop): app.ctx.db = AsyncIOMotorClient(MONGO_URI).get_default_database() app.ctx.coll = app.ctx.db[MONGO_COLLECTION] app.ctx.last_frame = None app.ctx.event_frame = asyncio.Event() app.ctx.event_id = None app.ctx.manual_trigger = False signal.signal(signal.SIGUSR1, handler) # Set up processing pipeline download_queue = asyncio.Queue() upload_queue = asyncio.Queue() asyncio.create_task(uploader( upload_queue)) asyncio.create_task(downloader( download_queue)) asyncio.create_task(motion_detector( ReferenceFrame(), download_queue, upload_queue)) monitor(app).expose_endpoint() try: app.run(host="0.0.0.0", port=5000) except KeyboardInterrupt: asyncio.get_event_loop().close()