diff --git a/camtiler.py b/camtiler.py index d62a8c9..c9d8d13 100755 --- a/camtiler.py +++ b/camtiler.py @@ -38,6 +38,9 @@ counter_rx_bytes = Counter( counter_tx_bytes = Counter( "camtiler_client_tx_bytes", "Bytes transmitted over HTTP streams") +counter_rx_chunks = Counter( + "camtiler_client_rx_chunks", + "HTTP chunks received") counter_rx_frames = Counter( "camtiler_client_rx_frames", "Frames received") @@ -47,9 +50,6 @@ counter_tx_frames = Counter( counter_tx_events = Counter( "camtiler_client_tx_events", "Events emitted") -counter_eos = Counter( - "camtiler_client_eos", - "Count of End of Stream occurrences") counter_timeout_errors = Counter( "camtiler_client_timeout_errors", "Upstream connection timeout errors") @@ -106,18 +106,18 @@ async def client_connect(name, resp): buf = b"" print("Upstream connection opened with status:", resp.status) async for data, end_of_http_chunk in resp.content.iter_chunks(): - counter_rx_bytes.inc(len(data)) if end_of_http_chunk: - counter_eos.inc() - break + counter_rx_chunks.inc() if buf: - # seek end + # If we already have something in buffer, seek end in new data marker = data.find(b'\xff\xd9') if marker < 0: + # If no end marker was found add it to buffer buf += data continue else: + # If end marker was found, decode JPEG frame blob = np.frombuffer(buf + data[:marker+2], dtype=np.uint8) img = cv2.imdecode(blob, cv2.IMREAD_UNCHANGED) app.ctx.frames[name] = img @@ -125,17 +125,19 @@ async def client_connect(name, resp): buf = b"" counter_rx_frames.inc() - # seek begin + # Seek begin in newly received data marker = data.find(b'\xff\xd8') if marker >= 0: - buf = data[marker:] - else: - counter_dropped_bytes.inc(len(data)) + data, buf = data[:marker], data[marker:] + + # Remaining data is effectively dropped + counter_dropped_bytes.inc(len(data)) async def client(name, url): print("Opening upstream connection to %s" % url) - async with aiohttp.ClientSession() as session: + while True: + async with aiohttp.ClientSession() as session: async with session.get(url) as resp: try: await client_connect(name, resp)