From b3486bafc2bac9ea0d418269b4a73086d32645ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lauri=20V=C3=B5sandi?= Date: Sat, 12 Feb 2022 20:44:20 +0200 Subject: [PATCH] Initial commit --- .drone.yml | 16 +++ .gitignore | 1 + Dockerfile | 22 +++++ camdetect.py | 241 +++++++++++++++++++++++++++++++++++++++++++++ docker-compose.yml | 58 +++++++++++ 5 files changed, 338 insertions(+) create mode 100644 .drone.yml create mode 100644 .gitignore create mode 100644 Dockerfile create mode 100755 camdetect.py create mode 100644 docker-compose.yml diff --git a/.drone.yml b/.drone.yml new file mode 100644 index 0000000..7e9134b --- /dev/null +++ b/.drone.yml @@ -0,0 +1,16 @@ +--- +kind: pipeline +type: kubernetes +name: default + +steps: +- name: docker + image: plugins/docker + settings: + repo: harbor.k-space.ee/${DRONE_REPO} + registry: harbor.k-space.ee + mtu: 1300 + username: + from_secret: docker_username + password: + from_secret: docker_password diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4c49bd7 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.env diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..c408446 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,22 @@ +FROM ubuntu +WORKDIR /app +ENV DEBIAN_FRONTEND=noninteractive +RUN apt-get update && apt-get install -y \ + gstreamer1.0-libav \ + gstreamer1.0-plugins-bad \ + gstreamer1.0-plugins-base \ + gstreamer1.0-plugins-good \ + gstreamer1.0-plugins-ugly \ + gstreamer1.0-tools \ + libjpeg-dev \ + python3-gevent \ + python3-numpy \ + python3-opencv \ + python3-flask \ + python3-pip \ + && apt-get clean +RUN pip3 install boto3 prometheus_client pymongo==3.12.2 aiohttp jpeg2dct sanic==21.6.2 sanic_prometheus motor +COPY camdetect.py /app +ENTRYPOINT /app/camdetect.py +EXPOSE 8000 +ENV PYTHONUNBUFFERED=1 diff --git a/camdetect.py b/camdetect.py new file mode 100755 index 0000000..7d2fcb9 --- /dev/null +++ b/camdetect.py @@ -0,0 +1,241 @@ +#!/usr/bin/env python3 +import aiohttp +import asyncio +import cv2 +import io +import numpy as np +import os +import json +import socket +import sys +from datetime import datetime +from jpeg2dct.numpy import load, loads +from prometheus_client import Counter, Gauge +from sanic import Sanic, response +from sanic.response import stream +from sanic_prometheus import monitor + +_, url = sys.argv + +MONGO_URI = os.getenv("MONGO_URI", "mongodb://127.0.0.1:27017/default") +FQDN = socket.getfqdn() +SLIDE_WINDOW = 2 +DCT_BLOCK_SIZE = 8 + +# How many blocks have changes to consider movement in frame +THRESHOLD_BLOCKS = 20 +THRESHOLD_MOTION_START = 2 + +counter_dropped_bytes = Counter( + "camtiler_client_dropped_bytes", + "Bytes that were not not handled or part of actual JPEG frames") +counter_rx_bytes = Counter( + "camtiler_client_rx_bytes", + "Bytes received over HTTP stream") +counter_tx_bytes = Counter( + "camtiler_client_tx_bytes", + "Bytes transmitted over HTTP streams") +counter_rx_frames = Counter( + "camtiler_client_rx_frames", + "Frames received") +counter_tx_frames = Counter( + "camtiler_client_tx_frames", + "Frames transmitted") +counter_tx_events = Counter( + "camtiler_client_tx_events", + "Events emitted") +counter_eos = Counter( + "camtiler_client_eos", + "Count of End of Stream occurrences") +counter_timeout_errors = Counter( + "camtiler_client_timeout_errors", + "Upstream connection timeout errors") +counter_cancelled_errors = Counter( + "camtiler_client_cancelled_errors", + "Upstream connection cancelled errors") +counter_incomplete_read_errors = Counter( + "camtiler_client_incomplete_read_errors", + "Upstream incomplete read errors") +counter_movement_frames = Counter( + "camtiler_client_movement_frames", + "Frames with movement detected in them") + +gauge_total_blocks = Gauge( + "camtiler_client_total_blocks", + "Total DCT blocks") +gauge_active_blocks = Gauge( + "camtiler_client_active_blocks", + "Total active, threshold exceeding DCT blocks") + +class Frame(object): + def __init__(self, blob): + self.blob = blob + self.y, self.cb, self.cr = loads(blob) + self.mask = np.int16(self.y[:,:,0]) + +async def client_connect(resp): + buf = b"" + print("Upstream connection opened with status:", resp.status) + async for data, end_of_http_chunk in resp.content.iter_chunks(): + counter_rx_bytes.inc(len(data)) + if end_of_http_chunk: + counter_eos.inc() + break + + if buf: + # seek end + marker = data.find(b'\xff\xd9') + if marker < 0: + buf += data + continue + else: + app.ctx.last_frame = Frame(buf + data[:marker+2]) + + reference = app.ctx.last_frame.mask + app.ctx.frames.append(reference) + if app.ctx.avg is None: + app.ctx.avg = np.copy(reference) + else: + app.ctx.avg += reference + + if len(app.ctx.frames) > 2 ** SLIDE_WINDOW: + app.ctx.avg -= app.ctx.frames[0] + app.ctx.frames = app.ctx.frames[1:] + + if len(app.ctx.frames) == 2 ** SLIDE_WINDOW: + app.ctx.thresh = cv2.inRange(cv2.absdiff(app.ctx.last_frame.mask, app.ctx.avg >> SLIDE_WINDOW), 25, 65535) + else: + app.ctx.thresh = None + gauge_total_blocks.set(app.ctx.last_frame.mask.shape[0] * app.ctx.last_frame.mask.shape[1]) + + movement_detected = False + if app.ctx.thresh is not None: + differing_blocks = np.count_nonzero(app.ctx.thresh) + gauge_active_blocks.set(differing_blocks) + if differing_blocks > THRESHOLD_BLOCKS: + counter_movement_frames.inc() + movement_detected = True + + if movement_detected: + if app.ctx.motion_frames < 30: + app.ctx.motion_frames += 1 + else: + if app.ctx.motion_frames > 0: + app.ctx.motion_frames -= 1 + + if app.ctx.motion_frames > 20: + if not app.ctx.motion_start: + app.ctx.motion_start = datetime.utcnow() + print("Movement start") + elif app.ctx.motion_frames < 5: + app.ctx.motion_start = None + print("Movement end") + + app.ctx.event_frame.set() + app.ctx.event_frame.clear() + + data = data[marker+2:] + buf = b"" + counter_rx_frames.inc() + + # seek begin + marker = data.find(b'\xff\xd8') + if marker >= 0: + buf = data[marker:] + else: + counter_dropped_bytes.inc(len(data)) + + +async def client(): + print("Opening upstream connection...") + async with aiohttp.ClientSession() as session: + async with session.get(url) as resp: + try: + await client_connect(resp) + except asyncio.TimeoutError: + counter_timeout_errors.inc() + except asyncio.CancelledError: + counter_cancelled_errors.inc() + except asyncio.IncompleteReadError: + counter_incomplete_read_errors.inc() + + +app = Sanic("lease") +app.config["WTF_CSRF_ENABLED"] = False + +STREAM_RESPONSE = \ +b""" +--frame +Content-Type: image/jpeg + +""" + +@app.route("/bypass") +async def bypass_stream_wrapper(request): + async def stream_camera(response): + while True: + await app.ctx.event_frame.wait() + data = STREAM_RESPONSE + app.ctx.last_frame.blob + await response.write(data) + counter_tx_bytes.inc(len(data)) + counter_tx_frames.inc() + + return response.stream( + stream_camera, + content_type='multipart/x-mixed-replace; boundary=frame' + ) + +@app.route("/debug") +async def stream_wrapper(request): + async def stream_camera(response): + while True: + await app.ctx.event_frame.wait() + img = cv2.imdecode(np.frombuffer(app.ctx.last_frame.blob, dtype=np.uint8), cv2.IMREAD_UNCHANGED) + if len(app.ctx.frames) == 2 ** SLIDE_WINDOW: + for y in range(0, len(app.ctx.last_frame.mask)): + for x in range(0, len(app.ctx.last_frame.mask[0])): + if app.ctx.thresh[y][x] > 0: + img[y*DCT_BLOCK_SIZE:(y+1)*DCT_BLOCK_SIZE,x*DCT_BLOCK_SIZE:(x+1)*DCT_BLOCK_SIZE,2] = 255 + + _, jpeg = cv2.imencode(".jpg", img, (cv2.IMWRITE_JPEG_QUALITY, 80)) + data = STREAM_RESPONSE + jpeg.tobytes() + await response.write(data) + counter_tx_bytes.inc(len(data)) + counter_tx_frames.inc() + + return response.stream( + stream_camera, + content_type='multipart/x-mixed-replace; boundary=frame' + ) + + +@app.route('/event') +async def wrapper_stream_event(request): + async def stream_event(response): + while True: + await app.ctx.event_frame.wait() + if len(app.ctx.frames) < 2 ** SLIDE_WINDOW: + continue + s = 'data: ' + json.dumps(app.ctx.thresh.tolist()) + '\r\n\r\n' + await response.write(s.encode()) + counter_tx_events.inc() + return stream(stream_event, content_type='text/event-stream') + + +@app.listener('before_server_start') +async def setup_db(app, loop): + app.ctx.last_frame = None + app.ctx.event_frame = asyncio.Event() + app.ctx.frames = [] + app.ctx.avg = None + app.ctx.motion_frames = 0 + app.ctx.motion_start = None + app.ctx.motion_end = None + task = asyncio.create_task(client()) + +monitor(app).expose_endpoint() + +try: + app.run(port=5000) +except KeyboardInterrupt: + asyncio.get_event_loop().close() diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..6c579cb --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,58 @@ +version: '3.7' + +# All keys here are for dev instance only, do not put prod keys here +# To override and use inventory from prod use .env file + +services: + camdetect: + restart: always + network_mode: host + build: + context: . + entrypoint: /app/camdetect.py + command: http://user:123456@127.0.0.1:8080?action=stream + environment: + - MJPEGSTREAMER_CREDENTIALS=user:123456 + env_file: .env + + mongoexpress: + restart: always + image: mongo-express + network_mode: host + environment: + - ME_CONFIG_MONGODB_ENABLE_ADMIN=true + - ME_CONFIG_MONGODB_SERVER=127.0.0.1 + - ME_CONFIG_MONGODB_AUTH_DATABASE=admin + + mongo: + network_mode: host + image: mongo:latest + volumes: + - ./mongo-init.sh:/docker-entrypoint-initdb.d/mongo-init.sh:ro + command: mongod --replSet rs0 --bind_ip 127.0.0.1 + + prometheus: + network_mode: host + image: prom/prometheus:latest + command: + - --config.file=/config/prometheus.yml + volumes: + - ./config:/config:ro + + minio: + restart: always + network_mode: host + image: bitnami/minio:latest + environment: + - MINIO_ACCESS_KEY=kspace-mugshot + - MINIO_SECRET_KEY=2mSI6HdbJ8 + - MINIO_DEFAULT_BUCKETS=kspace-mugshot:download + - MINIO_CONSOLE_PORT_NUMBER=9001 + + mjpg-streamer: + network_mode: host + restart: always + image: kvaps/mjpg-streamer + devices: + - /dev/video0 + command: -i "/usr/lib64/input_uvc.so -y -d /dev/video0 -r 1280x720 -f 30" -o "output_http.so -c user:123456"