200 lines
6.5 KiB
Python
200 lines
6.5 KiB
Python
import cv2
|
|
import numpy as np
|
|
from flask import Flask, Response
|
|
from gevent import pywsgi
|
|
from threading import Thread, Event
|
|
from time import sleep
|
|
from flask import request
|
|
from datetime import datetime
|
|
from collections import deque
|
|
|
|
# strftime() pattern used in recording file names and in the event log
TIME_FORMAT = "%Y-%m-%d-%H-%M-%S"

# Minimum absolute difference between a green-channel sample and its running
# average for that sample to be marked as changed
MOTION_GREEN_THRESHOLD = 30

# Minimum width AND height (in full-resolution pixels) of a changed region's
# bounding box for it to count as motion; despite the name it is not an area
MOTION_AREA_THRESHOLD = 30

# Value the per-frame motion counter has to reach before an event is started;
# the counter goes up on frames with motion and down on frames without
MOTION_FRAMES_THRESHOLD = 20

# 2 to the power of SLIDE_WINDOW is the number of frames kept in memory for
# background detection
SLIDE_WINDOW = 5

# For thumbnails use only every n-th pixel
THUMBNAIL_SUBSAMPLING = 4

# For motion detection sample only every n-th pixel
MOTION_SUBSAMPLING = 8

# Set to True to automatically spawn a Gstreamer process per camera and
# redirect packets to it
AUTOMUX = False

# GStreamer pipeline template; expects the mapping keys ``port`` (int) and
# ``filename`` (str, without extension).  The H.264 stream is tee'd: one
# branch is decoded into the appsink, the other is muxed into an .mp4 file.
# Built from adjacent literals so that no newline, indentation or backslash
# can leak into the pipeline description.
PIPELINE = (
    'udpsrc port=%(port)d caps = "application/x-rtp, encoding-name=(string)H264" '
    "! rtph264depay "
    "! tee name=t "
    "! decodebin "
    "! videoconvert "
    "! appsink "
    "t. ! h264parse "
    "! mp4mux streamable=true faststart=true fragment-duration=1000 dts-method=1 "
    "! filesink async=0 location=%(filename)s.mp4"
)
|
|
|
|
def detect_stream(port, addr):
    """Record one camera stream, detect motion in it and serve a live preview.

    Opens the GStreamer ``PIPELINE`` through OpenCV, keeps a running sum of the
    subsampled green channel of the last ``2 ** SLIDE_WINDOW`` frames as the
    background model and logs a motion event once large enough changes have
    persisted for ``MOTION_FRAMES_THRESHOLD`` frames.  A Flask app running in
    a background thread serves the annotated frames as a multipart JPEG
    stream.

    Args:
        port: UDP port substituted into ``PIPELINE``; the preview HTTP server
            listens on ``port + 1000``.
        addr: Label of the stream source; only used in the output file names.

    Every time the capture is (re)opened a new
    ``stream-<timestamp>-<addr>.events`` file is started (one
    ``frame_count;start;end`` line per finished event) and the pipeline is
    pointed at the matching ``.mp4`` file.  This function never returns.
    """
    app = Flask(__name__)
    # Thumbnails of the most recent events, shown in a row below the live frame
    events = deque(maxlen=THUMBNAIL_SUBSAMPLING)
    # Latest fully processed frame and its motion mask; read by generator()
    frame = None
    thresh = None
    # Pulsed once for every frame published by the capture loop below
    event = Event()

    def generator(subsampling, frameskip, motion, quality, thumbs):
        """Yield an endless multipart stream of JPEG-encoded preview frames.

        Args:
            subsampling: Send only every n-th pixel of the composed image.
            frameskip: Number of published frames to wait for per sent frame.
            motion: If truthy, paint the motion mask onto the sent frame.
            quality: JPEG quality handed to the encoder.
            thumbs: If truthy, append the row of event thumbnails.
        """
        while True:
            # Always wait for at least one freshly published frame; without
            # this frameskip=0 spins without ever blocking and may run before
            # the first frame exists.
            for _ in range(max(frameskip, 1)):
                event.wait()
            current, mask = frame, thresh

            if motion:
                # Paint on a private copy so that other clients (and the
                # thumbnails) never see this client's overlay.
                current = current.copy()
                for i in range(2):
                    for j in range(2):
                        current[i::MOTION_SUBSAMPLING, j::MOTION_SUBSAMPLING, 0] = 0
                        current[i::MOTION_SUBSAMPLING, j::MOTION_SUBSAMPLING, 1] = 0
                        current[i::MOTION_SUBSAMPLING, j::MOTION_SUBSAMPLING, 2] = mask

            if thumbs:
                stacked = np.vstack([current, np.hstack(tuple(events))])
            else:
                stacked = current

            ok, jpeg = cv2.imencode(
                '.jpg', stacked[::subsampling, ::subsampling],
                (cv2.IMWRITE_JPEG_QUALITY, quality))
            if not ok:
                # Skip the frame instead of emitting a part without payload
                continue
            yield b'--frame\r\nContent-Type: image/jpeg\r\n\r\n'
            yield jpeg.tobytes()
            yield b'\r\n\r\n'

    @app.route('/')
    def video_combined():
        """Serve the preview stream; all tuning knobs are query parameters."""
        return Response(generator(
            request.args.get('subsampling', default=1, type=int),
            request.args.get('frameskip', default=0, type=int),
            request.args.get('motion', default=0, type=int),
            request.args.get('quality', default=50, type=int),
            request.args.get('thumbs', default=1, type=int),
        ), mimetype='multipart/x-mixed-replace; boundary=frame')

    # NOTE: the number printed here is the stream port; the HTTP server itself
    # binds to port + 1000 (see serve() below).
    print("Server listening on TCP port", port)

    def serve():
        print("Web server running for port", port)
        app.run(threaded=True, port=port + 1000)

    Thread(target=serve).start()

    fh = None
    cap = None
    # Subsampled green channels of the frames currently summed up in avg
    frames = deque()
    avg = None

    motion_start = None
    motion_seen = 0

    while True:
        if cap is None:
            # (Re)open the stream: new recording, new event log
            timestamp = datetime.now().strftime(TIME_FORMAT)
            filename = "stream-%s-%s" % (timestamp, addr)
            if fh:
                fh.close()
            fh = open("%s.events" % filename, "w")
            pipeline = PIPELINE % {"port": port, "filename": filename}
            print("gst-launch-1.0 -v", pipeline.replace("appsink", "autovideosink"))
            frame_count = 0
            cap = cv2.VideoCapture(pipeline)

        # Work on a local until the frame is complete; the previously
        # published frame stays visible to the web thread meanwhile.
        ok, current = cap.read()
        if not ok:
            cap.release()
            cap = None
            sleep(3)
            continue

        frame_count += 1
        thumbnail = current[::THUMBNAIL_SUBSAMPLING, ::THUMBNAIL_SUBSAMPLING].copy()
        if not events:
            # Black placeholders with the frame's own dtype, so stacking them
            # with real frames does not promote the image to float64
            for _ in range(THUMBNAIL_SUBSAMPLING):
                events.append(np.zeros_like(thumbnail))

        # 16 bits leave room for summing 2 ** SLIDE_WINDOW + 1 8-bit samples
        reference = np.uint16(current[::MOTION_SUBSAMPLING, ::MOTION_SUBSAMPLING, 1])
        frames.append(reference)
        if avg is None:
            avg = np.copy(reference)
        else:
            avg += reference
        if len(frames) <= 2 ** SLIDE_WINDOW:
            # Background model is still warming up
            continue

        avg -= frames.popleft()

        delta = cv2.absdiff(reference, avg >> SLIDE_WINDOW)
        mask = cv2.inRange(delta, MOTION_GREEN_THRESHOLD, 255)
        # OpenCV 3 returns (image, contours, hierarchy), OpenCV 4 returns
        # (contours, hierarchy); contours are second to last in both.
        contours = cv2.findContours(
            mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)[-2]

        motion_detected = False
        for cnt in contours:
            # Scale the bounding box back to full-resolution coordinates
            x, y, w, h = [v * MOTION_SUBSAMPLING for v in cv2.boundingRect(cnt)]
            if w < MOTION_AREA_THRESHOLD or h < MOTION_AREA_THRESHOLD:
                continue
            cv2.rectangle(current, (x, y), (x + w, y + h), (0, 255, 0), 4)
            motion_detected = True

        if motion_detected:
            if motion_seen < MOTION_FRAMES_THRESHOLD:
                motion_seen += 1
            elif not motion_start:
                events.append(thumbnail)
                motion_start = datetime.now()
                print("Event started:", motion_start)
        else:
            if motion_seen > 0:
                motion_seen -= 1
            if motion_seen == 0 and motion_start:
                motion_end = datetime.now()
                print("Got event:", motion_start, "to", motion_end)
                fh.write("%d;%s;%s\n" % (
                    frame_count,
                    motion_start.strftime(TIME_FORMAT),
                    motion_end.strftime(TIME_FORMAT)))
                # This loop never exits, so push the record to disk right away
                fh.flush()
                motion_start = None

        # Publish the finished frame and wake every waiting generator once
        frame, thresh = current, mask
        event.set()
        event.clear()
|
|
|
|
# Entry point: either run a dispatcher that forks one detect_stream() process
# per sending address, or process a single stream on UDP port 5001.
if AUTOMUX:
    import os
    import socket

    # All senders initially hit UDP port 5000; the first packet from a new
    # source address triggers a dedicated redirect and worker process.
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    sock.settimeout(5)
    sock.bind(("", 5000))
    next_port = 5001
    mapping = {}  # source address -> UDP port its packets are redirected to
    print("Flushing iptables")
    os.system("iptables -F PREROUTING -t nat")
    os.system("iptables -F OUTPUT -t nat")
    os.system("sysctl -w net.ipv4.conf.all.route_localnet=1")
    print("Listening on UDP port 5000")
    while True:
        try:
            _, (addr, _) = sock.recvfrom(20)
        except socket.timeout:
            # Nothing arrived within the timeout; keep listening.  Any other
            # socket error is a real failure and is allowed to propagate.
            continue

        if addr in mapping:
            continue

        mapping[addr] = next_port
        print("Redirecting", addr, "to", next_port)
        os.system(
            "iptables -I PREROUTING -t nat -p udp --dport 5000 -s %s "
            "-j REDIRECT --to-port %d" % (addr, next_port))
        os.system(
            "iptables -t nat -I OUTPUT -o lo -p udp --dport 5000 "
            "-j REDIRECT --to %d" % next_port)

        if not os.fork():
            # Child process: handle this one stream, never go back to
            # dispatching
            print("Spawning process for", addr, next_port)
            detect_stream(next_port, addr)
            break
        next_port += 1
else:
    detect_stream(5001, "127.0.0.1")
|