import cv2
import numpy as np
from flask import Flask, Response
from gevent import pywsgi
from threading import Thread, Event
from time import sleep
from flask import request
from datetime import datetime
from collections import deque

TIME_FORMAT = "%Y-%m-%d-%H-%M-%S"

# Minimum absolute difference (green channel) from the background average
# for a subsampled pixel to count as "changed"
MOTION_GREEN_THRESHOLD = 30

# Minimum width and height (in full-resolution pixels) of a changed region
MOTION_AREA_THRESHOLD = 30

# Number of frames with motion needed before an event is considered started
MOTION_FRAMES_THRESHOLD = 20

# 2 to power of SLIDE_WINDOW is the count of frames kept in memory for bg detection
SLIDE_WINDOW = 5

# For thumbnail use only every n-th pixel
THUMBNAIL_SUBSAMPLING = 4

# For motion detection sample only every n-th pixel
MOTION_SUBSAMPLING = 8

# Set to True to automatically spawn Gstreamer process per camera and redirect packets to it
AUTOMUX = False

# GStreamer pipeline template: one tee branch is decoded for OpenCV (appsink),
# the other is muxed unmodified into a fragmented MP4 file.
PIPELINE = """udpsrc port=%(port)d caps = "application/x-rtp, encoding-name=(string)H264" \
    ! rtph264depay \
    ! tee name=t \
    ! decodebin \
    ! videoconvert \
    ! appsink \
    t. ! h264parse \
    ! mp4mux streamable=true faststart=true fragment-duration=1000 dts-method=1 \
    ! filesink async=0 location=%(filename)s.mp4
""".replace("\n", "")


def detect_stream(port, addr):
    """Receive one H264/RTP stream, detect motion and serve an MJPEG preview.

    The stream is read through the GStreamer ``PIPELINE`` on UDP ``port``
    (which also records it to ``stream-<timestamp>-<addr>.mp4``).  Motion
    events are appended to ``stream-<timestamp>-<addr>.events`` as
    ``frame_count;start;end`` lines.  A Flask server on TCP ``port + 1000``
    serves the annotated frames as ``multipart/x-mixed-replace``.

    Args:
        port: UDP port the RTP stream arrives on.
        addr: Source address of the camera; only used in output file names.

    This function never returns: it loops forever, reopening the pipeline
    three seconds after any failed read.
    """
    app = Flask(__name__)
    # Thumbnails of the most recent events, shown as a strip below the frame.
    events = deque(maxlen=THUMBNAIL_SUBSAMPLING)
    frame = None    # latest annotated frame, shared with the HTTP generators
    thresh = None   # latest motion mask (MOTION_SUBSAMPLING resolution)
    event = Event()  # pulsed once per fully processed frame

    def generator(subsampling, frameskip, motion, quality, thumbs):
        """Yield the parts of an endless multipart MJPEG stream.

        Args:
            subsampling: Emit only every n-th pixel of the output image.
            frameskip: Emit one image per this many processed frames.
            motion: If truthy, overlay the motion mask on the image.
            quality: JPEG quality passed to the encoder.
            thumbs: If truthy, append the strip of event thumbnails.
        """
        # A zero slice step raises, and zero waits would spin without ever
        # blocking on the capture loop, so treat anything below 1 as 1.
        step = max(1, subsampling)
        waits = max(1, frameskip)
        while True:
            for _ in range(waits):
                event.wait()

            # Snapshot the shared state once so both stay consistent while
            # the capture loop keeps running.
            current, mask = frame, thresh
            if current is None or mask is None:
                continue

            if motion:
                # Draw on a private copy so other viewers are not affected.
                current = current.copy()
                for i in range(2):
                    for j in range(2):
                        cells = current[i::MOTION_SUBSAMPLING, j::MOTION_SUBSAMPLING]
                        rows, cols = cells.shape[:2]
                        cells[..., 0] = 0
                        cells[..., 1] = 0
                        # The offset grid may be one row/column shorter than
                        # the mask when the size is not a multiple of the step.
                        cells[..., 2] = mask[:rows, :cols]

            if thumbs:
                # Crop the strip in case the frame width is not a multiple
                # of THUMBNAIL_SUBSAMPLING.
                strip = np.hstack(list(events))[:, :current.shape[1]]
                stacked = np.vstack([current, strip])
            else:
                stacked = current

            ok, jpeg = cv2.imencode(
                '.jpg',
                stacked[::step, ::step],
                (cv2.IMWRITE_JPEG_QUALITY, quality))
            if not ok:
                continue

            yield b'--frame\r\nContent-Type: image/jpeg\r\n\r\n'
            yield jpeg.tobytes()
            yield b'\r\n\r\n'

    @app.route('/')
    def video_combined():
        """Serve the MJPEG stream, configured through query parameters."""
        return Response(generator(
            request.args.get('subsampling', default=1, type=int),
            request.args.get('frameskip', default=0, type=int),
            request.args.get('motion', default=0, type=int),
            request.args.get('quality', default=50, type=int),
            request.args.get('thumbs', default=1, type=int),
        ), mimetype='multipart/x-mixed-replace; boundary=frame')

    # NOTE: the value printed is the UDP stream port; the HTTP server itself
    # is started on port + 1000 below.
    print("Server listening on TCP port", port)

    class WebThread(Thread):
        """Runs the Flask development server next to the capture loop."""

        def run(self):
            print("Web server running for port", port)
            app.run(threaded=True, port=port + 1000)

    thread = WebThread()
    thread.start()

    fh = None           # events log of the current recording
    cap = None          # cv2.VideoCapture of the current pipeline
    frames = deque()    # sliding window of subsampled green channels
    avg = None          # running sum over `frames`
    motion_start = None
    motion_seen = 0
    frame_count = 0

    while True:
        if cap is None:
            # (Re)open the pipeline with a fresh pair of output files.
            timestamp = datetime.now().strftime(TIME_FORMAT)
            filename = "stream-%s-%s" % (timestamp, addr)
            if fh:
                fh.close()
            fh = open("%s.events" % filename, "w")
            pipeline = PIPELINE % {"port": port, "filename": filename}
            print("gst-launch-1.0 -v", pipeline.replace("appsink", "autovideosink"))
            frame_count = 0
            cap = cv2.VideoCapture(pipeline)

        ret, grabbed = cap.read()
        if not ret:
            # Keep the last good frame for viewers, free the dead pipeline
            # and retry a little later.
            cap.release()
            cap = None
            sleep(3)
            continue
        frame = grabbed
        frame_count += 1

        # Copy before any rectangles are drawn on the frame.
        thumbnail = frame[::THUMBNAIL_SUBSAMPLING, ::THUMBNAIL_SUBSAMPLING].copy()
        if not events:
            # Pre-fill the strip with black placeholders of the frame's dtype.
            for _ in range(THUMBNAIL_SUBSAMPLING):
                events.append(np.zeros_like(thumbnail))

        # Channel index 1, widened so the running sum cannot overflow.
        reference = np.uint16(frame[::MOTION_SUBSAMPLING, ::MOTION_SUBSAMPLING, 1])
        frames.append(reference)
        if avg is None:
            avg = np.copy(reference)
        else:
            avg += reference

        # Wait until the window is full before detecting anything.
        if len(frames) <= 2 ** SLIDE_WINDOW:
            continue
        avg -= frames.popleft()

        delta = cv2.absdiff(reference, avg >> SLIDE_WINDOW)
        thresh = cv2.inRange(delta, MOTION_GREEN_THRESHOLD, 255)
        # findContours returns (contours, hierarchy) or
        # (image, contours, hierarchy) depending on the OpenCV version;
        # contours is second from the end in both.
        contours = cv2.findContours(
            thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)[-2]

        motion_detected = False
        for cnt in contours:
            # Scale the bounding box back to full-resolution coordinates.
            x, y, w, h = [v * MOTION_SUBSAMPLING for v in cv2.boundingRect(cnt)]
            if w < MOTION_AREA_THRESHOLD or h < MOTION_AREA_THRESHOLD:
                continue
            cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 4)
            motion_detected = True

        if motion_detected:
            if motion_seen < MOTION_FRAMES_THRESHOLD:
                motion_seen += 1
            elif not motion_start:
                events.append(thumbnail)
                motion_start = datetime.now()
                print("Event started:", motion_start)
        elif motion_seen > 0:
            motion_seen -= 1
            if motion_seen == 0 and motion_start:
                motion_end = datetime.now()
                print("Got event:", motion_start, "to", motion_end)
                fh.write("%d;%s;%s\n" % (
                    frame_count,
                    motion_start.strftime(TIME_FORMAT),
                    motion_end.strftime(TIME_FORMAT)))
                # Make the event visible on disk right away; the file may
                # stay open for a long time.
                fh.flush()
                motion_start = None

        # Wake every waiting HTTP generator for exactly this frame.
        event.set()
        event.clear()


if AUTOMUX:
    # Listen on UDP 5000, give every new sender its own port through an
    # iptables REDIRECT rule and fork a detector process for it.
    import os
    import socket

    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    sock.settimeout(5)
    sock.bind(("", 5000))
    next_port = 5001
    mapping = dict()
    print("Flushing iptables")
    os.system("iptables -F PREROUTING -t nat")
    os.system("iptables -F OUTPUT -t nat")
    os.system("sysctl -w net.ipv4.conf.all.route_localnet=1")
    print("Listening on UDP port 5000")
    while True:
        try:
            buf, (addr, port) = sock.recvfrom(20)
        except OSError:  # timed out
            continue
        if addr not in mapping:
            mapping[addr] = next_port
            print("Redirecting", addr, "to", next_port)
            # addr comes from recvfrom(), not from packet payload.
            os.system("iptables -I PREROUTING -t nat -p udp --dport 5000 -s %(addr)s -j REDIRECT --to-port %(next_port)d" % locals())
            os.system("iptables -t nat -I OUTPUT -o lo -p udp --dport 5000 -j REDIRECT --to %(next_port)d" % locals())
            if not os.fork():
                # Child process: handle this camera only.
                print("Spawning process for", addr, next_port)
                detect_stream(next_port, addr)
                break
            next_port += 1
else:
    detect_stream(5001, "127.0.0.1")