#!/usr/bin/python3 import aiohttp import asyncio import cv2 import io import numpy as np import os import socket import sys from datetime import datetime from kubernetes import client, config from math import ceil from prometheus_client import Counter, Gauge from sanic import Sanic, response from sanic.response import stream from sanic_prometheus import monitor SERVICE_NAMESPACE = os.getenv("SERVICE_NAMESPACE", "camtiler") SERVICE_LABEL_SELECTOR = os.getenv("SERVICE_LABEL_SELECTOR", "component=camera-motion-detect") targets = [(j,j) for j in sys.argv[1:]] if not targets: # If no targets are specified, fall back to Kube API config.load_incluster_config() v1 = client.CoreV1Api() for i in v1.list_namespaced_service(SERVICE_NAMESPACE, label_selector=SERVICE_LABEL_SELETOR).items: url = "http://%s:%d/bypass" % (i.metadata.name, i.spec.ports[0].port) targets.append((i.metadata.name, url)) print("Running with following targets:") for name, url in targets: print(url) GIT_COMMIT = os.getenv("GIT_COMMIT", "null") GIT_COMMIT_TIMESTAMP = os.getenv("GIT_COMMIT_TIMESTAMP", "null") counter_dropped_bytes = Counter( "camtiler_client_dropped_bytes", "Bytes that were not not handled or part of actual JPEG frames") counter_rx_bytes = Counter( "camtiler_client_rx_bytes", "Bytes received over HTTP stream") counter_tx_bytes = Counter( "camtiler_client_tx_bytes", "Bytes transmitted over HTTP streams") counter_rx_chunks = Counter( "camtiler_client_rx_chunks", "HTTP chunks received") counter_rx_frames = Counter( "camtiler_client_rx_frames", "Frames received") counter_tx_frames = Counter( "camtiler_client_tx_frames", "Frames transmitted") counter_tx_events = Counter( "camtiler_client_tx_events", "Events emitted") counter_errors = Counter( "camtiler_errors", "Upstream connection errors", ["exception"]) gauge_build_info = Gauge( "docker_build_info", "Build info", ["git_commit", "git_commit_timestamp"]) gauge_build_info.labels( GIT_COMMIT, GIT_COMMIT_TIMESTAMP).set(1) app = Sanic("camtiler") STREAM_RESPONSE = \ b""" --frame Content-Type: image/jpeg """ MERGED_SUBSAMPLE=3 # Blank frame BLANK = np.full((720, 1280, 3), 0) GREEN = np.full((720, 1280, 3), (0, 135, 0)) def tilera(iterable): hstacking = ceil(len(iterable) ** 0.5) vstacking = ceil(len(iterable) / hstacking) rows = [] for j in range(0, vstacking): row = [] first_frame = True for i in range(0, hstacking): try: tile = iterable[i + hstacking * j] except IndexError: tile = None try: name, frame = tile except TypeError: frame = BLANK if frame is None: msg = "Signal lost" frame = GREEN else: msg = "Live" frame = frame[::MERGED_SUBSAMPLE, ::MERGED_SUBSAMPLE] footer = np.zeros((30, len(frame[0]), 3)) if tile: cv2.putText(footer, msg, (10, 24), cv2.FONT_HERSHEY_PLAIN, 1, (255, 255, 255)) cv2.putText(footer, name, (10, 12), cv2.FONT_HERSHEY_PLAIN, 1, (255, 255, 255)) frame = np.vstack([frame, footer]) if not first_frame: row.append(np.full((len(frame), 1, 3), 0)) first_frame = False row.append(frame) stacked = np.hstack(row) rows.append(stacked) return np.vstack(rows) async def client_connect(name, resp): buf = b"" print("Upstream connection to %s opened with status: %d", (name, resp.status)) async for data, end_of_http_chunk in resp.content.iter_chunks(): if end_of_http_chunk: counter_rx_chunks.inc() if buf: # If we already have something in buffer, seek end in new data marker = data.find(b'\xff\xd9') if marker < 0: # If no end marker was found add it to buffer buf += data continue else: # If end marker was found, decode JPEG frame blob = np.frombuffer(buf + data[:marker+2], dtype=np.uint8) img = cv2.imdecode(blob, cv2.IMREAD_UNCHANGED) app.ctx.frames[name] = img data = data[marker+2:] buf = b"" counter_rx_frames.inc() # Seek begin in newly received data marker = data.find(b'\xff\xd8') if marker >= 0: data, buf = data[:marker], data[marker:] # Remaining data is effectively dropped counter_dropped_bytes.inc(len(data)) async def client(name, url): print("Opening upstream connection to %s" % url) kwargs = dict( headers = { "User-Agent": "camtiler/%s" % GIT_COMMIT_TIMESTAMP }, skip_auto_headers = True, timeout = aiohttp.ClientTimeout(connect=5, sock_read=2)) while True: app.ctx.frames[name] = None async with aiohttp.ClientSession(**kwargs) as session: try: async with session.get(url) as resp: await client_connect(name, resp) except (aiohttp.ClientError, asyncio.exceptions.TimeoutError) as e: j = "%s.%s" % (e.__class__.__module__, e.__class__.__name__) print("Caught exception %s for %s" % (j, name)) counter_errors.labels(exception=j).inc() await asyncio.sleep(1) @app.route("/tiled") async def stream_wrapper(request): async def stream_tiled(response): while True: img = tilera(sorted(app.ctx.frames.items())) _, jpeg = cv2.imencode(".jpg", img, (cv2.IMWRITE_JPEG_QUALITY, 80)) data = STREAM_RESPONSE + jpeg.tobytes() await response.write(data) counter_tx_bytes.inc(len(data)) counter_tx_frames.inc() await asyncio.sleep(0.1) return response.stream( stream_tiled, content_type='multipart/x-mixed-replace; boundary=frame' ) @app.listener('before_server_start') async def setup_db(app, loop): app.ctx.event_frame = asyncio.Event() app.ctx.frames = {} app.ctx.avg = None app.ctx.motion_frames = 0 app.ctx.motion_start = None app.ctx.motion_end = None for name, url in targets: app.ctx.frames[name] = None task = asyncio.create_task(client(name, url)) monitor(app).expose_endpoint() try: app.run(host="0.0.0.0", port=5001) except KeyboardInterrupt: asyncio.get_event_loop().close()