#!/usr/bin/python3
"""Tile several upstream MJPEG camera streams into one MJPEG stream.

Upstream URLs are taken from the command line; when none are given, they
are discovered through the Kubernetes API.  Each upstream is consumed by a
background task that keeps the most recent decoded frame in
``app.ctx.frames``.  The ``/tiled`` endpoint periodically merges those
frames into a grid and serves the result as ``multipart/x-mixed-replace``.
"""
import aiohttp
import asyncio
import cv2
import numpy as np
import os
import sys
from kubernetes import client, config
from math import ceil
from prometheus_client import Counter, Gauge
from sanic import Sanic, response
from sanic_prometheus import monitor

SERVICE_NAMESPACE = os.getenv("SERVICE_NAMESPACE", "camtiler")
SERVICE_LABEL_SELECTOR = os.getenv(
    "SERVICE_LABEL_SELECTOR", "component=camera-motion-detect")

# Each target is a (name, url) pair; for command line arguments the URL
# doubles as the name.
targets = [(j, j) for j in sys.argv[1:]]
if not targets:
    # If no targets are specified, fall back to Kube API
    config.load_incluster_config()
    v1 = client.CoreV1Api()
    for i in v1.list_namespaced_service(
            SERVICE_NAMESPACE, label_selector=SERVICE_LABEL_SELECTOR).items:
        url = "http://%s:%d/bypass" % (i.metadata.name, i.spec.ports[0].port)
        targets.append((i.metadata.name, url))

print("Running with following targets:")
for _, url in targets:
    print(url)

GIT_COMMIT = os.getenv("GIT_COMMIT", "null")
GIT_COMMIT_TIMESTAMP = os.getenv("GIT_COMMIT_TIMESTAMP", "null")

counter_dropped_bytes = Counter(
    "camtiler_client_dropped_bytes",
    "Bytes that were not not handled or part of actual JPEG frames")
counter_rx_bytes = Counter(
    "camtiler_client_rx_bytes",
    "Bytes received over HTTP stream")
counter_tx_bytes = Counter(
    "camtiler_client_tx_bytes",
    "Bytes transmitted over HTTP streams")
counter_rx_chunks = Counter(
    "camtiler_client_rx_chunks",
    "HTTP chunks received")
counter_rx_frames = Counter(
    "camtiler_client_rx_frames",
    "Frames received")
counter_tx_frames = Counter(
    "camtiler_client_tx_frames",
    "Frames transmitted")
counter_tx_events = Counter(
    "camtiler_client_tx_events",
    "Events emitted")
counter_errors = Counter(
    "camtiler_errors",
    "Upstream connection errors",
    ["exception"])
gauge_build_info = Gauge(
    "docker_build_info",
    "Build info",
    ["git_commit", "git_commit_timestamp"])
gauge_build_info.labels(
    GIT_COMMIT,
    GIT_COMMIT_TIMESTAMP).set(1)

app = Sanic("camtiler")

# Multipart part header written in front of every JPEG frame
STREAM_RESPONSE = b"""
--frame
Content-Type: image/jpeg

"""

# Every n-th row and column of a frame is kept when building a tile
MERGED_SUBSAMPLE = 3

# Placeholder frames.  They are created as uint8 so that the merged canvas
# stays 8-bit and can be handed to the JPEG encoder without conversion.
# Blank frame
BLANK = np.full((720, 1280, 3), 0, dtype=np.uint8)
GREEN = np.full((720, 1280, 3), (0, 135, 0), dtype=np.uint8)


def _render_tile(tile):
    """Return one subsampled tile with a 30 pixel footer stacked below it.

    ``tile`` is either ``None`` (an unused grid cell, rendered as a blank
    frame with an empty footer) or a ``(name, frame)`` pair.  A ``frame``
    of ``None`` is rendered as the green placeholder labelled
    "Signal lost"; any other frame is labelled "Live".
    """
    if tile is None:
        frame = BLANK
    else:
        name, frame = tile
        if frame is None:
            msg = "Signal lost"
            frame = GREEN
        else:
            msg = "Live"
    frame = frame[::MERGED_SUBSAMPLE, ::MERGED_SUBSAMPLE]
    footer = np.zeros((30, frame.shape[1], 3), dtype=np.uint8)
    if tile is not None:
        cv2.putText(footer, msg, (10, 24),
                    cv2.FONT_HERSHEY_PLAIN, 1, (255, 255, 255))
        cv2.putText(footer, name, (10, 12),
                    cv2.FONT_HERSHEY_PLAIN, 1, (255, 255, 255))
    return np.vstack([frame, footer])


def tilera(iterable):
    """Merge ``(name, frame)`` pairs into a single grid image.

    The grid has ``ceil(sqrt(n))`` columns and as many rows as are needed
    to fit all ``n`` tiles; cells past the end of ``iterable`` are filled
    with blank tiles.  Tiles within a row are separated by a one pixel
    wide black column.

    ``iterable`` must support ``len()`` and integer indexing.  An empty
    ``iterable`` yields a single subsampled blank frame instead of raising
    ``ZeroDivisionError``.
    """
    count = len(iterable)
    if count == 0:
        return BLANK[::MERGED_SUBSAMPLE, ::MERGED_SUBSAMPLE]
    hstacking = ceil(count ** 0.5)
    vstacking = ceil(count / hstacking)
    rows = []
    for j in range(vstacking):
        row = []
        for i in range(hstacking):
            index = i + hstacking * j
            tile = iterable[index] if index < count else None
            frame = _render_tile(tile)
            if row:
                # Separator between horizontally adjacent tiles
                row.append(np.zeros((len(frame), 1, 3), dtype=np.uint8))
            row.append(frame)
        rows.append(np.hstack(row))
    return np.vstack(rows)


async def client_connect(name, resp):
    """Extract JPEG frames from an upstream response body.

    The body is scanned for JPEG start (``ff d8``) and end (``ff d9``)
    markers.  Each completed frame is decoded and stored in
    ``app.ctx.frames[name]``; bytes outside of frames are counted as
    dropped.  Returns when the response body is exhausted.

    NOTE(review): the end marker is searched only in newly received data,
    so a marker split across two reads, or an end marker arriving in the
    same read as its start marker, is not detected until a later end
    marker shows up.
    """
    buf = b""
    print("Upstream connection to %s opened with status: %d" % (
        name, resp.status))
    async for data, end_of_http_chunk in resp.content.iter_chunks():
        counter_rx_bytes.inc(len(data))
        if end_of_http_chunk:
            counter_rx_chunks.inc()

        if buf:
            # If we already have something in buffer, seek end in new data
            marker = data.find(b"\xff\xd9")
            if marker < 0:
                # If no end marker was found add it to buffer
                buf += data
                continue
            else:
                # If end marker was found, decode JPEG frame
                blob = np.frombuffer(buf + data[:marker + 2], dtype=np.uint8)
                img = cv2.imdecode(blob, cv2.IMREAD_UNCHANGED)
                app.ctx.frames[name] = img
                data = data[marker + 2:]
                buf = b""
                counter_rx_frames.inc()

        # Seek begin in newly received data
        marker = data.find(b"\xff\xd8")
        if marker >= 0:
            data, buf = data[:marker], data[marker:]

        # Remaining data is effectively dropped
        counter_dropped_bytes.inc(len(data))


async def client(name, url):
    """Keep consuming the upstream stream at ``url``, reconnecting forever.

    Before every connection attempt the frame stored for ``name`` is reset
    to ``None``.  Client and timeout errors are printed and counted in
    ``counter_errors``; the next attempt follows after one second.
    """
    print("Opening upstream connection to %s" % url)
    kwargs = {
        "headers": {
            "User-Agent": "camtiler/%s" % GIT_COMMIT_TIMESTAMP
        },
        "timeout": aiohttp.ClientTimeout(connect=5, sock_read=2)
    }
    while True:
        app.ctx.frames[name] = None
        async with aiohttp.ClientSession(**kwargs) as session:
            try:
                async with session.get(url) as resp:
                    await client_connect(name, resp)
            except (aiohttp.ClientError, asyncio.exceptions.TimeoutError) as e:
                j = "%s.%s" % (e.__class__.__module__, e.__class__.__name__)
                print("Caught exception %s for %s" % (j, name))
                counter_errors.labels(exception=j).inc()
        await asyncio.sleep(1)


@app.route("/tiled")
@app.route("/m/tiled")
async def stream_wrapper(request):
    """Serve the merged camera grid as a multipart JPEG stream."""
    async def stream_tiled(response):
        # Encode the current grid and write it out, then pause for 0.1 s
        # before producing the next frame.
        while True:
            img = tilera(sorted(app.ctx.frames.items()))
            _, jpeg = cv2.imencode(".jpg", img, (cv2.IMWRITE_JPEG_QUALITY, 80))
            data = STREAM_RESPONSE + jpeg.tobytes()
            await response.write(data)
            counter_tx_bytes.inc(len(data))
            counter_tx_frames.inc()
            await asyncio.sleep(0.1)
    return response.stream(
        stream_tiled,
        content_type="multipart/x-mixed-replace; boundary=frame"
    )


@app.listener("before_server_start")
async def setup_db(app, loop):
    """Initialise shared state and start one upstream task per target."""
    # NOTE(review): event_frame, avg and the motion_* attributes are not
    # read anywhere in this file as shown; kept in case other code uses them.
    app.ctx.event_frame = asyncio.Event()
    app.ctx.frames = {}
    app.ctx.avg = None
    app.ctx.motion_frames = 0
    app.ctx.motion_start = None
    app.ctx.motion_end = None
    # The event loop keeps only weak references to tasks, so hold strong
    # references here to keep the upstream tasks from being collected.
    app.ctx.tasks = []
    for name, url in targets:
        app.ctx.frames[name] = None
        app.ctx.tasks.append(asyncio.create_task(client(name, url)))


monitor(app).expose_endpoint()

try:
    app.run(host="0.0.0.0", port=5001)
except KeyboardInterrupt:
    asyncio.get_event_loop().close()