|
|
|
@ -9,7 +9,6 @@ import json |
|
|
|
|
import numpy as np |
|
|
|
|
import os |
|
|
|
|
import signal |
|
|
|
|
import socket |
|
|
|
|
import sys |
|
|
|
|
from datetime import datetime, timedelta |
|
|
|
|
from jpeg2dct.numpy import loads |
|
|
|
@ -107,7 +106,7 @@ gauge_download_queue_size.set(0) |
|
|
|
|
assert SLIDE_WINDOW <= 8 # This is 256 frames which should be enough |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def upload(bucket, blob: bytes, event_id): |
|
|
|
|
async def upload(bucket, blob: bytes, thumb: bytes, event_id): |
|
|
|
|
""" |
|
|
|
|
Upload single JPEG blob to S3 bucket |
|
|
|
|
""" |
|
|
|
@ -115,6 +114,11 @@ async def upload(bucket, blob: bytes, event_id): |
|
|
|
|
# Generate S3 path based on the JPEG blob SHA512 digest |
|
|
|
|
fp = hashlib.sha512(blob).hexdigest() |
|
|
|
|
path = "%s/%s/%s/%s.jpg" % (fp[:4], fp[4:8], fp[8:12], fp[12:]) |
|
|
|
|
|
|
|
|
|
# First upload the thumbnail |
|
|
|
|
await bucket.upload_fileobj(io.BytesIO(thumb), "thumb/%s" % path) |
|
|
|
|
|
|
|
|
|
# Proceed to upload the original JPEG frame |
|
|
|
|
await bucket.upload_fileobj(io.BytesIO(blob), path) |
|
|
|
|
|
|
|
|
|
# Add screenshot path to the event |
|
|
|
@ -139,8 +143,8 @@ async def uploader(queue): |
|
|
|
|
async with session.resource("s3", endpoint_url=S3_ENDPOINT_URL) as s3: |
|
|
|
|
bucket = await s3.Bucket(S3_BUCKET_NAME) |
|
|
|
|
while True: |
|
|
|
|
blob, event_id = await queue.get() |
|
|
|
|
await upload(bucket, blob, event_id) |
|
|
|
|
blob, thumb, event_id = await queue.get() |
|
|
|
|
await upload(bucket, blob, thumb, event_id) |
|
|
|
|
counter_uploaded_frames.inc() |
|
|
|
|
gauge_upload_queue_size.set(queue.qsize()) |
|
|
|
|
|
|
|
|
@ -188,13 +192,16 @@ async def motion_detector(reference_frame, download_queue, upload_queue): |
|
|
|
|
event_id = None |
|
|
|
|
differing_blocks = [] |
|
|
|
|
while True: |
|
|
|
|
dt, blob, y = await download_queue.get() |
|
|
|
|
app.ctx.last_frame = blob |
|
|
|
|
dt, blob, dct, thumb = await download_queue.get() |
|
|
|
|
app.ctx.last_frame, app.ctx.dct = blob, dct |
|
|
|
|
|
|
|
|
|
# Signal /bypass and /debug handlers about new frame |
|
|
|
|
app.ctx.event_frame.set() |
|
|
|
|
app.ctx.event_frame.clear() |
|
|
|
|
|
|
|
|
|
# Separate most significant luma value for each DCT (8x8 pixel) block |
|
|
|
|
y = np.int16(dct[0][:, :, 0]) |
|
|
|
|
|
|
|
|
|
# Update metrics |
|
|
|
|
gauge_total_blocks.set(y.shape[0] * y.shape[1]) |
|
|
|
|
gauge_last_frame.set(dt.timestamp()) |
|
|
|
@ -246,7 +253,7 @@ async def motion_detector(reference_frame, download_queue, upload_queue): |
|
|
|
|
counter_movement_frames.inc() |
|
|
|
|
try: |
|
|
|
|
# Push JPEG blob into upload queue |
|
|
|
|
upload_queue.put_nowait((blob, event_id)) |
|
|
|
|
upload_queue.put_nowait((blob, thumb, event_id)) |
|
|
|
|
except asyncio.QueueFull: |
|
|
|
|
counter_upload_dropped_frames.inc() |
|
|
|
|
gauge_upload_queue_size.set(upload_queue.qsize()) |
|
|
|
@ -264,6 +271,41 @@ async def motion_detector(reference_frame, download_queue, upload_queue): |
|
|
|
|
gauge_event_active.set(0) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def generate_thumbnail(dct): |
|
|
|
|
""" |
|
|
|
|
This is highly efficient and highly inaccurate function to generate |
|
|
|
|
thumbnail based purely on JPEG coefficients |
|
|
|
|
""" |
|
|
|
|
y, cr, cb = dct |
|
|
|
|
|
|
|
|
|
# Determine aspect ratio and minimum dimension for cropping |
|
|
|
|
ar = y.shape[0] < y.shape[1] |
|
|
|
|
dm = (y.shape[0] if ar else y.shape[1]) & 0xfffffff8 |
|
|
|
|
|
|
|
|
|
# Determine cropping slices to make it square |
|
|
|
|
jl = ((y.shape[1] >> 1) - (dm >> 1) if ar else 0) |
|
|
|
|
jt = (0 if ar else (y.shape[0] >> 1) - (dm >> 1)) |
|
|
|
|
jr = jl + dm |
|
|
|
|
jb = jt + dm |
|
|
|
|
|
|
|
|
|
# Do the actual crop |
|
|
|
|
ty = y[jt:jb, jl:jr, 0] |
|
|
|
|
tb = cb[jt >> 1:jb >> 1, jl >> 1:jr >> 1, 0] |
|
|
|
|
tr = cr[jt >> 1:jb >> 1, jl >> 1:jr >> 1, 0] |
|
|
|
|
|
|
|
|
|
# Upsample chroma, dummy convert first coeff and stack all channels |
|
|
|
|
m = np.dstack(( |
|
|
|
|
np.array((ty >> 3) + 127, dtype=np.uint8), |
|
|
|
|
np.array( |
|
|
|
|
(tb.repeat(2, 1).repeat(2, 0) >> 3) + 127, dtype=np.uint8)[:dm], |
|
|
|
|
np.array( |
|
|
|
|
(tr.repeat(2, 1).repeat(2, 0) >> 3) + 127, dtype=np.uint8)[:dm])) |
|
|
|
|
_, jpeg = cv2.imencode(".jpg", |
|
|
|
|
cv2.cvtColor(m, cv2.COLOR_YCrCb2RGB), |
|
|
|
|
(cv2.IMWRITE_JPEG_QUALITY, 80)) |
|
|
|
|
return jpeg |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def download(resp, queue): |
|
|
|
|
""" |
|
|
|
|
This coroutine iterates over HTTP connection chunks |
|
|
|
@ -286,15 +328,16 @@ async def download(resp, queue): |
|
|
|
|
# Assemble JPEG blob |
|
|
|
|
blob = buf + data[:marker+2] |
|
|
|
|
|
|
|
|
|
# Parse DCT coeffs and keep DCT coeffs only for Y channel |
|
|
|
|
y, _, _ = loads(blob) |
|
|
|
|
# Parse DCT coefficients |
|
|
|
|
dct = loads(blob) |
|
|
|
|
|
|
|
|
|
try: |
|
|
|
|
# Convert Y component to 16 bit for easier handling |
|
|
|
|
queue.put_nowait(( |
|
|
|
|
datetime.utcnow(), |
|
|
|
|
blob, |
|
|
|
|
np.int16(y[:, :, 0]))) |
|
|
|
|
dct, |
|
|
|
|
generate_thumbnail(dct))) |
|
|
|
|
except asyncio.QueueFull: |
|
|
|
|
counter_download_dropped_frames.inc() |
|
|
|
|
data = data[marker+2:] |
|
|
|
|