camera-tiler/camtiler.py

210 lines
6.4 KiB
Python
Executable File

#!/usr/bin/python3
import aiohttp
import asyncio
import cv2
import numpy as np
import os
import sys
from kubernetes import client, config
from math import ceil
from prometheus_client import Counter, Gauge
from sanic import Sanic, response
from sanic_prometheus import monitor
# Namespace and label selector used when camera services are discovered
# through the Kubernetes API; both can be overridden via the environment.
SERVICE_NAMESPACE = os.getenv("SERVICE_NAMESPACE", "camtiler")
SERVICE_LABEL_SELECTOR = os.getenv("SERVICE_LABEL_SELECTOR", "component=camera-motion-detect")

# Every target is a (name, url) pair. URLs given on the command line are
# used both as the display name and as the address.
targets = [(arg, arg) for arg in sys.argv[1:]]

if not targets:
    # If no targets are specified, fall back to Kube API
    config.load_incluster_config()
    v1 = client.CoreV1Api()
    discovered = v1.list_namespaced_service(
        SERVICE_NAMESPACE,
        label_selector=SERVICE_LABEL_SELECTOR)
    for service in discovered.items:
        # The first declared port of the service is used for the stream URL
        url = "http://%s:%d/bypass" % (
            service.metadata.name,
            service.spec.ports[0].port)
        targets.append((service.metadata.name, url))

print("Running with following targets:")
for _, target_url in targets:
    print(target_url)
# Build identification, injected through the environment; "null" when unset.
GIT_COMMIT = os.getenv("GIT_COMMIT", "null")
GIT_COMMIT_TIMESTAMP = os.getenv("GIT_COMMIT_TIMESTAMP", "null")

# Prometheus metrics exported by this process.
counter_dropped_bytes = Counter(
    "camtiler_client_dropped_bytes",
    "Bytes that were not handled or part of actual JPEG frames")
counter_rx_bytes = Counter(
    "camtiler_client_rx_bytes",
    "Bytes received over HTTP stream")
counter_tx_bytes = Counter(
    "camtiler_client_tx_bytes",
    "Bytes transmitted over HTTP streams")
counter_rx_chunks = Counter(
    "camtiler_client_rx_chunks",
    "HTTP chunks received")
counter_rx_frames = Counter(
    "camtiler_client_rx_frames",
    "Frames received")
counter_tx_frames = Counter(
    "camtiler_client_tx_frames",
    "Frames transmitted")
counter_tx_events = Counter(
    "camtiler_client_tx_events",
    "Events emitted")
# Labelled by the fully qualified exception class name
counter_errors = Counter(
    "camtiler_errors",
    "Upstream connection errors",
    ["exception"])

# Constant gauge whose labels carry the build identification
gauge_build_info = Gauge(
    "docker_build_info",
    "Build info",
    ["git_commit", "git_commit_timestamp"])
gauge_build_info.labels(
    GIT_COMMIT,
    GIT_COMMIT_TIMESTAMP).set(1)

# Sanic application; the route and listener decorators in this file attach to it
app = Sanic("camtiler")
# Header emitted in front of every JPEG in the multipart/x-mixed-replace
# stream. Written with explicit escapes so the bytes do not depend on how the
# source file is formatted: the boundary line, the part's Content-Type header
# and the empty line that terminates the part headers before the image data.
STREAM_RESPONSE = b"\r\n--frame\r\nContent-Type: image/jpeg\r\n\r\n"

# Every MERGED_SUBSAMPLE-th row and column of a frame is kept when tiling
MERGED_SUBSAMPLE = 3

# Blank frame, shown in grid cells that have no camera assigned
BLANK = np.full((720, 1280, 3), 0)

# Solid (0, 135, 0) frame, shown for cameras that currently have no image
GREEN = np.full((720, 1280, 3), (0, 135, 0))
def tilera(iterable):
    """Compose named camera frames into one mosaic image.

    The tiles are laid out row by row on a grid that is as close to square
    as possible. Every tile is subsampled by MERGED_SUBSAMPLE and gets a
    30 pixel footer with the camera name and its status; tiles in the same
    row are separated by a one pixel wide black column.

    Args:
        iterable: Sequence of ``(name, frame)`` pairs. ``frame`` is an image
            array, or ``None`` when no image is currently available, in which
            case the GREEN placeholder labelled "Signal lost" is drawn.
            Frames are assumed to have the same dimensions as BLANK/GREEN so
            that stacking succeeds -- TODO confirm against the cameras in use.

    Returns:
        The mosaic as a single array. Grid cells beyond the end of
        ``iterable`` are filled with BLANK and an empty footer. An empty
        ``iterable`` yields one such blank tile instead of raising.
    """
    count = len(iterable)
    # Clamp the grid to at least 1x1: with no tiles the square root is zero,
    # which would otherwise lead to a division by zero below.
    hstacking = max(1, ceil(count ** 0.5))
    vstacking = max(1, ceil(count / hstacking))

    rows = []
    for j in range(vstacking):
        row = []
        for i in range(hstacking):
            index = i + hstacking * j
            tile = iterable[index] if index < count else None

            if tile is None:
                # Unused grid cell: plain blank image, footer stays empty
                name, msg, frame = None, None, BLANK
            else:
                name, frame = tile
                if frame is None:
                    msg = "Signal lost"
                    frame = GREEN
                else:
                    msg = "Live"

            frame = frame[::MERGED_SUBSAMPLE, ::MERGED_SUBSAMPLE]
            footer = np.zeros((30, len(frame[0]), 3))
            if tile is not None:
                cv2.putText(footer, msg, (10, 24), cv2.FONT_HERSHEY_PLAIN, 1, (255, 255, 255))
                cv2.putText(footer, name, (10, 12), cv2.FONT_HERSHEY_PLAIN, 1, (255, 255, 255))
            frame = np.vstack([frame, footer])

            if row:
                # One pixel wide separator between neighbouring tiles
                row.append(np.full((len(frame), 1, 3), 0))
            row.append(frame)
        rows.append(np.hstack(row))
    return np.vstack(rows)
async def client_connect(name, resp):
    """Read an upstream HTTP stream and publish the JPEG frames found in it.

    The response body is scanned for JPEG start (FF D8) and end (FF D9)
    markers. Each completed frame is decoded and stored in
    ``app.ctx.frames[name]``; bytes outside of frames are only counted.

    Args:
        name: Key under which decoded frames are stored in ``app.ctx.frames``.
        resp: Open aiohttp response; ``resp.status`` is logged and
            ``resp.content.iter_chunks()`` is consumed until the stream ends.
    """
    buf = b""
    print("Upstream connection to %s opened with status: %d" % (name, resp.status))
    async for data, end_of_http_chunk in resp.content.iter_chunks():
        counter_rx_bytes.inc(len(data))
        if end_of_http_chunk:
            counter_rx_chunks.inc()

        if buf:
            # If we already have something in buffer, seek end in new data.
            # NOTE(review): only the new data is searched, so an end marker
            # that is already in the buffer or that is split across two
            # reads is not recognised here.
            marker = data.find(b"\xff\xd9")
            if marker < 0:
                # If no end marker was found add it to buffer
                buf += data
                continue

            # If end marker was found, decode JPEG frame
            blob = np.frombuffer(buf + data[:marker + 2], dtype=np.uint8)
            img = cv2.imdecode(blob, cv2.IMREAD_UNCHANGED)
            app.ctx.frames[name] = img
            data = data[marker + 2:]
            buf = b""
            counter_rx_frames.inc()

        # Seek begin in newly received data
        marker = data.find(b"\xff\xd8")
        if marker >= 0:
            data, buf = data[:marker], data[marker:]

        # Remaining data is effectively dropped
        counter_dropped_bytes.inc(len(data))
async def client(name, url):
    """Keep an upstream connection to one camera alive forever.

    Opens ``url``, hands the response to ``client_connect`` and, whenever the
    stream ends or fails with a client or timeout error, waits a second and
    connects again. The frame stored for ``name`` is reset to ``None`` before
    every connection attempt.

    Args:
        name: Key of this camera in ``app.ctx.frames``.
        url: Address of the upstream stream.
    """
    print("Opening upstream connection to %s" % url)

    user_agent = "camtiler/%s" % GIT_COMMIT_TIMESTAMP
    read_limits = aiohttp.ClientTimeout(connect=5, sock_read=2)
    session_options = {
        "headers": {"User-Agent": user_agent},
        "timeout": read_limits,
    }

    while True:
        # No valid frame while (re)connecting
        app.ctx.frames[name] = None
        async with aiohttp.ClientSession(**session_options) as session:
            try:
                async with session.get(url) as resp:
                    await client_connect(name, resp)
            except (aiohttp.ClientError, asyncio.exceptions.TimeoutError) as exc:
                exc_class = exc.__class__
                qualified = "%s.%s" % (exc_class.__module__, exc_class.__name__)
                print("Caught exception %s for %s" % (qualified, name))
                counter_errors.labels(exception=qualified).inc()
        # Pause before the next connection attempt
        await asyncio.sleep(1)
@app.route("/tiled")
@app.route("/m/tiled")
async def stream_wrapper(request):
    """Serve the tiled view of all cameras as an endless multipart JPEG stream.

    Roughly every 0.1 seconds the current frames are tiled in name order,
    encoded as JPEG with quality 80 and written to the client, each image
    preceded by STREAM_RESPONSE.
    """
    jpeg_options = (cv2.IMWRITE_JPEG_QUALITY, 80)

    async def emit_frames(stream):
        # Runs until writing to the client fails or the task is cancelled
        while True:
            ordered = sorted(app.ctx.frames.items())
            mosaic = tilera(ordered)
            _, encoded = cv2.imencode(".jpg", mosaic, jpeg_options)
            payload = STREAM_RESPONSE + encoded.tobytes()
            await stream.write(payload)
            counter_tx_bytes.inc(len(payload))
            counter_tx_frames.inc()
            await asyncio.sleep(0.1)

    return response.stream(
        emit_frames,
        content_type="multipart/x-mixed-replace; boundary=frame"
    )
@app.listener("before_server_start")
async def setup_db(app, loop):
    """Initialise shared state and start one upstream client per target.

    Args:
        app: The Sanic application whose ``ctx`` holds the shared state.
        loop: Event loop passed in by Sanic; not used here.
    """
    app.ctx.event_frame = asyncio.Event()
    app.ctx.frames = {}
    # NOTE(review): the attributes below are not read anywhere in the visible
    # source; they are kept in case other code depends on them.
    app.ctx.avg = None
    app.ctx.motion_frames = 0
    app.ctx.motion_start = None
    app.ctx.motion_end = None

    # Keep references to the client tasks: the event loop itself only holds
    # weak references, so an unreferenced task may be garbage collected
    # before it finishes.
    app.ctx.client_tasks = []
    for name, url in targets:
        app.ctx.frames[name] = None
        app.ctx.client_tasks.append(asyncio.create_task(client(name, url)))
# Attach sanic_prometheus monitoring to the app and expose its endpoint
# (presumably the Prometheus scrape endpoint -- confirm in sanic_prometheus)
metrics_monitor = monitor(app)
metrics_monitor.expose_endpoint()

# Serve on all interfaces; on Ctrl-C close the event loop
try:
    app.run(host="0.0.0.0", port=5001)
except KeyboardInterrupt:
    event_loop = asyncio.get_event_loop()
    event_loop.close()