210 lines
6.4 KiB
Python
Executable File
210 lines
6.4 KiB
Python
Executable File
#!/usr/bin/python3
|
|
|
|
import aiohttp
|
|
import asyncio
|
|
import cv2
|
|
import io
|
|
import numpy as np
|
|
import os
|
|
import socket
|
|
import sys
|
|
from datetime import datetime
|
|
from kubernetes import client, config
|
|
from math import ceil
|
|
from prometheus_client import Counter, Gauge
|
|
from sanic import Sanic, response
|
|
from sanic.response import stream
|
|
from sanic_prometheus import monitor
|
|
|
|
# Each target is a (name, url) pair; for command line arguments the URL
# doubles as the name.
targets = [(arg, arg) for arg in sys.argv[1:]]
if not targets:
    # Nothing given on the command line: look the targets up via the Kube API
    config.load_incluster_config()
    v1 = client.CoreV1Api()
    services = v1.list_namespaced_service(
        "camtiler", label_selector="component=camdetect")
    for svc in services.items:
        svc_name = svc.metadata.name
        svc_url = "http://%s:%d/bypass" % (svc_name, svc.spec.ports[0].port)
        targets.append((svc_name, svc_url))

print("Running with following targets:")
for _, target_url in targets:
    print(target_url)
|
|
|
|
# Build identification read from the environment; both values fall back to
# the literal string "null" when the variable is not set.
GIT_COMMIT = os.getenv("GIT_COMMIT", "null")
GIT_COMMIT_TIMESTAMP = os.getenv("GIT_COMMIT_TIMESTAMP", "null")

# Prometheus metrics describing the upstream (rx) and downstream (tx) traffic
counter_dropped_bytes = Counter(
    "camtiler_client_dropped_bytes",
    "Bytes that were not handled or part of actual JPEG frames")
counter_rx_bytes = Counter(
    "camtiler_client_rx_bytes",
    "Bytes received over HTTP stream")
counter_tx_bytes = Counter(
    "camtiler_client_tx_bytes",
    "Bytes transmitted over HTTP streams")
counter_rx_chunks = Counter(
    "camtiler_client_rx_chunks",
    "HTTP chunks received")
counter_rx_frames = Counter(
    "camtiler_client_rx_frames",
    "Frames received")
counter_tx_frames = Counter(
    "camtiler_client_tx_frames",
    "Frames transmitted")
counter_tx_events = Counter(
    "camtiler_client_tx_events",
    "Events emitted")
# Labelled by the fully qualified exception class name
counter_errors = Counter(
    "camtiler_errors",
    "Upstream connection errors",
    ["exception"])

# Constant gauge (always 1) whose labels carry the build identification
gauge_build_info = Gauge(
    "docker_build_info",
    "Build info",
    ["git_commit", "git_commit_timestamp"])
gauge_build_info.labels(
    GIT_COMMIT,
    GIT_COMMIT_TIMESTAMP).set(1)
|
|
|
|
# Sanic application object; the route and listener defined further down are
# registered on it and shared state is kept in app.ctx.
app = Sanic("camtiler")
|
|
|
|
# Part header written in front of every JPEG payload of the
# multipart/x-mixed-replace stream (boundary "frame")
STREAM_RESPONSE = b"\n--frame\nContent-Type: image/jpeg\n\n"

# Only every MERGED_SUBSAMPLE-th row and column of a tile is kept when tiling
MERGED_SUBSAMPLE = 3

# Placeholder tiles: an all-zero frame for unused grid cells and a green
# frame for cameras that currently have no picture
BLANK = np.zeros((720, 1280, 3), dtype=int)
GREEN = np.full(shape=(720, 1280, 3), fill_value=(0, 135, 0))
|
|
|
|
def tilera(iterable):
    """
    Compose a roughly square mosaic out of named frames.

    The tiles are laid out row by row on a grid that is
    ceil(sqrt(n)) cells wide. Every tile is subsampled by
    MERGED_SUBSAMPLE and gets a 30 pixel high footer carrying the tile name
    and a status text. Cells without a tile are filled with BLANK, tiles
    whose frame is None are rendered as GREEN with the status "Signal lost".

    :param iterable: indexable sequence of (name, frame) pairs, frame being
        an image array or None
    :return: the stacked mosaic as a numpy array; for an empty sequence a
        single blank tile is returned

    NOTE(review): tiles are joined with np.hstack/np.vstack, so frames
    presumably have to share the shape of BLANK/GREEN - confirm against the
    upstream camera resolution.
    """
    count = len(iterable)
    # Clamp the grid to at least 1x1: with no tiles at all the width would
    # be 0 and the division below would raise ZeroDivisionError
    hstacking = max(1, ceil(count ** 0.5))
    vstacking = max(1, ceil(count / hstacking))
    rows = []
    for j in range(vstacking):
        row = []
        for i in range(hstacking):
            index = i + hstacking * j
            tile = iterable[index] if index < count else None

            if tile is None:
                # Grid cell past the last tile
                name, frame = None, BLANK
            else:
                name, frame = tile

            if frame is None:
                msg = "Signal lost"
                frame = GREEN
            else:
                msg = "Live"
            frame = frame[::MERGED_SUBSAMPLE, ::MERGED_SUBSAMPLE]
            footer = np.zeros((30, len(frame[0]), 3))

            if tile is not None:
                # Name on the upper, status on the lower text line of the footer
                cv2.putText(footer, msg, (10, 24), cv2.FONT_HERSHEY_PLAIN, 1, (255, 255, 255))
                cv2.putText(footer, name, (10, 12), cv2.FONT_HERSHEY_PLAIN, 1, (255, 255, 255))
            frame = np.vstack([frame, footer])
            if row:
                # One pixel wide separator between neighbouring tiles
                row.append(np.full((len(frame), 1, 3), 0))
            row.append(frame)
        rows.append(np.hstack(row))
    return np.vstack(rows)
|
|
|
|
|
|
async def client_connect(name, resp):
    """
    Consume an upstream HTTP response and extract JPEG frames from it.

    The body is scanned for the JPEG start (FF D8) and end (FF D9) markers;
    each completed frame is decoded and stored in app.ctx.frames[name].
    Bytes outside of frames are accounted in counter_dropped_bytes.

    :param name: key under which decoded frames are published
    :param resp: open aiohttp response whose content is read chunk by chunk

    NOTE: the end marker is only searched for in newly received data, so a
    marker split across two reads, or one arriving in the same read as the
    start marker, is only picked up with a later end marker.
    """
    buf = b""
    print("Upstream connection to %s opened with status: %d" % (name, resp.status))
    async for data, end_of_http_chunk in resp.content.iter_chunks():
        counter_rx_bytes.inc(len(data))
        if end_of_http_chunk:
            counter_rx_chunks.inc()

        if buf:
            # If we already have something in buffer, seek end in new data
            marker = data.find(b'\xff\xd9')
            if marker < 0:
                # If no end marker was found add it to buffer
                buf += data
                continue

            # If end marker was found, decode JPEG frame
            blob = np.frombuffer(buf + data[:marker+2], dtype=np.uint8)
            img = cv2.imdecode(blob, cv2.IMREAD_UNCHANGED)
            app.ctx.frames[name] = img
            data = data[marker+2:]
            buf = b""
            counter_rx_frames.inc()

        # Seek begin in newly received data
        marker = data.find(b'\xff\xd8')
        if marker >= 0:
            data, buf = data[:marker], data[marker:]

        # Remaining data is effectively dropped
        counter_dropped_bytes.inc(len(data))
|
|
|
|
|
|
async def client(name, url):
    """
    Keep an upstream connection to one target alive forever.

    On every (re)connect the published frame for the target is reset to
    None, then the response is handed to client_connect(). Connection errors
    and timeouts are printed and counted in counter_errors; any other
    exception propagates and ends this coroutine.

    :param name: key of the target in app.ctx.frames
    :param url: URL of the upstream stream
    """
    print("Opening upstream connection to %s" % url)
    kwargs = dict(
        headers={
            "User-Agent": "camtiler/%s" % GIT_COMMIT_TIMESTAMP
        },
        # aiohttp expects an iterable of header names here; a bare True is
        # not iterable and makes ClientSession() raise TypeError. List the
        # headers aiohttp would otherwise generate on its own.
        skip_auto_headers=("Accept", "Accept-Encoding"),
        timeout=aiohttp.ClientTimeout(connect=5, sock_read=2))
    while True:
        app.ctx.frames[name] = None
        async with aiohttp.ClientSession(**kwargs) as session:
            try:
                async with session.get(url) as resp:
                    await client_connect(name, resp)
            except (aiohttp.ClientError, asyncio.exceptions.TimeoutError) as e:
                j = "%s.%s" % (e.__class__.__module__, e.__class__.__name__)
                print("Caught exception %s for %s" % (j, name))
                counter_errors.labels(exception=j).inc()
        # Back off before reconnecting, both after errors and after the
        # upstream closed the stream
        await asyncio.sleep(1)
|
|
|
|
|
|
@app.route("/tiled")
async def stream_wrapper(request):
    """
    Serve the tiled view of all targets as an endless
    multipart/x-mixed-replace JPEG stream.
    """
    async def stream_tiled(writer):
        # Roughly every 0.1 s: tile the latest frames ordered by name,
        # encode them as JPEG (quality 80) and push them to the client
        while True:
            tiles = list(app.ctx.frames.items())
            tiles.sort()
            encode_params = (cv2.IMWRITE_JPEG_QUALITY, 80)
            jpeg = cv2.imencode(".jpg", tilera(tiles), encode_params)[1]
            payload = STREAM_RESPONSE + jpeg.tobytes()
            await writer.write(payload)
            counter_tx_bytes.inc(len(payload))
            counter_tx_frames.inc()
            await asyncio.sleep(0.1)

    mime = 'multipart/x-mixed-replace; boundary=frame'
    return response.stream(stream_tiled, content_type=mime)
|
|
|
|
@app.listener('before_server_start')
async def setup_db(app, loop):
    """
    Initialise the shared state in app.ctx and start one upstream client
    task per configured target before the server starts.

    :param app: the Sanic application whose ctx is populated
    :param loop: event loop passed in by Sanic (not used directly)
    """
    app.ctx.event_frame = asyncio.Event()
    app.ctx.frames = {}
    app.ctx.avg = None
    app.ctx.motion_frames = 0
    app.ctx.motion_start = None
    app.ctx.motion_end = None
    # The event loop only holds weak references to tasks; keep strong ones
    # so the client tasks cannot be garbage collected while running
    app.ctx.client_tasks = []
    for name, url in targets:
        app.ctx.frames[name] = None
        app.ctx.client_tasks.append(asyncio.create_task(client(name, url)))
|
|
|
|
# Hook sanic_prometheus monitoring into the app and expose its endpoint
monitor(app).expose_endpoint()

try:
    # Serve on all interfaces, port 5001; blocks until the server stops
    app.run(host="0.0.0.0", port=5001)
except KeyboardInterrupt:
    # On Ctrl-C close the current event loop
    asyncio.get_event_loop().close()
|