Initial commit
All checks were successful
continuous-integration/drone Build is passing

This commit is contained in:
Lauri Võsandi 2022-02-13 10:09:18 +02:00 committed by Lauri Võsandi
commit 7510dc8b6f
4 changed files with 242 additions and 0 deletions

16
.drone.yml Normal file
View File

@ -0,0 +1,16 @@
---
kind: pipeline
type: kubernetes
name: default
steps:
- name: docker
image: plugins/docker
settings:
repo: harbor.k-space.ee/${DRONE_REPO}
registry: harbor.k-space.ee
mtu: 1300
username:
from_secret: docker_username
password:
from_secret: docker_password

20
Dockerfile Normal file
View File

@ -0,0 +1,20 @@
FROM ubuntu
WORKDIR /app
ENV DEBIAN_FRONTEND=noninteractive
RUN apt-get update && apt-get install -y \
gstreamer1.0-libav \
gstreamer1.0-plugins-bad \
gstreamer1.0-plugins-base \
gstreamer1.0-plugins-good \
gstreamer1.0-plugins-ugly \
gstreamer1.0-tools \
python3-gevent \
python3-numpy \
python3-opencv \
python3-pip \
&& apt-get clean
RUN pip3 install prometheus_client aiohttp sanic==21.6.2 sanic_prometheus motor kubernetes
COPY camtiler.py /app
ENTRYPOINT /app/camtiler.py
EXPOSE 5001
ENV PYTHONUNBUFFERED=1

185
camtiler.py Executable file
View File

@ -0,0 +1,185 @@
#!/usr/bin/python3
import aiohttp
import asyncio
import cv2
import io
import numpy as np
import os
import socket
import sys
from datetime import datetime
from kubernetes import client, config
from math import ceil
from prometheus_client import Counter, Gauge
from sanic import Sanic, response
from sanic.response import stream
from sanic_prometheus import monitor
targets = [(j,j) for j in sys.argv[1:]]
if not targets:
# If no targets are specified, fall back to Kube API
config.load_kube_config()
v1 = client.CoreV1Api()
for i in v1.list_namespaced_service("camtiler", label_selector="component=camdetect").items:
app.ctx.frames[i.metadata.name] = None
url = "http://%s:%d/bypass" % (i.metadata.name, i.spec.ports[0].port)
targets.append((i.metadata.name, url))
print("Running with following targets:")
for name, url in targets:
print(url)
counter_dropped_bytes = Counter(
"camtiler_client_dropped_bytes",
"Bytes that were not not handled or part of actual JPEG frames")
counter_rx_bytes = Counter(
"camtiler_client_rx_bytes",
"Bytes received over HTTP stream")
counter_tx_bytes = Counter(
"camtiler_client_tx_bytes",
"Bytes transmitted over HTTP streams")
counter_rx_frames = Counter(
"camtiler_client_rx_frames",
"Frames received")
counter_tx_frames = Counter(
"camtiler_client_tx_frames",
"Frames transmitted")
counter_tx_events = Counter(
"camtiler_client_tx_events",
"Events emitted")
counter_eos = Counter(
"camtiler_client_eos",
"Count of End of Stream occurrences")
counter_timeout_errors = Counter(
"camtiler_client_timeout_errors",
"Upstream connection timeout errors")
counter_cancelled_errors = Counter(
"camtiler_client_cancelled_errors",
"Upstream connection cancelled errors")
counter_incomplete_read_errors = Counter(
"camtiler_client_incomplete_read_errors",
"Upstream incomplete read errors")
app = Sanic("camtiler")
STREAM_RESPONSE = \
b"""
--frame
Content-Type: image/jpeg
"""
MERGED_SUBSAMPLE=3
# Consider camera gone after 5sec
TIMEOUT = 5.0
# Blank frame
BLANK = np.full((720 // MERGED_SUBSAMPLE, 1280 // MERGED_SUBSAMPLE, 3), 0)
def tilera(iterable, filler):
hstacking = ceil(len(iterable) ** 0.5)
vstacking = ceil(len(iterable) / hstacking)
rows = []
for j in range(0, vstacking):
row = []
first_frame = True
for i in range(0, hstacking):
try:
frame = iterable[i + hstacking * j]
except IndexError:
frame = filler
if not first_frame:
row.append(np.full((len(frame), 1, 3), 0))
first_frame = False
row.append(frame)
stacked = np.hstack(row)
rows.append(stacked)
return np.vstack(rows)
async def client_connect(name, resp):
buf = b""
print("Upstream connection opened with status:", resp.status)
async for data, end_of_http_chunk in resp.content.iter_chunks():
counter_rx_bytes.inc(len(data))
if end_of_http_chunk:
counter_eos.inc()
break
if buf:
# seek end
marker = data.find(b'\xff\xd9')
if marker < 0:
buf += data
continue
else:
blob = np.frombuffer(buf + data[:marker+2], dtype=np.uint8)
img = cv2.imdecode(blob, cv2.IMREAD_UNCHANGED)
app.ctx.frames[name] = img
data = data[marker+2:]
buf = b""
counter_rx_frames.inc()
# seek begin
marker = data.find(b'\xff\xd8')
if marker >= 0:
buf = data[marker:]
else:
counter_dropped_bytes.inc(len(data))
async def client(name, url):
print("Opening upstream connection to %s" % url)
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
try:
await client_connect(name, resp)
except asyncio.TimeoutError:
counter_timeout_errors.inc()
except asyncio.CancelledError:
counter_cancelled_errors.inc()
except asyncio.IncompleteReadError:
counter_incomplete_read_errors.inc()
@app.route("/tiled")
async def stream_wrapper(request):
async def stream_tiled(response):
while True:
frames = [value for key, value in sorted(app.ctx.frames.items())]
img = tilera(frames, np.vstack([BLANK, np.zeros((30, 1280 // MERGED_SUBSAMPLE, 3))]))
_, jpeg = cv2.imencode(".jpg", img, (cv2.IMWRITE_JPEG_QUALITY, 80))
data = STREAM_RESPONSE + jpeg.tobytes()
await response.write(data)
counter_tx_bytes.inc(len(data))
counter_tx_frames.inc()
await asyncio.sleep(0.2)
return response.stream(
stream_tiled,
content_type='multipart/x-mixed-replace; boundary=frame'
)
@app.listener('before_server_start')
async def setup_db(app, loop):
app.ctx.last_frame = None
app.ctx.event_frame = asyncio.Event()
app.ctx.frames = {}
app.ctx.avg = None
app.ctx.motion_frames = 0
app.ctx.motion_start = None
app.ctx.motion_end = None
for name, url in targets:
app.ctx.frames[name] = None
task = asyncio.create_task(client(name, url))
monitor(app).expose_endpoint()
try:
app.run(port=5001)
except KeyboardInterrupt:
asyncio.get_event_loop().close()

21
docker-compose.yml Normal file
View File

@ -0,0 +1,21 @@
version: '3.7'
# All keys here are for dev instance only, do not put prod keys here
# To override and use inventory from prod use .env file
services:
camtiler:
restart: always
network_mode: host
build:
context: .
entrypoint: /app/camtiler.py
command: http://127.0.0.1:8080?action=stream http://127.0.0.2:8080?action=stream http://127.0.0.3:8080?action=stream http://127.0.0.4:8080?action=stream
mjpg-streamer:
network_mode: host
restart: always
image: kvaps/mjpg-streamer
devices:
- /dev/video0
command: -i "/usr/lib64/input_uvc.so -y -d /dev/video0 -r 1280x720 -f 30" -o "output_http.so"