Cloud native motion detection microservice

camera-motion-detect/camdetect.py

#!/usr/bin/env python3
import aioboto3
import aiohttp
import asyncio
import cv2
import hashlib
import io
import json
import numpy as np
import os
import pymongo
import signal
import sys
import botocore.exceptions
from datetime import datetime, timedelta
from jpeg2dct.numpy import loads
from math import inf
from motor.motor_asyncio import AsyncIOMotorClient
from prometheus_client import Counter, Gauge, Histogram
from pymongo import ReturnDocument
from sanic import Sanic, response
from sanic_json_logging import setup_json_logging
from sanic.log import logger
from sanic_prometheus import monitor
from sanic.response import stream
from time import time
from urllib.parse import urlparse

_, target = sys.argv

# Override basic auth password from env var
basic_auth_password = os.getenv("BASIC_AUTH_PASSWORD")
if basic_auth_password:
    o = urlparse(target)
    netloc = o.netloc
    username = ""
    if "@" in netloc:
        username, netloc = o.netloc.split("@", 1)
        if ":" in username:
            username, _ = username.split(":")
    target = o._replace(netloc="%s:%s@%s" % (username, basic_auth_password, netloc)).geturl()
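
# For illustration only (hypothetical URL): with BASIC_AUTH_PASSWORD=secret, a
# target such as http://viewer:oldpass@camera.example.com/mjpeg is rewritten to
# http://viewer:secret@camera.example.com/mjpeg before the stream is opened.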

AWS_ACCESS_KEY_ID = os.environ["AWS_ACCESS_KEY_ID"]
AWS_SECRET_ACCESS_KEY = os.environ["AWS_SECRET_ACCESS_KEY"]
S3_ENDPOINT_URL = os.environ["S3_ENDPOINT_URL"]
S3_BUCKET_NAME = os.getenv("S3_BUCKET_NAME", "camdetect")
MONGO_URI = os.getenv("MONGO_URI", "mongodb://127.0.0.1:27017/default")
MONGO_COLLECTION = os.getenv("MONGO_COLLECTION", "eventlog")
SOURCE_NAME = os.environ["SOURCE_NAME"]

SLIDE_WINDOW = 4
DCT_BLOCK_SIZE = 8
UPLOAD_FRAMESKIP = 3
CLOCK_SKEW_TOLERANCE = timedelta(seconds=3)

# Percentage of blocks active to consider movement in whole frame
THRESHOLD_RATIO = int(os.getenv("THRESHOLD_RATIO", "5"))
CHUNK_BOUNDARY = b"\n--frame\nContent-Type: image/jpeg\n\n"
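
# Worked example (illustrative numbers, not from the original source): each DCT
# block covers 8x8 pixels, so a 1280x720 stream yields 160x90 = 14400 blocks per
# frame; with THRESHOLD_RATIO=5 more than roughly 720 blocks must differ from the
# reference frame, averaged over recent frames, before motion is considered detected.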

hist_active_blocks_ratio = Histogram(
    "camtiler_active_blocks_ratio",
    "Ratio of active DCT blocks",
    ["roi"],
    buckets=(0.01, 0.02, 0.05, 0.1, 0.5, inf))
hist_processing_latency = Histogram(
    "camtiler_frame_processing_latency_seconds",
    "Frame processing latency",
    buckets=(0.01, 0.05, 0.1, 0.5, 1, inf))
hist_upload_latency = Histogram(
    "camtiler_frame_upload_latency_seconds",
    "Frame upload latency",
    buckets=(0.1, 0.5, 1, 5, 10, inf))
counter_events = Counter(
    "camtiler_events",
    "Count of successfully processed events")
counter_frames = Counter(
    "camtiler_frames",
    "Count of frames",
    ["stage"])
counter_dropped_frames = Counter(
    "camtiler_dropped_frames",
    "Frames that were dropped due to one of queues being full",
    ["stage"])
counter_discarded_bytes = Counter(
    "camtiler_discarded_bytes",
    "Bytes that were not handled as part of actual JPEG frames")
counter_receive_bytes = Counter(
    "counter_receive_bytes",
    "Bytes received over HTTP stream")
counter_transmit_bytes = Counter(
    "camtiler_transmit_bytes",
    "Bytes transmitted over HTTP streams")
counter_receive_frames = Counter(
    "camtiler_receive_frames",
    "Frames received from upstream")
counter_transmit_frames = Counter(
    "camtiler_transmit_frames",
    "Frames transmitted to downstream consumers")
counter_emitted_events = Counter(
    "camtiler_emitted_events",
    "Events emitted")
counter_receive_chunks = Counter(
    "camtiler_receive_chunks",
    "HTTP chunks received")
counter_errors = Counter(
    "camtiler_errors",
    "Upstream connection errors",
    ["stage", "exception"])
gauge_last_frame = Gauge(
    "camtiler_last_frame_timestamp_seconds",
    "Timestamp of last frame",
    ["stage"])
gauge_queue_frames = Gauge(
    "camtiler_queue_frames",
    "Number of frames in a queue",
    ["stage"])
gauge_build_info = Gauge(
    "docker_build_info",
    "Build info",
    ["git_commit", "git_commit_timestamp"])
gauge_build_info.labels(
    os.getenv("GIT_COMMIT", "null"),
    os.getenv("GIT_COMMIT_TIMESTAMP", "null")).set(1)

# Reset some gauges
gauge_queue_frames.labels("download").set(0)
gauge_queue_frames.labels("hold").set(0)
gauge_queue_frames.labels("upload").set(0)
counter_frames.labels("motion").inc(0)
counter_frames.labels("downloaded").inc(0)
assert SLIDE_WINDOW <= 8 # This is 256 frames which should be enough


async def upload(bucket, blob: bytes, thumb: bytes, event_id):
    """
    Upload single frame to S3 bucket
    """
    # Generate S3 path based on the JPEG blob SHA512 digest
    fp = hashlib.sha512(blob).hexdigest()
    path = "%s/%s/%s/%s.jpg" % (fp[:4], fp[4:8], fp[8:12], fp[12:])

    try:
        await bucket.upload_fileobj(io.BytesIO(thumb), "thumb/%s" % path)
        await bucket.upload_fileobj(io.BytesIO(blob), path)
    except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as e:
        j = "%s.%s" % (e.__class__.__module__, e.__class__.__name__)
        counter_errors.labels("upload", j).inc()

    # Add screenshot path to the event
    await app.ctx.coll.update_one({
        "_id": event_id
    }, {
        "$addToSet": {
            "screenshots": path,
        }
    })
    counter_frames.labels("stored").inc()
    now = datetime.utcnow()
    gauge_last_frame.labels("upload").set(now.timestamp())
    # TODO: Handle 16MB maximum document size
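
# For illustration (hypothetical digest): a blob whose SHA-512 hex digest starts
# with "ab12cd34ef56..." is stored under "ab12/cd34/ef56/....jpg", with its
# thumbnail at "thumb/ab12/cd34/ef56/....jpg", so identical frames deduplicate to
# the same key and objects spread evenly across key prefixes.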


async def uploader(queue):
    """
    Uploader task grabs JPEG blobs from upload queue and uploads them to S3
    """
    session = aioboto3.Session()
    async with session.resource("s3",
                                aws_access_key_id=AWS_ACCESS_KEY_ID,
                                aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
                                endpoint_url=S3_ENDPOINT_URL) as s3:
        bucket = await s3.Bucket(S3_BUCKET_NAME)
        while True:
            dt, blob, thumb, event_id = await queue.get()
            gauge_queue_frames.labels("upload").set(queue.qsize())
            await upload(bucket, blob, thumb, event_id)
            counter_frames.labels("uploaded").inc()
            hist_upload_latency.observe(
                (datetime.utcnow() - dt).total_seconds())


class ReferenceFrame():
    """
    ReferenceFrame keeps the last 2 ** size frames to infer the background scene
    against which motion is detected.

    This is pretty much what the background subtractor does in OpenCV; the only
    difference is that we prefer performance over accuracy.
    """
    class NotEnoughFrames(Exception):
        pass

    def __init__(self, size=SLIDE_WINDOW):
        self.y = []
        self.cumulative = None
        self.size = size

    def put(self, y):
        if self.cumulative is None:
            self.cumulative = np.copy(y)
        else:
            self.cumulative += y
        self.y.append(y)
        if len(self.y) > 2 ** self.size:
            self.cumulative -= self.y[0]
            self.y = self.y[1:]

    def get(self):
        if len(self.y) == 2 ** self.size:
            return self.cumulative >> self.size
        else:
            raise self.NotEnoughFrames()
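
# A brief note on the math above (explanatory, not from the original comments):
# `cumulative` is a running sum of the last 2 ** size frames of per-block luma
# values, so `cumulative >> self.size` is an integer division by 2 ** size, i.e.
# the mean of those frames. With the default SLIDE_WINDOW of 4 the background
# estimate is the average of the last 16 frames.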


async def motion_detector(reference_frame, download_queue, upload_queue):
    """
    Motion detector grabs JPEG blobs and Y channel coefficients
    from the download queue, performs motion detection and pushes relevant
    JPEG blobs to the upload queue going to S3
    """
    event_id = None
    differing_blocks = []
    uploads_skipped = 0

    # Hold queue keeps frames that we have before motion event start timestamp
    hold_queue = asyncio.Queue(2 ** SLIDE_WINDOW)

    while True:
        dt, blob, dct, thumb = await download_queue.get()
        gauge_queue_frames.labels("download").set(download_queue.qsize())
        app.ctx.last_frame, app.ctx.dct = blob, dct

        # Signal /bypass and /debug handlers about new frame
        app.ctx.event_frame.set()
        app.ctx.event_frame.clear()

        # Separate most significant luma value for each DCT (8x8 pixel) block
        y = np.int16(dct[0][:, :, 0])
        reference_frame.put(y)
        try:
            app.ctx.mask = cv2.inRange(cv2.absdiff(y,
                reference_frame.get()), 50, 65535)
        except ReferenceFrame.NotEnoughFrames:
            app.ctx.mask = None
            motion_detected = False
        else:
            # Implement dumb Kalman filter
            active_blocks = np.count_nonzero(app.ctx.mask)
            differing_blocks.append(active_blocks)
            differing_blocks[:] = differing_blocks[-10:]
            total_blocks = app.ctx.mask.shape[0] * app.ctx.mask.shape[1]
            threshold_blocks = THRESHOLD_RATIO * total_blocks / 100
            average_blocks = sum(differing_blocks) / len(differing_blocks)
            hist_active_blocks_ratio.labels("main").observe(active_blocks / total_blocks)
            motion_detected = average_blocks > threshold_blocks

        now = datetime.utcnow()
        gauge_last_frame.labels("processed").set(now.timestamp())
        hist_processing_latency.observe((now - dt).total_seconds())

        # Propagate SIGUSR1 signal handler
        if app.ctx.manual_trigger:
            logger.info("Manually triggering event via SIGUSR1")
            motion_detected = True
            app.ctx.manual_trigger = False

        # Handle event start
        if motion_detected and not event_id:
            result = await app.ctx.coll.find_one_and_update({
                "@timestamp": {
                    "$lte": dt + CLOCK_SKEW_TOLERANCE,
                    "$gte": dt - CLOCK_SKEW_TOLERANCE,
                },
                "source": SOURCE_NAME,
            }, {
                "$setOnInsert": {
                    "@timestamp": dt,
                    "source": SOURCE_NAME,
                    "event": "motion-detected",
                    "started": dt,
                    "finished": dt + timedelta(minutes=2),
                    "component": "camdetect",
                    "screenshots": [],
                    "action": "event",
                }
            }, upsert=True, return_document=ReturnDocument.AFTER)
            app.ctx.event_id = event_id = result["_id"]

        # Handle buffering frames prior event start
        if hold_queue.full():
            await hold_queue.get()
        hold_queue.put_nowait((blob, thumb))
        gauge_queue_frames.labels("hold").set(hold_queue.qsize())

        # Handle image upload
        if motion_detected and event_id:
            counter_frames.labels("motion").inc()
            while True:
                if not uploads_skipped:
                    uploads_skipped = UPLOAD_FRAMESKIP
                else:
                    uploads_skipped -= 1
                    continue

                # Drain queue of frames prior event start
                try:
                    blob, thumb = hold_queue.get_nowait()
                except asyncio.QueueEmpty:
                    break

                try:
                    # Push JPEG blob into upload queue
                    upload_queue.put_nowait((dt, blob, thumb, event_id))
                except asyncio.QueueFull:
                    counter_dropped_frames.labels("upload").inc()
            gauge_queue_frames.labels("upload").set(upload_queue.qsize())

        # Handle event end
        if not motion_detected and event_id:
            await app.ctx.coll.update_one({
                "_id": event_id
            }, {
                "$set": {
                    "finished": dt,
                }
            })
            app.ctx.event_id = event_id = None
            counter_events.inc()
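
# Note on the smoothing above (explanatory, not from the original comments):
# despite the "dumb Kalman filter" label it is simply a moving average of the
# active-block count over the last 10 frames, compared against THRESHOLD_RATIO
# percent of the total block count.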


def generate_thumbnail(dct):
    """
    This is a highly efficient and highly inaccurate function to generate
    a thumbnail based purely on JPEG DCT coefficients
    """
    y, cr, cb = dct

    # Determine aspect ratio and minimum dimension for cropping
    ar = y.shape[0] < y.shape[1]
    dm = (y.shape[0] if ar else y.shape[1]) & 0xfffffff8

    # Determine cropping slices to make it square
    jl = ((y.shape[1] >> 1) - (dm >> 1) if ar else 0)
    jt = (0 if ar else (y.shape[0] >> 1) - (dm >> 1))
    jr = jl + dm
    jb = jt + dm

    # Do the actual crop
    ty = y[jt:jb, jl:jr, 0]
    tb = cb[jt >> 1:jb >> 1, jl >> 1:jr >> 1, 0]
    tr = cr[jt >> 1:jb >> 1, jl >> 1:jr >> 1, 0]

    # Upsample chroma, dummy convert first coeff and stack all channels
    m = np.dstack((
        np.array((ty >> 3) + 127, dtype=np.uint8),
        np.array(
            (tb.repeat(2, 1).repeat(2, 0) >> 3) + 127, dtype=np.uint8)[:dm],
        np.array(
            (tr.repeat(2, 1).repeat(2, 0) >> 3) + 127, dtype=np.uint8)[:dm]))
    _, jpeg = cv2.imencode(".jpg",
        cv2.cvtColor(m, cv2.COLOR_YCrCb2BGR),
        (cv2.IMWRITE_JPEG_QUALITY, 80))
    return jpeg
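
# Background for the shifts above (explanatory note, not from the original):
# only the first (DC) coefficient of each 8x8 DCT block is used, treated as a
# proxy for the block's average brightness; `>> 3` rescales it and `+ 127`
# shifts it back toward the 0..255 range, so the thumbnail is effectively the
# image downscaled by 8x without fully decoding the JPEG.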


async def download(resp, queue):
    """
    This coroutine iterates over HTTP connection chunks
    assembling the original JPEG blobs and decodes the
    DCT coefficients of the frames
    """
    buf = b""
    logger.info("Upstream connection opened with status: %d", resp.status)
    async for data, end_of_http_chunk in resp.content.iter_chunks():
        counter_receive_bytes.inc(len(data))
        if end_of_http_chunk:
            counter_receive_chunks.inc()

        if buf:
            # seek end
            marker = data.find(b"\xff\xd9")
            if marker < 0:
                buf += data
                continue
            else:
                # Assemble JPEG blob
                blob = buf + data[:marker + 2]

                # Parse DCT coefficients
                try:
                    dct = loads(blob)
                except RuntimeError:
                    counter_frames.labels("corrupted").inc()
                else:
                    now = datetime.utcnow()
                    gauge_last_frame.labels("download").set(now.timestamp())
                    try:
                        queue.put_nowait((
                            now,
                            blob,
                            dct,
                            generate_thumbnail(dct)))
                    except asyncio.QueueFull:
                        counter_dropped_frames.labels("download").inc()
                    else:
                        counter_frames.labels("downloaded").inc()
                        gauge_queue_frames.labels("download").set(queue.qsize())
                data = data[marker + 2:]
                buf = b""
                counter_receive_frames.inc()

        # seek begin
        marker = data.rfind(b"\xff\xd8")
        if marker >= 0:
            buf = data[marker:]
        else:
            counter_discarded_bytes.inc(len(data))
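
# Note on the markers above: 0xFFD8 is the JPEG start-of-image (SOI) marker and
# 0xFFD9 the end-of-image (EOI) marker, so the MJPEG byte stream can be split
# into individual frames without parsing the multipart part headers at all.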


async def downloader(queue: asyncio.Queue):
    """
    Downloader task connects to MJPEG source and
    pushes the JPEG frames to download queue
    """
    while True:
        to = aiohttp.ClientTimeout(connect=5, sock_read=2)
        async with aiohttp.ClientSession(timeout=to) as session:
            logger.info("Opening connection to %s", target)
            try:
                async with session.get(target) as resp:
                    await download(resp, queue)
            except (aiohttp.ClientError, asyncio.exceptions.TimeoutError) as e:
                j = "%s.%s" % (e.__class__.__module__, e.__class__.__name__)
                logger.info("Caught exception %s", j)
                counter_errors.labels("download", j).inc()
        await asyncio.sleep(1)


app = Sanic("camdetect")
setup_json_logging(app)
@app.route("/bypass")
async def bypass_stream_wrapper(request):
# Desired frame interval, by default 500ms
interval = float(request.args.get("interval", 500)) / 1000.0
12 months ago
async def stream_camera(response):
ts = 0
12 months ago
while True:
while True:
await app.ctx.event_frame.wait()
if time() > ts + interval:
break
ts = time()
data = CHUNK_BOUNDARY + app.ctx.last_frame
12 months ago
await response.write(data)
7 months ago
counter_transmit_bytes.inc(len(data))
counter_transmit_frames.inc()
12 months ago
return response.stream(
6 months ago
stream_camera,
content_type="multipart/x-mixed-replace; boundary=frame")


@app.route("/debug")
async def stream_wrapper(request):
    async def stream_camera(response):
        while True:
            await app.ctx.event_frame.wait()

            # Parse JPEG blob
            arr = np.frombuffer(app.ctx.last_frame, dtype=np.uint8)
            img = cv2.imdecode(arr, cv2.IMREAD_UNCHANGED)

            # Highlight green or red channel depending on whether
            # motion event is in progress or not
            channel = 2 if app.ctx.event_id else 1
            if app.ctx.mask is not None:
                for y in range(0, app.ctx.mask.shape[0]):
                    for x in range(0, app.ctx.mask.shape[1]):
                        if app.ctx.mask[y][x] > 0:
                            img[y * DCT_BLOCK_SIZE:(y + 1) * DCT_BLOCK_SIZE,
                                x * DCT_BLOCK_SIZE:(x + 1) * DCT_BLOCK_SIZE,
                                channel] = 255

            # Compress modified frame as JPEG frame
            _, jpeg = cv2.imencode(".jpg", img, (cv2.IMWRITE_JPEG_QUALITY, 80))
            data = CHUNK_BOUNDARY + jpeg.tobytes()
            await response.write(data)
            counter_transmit_bytes.inc(len(data))
            counter_transmit_frames.inc()

    # Transmit as chunked MJPEG stream
    return response.stream(
        stream_camera,
        content_type="multipart/x-mixed-replace; boundary=frame")
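
# Example usage (hypothetical host/port): http://localhost:5000/debug renders
# the same stream with active DCT blocks painted green, or red while a motion
# event is in progress, which makes tuning THRESHOLD_RATIO considerably easier.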
@app.route("/readyz")
async def ready_check(request):
logger.info("Testing if Mongo is accessible")
try:
async for i in app.ctx.coll.find().limit(1):
break
except pymongo.errors.ServerSelectionTimeoutError:
return response.text("MongoDB server selection timeout", status=503)
session = aioboto3.Session()
logger.info("Testing if S3 is writable")
async with session.resource("s3",
6 months ago
aws_access_key_id=AWS_ACCESS_KEY_ID,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
endpoint_url=S3_ENDPOINT_URL) as s3:
bucket = await s3.Bucket(S3_BUCKET_NAME)
await bucket.upload_fileobj(io.BytesIO(b"test"), "test")
return response.text("OK")
print("Checking if there are any frames received")
if app.ctx.mask is None:
return response.text("Not enough frames", status=503)
@app.route("/healthz")
async def health_check(request):
if app.ctx.mask is None:
return response.text("Not enough frames", status=503)
@app.route("/event")
12 months ago
async def wrapper_stream_event(request):
async def stream_event(response):
while True:
await app.ctx.event_frame.wait()
if app.ctx.mask is not None:
12 months ago
continue
s = "data: " + json.dumps(app.ctx.mask.tolist()) + "\r\n\r\n"
12 months ago
await response.write(s.encode())
7 months ago
counter_emitted_events.inc()
return stream(stream_event, content_type="text/event-stream")
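
# The payload uses Server-Sent Events framing ("data: ...\r\n\r\n" with a
# text/event-stream content type), so a browser can consume the motion mask with
# a plain EventSource("/event") and receive one JSON matrix per processed frame.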


def handler(signum, frame):
    # SIGUSR1 handler for manually triggering an event
    app.ctx.manual_trigger = True
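
# For testing (illustrative command, the process name may differ):
#   kill -USR1 $(pgrep -f camdetect.py)
# forces the next processed frame to start a motion event even if nothing moved.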
@app.listener("before_server_start")
12 months ago
async def setup_db(app, loop):
app.ctx.mask = None
app.ctx.db = AsyncIOMotorClient(MONGO_URI).get_default_database()
app.ctx.coll = app.ctx.db[MONGO_COLLECTION]
app.ctx.coll.create_index([
("@timestamp", pymongo.ASCENDING)])
app.ctx.coll.create_index([
("source", pymongo.ASCENDING),
("@timestamp", pymongo.ASCENDING)], unique=True)
12 months ago
app.ctx.last_frame = None
app.ctx.event_frame = asyncio.Event()
app.ctx.event_id = None
app.ctx.manual_trigger = False
signal.signal(signal.SIGUSR1, handler)
# Set up processing pipeline
download_queue = asyncio.Queue(50)
upload_queue = asyncio.Queue(50)
asyncio.create_task(uploader(
upload_queue))
asyncio.create_task(downloader(
download_queue))
asyncio.create_task(motion_detector(
ReferenceFrame(),
download_queue,
upload_queue))
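
# Rough shape of the pipeline wired up above:
#   downloader --(download_queue)--> motion_detector --(upload_queue)--> uploader
# with /bypass, /debug and /event observing frames via app.ctx between the stages.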


monitor(app).expose_endpoint()

app.run(host="0.0.0.0",
        port=5000,
        debug=bool(os.getenv("DEBUG", 0)))