This commit is contained in:
Lauri Võsandi 2022-07-10 13:15:54 +03:00
parent 817f148617
commit aa6598ee68
4 changed files with 124 additions and 108 deletions

View File

@ -10,6 +10,7 @@ RUN apt-get update \
python3-numpy \ python3-numpy \
python3-opencv \ python3-opencv \
python3-pip \ python3-pip \
git \
&& pip3 install \ && pip3 install \
aioboto3 \ aioboto3 \
aiohttp \ aiohttp \
@ -17,7 +18,7 @@ RUN apt-get update \
motor \ motor \
prometheus_client \ prometheus_client \
sanic==21.6.2 \ sanic==21.6.2 \
sanic_prometheus \ git+https://github.com/Assarius/sanic-prometheus@Sanic_22 \
&& apt-get remove -y \ && apt-get remove -y \
build-essential \ build-essential \
libjpeg-dev \ libjpeg-dev \

View File

@ -19,8 +19,12 @@ In a nutshell:
Bundled `docker-compose.yml` brings up: Bundled `docker-compose.yml` brings up:
* [camdetect bypass stream](http://localhost:5000/bypass)
* [camdetect debug](http://localhost:5000/debug)
* [Minio](http://localhost:9001/buckets/camdetect/browse) * [Minio](http://localhost:9001/buckets/camdetect/browse)
* [Mongoexpress](http://localhost:8081/db/default/eventlog) * [Mongoexpress](http://localhost:8081/db/default/eventlog)
* [Prometheus](http://localhost:9090/graph)
* [mjpeg-streamer](http://user:123456@localhost:8080/?action=stream)
To manually trigger event: To manually trigger event:

View File

@ -11,23 +11,27 @@ import os
import pymongo import pymongo
import signal import signal
import sys import sys
import botocore.exceptions
from datetime import datetime, timedelta from datetime import datetime, timedelta
from jpeg2dct.numpy import loads from jpeg2dct.numpy import loads
from math import inf
from motor.motor_asyncio import AsyncIOMotorClient from motor.motor_asyncio import AsyncIOMotorClient
from prometheus_client import Counter, Gauge from prometheus_client import Counter, Gauge, Histogram
from sanic import Sanic, response from sanic import Sanic, response
from sanic.response import stream from sanic.log import logger
from sanic_prometheus import monitor from sanic_prometheus import monitor
from sanic.response import stream
from time import time from time import time
_, url = sys.argv _, url = sys.argv
AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID", "camdetect") AWS_ACCESS_KEY_ID = os.environ["AWS_ACCESS_KEY_ID"]
AWS_SECRET_ACCESS_KEY = os.environ["AWS_SECRET_ACCESS_KEY"] AWS_SECRET_ACCESS_KEY = os.environ["AWS_SECRET_ACCESS_KEY"]
S3_ENDPOINT_URL = os.environ["S3_ENDPOINT_URL"] S3_ENDPOINT_URL = os.environ["S3_ENDPOINT_URL"]
S3_BUCKET_NAME = os.getenv("S3_BUCKET_NAME", "camdetect") S3_BUCKET_NAME = os.getenv("S3_BUCKET_NAME", "camdetect")
MONGO_URI = os.getenv("MONGO_URI", "mongodb://127.0.0.1:27017/default") MONGO_URI = os.getenv("MONGO_URI", "mongodb://127.0.0.1:27017/default")
MONGO_COLLECTION = os.getenv("MONGO_COLLETION", "eventlog") MONGO_COLLECTION = os.getenv("MONGO_COLLECTION", "eventlog")
SOURCE_NAME = os.environ["SOURCE_NAME"] SOURCE_NAME = os.environ["SOURCE_NAME"]
SLIDE_WINDOW = 2 SLIDE_WINDOW = 2
DCT_BLOCK_SIZE = 8 DCT_BLOCK_SIZE = 8
@ -38,91 +42,93 @@ THRESHOLD_RATIO = int(os.getenv("THRESHOLD_RATIO", "5"))
CHUNK_BOUNDARY = b"\n--frame\nContent-Type: image/jpeg\n\n" CHUNK_BOUNDARY = b"\n--frame\nContent-Type: image/jpeg\n\n"
counter_dropped_bytes = Counter( hist_active_blocks_ratio = Histogram(
"camdetect_dropped_bytes", "camtiler_active_blocks_ratio",
"Ratio of active DCT blocks",
["roi"],
buckets=(0.01, 0.02, 0.05, 0.1, 0.5, inf))
hist_processing_latency = Histogram(
"camtiler_frame_processing_latency_seconds",
"Frame processing latency",
buckets=(0.01, 0.05, 0.1, 0.5, 1, inf))
hist_upload_latency = Histogram(
"camtiler_frame_upload_latency_seconds",
"Frame processing latency",
buckets=(0.1, 0.5, 1, 5, 10, inf))
counter_events = Counter(
"camtiler_events",
"Count of successfully processed events")
counter_frames = Counter(
"camtiler_frames",
"Count of frames",
["stage"])
counter_dropped_frames = Counter(
"camtiler_dropped_frames",
"Frames that were dropped due to one of queues being full",
["stage"])
counter_discarded_bytes = Counter(
"camtiler_discarded_bytes",
"Bytes that were not not handled or part of actual JPEG frames") "Bytes that were not not handled or part of actual JPEG frames")
counter_rx_bytes = Counter( counter_receive_bytes = Counter(
"camdetect_rx_bytes", "counter_receive_bytes",
"Bytes received over HTTP stream") "Bytes received over HTTP stream")
counter_tx_bytes = Counter( counter_transmit_bytes = Counter(
"camdetect_tx_bytes", "camtiler_transmit_bytes",
"Bytes transmitted over HTTP streams") "Bytes transmitted over HTTP streams")
counter_rx_frames = Counter( counter_receive_frames = Counter(
"camdetect_rx_frames", "camtiler_receive_frames",
"Frames received") "Frames received from upstream")
counter_tx_frames = Counter( counter_transmit_frames = Counter(
"camdetect_tx_frames", "camtiler_transmit_frames",
"Frames transmitted") "Frames transmitted to downstream consumers")
counter_tx_events = Counter( counter_emitted_events = Counter(
"camdetect_tx_events", "camtiler_emitted_events",
"Events emitted") "Events emitted")
counter_rx_chunks = Counter( counter_receive_chunks = Counter(
"camdetect_rx_chunks", "camtiler_receive_chunks",
"HTTP chunks received") "HTTP chunks received")
counter_errors = Counter( counter_errors = Counter(
"camdetect_errors", "camtiler_errors",
"Upstream connection errors", "Upstream connection errors",
["exception"]) ["stage", "exception"])
counter_movement_frames = Counter(
"camdetect_movement_frames",
"Frames with movement detected in them")
counter_uploaded_frames = Counter(
"camdetect_uploaded_frames",
"Frames uploaded via S3")
counter_upload_errors = Counter(
"camdetect_upload_errors",
"Frames upload errors related to S3")
counter_upload_dropped_frames = Counter(
"camdetect_upload_dropped_frames",
"Frames that were dropped due to S3 upload queue being full")
counter_download_dropped_frames = Counter(
"camdetect_download_dropped_frames",
"Frames that were downloaded from camera, but not processed")
gauge_last_frame = Gauge( gauge_last_frame = Gauge(
"camdetect_last_frame", "camtiler_last_frame_timestamp_seconds",
"Timestamp of last frame") "Timestamp of last frame",
gauge_frame_motion_detected = Gauge( ["stage"])
"camdetect_frame_motion_detected", gauge_queue_frames = Gauge(
"Motion detected in frame") "camtiler_queue_frames",
gauge_event_active = Gauge( "Numer of frames in a queue",
"camdetect_event_active", ["stage"])
"Motion event in progress") gauge_build_info = Gauge(
gauge_total_blocks = Gauge( "camtiler_build_info",
"camdetect_total_blocks", "Build info",
"Total DCT blocks") ["git_hash"])
gauge_active_blocks = Gauge(
"camdetect_active_blocks", gauge_build_info.labels(os.getenv("GIT_COMMIT", "null")).set(1)
"Total active, threshold exceeding DCT blocks")
gauge_upload_queue_size = Gauge(
"camdetect_upload_queue_size",
"Number of frames awaiting to be uploaded via S3")
gauge_download_queue_size = Gauge(
"camdetect_download_queue_size",
"Number of frames awaiting to be processed by motion detection loop")
# Reset some gauges # Reset some gauges
gauge_frame_motion_detected.set(0) gauge_queue_frames.labels("download").set(0)
gauge_upload_queue_size.set(0) gauge_queue_frames.labels("hold").set(0)
gauge_download_queue_size.set(0) gauge_queue_frames.labels("upload").set(0)
assert SLIDE_WINDOW <= 8 # This is 256 frames which should be enough assert SLIDE_WINDOW <= 8 # This is 256 frames which should be enough
async def upload(bucket, blob: bytes, thumb: bytes, event_id): async def upload(bucket, blob: bytes, thumb: bytes, event_id):
""" """
Upload single JPEG blob to S3 bucket Upload single frame to S3 bucket
""" """
# Generate S3 path based on the JPEG blob SHA512 digest # Generate S3 path based on the JPEG blob SHA512 digest
fp = hashlib.sha512(blob).hexdigest() fp = hashlib.sha512(blob).hexdigest()
path = "%s/%s/%s/%s.jpg" % (fp[:4], fp[4:8], fp[8:12], fp[12:]) path = "%s/%s/%s/%s.jpg" % (fp[:4], fp[4:8], fp[8:12], fp[12:])
# First upload the thumbnail try:
await bucket.upload_fileobj(io.BytesIO(thumb), "thumb/%s" % path) await bucket.upload_fileobj(io.BytesIO(thumb), "thumb/%s" % path)
# Proceed to upload the original JPEG frame
await bucket.upload_fileobj(io.BytesIO(blob), path) await bucket.upload_fileobj(io.BytesIO(blob), path)
except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as e:
j = "%s.%s" % (e.__class__.__module__, e.__class__.__name__)
counter_errors.labels("upload", j).inc()
# Add screenshot path to the event # Add screenshot path to the event
app.ctx.coll.update_one({ app.ctx.coll.update_one({
@ -133,6 +139,9 @@ async def upload(bucket, blob: bytes, thumb: bytes, event_id):
} }
}) })
counter_frames.labels("stored").inc()
now = datetime.utcnow()
gauge_last_frame.labels("upload").set(now.timestamp())
# TODO: Handle 16MB maximum document size # TODO: Handle 16MB maximum document size
@ -140,16 +149,19 @@ async def uploader(queue):
""" """
Uploader task grabs JPEG blobs from upload queue and uploads them to S3 Uploader task grabs JPEG blobs from upload queue and uploads them to S3
""" """
session = aioboto3.Session( session = aioboto3.Session()
async with session.resource("s3",
aws_access_key_id=AWS_ACCESS_KEY_ID, aws_access_key_id=AWS_ACCESS_KEY_ID,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY) aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
async with session.resource("s3", endpoint_url=S3_ENDPOINT_URL) as s3: endpoint_url=S3_ENDPOINT_URL) as s3:
bucket = await s3.Bucket(S3_BUCKET_NAME) bucket = await s3.Bucket(S3_BUCKET_NAME)
while True: while True:
blob, thumb, event_id = await queue.get() dt, blob, thumb, event_id = await queue.get()
gauge_queue_frames.labels("upload").set(queue.qsize())
await upload(bucket, blob, thumb, event_id) await upload(bucket, blob, thumb, event_id)
counter_uploaded_frames.inc() counter_frames.labels("uploaded").inc()
gauge_upload_queue_size.set(queue.qsize()) hist_upload_latency.observe(
(datetime.utcnow() - dt).total_seconds())
class ReferenceFrame(): class ReferenceFrame():
@ -201,6 +213,7 @@ async def motion_detector(reference_frame, download_queue, upload_queue):
while True: while True:
dt, blob, dct, thumb = await download_queue.get() dt, blob, dct, thumb = await download_queue.get()
gauge_queue_frames.labels("download").set(download_queue.qsize())
app.ctx.last_frame, app.ctx.dct = blob, dct app.ctx.last_frame, app.ctx.dct = blob, dct
# Signal /bypass and /debug handlers about new frame # Signal /bypass and /debug handlers about new frame
@ -210,10 +223,6 @@ async def motion_detector(reference_frame, download_queue, upload_queue):
# Separate most significant luma value for each DCT (8x8 pixel) block # Separate most significant luma value for each DCT (8x8 pixel) block
y = np.int16(dct[0][:, :, 0]) y = np.int16(dct[0][:, :, 0])
# Update metrics
gauge_total_blocks.set(y.shape[0] * y.shape[1])
gauge_last_frame.set(dt.timestamp())
reference_frame.put(y) reference_frame.put(y)
try: try:
app.ctx.mask = cv2.inRange(cv2.absdiff(y, app.ctx.mask = cv2.inRange(cv2.absdiff(y,
@ -229,15 +238,16 @@ async def motion_detector(reference_frame, download_queue, upload_queue):
total_blocks = app.ctx.mask.shape[0] * app.ctx.mask.shape[1] total_blocks = app.ctx.mask.shape[0] * app.ctx.mask.shape[1]
threshold_blocks = THRESHOLD_RATIO * total_blocks / 100 threshold_blocks = THRESHOLD_RATIO * total_blocks / 100
average_blocks = sum(differing_blocks) / len(differing_blocks) average_blocks = sum(differing_blocks) / len(differing_blocks)
hist_active_blocks_ratio.labels("main").observe(active_blocks / total_blocks)
motion_detected = average_blocks > threshold_blocks motion_detected = average_blocks > threshold_blocks
# Update metrics now = datetime.utcnow()
gauge_active_blocks.set(active_blocks) gauge_last_frame.labels("processed").set(now.timestamp())
gauge_total_blocks.set(total_blocks) hist_processing_latency.observe((now - dt).total_seconds())
# Propagate SIGUSR1 signal handler # Propagate SIGUSR1 signal handler
if app.ctx.manual_trigger: if app.ctx.manual_trigger:
print("Manually triggering event via SIGUSR1") logger.info("Manually triggering event via SIGUSR1")
motion_detected = True motion_detected = True
app.ctx.manual_trigger = False app.ctx.manual_trigger = False
@ -254,16 +264,16 @@ async def motion_detector(reference_frame, download_queue, upload_queue):
"action": "event", "action": "event",
}) })
app.ctx.event_id = event_id = result.inserted_id app.ctx.event_id = event_id = result.inserted_id
gauge_event_active.set(1)
# Handle buffering frames prior event start # Handle buffering frames prior event start
if hold_queue.full(): if hold_queue.full():
await hold_queue.get() await hold_queue.get()
hold_queue.put_nowait((blob, thumb)) hold_queue.put_nowait((blob, thumb))
gauge_queue_frames.labels("hold").set(hold_queue.qsize())
# Handle image upload # Handle image upload
if motion_detected and event_id: if motion_detected and event_id:
counter_movement_frames.inc() counter_frames.labels("motion").inc()
while True: while True:
if not uploads_skipped: if not uploads_skipped:
uploads_skipped = UPLOAD_FRAMESKIP uploads_skipped = UPLOAD_FRAMESKIP
@ -279,10 +289,10 @@ async def motion_detector(reference_frame, download_queue, upload_queue):
try: try:
# Push JPEG blob into upload queue # Push JPEG blob into upload queue
upload_queue.put_nowait((blob, thumb, event_id)) upload_queue.put_nowait((dt, blob, thumb, event_id))
except asyncio.QueueFull: except asyncio.QueueFull:
counter_upload_dropped_frames.inc() counter_dropped_frames.labels("upload").inc()
gauge_upload_queue_size.set(upload_queue.qsize()) gauge_queue_frames.labels("upload").set(upload_queue.qsize())
# Handle event end # Handle event end
if not motion_detected and event_id: if not motion_detected and event_id:
@ -294,7 +304,6 @@ async def motion_detector(reference_frame, download_queue, upload_queue):
} }
}) })
app.ctx.event_id = event_id = None app.ctx.event_id = event_id = None
gauge_event_active.set(0)
def generate_thumbnail(dct): def generate_thumbnail(dct):
@ -339,11 +348,11 @@ async def download(resp, queue):
DCT coefficients of the frames DCT coefficients of the frames
""" """
buf = b"" buf = b""
print("Upstream connection opened with status:", resp.status) logger.info("Upstream connection opened with status: %d", resp.status)
async for data, end_of_http_chunk in resp.content.iter_chunks(): async for data, end_of_http_chunk in resp.content.iter_chunks():
counter_rx_bytes.inc(len(data)) counter_receive_bytes.inc(len(data))
if end_of_http_chunk: if end_of_http_chunk:
counter_rx_chunks.inc() counter_receive_chunks.inc()
if buf: if buf:
# seek end # seek end
marker = data.find(b"\xff\xd9") marker = data.find(b"\xff\xd9")
@ -357,43 +366,46 @@ async def download(resp, queue):
# Parse DCT coefficients # Parse DCT coefficients
dct = loads(blob) dct = loads(blob)
now = datetime.utcnow()
gauge_last_frame.labels("download").set(now.timestamp())
try: try:
# Convert Y component to 16 bit for easier handling # Convert Y component to 16 bit for easier handling
queue.put_nowait(( queue.put_nowait((
datetime.utcnow(), now,
blob, blob,
dct, dct,
generate_thumbnail(dct))) generate_thumbnail(dct)))
except asyncio.QueueFull: except asyncio.QueueFull:
counter_download_dropped_frames.inc() counter_dropped_frames.labels("download").inc()
gauge_queue_frames.labels("download").set(queue.qsize())
data = data[marker+2:] data = data[marker+2:]
buf = b"" buf = b""
counter_rx_frames.inc() counter_receive_frames.inc()
# seek begin # seek begin
marker = data.find(b"\xff\xd8") marker = data.find(b"\xff\xd8")
if marker >= 0: if marker >= 0:
buf = data[marker:] buf = data[marker:]
else: else:
counter_dropped_bytes.inc(len(data)) counter_discarded_bytes.inc(len(data))
async def downloader(queue: asyncio.Queue): async def downloader(queue: asyncio.Queue):
""" """
Downloader task connects to MJPEG source and Downloader task connects to MJPEG source and
pushes the JPEG frames to a queue pushes the JPEG frames to download queue
""" """
while True: while True:
to = aiohttp.ClientTimeout(connect=5, sock_read=2) to = aiohttp.ClientTimeout(connect=5, sock_read=2)
async with aiohttp.ClientSession(timeout=to) as session: async with aiohttp.ClientSession(timeout=to) as session:
print("Opening upstream connection to %s" % url) logger.info("Opening connection to %s", url)
try: try:
async with session.get(url) as resp: async with session.get(url) as resp:
await download(resp, queue) await download(resp, queue)
except (aiohttp.ClientError, asyncio.exceptions.TimeoutError) as e: except (aiohttp.ClientError, asyncio.exceptions.TimeoutError) as e:
j = "%s.%s" % (e.__class__.__module__, e.__class__.__name__) j = "%s.%s" % (e.__class__.__module__, e.__class__.__name__)
print("Caught exception %s" % j) logger.info("Caught exception %s", j)
counter_errors.labels(exception=j).inc() counter_errors.labels("download", j).inc()
await asyncio.sleep(1) await asyncio.sleep(1)
app = Sanic("camdetect") app = Sanic("camdetect")
@ -414,8 +426,8 @@ async def bypass_stream_wrapper(request):
ts = time() ts = time()
data = CHUNK_BOUNDARY + app.ctx.last_frame data = CHUNK_BOUNDARY + app.ctx.last_frame
await response.write(data) await response.write(data)
counter_tx_bytes.inc(len(data)) counter_transmit_bytes.inc(len(data))
counter_tx_frames.inc() counter_transmit_frames.inc()
return response.stream( return response.stream(
stream_camera, stream_camera,
content_type="multipart/x-mixed-replace; boundary=frame") content_type="multipart/x-mixed-replace; boundary=frame")
@ -446,8 +458,8 @@ async def stream_wrapper(request):
_, jpeg = cv2.imencode(".jpg", img, (cv2.IMWRITE_JPEG_QUALITY, 80)) _, jpeg = cv2.imencode(".jpg", img, (cv2.IMWRITE_JPEG_QUALITY, 80))
data = CHUNK_BOUNDARY + jpeg.tobytes() data = CHUNK_BOUNDARY + jpeg.tobytes()
await response.write(data) await response.write(data)
counter_tx_bytes.inc(len(data)) counter_transmit_bytes.inc(len(data))
counter_tx_frames.inc() counter_transmit_frames.inc()
# Transmit as chunked MJPEG stream # Transmit as chunked MJPEG stream
return response.stream( return response.stream(
@ -477,7 +489,7 @@ async def wrapper_stream_event(request):
continue continue
s = "data: " + json.dumps(app.ctx.mask.tolist()) + "\r\n\r\n" s = "data: " + json.dumps(app.ctx.mask.tolist()) + "\r\n\r\n"
await response.write(s.encode()) await response.write(s.encode())
counter_tx_events.inc() counter_emitted_events.inc()
return stream(stream_event, content_type="text/event-stream") return stream(stream_event, content_type="text/event-stream")

View File

@ -3,16 +3,15 @@ version: '3.7'
# All keys here are for dev instance only, do not put prod keys here # All keys here are for dev instance only, do not put prod keys here
x-common: &common x-common: &common
AWS_ACCESS_KEY_ID: camdetect AWS_ACCESS_KEY_ID: camdetect
MINIO_ROOT_USER: camdetect
AWS_SECRET_ACCESS_KEY: 2mSI6HdbJ8 AWS_SECRET_ACCESS_KEY: 2mSI6HdbJ8
MINIO_ROOT_PASSWORD: 2mSI6HdbJ8
ME_CONFIG_MONGODB_ENABLE_ADMIN: 'true' ME_CONFIG_MONGODB_ENABLE_ADMIN: 'true'
ME_CONFIG_MONGODB_SERVER: '127.0.0.1' ME_CONFIG_MONGODB_SERVER: '127.0.0.1'
ME_CONFIG_MONGODB_AUTH_DATABASE: admin ME_CONFIG_MONGODB_AUTH_DATABASE: admin
MINIO_ACCESS_KEY: camdetect
MINIO_SECRET_KEY: 2mSI6HdbJ8
MINIO_DEFAULT_BUCKETS: camdetect MINIO_DEFAULT_BUCKETS: camdetect
MINIO_URI: 'http://camdetect:2mSI6HdbJ8@127.0.0.1:9000/camdetect' MINIO_URI: 'http://camdetect:2mSI6HdbJ8@127.0.0.1:9000/camdetect'
S3_ENDPOINT_URL: http://127.0.0.1:9000 S3_ENDPOINT_URL: http://127.0.0.1:9000
MINIO_DEFAULT_BUCKETS: camdetect
MINIO_CONSOLE_PORT_NUMBER: 9001 MINIO_CONSOLE_PORT_NUMBER: 9001
MJPEGSTREAMER_CREDENTIALS: user:123456 MJPEGSTREAMER_CREDENTIALS: user:123456
SOURCE_NAME: dummy SOURCE_NAME: dummy