commit 9ef231d582009597f532b8917ecddca2a0297a5f Author: Lauri Võsandi Date: Sun May 12 00:33:25 2019 +0300 Preliminary implementation diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d244d61 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +stream* +extract*.mp4 diff --git a/README.md b/README.md new file mode 100644 index 0000000..d612190 --- /dev/null +++ b/README.md @@ -0,0 +1,12 @@ +To stream from h264 hardware encoder accelerated USB webcam: + + gst-launch-1.0 -v v4l2src device=/dev/video2 \ + ! capsfilter caps="video/x-h264, width=1280, height=720, framerate=20/1" \ + ! rtph264pay config-interval=10 pt=96 \ + ! udpsink host=1.2.3.4 port=5000 + +Run server.py on 1.2.3.4 + +Files are written to currect directory + +For video feed open: http://localhost:6001/?frameskip=5&subsampling=4&quality=50&thumbs=0 diff --git a/extract.py b/extract.py new file mode 100644 index 0000000..efefd94 --- /dev/null +++ b/extract.py @@ -0,0 +1,19 @@ +from datetime import datetime +import os +import sys +_, path = sys.argv + +TIME_FORMAT = "%Y-%m-%d-%H-%M-%S" + +clip_start = datetime.strptime(path[7:26], TIME_FORMAT) +event_no = 1 +for line in open(path): + frame, raw_start, raw_end = line.strip().split(";") + event_start, event_end = datetime.strptime(raw_start, TIME_FORMAT), datetime.strptime(raw_end, TIME_FORMAT) + cmd = "ffmpeg -ss %s -i %s -t %s -vcodec copy extracted-%s-%d.mp4" % ( + event_start-clip_start, path.replace(".events", ".mp4"), event_end-event_start, + clip_start.strftime(TIME_FORMAT), event_no) + print("executing:", cmd) + os.system(cmd) + event_no += 1 + diff --git a/server.py b/server.py new file mode 100644 index 0000000..24daa76 --- /dev/null +++ b/server.py @@ -0,0 +1,199 @@ +import cv2 +import numpy as np +from flask import Flask, Response +from gevent import pywsgi +from threading import Thread, Event +from time import sleep +from flask import request +from datetime import datetime +from collections import deque + +TIME_FORMAT = "%Y-%m-%d-%H-%M-%S" +MOTION_GREEN_THRESHOLD = 30 +MOTION_AREA_THRESHOLD = 30 +MOTION_FRAMES_THRESHOLD = 20 + +# 2 to power of SLIDE_WINDOW is the count of frames kept in memory for bg detection +SLIDE_WINDOW = 5 + +# For thumbnail use only every n-th pixel +THUMBNAIL_SUBSAMPLING = 4 + +# For motion detection sample only every n-th pixel +MOTION_SUBSAMPLING = 8 + +# Set to True to automatically spawn Gstreamer process per camera and redirect packets to it +AUTOMUX = False + +PIPELINE = """udpsrc port=%(port)d caps = "application/x-rtp, encoding-name=(string)H264" \ + ! rtph264depay \ + ! tee name=t \ + ! decodebin \ + ! videoconvert \ + ! appsink \ + t. ! h264parse \ + ! mp4mux streamable=true faststart=true fragment-duration=1000 dts-method=1 \ + ! filesink async=0 location=%(filename)s.mp4 +""".replace("\n", "") + +def detect_stream(port, addr): + now = 123 + app = Flask(__name__) + events = deque(maxlen=THUMBNAIL_SUBSAMPLING) + frame = None + event = Event() + + def generator(subsampling, frameskip, motion, quality, thumbs): + while True: + for j in range(0, frameskip): + event.wait() + yield b'--frame\r\nContent-Type: image/jpeg\r\n\r\n' + if motion: + for i in range(0, 2): + for j in range(0, 2): + frame[i::MOTION_SUBSAMPLING,j::MOTION_SUBSAMPLING,0] = 0 + frame[i::MOTION_SUBSAMPLING,j::MOTION_SUBSAMPLING,1] = 0 + frame[i::MOTION_SUBSAMPLING,j::MOTION_SUBSAMPLING,2] = thresh + + + if thumbs: + stacked = np.vstack([frame, np.hstack(events)]) + else: + stacked = frame + ret, jpeg = cv2.imencode('.jpg', stacked[::subsampling,::subsampling], + (cv2.IMWRITE_JPEG_QUALITY, quality)) + yield jpeg.tostring() + yield b'\r\n\r\n' + + @app.route('/') + def video_combined(): + return Response(generator( + request.args.get('subsampling', default = 1, type = int), + request.args.get('frameskip', default = 0, type = int), + request.args.get('motion', default = 0, type = int), + request.args.get('quality', default = 50, type = int), + request.args.get('thumbs', default = 1, type = int), + ), mimetype='multipart/x-mixed-replace; boundary=frame') + + print("Server listening on TCP port", port) + + class WebThread(Thread): + def run(self): + print("Web server running for port", port) + app.run(threaded=True, port=port+1000) + + thread = WebThread() + thread.start() + + fh = None + cap = None + frames = [] + avg = None + + motion_detected = False + motion_start = None + motion_seen = 0 + + while True: + if not cap: + now = datetime.now() + timestamp = now.strftime(TIME_FORMAT) + filename = "stream-%(timestamp)s-%(addr)s" % locals() + if fh: + fh.close() + fh = open("%s.events" % filename, "w") + pipeline = PIPELINE % locals() + print("gst-launch-1.0 -v", pipeline.replace("appsink", "autovideosink")) + frame_count = 0 + cap = cv2.VideoCapture(pipeline) + ret, frame = cap.read() + if ret == False: + cap = None + sleep(3) + continue + + frame_count += 1 + thumbnail = frame[::THUMBNAIL_SUBSAMPLING,::THUMBNAIL_SUBSAMPLING].copy() + if not events: + for j in range(0, THUMBNAIL_SUBSAMPLING): + events.append(np.zeros(thumbnail.shape)) + + reference = np.uint16(frame[::MOTION_SUBSAMPLING,::MOTION_SUBSAMPLING,1]) + frames.append(reference) + if avg is None: + avg = np.copy(reference) + else: + avg += reference + if len(frames) <= 2 ** SLIDE_WINDOW: + continue + + avg -= frames[0] + frames = frames[1:] + + delta = cv2.absdiff(reference, avg >> SLIDE_WINDOW) + thresh = cv2.inRange(delta, MOTION_GREEN_THRESHOLD, 255) + im2, contours, hierarchy = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) + + + + motion_detected = False + + for cnt in contours: + x, y, w, h = [j*MOTION_SUBSAMPLING for j in cv2.boundingRect(cnt)] + if w < MOTION_AREA_THRESHOLD or h < MOTION_AREA_THRESHOLD: + continue + cv2.rectangle(frame, (x, y), (x+w, y+h), (0, 255, 0), 4); + motion_detected = True + + + if motion_detected: + if motion_seen < MOTION_FRAMES_THRESHOLD: + motion_seen += 1 + elif not motion_start: + events.append(thumbnail) + motion_start = datetime.now() + print("Event started:", motion_start) + else: + if motion_seen > 0: + motion_seen -= 1 + if motion_seen == 0 and motion_start: + motion_end = datetime.now() + print("Got event:", motion_start, "to", motion_end) + fh.write("%d;%s;%s\n" % (frame_count, motion_start.strftime(TIME_FORMAT), motion_end.strftime(TIME_FORMAT))) + motion_start = None + + event.set() + event.clear() + +if AUTOMUX: + import os + import socket + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) + sock.settimeout(5) + sock.bind(("", 5000)) + next_port = 5001 + mapping = dict() + print("Flushing iptables") + os.system("iptables -F PREROUTING -t nat") + os.system("iptables -F OUTPUT -t nat") + os.system("sysctl -w net.ipv4.conf.all.route_localnet=1") + print("Listening on UDP port 5000") + while True: + try: + buf, (addr, port) = sock.recvfrom(20) + except OSError: # timed out + continue + + if addr not in mapping: + mapping[addr] = next_port + print("Redirecting", addr, "to", next_port) + os.system("iptables -I PREROUTING -t nat -p udp --dport 5000 -s %(addr)s -j REDIRECT --to-port %(next_port)d" % locals()) + os.system("iptables -t nat -I OUTPUT -o lo -p udp --dport 5000 -j REDIRECT --to %(next_port)d" % locals()) + + if not os.fork(): + print("Spawning process for", addr, next_port) + detect_stream(next_port, addr) + break + next_port += 1 +else: + detect_stream(5001, "127.0.0.1")