Compare commits

..

No commits in common. "master" and "report-user-agent" have entirely different histories.

View File

@ -1,25 +1,27 @@
#!/usr/bin/python3 #!/usr/bin/python3
import aiohttp import aiohttp
import asyncio import asyncio
import cv2 import cv2
import io
import numpy as np import numpy as np
import os import os
import socket
import sys import sys
from datetime import datetime
from kubernetes import client, config from kubernetes import client, config
from math import ceil from math import ceil
from prometheus_client import Counter, Gauge from prometheus_client import Counter, Gauge
from sanic import Sanic, response from sanic import Sanic, response
from sanic.response import stream
from sanic_prometheus import monitor from sanic_prometheus import monitor
SERVICE_NAMESPACE = os.getenv("SERVICE_NAMESPACE", "camtiler") targets = [(j,j) for j in sys.argv[1:]]
SERVICE_LABEL_SELECTOR = os.getenv("SERVICE_LABEL_SELECTOR", "component=camera-motion-detect")
targets = [(j, j) for j in sys.argv[1:]]
if not targets: if not targets:
# If no targets are specified, fall back to Kube API # If no targets are specified, fall back to Kube API
config.load_incluster_config() config.load_incluster_config()
v1 = client.CoreV1Api() v1 = client.CoreV1Api()
for i in v1.list_namespaced_service(SERVICE_NAMESPACE, label_selector=SERVICE_LABEL_SELECTOR).items: for i in v1.list_namespaced_service("camtiler", label_selector="component=camdetect").items:
url = "http://%s:%d/bypass" % (i.metadata.name, i.spec.ports[0].port) url = "http://%s:%d/bypass" % (i.metadata.name, i.spec.ports[0].port)
targets.append((i.metadata.name, url)) targets.append((i.metadata.name, url))
@ -51,7 +53,7 @@ counter_tx_frames = Counter(
counter_tx_events = Counter( counter_tx_events = Counter(
"camtiler_client_tx_events", "camtiler_client_tx_events",
"Events emitted") "Events emitted")
counter_errors = Counter( counter_errors = Counter(
"camtiler_errors", "camtiler_errors",
"Upstream connection errors", "Upstream connection errors",
["exception"]) ["exception"])
@ -65,19 +67,19 @@ gauge_build_info.labels(
app = Sanic("camtiler") app = Sanic("camtiler")
STREAM_RESPONSE = b""" STREAM_RESPONSE = \
b"""
--frame --frame
Content-Type: image/jpeg Content-Type: image/jpeg
""" """
MERGED_SUBSAMPLE = 3 MERGED_SUBSAMPLE=3
# Blank frame # Blank frame
BLANK = np.full((720, 1280, 3), 0) BLANK = np.full((720, 1280, 3), 0)
GREEN = np.full((720, 1280, 3), (0, 135, 0)) GREEN = np.full((720, 1280, 3), (0, 135, 0))
def tilera(iterable): def tilera(iterable):
hstacking = ceil(len(iterable) ** 0.5) hstacking = ceil(len(iterable) ** 0.5)
vstacking = ceil(len(iterable) / hstacking) vstacking = ceil(len(iterable) / hstacking)
@ -126,22 +128,22 @@ async def client_connect(name, resp):
if buf: if buf:
# If we already have something in buffer, seek end in new data # If we already have something in buffer, seek end in new data
marker = data.find(b"\xff\xd9") marker = data.find(b'\xff\xd9')
if marker < 0: if marker < 0:
# If no end marker was found add it to buffer # If no end marker was found add it to buffer
buf += data buf += data
continue continue
else: else:
# If end marker was found, decode JPEG frame # If end marker was found, decode JPEG frame
blob = np.frombuffer(buf + data[:marker + 2], dtype=np.uint8) blob = np.frombuffer(buf + data[:marker+2], dtype=np.uint8)
img = cv2.imdecode(blob, cv2.IMREAD_UNCHANGED) img = cv2.imdecode(blob, cv2.IMREAD_UNCHANGED)
app.ctx.frames[name] = img app.ctx.frames[name] = img
data = data[marker + 2:] data = data[marker+2:]
buf = b"" buf = b""
counter_rx_frames.inc() counter_rx_frames.inc()
# Seek begin in newly received data # Seek begin in newly received data
marker = data.find(b"\xff\xd8") marker = data.find(b'\xff\xd8')
if marker >= 0: if marker >= 0:
data, buf = data[:marker], data[marker:] data, buf = data[:marker], data[marker:]
@ -151,12 +153,12 @@ async def client_connect(name, resp):
async def client(name, url): async def client(name, url):
print("Opening upstream connection to %s" % url) print("Opening upstream connection to %s" % url)
kwargs = { kwargs = dict(
"headers": { headers = {
"User-Agent": "camtiler/%s" % GIT_COMMIT_TIMESTAMP "User-Agent": "camtiler/%s" % GIT_COMMIT_TIMESTAMP
}, },
"timeout": aiohttp.ClientTimeout(connect=5, sock_read=2) skip_auto_headers = True,
} timeout = aiohttp.ClientTimeout(connect=5, sock_read=2))
while True: while True:
app.ctx.frames[name] = None app.ctx.frames[name] = None
async with aiohttp.ClientSession(**kwargs) as session: async with aiohttp.ClientSession(**kwargs) as session:
@ -164,14 +166,13 @@ async def client(name, url):
async with session.get(url) as resp: async with session.get(url) as resp:
await client_connect(name, resp) await client_connect(name, resp)
except (aiohttp.ClientError, asyncio.exceptions.TimeoutError) as e: except (aiohttp.ClientError, asyncio.exceptions.TimeoutError) as e:
j = "%s.%s" % (e.__class__.__module__, e.__class__.__name__) j = "%s.%s" % (e.__class__.__module__, e.__class__.__name__)
print("Caught exception %s for %s" % (j, name)) print("Caught exception %s for %s" % (j, name))
counter_errors.labels(exception=j).inc() counter_errors.labels(exception=j).inc()
await asyncio.sleep(1) await asyncio.sleep(1)
@app.route("/tiled") @app.route("/tiled")
@app.route("/m/tiled")
async def stream_wrapper(request): async def stream_wrapper(request):
async def stream_tiled(response): async def stream_tiled(response):
while True: while True:
@ -185,11 +186,10 @@ async def stream_wrapper(request):
return response.stream( return response.stream(
stream_tiled, stream_tiled,
content_type="multipart/x-mixed-replace; boundary=frame" content_type='multipart/x-mixed-replace; boundary=frame'
) )
@app.listener('before_server_start')
@app.listener("before_server_start")
async def setup_db(app, loop): async def setup_db(app, loop):
app.ctx.event_frame = asyncio.Event() app.ctx.event_frame = asyncio.Event()
app.ctx.frames = {} app.ctx.frames = {}
@ -199,7 +199,7 @@ async def setup_db(app, loop):
app.ctx.motion_end = None app.ctx.motion_end = None
for name, url in targets: for name, url in targets:
app.ctx.frames[name] = None app.ctx.frames[name] = None
asyncio.create_task(client(name, url)) task = asyncio.create_task(client(name, url))
monitor(app).expose_endpoint() monitor(app).expose_endpoint()