This commit is contained in:
parent
ef7b0f7b7c
commit
53ab49c661
43
camtiler.py
43
camtiler.py
@ -3,23 +3,19 @@
|
|||||||
import aiohttp
|
import aiohttp
|
||||||
import asyncio
|
import asyncio
|
||||||
import cv2
|
import cv2
|
||||||
import io
|
|
||||||
import numpy as np
|
import numpy as np
|
||||||
import os
|
import os
|
||||||
import socket
|
|
||||||
import sys
|
import sys
|
||||||
from datetime import datetime
|
|
||||||
from kubernetes import client, config
|
from kubernetes import client, config
|
||||||
from math import ceil
|
from math import ceil
|
||||||
from prometheus_client import Counter, Gauge
|
from prometheus_client import Counter, Gauge
|
||||||
from sanic import Sanic, response
|
from sanic import Sanic, response
|
||||||
from sanic.response import stream
|
|
||||||
from sanic_prometheus import monitor
|
from sanic_prometheus import monitor
|
||||||
|
|
||||||
SERVICE_NAMESPACE = os.getenv("SERVICE_NAMESPACE", "camtiler")
|
SERVICE_NAMESPACE = os.getenv("SERVICE_NAMESPACE", "camtiler")
|
||||||
SERVICE_LABEL_SELECTOR = os.getenv("SERVICE_LABEL_SELECTOR", "component=camera-motion-detect")
|
SERVICE_LABEL_SELECTOR = os.getenv("SERVICE_LABEL_SELECTOR", "component=camera-motion-detect")
|
||||||
|
|
||||||
targets = [(j,j) for j in sys.argv[1:]]
|
targets = [(j, j) for j in sys.argv[1:]]
|
||||||
if not targets:
|
if not targets:
|
||||||
# If no targets are specified, fall back to Kube API
|
# If no targets are specified, fall back to Kube API
|
||||||
config.load_incluster_config()
|
config.load_incluster_config()
|
||||||
@ -56,7 +52,7 @@ counter_tx_frames = Counter(
|
|||||||
counter_tx_events = Counter(
|
counter_tx_events = Counter(
|
||||||
"camtiler_client_tx_events",
|
"camtiler_client_tx_events",
|
||||||
"Events emitted")
|
"Events emitted")
|
||||||
counter_errors = Counter(
|
counter_errors = Counter(
|
||||||
"camtiler_errors",
|
"camtiler_errors",
|
||||||
"Upstream connection errors",
|
"Upstream connection errors",
|
||||||
["exception"])
|
["exception"])
|
||||||
@ -70,19 +66,19 @@ gauge_build_info.labels(
|
|||||||
|
|
||||||
app = Sanic("camtiler")
|
app = Sanic("camtiler")
|
||||||
|
|
||||||
STREAM_RESPONSE = \
|
STREAM_RESPONSE = b"""
|
||||||
b"""
|
|
||||||
--frame
|
--frame
|
||||||
Content-Type: image/jpeg
|
Content-Type: image/jpeg
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
MERGED_SUBSAMPLE=3
|
MERGED_SUBSAMPLE = 3
|
||||||
|
|
||||||
# Blank frame
|
# Blank frame
|
||||||
BLANK = np.full((720, 1280, 3), 0)
|
BLANK = np.full((720, 1280, 3), 0)
|
||||||
GREEN = np.full((720, 1280, 3), (0, 135, 0))
|
GREEN = np.full((720, 1280, 3), (0, 135, 0))
|
||||||
|
|
||||||
|
|
||||||
def tilera(iterable):
|
def tilera(iterable):
|
||||||
hstacking = ceil(len(iterable) ** 0.5)
|
hstacking = ceil(len(iterable) ** 0.5)
|
||||||
vstacking = ceil(len(iterable) / hstacking)
|
vstacking = ceil(len(iterable) / hstacking)
|
||||||
@ -131,22 +127,22 @@ async def client_connect(name, resp):
|
|||||||
|
|
||||||
if buf:
|
if buf:
|
||||||
# If we already have something in buffer, seek end in new data
|
# If we already have something in buffer, seek end in new data
|
||||||
marker = data.find(b'\xff\xd9')
|
marker = data.find(b"\xff\xd9")
|
||||||
if marker < 0:
|
if marker < 0:
|
||||||
# If no end marker was found add it to buffer
|
# If no end marker was found add it to buffer
|
||||||
buf += data
|
buf += data
|
||||||
continue
|
continue
|
||||||
else:
|
else:
|
||||||
# If end marker was found, decode JPEG frame
|
# If end marker was found, decode JPEG frame
|
||||||
blob = np.frombuffer(buf + data[:marker+2], dtype=np.uint8)
|
blob = np.frombuffer(buf + data[:marker + 2], dtype=np.uint8)
|
||||||
img = cv2.imdecode(blob, cv2.IMREAD_UNCHANGED)
|
img = cv2.imdecode(blob, cv2.IMREAD_UNCHANGED)
|
||||||
app.ctx.frames[name] = img
|
app.ctx.frames[name] = img
|
||||||
data = data[marker+2:]
|
data = data[marker + 2:]
|
||||||
buf = b""
|
buf = b""
|
||||||
counter_rx_frames.inc()
|
counter_rx_frames.inc()
|
||||||
|
|
||||||
# Seek begin in newly received data
|
# Seek begin in newly received data
|
||||||
marker = data.find(b'\xff\xd8')
|
marker = data.find(b"\xff\xd8")
|
||||||
if marker >= 0:
|
if marker >= 0:
|
||||||
data, buf = data[:marker], data[marker:]
|
data, buf = data[:marker], data[marker:]
|
||||||
|
|
||||||
@ -156,12 +152,12 @@ async def client_connect(name, resp):
|
|||||||
|
|
||||||
async def client(name, url):
|
async def client(name, url):
|
||||||
print("Opening upstream connection to %s" % url)
|
print("Opening upstream connection to %s" % url)
|
||||||
kwargs = dict(
|
kwargs = {
|
||||||
headers = {
|
"headers": {
|
||||||
"User-Agent": "camtiler/%s" % GIT_COMMIT_TIMESTAMP
|
"User-Agent": "camtiler/%s" % GIT_COMMIT_TIMESTAMP
|
||||||
},
|
},
|
||||||
skip_auto_headers = True,
|
"timeout": aiohttp.ClientTimeout(connect=5, sock_read=2)
|
||||||
timeout = aiohttp.ClientTimeout(connect=5, sock_read=2))
|
}
|
||||||
while True:
|
while True:
|
||||||
app.ctx.frames[name] = None
|
app.ctx.frames[name] = None
|
||||||
async with aiohttp.ClientSession(**kwargs) as session:
|
async with aiohttp.ClientSession(**kwargs) as session:
|
||||||
@ -169,9 +165,9 @@ async def client(name, url):
|
|||||||
async with session.get(url) as resp:
|
async with session.get(url) as resp:
|
||||||
await client_connect(name, resp)
|
await client_connect(name, resp)
|
||||||
except (aiohttp.ClientError, asyncio.exceptions.TimeoutError) as e:
|
except (aiohttp.ClientError, asyncio.exceptions.TimeoutError) as e:
|
||||||
j = "%s.%s" % (e.__class__.__module__, e.__class__.__name__)
|
j = "%s.%s" % (e.__class__.__module__, e.__class__.__name__)
|
||||||
print("Caught exception %s for %s" % (j, name))
|
print("Caught exception %s for %s" % (j, name))
|
||||||
counter_errors.labels(exception=j).inc()
|
counter_errors.labels(exception=j).inc()
|
||||||
await asyncio.sleep(1)
|
await asyncio.sleep(1)
|
||||||
|
|
||||||
|
|
||||||
@ -189,10 +185,11 @@ async def stream_wrapper(request):
|
|||||||
|
|
||||||
return response.stream(
|
return response.stream(
|
||||||
stream_tiled,
|
stream_tiled,
|
||||||
content_type='multipart/x-mixed-replace; boundary=frame'
|
content_type="multipart/x-mixed-replace; boundary=frame"
|
||||||
)
|
)
|
||||||
|
|
||||||
@app.listener('before_server_start')
|
|
||||||
|
@app.listener("before_server_start")
|
||||||
async def setup_db(app, loop):
|
async def setup_db(app, loop):
|
||||||
app.ctx.event_frame = asyncio.Event()
|
app.ctx.event_frame = asyncio.Event()
|
||||||
app.ctx.frames = {}
|
app.ctx.frames = {}
|
||||||
@ -202,7 +199,7 @@ async def setup_db(app, loop):
|
|||||||
app.ctx.motion_end = None
|
app.ctx.motion_end = None
|
||||||
for name, url in targets:
|
for name, url in targets:
|
||||||
app.ctx.frames[name] = None
|
app.ctx.frames[name] = None
|
||||||
task = asyncio.create_task(client(name, url))
|
asyncio.create_task(client(name, url))
|
||||||
|
|
||||||
monitor(app).expose_endpoint()
|
monitor(app).expose_endpoint()
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user