commit 7510dc8b6f20cccb4112859f2cbeaa2f7776fbde Author: Lauri Võsandi Date: Sun Feb 13 10:09:18 2022 +0200 Initial commit diff --git a/.drone.yml b/.drone.yml new file mode 100644 index 0000000..7e9134b --- /dev/null +++ b/.drone.yml @@ -0,0 +1,16 @@ +--- +kind: pipeline +type: kubernetes +name: default + +steps: +- name: docker + image: plugins/docker + settings: + repo: harbor.k-space.ee/${DRONE_REPO} + registry: harbor.k-space.ee + mtu: 1300 + username: + from_secret: docker_username + password: + from_secret: docker_password diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..9be215a --- /dev/null +++ b/Dockerfile @@ -0,0 +1,20 @@ +FROM ubuntu +WORKDIR /app +ENV DEBIAN_FRONTEND=noninteractive +RUN apt-get update && apt-get install -y \ + gstreamer1.0-libav \ + gstreamer1.0-plugins-bad \ + gstreamer1.0-plugins-base \ + gstreamer1.0-plugins-good \ + gstreamer1.0-plugins-ugly \ + gstreamer1.0-tools \ + python3-gevent \ + python3-numpy \ + python3-opencv \ + python3-pip \ + && apt-get clean +RUN pip3 install prometheus_client aiohttp sanic==21.6.2 sanic_prometheus motor kubernetes +COPY camtiler.py /app +ENTRYPOINT /app/camtiler.py +EXPOSE 5001 +ENV PYTHONUNBUFFERED=1 diff --git a/camtiler.py b/camtiler.py new file mode 100755 index 0000000..605bf7c --- /dev/null +++ b/camtiler.py @@ -0,0 +1,185 @@ +#!/usr/bin/python3 + +import aiohttp +import asyncio +import cv2 +import io +import numpy as np +import os +import socket +import sys +from datetime import datetime +from kubernetes import client, config +from math import ceil +from prometheus_client import Counter, Gauge +from sanic import Sanic, response +from sanic.response import stream +from sanic_prometheus import monitor + +targets = [(j,j) for j in sys.argv[1:]] +if not targets: + # If no targets are specified, fall back to Kube API + config.load_kube_config() + v1 = client.CoreV1Api() + for i in v1.list_namespaced_service("camtiler", label_selector="component=camdetect").items: + app.ctx.frames[i.metadata.name] = None + url = "http://%s:%d/bypass" % (i.metadata.name, i.spec.ports[0].port) + targets.append((i.metadata.name, url)) + +print("Running with following targets:") +for name, url in targets: + print(url) + +counter_dropped_bytes = Counter( + "camtiler_client_dropped_bytes", + "Bytes that were not not handled or part of actual JPEG frames") +counter_rx_bytes = Counter( + "camtiler_client_rx_bytes", + "Bytes received over HTTP stream") +counter_tx_bytes = Counter( + "camtiler_client_tx_bytes", + "Bytes transmitted over HTTP streams") +counter_rx_frames = Counter( + "camtiler_client_rx_frames", + "Frames received") +counter_tx_frames = Counter( + "camtiler_client_tx_frames", + "Frames transmitted") +counter_tx_events = Counter( + "camtiler_client_tx_events", + "Events emitted") +counter_eos = Counter( + "camtiler_client_eos", + "Count of End of Stream occurrences") +counter_timeout_errors = Counter( + "camtiler_client_timeout_errors", + "Upstream connection timeout errors") +counter_cancelled_errors = Counter( + "camtiler_client_cancelled_errors", + "Upstream connection cancelled errors") +counter_incomplete_read_errors = Counter( + "camtiler_client_incomplete_read_errors", + "Upstream incomplete read errors") + +app = Sanic("camtiler") + +STREAM_RESPONSE = \ +b""" +--frame +Content-Type: image/jpeg + +""" + +MERGED_SUBSAMPLE=3 + +# Consider camera gone after 5sec +TIMEOUT = 5.0 + +# Blank frame +BLANK = np.full((720 // MERGED_SUBSAMPLE, 1280 // MERGED_SUBSAMPLE, 3), 0) + + +def tilera(iterable, filler): + hstacking = ceil(len(iterable) ** 0.5) + vstacking = ceil(len(iterable) / hstacking) + rows = [] + for j in range(0, vstacking): + row = [] + first_frame = True + for i in range(0, hstacking): + try: + frame = iterable[i + hstacking * j] + except IndexError: + frame = filler + + if not first_frame: + row.append(np.full((len(frame), 1, 3), 0)) + first_frame = False + row.append(frame) + stacked = np.hstack(row) + rows.append(stacked) + return np.vstack(rows) + + +async def client_connect(name, resp): + buf = b"" + print("Upstream connection opened with status:", resp.status) + async for data, end_of_http_chunk in resp.content.iter_chunks(): + counter_rx_bytes.inc(len(data)) + if end_of_http_chunk: + counter_eos.inc() + break + + if buf: + # seek end + marker = data.find(b'\xff\xd9') + if marker < 0: + buf += data + continue + else: + blob = np.frombuffer(buf + data[:marker+2], dtype=np.uint8) + img = cv2.imdecode(blob, cv2.IMREAD_UNCHANGED) + app.ctx.frames[name] = img + data = data[marker+2:] + buf = b"" + counter_rx_frames.inc() + + # seek begin + marker = data.find(b'\xff\xd8') + if marker >= 0: + buf = data[marker:] + else: + counter_dropped_bytes.inc(len(data)) + + +async def client(name, url): + print("Opening upstream connection to %s" % url) + async with aiohttp.ClientSession() as session: + async with session.get(url) as resp: + try: + await client_connect(name, resp) + except asyncio.TimeoutError: + counter_timeout_errors.inc() + except asyncio.CancelledError: + counter_cancelled_errors.inc() + except asyncio.IncompleteReadError: + counter_incomplete_read_errors.inc() + + +@app.route("/tiled") +async def stream_wrapper(request): + async def stream_tiled(response): + while True: + frames = [value for key, value in sorted(app.ctx.frames.items())] + img = tilera(frames, np.vstack([BLANK, np.zeros((30, 1280 // MERGED_SUBSAMPLE, 3))])) + _, jpeg = cv2.imencode(".jpg", img, (cv2.IMWRITE_JPEG_QUALITY, 80)) + data = STREAM_RESPONSE + jpeg.tobytes() + await response.write(data) + counter_tx_bytes.inc(len(data)) + counter_tx_frames.inc() + await asyncio.sleep(0.2) + + return response.stream( + stream_tiled, + content_type='multipart/x-mixed-replace; boundary=frame' + ) + +@app.listener('before_server_start') +async def setup_db(app, loop): + app.ctx.last_frame = None + app.ctx.event_frame = asyncio.Event() + app.ctx.frames = {} + app.ctx.avg = None + app.ctx.motion_frames = 0 + app.ctx.motion_start = None + app.ctx.motion_end = None + for name, url in targets: + app.ctx.frames[name] = None + task = asyncio.create_task(client(name, url)) + +monitor(app).expose_endpoint() + +try: + app.run(port=5001) +except KeyboardInterrupt: + asyncio.get_event_loop().close() diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..7ceddf4 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,21 @@ +version: '3.7' + +# All keys here are for dev instance only, do not put prod keys here +# To override and use inventory from prod use .env file + +services: + camtiler: + restart: always + network_mode: host + build: + context: . + entrypoint: /app/camtiler.py + command: http://127.0.0.1:8080?action=stream http://127.0.0.2:8080?action=stream http://127.0.0.3:8080?action=stream http://127.0.0.4:8080?action=stream + + mjpg-streamer: + network_mode: host + restart: always + image: kvaps/mjpg-streamer + devices: + - /dev/video0 + command: -i "/usr/lib64/input_uvc.so -y -d /dev/video0 -r 1280x720 -f 30" -o "output_http.so"