Initial commit

This commit is contained in:
Lauri Võsandi 2022-02-12 20:44:20 +02:00 committed by Lauri Võsandi
commit b3486bafc2
5 changed files with 338 additions and 0 deletions

16
.drone.yml Normal file
View File

@ -0,0 +1,16 @@
---
kind: pipeline
type: kubernetes
name: default
steps:
- name: docker
image: plugins/docker
settings:
repo: harbor.k-space.ee/${DRONE_REPO}
registry: harbor.k-space.ee
mtu: 1300
username:
from_secret: docker_username
password:
from_secret: docker_password

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
.env

22
Dockerfile Normal file
View File

@ -0,0 +1,22 @@
FROM ubuntu
WORKDIR /app
ENV DEBIAN_FRONTEND=noninteractive
RUN apt-get update && apt-get install -y \
gstreamer1.0-libav \
gstreamer1.0-plugins-bad \
gstreamer1.0-plugins-base \
gstreamer1.0-plugins-good \
gstreamer1.0-plugins-ugly \
gstreamer1.0-tools \
libjpeg-dev \
python3-gevent \
python3-numpy \
python3-opencv \
python3-flask \
python3-pip \
&& apt-get clean
RUN pip3 install boto3 prometheus_client pymongo==3.12.2 aiohttp jpeg2dct sanic==21.6.2 sanic_prometheus motor
COPY camdetect.py /app
ENTRYPOINT /app/camdetect.py
EXPOSE 8000
ENV PYTHONUNBUFFERED=1

241
camdetect.py Executable file
View File

@ -0,0 +1,241 @@
#!/usr/bin/env python3
import aiohttp
import asyncio
import cv2
import io
import numpy as np
import os
import json
import socket
import sys
from datetime import datetime
from jpeg2dct.numpy import load, loads
from prometheus_client import Counter, Gauge
from sanic import Sanic, response
from sanic.response import stream
from sanic_prometheus import monitor
_, url = sys.argv
MONGO_URI = os.getenv("MONGO_URI", "mongodb://127.0.0.1:27017/default")
FQDN = socket.getfqdn()
SLIDE_WINDOW = 2
DCT_BLOCK_SIZE = 8
# How many blocks have changes to consider movement in frame
THRESHOLD_BLOCKS = 20
THRESHOLD_MOTION_START = 2
counter_dropped_bytes = Counter(
"camtiler_client_dropped_bytes",
"Bytes that were not not handled or part of actual JPEG frames")
counter_rx_bytes = Counter(
"camtiler_client_rx_bytes",
"Bytes received over HTTP stream")
counter_tx_bytes = Counter(
"camtiler_client_tx_bytes",
"Bytes transmitted over HTTP streams")
counter_rx_frames = Counter(
"camtiler_client_rx_frames",
"Frames received")
counter_tx_frames = Counter(
"camtiler_client_tx_frames",
"Frames transmitted")
counter_tx_events = Counter(
"camtiler_client_tx_events",
"Events emitted")
counter_eos = Counter(
"camtiler_client_eos",
"Count of End of Stream occurrences")
counter_timeout_errors = Counter(
"camtiler_client_timeout_errors",
"Upstream connection timeout errors")
counter_cancelled_errors = Counter(
"camtiler_client_cancelled_errors",
"Upstream connection cancelled errors")
counter_incomplete_read_errors = Counter(
"camtiler_client_incomplete_read_errors",
"Upstream incomplete read errors")
counter_movement_frames = Counter(
"camtiler_client_movement_frames",
"Frames with movement detected in them")
gauge_total_blocks = Gauge(
"camtiler_client_total_blocks",
"Total DCT blocks")
gauge_active_blocks = Gauge(
"camtiler_client_active_blocks",
"Total active, threshold exceeding DCT blocks")
class Frame(object):
def __init__(self, blob):
self.blob = blob
self.y, self.cb, self.cr = loads(blob)
self.mask = np.int16(self.y[:,:,0])
async def client_connect(resp):
buf = b""
print("Upstream connection opened with status:", resp.status)
async for data, end_of_http_chunk in resp.content.iter_chunks():
counter_rx_bytes.inc(len(data))
if end_of_http_chunk:
counter_eos.inc()
break
if buf:
# seek end
marker = data.find(b'\xff\xd9')
if marker < 0:
buf += data
continue
else:
app.ctx.last_frame = Frame(buf + data[:marker+2])
reference = app.ctx.last_frame.mask
app.ctx.frames.append(reference)
if app.ctx.avg is None:
app.ctx.avg = np.copy(reference)
else:
app.ctx.avg += reference
if len(app.ctx.frames) > 2 ** SLIDE_WINDOW:
app.ctx.avg -= app.ctx.frames[0]
app.ctx.frames = app.ctx.frames[1:]
if len(app.ctx.frames) == 2 ** SLIDE_WINDOW:
app.ctx.thresh = cv2.inRange(cv2.absdiff(app.ctx.last_frame.mask, app.ctx.avg >> SLIDE_WINDOW), 25, 65535)
else:
app.ctx.thresh = None
gauge_total_blocks.set(app.ctx.last_frame.mask.shape[0] * app.ctx.last_frame.mask.shape[1])
movement_detected = False
if app.ctx.thresh is not None:
differing_blocks = np.count_nonzero(app.ctx.thresh)
gauge_active_blocks.set(differing_blocks)
if differing_blocks > THRESHOLD_BLOCKS:
counter_movement_frames.inc()
movement_detected = True
if movement_detected:
if app.ctx.motion_frames < 30:
app.ctx.motion_frames += 1
else:
if app.ctx.motion_frames > 0:
app.ctx.motion_frames -= 1
if app.ctx.motion_frames > 20:
if not app.ctx.motion_start:
app.ctx.motion_start = datetime.utcnow()
print("Movement start")
elif app.ctx.motion_frames < 5:
app.ctx.motion_start = None
print("Movement end")
app.ctx.event_frame.set()
app.ctx.event_frame.clear()
data = data[marker+2:]
buf = b""
counter_rx_frames.inc()
# seek begin
marker = data.find(b'\xff\xd8')
if marker >= 0:
buf = data[marker:]
else:
counter_dropped_bytes.inc(len(data))
async def client():
print("Opening upstream connection...")
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
try:
await client_connect(resp)
except asyncio.TimeoutError:
counter_timeout_errors.inc()
except asyncio.CancelledError:
counter_cancelled_errors.inc()
except asyncio.IncompleteReadError:
counter_incomplete_read_errors.inc()
app = Sanic("lease")
app.config["WTF_CSRF_ENABLED"] = False
STREAM_RESPONSE = \
b"""
--frame
Content-Type: image/jpeg
"""
@app.route("/bypass")
async def bypass_stream_wrapper(request):
async def stream_camera(response):
while True:
await app.ctx.event_frame.wait()
data = STREAM_RESPONSE + app.ctx.last_frame.blob
await response.write(data)
counter_tx_bytes.inc(len(data))
counter_tx_frames.inc()
return response.stream(
stream_camera,
content_type='multipart/x-mixed-replace; boundary=frame'
)
@app.route("/debug")
async def stream_wrapper(request):
async def stream_camera(response):
while True:
await app.ctx.event_frame.wait()
img = cv2.imdecode(np.frombuffer(app.ctx.last_frame.blob, dtype=np.uint8), cv2.IMREAD_UNCHANGED)
if len(app.ctx.frames) == 2 ** SLIDE_WINDOW:
for y in range(0, len(app.ctx.last_frame.mask)):
for x in range(0, len(app.ctx.last_frame.mask[0])):
if app.ctx.thresh[y][x] > 0:
img[y*DCT_BLOCK_SIZE:(y+1)*DCT_BLOCK_SIZE,x*DCT_BLOCK_SIZE:(x+1)*DCT_BLOCK_SIZE,2] = 255
_, jpeg = cv2.imencode(".jpg", img, (cv2.IMWRITE_JPEG_QUALITY, 80))
data = STREAM_RESPONSE + jpeg.tobytes()
await response.write(data)
counter_tx_bytes.inc(len(data))
counter_tx_frames.inc()
return response.stream(
stream_camera,
content_type='multipart/x-mixed-replace; boundary=frame'
)
@app.route('/event')
async def wrapper_stream_event(request):
async def stream_event(response):
while True:
await app.ctx.event_frame.wait()
if len(app.ctx.frames) < 2 ** SLIDE_WINDOW:
continue
s = 'data: ' + json.dumps(app.ctx.thresh.tolist()) + '\r\n\r\n'
await response.write(s.encode())
counter_tx_events.inc()
return stream(stream_event, content_type='text/event-stream')
@app.listener('before_server_start')
async def setup_db(app, loop):
app.ctx.last_frame = None
app.ctx.event_frame = asyncio.Event()
app.ctx.frames = []
app.ctx.avg = None
app.ctx.motion_frames = 0
app.ctx.motion_start = None
app.ctx.motion_end = None
task = asyncio.create_task(client())
monitor(app).expose_endpoint()
try:
app.run(port=5000)
except KeyboardInterrupt:
asyncio.get_event_loop().close()

58
docker-compose.yml Normal file
View File

@ -0,0 +1,58 @@
version: '3.7'
# All keys here are for dev instance only, do not put prod keys here
# To override and use inventory from prod use .env file
services:
camdetect:
restart: always
network_mode: host
build:
context: .
entrypoint: /app/camdetect.py
command: http://user:123456@127.0.0.1:8080?action=stream
environment:
- MJPEGSTREAMER_CREDENTIALS=user:123456
env_file: .env
mongoexpress:
restart: always
image: mongo-express
network_mode: host
environment:
- ME_CONFIG_MONGODB_ENABLE_ADMIN=true
- ME_CONFIG_MONGODB_SERVER=127.0.0.1
- ME_CONFIG_MONGODB_AUTH_DATABASE=admin
mongo:
network_mode: host
image: mongo:latest
volumes:
- ./mongo-init.sh:/docker-entrypoint-initdb.d/mongo-init.sh:ro
command: mongod --replSet rs0 --bind_ip 127.0.0.1
prometheus:
network_mode: host
image: prom/prometheus:latest
command:
- --config.file=/config/prometheus.yml
volumes:
- ./config:/config:ro
minio:
restart: always
network_mode: host
image: bitnami/minio:latest
environment:
- MINIO_ACCESS_KEY=kspace-mugshot
- MINIO_SECRET_KEY=2mSI6HdbJ8
- MINIO_DEFAULT_BUCKETS=kspace-mugshot:download
- MINIO_CONSOLE_PORT_NUMBER=9001
mjpg-streamer:
network_mode: host
restart: always
image: kvaps/mjpg-streamer
devices:
- /dev/video0
command: -i "/usr/lib64/input_uvc.so -y -d /dev/video0 -r 1280x720 -f 30" -o "output_http.so -c user:123456"