diff --git a/Dockerfile b/Dockerfile
index 344c83b..deaba34 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -15,7 +15,7 @@ RUN apt-get update && apt-get install -y \
     python3-flask \
     python3-pip \
     && apt-get clean
-RUN pip3 install boto3 prometheus_client pymongo==3.12.2 aiohttp jpeg2dct sanic==21.6.2 sanic_prometheus motor
+RUN pip3 install aioboto3 prometheus_client pymongo==3.12.2 aiohttp jpeg2dct sanic==21.6.2 sanic_prometheus motor
 COPY camdetect.py /app
 ENTRYPOINT /app/camdetect.py
 EXPOSE 5000
diff --git a/README.md b/README.md
index f5f3b52..258debb 100644
--- a/README.md
+++ b/README.md
@@ -13,3 +13,16 @@ In a nutshell:
 - Exposes endpoint for distributing MJPEG stream inside the cluster, eg by the `camera-tiler`
 - Exposes endpoint for inspecting DCT blocks where motion has been detected
+
+# Developing
+
+Bundled `docker-compose.yml` brings up:
+
+* [Minio](http://localhost:9001/buckets/camdetect/browse)
+* [Mongoexpress](http://localhost:8081/db/default/eventlog)
+
+To manually trigger an event:
+
+```
+docker kill -sUSR1 camera-motion-detect_camdetect_1
+```
diff --git a/camdetect.py b/camdetect.py
index b144ccb..1791514 100755
--- a/camdetect.py
+++ b/camdetect.py
@@ -1,30 +1,39 @@
 #!/usr/bin/env python3
+import aioboto3
 import aiohttp
 import asyncio
 import cv2
+import hashlib
+import io
+import json
 import numpy as np
 import os
-import json
+import signal
 import socket
 import sys
-from datetime import datetime
+from datetime import datetime, timedelta
 from jpeg2dct.numpy import loads
+from motor.motor_asyncio import AsyncIOMotorClient
 from prometheus_client import Counter, Gauge
 from sanic import Sanic, response
 from sanic.response import stream
 from sanic_prometheus import monitor
-from time import time
 
 _, url = sys.argv
+AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID", "camdetect")
+AWS_SECRET_ACCESS_KEY = os.environ["AWS_SECRET_ACCESS_KEY"]
+S3_ENDPOINT_URL = os.environ["S3_ENDPOINT_URL"]
+S3_BUCKET_NAME = os.getenv("S3_BUCKET_NAME", "camdetect")
 MONGO_URI = os.getenv("MONGO_URI", "mongodb://127.0.0.1:27017/default")
-FQDN = socket.getfqdn()
+MONGO_COLLECTION = os.getenv("MONGO_COLLECTION", "eventlog")
+SOURCE_NAME = os.environ["SOURCE_NAME"]
 SLIDE_WINDOW = 2
 DCT_BLOCK_SIZE = 8
-# How many blocks have changes to consider movement in frame
-THRESHOLD_BLOCKS = 20
-THRESHOLD_MOTION_START = 2
+# Percentage of active blocks needed to consider movement in the whole frame
+THRESHOLD_RATIO = int(os.getenv("THRESHOLD_RATIO", "5"))
+
 CHUNK_BOUNDARY = b"\n--frame\nContent-Type: image/jpeg\n\n"
 
 counter_dropped_bytes = Counter(
@@ -55,32 +64,218 @@ counter_errors = Counter(
 counter_movement_frames = Counter(
     "camdetect_movement_frames",
     "Frames with movement detected in them")
+counter_uploaded_frames = Counter(
+    "camdetect_uploaded_frames",
+    "Frames uploaded via S3")
+counter_upload_errors = Counter(
+    "camdetect_upload_errors",
+    "Frame upload errors related to S3")
+counter_upload_dropped_frames = Counter(
+    "camdetect_upload_dropped_frames",
+    "Frames that were dropped due to S3 upload queue being full")
+counter_download_dropped_frames = Counter(
+    "camdetect_download_dropped_frames",
+    "Frames that were downloaded from camera, but not processed")
+
 gauge_last_frame = Gauge(
     "camdetect_last_frame",
     "Timestamp of last frame")
+gauge_frame_motion_detected = Gauge(
+    "camdetect_frame_motion_detected",
+    "Motion detected in frame")
+gauge_event_active = Gauge(
+    "camdetect_event_active",
+    "Motion event in progress")
 gauge_total_blocks = Gauge(
     "camdetect_total_blocks",
     "Total DCT blocks")
 gauge_active_blocks = Gauge(
     "camdetect_active_blocks",
     "Total active, threshold exceeding DCT blocks")
+gauge_upload_queue_size = Gauge(
+    "camdetect_upload_queue_size",
+    "Number of frames awaiting upload to S3")
+gauge_download_queue_size = Gauge(
+    "camdetect_download_queue_size",
+    "Number of frames awaiting processing by the motion detection loop")
+
+# Reset some gauges
+gauge_frame_motion_detected.set(0)
+gauge_upload_queue_size.set(0)
+gauge_download_queue_size.set(0)
+
+assert SLIDE_WINDOW <= 8  # This is 256 frames which should be enough
 
 
-class Frame(object):
-    def __init__(self, blob):
-        self.blob = blob
-        self.y, self.cb, self.cr = loads(blob)
-        self.mask = np.int16(self.y[:, :, 0])
+async def upload(bucket, blob: bytes, event_id):
+    """
+    Upload single JPEG blob to S3 bucket
+    """
+
+    # Generate S3 path based on the JPEG blob SHA512 digest
+    fp = hashlib.sha512(blob).hexdigest()
+    path = "%s/%s/%s/%s.jpg" % (fp[:4], fp[4:8], fp[8:12], fp[12:])
+    await bucket.upload_fileobj(io.BytesIO(blob), path)
+
+    # Add screenshot path to the event
+    await app.ctx.coll.update_one({
+        "_id": event_id
+    }, {
+        "$addToSet": {
+            "screenshots": path,
+        }
+    })
+
+    # TODO: Handle 16MB maximum document size
 
 
-async def client_connect(resp):
+async def uploader(queue):
+    """
+    Uploader task grabs JPEG blobs from upload queue and uploads them to S3
+    """
+    session = aioboto3.Session(
+        aws_access_key_id=AWS_ACCESS_KEY_ID,
+        aws_secret_access_key=AWS_SECRET_ACCESS_KEY)
+    async with session.resource("s3", endpoint_url=S3_ENDPOINT_URL) as s3:
+        bucket = await s3.Bucket(S3_BUCKET_NAME)
+        while True:
+            blob, event_id = await queue.get()
+            await upload(bucket, blob, event_id)
+            counter_uploaded_frames.inc()
+            gauge_upload_queue_size.set(queue.qsize())
+
+
+class ReferenceFrame():
+    """
+    ReferenceFrame keeps the last 2 ** size frames to infer the background
+    scene against which motion is detected
+
+    This is pretty much what the background subtractor does in OpenCV;
+    the only difference is that we trade accuracy for better performance
+    """
+    class NotEnoughFrames(Exception):
+        pass
+
+    def __init__(self, size=SLIDE_WINDOW):
+        self.y = []
+        self.cumulative = None
+        self.size = size
+
+    def put(self, y):
+        if self.cumulative is None:
+            self.cumulative = np.copy(y)
+        else:
+            self.cumulative += y
+        self.y.append(y)
+        if len(self.y) > 2 ** self.size:
+            self.cumulative -= self.y[0]
+            self.y = self.y[1:]
+
+    def get(self):
+        if len(self.y) == 2 ** self.size:
+            return self.cumulative >> self.size
+        else:
+            raise self.NotEnoughFrames()
+
+
+async def motion_detector(reference_frame, download_queue, upload_queue):
+    """
+    Motion detector grabs JPEG blobs and Y channel coefficients
+    from download queue, performs motion detection and pushes relevant
+    JPEG blobs to upload queue going to S3
+    """
+    event_id = None
+    differing_blocks = []
+    while True:
+        dt, blob, y = await download_queue.get()
+        app.ctx.last_frame = blob
+
+        # Signal /bypass and /debug handlers about new frame
+        app.ctx.event_frame.set()
+        app.ctx.event_frame.clear()
+
+        # Update metrics
+        gauge_total_blocks.set(y.shape[0] * y.shape[1])
+        gauge_last_frame.set(dt.timestamp())
+
+        reference_frame.put(y)
+        try:
+            app.ctx.mask = cv2.inRange(cv2.absdiff(y,
+                reference_frame.get()), 25, 65535)
+        except ReferenceFrame.NotEnoughFrames:
+            app.ctx.mask = None
+            motion_detected = False
+        else:
+            # Smooth the active block count over the last 10 frames
+            active_blocks = np.count_nonzero(app.ctx.mask)
+            differing_blocks.append(active_blocks)
+            differing_blocks[:] = differing_blocks[-10:]
+            total_blocks = app.ctx.mask.shape[0] * app.ctx.mask.shape[1]
+            threshold_blocks = THRESHOLD_RATIO * total_blocks / 100
+            average_blocks = sum(differing_blocks) / len(differing_blocks)
+            motion_detected = average_blocks > threshold_blocks
+
+            # Update metrics
+            gauge_active_blocks.set(active_blocks)
+            gauge_total_blocks.set(total_blocks)
+
+        # Propagate SIGUSR1 signal handler
+        if app.ctx.manual_trigger:
+            print("Manually triggering event via SIGUSR1")
+            motion_detected = True
+            app.ctx.manual_trigger = False
+
+        # Handle event start
+        if motion_detected and not event_id:
+            result = await app.ctx.coll.insert_one({
+                "timestamp": dt,
+                "event": "motion-detected",
+                "started": dt,
+                "finished": dt + timedelta(minutes=2),
+                "component": "camdetect",
+                "source": SOURCE_NAME,
+                "screenshots": [],
+                "action": "event",
+            })
+            app.ctx.event_id = event_id = result.inserted_id
+            gauge_event_active.set(1)
+
+        # Handle image upload
+        if motion_detected and event_id:
+            counter_movement_frames.inc()
+            try:
+                # Push JPEG blob into upload queue
+                upload_queue.put_nowait((blob, event_id))
+            except asyncio.QueueFull:
+                counter_upload_dropped_frames.inc()
+            gauge_upload_queue_size.set(upload_queue.qsize())
+
+        # Handle event end
+        if not motion_detected and event_id:
+            await app.ctx.coll.update_one({
+                "_id": event_id
+            }, {
+                "$set": {
+                    "finished": dt,
+                }
+            })
+            app.ctx.event_id = event_id = None
+            gauge_event_active.set(0)
+
+
+async def download(resp, queue):
+    """
+    This coroutine iterates over HTTP connection chunks
+    assembling the original JPEG blobs and decodes the
+    DCT coefficients of the frames
+    """
     buf = b""
     print("Upstream connection opened with status:", resp.status)
     async for data, end_of_http_chunk in resp.content.iter_chunks():
         counter_rx_bytes.inc(len(data))
         if end_of_http_chunk:
             counter_rx_chunks.inc()
-
         if buf:
             # seek end
             marker = data.find(b"\xff\xd9")
@@ -88,55 +283,20 @@ async def client_connect(resp):
                 buf += data
                 continue
             else:
-                app.ctx.last_frame = Frame(buf + data[:marker+2])
-                gauge_last_frame.set(time())
+                # Assemble JPEG blob
+                blob = buf + data[:marker+2]
 
-                reference = app.ctx.last_frame.mask
-                app.ctx.frames.append(reference)
-                if app.ctx.avg is None:
-                    app.ctx.avg = np.copy(reference)
-                else:
-                    app.ctx.avg += reference
-
-                if len(app.ctx.frames) > 2 ** SLIDE_WINDOW:
-                    app.ctx.avg -= app.ctx.frames[0]
-                    app.ctx.frames = app.ctx.frames[1:]
-
-                if len(app.ctx.frames) == 2 ** SLIDE_WINDOW:
-                    app.ctx.thresh = cv2.inRange(cv2.absdiff(
-                        app.ctx.last_frame.mask,
-                        app.ctx.avg >> SLIDE_WINDOW), 25, 65535)
-                else:
-                    app.ctx.thresh = None
-                gauge_total_blocks.set(app.ctx.last_frame.mask.shape[0] *
-                                       app.ctx.last_frame.mask.shape[1])
-
-                movement_detected = False
-                if app.ctx.thresh is not None:
-                    differing_blocks = np.count_nonzero(app.ctx.thresh)
-                    gauge_active_blocks.set(differing_blocks)
-                    if differing_blocks > THRESHOLD_BLOCKS:
-                        counter_movement_frames.inc()
-                        movement_detected = True
-
-                if movement_detected:
-                    if app.ctx.motion_frames < 30:
-                        app.ctx.motion_frames += 1
-                else:
-                    if app.ctx.motion_frames > 0:
-                        app.ctx.motion_frames -= 1
-
-                if app.ctx.motion_frames > 20:
-                    if not app.ctx.motion_start:
-                        app.ctx.motion_start = datetime.utcnow()
-                        print("Movement start")
-                elif app.ctx.motion_frames < 5:
-                    app.ctx.motion_start = None
-                    print("Movement end")
-
-                app.ctx.event_frame.set()
-                app.ctx.event_frame.clear()
+                # Parse DCT coeffs and keep DCT coeffs only for Y channel
+                y, _, _ = loads(blob)
+                try:
+                    # Convert Y component to 16 bit for easier handling
+                    queue.put_nowait((
+                        datetime.utcnow(),
+                        blob,
+                        np.int16(y[:, :, 0])))
+                except asyncio.QueueFull:
+                    counter_download_dropped_frames.inc()
                 data = data[marker+2:]
                 buf = b""
                 counter_rx_frames.inc()
@@ -149,23 +309,25 @@ async def client_connect(resp):
             counter_dropped_bytes.inc(len(data))
 
 
-async def client():
+async def downloader(queue: asyncio.Queue):
+    """
+    Downloader task connects to MJPEG source and
+    pushes the JPEG frames to a queue
+    """
     while True:
         to = aiohttp.ClientTimeout(connect=5, sock_read=2)
         async with aiohttp.ClientSession(timeout=to) as session:
             print("Opening upstream connection to %s" % url)
             try:
                 async with session.get(url) as resp:
-                    await client_connect(resp)
+                    await download(resp, queue)
             except (aiohttp.ClientError, asyncio.exceptions.TimeoutError) as e:
                 j = "%s.%s" % (e.__class__.__module__, e.__class__.__name__)
                 print("Caught exception %s" % j)
                 counter_errors.labels(exception=j).inc()
         await asyncio.sleep(1)
 
-
-app = Sanic("lease")
-app.config["WTF_CSRF_ENABLED"] = False
+app = Sanic("camdetect")
 
 
 @app.route("/bypass")
@@ -173,15 +335,13 @@ async def bypass_stream_wrapper(request):
     async def stream_camera(response):
         while True:
             await app.ctx.event_frame.wait()
-            data = CHUNK_BOUNDARY + app.ctx.last_frame.blob
+            data = CHUNK_BOUNDARY + app.ctx.last_frame
             await response.write(data)
             counter_tx_bytes.inc(len(data))
             counter_tx_frames.inc()
-
     return response.stream(
         stream_camera,
-        content_type="multipart/x-mixed-replace; boundary=frame"
-    )
+        content_type="multipart/x-mixed-replace; boundary=frame")
 
 
 @app.route("/debug")
@@ -189,21 +349,30 @@ async def stream_wrapper(request):
     async def stream_camera(response):
         while True:
             await app.ctx.event_frame.wait()
-            arr = np.frombuffer(app.ctx.last_frame.blob, dtype=np.uint8)
-            img = cv2.imdecode(arr, cv2.IMREAD_UNCHANGED)
-            if len(app.ctx.frames) == 2 ** SLIDE_WINDOW:
-                for y in range(0, len(app.ctx.last_frame.mask)):
-                    for x in range(0, len(app.ctx.last_frame.mask[0])):
-                        if app.ctx.thresh[y][x] > 0:
-                            img[y*DCT_BLOCK_SIZE:(y+1)*DCT_BLOCK_SIZE,
-                                x*DCT_BLOCK_SIZE:(x+1)*DCT_BLOCK_SIZE, 2] = 255
+            # Parse JPEG blob
+            arr = np.frombuffer(app.ctx.last_frame, dtype=np.uint8)
+            img = cv2.imdecode(arr, cv2.IMREAD_UNCHANGED)
+
+            # Highlight green or red channel depending on whether
+            # motion event is in progress or not
+            channel = 2 if app.ctx.event_id else 1
+            if app.ctx.mask is not None:
+                for y in range(0, app.ctx.mask.shape[0]):
+                    for x in range(0, app.ctx.mask.shape[1]):
+                        if app.ctx.mask[y][x] > 0:
+                            img[y*DCT_BLOCK_SIZE:(y+1)*DCT_BLOCK_SIZE,
+                                x*DCT_BLOCK_SIZE:(x+1)*DCT_BLOCK_SIZE,
+                                channel] = 255
+
+            # Compress modified frame as JPEG frame
             _, jpeg = cv2.imencode(".jpg", img, (cv2.IMWRITE_JPEG_QUALITY, 80))
             data = CHUNK_BOUNDARY + jpeg.tobytes()
             await response.write(data)
             counter_tx_bytes.inc(len(data))
             counter_tx_frames.inc()
 
+    # Transmit as chunked MJPEG stream
     return response.stream(
         stream_camera,
         content_type="multipart/x-mixed-replace; boundary=frame"
     )
@@ -212,7 +381,7 @@ async def stream_wrapper(request):
 @app.route("/readyz")
 async def ready_check(request):
-    if len(app.ctx.frames) == 2 ** SLIDE_WINDOW:
+    if app.ctx.mask is not None:
         return response.text("OK")
     return response.text("Not enough frames", status=503)
 
 
@@ -222,24 +391,41 @@ async def wrapper_stream_event(request):
     async def stream_event(response):
         while True:
             await app.ctx.event_frame.wait()
-            if len(app.ctx.frames) < 2 ** SLIDE_WINDOW:
+            if app.ctx.mask is None:
                 continue
-            s = "data: " + json.dumps(app.ctx.thresh.tolist()) + "\r\n\r\n"
+            s = "data: " + json.dumps(app.ctx.mask.tolist()) + "\r\n\r\n"
             await response.write(s.encode())
             counter_tx_events.inc()
     return stream(stream_event, content_type="text/event-stream")
 
 
+def handler(signum, frame):
+    # SIGUSR1 handler for manually triggering an event
+    app.ctx.manual_trigger = True
+
+
 @app.listener("before_server_start")
 async def setup_db(app, loop):
+    app.ctx.db = AsyncIOMotorClient(MONGO_URI).get_default_database()
+    app.ctx.coll = app.ctx.db[MONGO_COLLECTION]
     app.ctx.last_frame = None
     app.ctx.event_frame = asyncio.Event()
-    app.ctx.frames = []
-    app.ctx.avg = None
-    app.ctx.motion_frames = 0
-    app.ctx.motion_start = None
-    app.ctx.motion_end = None
-    asyncio.create_task(client())
+    app.ctx.event_id = None
+    app.ctx.manual_trigger = False
+    signal.signal(signal.SIGUSR1, handler)
+
+    # Set up processing pipeline
+    download_queue = asyncio.Queue()
+    upload_queue = asyncio.Queue()
+    asyncio.create_task(uploader(
+        upload_queue))
+    asyncio.create_task(downloader(
+        download_queue))
+    asyncio.create_task(motion_detector(
+        ReferenceFrame(),
+        download_queue,
+        upload_queue))
 
 
+monitor(app).expose_endpoint()
diff --git a/docker-compose.yml b/docker-compose.yml
index 7e28aa2..04823d7 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -1,7 +1,6 @@
 version: '3.7'
 
 # All keys here are for dev instance only, do not put prod keys here
-# To override and use inventory from prod use .env file
 
 services:
   camdetect:
@@ -13,7 +12,9 @@ services:
     command: http://user:123456@127.0.0.1:8080?action=stream
     environment:
       - MJPEGSTREAMER_CREDENTIALS=user:123456
-    env_file: .env
+      - AWS_SECRET_ACCESS_KEY=2mSI6HdbJ8
+      - S3_ENDPOINT_URL=http://127.0.0.1:9000
+      - SOURCE_NAME=dummy
 
   mongoexpress:
     restart: always
@@ -42,16 +43,20 @@ services:
       - --config.file=/config/prometheus.yml
     volumes:
       - ./config:/config:ro
+    logging:
+      driver: none
 
   minio:
     restart: always
     network_mode: host
    image: bitnami/minio:latest
     environment:
-      - MINIO_ACCESS_KEY=kspace-mugshot
+      - MINIO_ACCESS_KEY=camdetect
       - MINIO_SECRET_KEY=2mSI6HdbJ8
-      - MINIO_DEFAULT_BUCKETS=kspace-mugshot:download
+      - MINIO_DEFAULT_BUCKETS=camdetect
       - MINIO_CONSOLE_PORT_NUMBER=9001
+    logging:
+      driver: none
 
   mjpg-streamer:
     network_mode: host