Compare commits

...

35 Commits

Author SHA1 Message Date
Erki Aas fd3557c5da Do not create timestamp index twice
continuous-integration/drone Build is passing Details
2024-02-05 16:04:49 +02:00
Madis Mägi e7da187e18 Add TTL index
continuous-integration/drone Build is passing Details
2023-09-22 16:48:41 +03:00
Lauri Võsandi f6d14671f5 Reduce directory nesting
continuous-integration/drone Build is failing Details
2023-09-15 01:24:29 +03:00
Lauri Võsandi eb9961c77f Bump pixel difference thresholds
continuous-integration/drone/push Build is passing Details
continuous-integration/drone Build is passing Details
2022-12-13 23:18:04 +02:00
Lauri Võsandi 3a5461b91e Bump event window
continuous-integration/drone/push Build is passing Details
2022-12-13 23:06:42 +02:00
Lauri Võsandi 02d38d12ef Add timestamp fuzzy matching and rename field to @timestamp
continuous-integration/drone Build is passing Details
2022-12-12 19:56:56 +02:00
Lauri Võsandi 6bdd9e0fbf Add some logging for healthchecks
continuous-integration/drone Build is passing Details
2022-12-02 23:03:58 +02:00
Lauri Võsandi 4d2497fd0d Set app.ctx.mask during startup
continuous-integration/drone Build is passing Details
2022-12-02 22:20:58 +02:00
Lauri Võsandi 15e0ee5bc5 Separate readiness check from health check
continuous-integration/drone Build is passing Details
2022-12-02 19:13:08 +02:00
Lauri Võsandi c23d8856c2 Fix downloaded frame counter
continuous-integration/drone Build is passing Details
2022-12-02 00:15:56 +02:00
Lauri Võsandi 5a1ea54e49 Add downloaded frame counter
continuous-integration/drone Build is passing Details
2022-09-21 09:03:40 +03:00
Lauri Võsandi ce95ffc8d3 Refactor credential passing
continuous-integration/drone Build is passing Details
2022-08-28 23:31:14 +03:00
Lauri Võsandi 135860f4c1 Remove unnecessary KeyboardInterrupt handler
continuous-integration/drone Build is passing Details
2022-08-25 00:24:59 +03:00
Lauri Võsandi a275fe9522 Reduce queue sizes
continuous-integration/drone Build was killed Details
2022-08-24 23:02:36 +03:00
Lauri Võsandi e54ad1ddb7 Reorder readiness check 2022-08-24 23:02:25 +03:00
Lauri Võsandi eebd2f3d96 Fix flake8 errors
continuous-integration/drone Build is passing Details
2022-08-03 08:02:53 +03:00
Lauri Võsandi 05b5bc30f2 Make sure count of frames in motion is reset during start
continuous-integration/drone Build is passing Details
2022-07-30 10:10:21 +03:00
Lauri Võsandi 5fb3eb4541 Set download and upload queue size limits
continuous-integration/drone Build is passing Details
2022-07-28 23:41:26 +03:00
Lauri Võsandi 4814030936 Improved corrupt frame handling
continuous-integration/drone Build was killed Details
2022-07-28 23:37:38 +03:00
Lauri Võsandi 3d1aa220dd Add readiness check for S3 2022-07-28 09:19:14 +03:00
Lauri Võsandi 3472e494f4 Implement event start with upsert
continuous-integration/drone Build is passing Details
2022-07-28 08:48:45 +03:00
Lauri Võsandi 46f5898dc4 Fix event counting 2022-07-27 21:58:18 +03:00
Lauri Võsandi 188de3a192 Add GIT_COMMIT_TIMESTAMP
continuous-integration/drone Build is passing Details
2022-07-11 17:00:23 +03:00
Lauri Võsandi 85c9adfe74 Add JSON logger
continuous-integration/drone Build is passing Details
2022-07-11 13:28:22 +03:00
Lauri Võsandi aa6598ee68 Cleanups 2022-07-11 12:34:13 +03:00
Lauri Võsandi 817f148617 Downgrade to Ubuntu 20.04
continuous-integration/drone Build is passing Details
2022-06-08 08:48:16 +03:00
Lauri Võsandi 7e03b8f23b Switch to Drone template
continuous-integration/drone Build is failing Details
2022-05-04 15:49:41 +03:00
Lauri Võsandi 53ee10b92a Minimize Docker image
continuous-integration/drone Build is failing Details
2022-03-09 21:30:45 +02:00
Lauri Võsandi a79b9f9ca2 Update README.md 2022-03-09 17:12:31 +02:00
Lauri Võsandi c7033ad9b8 Implement frameskip and add log viewer
continuous-integration/drone Build is passing Details
2022-03-06 08:25:54 +02:00
Lauri Võsandi 8fc9a3d065 Add MongoDB check for `/readyz`
continuous-integration/drone Build is passing Details
2022-02-27 17:02:55 +02:00
Lauri Võsandi 2ab39f879d Add `DEBUG` environment variable to toggle Sanic debugging
continuous-integration/drone Build is passing Details
2022-02-27 14:16:33 +02:00
Lauri Võsandi a7158d7580 Add desired frame interval argument for `/bypass` endpoint
continuous-integration/drone Build is passing Details
2022-02-27 13:35:03 +02:00
Lauri Võsandi 8921c6112d Add thumbnailing 2022-02-27 13:12:22 +02:00
Lauri Võsandi a1699fa380 Refactor
continuous-integration/drone Build is passing Details
* Implement multi queue pipelining
* Implement screenshot upload to S3
* Implement event insertion to Mongo
* Add SIGUSR1 handler to manually trigger screenshots
2022-02-25 23:07:03 +02:00
9 changed files with 570 additions and 200 deletions

View File

@@ -1,16 +1,2 @@
----
-kind: pipeline
-type: kubernetes
-name: default
-
-steps:
-- name: docker
-  image: plugins/docker
-  settings:
-    repo: harbor.k-space.ee/${DRONE_REPO}
-    registry: harbor.k-space.ee
-    mtu: 1300
-    username:
-      from_secret: docker_username
-    password:
-      from_secret: docker_password
+kind: template
+load: docker.yaml

View File

@@ -1,3 +0,0 @@
-[flake8]
-inline-quotes = "
-indent-size = 4

.gitmodules vendored Normal file
View File

@@ -0,0 +1,4 @@
+[submodule "log-viewer"]
+	path = log-viewer
+	url = https://git.k-space.ee/k-space/log-viewer.git
+	branch = wip

View File

@@ -1,21 +1,31 @@
-FROM ubuntu
+FROM ubuntu:focal
 WORKDIR /app
 ENV DEBIAN_FRONTEND=noninteractive
-RUN apt-get update && apt-get install -y \
-    gstreamer1.0-libav \
-    gstreamer1.0-plugins-bad \
-    gstreamer1.0-plugins-base \
-    gstreamer1.0-plugins-good \
-    gstreamer1.0-plugins-ugly \
-    gstreamer1.0-tools \
+RUN apt-get update \
+    && apt-get install --no-install-recommends -y \
+    build-essential \
     libjpeg-dev \
+    libpython3-dev \
     python3-gevent \
     python3-numpy \
     python3-opencv \
-    python3-flask \
     python3-pip \
+    git \
+    && pip3 install \
+    aioboto3 \
+    aiohttp \
+    jpeg2dct \
+    motor \
+    prometheus_client \
+    sanic==21.6.2 \
+    sanic-json-logging \
+    git+https://github.com/Assarius/sanic-prometheus@Sanic_22 \
+    && apt-get remove -y \
+    build-essential \
+    libjpeg-dev \
+    libpython3-dev \
+    && apt-get autoremove -y \
     && apt-get clean
-RUN pip3 install boto3 prometheus_client pymongo==3.12.2 aiohttp jpeg2dct sanic==21.6.2 sanic_prometheus motor
 COPY camdetect.py /app
 ENTRYPOINT /app/camdetect.py
 EXPOSE 5000

View File

@@ -8,8 +8,26 @@ In a nutshell:
 - It brings the MJPEG stream into the cluster
 - Performs highly optimal JPEG DCT coefficient based motion detection
   without actually decoding the JPEG frame to a bitmap
-- WIP: Writes events to MongoDB
-- WIP: Uploads screenshots to S3
+- Writes events to MongoDB
+- Generates thumbnails based on JPEG DCT coefficents
+- Uploads screenshots and corresponding thumbnails to S3
 - Exposes endpoint for distributing MJPEG stream inside the cluster,
-  eg by the `camera-tiler`
+  eg for the `camera-tiler`
 - Exposes endpoint for inspecting DCT blocks where motion has been detected
+
+# Developing
+
+Bundled `docker-compose.yml` brings up:
+
+* [camdetect bypass stream](http://localhost:5000/bypass)
+* [camdetect debug](http://localhost:5000/debug)
+* [Minio](http://localhost:9001/buckets/camdetect/browse)
+* [Mongoexpress](http://localhost:8081/db/default/eventlog)
+* [Prometheus](http://localhost:9090/graph)
+* [mjpeg-streamer](http://user:123456@localhost:8080/?action=stream)
+
+To manually trigger event:
+
+```
+docker kill -sUSR1 camera-motion-detect_camdetect_1
+```
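
The motion detection summarized above boils down to comparing each frame's luma DC coefficients against a running average of recent frames, without ever decoding a bitmap. A minimal standalone sketch of that idea, following the approach in the `camdetect.py` diff further down (via `jpeg2dct`); the function and variable names are illustrative and the thresholds are the defaults shown in the diff:

```
#!/usr/bin/env python3
# Sketch of DCT-domain motion detection: keep a sliding window of luma DC
# planes as the "background" and count 8x8 blocks that differ from it.
import cv2
import numpy as np
from jpeg2dct.numpy import loads

SLIDE_WINDOW = 4         # keep 2 ** 4 = 16 frames as background
THRESHOLD_RATIO = 5      # percent of DCT blocks that must differ

history = []             # per-frame luma DC planes
cumulative = None        # running sum of those planes


def motion_detected(jpeg_blob: bytes) -> bool:
    """Return True when enough DCT blocks differ from the background."""
    global cumulative
    y, cb, cr = loads(jpeg_blob)      # DCT coefficients, no bitmap decode
    dc = np.int16(y[:, :, 0])         # DC coefficient per 8x8 pixel block

    history.append(dc)
    cumulative = dc.copy() if cumulative is None else cumulative + dc
    if len(history) > 2 ** SLIDE_WINDOW:
        cumulative -= history.pop(0)
    if len(history) < 2 ** SLIDE_WINDOW:
        return False                  # not enough frames for a background yet

    background = cumulative >> SLIDE_WINDOW        # average via bit shift
    mask = cv2.inRange(cv2.absdiff(dc, background), 50, 65535)
    active = np.count_nonzero(mask)
    total = mask.shape[0] * mask.shape[1]
    return active * 100 > THRESHOLD_RATIO * total
```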

View File

@@ -1,86 +1,390 @@
 #!/usr/bin/env python3
+import aioboto3
 import aiohttp
 import asyncio
 import cv2
+import hashlib
+import io
+import json
 import numpy as np
 import os
-import json
+import pymongo
-import socket
+import signal
 import sys
-from datetime import datetime
+import botocore.exceptions
+from datetime import datetime, timedelta
 from jpeg2dct.numpy import loads
-from prometheus_client import Counter, Gauge
+from math import inf
+from motor.motor_asyncio import AsyncIOMotorClient
+from prometheus_client import Counter, Gauge, Histogram
+from pymongo import ReturnDocument
 from sanic import Sanic, response
-from sanic.response import stream
+from sanic_json_logging import setup_json_logging
+from sanic.log import logger
 from sanic_prometheus import monitor
+from sanic.response import stream
 from time import time
+from urllib.parse import urlparse
-_, url = sys.argv
+_, target = sys.argv
+# Override basic auth password from env var
+basic_auth_password = os.getenv("BASIC_AUTH_PASSWORD")
+if basic_auth_password:
+    o = urlparse(target)
+    netloc = o.netloc
+    username = ""
+    if "@" in netloc:
+        username, netloc = o.netloc.split("@", 1)
+        if ":" in username:
+            username, _ = username.split(":")
+    target = o._replace(netloc="%s:%s@%s" % (username, basic_auth_password, netloc)).geturl()
+AWS_ACCESS_KEY_ID = os.environ["AWS_ACCESS_KEY_ID"]
+AWS_SECRET_ACCESS_KEY = os.environ["AWS_SECRET_ACCESS_KEY"]
+S3_ENDPOINT_URL = os.environ["S3_ENDPOINT_URL"]
+S3_BUCKET_NAME = os.getenv("S3_BUCKET_NAME", "camdetect")
 MONGO_URI = os.getenv("MONGO_URI", "mongodb://127.0.0.1:27017/default")
-FQDN = socket.getfqdn()
+MONGO_COLLECTION = os.getenv("MONGO_COLLECTION", "eventlog")
-SLIDE_WINDOW = 2
+SOURCE_NAME = os.environ["SOURCE_NAME"]
+SLIDE_WINDOW = 4
 DCT_BLOCK_SIZE = 8
+UPLOAD_FRAMESKIP = 3
+CLOCK_SKEW_TOLERANCE = timedelta(seconds=3)
+# Percentage of blocks active to consider movement in whole frame
+THRESHOLD_RATIO = int(os.getenv("THRESHOLD_RATIO", "5"))
+# Days to keep events
+TTL_DAYS = int(os.getenv("TTL_DAYS", "3"))
-# How many blocks have changes to consider movement in frame
-THRESHOLD_BLOCKS = 20
-THRESHOLD_MOTION_START = 2
 CHUNK_BOUNDARY = b"\n--frame\nContent-Type: image/jpeg\n\n"
-counter_dropped_bytes = Counter(
+hist_active_blocks_ratio = Histogram(
-    "camdetect_dropped_bytes",
+    "camtiler_active_blocks_ratio",
+    "Ratio of active DCT blocks",
+    ["roi"],
+    buckets=(0.01, 0.02, 0.05, 0.1, 0.5, inf))
+hist_processing_latency = Histogram(
+    "camtiler_frame_processing_latency_seconds",
+    "Frame processing latency",
+    buckets=(0.01, 0.05, 0.1, 0.5, 1, inf))
+hist_upload_latency = Histogram(
+    "camtiler_frame_upload_latency_seconds",
+    "Frame processing latency",
+    buckets=(0.1, 0.5, 1, 5, 10, inf))
+counter_events = Counter(
+    "camtiler_events",
+    "Count of successfully processed events")
+counter_frames = Counter(
+    "camtiler_frames",
+    "Count of frames",
+    ["stage"])
+counter_dropped_frames = Counter(
+    "camtiler_dropped_frames",
+    "Frames that were dropped due to one of queues being full",
+    ["stage"])
+counter_discarded_bytes = Counter(
+    "camtiler_discarded_bytes",
     "Bytes that were not not handled or part of actual JPEG frames")
-counter_rx_bytes = Counter(
+counter_receive_bytes = Counter(
-    "camdetect_rx_bytes",
+    "counter_receive_bytes",
     "Bytes received over HTTP stream")
-counter_tx_bytes = Counter(
+counter_transmit_bytes = Counter(
-    "camdetect_tx_bytes",
+    "camtiler_transmit_bytes",
     "Bytes transmitted over HTTP streams")
-counter_rx_frames = Counter(
+counter_receive_frames = Counter(
-    "camdetect_rx_frames",
+    "camtiler_receive_frames",
-    "Frames received")
+    "Frames received from upstream")
-counter_tx_frames = Counter(
+counter_transmit_frames = Counter(
-    "camdetect_tx_frames",
+    "camtiler_transmit_frames",
-    "Frames transmitted")
+    "Frames transmitted to downstream consumers")
-counter_tx_events = Counter(
+counter_emitted_events = Counter(
-    "camdetect_tx_events",
+    "camtiler_emitted_events",
     "Events emitted")
-counter_rx_chunks = Counter(
+counter_receive_chunks = Counter(
-    "camdetect_rx_chunks",
+    "camtiler_receive_chunks",
     "HTTP chunks received")
 counter_errors = Counter(
-    "camdetect_errors",
+    "camtiler_errors",
     "Upstream connection errors",
-    ["exception"])
+    ["stage", "exception"])
-counter_movement_frames = Counter(
-    "camdetect_movement_frames",
-    "Frames with movement detected in them")
 gauge_last_frame = Gauge(
-    "camdetect_last_frame",
+    "camtiler_last_frame_timestamp_seconds",
-    "Timestamp of last frame")
+    "Timestamp of last frame",
-gauge_total_blocks = Gauge(
+    ["stage"])
-    "camdetect_total_blocks",
+gauge_queue_frames = Gauge(
-    "Total DCT blocks")
+    "camtiler_queue_frames",
-gauge_active_blocks = Gauge(
+    "Numer of frames in a queue",
-    "camdetect_active_blocks",
+    ["stage"])
-    "Total active, threshold exceeding DCT blocks")
+gauge_build_info = Gauge(
+    "docker_build_info",
+    "Build info",
+    ["git_commit", "git_commit_timestamp"])
+gauge_build_info.labels(
+    os.getenv("GIT_COMMIT", "null"),
+    os.getenv("GIT_COMMIT_TIMESTAMP", "null")).set(1)
+# Reset some gauges
+gauge_queue_frames.labels("download").set(0)
+gauge_queue_frames.labels("hold").set(0)
+gauge_queue_frames.labels("upload").set(0)
+counter_frames.labels("motion").inc(0)
+counter_frames.labels("downloaded").inc(0)
+assert SLIDE_WINDOW <= 8  # This is 256 frames which should be enough
-class Frame(object):
+async def upload(bucket, blob: bytes, thumb: bytes, event_id):
-    def __init__(self, blob):
+    """
-        self.blob = blob
+    Upload single frame to S3 bucket
-        self.y, self.cb, self.cr = loads(blob)
+    """
-        self.mask = np.int16(self.y[:, :, 0])
+    # Generate S3 path based on the JPEG blob SHA512 digest
+    fp = hashlib.sha512(blob).hexdigest()
+    path = "%s/%s.jpg" % (fp[:4], fp[4:])
+    try:
+        await bucket.upload_fileobj(io.BytesIO(thumb), "thumb/%s" % path)
+        await bucket.upload_fileobj(io.BytesIO(blob), path)
+    except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as e:
+        j = "%s.%s" % (e.__class__.__module__, e.__class__.__name__)
+        counter_errors.labels("upload", j).inc()
+    # Add screenshot path to the event
+    app.ctx.coll.update_one({
+        "_id": event_id
+    }, {
+        "$addToSet": {
+            "screenshots": path,
+        }
+    })
+    counter_frames.labels("stored").inc()
+    now = datetime.utcnow()
+    gauge_last_frame.labels("upload").set(now.timestamp())
+    # TODO: Handle 16MB maximum document size
-async def client_connect(resp):
+async def uploader(queue):
+    """
+    Uploader task grabs JPEG blobs from upload queue and uploads them to S3
+    """
+    session = aioboto3.Session()
+    async with session.resource("s3",
+                                aws_access_key_id=AWS_ACCESS_KEY_ID,
+                                aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
+                                endpoint_url=S3_ENDPOINT_URL) as s3:
+        bucket = await s3.Bucket(S3_BUCKET_NAME)
+        while True:
+            dt, blob, thumb, event_id = await queue.get()
+            gauge_queue_frames.labels("upload").set(queue.qsize())
+            await upload(bucket, blob, thumb, event_id)
+            counter_frames.labels("uploaded").inc()
+            hist_upload_latency.observe(
+                (datetime.utcnow() - dt).total_seconds())
+class ReferenceFrame():
+    """
+    ReferenceFrame keeps last 2 ^ size frames to infer the background scene
+    compared to which motion is detected
+    This is pretty much what background subtractor does in OpenCV,
+    only difference is that we want have better performance instead of
+    accuracy
+    """
+    class NotEnoughFrames(Exception):
+        pass
+    def __init__(self, size=SLIDE_WINDOW):
+        self.y = []
+        self.cumulative = None
+        self.size = size
+    def put(self, y):
+        if self.cumulative is None:
+            self.cumulative = np.copy(y)
+        else:
+            self.cumulative += y
+        self.y.append(y)
+        if len(self.y) > 2 ** self.size:
+            self.cumulative -= self.y[0]
+            self.y = self.y[1:]
+    def get(self):
+        if len(self.y) == 2 ** self.size:
+            return self.cumulative >> SLIDE_WINDOW
+        else:
+            raise self.NotEnoughFrames()
+async def motion_detector(reference_frame, download_queue, upload_queue):
+    """
+    Motion detector grabs JPEG blobs and Y channel coefficients
+    from download queue, performs motion detection and pushes relevant
+    JPEG blobs to upload queue going to S3
+    """
+    event_id = None
+    differing_blocks = []
+    uploads_skipped = 0
+    # Hold queue keeps frames that we have before motion event start timestamp
+    hold_queue = asyncio.Queue(2 ** SLIDE_WINDOW)
+    while True:
+        dt, blob, dct, thumb = await download_queue.get()
+        gauge_queue_frames.labels("download").set(download_queue.qsize())
+        app.ctx.last_frame, app.ctx.dct = blob, dct
+        # Signal /bypass and /debug handlers about new frame
+        app.ctx.event_frame.set()
+        app.ctx.event_frame.clear()
+        # Separate most significant luma value for each DCT (8x8 pixel) block
+        y = np.int16(dct[0][:, :, 0])
+        reference_frame.put(y)
+        try:
+            app.ctx.mask = cv2.inRange(cv2.absdiff(y,
+                reference_frame.get()), 50, 65535)
+        except ReferenceFrame.NotEnoughFrames:
+            app.ctx.mask = None
+            motion_detected = False
+        else:
+            # Implement dumb Kalman filter
+            active_blocks = np.count_nonzero(app.ctx.mask)
+            differing_blocks.append(active_blocks)
+            differing_blocks[:] = differing_blocks[-10:]
+            total_blocks = app.ctx.mask.shape[0] * app.ctx.mask.shape[1]
+            threshold_blocks = THRESHOLD_RATIO * total_blocks / 100
+            average_blocks = sum(differing_blocks) / len(differing_blocks)
+            hist_active_blocks_ratio.labels("main").observe(active_blocks / total_blocks)
+            motion_detected = average_blocks > threshold_blocks
+        now = datetime.utcnow()
+        gauge_last_frame.labels("processed").set(now.timestamp())
+        hist_processing_latency.observe((now - dt).total_seconds())
+        # Propagate SIGUSR1 signal handler
+        if app.ctx.manual_trigger:
+            logger.info("Manually triggering event via SIGUSR1")
+            motion_detected = True
+            app.ctx.manual_trigger = False
+        # Handle event start
+        if motion_detected and not event_id:
+            result = await app.ctx.coll.find_one_and_update({
+                "@timestamp": {
+                    "$lte": dt + CLOCK_SKEW_TOLERANCE,
+                    "$gte": dt - CLOCK_SKEW_TOLERANCE,
+                },
+                "source": SOURCE_NAME,
+            }, {
+                "$setOnInsert": {
+                    "@timestamp": dt,
+                    "source": SOURCE_NAME,
+                    "event": "motion-detected",
+                    "started": dt,
+                    "finished": dt + timedelta(minutes=2),
+                    "component": "camdetect",
+                    "screenshots": [],
+                    "action": "event",
+                }
+            }, upsert=True, return_document=ReturnDocument.AFTER)
+            app.ctx.event_id = event_id = result["_id"]
+        # Handle buffering frames prior event start
+        if hold_queue.full():
+            await hold_queue.get()
+        hold_queue.put_nowait((blob, thumb))
+        gauge_queue_frames.labels("hold").set(hold_queue.qsize())
+        # Handle image upload
+        if motion_detected and event_id:
+            counter_frames.labels("motion").inc()
+            while True:
+                if not uploads_skipped:
+                    uploads_skipped = UPLOAD_FRAMESKIP
+                else:
+                    uploads_skipped -= 1
+                    continue
+                # Drain queue of frames prior event start
+                try:
+                    blob, thumb = hold_queue.get_nowait()
+                except asyncio.QueueEmpty:
+                    break
+                try:
+                    # Push JPEG blob into upload queue
+                    upload_queue.put_nowait((dt, blob, thumb, event_id))
+                except asyncio.QueueFull:
+                    counter_dropped_frames.labels("upload").inc()
+                gauge_queue_frames.labels("upload").set(upload_queue.qsize())
+        # Handle event end
+        if not motion_detected and event_id:
+            app.ctx.coll.update_one({
+                "_id": event_id
+            }, {
+                "$set": {
+                    "finished": dt,
+                }
+            })
+            app.ctx.event_id = event_id = None
+            counter_events.inc()
+def generate_thumbnail(dct):
+    """
+    This is highly efficient and highly inaccurate function to generate
+    thumbnail based purely on JPEG coefficients
+    """
+    y, cr, cb = dct
+    # Determine aspect ratio and minimum dimension for cropping
+    ar = y.shape[0] < y.shape[1]
+    dm = (y.shape[0] if ar else y.shape[1]) & 0xfffffff8
+    # Determine cropping slices to make it square
+    jl = ((y.shape[1] >> 1) - (dm >> 1) if ar else 0)
+    jt = (0 if ar else (y.shape[0] >> 1) - (dm >> 1))
+    jr = jl + dm
+    jb = jt + dm
+    # Do the actual crop
+    ty = y[jt:jb, jl:jr, 0]
+    tb = cb[jt >> 1:jb >> 1, jl >> 1:jr >> 1, 0]
+    tr = cr[jt >> 1:jb >> 1, jl >> 1:jr >> 1, 0]
+    # Upsample chroma, dummy convert first coeff and stack all channels
+    m = np.dstack((
+        np.array((ty >> 3) + 127, dtype=np.uint8),
+        np.array(
+            (tb.repeat(2, 1).repeat(2, 0) >> 3) + 127, dtype=np.uint8)[:dm],
+        np.array(
+            (tr.repeat(2, 1).repeat(2, 0) >> 3) + 127, dtype=np.uint8)[:dm]))
+    _, jpeg = cv2.imencode(".jpg",
+                           cv2.cvtColor(m, cv2.COLOR_YCrCb2BGR),
+                           (cv2.IMWRITE_JPEG_QUALITY, 80))
+    return jpeg
+async def download(resp, queue):
+    """
+    This coroutine iterates over HTTP connection chunks
+    assembling the original JPEG blobs and decodes the
+    DCT coefficients of the frames
+    """
     buf = b""
-    print("Upstream connection opened with status:", resp.status)
+    logger.info("Upstream connection opened with status: %d", resp.status)
     async for data, end_of_http_chunk in resp.content.iter_chunks():
-        counter_rx_bytes.inc(len(data))
+        counter_receive_bytes.inc(len(data))
         if end_of_http_chunk:
-            counter_rx_chunks.inc()
+            counter_receive_chunks.inc()
         if buf:
             # seek end
             marker = data.find(b"\xff\xd9")
@@ -88,100 +392,82 @@ async def client_connect(resp):
                 buf += data
                 continue
             else:
-                app.ctx.last_frame = Frame(buf + data[:marker+2])
+                # Assemble JPEG blob
-                gauge_last_frame.set(time())
+                blob = buf + data[:marker + 2]
-                reference = app.ctx.last_frame.mask
+                # Parse DCT coefficients
-                app.ctx.frames.append(reference)
+                try:
-                if app.ctx.avg is None:
+                    dct = loads(blob)
-                    app.ctx.avg = np.copy(reference)
+                except RuntimeError:
+                    counter_frames.labels("corrupted").inc()
                 else:
-                    app.ctx.avg += reference
+                    now = datetime.utcnow()
+                    gauge_last_frame.labels("download").set(now.timestamp())
-                if len(app.ctx.frames) > 2 ** SLIDE_WINDOW:
+                    try:
-                    app.ctx.avg -= app.ctx.frames[0]
+                        queue.put_nowait((
-                    app.ctx.frames = app.ctx.frames[1:]
+                            now,
+                            blob,
-                if len(app.ctx.frames) == 2 ** SLIDE_WINDOW:
+                            dct,
-                    app.ctx.thresh = cv2.inRange(cv2.absdiff(
+                            generate_thumbnail(dct)))
-                        app.ctx.last_frame.mask,
+                    except asyncio.QueueFull:
-                        app.ctx.avg >> SLIDE_WINDOW), 25, 65535)
+                        counter_dropped_frames.labels("download").inc()
-                else:
+                    else:
-                    app.ctx.thresh = None
+                        counter_frames.labels("downloaded").inc()
-                gauge_total_blocks.set(app.ctx.last_frame.mask.shape[0] *
+                    gauge_queue_frames.labels("download").set(queue.qsize())
-                                       app.ctx.last_frame.mask.shape[1])
+                data = data[marker + 2:]
-                movement_detected = False
-                if app.ctx.thresh is not None:
-                    differing_blocks = np.count_nonzero(app.ctx.thresh)
-                    gauge_active_blocks.set(differing_blocks)
-                    if differing_blocks > THRESHOLD_BLOCKS:
-                        counter_movement_frames.inc()
-                        movement_detected = True
-                if movement_detected:
-                    if app.ctx.motion_frames < 30:
-                        app.ctx.motion_frames += 1
-                else:
-                    if app.ctx.motion_frames > 0:
-                        app.ctx.motion_frames -= 1
-                if app.ctx.motion_frames > 20:
-                    if not app.ctx.motion_start:
-                        app.ctx.motion_start = datetime.utcnow()
-                        print("Movement start")
-                elif app.ctx.motion_frames < 5:
-                    app.ctx.motion_start = None
-                    print("Movement end")
-                app.ctx.event_frame.set()
-                app.ctx.event_frame.clear()
-                data = data[marker+2:]
             buf = b""
-            counter_rx_frames.inc()
+            counter_receive_frames.inc()
         # seek begin
-        marker = data.find(b"\xff\xd8")
+        marker = data.rfind(b"\xff\xd8")
         if marker >= 0:
             buf = data[marker:]
         else:
-            counter_dropped_bytes.inc(len(data))
+            counter_discarded_bytes.inc(len(data))
-async def client():
+async def downloader(queue: asyncio.Queue):
+    """
+    Downloader task connects to MJPEG source and
+    pushes the JPEG frames to download queue
+    """
     while True:
         to = aiohttp.ClientTimeout(connect=5, sock_read=2)
         async with aiohttp.ClientSession(timeout=to) as session:
-            print("Opening upstream connection to %s" % url)
+            logger.info("Opening connection to %s", target)
             try:
-                async with session.get(url) as resp:
+                async with session.get(target) as resp:
-                    await client_connect(resp)
+                    await download(resp, queue)
             except (aiohttp.ClientError, asyncio.exceptions.TimeoutError) as e:
                 j = "%s.%s" % (e.__class__.__module__, e.__class__.__name__)
-                print("Caught exception %s" % j)
+                logger.info("Caught exception %s", j)
-                counter_errors.labels(exception=j).inc()
+                counter_errors.labels("download", j).inc()
         await asyncio.sleep(1)
+app = Sanic("camdetect")
-app = Sanic("lease")
+setup_json_logging(app)
-app.config["WTF_CSRF_ENABLED"] = False
 @app.route("/bypass")
 async def bypass_stream_wrapper(request):
-    async def stream_camera(response):
+    # Desired frame interval, by default 500ms
-        while True:
+    interval = float(request.args.get("interval", 500)) / 1000.0
-            await app.ctx.event_frame.wait()
-            data = CHUNK_BOUNDARY + app.ctx.last_frame.blob
-            await response.write(data)
-            counter_tx_bytes.inc(len(data))
-            counter_tx_frames.inc()
+    async def stream_camera(response):
+        ts = 0
+        while True:
+            while True:
+                await app.ctx.event_frame.wait()
+                if time() > ts + interval:
+                    break
+            ts = time()
+            data = CHUNK_BOUNDARY + app.ctx.last_frame
+            await response.write(data)
+            counter_transmit_bytes.inc(len(data))
+            counter_transmit_frames.inc()
     return response.stream(
         stream_camera,
-        content_type="multipart/x-mixed-replace; boundary=frame"
+        content_type="multipart/x-mixed-replace; boundary=frame")
-    )
 @app.route("/debug")
@@ -189,21 +475,30 @@ async def stream_wrapper(request):
     async def stream_camera(response):
         while True:
             await app.ctx.event_frame.wait()
-            arr = np.frombuffer(app.ctx.last_frame.blob, dtype=np.uint8)
-            img = cv2.imdecode(arr, cv2.IMREAD_UNCHANGED)
-            if len(app.ctx.frames) == 2 ** SLIDE_WINDOW:
-                for y in range(0, len(app.ctx.last_frame.mask)):
-                    for x in range(0, len(app.ctx.last_frame.mask[0])):
-                        if app.ctx.thresh[y][x] > 0:
-                            img[y*DCT_BLOCK_SIZE:(y+1)*DCT_BLOCK_SIZE,
-                                x*DCT_BLOCK_SIZE:(x+1)*DCT_BLOCK_SIZE, 2] = 255
+            # Parse JPEG blob
+            arr = np.frombuffer(app.ctx.last_frame, dtype=np.uint8)
+            img = cv2.imdecode(arr, cv2.IMREAD_UNCHANGED)
+            # Highlight green or red channel depending on whether
+            # motion event is in progress or not
+            channel = 2 if app.ctx.event_id else 1
+            if app.ctx.mask is not None:
+                for y in range(0, app.ctx.mask.shape[0]):
+                    for x in range(0, app.ctx.mask.shape[1]):
+                        if app.ctx.mask[y][x] > 0:
+                            img[y * DCT_BLOCK_SIZE:(y + 1) * DCT_BLOCK_SIZE,
+                                x * DCT_BLOCK_SIZE:(x + 1) * DCT_BLOCK_SIZE,
+                                channel] = 255
+            # Compress modified frame as JPEG frame
             _, jpeg = cv2.imencode(".jpg", img, (cv2.IMWRITE_JPEG_QUALITY, 80))
             data = CHUNK_BOUNDARY + jpeg.tobytes()
             await response.write(data)
-            counter_tx_bytes.inc(len(data))
+            counter_transmit_bytes.inc(len(data))
-            counter_tx_frames.inc()
+            counter_transmit_frames.inc()
+    # Transmit as chunked MJPEG stream
     return response.stream(
         stream_camera,
         content_type="multipart/x-mixed-replace; boundary=frame"
@@ -212,9 +507,32 @@ async def stream_wrapper(request):
 @app.route("/readyz")
 async def ready_check(request):
-    if len(app.ctx.frames) == 2 ** SLIDE_WINDOW:
-        return response.text("OK")
-    return response.text("Not enough frames", status=503)
+    logger.info("Testing if Mongo is accessible")
+    try:
+        async for i in app.ctx.coll.find().limit(1):
+            break
+    except pymongo.errors.ServerSelectionTimeoutError:
+        return response.text("MongoDB server selection timeout", status=503)
+    session = aioboto3.Session()
+    logger.info("Testing if S3 is writable")
+    async with session.resource("s3",
+                                aws_access_key_id=AWS_ACCESS_KEY_ID,
+                                aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
+                                endpoint_url=S3_ENDPOINT_URL) as s3:
+        bucket = await s3.Bucket(S3_BUCKET_NAME)
+        await bucket.upload_fileobj(io.BytesIO(b"test"), "test")
+    return response.text("OK")
+    print("Checking if there are any frames received")
+    if app.ctx.mask is None:
+        return response.text("Not enough frames", status=503)
+@app.route("/healthz")
+async def health_check(request):
+    if app.ctx.mask is None:
+        return response.text("Not enough frames", status=503)
 @app.route("/event")
@@ -222,28 +540,50 @@ async def wrapper_stream_event(request):
     async def stream_event(response):
         while True:
             await app.ctx.event_frame.wait()
-            if len(app.ctx.frames) < 2 ** SLIDE_WINDOW:
+            if app.ctx.mask is not None:
                 continue
-            s = "data: " + json.dumps(app.ctx.thresh.tolist()) + "\r\n\r\n"
+            s = "data: " + json.dumps(app.ctx.mask.tolist()) + "\r\n\r\n"
             await response.write(s.encode())
-            counter_tx_events.inc()
+            counter_emitted_events.inc()
     return stream(stream_event, content_type="text/event-stream")
+def handler(signum, frame):
+    # SIGUSR1 handler for manually triggering an event
+    app.ctx.manual_trigger = True
 @app.listener("before_server_start")
 async def setup_db(app, loop):
+    app.ctx.mask = None
+    app.ctx.db = AsyncIOMotorClient(MONGO_URI).get_default_database()
+    app.ctx.coll = app.ctx.db[MONGO_COLLECTION]
+    app.ctx.coll.create_index(
+        "@timestamp", expireAfterSeconds=TTL_DAYS * 60 * 60 * 24)
+    app.ctx.coll.create_index([
+        ("source", pymongo.ASCENDING),
+        ("@timestamp", pymongo.ASCENDING)], unique=True)
     app.ctx.last_frame = None
     app.ctx.event_frame = asyncio.Event()
-    app.ctx.frames = []
+    app.ctx.event_id = None
-    app.ctx.avg = None
+    app.ctx.manual_trigger = False
-    app.ctx.motion_frames = 0
+    signal.signal(signal.SIGUSR1, handler)
-    app.ctx.motion_start = None
-    app.ctx.motion_end = None
+    # Set up processing pipeline
-    asyncio.create_task(client())
+    download_queue = asyncio.Queue(50)
+    upload_queue = asyncio.Queue(50)
+    asyncio.create_task(uploader(
+        upload_queue))
+    asyncio.create_task(downloader(
+        download_queue))
+    asyncio.create_task(motion_detector(
+        ReferenceFrame(),
+        download_queue,
+        upload_queue))
 monitor(app).expose_endpoint()
-try:
+app.run(host="0.0.0.0",
-    app.run(host="0.0.0.0", port=5000)
+        port=5000,
-except KeyboardInterrupt:
+        debug=bool(os.getenv("DEBUG", 0)))
-    asyncio.get_event_loop().close()
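
The event bookkeeping above combines a TTL index for expiry, a unique `(source, @timestamp)` index, and an upsert with `$setOnInsert` so that racing detectors open only one event per source within a small clock-skew window; the commits "Add TTL index" and "Do not create timestamp index twice" in the list above later adjust exactly this index setup. A condensed sketch of the pattern, with an illustrative connection string and a hypothetical `open_event` helper name:

```
#!/usr/bin/env python3
# Sketch of the idempotent motion-event creation used in camdetect.py:
# TTL index expires old events, a unique compound index plus $setOnInsert
# make event creation race-safe. Connection details are illustrative.
import asyncio
from datetime import datetime, timedelta

import pymongo
from motor.motor_asyncio import AsyncIOMotorClient
from pymongo import ReturnDocument

TTL_DAYS = 3
CLOCK_SKEW_TOLERANCE = timedelta(seconds=3)


async def open_event(coll, source, dt):
    """Create the motion event once, even if several detectors race."""
    return await coll.find_one_and_update(
        {
            # Match an event that started within the tolerance window
            "@timestamp": {"$gte": dt - CLOCK_SKEW_TOLERANCE,
                           "$lte": dt + CLOCK_SKEW_TOLERANCE},
            "source": source,
        },
        {
            # Only set these fields when the document is first inserted
            "$setOnInsert": {
                "@timestamp": dt,
                "source": source,
                "event": "motion-detected",
                "started": dt,
                "finished": dt + timedelta(minutes=2),
                "screenshots": [],
            }
        },
        upsert=True,
        return_document=ReturnDocument.AFTER)


async def main():
    coll = AsyncIOMotorClient("mongodb://127.0.0.1:27017/default") \
        .get_default_database()["eventlog"]
    # Expire events after TTL_DAYS; enforce one event per source/timestamp
    await coll.create_index("@timestamp",
                            expireAfterSeconds=TTL_DAYS * 24 * 60 * 60)
    await coll.create_index([("source", pymongo.ASCENDING),
                             ("@timestamp", pymongo.ASCENDING)], unique=True)
    event = await open_event(coll, "dummy", datetime.utcnow())
    print("event id:", event["_id"])


if __name__ == "__main__":
    asyncio.run(main())
```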

View File

@@ -1,7 +1,20 @@
 version: '3.7'
 # All keys here are for dev instance only, do not put prod keys here
-# To override and use inventory from prod use .env file
+x-common: &common
+  AWS_ACCESS_KEY_ID: camdetect
+  MINIO_ROOT_USER: camdetect
+  AWS_SECRET_ACCESS_KEY: 2mSI6HdbJ8
+  MINIO_ROOT_PASSWORD: 2mSI6HdbJ8
+  ME_CONFIG_MONGODB_ENABLE_ADMIN: 'true'
+  ME_CONFIG_MONGODB_SERVER: '127.0.0.1'
+  ME_CONFIG_MONGODB_AUTH_DATABASE: admin
+  MINIO_DEFAULT_BUCKETS: camdetect
+  MINIO_URI: 'http://camdetect:2mSI6HdbJ8@127.0.0.1:9000/camdetect'
+  S3_ENDPOINT_URL: http://127.0.0.1:9000
+  MINIO_CONSOLE_PORT_NUMBER: 9001
+  MJPEGSTREAMER_CREDENTIALS: user:123456
+  SOURCE_NAME: dummy
 services:
   camdetect:
@@ -11,18 +24,13 @@ services:
       context: .
     entrypoint: /app/camdetect.py
     command: http://user:123456@127.0.0.1:8080?action=stream
-    environment:
-      - MJPEGSTREAMER_CREDENTIALS=user:123456
-    env_file: .env
+    environment: *common
   mongoexpress:
     restart: always
     image: mongo-express
     network_mode: host
-    environment:
-      - ME_CONFIG_MONGODB_ENABLE_ADMIN=true
-      - ME_CONFIG_MONGODB_SERVER=127.0.0.1
-      - ME_CONFIG_MONGODB_AUTH_DATABASE=admin
+    environment: *common
     logging:
       driver: none
@@ -42,16 +50,16 @@ services:
       - --config.file=/config/prometheus.yml
     volumes:
      - ./config:/config:ro
+    logging:
+      driver: none
   minio:
     restart: always
     network_mode: host
     image: bitnami/minio:latest
-    environment:
-      - MINIO_ACCESS_KEY=kspace-mugshot
-      - MINIO_SECRET_KEY=2mSI6HdbJ8
-      - MINIO_DEFAULT_BUCKETS=kspace-mugshot:download
-      - MINIO_CONSOLE_PORT_NUMBER=9001
+    environment: *common
+    logging:
+      driver: none
   mjpg-streamer:
     network_mode: host
@@ -59,4 +67,17 @@ services:
     image: kvaps/mjpg-streamer
     devices:
       - /dev/video0
-    command: -i "/usr/lib64/input_uvc.so -y -d /dev/video0 -r 1280x720 -f 30" -o "output_http.so -c user:123456"
+    command: -i "/usr/lib64/input_uvc.so -y -d /dev/video0 -r 1280x720 -f 5" -o "output_http.so -c user:123456"
+  log-viewer-backend:
+    restart: always
+    network_mode: host
+    build:
+      context: ./log-viewer/backend
+    environment: *common
+  log-viewer-frontend:
+    restart: always
+    network_mode: host
+    build:
+      context: ./log-viewer/frontend
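
The `x-common` anchor above mirrors the environment contract that `camdetect.py` expects. A short sketch of how those variables are consumed (required ones raise `KeyError` at startup, the rest fall back to the defaults shown in the diff); the values in the comments are the dev-only ones from this compose file:

```
#!/usr/bin/env python3
# Environment contract between docker-compose.yml and camdetect.py.
import os

# Required (startup fails with KeyError if missing)
AWS_ACCESS_KEY_ID = os.environ["AWS_ACCESS_KEY_ID"]          # camdetect
AWS_SECRET_ACCESS_KEY = os.environ["AWS_SECRET_ACCESS_KEY"]  # 2mSI6HdbJ8
S3_ENDPOINT_URL = os.environ["S3_ENDPOINT_URL"]              # http://127.0.0.1:9000
SOURCE_NAME = os.environ["SOURCE_NAME"]                      # dummy

# Optional, with the defaults used by camdetect.py
S3_BUCKET_NAME = os.getenv("S3_BUCKET_NAME", "camdetect")
MONGO_URI = os.getenv("MONGO_URI", "mongodb://127.0.0.1:27017/default")
MONGO_COLLECTION = os.getenv("MONGO_COLLECTION", "eventlog")
THRESHOLD_RATIO = int(os.getenv("THRESHOLD_RATIO", "5"))  # % of DCT blocks
TTL_DAYS = int(os.getenv("TTL_DAYS", "3"))                # event retention

if __name__ == "__main__":
    print("Source %s -> bucket %s at %s" %
          (SOURCE_NAME, S3_BUCKET_NAME, S3_ENDPOINT_URL))
```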

log-viewer Submodule

@@ -0,0 +1 @@
+Subproject commit b8e9f03b86ff63a5cdc77bae18caf7d7efd077b9

View File

@@ -1,11 +1,4 @@
 #!/bin/bash
 mongo <<EOF
-rs.initiate({
-  _id: 'rs0',
-  members: [
-    {_id: 0, host: '127.0.0.1:27017'}
-  ]
-})
+rs.initiate()
 EOF