|
|
|
@ -50,15 +50,10 @@ counter_tx_frames = Counter( |
|
|
|
|
counter_tx_events = Counter( |
|
|
|
|
"camtiler_client_tx_events", |
|
|
|
|
"Events emitted") |
|
|
|
|
counter_timeout_errors = Counter( |
|
|
|
|
"camtiler_client_timeout_errors", |
|
|
|
|
"Upstream connection timeout errors") |
|
|
|
|
counter_cancelled_errors = Counter( |
|
|
|
|
"camtiler_client_cancelled_errors", |
|
|
|
|
"Upstream connection cancelled errors") |
|
|
|
|
counter_incomplete_read_errors = Counter( |
|
|
|
|
"camtiler_client_incomplete_read_errors", |
|
|
|
|
"Upstream incomplete read errors") |
|
|
|
|
counter_errors = Counter( |
|
|
|
|
"camtiler_errors", |
|
|
|
|
"Upstream connection errors", |
|
|
|
|
["exception"]) |
|
|
|
|
|
|
|
|
|
app = Sanic("camtiler") |
|
|
|
|
|
|
|
|
@ -71,13 +66,11 @@ Content-Type: image/jpeg |
|
|
|
|
|
|
|
|
|
MERGED_SUBSAMPLE=3 |
|
|
|
|
|
|
|
|
|
# Consider camera gone after 5sec |
|
|
|
|
TIMEOUT = 5.0 |
|
|
|
|
|
|
|
|
|
# Blank frame |
|
|
|
|
BLANK = np.full((720, 1280, 3), 0) |
|
|
|
|
GREEN = np.full((720, 1280, 3), (0, 135, 0)) |
|
|
|
|
|
|
|
|
|
def tilera(iterable, filler): |
|
|
|
|
def tilera(iterable): |
|
|
|
|
hstacking = ceil(len(iterable) ** 0.5) |
|
|
|
|
vstacking = ceil(len(iterable) / hstacking) |
|
|
|
|
rows = [] |
|
|
|
@ -86,13 +79,27 @@ def tilera(iterable, filler): |
|
|
|
|
first_frame = True |
|
|
|
|
for i in range(0, hstacking): |
|
|
|
|
try: |
|
|
|
|
frame = iterable[i + hstacking * j] |
|
|
|
|
tile = iterable[i + hstacking * j] |
|
|
|
|
except IndexError: |
|
|
|
|
frame = None |
|
|
|
|
tile = None |
|
|
|
|
|
|
|
|
|
try: |
|
|
|
|
name, frame = tile |
|
|
|
|
except TypeError: |
|
|
|
|
frame = BLANK |
|
|
|
|
|
|
|
|
|
if frame is None: |
|
|
|
|
frame = filler |
|
|
|
|
msg = "Signal lost" |
|
|
|
|
frame = GREEN |
|
|
|
|
else: |
|
|
|
|
msg = "Live" |
|
|
|
|
frame = frame[::MERGED_SUBSAMPLE, ::MERGED_SUBSAMPLE] |
|
|
|
|
footer = np.zeros((30, len(frame[0]), 3)) |
|
|
|
|
|
|
|
|
|
if tile: |
|
|
|
|
cv2.putText(footer, msg, (10, 24), cv2.FONT_HERSHEY_PLAIN, 1, (255, 255, 255)) |
|
|
|
|
cv2.putText(footer, name, (10, 12), cv2.FONT_HERSHEY_PLAIN, 1, (255, 255, 255)) |
|
|
|
|
frame = np.vstack([frame, footer]) |
|
|
|
|
if not first_frame: |
|
|
|
|
row.append(np.full((len(frame), 1, 3), 0)) |
|
|
|
|
first_frame = False |
|
|
|
@ -104,7 +111,7 @@ def tilera(iterable, filler): |
|
|
|
|
|
|
|
|
|
async def client_connect(name, resp): |
|
|
|
|
buf = b"" |
|
|
|
|
print("Upstream connection opened with status:", resp.status) |
|
|
|
|
print("Upstream connection to %s opened with status: %d", (name, resp.status)) |
|
|
|
|
async for data, end_of_http_chunk in resp.content.iter_chunks(): |
|
|
|
|
if end_of_http_chunk: |
|
|
|
|
counter_rx_chunks.inc() |
|
|
|
@ -137,30 +144,29 @@ async def client_connect(name, resp): |
|
|
|
|
async def client(name, url): |
|
|
|
|
print("Opening upstream connection to %s" % url) |
|
|
|
|
while True: |
|
|
|
|
async with aiohttp.ClientSession() as session: |
|
|
|
|
async with session.get(url) as resp: |
|
|
|
|
app.ctx.frames[name] = None |
|
|
|
|
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(connect=5, sock_read=2)) as session: |
|
|
|
|
try: |
|
|
|
|
await client_connect(name, resp) |
|
|
|
|
except asyncio.TimeoutError: |
|
|
|
|
counter_timeout_errors.inc() |
|
|
|
|
except asyncio.CancelledError: |
|
|
|
|
counter_cancelled_errors.inc() |
|
|
|
|
except asyncio.IncompleteReadError: |
|
|
|
|
counter_incomplete_read_errors.inc() |
|
|
|
|
async with session.get(url) as resp: |
|
|
|
|
await client_connect(name, resp) |
|
|
|
|
except (aiohttp.ClientError, asyncio.exceptions.TimeoutError) as e: |
|
|
|
|
j = "%s.%s" % (e.__class__.__module__, e.__class__.__name__) |
|
|
|
|
print("Caught exception %s for %s" % (j, name)) |
|
|
|
|
counter_errors.labels(exception=j).inc() |
|
|
|
|
await asyncio.sleep(1) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.route("/tiled") |
|
|
|
|
async def stream_wrapper(request): |
|
|
|
|
async def stream_tiled(response): |
|
|
|
|
while True: |
|
|
|
|
frames = [value for key, value in sorted(app.ctx.frames.items())] |
|
|
|
|
img = tilera(frames, BLANK) |
|
|
|
|
img = tilera(sorted(app.ctx.frames.items())) |
|
|
|
|
_, jpeg = cv2.imencode(".jpg", img, (cv2.IMWRITE_JPEG_QUALITY, 80)) |
|
|
|
|
data = STREAM_RESPONSE + jpeg.tobytes() |
|
|
|
|
await response.write(data) |
|
|
|
|
counter_tx_bytes.inc(len(data)) |
|
|
|
|
counter_tx_frames.inc() |
|
|
|
|
await asyncio.sleep(0.2) |
|
|
|
|
await asyncio.sleep(0.1) |
|
|
|
|
|
|
|
|
|
return response.stream( |
|
|
|
|
stream_tiled, |
|
|
|
|