#!/usr/bin/env python3 import aiohttp import asyncio import cv2 import numpy as np import os import json import socket import sys from datetime import datetime from jpeg2dct.numpy import loads from prometheus_client import Counter, Gauge from sanic import Sanic, response from sanic.response import stream from sanic_prometheus import monitor from time import time _, url = sys.argv MONGO_URI = os.getenv("MONGO_URI", "mongodb://127.0.0.1:27017/default") FQDN = socket.getfqdn() SLIDE_WINDOW = 2 DCT_BLOCK_SIZE = 8 # How many blocks have changes to consider movement in frame THRESHOLD_BLOCKS = 20 THRESHOLD_MOTION_START = 2 CHUNK_BOUNDARY = b"\n--frame\nContent-Type: image/jpeg\n\n" counter_dropped_bytes = Counter( "camdetect_dropped_bytes", "Bytes that were not not handled or part of actual JPEG frames") counter_rx_bytes = Counter( "camdetect_rx_bytes", "Bytes received over HTTP stream") counter_tx_bytes = Counter( "camdetect_tx_bytes", "Bytes transmitted over HTTP streams") counter_rx_frames = Counter( "camdetect_rx_frames", "Frames received") counter_tx_frames = Counter( "camdetect_tx_frames", "Frames transmitted") counter_tx_events = Counter( "camdetect_tx_events", "Events emitted") counter_rx_chunks = Counter( "camdetect_rx_chunks", "HTTP chunks received") counter_errors = Counter( "camdetect_errors", "Upstream connection errors", ["exception"]) counter_movement_frames = Counter( "camdetect_movement_frames", "Frames with movement detected in them") gauge_last_frame = Gauge( "camdetect_last_frame", "Timestamp of last frame") gauge_total_blocks = Gauge( "camdetect_total_blocks", "Total DCT blocks") gauge_active_blocks = Gauge( "camdetect_active_blocks", "Total active, threshold exceeding DCT blocks") class Frame(object): def __init__(self, blob): self.blob = blob self.y, self.cb, self.cr = loads(blob) self.mask = np.int16(self.y[:, :, 0]) async def client_connect(resp): buf = b"" print("Upstream connection opened with status:", resp.status) async for data, end_of_http_chunk in resp.content.iter_chunks(): counter_rx_bytes.inc(len(data)) if end_of_http_chunk: counter_rx_chunks.inc() if buf: # seek end marker = data.find(b"\xff\xd9") if marker < 0: buf += data continue else: app.ctx.last_frame = Frame(buf + data[:marker+2]) gauge_last_frame.set(time()) reference = app.ctx.last_frame.mask app.ctx.frames.append(reference) if app.ctx.avg is None: app.ctx.avg = np.copy(reference) else: app.ctx.avg += reference if len(app.ctx.frames) > 2 ** SLIDE_WINDOW: app.ctx.avg -= app.ctx.frames[0] app.ctx.frames = app.ctx.frames[1:] if len(app.ctx.frames) == 2 ** SLIDE_WINDOW: app.ctx.thresh = cv2.inRange(cv2.absdiff( app.ctx.last_frame.mask, app.ctx.avg >> SLIDE_WINDOW), 25, 65535) else: app.ctx.thresh = None gauge_total_blocks.set(app.ctx.last_frame.mask.shape[0] * app.ctx.last_frame.mask.shape[1]) movement_detected = False if app.ctx.thresh is not None: differing_blocks = np.count_nonzero(app.ctx.thresh) gauge_active_blocks.set(differing_blocks) if differing_blocks > THRESHOLD_BLOCKS: counter_movement_frames.inc() movement_detected = True if movement_detected: if app.ctx.motion_frames < 30: app.ctx.motion_frames += 1 else: if app.ctx.motion_frames > 0: app.ctx.motion_frames -= 1 if app.ctx.motion_frames > 20: if not app.ctx.motion_start: app.ctx.motion_start = datetime.utcnow() print("Movement start") elif app.ctx.motion_frames < 5: app.ctx.motion_start = None print("Movement end") app.ctx.event_frame.set() app.ctx.event_frame.clear() data = data[marker+2:] buf = b"" counter_rx_frames.inc() # seek begin marker = data.find(b"\xff\xd8") if marker >= 0: buf = data[marker:] else: counter_dropped_bytes.inc(len(data)) async def client(): while True: to = aiohttp.ClientTimeout(connect=5, sock_read=2) async with aiohttp.ClientSession(timeout=to) as session: print("Opening upstream connection to %s" % url) try: async with session.get(url) as resp: await client_connect(resp) except (aiohttp.ClientError, asyncio.exceptions.TimeoutError) as e: j = "%s.%s" % (e.__class__.__module__, e.__class__.__name__) print("Caught exception %s" % j) counter_errors.labels(exception=j).inc() await asyncio.sleep(1) app = Sanic("lease") app.config["WTF_CSRF_ENABLED"] = False @app.route("/bypass") async def bypass_stream_wrapper(request): async def stream_camera(response): while True: await app.ctx.event_frame.wait() data = CHUNK_BOUNDARY + app.ctx.last_frame.blob await response.write(data) counter_tx_bytes.inc(len(data)) counter_tx_frames.inc() return response.stream( stream_camera, content_type="multipart/x-mixed-replace; boundary=frame" ) @app.route("/debug") async def stream_wrapper(request): async def stream_camera(response): while True: await app.ctx.event_frame.wait() arr = np.frombuffer(app.ctx.last_frame.blob, dtype=np.uint8) img = cv2.imdecode(arr, cv2.IMREAD_UNCHANGED) if len(app.ctx.frames) == 2 ** SLIDE_WINDOW: for y in range(0, len(app.ctx.last_frame.mask)): for x in range(0, len(app.ctx.last_frame.mask[0])): if app.ctx.thresh[y][x] > 0: img[y*DCT_BLOCK_SIZE:(y+1)*DCT_BLOCK_SIZE, x*DCT_BLOCK_SIZE:(x+1)*DCT_BLOCK_SIZE, 2] = 255 _, jpeg = cv2.imencode(".jpg", img, (cv2.IMWRITE_JPEG_QUALITY, 80)) data = CHUNK_BOUNDARY + jpeg.tobytes() await response.write(data) counter_tx_bytes.inc(len(data)) counter_tx_frames.inc() return response.stream( stream_camera, content_type="multipart/x-mixed-replace; boundary=frame" ) @app.route("/readyz") async def ready_check(request): if len(app.ctx.frames) == 2 ** SLIDE_WINDOW: return response.text("OK") return response.text("Not enough frames", status=503) @app.route("/event") async def wrapper_stream_event(request): async def stream_event(response): while True: await app.ctx.event_frame.wait() if len(app.ctx.frames) < 2 ** SLIDE_WINDOW: continue s = "data: " + json.dumps(app.ctx.thresh.tolist()) + "\r\n\r\n" await response.write(s.encode()) counter_tx_events.inc() return stream(stream_event, content_type="text/event-stream") @app.listener("before_server_start") async def setup_db(app, loop): app.ctx.last_frame = None app.ctx.event_frame = asyncio.Event() app.ctx.frames = [] app.ctx.avg = None app.ctx.motion_frames = 0 app.ctx.motion_start = None app.ctx.motion_end = None asyncio.create_task(client()) monitor(app).expose_endpoint() try: app.run(host="0.0.0.0", port=5000) except KeyboardInterrupt: asyncio.get_event_loop().close()