|
|
|
@ -2,14 +2,13 @@
@@ -2,14 +2,13 @@
|
|
|
|
|
import aiohttp |
|
|
|
|
import asyncio |
|
|
|
|
import cv2 |
|
|
|
|
import io |
|
|
|
|
import numpy as np |
|
|
|
|
import os |
|
|
|
|
import json |
|
|
|
|
import socket |
|
|
|
|
import sys |
|
|
|
|
from datetime import datetime |
|
|
|
|
from jpeg2dct.numpy import load, loads |
|
|
|
|
from jpeg2dct.numpy import loads |
|
|
|
|
from prometheus_client import Counter, Gauge |
|
|
|
|
from sanic import Sanic, response |
|
|
|
|
from sanic.response import stream |
|
|
|
@ -25,6 +24,7 @@ DCT_BLOCK_SIZE = 8
@@ -25,6 +24,7 @@ DCT_BLOCK_SIZE = 8
|
|
|
|
|
# How many blocks have changes to consider movement in frame |
|
|
|
|
THRESHOLD_BLOCKS = 20 |
|
|
|
|
THRESHOLD_MOTION_START = 2 |
|
|
|
|
CHUNK_BOUNDARY = b"\n--frame\nContent-Type: image/jpeg\n\n" |
|
|
|
|
|
|
|
|
|
counter_dropped_bytes = Counter( |
|
|
|
|
"camdetect_dropped_bytes", |
|
|
|
@ -44,7 +44,10 @@ counter_tx_frames = Counter(
@@ -44,7 +44,10 @@ counter_tx_frames = Counter(
|
|
|
|
|
counter_tx_events = Counter( |
|
|
|
|
"camdetect_tx_events", |
|
|
|
|
"Events emitted") |
|
|
|
|
counter_errors = Counter( |
|
|
|
|
counter_rx_chunks = Counter( |
|
|
|
|
"camdetect_rx_chunks", |
|
|
|
|
"HTTP chunks received") |
|
|
|
|
counter_errors = Counter( |
|
|
|
|
"camdetect_errors", |
|
|
|
|
"Upstream connection errors", |
|
|
|
|
["exception"]) |
|
|
|
@ -59,11 +62,13 @@ gauge_active_blocks = Gauge(
@@ -59,11 +62,13 @@ gauge_active_blocks = Gauge(
|
|
|
|
|
"camdetect_active_blocks", |
|
|
|
|
"Total active, threshold exceeding DCT blocks") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Frame(object): |
|
|
|
|
def __init__(self, blob): |
|
|
|
|
self.blob = blob |
|
|
|
|
self.y, self.cb, self.cr = loads(blob) |
|
|
|
|
self.mask = np.int16(self.y[:,:,0]) |
|
|
|
|
self.mask = np.int16(self.y[:, :, 0]) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def client_connect(resp): |
|
|
|
|
buf = b"" |
|
|
|
@ -71,12 +76,11 @@ async def client_connect(resp):
@@ -71,12 +76,11 @@ async def client_connect(resp):
|
|
|
|
|
async for data, end_of_http_chunk in resp.content.iter_chunks(): |
|
|
|
|
counter_rx_bytes.inc(len(data)) |
|
|
|
|
if end_of_http_chunk: |
|
|
|
|
counter_eos.inc() |
|
|
|
|
break |
|
|
|
|
counter_rx_chunks.inc() |
|
|
|
|
|
|
|
|
|
if buf: |
|
|
|
|
# seek end |
|
|
|
|
marker = data.find(b'\xff\xd9') |
|
|
|
|
marker = data.find(b"\xff\xd9") |
|
|
|
|
if marker < 0: |
|
|
|
|
buf += data |
|
|
|
|
continue |
|
|
|
@ -95,10 +99,13 @@ async def client_connect(resp):
@@ -95,10 +99,13 @@ async def client_connect(resp):
|
|
|
|
|
app.ctx.frames = app.ctx.frames[1:] |
|
|
|
|
|
|
|
|
|
if len(app.ctx.frames) == 2 ** SLIDE_WINDOW: |
|
|
|
|
app.ctx.thresh = cv2.inRange(cv2.absdiff(app.ctx.last_frame.mask, app.ctx.avg >> SLIDE_WINDOW), 25, 65535) |
|
|
|
|
app.ctx.thresh = cv2.inRange(cv2.absdiff( |
|
|
|
|
app.ctx.last_frame.mask, |
|
|
|
|
app.ctx.avg >> SLIDE_WINDOW), 25, 65535) |
|
|
|
|
else: |
|
|
|
|
app.ctx.thresh = None |
|
|
|
|
gauge_total_blocks.set(app.ctx.last_frame.mask.shape[0] * app.ctx.last_frame.mask.shape[1]) |
|
|
|
|
gauge_total_blocks.set(app.ctx.last_frame.mask.shape[0] * |
|
|
|
|
app.ctx.last_frame.mask.shape[1]) |
|
|
|
|
|
|
|
|
|
movement_detected = False |
|
|
|
|
if app.ctx.thresh is not None: |
|
|
|
@ -131,7 +138,7 @@ async def client_connect(resp):
@@ -131,7 +138,7 @@ async def client_connect(resp):
|
|
|
|
|
counter_rx_frames.inc() |
|
|
|
|
|
|
|
|
|
# seek begin |
|
|
|
|
marker = data.find(b'\xff\xd8') |
|
|
|
|
marker = data.find(b"\xff\xd8") |
|
|
|
|
if marker >= 0: |
|
|
|
|
buf = data[marker:] |
|
|
|
|
else: |
|
|
|
@ -140,7 +147,8 @@ async def client_connect(resp):
@@ -140,7 +147,8 @@ async def client_connect(resp):
|
|
|
|
|
|
|
|
|
|
async def client(): |
|
|
|
|
while True: |
|
|
|
|
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(connect=5, sock_read=2)) as session: |
|
|
|
|
to = aiohttp.ClientTimeout(connect=5, sock_read=2) |
|
|
|
|
async with aiohttp.ClientSession(timeout=to) as session: |
|
|
|
|
print("Opening upstream connection to %s" % url) |
|
|
|
|
try: |
|
|
|
|
async with session.get(url) as resp: |
|
|
|
@ -155,49 +163,46 @@ async def client():
@@ -155,49 +163,46 @@ async def client():
|
|
|
|
|
app = Sanic("lease") |
|
|
|
|
app.config["WTF_CSRF_ENABLED"] = False |
|
|
|
|
|
|
|
|
|
STREAM_RESPONSE = \ |
|
|
|
|
b""" |
|
|
|
|
--frame |
|
|
|
|
Content-Type: image/jpeg |
|
|
|
|
|
|
|
|
|
""" |
|
|
|
|
|
|
|
|
|
@app.route("/bypass") |
|
|
|
|
async def bypass_stream_wrapper(request): |
|
|
|
|
async def stream_camera(response): |
|
|
|
|
while True: |
|
|
|
|
await app.ctx.event_frame.wait() |
|
|
|
|
data = STREAM_RESPONSE + app.ctx.last_frame.blob |
|
|
|
|
data = CHUNK_BOUNDARY + app.ctx.last_frame.blob |
|
|
|
|
await response.write(data) |
|
|
|
|
counter_tx_bytes.inc(len(data)) |
|
|
|
|
counter_tx_frames.inc() |
|
|
|
|
|
|
|
|
|
return response.stream( |
|
|
|
|
stream_camera, |
|
|
|
|
content_type='multipart/x-mixed-replace; boundary=frame' |
|
|
|
|
content_type="multipart/x-mixed-replace; boundary=frame" |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.route("/debug") |
|
|
|
|
async def stream_wrapper(request): |
|
|
|
|
async def stream_camera(response): |
|
|
|
|
while True: |
|
|
|
|
await app.ctx.event_frame.wait() |
|
|
|
|
img = cv2.imdecode(np.frombuffer(app.ctx.last_frame.blob, dtype=np.uint8), cv2.IMREAD_UNCHANGED) |
|
|
|
|
arr = np.frombuffer(app.ctx.last_frame.blob, dtype=np.uint8) |
|
|
|
|
img = cv2.imdecode(arr, cv2.IMREAD_UNCHANGED) |
|
|
|
|
if len(app.ctx.frames) == 2 ** SLIDE_WINDOW: |
|
|
|
|
for y in range(0, len(app.ctx.last_frame.mask)): |
|
|
|
|
for x in range(0, len(app.ctx.last_frame.mask[0])): |
|
|
|
|
if app.ctx.thresh[y][x] > 0: |
|
|
|
|
img[y*DCT_BLOCK_SIZE:(y+1)*DCT_BLOCK_SIZE,x*DCT_BLOCK_SIZE:(x+1)*DCT_BLOCK_SIZE,2] = 255 |
|
|
|
|
img[y*DCT_BLOCK_SIZE:(y+1)*DCT_BLOCK_SIZE, |
|
|
|
|
x*DCT_BLOCK_SIZE:(x+1)*DCT_BLOCK_SIZE, 2] = 255 |
|
|
|
|
|
|
|
|
|
_, jpeg = cv2.imencode(".jpg", img, (cv2.IMWRITE_JPEG_QUALITY, 80)) |
|
|
|
|
data = STREAM_RESPONSE + jpeg.tobytes() |
|
|
|
|
data = CHUNK_BOUNDARY + jpeg.tobytes() |
|
|
|
|
await response.write(data) |
|
|
|
|
counter_tx_bytes.inc(len(data)) |
|
|
|
|
counter_tx_frames.inc() |
|
|
|
|
|
|
|
|
|
return response.stream( |
|
|
|
|
stream_camera, |
|
|
|
|
content_type='multipart/x-mixed-replace; boundary=frame' |
|
|
|
|
content_type="multipart/x-mixed-replace; boundary=frame" |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -208,20 +213,20 @@ async def ready_check(request):
@@ -208,20 +213,20 @@ async def ready_check(request):
|
|
|
|
|
return response.text("Not enough frames", status=503) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.route('/event') |
|
|
|
|
@app.route("/event") |
|
|
|
|
async def wrapper_stream_event(request): |
|
|
|
|
async def stream_event(response): |
|
|
|
|
while True: |
|
|
|
|
await app.ctx.event_frame.wait() |
|
|
|
|
if len(app.ctx.frames) < 2 ** SLIDE_WINDOW: |
|
|
|
|
continue |
|
|
|
|
s = 'data: ' + json.dumps(app.ctx.thresh.tolist()) + '\r\n\r\n' |
|
|
|
|
s = "data: " + json.dumps(app.ctx.thresh.tolist()) + "\r\n\r\n" |
|
|
|
|
await response.write(s.encode()) |
|
|
|
|
counter_tx_events.inc() |
|
|
|
|
return stream(stream_event, content_type='text/event-stream') |
|
|
|
|
return stream(stream_event, content_type="text/event-stream") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.listener('before_server_start') |
|
|
|
|
@app.listener("before_server_start") |
|
|
|
|
async def setup_db(app, loop): |
|
|
|
|
app.ctx.last_frame = None |
|
|
|
|
app.ctx.event_frame = asyncio.Event() |
|
|
|
@ -230,7 +235,7 @@ async def setup_db(app, loop):
@@ -230,7 +235,7 @@ async def setup_db(app, loop):
|
|
|
|
|
app.ctx.motion_frames = 0 |
|
|
|
|
app.ctx.motion_start = None |
|
|
|
|
app.ctx.motion_end = None |
|
|
|
|
task = asyncio.create_task(client()) |
|
|
|
|
asyncio.create_task(client()) |
|
|
|
|
|
|
|
|
|
monitor(app).expose_endpoint() |
|
|
|
|
|
|
|
|
|