From aa6598ee682207df78ed779cf9ef7b51a27ddd52 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Lauri=20V=C3=B5sandi?=
Date: Sun, 10 Jul 2022 13:15:54 +0300
Subject: [PATCH] Cleanups

---
 Dockerfile         |   3 +-
 README.md          |   4 +
 camdetect.py       | 220 ++++++++++++++++++++++++---------------------
 docker-compose.yml |   5 +-
 4 files changed, 124 insertions(+), 108 deletions(-)

diff --git a/Dockerfile b/Dockerfile
index 4e27a16..72eb464 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -10,6 +10,7 @@ RUN apt-get update \
         python3-numpy \
         python3-opencv \
         python3-pip \
+        git \
     && pip3 install \
         aioboto3 \
         aiohttp \
@@ -17,7 +18,7 @@ RUN apt-get update \
         motor \
         prometheus_client \
         sanic==21.6.2 \
-        sanic_prometheus \
+        git+https://github.com/Assarius/sanic-prometheus@Sanic_22 \
     && apt-get remove -y \
         build-essential \
         libjpeg-dev \
diff --git a/README.md b/README.md
index be28497..1b066bd 100644
--- a/README.md
+++ b/README.md
@@ -19,8 +19,12 @@ In a nutshell:
 
 Bundled `docker-compose.yml` brings up:
 
+* [camdetect bypass stream](http://localhost:5000/bypass)
+* [camdetect debug](http://localhost:5000/debug)
 * [Minio](http://localhost:9001/buckets/camdetect/browse)
 * [Mongoexpress](http://localhost:8081/db/default/eventlog)
+* [Prometheus](http://localhost:9090/graph)
+* [mjpeg-streamer](http://user:123456@localhost:8080/?action=stream)
 
 To manually trigger an event:
diff --git a/camdetect.py b/camdetect.py
index 8e5d010..2c304e4 100755
--- a/camdetect.py
+++ b/camdetect.py
@@ -11,23 +11,27 @@ import os
 import pymongo
 import signal
 import sys
+import botocore.exceptions
 from datetime import datetime, timedelta
 from jpeg2dct.numpy import loads
+from math import inf
 from motor.motor_asyncio import AsyncIOMotorClient
-from prometheus_client import Counter, Gauge
+from prometheus_client import Counter, Gauge, Histogram
 from sanic import Sanic, response
-from sanic.response import stream
+from sanic.log import logger
 from sanic_prometheus import monitor
+from sanic.response import stream
 from time import time
 
 _, url = sys.argv
 
-AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID", "camdetect")
+AWS_ACCESS_KEY_ID = os.environ["AWS_ACCESS_KEY_ID"]
 AWS_SECRET_ACCESS_KEY = os.environ["AWS_SECRET_ACCESS_KEY"]
+
 S3_ENDPOINT_URL = os.environ["S3_ENDPOINT_URL"]
 S3_BUCKET_NAME = os.getenv("S3_BUCKET_NAME", "camdetect")
 MONGO_URI = os.getenv("MONGO_URI", "mongodb://127.0.0.1:27017/default")
-MONGO_COLLECTION = os.getenv("MONGO_COLLETION", "eventlog")
+MONGO_COLLECTION = os.getenv("MONGO_COLLECTION", "eventlog")
 SOURCE_NAME = os.environ["SOURCE_NAME"]
 SLIDE_WINDOW = 2
 DCT_BLOCK_SIZE = 8
@@ -38,91 +42,93 @@ THRESHOLD_RATIO = int(os.getenv("THRESHOLD_RATIO", "5"))
 
 CHUNK_BOUNDARY = b"\n--frame\nContent-Type: image/jpeg\n\n"
 
-counter_dropped_bytes = Counter(
-    "camdetect_dropped_bytes",
+hist_active_blocks_ratio = Histogram(
+    "camtiler_active_blocks_ratio",
+    "Ratio of active DCT blocks",
+    ["roi"],
+    buckets=(0.01, 0.02, 0.05, 0.1, 0.5, inf))
+hist_processing_latency = Histogram(
+    "camtiler_frame_processing_latency_seconds",
+    "Frame processing latency",
+    buckets=(0.01, 0.05, 0.1, 0.5, 1, inf))
+hist_upload_latency = Histogram(
+    "camtiler_frame_upload_latency_seconds",
+    "Frame upload latency",
+    buckets=(0.1, 0.5, 1, 5, 10, inf))
+counter_events = Counter(
+    "camtiler_events",
+    "Count of successfully processed events")
+counter_frames = Counter(
+    "camtiler_frames",
+    "Count of frames",
+    ["stage"])
+counter_dropped_frames = Counter(
+    "camtiler_dropped_frames",
+    "Frames that were dropped due to one of the queues being full",
+    ["stage"])
+counter_discarded_bytes = Counter(
+    "camtiler_discarded_bytes",
     "Bytes that were not handled or part of actual JPEG frames")
-counter_rx_bytes = Counter(
-    "camdetect_rx_bytes",
+counter_receive_bytes = Counter(
+    "camtiler_receive_bytes",
     "Bytes received over HTTP stream")
-counter_tx_bytes = Counter(
-    "camdetect_tx_bytes",
+counter_transmit_bytes = Counter(
+    "camtiler_transmit_bytes",
     "Bytes transmitted over HTTP streams")
-counter_rx_frames = Counter(
-    "camdetect_rx_frames",
-    "Frames received")
-counter_tx_frames = Counter(
-    "camdetect_tx_frames",
-    "Frames transmitted")
-counter_tx_events = Counter(
-    "camdetect_tx_events",
+counter_receive_frames = Counter(
+    "camtiler_receive_frames",
+    "Frames received from upstream")
+counter_transmit_frames = Counter(
+    "camtiler_transmit_frames",
+    "Frames transmitted to downstream consumers")
+counter_emitted_events = Counter(
+    "camtiler_emitted_events",
     "Events emitted")
-counter_rx_chunks = Counter(
-    "camdetect_rx_chunks",
+counter_receive_chunks = Counter(
+    "camtiler_receive_chunks",
     "HTTP chunks received")
 counter_errors = Counter(
-    "camdetect_errors",
+    "camtiler_errors",
     "Upstream connection errors",
-    ["exception"])
-counter_movement_frames = Counter(
-    "camdetect_movement_frames",
-    "Frames with movement detected in them")
-counter_uploaded_frames = Counter(
-    "camdetect_uploaded_frames",
-    "Frames uploaded via S3")
-counter_upload_errors = Counter(
-    "camdetect_upload_errors",
-    "Frames upload errors related to S3")
-counter_upload_dropped_frames = Counter(
-    "camdetect_upload_dropped_frames",
-    "Frames that were dropped due to S3 upload queue being full")
-counter_download_dropped_frames = Counter(
-    "camdetect_download_dropped_frames",
-    "Frames that were downloaded from camera, but not processed")
-
+    ["stage", "exception"])
 gauge_last_frame = Gauge(
-    "camdetect_last_frame",
-    "Timestamp of last frame")
-gauge_frame_motion_detected = Gauge(
-    "camdetect_frame_motion_detected",
-    "Motion detected in frame")
-gauge_event_active = Gauge(
-    "camdetect_event_active",
-    "Motion event in progress")
-gauge_total_blocks = Gauge(
-    "camdetect_total_blocks",
-    "Total DCT blocks")
-gauge_active_blocks = Gauge(
-    "camdetect_active_blocks",
-    "Total active, threshold exceeding DCT blocks")
-gauge_upload_queue_size = Gauge(
-    "camdetect_upload_queue_size",
-    "Number of frames awaiting to be uploaded via S3")
-gauge_download_queue_size = Gauge(
-    "camdetect_download_queue_size",
-    "Number of frames awaiting to be processed by motion detection loop")
+    "camtiler_last_frame_timestamp_seconds",
+    "Timestamp of last frame",
+    ["stage"])
+gauge_queue_frames = Gauge(
+    "camtiler_queue_frames",
+    "Number of frames in a queue",
+    ["stage"])
+gauge_build_info = Gauge(
+    "camtiler_build_info",
+    "Build info",
+    ["git_hash"])
+
+gauge_build_info.labels(os.getenv("GIT_COMMIT", "null")).set(1)
 
 # Reset some gauges
-gauge_frame_motion_detected.set(0)
-gauge_upload_queue_size.set(0)
-gauge_download_queue_size.set(0)
+gauge_queue_frames.labels("download").set(0)
+gauge_queue_frames.labels("hold").set(0)
+gauge_queue_frames.labels("upload").set(0)
 
 assert SLIDE_WINDOW <= 8  # This is 256 frames which should be enough
 
 
 async def upload(bucket, blob: bytes, thumb: bytes, event_id):
     """
-    Upload single JPEG blob to S3 bucket
+    Upload single frame to S3 bucket
     """
     # Generate S3 path based on the JPEG blob SHA512 digest
     fp = hashlib.sha512(blob).hexdigest()
     path = "%s/%s/%s/%s.jpg" % (fp[:4], fp[4:8], fp[8:12], fp[12:])
-    # First upload the thumbnail
-    await bucket.upload_fileobj(io.BytesIO(thumb), "thumb/%s" % path)
-
-    # Proceed to upload the original JPEG frame
-    await bucket.upload_fileobj(io.BytesIO(blob), path)
+    try:
+        await bucket.upload_fileobj(io.BytesIO(thumb), "thumb/%s" % path)
+        await bucket.upload_fileobj(io.BytesIO(blob), path)
+    except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as e:
+        j = "%s.%s" % (e.__class__.__module__, e.__class__.__name__)
+        counter_errors.labels("upload", j).inc()
 
     # Add screenshot path to the event
     app.ctx.coll.update_one({
@@ -133,6 +139,9 @@ async def upload(bucket, blob: bytes, thumb: bytes, event_id):
         }
     })
 
+    counter_frames.labels("stored").inc()
+    now = datetime.utcnow()
+    gauge_last_frame.labels("upload").set(now.timestamp())
 
 
 # TODO: Handle 16MB maximum document size
@@ -140,16 +149,19 @@ async def uploader(queue):
     """
     Uploader task grabs JPEG blobs from upload queue and uploads them to S3
     """
-    session = aioboto3.Session(
-        aws_access_key_id=AWS_ACCESS_KEY_ID,
-        aws_secret_access_key=AWS_SECRET_ACCESS_KEY)
-    async with session.resource("s3", endpoint_url=S3_ENDPOINT_URL) as s3:
+    session = aioboto3.Session()
+    async with session.resource("s3",
+            aws_access_key_id=AWS_ACCESS_KEY_ID,
+            aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
+            endpoint_url=S3_ENDPOINT_URL) as s3:
         bucket = await s3.Bucket(S3_BUCKET_NAME)
         while True:
-            blob, thumb, event_id = await queue.get()
+            dt, blob, thumb, event_id = await queue.get()
+            gauge_queue_frames.labels("upload").set(queue.qsize())
             await upload(bucket, blob, thumb, event_id)
-            counter_uploaded_frames.inc()
-            gauge_upload_queue_size.set(queue.qsize())
+            counter_frames.labels("uploaded").inc()
+            hist_upload_latency.observe(
+                (datetime.utcnow() - dt).total_seconds())
 
 
 class ReferenceFrame():
@@ -201,6 +213,7 @@ async def motion_detector(reference_frame, download_queue, upload_queue):
 
     while True:
         dt, blob, dct, thumb = await download_queue.get()
+        gauge_queue_frames.labels("download").set(download_queue.qsize())
         app.ctx.last_frame, app.ctx.dct = blob, dct
 
         # Signal /bypass and /debug handlers about new frame
@@ -210,10 +223,6 @@ async def motion_detector(reference_frame, download_queue, upload_queue):
 
         # Separate most significant luma value for each DCT (8x8 pixel) block
         y = np.int16(dct[0][:, :, 0])
-        # Update metrics
-        gauge_total_blocks.set(y.shape[0] * y.shape[1])
-        gauge_last_frame.set(dt.timestamp())
-
         reference_frame.put(y)
         try:
             app.ctx.mask = cv2.inRange(cv2.absdiff(y,
@@ -229,15 +238,16 @@ async def motion_detector(reference_frame, download_queue, upload_queue):
         total_blocks = app.ctx.mask.shape[0] * app.ctx.mask.shape[1]
         threshold_blocks = THRESHOLD_RATIO * total_blocks / 100
         average_blocks = sum(differing_blocks) / len(differing_blocks)
+        hist_active_blocks_ratio.labels("main").observe(active_blocks / total_blocks)
         motion_detected = average_blocks > threshold_blocks
 
-        # Update metrics
-        gauge_active_blocks.set(active_blocks)
-        gauge_total_blocks.set(total_blocks)
+        now = datetime.utcnow()
+        gauge_last_frame.labels("processed").set(now.timestamp())
+        hist_processing_latency.observe((now - dt).total_seconds())
 
         # Propagate SIGUSR1 signal handler
         if app.ctx.manual_trigger:
-            print("Manually triggering event via SIGUSR1")
+            logger.info("Manually triggering event via SIGUSR1")
             motion_detected = True
             app.ctx.manual_trigger = False
 
@@ -254,16 +264,16 @@ async def motion_detector(reference_frame, download_queue, upload_queue):
                 "action": "event",
             })
             app.ctx.event_id = event_id = result.inserted_id
-            gauge_event_active.set(1)
 
         # Handle buffering of frames prior to event start
         if hold_queue.full():
             await hold_queue.get()
         hold_queue.put_nowait((blob, thumb))
+        gauge_queue_frames.labels("hold").set(hold_queue.qsize())
 
         # Handle image upload
         if motion_detected and event_id:
-            counter_movement_frames.inc()
+            counter_frames.labels("motion").inc()
             while True:
                 if not uploads_skipped:
                     uploads_skipped = UPLOAD_FRAMESKIP
@@ -279,10 +289,10 @@ async def motion_detector(reference_frame, download_queue, upload_queue):
 
                 try:
                     # Push JPEG blob into upload queue
-                    upload_queue.put_nowait((blob, thumb, event_id))
+                    upload_queue.put_nowait((dt, blob, thumb, event_id))
                 except asyncio.QueueFull:
-                    counter_upload_dropped_frames.inc()
-                    gauge_upload_queue_size.set(upload_queue.qsize())
+                    counter_dropped_frames.labels("upload").inc()
+                    gauge_queue_frames.labels("upload").set(upload_queue.qsize())
 
         # Handle event end
         if not motion_detected and event_id:
@@ -294,7 +304,6 @@ async def motion_detector(reference_frame, download_queue, upload_queue):
                 }
             })
             app.ctx.event_id = event_id = None
-            gauge_event_active.set(0)
 
 
 def generate_thumbnail(dct):
@@ -339,11 +348,11 @@ async def download(resp, queue):
     DCT coefficients of the frames
     """
     buf = b""
-    print("Upstream connection opened with status:", resp.status)
+    logger.info("Upstream connection opened with status: %d", resp.status)
    async for data, end_of_http_chunk in resp.content.iter_chunks():
-        counter_rx_bytes.inc(len(data))
+        counter_receive_bytes.inc(len(data))
         if end_of_http_chunk:
-            counter_rx_chunks.inc()
+            counter_receive_chunks.inc()
         if buf:
             # seek end
             marker = data.find(b"\xff\xd9")
@@ -357,43 +366,46 @@ async def download(resp, queue):
 
                 # Parse DCT coefficients
                 dct = loads(blob)
+                now = datetime.utcnow()
+                gauge_last_frame.labels("download").set(now.timestamp())
                 try:
                     # Convert Y component to 16 bit for easier handling
                     queue.put_nowait((
-                        datetime.utcnow(),
+                        now,
                         blob,
                         dct,
                         generate_thumbnail(dct)))
                 except asyncio.QueueFull:
-                    counter_download_dropped_frames.inc()
+                    counter_dropped_frames.labels("download").inc()
+                gauge_queue_frames.labels("download").set(queue.qsize())
                 data = data[marker+2:]
                 buf = b""
-                counter_rx_frames.inc()
+                counter_receive_frames.inc()
 
             # seek begin
             marker = data.find(b"\xff\xd8")
             if marker >= 0:
                 buf = data[marker:]
             else:
-                counter_dropped_bytes.inc(len(data))
+                counter_discarded_bytes.inc(len(data))
 
 
 async def downloader(queue: asyncio.Queue):
     """
     Downloader task connects to MJPEG source and
-    pushes the JPEG frames to a queue
+    pushes the JPEG frames to the download queue
     """
     while True:
         to = aiohttp.ClientTimeout(connect=5, sock_read=2)
         async with aiohttp.ClientSession(timeout=to) as session:
-            print("Opening upstream connection to %s" % url)
+            logger.info("Opening connection to %s", url)
             try:
                 async with session.get(url) as resp:
                     await download(resp, queue)
             except (aiohttp.ClientError, asyncio.exceptions.TimeoutError) as e:
                 j = "%s.%s" % (e.__class__.__module__, e.__class__.__name__)
-                print("Caught exception %s" % j)
-                counter_errors.labels(exception=j).inc()
+                logger.info("Caught exception %s", j)
+                counter_errors.labels("download", j).inc()
             await asyncio.sleep(1)
 
 app = Sanic("camdetect")
@@ -414,8 +426,8 @@ async def bypass_stream_wrapper(request):
             ts = time()
             data = CHUNK_BOUNDARY + app.ctx.last_frame
             await response.write(data)
-            counter_tx_bytes.inc(len(data))
-            counter_tx_frames.inc()
+            counter_transmit_bytes.inc(len(data))
+            counter_transmit_frames.inc()
     return response.stream(
         stream_camera,
         content_type="multipart/x-mixed-replace; boundary=frame")
@@ -446,8 +458,8 @@ async def stream_wrapper(request):
             _, jpeg = cv2.imencode(".jpg", img, (cv2.IMWRITE_JPEG_QUALITY, 80))
             data = CHUNK_BOUNDARY + jpeg.tobytes()
             await response.write(data)
-            counter_tx_bytes.inc(len(data))
-            counter_tx_frames.inc()
+            counter_transmit_bytes.inc(len(data))
+            counter_transmit_frames.inc()
 
     # Transmit as chunked MJPEG stream
     return response.stream(
@@ -477,7 +489,7 @@ async def wrapper_stream_event(request):
                 continue
             s = "data: " + json.dumps(app.ctx.mask.tolist()) + "\r\n\r\n"
             await response.write(s.encode())
-            counter_tx_events.inc()
+            counter_emitted_events.inc()
     return stream(stream_event, content_type="text/event-stream")
 
diff --git a/docker-compose.yml b/docker-compose.yml
index a723f0c..4635d5d 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -3,16 +3,15 @@ version: '3.7'
 # All keys here are for dev instance only, do not put prod keys here
 x-common: &common
   AWS_ACCESS_KEY_ID: camdetect
+  MINIO_ROOT_USER: camdetect
   AWS_SECRET_ACCESS_KEY: 2mSI6HdbJ8
+  MINIO_ROOT_PASSWORD: 2mSI6HdbJ8
   ME_CONFIG_MONGODB_ENABLE_ADMIN: 'true'
   ME_CONFIG_MONGODB_SERVER: '127.0.0.1'
   ME_CONFIG_MONGODB_AUTH_DATABASE: admin
-  MINIO_ACCESS_KEY: camdetect
-  MINIO_SECRET_KEY: 2mSI6HdbJ8
   MINIO_DEFAULT_BUCKETS: camdetect
   MINIO_URI: 'http://camdetect:2mSI6HdbJ8@127.0.0.1:9000/camdetect'
   S3_ENDPOINT_URL: http://127.0.0.1:9000
-  MINIO_DEFAULT_BUCKETS: camdetect
   MINIO_CONSOLE_PORT_NUMBER: 9001
   MJPEGSTREAMER_CREDENTIALS: user:123456
   SOURCE_NAME: dummy
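
Note on verifying the metric renames above: with the dev stack from
docker-compose.yml running, the camtiler_* metrics should appear in the
Prometheus exposition output. A minimal smoke-test sketch follows; the
/metrics path on port 5000 is an assumption based on sanic-prometheus
defaults and the ports in the bundled compose file, not something this
patch configures explicitly:

    import urllib.request

    # Fetch the plain-text Prometheus exposition from the camdetect service
    # (assumed to be exposed by sanic-prometheus at /metrics on port 5000)
    body = urllib.request.urlopen("http://localhost:5000/metrics").read().decode()
    for name in ("camtiler_receive_frames", "camtiler_transmit_frames",
                 "camtiler_queue_frames", "camtiler_build_info"):
        # prometheus_client appends suffixes such as _total to counters,
        # so a substring check is sufficient here
        assert name in body, "missing metric: %s" % name
    print("all expected camtiler_* metrics are exposed")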