camera-motion-detect/camdetect.py

534 lines
18 KiB
Python
Raw Normal View History

2022-02-12 18:44:20 +00:00
#!/usr/bin/env python3
import aioboto3
2022-02-12 18:44:20 +00:00
import aiohttp
import asyncio
import cv2
import hashlib
import io
import json
2022-02-12 18:44:20 +00:00
import numpy as np
import os
2022-02-27 15:02:55 +00:00
import pymongo
import signal
2022-02-12 18:44:20 +00:00
import sys
2022-07-10 10:15:54 +00:00
import botocore.exceptions
from datetime import datetime, timedelta
2022-02-16 20:32:56 +00:00
from jpeg2dct.numpy import loads
2022-07-10 10:15:54 +00:00
from math import inf
from motor.motor_asyncio import AsyncIOMotorClient
2022-07-10 10:15:54 +00:00
from prometheus_client import Counter, Gauge, Histogram
2022-02-12 18:44:20 +00:00
from sanic import Sanic, response
2022-07-11 10:28:22 +00:00
from sanic_json_logging import setup_json_logging
2022-07-10 10:15:54 +00:00
from sanic.log import logger
2022-02-12 18:44:20 +00:00
from sanic_prometheus import monitor
2022-07-10 10:15:54 +00:00
from sanic.response import stream
from time import time
2022-02-12 18:44:20 +00:00
_, url = sys.argv
2022-07-10 10:15:54 +00:00
AWS_ACCESS_KEY_ID = os.environ["AWS_ACCESS_KEY_ID"]
AWS_SECRET_ACCESS_KEY = os.environ["AWS_SECRET_ACCESS_KEY"]
2022-07-10 10:15:54 +00:00
S3_ENDPOINT_URL = os.environ["S3_ENDPOINT_URL"]
S3_BUCKET_NAME = os.getenv("S3_BUCKET_NAME", "camdetect")
2022-02-12 18:44:20 +00:00
MONGO_URI = os.getenv("MONGO_URI", "mongodb://127.0.0.1:27017/default")
2022-07-10 10:15:54 +00:00
MONGO_COLLECTION = os.getenv("MONGO_COLLECTION", "eventlog")
SOURCE_NAME = os.environ["SOURCE_NAME"]
2022-02-12 18:44:20 +00:00
SLIDE_WINDOW = 2
DCT_BLOCK_SIZE = 8
2022-03-06 06:25:54 +00:00
UPLOAD_FRAMESKIP = 3
2022-02-12 18:44:20 +00:00
# Percentage of blocks active to consider movement in whole frame
THRESHOLD_RATIO = int(os.getenv("THRESHOLD_RATIO", "5"))
2022-02-16 20:32:56 +00:00
CHUNK_BOUNDARY = b"\n--frame\nContent-Type: image/jpeg\n\n"
2022-02-12 18:44:20 +00:00
2022-07-10 10:15:54 +00:00
hist_active_blocks_ratio = Histogram(
"camtiler_active_blocks_ratio",
"Ratio of active DCT blocks",
["roi"],
buckets=(0.01, 0.02, 0.05, 0.1, 0.5, inf))
hist_processing_latency = Histogram(
"camtiler_frame_processing_latency_seconds",
"Frame processing latency",
buckets=(0.01, 0.05, 0.1, 0.5, 1, inf))
hist_upload_latency = Histogram(
"camtiler_frame_upload_latency_seconds",
"Frame processing latency",
buckets=(0.1, 0.5, 1, 5, 10, inf))
counter_events = Counter(
"camtiler_events",
"Count of successfully processed events")
counter_frames = Counter(
"camtiler_frames",
"Count of frames",
["stage"])
counter_dropped_frames = Counter(
"camtiler_dropped_frames",
"Frames that were dropped due to one of queues being full",
["stage"])
counter_discarded_bytes = Counter(
"camtiler_discarded_bytes",
2022-02-12 18:44:20 +00:00
"Bytes that were not not handled or part of actual JPEG frames")
2022-07-10 10:15:54 +00:00
counter_receive_bytes = Counter(
"counter_receive_bytes",
2022-02-12 18:44:20 +00:00
"Bytes received over HTTP stream")
2022-07-10 10:15:54 +00:00
counter_transmit_bytes = Counter(
"camtiler_transmit_bytes",
2022-02-12 18:44:20 +00:00
"Bytes transmitted over HTTP streams")
2022-07-10 10:15:54 +00:00
counter_receive_frames = Counter(
"camtiler_receive_frames",
"Frames received from upstream")
counter_transmit_frames = Counter(
"camtiler_transmit_frames",
"Frames transmitted to downstream consumers")
counter_emitted_events = Counter(
"camtiler_emitted_events",
2022-02-12 18:44:20 +00:00
"Events emitted")
2022-07-10 10:15:54 +00:00
counter_receive_chunks = Counter(
"camtiler_receive_chunks",
2022-02-16 20:32:56 +00:00
"HTTP chunks received")
counter_errors = Counter(
2022-07-10 10:15:54 +00:00
"camtiler_errors",
2022-02-13 17:53:47 +00:00
"Upstream connection errors",
2022-07-10 10:15:54 +00:00
["stage", "exception"])
2022-02-20 07:53:19 +00:00
gauge_last_frame = Gauge(
2022-07-10 10:15:54 +00:00
"camtiler_last_frame_timestamp_seconds",
"Timestamp of last frame",
["stage"])
gauge_queue_frames = Gauge(
"camtiler_queue_frames",
"Numer of frames in a queue",
["stage"])
gauge_build_info = Gauge(
"camtiler_build_info",
"Build info",
["git_hash"])
gauge_build_info.labels(os.getenv("GIT_COMMIT", "null")).set(1)
# Reset some gauges
2022-07-10 10:15:54 +00:00
gauge_queue_frames.labels("download").set(0)
gauge_queue_frames.labels("hold").set(0)
gauge_queue_frames.labels("upload").set(0)
assert SLIDE_WINDOW <= 8 # This is 256 frames which should be enough
2022-02-27 11:12:22 +00:00
async def upload(bucket, blob: bytes, thumb: bytes, event_id):
"""
2022-07-10 10:15:54 +00:00
Upload single frame to S3 bucket
"""
# Generate S3 path based on the JPEG blob SHA512 digest
fp = hashlib.sha512(blob).hexdigest()
path = "%s/%s/%s/%s.jpg" % (fp[:4], fp[4:8], fp[8:12], fp[12:])
2022-02-27 11:12:22 +00:00
2022-07-10 10:15:54 +00:00
try:
await bucket.upload_fileobj(io.BytesIO(thumb), "thumb/%s" % path)
await bucket.upload_fileobj(io.BytesIO(blob), path)
except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as e:
j = "%s.%s" % (e.__class__.__module__, e.__class__.__name__)
counter_errors.labels("upload", j).inc()
# Add screenshot path to the event
app.ctx.coll.update_one({
"_id": event_id
}, {
"$addToSet": {
"screenshots": path,
}
})
2022-07-10 10:15:54 +00:00
counter_frames.labels("stored").inc()
now = datetime.utcnow()
gauge_last_frame.labels("upload").set(now.timestamp())
# TODO: Handle 16MB maximum document size
async def uploader(queue):
"""
Uploader task grabs JPEG blobs from upload queue and uploads them to S3
"""
2022-07-10 10:15:54 +00:00
session = aioboto3.Session()
async with session.resource("s3",
aws_access_key_id=AWS_ACCESS_KEY_ID,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
endpoint_url=S3_ENDPOINT_URL) as s3:
bucket = await s3.Bucket(S3_BUCKET_NAME)
while True:
2022-07-10 10:15:54 +00:00
dt, blob, thumb, event_id = await queue.get()
gauge_queue_frames.labels("upload").set(queue.qsize())
2022-02-27 11:12:22 +00:00
await upload(bucket, blob, thumb, event_id)
2022-07-10 10:15:54 +00:00
counter_frames.labels("uploaded").inc()
hist_upload_latency.observe(
(datetime.utcnow() - dt).total_seconds())
class ReferenceFrame():
"""
ReferenceFrame keeps last 2 ^ size frames to infer the background scene
compared to which motion is detected
This is pretty much what background subtractor does in OpenCV,
only difference is that we want have better performance instead of
accuracy
"""
class NotEnoughFrames(Exception):
pass
def __init__(self, size=SLIDE_WINDOW):
self.y = []
self.cumulative = None
self.size = size
def put(self, y):
if self.cumulative is None:
self.cumulative = np.copy(y)
else:
self.cumulative += y
self.y.append(y)
if len(self.y) > 2 ** self.size:
self.cumulative -= self.y[0]
self.y = self.y[1:]
def get(self):
if len(self.y) == 2 ** self.size:
return self.cumulative >> SLIDE_WINDOW
else:
raise self.NotEnoughFrames()
2022-02-12 18:44:20 +00:00
2022-02-16 20:32:56 +00:00
async def motion_detector(reference_frame, download_queue, upload_queue):
"""
Motion detector grabs JPEG blobs and Y channel coefficients
from download queue, performs motion detection and pushes relevant
JPEG blobs to upload queue going to S3
"""
event_id = None
differing_blocks = []
2022-03-06 06:25:54 +00:00
uploads_skipped = 0
# Hold queue keeps frames that we have before motion event start timestamp
hold_queue = asyncio.Queue(2 ** SLIDE_WINDOW)
while True:
2022-02-27 11:12:22 +00:00
dt, blob, dct, thumb = await download_queue.get()
2022-07-10 10:15:54 +00:00
gauge_queue_frames.labels("download").set(download_queue.qsize())
2022-02-27 11:12:22 +00:00
app.ctx.last_frame, app.ctx.dct = blob, dct
# Signal /bypass and /debug handlers about new frame
app.ctx.event_frame.set()
app.ctx.event_frame.clear()
2022-02-27 11:12:22 +00:00
# Separate most significant luma value for each DCT (8x8 pixel) block
y = np.int16(dct[0][:, :, 0])
reference_frame.put(y)
try:
app.ctx.mask = cv2.inRange(cv2.absdiff(y,
reference_frame.get()), 25, 65535)
except ReferenceFrame.NotEnoughFrames:
app.ctx.mask = None
motion_detected = False
else:
# Implement dumb Kalman filter
active_blocks = np.count_nonzero(app.ctx.mask)
differing_blocks.append(active_blocks)
differing_blocks[:] = differing_blocks[-10:]
total_blocks = app.ctx.mask.shape[0] * app.ctx.mask.shape[1]
threshold_blocks = THRESHOLD_RATIO * total_blocks / 100
average_blocks = sum(differing_blocks) / len(differing_blocks)
2022-07-10 10:15:54 +00:00
hist_active_blocks_ratio.labels("main").observe(active_blocks / total_blocks)
motion_detected = average_blocks > threshold_blocks
2022-07-10 10:15:54 +00:00
now = datetime.utcnow()
gauge_last_frame.labels("processed").set(now.timestamp())
hist_processing_latency.observe((now - dt).total_seconds())
# Propagate SIGUSR1 signal handler
if app.ctx.manual_trigger:
2022-07-10 10:15:54 +00:00
logger.info("Manually triggering event via SIGUSR1")
motion_detected = True
app.ctx.manual_trigger = False
# Handle event start
if motion_detected and not event_id:
result = await app.ctx.coll.insert_one({
"timestamp": dt,
"event": "motion-detected",
"started": dt,
"finished": dt + timedelta(minutes=2),
"component": "camdetect",
"source": SOURCE_NAME,
"screenshots": [],
"action": "event",
})
app.ctx.event_id = event_id = result.inserted_id
2022-03-06 06:25:54 +00:00
# Handle buffering frames prior event start
if hold_queue.full():
await hold_queue.get()
hold_queue.put_nowait((blob, thumb))
2022-07-10 10:15:54 +00:00
gauge_queue_frames.labels("hold").set(hold_queue.qsize())
2022-03-06 06:25:54 +00:00
# Handle image upload
if motion_detected and event_id:
2022-07-10 10:15:54 +00:00
counter_frames.labels("motion").inc()
2022-03-06 06:25:54 +00:00
while True:
if not uploads_skipped:
uploads_skipped = UPLOAD_FRAMESKIP
else:
uploads_skipped -= 1
continue
# Drain queue of frames prior event start
try:
blob, thumb = hold_queue.get_nowait()
except asyncio.QueueEmpty:
break
try:
# Push JPEG blob into upload queue
2022-07-10 10:15:54 +00:00
upload_queue.put_nowait((dt, blob, thumb, event_id))
2022-03-06 06:25:54 +00:00
except asyncio.QueueFull:
2022-07-10 10:15:54 +00:00
counter_dropped_frames.labels("upload").inc()
gauge_queue_frames.labels("upload").set(upload_queue.qsize())
# Handle event end
if not motion_detected and event_id:
app.ctx.coll.update_one({
"_id": event_id
}, {
"$set": {
"finished": dt,
}
})
app.ctx.event_id = event_id = None
2022-02-27 11:12:22 +00:00
def generate_thumbnail(dct):
"""
This is highly efficient and highly inaccurate function to generate
thumbnail based purely on JPEG coefficients
"""
y, cr, cb = dct
# Determine aspect ratio and minimum dimension for cropping
ar = y.shape[0] < y.shape[1]
dm = (y.shape[0] if ar else y.shape[1]) & 0xfffffff8
# Determine cropping slices to make it square
jl = ((y.shape[1] >> 1) - (dm >> 1) if ar else 0)
jt = (0 if ar else (y.shape[0] >> 1) - (dm >> 1))
jr = jl + dm
jb = jt + dm
# Do the actual crop
ty = y[jt:jb, jl:jr, 0]
tb = cb[jt >> 1:jb >> 1, jl >> 1:jr >> 1, 0]
tr = cr[jt >> 1:jb >> 1, jl >> 1:jr >> 1, 0]
# Upsample chroma, dummy convert first coeff and stack all channels
m = np.dstack((
np.array((ty >> 3) + 127, dtype=np.uint8),
np.array(
(tb.repeat(2, 1).repeat(2, 0) >> 3) + 127, dtype=np.uint8)[:dm],
np.array(
(tr.repeat(2, 1).repeat(2, 0) >> 3) + 127, dtype=np.uint8)[:dm]))
_, jpeg = cv2.imencode(".jpg",
2022-03-06 06:25:54 +00:00
cv2.cvtColor(m, cv2.COLOR_YCrCb2BGR),
2022-02-27 11:12:22 +00:00
(cv2.IMWRITE_JPEG_QUALITY, 80))
return jpeg
async def download(resp, queue):
"""
This coroutine iterates over HTTP connection chunks
assembling the original JPEG blobs and decodes the
DCT coefficients of the frames
"""
2022-02-12 18:44:20 +00:00
buf = b""
2022-07-10 10:15:54 +00:00
logger.info("Upstream connection opened with status: %d", resp.status)
2022-02-12 18:44:20 +00:00
async for data, end_of_http_chunk in resp.content.iter_chunks():
2022-07-10 10:15:54 +00:00
counter_receive_bytes.inc(len(data))
2022-02-12 18:44:20 +00:00
if end_of_http_chunk:
2022-07-10 10:15:54 +00:00
counter_receive_chunks.inc()
2022-02-12 18:44:20 +00:00
if buf:
# seek end
2022-02-16 20:32:56 +00:00
marker = data.find(b"\xff\xd9")
2022-02-12 18:44:20 +00:00
if marker < 0:
buf += data
continue
else:
# Assemble JPEG blob
blob = buf + data[:marker+2]
2022-02-27 11:12:22 +00:00
# Parse DCT coefficients
dct = loads(blob)
2022-07-10 10:15:54 +00:00
now = datetime.utcnow()
gauge_last_frame.labels("download").set(now.timestamp())
try:
# Convert Y component to 16 bit for easier handling
queue.put_nowait((
2022-07-10 10:15:54 +00:00
now,
blob,
2022-02-27 11:12:22 +00:00
dct,
generate_thumbnail(dct)))
except asyncio.QueueFull:
2022-07-10 10:15:54 +00:00
counter_dropped_frames.labels("download").inc()
gauge_queue_frames.labels("download").set(queue.qsize())
2022-02-12 18:44:20 +00:00
data = data[marker+2:]
buf = b""
2022-07-10 10:15:54 +00:00
counter_receive_frames.inc()
2022-02-12 18:44:20 +00:00
# seek begin
2022-02-16 20:32:56 +00:00
marker = data.find(b"\xff\xd8")
2022-02-12 18:44:20 +00:00
if marker >= 0:
buf = data[marker:]
else:
2022-07-10 10:15:54 +00:00
counter_discarded_bytes.inc(len(data))
2022-02-12 18:44:20 +00:00
async def downloader(queue: asyncio.Queue):
"""
Downloader task connects to MJPEG source and
2022-07-10 10:15:54 +00:00
pushes the JPEG frames to download queue
"""
2022-02-13 17:53:47 +00:00
while True:
2022-02-16 20:32:56 +00:00
to = aiohttp.ClientTimeout(connect=5, sock_read=2)
async with aiohttp.ClientSession(timeout=to) as session:
2022-07-10 10:15:54 +00:00
logger.info("Opening connection to %s", url)
2022-02-12 18:44:20 +00:00
try:
2022-02-13 17:53:47 +00:00
async with session.get(url) as resp:
await download(resp, queue)
except (aiohttp.ClientError, asyncio.exceptions.TimeoutError) as e:
j = "%s.%s" % (e.__class__.__module__, e.__class__.__name__)
2022-07-10 10:15:54 +00:00
logger.info("Caught exception %s", j)
counter_errors.labels("download", j).inc()
await asyncio.sleep(1)
2022-02-12 18:44:20 +00:00
app = Sanic("camdetect")
2022-07-11 10:28:22 +00:00
setup_json_logging(app)
2022-02-12 18:44:20 +00:00
@app.route("/bypass")
async def bypass_stream_wrapper(request):
# Desired frame interval, by default 500ms
interval = float(request.args.get("interval", 500)) / 1000.0
2022-03-06 06:25:54 +00:00
2022-02-12 18:44:20 +00:00
async def stream_camera(response):
ts = 0
2022-02-12 18:44:20 +00:00
while True:
while True:
await app.ctx.event_frame.wait()
if time() > ts + interval:
break
ts = time()
data = CHUNK_BOUNDARY + app.ctx.last_frame
2022-02-12 18:44:20 +00:00
await response.write(data)
2022-07-10 10:15:54 +00:00
counter_transmit_bytes.inc(len(data))
counter_transmit_frames.inc()
2022-02-12 18:44:20 +00:00
return response.stream(
stream_camera,
content_type="multipart/x-mixed-replace; boundary=frame")
2022-02-12 18:44:20 +00:00
2022-02-16 20:32:56 +00:00
2022-02-12 18:44:20 +00:00
@app.route("/debug")
async def stream_wrapper(request):
async def stream_camera(response):
while True:
await app.ctx.event_frame.wait()
# Parse JPEG blob
arr = np.frombuffer(app.ctx.last_frame, dtype=np.uint8)
2022-02-16 20:32:56 +00:00
img = cv2.imdecode(arr, cv2.IMREAD_UNCHANGED)
# Highlight green or red channel depending on whether
# motion event is in progress or not
channel = 2 if app.ctx.event_id else 1
if app.ctx.mask is not None:
for y in range(0, app.ctx.mask.shape[0]):
for x in range(0, app.ctx.mask.shape[1]):
if app.ctx.mask[y][x] > 0:
2022-02-16 20:32:56 +00:00
img[y*DCT_BLOCK_SIZE:(y+1)*DCT_BLOCK_SIZE,
x*DCT_BLOCK_SIZE:(x+1)*DCT_BLOCK_SIZE,
channel] = 255
2022-02-12 18:44:20 +00:00
# Compress modified frame as JPEG frame
2022-02-12 18:44:20 +00:00
_, jpeg = cv2.imencode(".jpg", img, (cv2.IMWRITE_JPEG_QUALITY, 80))
2022-02-16 20:32:56 +00:00
data = CHUNK_BOUNDARY + jpeg.tobytes()
2022-02-12 18:44:20 +00:00
await response.write(data)
2022-07-10 10:15:54 +00:00
counter_transmit_bytes.inc(len(data))
counter_transmit_frames.inc()
2022-02-12 18:44:20 +00:00
# Transmit as chunked MJPEG stream
2022-02-12 18:44:20 +00:00
return response.stream(
stream_camera,
2022-02-16 20:32:56 +00:00
content_type="multipart/x-mixed-replace; boundary=frame"
2022-02-12 18:44:20 +00:00
)
2022-02-13 11:17:29 +00:00
@app.route("/readyz")
async def ready_check(request):
2022-02-27 15:02:55 +00:00
try:
async for i in app.ctx.coll.find().limit(1):
break
except pymongo.errors.ServerSelectionTimeoutError:
return response.text("MongoDB server selection timeout", status=503)
if app.ctx.mask is None:
return response.text("Not enough frames", status=503)
return response.text("OK")
2022-02-13 11:17:29 +00:00
2022-02-16 20:32:56 +00:00
@app.route("/event")
2022-02-12 18:44:20 +00:00
async def wrapper_stream_event(request):
async def stream_event(response):
while True:
await app.ctx.event_frame.wait()
if app.ctx.mask is not None:
2022-02-12 18:44:20 +00:00
continue
s = "data: " + json.dumps(app.ctx.mask.tolist()) + "\r\n\r\n"
2022-02-12 18:44:20 +00:00
await response.write(s.encode())
2022-07-10 10:15:54 +00:00
counter_emitted_events.inc()
2022-02-16 20:32:56 +00:00
return stream(stream_event, content_type="text/event-stream")
2022-02-12 18:44:20 +00:00
def handler(signum, frame):
# SIGUSR1 handler for manually triggering an event
app.ctx.manual_trigger = True
2022-02-16 20:32:56 +00:00
@app.listener("before_server_start")
2022-02-12 18:44:20 +00:00
async def setup_db(app, loop):
app.ctx.db = AsyncIOMotorClient(MONGO_URI).get_default_database()
app.ctx.coll = app.ctx.db[MONGO_COLLECTION]
2022-02-12 18:44:20 +00:00
app.ctx.last_frame = None
app.ctx.event_frame = asyncio.Event()
app.ctx.event_id = None
app.ctx.manual_trigger = False
signal.signal(signal.SIGUSR1, handler)
# Set up processing pipeline
download_queue = asyncio.Queue()
upload_queue = asyncio.Queue()
asyncio.create_task(uploader(
upload_queue))
asyncio.create_task(downloader(
download_queue))
asyncio.create_task(motion_detector(
ReferenceFrame(),
download_queue,
upload_queue))
2022-02-12 18:44:20 +00:00
monitor(app).expose_endpoint()
try:
app.run(host="0.0.0.0",
port=5000,
debug=bool(os.getenv("DEBUG", 0)))
2022-02-12 18:44:20 +00:00
except KeyboardInterrupt:
asyncio.get_event_loop().close()