Preliminary implementation

Lauri Võsandi 2019-05-12 00:33:25 +03:00
commit 9ef231d582
4 changed files with 232 additions and 0 deletions

.gitignore vendored Normal file

@@ -0,0 +1,2 @@
stream*
extract*.mp4

README.md Normal file

@@ -0,0 +1,12 @@
To stream from an H.264 hardware-encoder-accelerated USB webcam:

    gst-launch-1.0 -v v4l2src device=/dev/video2 \
        ! capsfilter caps="video/x-h264, width=1280, height=720, framerate=20/1" \
        ! rtph264pay config-interval=10 pt=96 \
        ! udpsink host=1.2.3.4 port=5000

Run server.py on 1.2.3.4.

Files are written to the current directory.

For the video feed open: http://localhost:6001/?frameskip=5&subsampling=4&quality=50&thumbs=0
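
To cut the detected events out of a finished recording, run extract.py on the matching .events file written by server.py (requires ffmpeg; the filename below is only an example, actual names follow the stream-<timestamp>-<addr> pattern):

    python3 extract.py stream-2019-05-12-00-33-25-1.2.3.4.events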

extract.py Normal file

@@ -0,0 +1,19 @@
from datetime import datetime
import os
import sys

_, path = sys.argv

TIME_FORMAT = "%Y-%m-%d-%H-%M-%S"

# The clip filename starts with "stream-" followed by a 19-character timestamp
clip_start = datetime.strptime(path[7:26], TIME_FORMAT)
event_no = 1

# Each line of the .events file is "frame;start;end" as written by server.py
for line in open(path):
    frame, raw_start, raw_end = line.strip().split(";")
    event_start, event_end = datetime.strptime(raw_start, TIME_FORMAT), datetime.strptime(raw_end, TIME_FORMAT)
    # Cut the event out of the recorded clip without re-encoding
    cmd = "ffmpeg -ss %s -i %s -t %s -vcodec copy extracted-%s-%d.mp4" % (
        event_start - clip_start, path.replace(".events", ".mp4"), event_end - event_start,
        clip_start.strftime(TIME_FORMAT), event_no)
    print("executing:", cmd)
    os.system(cmd)
    event_no += 1

server.py Normal file

@@ -0,0 +1,199 @@
import cv2
import numpy as np
from collections import deque
from datetime import datetime
from flask import Flask, Response, request
from gevent import pywsgi
from threading import Thread, Event
from time import sleep

TIME_FORMAT = "%Y-%m-%d-%H-%M-%S"
MOTION_GREEN_THRESHOLD = 30
MOTION_AREA_THRESHOLD = 30
MOTION_FRAMES_THRESHOLD = 20
# 2 to power of SLIDE_WINDOW is the count of frames kept in memory for bg detection
SLIDE_WINDOW = 5
# For thumbnail use only every n-th pixel
THUMBNAIL_SUBSAMPLING = 4
# For motion detection sample only every n-th pixel
MOTION_SUBSAMPLING = 8
# Set to True to automatically spawn Gstreamer process per camera and redirect packets to it
AUTOMUX = False
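
# PIPELINE tees the incoming RTP/H.264 stream two ways: one branch is decoded to
# raw frames for appsink (read below via cv2.VideoCapture), the other is parsed
# and muxed straight into an MP4 file on disk, so a full recording is kept
# without re-encoding.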
PIPELINE = """udpsrc port=%(port)d caps = "application/x-rtp, encoding-name=(string)H264" \
! rtph264depay \
! tee name=t \
! decodebin \
! videoconvert \
! appsink \
t. ! h264parse \
! mp4mux streamable=true faststart=true fragment-duration=1000 dts-method=1 \
! filesink async=0 location=%(filename)s.mp4
""".replace("\n", "")

def detect_stream(port, addr):
    now = 123
    app = Flask(__name__)
    # maxlen equals THUMBNAIL_SUBSAMPLING so the hstacked thumbnail strip is exactly one frame wide
    events = deque(maxlen=THUMBNAIL_SUBSAMPLING)
    frame = None
    event = Event()
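
    # Each HTTP client gets its own generator below: it blocks on `event` until the
    # capture loop publishes a new frame, then emits that frame as one part of a
    # multipart/x-mixed-replace (MJPEG) response.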
    def generator(subsampling, frameskip, motion, quality, thumbs):
        while True:
            # Wait for `frameskip` new frames before emitting the next JPEG
            for j in range(0, frameskip):
                event.wait()
            yield b'--frame\r\nContent-Type: image/jpeg\r\n\r\n'
            if motion:
                # Paint the motion mask into the red channel of the subsampled pixels
                for i in range(0, 2):
                    for j in range(0, 2):
                        frame[i::MOTION_SUBSAMPLING, j::MOTION_SUBSAMPLING, 0] = 0
                        frame[i::MOTION_SUBSAMPLING, j::MOTION_SUBSAMPLING, 1] = 0
                        frame[i::MOTION_SUBSAMPLING, j::MOTION_SUBSAMPLING, 2] = thresh
            if thumbs:
                stacked = np.vstack([frame, np.hstack(events)])
            else:
                stacked = frame
            ret, jpeg = cv2.imencode('.jpg', stacked[::subsampling, ::subsampling],
                                     (cv2.IMWRITE_JPEG_QUALITY, quality))
            yield jpeg.tobytes()
            yield b'\r\n\r\n'
    @app.route('/')
    def video_combined():
        return Response(generator(
            request.args.get('subsampling', default=1, type=int),
            request.args.get('frameskip', default=0, type=int),
            request.args.get('motion', default=0, type=int),
            request.args.get('quality', default=50, type=int),
            request.args.get('thumbs', default=1, type=int),
        ), mimetype='multipart/x-mixed-replace; boundary=frame')

    print("Server listening on TCP port", port)

    class WebThread(Thread):
        def run(self):
            print("Web server running for port", port)
            app.run(threaded=True, port=port + 1000)

    thread = WebThread()
    thread.start()
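
    # Per-camera web UI: the Flask app listens on the camera's UDP port number plus
    # 1000, which is why the README points at http://localhost:6001/ for port 5001.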
    fh = None
    cap = None
    frames = []
    avg = None
    motion_detected = False
    motion_start = None
    motion_seen = 0
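
    # Main capture loop: (re)open the GStreamer pipeline, keep a sliding-window
    # average of the green channel as background, flag motion from contours of the
    # difference image and log event start/end timestamps to the .events file.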
    while True:
        if not cap:
            now = datetime.now()
            timestamp = now.strftime(TIME_FORMAT)
            filename = "stream-%(timestamp)s-%(addr)s" % locals()
            if fh:
                fh.close()
            fh = open("%s.events" % filename, "w")
            pipeline = PIPELINE % locals()
            print("gst-launch-1.0 -v", pipeline.replace("appsink", "autovideosink"))
            frame_count = 0
            cap = cv2.VideoCapture(pipeline)
        ret, frame = cap.read()
        if not ret:
            # No frame from the pipeline; drop the capture and retry in a few seconds
            cap = None
            sleep(3)
            continue
        frame_count += 1
        thumbnail = frame[::THUMBNAIL_SUBSAMPLING, ::THUMBNAIL_SUBSAMPLING].copy()
        if not events:
            # Pad the thumbnail strip with black placeholders until real events arrive
            for j in range(0, THUMBNAIL_SUBSAMPLING):
                events.append(np.zeros(thumbnail.shape, dtype=np.uint8))
        # Motion detection works on the subsampled green channel only
        reference = np.uint16(frame[::MOTION_SUBSAMPLING, ::MOTION_SUBSAMPLING, 1])
        frames.append(reference)
        if avg is None:
            avg = np.copy(reference)
        else:
            avg += reference
        if len(frames) <= 2 ** SLIDE_WINDOW:
            continue
        avg -= frames[0]
        frames = frames[1:]
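
        # avg now holds the sum of the last 2**SLIDE_WINDOW reference frames, so
        # avg >> SLIDE_WINDOW is their per-pixel mean; pixels differing from it by
        # more than MOTION_GREEN_THRESHOLD are treated as moving.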
        delta = cv2.absdiff(reference, avg >> SLIDE_WINDOW)
        thresh = cv2.inRange(delta, MOTION_GREEN_THRESHOLD, 255)
        # Three-value return matches the OpenCV 3.x findContours API
        im2, contours, hierarchy = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        motion_detected = False
        for cnt in contours:
            # Scale the bounding box back up to full-resolution coordinates
            x, y, w, h = [j * MOTION_SUBSAMPLING for j in cv2.boundingRect(cnt)]
            if w < MOTION_AREA_THRESHOLD or h < MOTION_AREA_THRESHOLD:
                continue
            cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 4)
            motion_detected = True
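
        # Hysteresis: motion_seen counts up while motion is visible and drains when
        # it is not, so an event only starts after roughly MOTION_FRAMES_THRESHOLD
        # motion frames and is closed once the counter is back at zero.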
        if motion_detected:
            if motion_seen < MOTION_FRAMES_THRESHOLD:
                motion_seen += 1
            elif not motion_start:
                events.append(thumbnail)
                motion_start = datetime.now()
                print("Event started:", motion_start)
        else:
            if motion_seen > 0:
                motion_seen -= 1
            if motion_seen == 0 and motion_start:
                motion_end = datetime.now()
                print("Got event:", motion_start, "to", motion_end)
                fh.write("%d;%s;%s\n" % (frame_count, motion_start.strftime(TIME_FORMAT), motion_end.strftime(TIME_FORMAT)))
                motion_start = None
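
        # Publish the processed frame: wake every HTTP client generator blocked in
        # event.wait() so it can encode and send this frame.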
        event.set()
        event.clear()

if AUTOMUX:
    import os
    import socket

    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    sock.settimeout(5)
    sock.bind(("", 5000))
    next_port = 5001
    mapping = dict()
    print("Flushing iptables")
    os.system("iptables -F PREROUTING -t nat")
    os.system("iptables -F OUTPUT -t nat")
    os.system("sysctl -w net.ipv4.conf.all.route_localnet=1")
    print("Listening on UDP port 5000")
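
    # Demultiplexer: the first packet from each new camera address gets an iptables
    # REDIRECT rule that points it at a dedicated UDP port, and a child process is
    # forked to run detect_stream() for that port.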
    while True:
        try:
            buf, (addr, port) = sock.recvfrom(20)
        except OSError:  # timed out
            continue
        if addr not in mapping:
            mapping[addr] = next_port
            print("Redirecting", addr, "to", next_port)
            os.system("iptables -I PREROUTING -t nat -p udp --dport 5000 -s %(addr)s -j REDIRECT --to-port %(next_port)d" % locals())
            os.system("iptables -t nat -I OUTPUT -o lo -p udp --dport 5000 -j REDIRECT --to %(next_port)d" % locals())
            if not os.fork():
                print("Spawning process for", addr, next_port)
                detect_stream(next_port, addr)
                break
            next_port += 1
else:
    detect_stream(5001, "127.0.0.1")