camera-tiler/camtiler.py

210 lines
6.4 KiB
Python
Raw Normal View History

2022-02-13 08:09:18 +00:00
#!/usr/bin/python3
import aiohttp
import asyncio
import cv2
import numpy as np
import os
import sys
from kubernetes import client, config
from math import ceil
from prometheus_client import Counter, Gauge
from sanic import Sanic, response
from sanic_prometheus import monitor
2022-12-02 10:47:16 +00:00
# Namespace and label selector passed to the Kubernetes service listing
# when no targets are given on the command line.
SERVICE_NAMESPACE = os.environ.get("SERVICE_NAMESPACE", "camtiler")
SERVICE_LABEL_SELECTOR = os.environ.get(
    "SERVICE_LABEL_SELECTOR",
    "component=camera-motion-detect",
)
2022-12-02 11:00:20 +00:00
# Every command line argument is an upstream URL that doubles as its own
# display name.
targets = [(arg, arg) for arg in sys.argv[1:]]

if len(targets) == 0:
    # Nothing on the command line: discover upstreams via the Kubernetes API
    config.load_incluster_config()
    services = client.CoreV1Api().list_namespaced_service(
        SERVICE_NAMESPACE, label_selector=SERVICE_LABEL_SELECTOR)
    targets.extend(
        (svc.metadata.name,
         "http://%s:%d/bypass" % (svc.metadata.name, svc.spec.ports[0].port))
        for svc in services.items)

print("Running with following targets:")
for _name, target_url in targets:
    print(target_url)
2022-09-04 06:01:22 +00:00
# Build metadata read from the environment; the literal string "null" is
# used when a variable is not set.
GIT_COMMIT = os.environ.get("GIT_COMMIT", "null")
GIT_COMMIT_TIMESTAMP = os.environ.get("GIT_COMMIT_TIMESTAMP", "null")
2022-02-13 08:09:18 +00:00
# Prometheus metrics describing upstream (rx) and downstream (tx) traffic.
counter_dropped_bytes = Counter(
    "camtiler_client_dropped_bytes",
    "Bytes that were not handled or part of actual JPEG frames")
counter_rx_bytes = Counter(
    "camtiler_client_rx_bytes",
    "Bytes received over HTTP stream")
counter_tx_bytes = Counter(
    "camtiler_client_tx_bytes",
    "Bytes transmitted over HTTP streams")
counter_rx_chunks = Counter(
    "camtiler_client_rx_chunks",
    "HTTP chunks received")
counter_rx_frames = Counter(
    "camtiler_client_rx_frames",
    "Frames received")
counter_tx_frames = Counter(
    "camtiler_client_tx_frames",
    "Frames transmitted")
counter_tx_events = Counter(
    "camtiler_client_tx_events",
    "Events emitted")

# Upstream failures, labelled with the fully qualified exception class name
counter_errors = Counter(
    "camtiler_errors",
    "Upstream connection errors",
    ["exception"])

# Constant gauge whose labels carry the build's git commit and timestamp
gauge_build_info = Gauge(
    "docker_build_info",
    "Build info",
    ["git_commit", "git_commit_timestamp"])
gauge_build_info.labels(
    GIT_COMMIT,
    GIT_COMMIT_TIMESTAMP).set(1)
2022-02-13 08:09:18 +00:00
# Sanic application object; the routes and listeners in this file are
# registered on it and shared state is kept in app.ctx.
app = Sanic("camtiler")
2022-12-02 11:00:20 +00:00
# Boundary and part headers written in front of every JPEG frame of the
# multipart/x-mixed-replace stream. The trailing empty line terminates the
# part headers so that the JPEG bytes are parsed as the part body.
STREAM_RESPONSE = b"\n--frame\nContent-Type: image/jpeg\n\n"

# Only every n-th row and column of a frame is kept in the tiled mosaic
MERGED_SUBSAMPLE = 3

# Placeholder tiles, stored as 8-bit pixels like decoded JPEG frames.
# Blank frame
BLANK = np.full((720, 1280, 3), 0, dtype=np.uint8)
# Shown for a camera whose current frame is missing
GREEN = np.full((720, 1280, 3), (0, 135, 0), dtype=np.uint8)
2022-02-13 08:09:18 +00:00
2022-12-02 11:00:20 +00:00
2022-02-13 20:54:40 +00:00
def _fit_tile(frame):
    """Coerce a decoded frame to the height, width and 3 channels of BLANK."""
    if frame.ndim == 2:
        # A frame without a channel axis (e.g. a grayscale image) cannot be
        # stacked with the 3-channel footer; replicate it into 3 channels.
        frame = np.dstack([frame] * 3)
    height, width = BLANK.shape[:2]
    if frame.shape[:2] != (height, width):
        # cv2.resize takes the target size as (width, height)
        frame = cv2.resize(frame, (width, height))
    return frame


def tilera(iterable):
    """Compose camera frames into one mosaic image.

    The tiles are laid out on a near-square grid, each downsampled by
    MERGED_SUBSAMPLE and extended with a 30 pixel footer that carries the
    camera name and its status. Grid cells without a camera are filled with
    the BLANK placeholder and get an empty footer.

    Args:
        iterable: Sequence of ``(name, frame)`` pairs. ``frame`` is an image
            array, or ``None`` when no frame is available for that camera,
            in which case the GREEN placeholder labelled "Signal lost" is
            drawn instead.

    Returns:
        A single array holding the stacked tiles. For an empty sequence a
        single blank tile is returned so callers always get an image.
    """
    if not iterable:
        # No cameras at all: the grid maths below would divide by zero
        blank = BLANK[::MERGED_SUBSAMPLE, ::MERGED_SUBSAMPLE]
        return np.vstack([blank, np.zeros((30, blank.shape[1], 3))])

    hstacking = ceil(len(iterable) ** 0.5)
    vstacking = ceil(len(iterable) / hstacking)
    rows = []
    for j in range(vstacking):
        row = []
        for i in range(hstacking):
            index = i + hstacking * j
            # The last grid row may have more cells than cameras left
            tile = iterable[index] if index < len(iterable) else None
            if tile is None:
                name, msg, frame = None, None, BLANK
            else:
                name, frame = tile
                if frame is None:
                    msg, frame = "Signal lost", GREEN
                else:
                    msg, frame = "Live", _fit_tile(frame)
            frame = frame[::MERGED_SUBSAMPLE, ::MERGED_SUBSAMPLE]
            footer = np.zeros((30, frame.shape[1], 3))

            if tile is not None:
                cv2.putText(footer, msg, (10, 24), cv2.FONT_HERSHEY_PLAIN, 1, (255, 255, 255))
                cv2.putText(footer, name, (10, 12), cv2.FONT_HERSHEY_PLAIN, 1, (255, 255, 255))
            frame = np.vstack([frame, footer])

            if row:
                # One pixel wide black separator between neighbouring tiles
                row.append(np.full((len(frame), 1, 3), 0))
            row.append(frame)
        rows.append(np.hstack(row))
    return np.vstack(rows)
async def client_connect(name, resp):
    """Extract JPEG frames from a streaming HTTP response.

    The response body is scanned for JPEG start (FF D8) and end (FF D9)
    markers. Every completed frame is decoded with OpenCV and stored in
    ``app.ctx.frames[name]``; bytes outside of frames are only counted as
    dropped. The coroutine returns once the response body is exhausted.

    Args:
        name: Key under which decoded frames are published.
        resp: Response object exposing ``status`` and a chunk-iterable
            ``content`` stream.
    """
    buf = b""
    print("Upstream connection to %s opened with status: %d" % (name, resp.status))
    async for data, end_of_http_chunk in resp.content.iter_chunks():
        counter_rx_bytes.inc(len(data))
        if end_of_http_chunk:
            counter_rx_chunks.inc()
        if buf:
            # If we already have something in buffer, seek end in new data.
            # The last buffered byte is prepended so that an end marker
            # split across two chunks is found as well.
            marker = (buf[-1:] + data).find(b"\xff\xd9")
            if marker < 0:
                # If no end marker was found add it to buffer
                buf += data
                continue
            # If end marker was found, decode JPEG frame. `marker` is
            # shifted by the one prepended byte, so the frame ends at
            # data[:marker + 1].
            blob = np.frombuffer(buf + data[:marker + 1], dtype=np.uint8)
            img = cv2.imdecode(blob, cv2.IMREAD_UNCHANGED)
            app.ctx.frames[name] = img
            data = data[marker + 1:]
            buf = b""
            counter_rx_frames.inc()
        # Seek begin in newly received data
        marker = data.find(b"\xff\xd8")
        if marker >= 0:
            data, buf = data[:marker], data[marker:]
        # Remaining data is effectively dropped
        counter_dropped_bytes.inc(len(data))
2022-02-13 08:09:18 +00:00
async def client(name, url):
    """Maintain the upstream connection for one camera forever.

    Opens ``url`` in a fresh HTTP session, hands the response over to
    ``client_connect`` and reconnects after a one second pause whenever the
    stream ends or fails. Connection errors and timeouts are printed and
    counted in ``counter_errors``.

    Args:
        name: Key of this camera in ``app.ctx.frames``.
        url: Address of the upstream stream.
    """
    print("Opening upstream connection to %s" % url)
    session_options = dict(
        headers={"User-Agent": "camtiler/%s" % GIT_COMMIT_TIMESTAMP},
        timeout=aiohttp.ClientTimeout(connect=5, sock_read=2),
    )
    recoverable = (aiohttp.ClientError, asyncio.exceptions.TimeoutError)

    while True:
        # No frame is available until the (re)opened stream delivers one
        app.ctx.frames[name] = None
        async with aiohttp.ClientSession(**session_options) as session:
            try:
                async with session.get(url) as resp:
                    await client_connect(name, resp)
            except recoverable as exc:
                label = "%s.%s" % (
                    exc.__class__.__module__, exc.__class__.__name__)
                print("Caught exception %s for %s" % (label, name))
                counter_errors.labels(exception=label).inc()
        await asyncio.sleep(1)
2022-02-13 08:09:18 +00:00
@app.route("/tiled")
@app.route("/m/tiled")
async def stream_wrapper(request):
    """Serve the tiled mosaic as a multipart/x-mixed-replace JPEG stream.

    Roughly every 0.1 seconds the current frames are tiled in name order,
    encoded as JPEG with quality 80 and written to the client.
    """
    encode_params = (cv2.IMWRITE_JPEG_QUALITY, 80)

    async def stream_tiled(stream):
        while True:
            tiles = sorted(app.ctx.frames.items())
            _, jpeg = cv2.imencode(".jpg", tilera(tiles), encode_params)
            payload = STREAM_RESPONSE + jpeg.tobytes()
            await stream.write(payload)
            counter_tx_bytes.inc(len(payload))
            counter_tx_frames.inc()
            await asyncio.sleep(0.1)

    return response.stream(
        stream_tiled,
        content_type="multipart/x-mixed-replace; boundary=frame")
2022-12-02 11:00:20 +00:00
@app.listener("before_server_start")
async def setup_db(app, loop):
    """Initialise shared state and start one upstream client per target.

    Args:
        app: Application whose ``ctx`` receives the shared state.
        loop: Event loop passed in by the listener hook; not used here.
    """
    app.ctx.event_frame = asyncio.Event()
    app.ctx.frames = {}
    app.ctx.avg = None
    app.ctx.motion_frames = 0
    app.ctx.motion_start = None
    app.ctx.motion_end = None
    # The event loop only keeps weak references to tasks, so hold strong
    # references to prevent the clients from being garbage collected.
    app.ctx.client_tasks = []
    for name, url in targets:
        app.ctx.frames[name] = None
        app.ctx.client_tasks.append(asyncio.create_task(client(name, url)))
2022-02-13 08:09:18 +00:00
# Attach Prometheus monitoring to the application and expose its endpoint
metrics = monitor(app)
metrics.expose_endpoint()

try:
    # Listen on all interfaces
    app.run(host="0.0.0.0", port=5001)
except KeyboardInterrupt:
    # Close the event loop when interrupted from the keyboard
    asyncio.get_event_loop().close()