#!/usr/bin/env python3
"""MJPEG motion detector.

Connects to the upstream MJPEG URL given as the single command line
argument, splits the byte stream into JPEG frames, compares coefficient 0
of every luma DCT block against a sliding sum of recent frames and
re-publishes the result over HTTP:

* ``/bypass`` - the upstream frames, unmodified
* ``/debug``  - frames with the changed blocks highlighted
* ``/event``  - server-sent events carrying the per-block threshold matrix

Prometheus metrics are exposed through ``sanic_prometheus``.
"""
import asyncio
import io
import json
import os
import socket
import sys
from datetime import datetime

import aiohttp
import cv2
import numpy as np
from jpeg2dct.numpy import load, loads
from prometheus_client import Counter, Gauge
from sanic import Sanic, response
from sanic.response import stream
from sanic_prometheus import monitor

_, url = sys.argv

MONGO_URI = os.getenv("MONGO_URI", "mongodb://127.0.0.1:27017/default")
FQDN = socket.getfqdn()
SLIDE_WINDOW = 2
DCT_BLOCK_SIZE = 8

# How many blocks have changes to consider movement in frame
THRESHOLD_BLOCKS = 20
THRESHOLD_MOTION_START = 2

# Number of frames kept in the sliding window; a power of two so that the
# running sum can be turned into a mean with a right shift by SLIDE_WINDOW.
WINDOW_FRAMES = 2 ** SLIDE_WINDOW
# Lower bound passed to cv2.inRange for the per-block absolute difference.
BLOCK_DIFF_THRESHOLD = 25
# Bounds of the motion_frames hysteresis counter.
MOTION_FRAMES_MAX = 30
MOTION_FRAMES_START = 20
MOTION_FRAMES_END = 5

counter_dropped_bytes = Counter(
    "camtiler_client_dropped_bytes",
    "Bytes that were not not handled or part of actual JPEG frames")
counter_rx_bytes = Counter(
    "camtiler_client_rx_bytes",
    "Bytes received over HTTP stream")
counter_tx_bytes = Counter(
    "camtiler_client_tx_bytes",
    "Bytes transmitted over HTTP streams")
counter_rx_frames = Counter(
    "camtiler_client_rx_frames",
    "Frames received")
counter_tx_frames = Counter(
    "camtiler_client_tx_frames",
    "Frames transmitted")
counter_tx_events = Counter(
    "camtiler_client_tx_events",
    "Events emitted")
counter_eos = Counter(
    "camtiler_client_eos",
    "Count of End of Stream occurrences")
counter_timeout_errors = Counter(
    "camtiler_client_timeout_errors",
    "Upstream connection timeout errors")
counter_cancelled_errors = Counter(
    "camtiler_client_cancelled_errors",
    "Upstream connection cancelled errors")
counter_incomplete_read_errors = Counter(
    "camtiler_client_incomplete_read_errors",
    "Upstream incomplete read errors")
counter_movement_frames = Counter(
    "camtiler_client_movement_frames",
    "Frames with movement detected in them")

gauge_total_blocks = Gauge(
    "camtiler_client_total_blocks",
    "Total DCT blocks")
gauge_active_blocks = Gauge(
    "camtiler_client_active_blocks",
    "Total active, threshold exceeding DCT blocks")


class Frame(object):
    """A single JPEG frame together with its DCT planes.

    Attributes are the raw bytes (``blob``), the three planes returned by
    ``jpeg2dct.numpy.loads`` (``y``, ``cb``, ``cr``) and ``mask``, an int16
    copy of coefficient 0 of every ``y`` block.
    """

    def __init__(self, blob):
        self.blob = blob
        self.y, self.cb, self.cr = loads(blob)
        # NOTE(review): coefficient 0 is presumably the DC term of each
        # 8x8 luma block - confirm against the jpeg2dct documentation.
        self.mask = np.int16(self.y[:, :, 0])


def _update_motion_state(movement_detected):
    """Advance the motion hysteresis counter and start/end markers.

    The counter climbs by one (up to MOTION_FRAMES_MAX) for every frame with
    movement and decays by one (down to zero) otherwise. Motion starts once
    it exceeds MOTION_FRAMES_START and ends once it drops below
    MOTION_FRAMES_END.
    """
    if movement_detected:
        if app.ctx.motion_frames < MOTION_FRAMES_MAX:
            app.ctx.motion_frames += 1
    elif app.ctx.motion_frames > 0:
        app.ctx.motion_frames -= 1

    if app.ctx.motion_frames > MOTION_FRAMES_START:
        if not app.ctx.motion_start:
            app.ctx.motion_start = datetime.utcnow()
            print("Movement start")
    elif app.ctx.motion_frames < MOTION_FRAMES_END:
        # Only report the end of a motion that actually started; without
        # this guard the message is printed on every idle frame.
        if app.ctx.motion_start:
            app.ctx.motion_start = None
            print("Movement end")


def _process_frame(blob):
    """Decode one JPEG frame, refresh detection state and wake the handlers."""
    frame = Frame(blob)
    app.ctx.last_frame = frame
    reference = frame.mask

    # Maintain a running sum over the most recent frames.
    app.ctx.frames.append(reference)
    if app.ctx.avg is None:
        app.ctx.avg = np.copy(reference)
    else:
        app.ctx.avg += reference
    if len(app.ctx.frames) > WINDOW_FRAMES:
        app.ctx.avg -= app.ctx.frames.pop(0)

    # The threshold matrix is only meaningful once the window is full.
    if len(app.ctx.frames) == WINDOW_FRAMES:
        app.ctx.thresh = cv2.inRange(
            cv2.absdiff(frame.mask, app.ctx.avg >> SLIDE_WINDOW),
            BLOCK_DIFF_THRESHOLD, 65535)
    else:
        app.ctx.thresh = None
    gauge_total_blocks.set(frame.mask.shape[0] * frame.mask.shape[1])

    movement_detected = False
    if app.ctx.thresh is not None:
        differing_blocks = np.count_nonzero(app.ctx.thresh)
        gauge_active_blocks.set(differing_blocks)
        if differing_blocks > THRESHOLD_BLOCKS:
            counter_movement_frames.inc()
            movement_detected = True
    _update_motion_state(movement_detected)

    # set() wakes every handler currently waiting; clear() re-arms the
    # event so they block again until the next frame.
    app.ctx.event_frame.set()
    app.ctx.event_frame.clear()


async def client_connect(resp):
    """Read the upstream response body and process every JPEG frame in it.

    Frames are delimited by the JPEG start (FF D8) and end (FF D9) markers.
    Returns when the upstream body is exhausted.

    NOTE(review): markers are searched per received chunk only, so a marker
    split across two chunks, or a second complete frame inside one chunk,
    is not detected here.
    """
    buf = b""
    print("Upstream connection opened with status:", resp.status)
    # The second tuple member only flags HTTP chunk boundaries of a chunked
    # transfer encoding; it is not an end-of-stream indicator, so it must
    # not terminate the loop or cause the accompanying data to be skipped.
    async for data, _end_of_http_chunk in resp.content.iter_chunks():
        counter_rx_bytes.inc(len(data))

        if buf:
            # seek end
            marker = data.find(b'\xff\xd9')
            if marker < 0:
                buf += data
                continue
            _process_frame(buf + data[:marker + 2])
            data = data[marker + 2:]
            buf = b""
            counter_rx_frames.inc()

        # seek begin
        marker = data.find(b'\xff\xd8')
        if marker >= 0:
            buf = data[marker:]
        else:
            counter_dropped_bytes.inc(len(data))

    # The iterator is exhausted: this is the real end of stream.
    counter_eos.inc()


async def client():
    """Open the upstream connection and feed it to ``client_connect``.

    Timeout and incomplete-read errors are counted and end the client.
    Cancellation is counted and re-raised so the task is properly cancelled.

    NOTE(review): the session uses aiohttp's default timeouts and there is
    no reconnect; confirm that is intended for a long-lived stream.
    """
    print("Opening upstream connection...")
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            try:
                await client_connect(resp)
            except asyncio.TimeoutError:
                counter_timeout_errors.inc()
            except asyncio.CancelledError:
                counter_cancelled_errors.inc()
                raise
            except asyncio.IncompleteReadError:
                counter_incomplete_read_errors.inc()


app = Sanic("lease")
app.config["WTF_CSRF_ENABLED"] = False

# Multipart part header written before every JPEG.
# NOTE(review): line breaks reconstructed from a whitespace-mangled source;
# the blank line terminating the part headers is assumed.
STREAM_RESPONSE = \
    b"""
--frame
Content-Type: image/jpeg

"""


@app.route("/bypass")
async def bypass_stream_wrapper(request):
    """Stream the upstream frames unmodified as multipart MJPEG."""
    async def stream_camera(writer):
        while True:
            await app.ctx.event_frame.wait()
            data = STREAM_RESPONSE + app.ctx.last_frame.blob
            await writer.write(data)
            counter_tx_bytes.inc(len(data))
            counter_tx_frames.inc()

    return response.stream(
        stream_camera,
        content_type='multipart/x-mixed-replace; boundary=frame'
    )


@app.route("/debug")
async def stream_wrapper(request):
    """Stream re-encoded frames with the changed DCT blocks highlighted."""
    async def stream_camera(writer):
        while True:
            await app.ctx.event_frame.wait()
            img = cv2.imdecode(
                np.frombuffer(app.ctx.last_frame.blob, dtype=np.uint8),
                cv2.IMREAD_UNCHANGED)
            thresh = app.ctx.thresh
            if thresh is not None:
                # Visit only the blocks that exceeded the threshold and
                # saturate channel 2 of the matching pixel square.
                for by, bx in np.argwhere(thresh > 0):
                    img[by * DCT_BLOCK_SIZE:(by + 1) * DCT_BLOCK_SIZE,
                        bx * DCT_BLOCK_SIZE:(bx + 1) * DCT_BLOCK_SIZE,
                        2] = 255

            _, jpeg = cv2.imencode(".jpg", img, (cv2.IMWRITE_JPEG_QUALITY, 80))
            data = STREAM_RESPONSE + jpeg.tobytes()
            await writer.write(data)
            counter_tx_bytes.inc(len(data))
            counter_tx_frames.inc()

    return response.stream(
        stream_camera,
        content_type='multipart/x-mixed-replace; boundary=frame'
    )


@app.route('/event')
async def wrapper_stream_event(request):
    """Stream the threshold matrix of every frame as server-sent events."""
    async def stream_event(writer):
        while True:
            await app.ctx.event_frame.wait()
            thresh = app.ctx.thresh
            if thresh is None:
                # Sliding window not filled yet, nothing to report.
                continue
            s = 'data: ' + json.dumps(thresh.tolist()) + '\r\n\r\n'
            await writer.write(s.encode())
            counter_tx_events.inc()

    return stream(stream_event, content_type='text/event-stream')


@app.listener('before_server_start')
async def setup_db(app, loop):
    """Initialise the shared detection state and start the upstream client."""
    app.ctx.last_frame = None
    app.ctx.event_frame = asyncio.Event()
    app.ctx.frames = []
    app.ctx.avg = None
    app.ctx.thresh = None
    app.ctx.motion_frames = 0
    app.ctx.motion_start = None
    app.ctx.motion_end = None
    # Keep a strong reference: the event loop only holds weak references to
    # tasks, so an unreferenced task may be garbage collected mid-run.
    app.ctx.client_task = asyncio.create_task(client())


monitor(app).expose_endpoint()

try:
    app.run(port=5000)
except KeyboardInterrupt:
    asyncio.get_event_loop().close()