185 lines
5.4 KiB
Python
Executable File
185 lines
5.4 KiB
Python
Executable File
#!/usr/bin/python3
|
|
|
|
import aiohttp
|
|
import asyncio
|
|
import cv2
|
|
import io
|
|
import numpy as np
|
|
import os
|
|
import socket
|
|
import sys
|
|
from datetime import datetime
|
|
from kubernetes import client, config
|
|
from math import ceil
|
|
from prometheus_client import Counter, Gauge
|
|
from sanic import Sanic, response
|
|
from sanic.response import stream
|
|
from sanic_prometheus import monitor
|
|
|
|
targets = [(j,j) for j in sys.argv[1:]]
|
|
if not targets:
|
|
# If no targets are specified, fall back to Kube API
|
|
config.load_incluster_config()
|
|
v1 = client.CoreV1Api()
|
|
for i in v1.list_namespaced_service("camtiler", label_selector="component=camdetect").items:
|
|
url = "http://%s:%d/bypass" % (i.metadata.name, i.spec.ports[0].port)
|
|
targets.append((i.metadata.name, url))
|
|
|
|
print("Running with following targets:")
|
|
for name, url in targets:
|
|
print(url)
|
|
|
|
counter_dropped_bytes = Counter(
|
|
"camtiler_client_dropped_bytes",
|
|
"Bytes that were not not handled or part of actual JPEG frames")
|
|
counter_rx_bytes = Counter(
|
|
"camtiler_client_rx_bytes",
|
|
"Bytes received over HTTP stream")
|
|
counter_tx_bytes = Counter(
|
|
"camtiler_client_tx_bytes",
|
|
"Bytes transmitted over HTTP streams")
|
|
counter_rx_frames = Counter(
|
|
"camtiler_client_rx_frames",
|
|
"Frames received")
|
|
counter_tx_frames = Counter(
|
|
"camtiler_client_tx_frames",
|
|
"Frames transmitted")
|
|
counter_tx_events = Counter(
|
|
"camtiler_client_tx_events",
|
|
"Events emitted")
|
|
counter_eos = Counter(
|
|
"camtiler_client_eos",
|
|
"Count of End of Stream occurrences")
|
|
counter_timeout_errors = Counter(
|
|
"camtiler_client_timeout_errors",
|
|
"Upstream connection timeout errors")
|
|
counter_cancelled_errors = Counter(
|
|
"camtiler_client_cancelled_errors",
|
|
"Upstream connection cancelled errors")
|
|
counter_incomplete_read_errors = Counter(
|
|
"camtiler_client_incomplete_read_errors",
|
|
"Upstream incomplete read errors")
|
|
|
|
app = Sanic("camtiler")
|
|
|
|
STREAM_RESPONSE = \
|
|
b"""
|
|
--frame
|
|
Content-Type: image/jpeg
|
|
|
|
"""
|
|
|
|
MERGED_SUBSAMPLE=3
|
|
|
|
# Consider camera gone after 5sec
|
|
TIMEOUT = 5.0
|
|
|
|
# Blank frame
|
|
BLANK = np.full((720 // MERGED_SUBSAMPLE, 1280 // MERGED_SUBSAMPLE, 3), 0)
|
|
|
|
|
|
def tilera(iterable, filler):
|
|
hstacking = ceil(len(iterable) ** 0.5)
|
|
vstacking = ceil(len(iterable) / hstacking)
|
|
rows = []
|
|
for j in range(0, vstacking):
|
|
row = []
|
|
first_frame = True
|
|
for i in range(0, hstacking):
|
|
try:
|
|
frame = iterable[i + hstacking * j]
|
|
except IndexError:
|
|
frame = filler
|
|
|
|
if not first_frame:
|
|
row.append(np.full((len(frame), 1, 3), 0))
|
|
first_frame = False
|
|
row.append(frame)
|
|
stacked = np.hstack(row)
|
|
rows.append(stacked)
|
|
return np.vstack(rows)
|
|
|
|
|
|
async def client_connect(name, resp):
|
|
buf = b""
|
|
print("Upstream connection opened with status:", resp.status)
|
|
async for data, end_of_http_chunk in resp.content.iter_chunks():
|
|
counter_rx_bytes.inc(len(data))
|
|
if end_of_http_chunk:
|
|
counter_eos.inc()
|
|
break
|
|
|
|
if buf:
|
|
# seek end
|
|
marker = data.find(b'\xff\xd9')
|
|
if marker < 0:
|
|
buf += data
|
|
continue
|
|
else:
|
|
blob = np.frombuffer(buf + data[:marker+2], dtype=np.uint8)
|
|
img = cv2.imdecode(blob, cv2.IMREAD_UNCHANGED)
|
|
app.ctx.frames[name] = img
|
|
data = data[marker+2:]
|
|
buf = b""
|
|
counter_rx_frames.inc()
|
|
|
|
# seek begin
|
|
marker = data.find(b'\xff\xd8')
|
|
if marker >= 0:
|
|
buf = data[marker:]
|
|
else:
|
|
counter_dropped_bytes.inc(len(data))
|
|
|
|
|
|
async def client(name, url):
|
|
print("Opening upstream connection to %s" % url)
|
|
async with aiohttp.ClientSession() as session:
|
|
async with session.get(url) as resp:
|
|
try:
|
|
await client_connect(name, resp)
|
|
except asyncio.TimeoutError:
|
|
counter_timeout_errors.inc()
|
|
except asyncio.CancelledError:
|
|
counter_cancelled_errors.inc()
|
|
except asyncio.IncompleteReadError:
|
|
counter_incomplete_read_errors.inc()
|
|
|
|
|
|
@app.route("/tiled")
|
|
async def stream_wrapper(request):
|
|
async def stream_tiled(response):
|
|
while True:
|
|
frames = [value for key, value in sorted(app.ctx.frames.items())]
|
|
img = tilera(frames, np.vstack([BLANK, np.zeros((30, 1280 // MERGED_SUBSAMPLE, 3))]))
|
|
_, jpeg = cv2.imencode(".jpg", img, (cv2.IMWRITE_JPEG_QUALITY, 80))
|
|
data = STREAM_RESPONSE + jpeg.tobytes()
|
|
await response.write(data)
|
|
counter_tx_bytes.inc(len(data))
|
|
counter_tx_frames.inc()
|
|
await asyncio.sleep(0.2)
|
|
|
|
return response.stream(
|
|
stream_tiled,
|
|
content_type='multipart/x-mixed-replace; boundary=frame'
|
|
)
|
|
|
|
@app.listener('before_server_start')
|
|
async def setup_db(app, loop):
|
|
app.ctx.last_frame = None
|
|
app.ctx.event_frame = asyncio.Event()
|
|
app.ctx.frames = {}
|
|
app.ctx.avg = None
|
|
app.ctx.motion_frames = 0
|
|
app.ctx.motion_start = None
|
|
app.ctx.motion_end = None
|
|
for name, url in targets:
|
|
app.ctx.frames[name] = None
|
|
task = asyncio.create_task(client(name, url))
|
|
|
|
monitor(app).expose_endpoint()
|
|
|
|
try:
|
|
app.run(host="0.0.0.0", port=5001)
|
|
except KeyboardInterrupt:
|
|
asyncio.get_event_loop().close()
|