Compare commits

...

4 Commits

Author SHA1 Message Date
Lauri Võsandi 8548163d44 Add backwards compatibility
continuous-integration/drone Build is passing Details
2023-01-25 09:41:35 +02:00
Lauri Võsandi 53ab49c661 Fix formatting errors
continuous-integration/drone Build is passing Details
2022-12-02 19:41:21 +02:00
Lauri Võsandi ef7b0f7b7c Make service listing configurable 2022-12-02 19:34:11 +02:00
Lauri Võsandi b962f2695c Make service listing configurable 2022-12-02 12:47:16 +02:00
1 changed files with 25 additions and 25 deletions

View File

@ -1,27 +1,25 @@
#!/usr/bin/python3
import aiohttp
import asyncio
import cv2
import io
import numpy as np
import os
import socket
import sys
from datetime import datetime
from kubernetes import client, config
from math import ceil
from prometheus_client import Counter, Gauge
from sanic import Sanic, response
from sanic.response import stream
from sanic_prometheus import monitor
targets = [(j,j) for j in sys.argv[1:]]
SERVICE_NAMESPACE = os.getenv("SERVICE_NAMESPACE", "camtiler")
SERVICE_LABEL_SELECTOR = os.getenv("SERVICE_LABEL_SELECTOR", "component=camera-motion-detect")
targets = [(j, j) for j in sys.argv[1:]]
if not targets:
# If no targets are specified, fall back to Kube API
config.load_incluster_config()
v1 = client.CoreV1Api()
for i in v1.list_namespaced_service("camtiler", label_selector="component=camdetect").items:
for i in v1.list_namespaced_service(SERVICE_NAMESPACE, label_selector=SERVICE_LABEL_SELECTOR).items:
url = "http://%s:%d/bypass" % (i.metadata.name, i.spec.ports[0].port)
targets.append((i.metadata.name, url))
@ -53,7 +51,7 @@ counter_tx_frames = Counter(
counter_tx_events = Counter(
"camtiler_client_tx_events",
"Events emitted")
counter_errors = Counter(
counter_errors = Counter(
"camtiler_errors",
"Upstream connection errors",
["exception"])
@ -67,19 +65,19 @@ gauge_build_info.labels(
app = Sanic("camtiler")
STREAM_RESPONSE = \
b"""
STREAM_RESPONSE = b"""
--frame
Content-Type: image/jpeg
"""
MERGED_SUBSAMPLE=3
MERGED_SUBSAMPLE = 3
# Blank frame
BLANK = np.full((720, 1280, 3), 0)
GREEN = np.full((720, 1280, 3), (0, 135, 0))
def tilera(iterable):
hstacking = ceil(len(iterable) ** 0.5)
vstacking = ceil(len(iterable) / hstacking)
@ -128,22 +126,22 @@ async def client_connect(name, resp):
if buf:
# If we already have something in buffer, seek end in new data
marker = data.find(b'\xff\xd9')
marker = data.find(b"\xff\xd9")
if marker < 0:
# If no end marker was found add it to buffer
buf += data
continue
else:
# If end marker was found, decode JPEG frame
blob = np.frombuffer(buf + data[:marker+2], dtype=np.uint8)
blob = np.frombuffer(buf + data[:marker + 2], dtype=np.uint8)
img = cv2.imdecode(blob, cv2.IMREAD_UNCHANGED)
app.ctx.frames[name] = img
data = data[marker+2:]
data = data[marker + 2:]
buf = b""
counter_rx_frames.inc()
# Seek begin in newly received data
marker = data.find(b'\xff\xd8')
marker = data.find(b"\xff\xd8")
if marker >= 0:
data, buf = data[:marker], data[marker:]
@ -153,12 +151,12 @@ async def client_connect(name, resp):
async def client(name, url):
print("Opening upstream connection to %s" % url)
kwargs = dict(
headers = {
kwargs = {
"headers": {
"User-Agent": "camtiler/%s" % GIT_COMMIT_TIMESTAMP
},
skip_auto_headers = True,
timeout = aiohttp.ClientTimeout(connect=5, sock_read=2))
"timeout": aiohttp.ClientTimeout(connect=5, sock_read=2)
}
while True:
app.ctx.frames[name] = None
async with aiohttp.ClientSession(**kwargs) as session:
@ -166,13 +164,14 @@ async def client(name, url):
async with session.get(url) as resp:
await client_connect(name, resp)
except (aiohttp.ClientError, asyncio.exceptions.TimeoutError) as e:
j = "%s.%s" % (e.__class__.__module__, e.__class__.__name__)
print("Caught exception %s for %s" % (j, name))
counter_errors.labels(exception=j).inc()
j = "%s.%s" % (e.__class__.__module__, e.__class__.__name__)
print("Caught exception %s for %s" % (j, name))
counter_errors.labels(exception=j).inc()
await asyncio.sleep(1)
@app.route("/tiled")
@app.route("/m/tiled")
async def stream_wrapper(request):
async def stream_tiled(response):
while True:
@ -186,10 +185,11 @@ async def stream_wrapper(request):
return response.stream(
stream_tiled,
content_type='multipart/x-mixed-replace; boundary=frame'
content_type="multipart/x-mixed-replace; boundary=frame"
)
@app.listener('before_server_start')
@app.listener("before_server_start")
async def setup_db(app, loop):
app.ctx.event_frame = asyncio.Event()
app.ctx.frames = {}
@ -199,7 +199,7 @@ async def setup_db(app, loop):
app.ctx.motion_end = None
for name, url in targets:
app.ctx.frames[name] = None
task = asyncio.create_task(client(name, url))
asyncio.create_task(client(name, url))
monitor(app).expose_endpoint()