camera-motion-detect/camdetect.py

250 lines
8.0 KiB
Python
Raw Normal View History

2022-02-12 18:44:20 +00:00
#!/usr/bin/env python3
import aiohttp
import asyncio
import cv2
import numpy as np
import os
import json
import socket
import sys
from datetime import datetime
2022-02-16 20:32:56 +00:00
from jpeg2dct.numpy import loads
2022-02-12 18:44:20 +00:00
from prometheus_client import Counter, Gauge
from sanic import Sanic, response
from sanic.response import stream
from sanic_prometheus import monitor
2022-02-20 07:53:19 +00:00
from time import time
2022-02-12 18:44:20 +00:00
# The script takes exactly one positional argument: the URL that client()
# opens and scans for JPEG frames.  Fail with a usage line instead of an
# unpacking ValueError when the argument count is wrong.
if len(sys.argv) != 2:
    sys.exit("Usage: %s <upstream-url>" % sys.argv[0])
url = sys.argv[1]

# NOTE(review): MONGO_URI and FQDN are not referenced in the visible part of
# this file - confirm whether they are still needed.
MONGO_URI = os.getenv("MONGO_URI", "mongodb://127.0.0.1:27017/default")
FQDN = socket.getfqdn()

# log2 of the sliding window length: the window holds 2 ** SLIDE_WINDOW
# frames and the running sum is turned into an average with
# ``>> SLIDE_WINDOW``.
SLIDE_WINDOW = 2
# Edge length in pixels of one block, used by /debug to scale block
# coordinates to pixel coordinates.
DCT_BLOCK_SIZE = 8
# How many blocks have changes to consider movement in frame
THRESHOLD_BLOCKS = 20
# NOTE(review): not referenced in the visible part of this file.
THRESHOLD_MOTION_START = 2

# Part separator written in front of every JPEG in the
# multipart/x-mixed-replace responses.
CHUNK_BOUNDARY = b"\n--frame\nContent-Type: image/jpeg\n\n"
2022-02-12 18:44:20 +00:00
# Prometheus metrics.

# Upstream byte/chunk accounting.
counter_dropped_bytes = Counter(
    "camdetect_dropped_bytes",
    "Bytes that were not handled or part of actual JPEG frames")
counter_rx_bytes = Counter(
    "camdetect_rx_bytes",
    "Bytes received over HTTP stream")
counter_rx_chunks = Counter(
    "camdetect_rx_chunks",
    "HTTP chunks received")

# Downstream (served) byte/frame/event accounting.
counter_tx_bytes = Counter(
    "camdetect_tx_bytes",
    "Bytes transmitted over HTTP streams")
counter_tx_frames = Counter(
    "camdetect_tx_frames",
    "Frames transmitted")
counter_tx_events = Counter(
    "camdetect_tx_events",
    "Events emitted")

# Frame-level accounting.
counter_rx_frames = Counter(
    "camdetect_rx_frames",
    "Frames received")
counter_movement_frames = Counter(
    "camdetect_movement_frames",
    "Frames with movement detected in them")

# Upstream failures, labelled with the fully qualified exception class name.
counter_errors = Counter(
    "camdetect_errors",
    "Upstream connection errors",
    ["exception"])

gauge_last_frame = Gauge(
    "camdetect_last_frame",
    "Timestamp of last frame")
gauge_total_blocks = Gauge(
    "camdetect_total_blocks",
    "Total DCT blocks")
gauge_active_blocks = Gauge(
    "camdetect_active_blocks",
    "Total active, threshold exceeding DCT blocks")
2022-02-16 20:32:56 +00:00
2022-02-12 18:44:20 +00:00
class Frame:
    """One received JPEG together with the data decoded from it.

    Attributes:
        blob: the raw JPEG bytes exactly as passed in.
        y, cb, cr: the three arrays returned by ``loads(blob)``.
        mask: element 0 along the last axis of ``y`` as int16; presumably
            the first DCT coefficient of every luma block - confirm
            against the jpeg2dct documentation.
    """

    def __init__(self, blob):
        luma, chroma_b, chroma_r = loads(blob)
        self.blob = blob
        self.y = luma
        self.cb = chroma_b
        self.cr = chroma_r
        self.mask = np.int16(luma[:, :, 0])
2022-02-12 18:44:20 +00:00
def _update_motion_state(movement_detected):
    """Advance the motion hysteresis counter and log start/end transitions.

    ``app.ctx.motion_frames`` goes up by one (capped at 30) for a frame with
    movement and down by one (floored at 0) otherwise.  A motion starts once
    the counter exceeds 20 and ends once it drops below 5.
    """
    if movement_detected:
        if app.ctx.motion_frames < 30:
            app.ctx.motion_frames += 1
    elif app.ctx.motion_frames > 0:
        app.ctx.motion_frames -= 1

    if app.ctx.motion_frames > 20:
        if not app.ctx.motion_start:
            app.ctx.motion_start = datetime.utcnow()
            print("Movement start")
    elif app.ctx.motion_frames < 5 and app.ctx.motion_start:
        # Only report the end of a motion that actually started; without
        # this guard the message is printed for every idle frame.
        app.ctx.motion_start = None
        print("Movement end")


def _process_frame(blob):
    """Decode one JPEG, update the sliding average and publish the frame."""
    frame = Frame(blob)
    app.ctx.last_frame = frame
    gauge_last_frame.set(time())

    # Maintain a running sum over the last 2 ** SLIDE_WINDOW masks.
    reference = frame.mask
    app.ctx.frames.append(reference)
    if app.ctx.avg is None:
        app.ctx.avg = np.copy(reference)
    else:
        app.ctx.avg += reference

    window = 2 ** SLIDE_WINDOW
    if len(app.ctx.frames) > window:
        app.ctx.avg -= app.ctx.frames[0]
        app.ctx.frames = app.ctx.frames[1:]

    if len(app.ctx.frames) == window:
        # Blocks whose value differs from the window average by >= 25.
        app.ctx.thresh = cv2.inRange(
            cv2.absdiff(frame.mask, app.ctx.avg >> SLIDE_WINDOW),
            25, 65535)
    else:
        # Not enough history yet to compare against.
        app.ctx.thresh = None
    gauge_total_blocks.set(frame.mask.shape[0] * frame.mask.shape[1])

    movement_detected = False
    if app.ctx.thresh is not None:
        differing_blocks = np.count_nonzero(app.ctx.thresh)
        gauge_active_blocks.set(differing_blocks)
        if differing_blocks > THRESHOLD_BLOCKS:
            counter_movement_frames.inc()
            movement_detected = True
    _update_motion_state(movement_detected)

    # Wake every handler currently waiting for a frame, then re-arm.
    app.ctx.event_frame.set()
    app.ctx.event_frame.clear()


async def client_connect(resp):
    """Consume the upstream response body and process each JPEG found in it.

    The body is scanned for JPEG start (FF D8) and end (FF D9) markers;
    everything from a start marker through the next end marker is handed to
    ``_process_frame``.  Bytes of a read that contain no start marker are
    counted in ``counter_dropped_bytes``.

    Compared to a plain per-read search this also handles an end marker
    that is split across two reads and several complete frames arriving in
    a single read.  NOTE: a *start* marker split across two reads is still
    not recognised; that frame is skipped.

    Args:
        resp: the open upstream response; ``resp.status`` is logged and
            ``resp.content.iter_chunks()`` is iterated until it ends.
    """
    buf = b""
    print("Upstream connection opened with status:", resp.status)
    async for data, end_of_http_chunk in resp.content.iter_chunks():
        counter_rx_bytes.inc(len(data))
        if end_of_http_chunk:
            counter_rx_chunks.inc()

        # Loop because one read may hold more than one frame.
        while True:
            if buf:
                # seek end
                if buf.endswith(b"\xff") and data.startswith(b"\xd9"):
                    # Marker straddles the previous and the current read.
                    end = 1
                else:
                    marker = data.find(b"\xff\xd9")
                    end = marker + 2 if marker >= 0 else -1
                if end < 0:
                    buf += data
                    break
                _process_frame(buf + data[:end])
                counter_rx_frames.inc()
                data = data[end:]
                buf = b""

            # seek begin
            marker = data.find(b"\xff\xd8")
            if marker < 0:
                counter_dropped_bytes.inc(len(data))
                break
            # Keep only the marker in buf and rescan the remainder so an
            # end marker in the same read is not missed.
            buf = data[marker:marker + 2]
            data = data[marker + 2:]
async def client():
    """Stream from ``url`` forever, reconnecting after errors or stream end.

    One ``ClientSession`` (5 s connect timeout, 2 s socket read timeout) is
    shared by all connection attempts instead of being rebuilt for each
    one.  aiohttp client errors and timeouts are logged, counted in
    ``counter_errors`` under the exception's qualified class name, and
    followed by a retry.  Any other exception propagates and ends the task.
    """
    to = aiohttp.ClientTimeout(connect=5, sock_read=2)
    async with aiohttp.ClientSession(timeout=to) as session:
        while True:
            print("Opening upstream connection to %s" % url)
            try:
                async with session.get(url) as resp:
                    await client_connect(resp)
            except (aiohttp.ClientError,
                    asyncio.exceptions.TimeoutError) as e:
                j = "%s.%s" % (e.__class__.__module__, e.__class__.__name__)
                print("Caught exception %s" % j)
                counter_errors.labels(exception=j).inc()
            # Back off after every attempt, including a cleanly ended
            # stream, so a misbehaving upstream cannot cause a busy loop.
            await asyncio.sleep(1)
2022-02-12 18:44:20 +00:00
# Sanic application object; every route and listener in this file is
# registered on it and shared state lives on app.ctx.
# NOTE(review): the name "lease" looks carried over from another project -
# confirm before renaming.
app = Sanic("lease")
# NOTE(review): nothing in the visible part of this file reads this key -
# confirm whether it has any effect here.
app.config["WTF_CSRF_ENABLED"] = False
@app.route("/bypass")
async def bypass_stream_wrapper(request):
    """Serve the most recent upstream JPEGs, unmodified, as a multipart stream.

    Each time the frame event fires, the boundary header plus the raw blob
    of ``app.ctx.last_frame`` is written to the client and the tx counters
    are updated.
    """

    async def stream_camera(out):
        while True:
            await app.ctx.event_frame.wait()
            payload = b"".join((CHUNK_BOUNDARY, app.ctx.last_frame.blob))
            await out.write(payload)
            counter_tx_bytes.inc(len(payload))
            counter_tx_frames.inc()

    return stream(
        stream_camera,
        content_type="multipart/x-mixed-replace; boundary=frame",
    )
2022-02-16 20:32:56 +00:00
2022-02-12 18:44:20 +00:00
@app.route("/debug")
async def stream_wrapper(request):
    """Serve a multipart JPEG stream with the changed blocks highlighted.

    Every frame is decoded with OpenCV.  Once the sliding window is full,
    each block flagged in ``app.ctx.thresh`` has channel index 2 (red in
    OpenCV's BGR order) of its DCT_BLOCK_SIZE x DCT_BLOCK_SIZE pixel area
    set to 255.  The image is then re-encoded at JPEG quality 80.
    """
    async def stream_camera(response):
        while True:
            await app.ctx.event_frame.wait()
            arr = np.frombuffer(app.ctx.last_frame.blob, dtype=np.uint8)
            img = cv2.imdecode(arr, cv2.IMREAD_UNCHANGED)
            if len(app.ctx.frames) == 2 ** SLIDE_WINDOW:
                # Visit only the flagged blocks instead of testing every
                # block in a Python double loop; the painted pixels are
                # the same.
                for y, x in np.argwhere(app.ctx.thresh > 0):
                    img[y*DCT_BLOCK_SIZE:(y+1)*DCT_BLOCK_SIZE,
                        x*DCT_BLOCK_SIZE:(x+1)*DCT_BLOCK_SIZE, 2] = 255

            _, jpeg = cv2.imencode(".jpg", img, (cv2.IMWRITE_JPEG_QUALITY, 80))
            data = CHUNK_BOUNDARY + jpeg.tobytes()
            await response.write(data)
            counter_tx_bytes.inc(len(data))
            counter_tx_frames.inc()

    # ``response`` here is the sanic.response module; the parameter of the
    # inner function only shadows it inside that function.
    return response.stream(
        stream_camera,
        content_type="multipart/x-mixed-replace; boundary=frame"
    )
2022-02-13 11:17:29 +00:00
@app.route("/readyz")
async def ready_check(request):
    """Readiness probe.

    Answers "OK" once exactly 2 ** SLIDE_WINDOW frames are buffered, and
    "Not enough frames" with status 503 otherwise.
    """
    window_full = len(app.ctx.frames) == 2 ** SLIDE_WINDOW
    if not window_full:
        return response.text("Not enough frames", status=503)
    return response.text("OK")
2022-02-16 20:32:56 +00:00
@app.route("/event")
async def wrapper_stream_event(request):
    """Stream the per-block threshold matrix as text/event-stream messages.

    For every frame event received while at least 2 ** SLIDE_WINDOW frames
    are buffered, ``app.ctx.thresh`` is serialised as a JSON nested list
    and written as one ``data:`` message.
    """

    async def stream_event(out):
        window = 2 ** SLIDE_WINDOW
        while True:
            await app.ctx.event_frame.wait()
            if len(app.ctx.frames) >= window:
                blocks = app.ctx.thresh.tolist()
                message = "".join(("data: ", json.dumps(blocks), "\r\n\r\n"))
                await out.write(message.encode())
                counter_tx_events.inc()

    return stream(stream_event, content_type="text/event-stream")
2022-02-12 18:44:20 +00:00
2022-02-16 20:32:56 +00:00
@app.listener("before_server_start")
async def setup_db(app, loop):
    """Initialise the shared state on ``app.ctx`` and start the upstream client.

    Args:
        app: the Sanic application whose ``ctx`` receives the state.
        loop: the event loop passed in by Sanic; unused.
    """
    app.ctx.last_frame = None
    app.ctx.event_frame = asyncio.Event()
    app.ctx.frames = []
    app.ctx.avg = None
    # Defined up front so handlers never hit a missing attribute.
    app.ctx.thresh = None
    app.ctx.motion_frames = 0
    app.ctx.motion_start = None
    app.ctx.motion_end = None
    # Keep a strong reference: the event loop only holds weak references to
    # tasks, so an unreferenced task may be garbage-collected mid-run.
    app.ctx.client_task = asyncio.create_task(client())
2022-02-12 18:44:20 +00:00
# Register the Prometheus metrics endpoint on the application.
monitor(app).expose_endpoint()

# Start the server only when executed as a script, so importing this module
# (for example in a re-imported worker process) does not start a second
# server as a side effect.
if __name__ == "__main__":
    try:
        app.run(host="0.0.0.0", port=5000)
    except KeyboardInterrupt:
        asyncio.get_event_loop().close()