Chunked transfer fixes
continuous-integration/drone Build is passing Details

This commit is contained in:
Lauri Võsandi 2022-02-13 20:54:24 +02:00 committed by Lauri Võsandi
parent 311f485893
commit b7383e9215
1 changed files with 14 additions and 12 deletions

View File

@ -38,6 +38,9 @@ counter_rx_bytes = Counter(
counter_tx_bytes = Counter( counter_tx_bytes = Counter(
"camtiler_client_tx_bytes", "camtiler_client_tx_bytes",
"Bytes transmitted over HTTP streams") "Bytes transmitted over HTTP streams")
counter_rx_chunks = Counter(
"camtiler_client_rx_chunks",
"HTTP chunks received")
counter_rx_frames = Counter( counter_rx_frames = Counter(
"camtiler_client_rx_frames", "camtiler_client_rx_frames",
"Frames received") "Frames received")
@ -47,9 +50,6 @@ counter_tx_frames = Counter(
counter_tx_events = Counter( counter_tx_events = Counter(
"camtiler_client_tx_events", "camtiler_client_tx_events",
"Events emitted") "Events emitted")
counter_eos = Counter(
"camtiler_client_eos",
"Count of End of Stream occurrences")
counter_timeout_errors = Counter( counter_timeout_errors = Counter(
"camtiler_client_timeout_errors", "camtiler_client_timeout_errors",
"Upstream connection timeout errors") "Upstream connection timeout errors")
@ -106,18 +106,18 @@ async def client_connect(name, resp):
buf = b"" buf = b""
print("Upstream connection opened with status:", resp.status) print("Upstream connection opened with status:", resp.status)
async for data, end_of_http_chunk in resp.content.iter_chunks(): async for data, end_of_http_chunk in resp.content.iter_chunks():
counter_rx_bytes.inc(len(data))
if end_of_http_chunk: if end_of_http_chunk:
counter_eos.inc() counter_rx_chunks.inc()
break
if buf: if buf:
# seek end # If we already have something in buffer, seek end in new data
marker = data.find(b'\xff\xd9') marker = data.find(b'\xff\xd9')
if marker < 0: if marker < 0:
# If no end marker was found add it to buffer
buf += data buf += data
continue continue
else: else:
# If end marker was found, decode JPEG frame
blob = np.frombuffer(buf + data[:marker+2], dtype=np.uint8) blob = np.frombuffer(buf + data[:marker+2], dtype=np.uint8)
img = cv2.imdecode(blob, cv2.IMREAD_UNCHANGED) img = cv2.imdecode(blob, cv2.IMREAD_UNCHANGED)
app.ctx.frames[name] = img app.ctx.frames[name] = img
@ -125,17 +125,19 @@ async def client_connect(name, resp):
buf = b"" buf = b""
counter_rx_frames.inc() counter_rx_frames.inc()
# seek begin # Seek begin in newly received data
marker = data.find(b'\xff\xd8') marker = data.find(b'\xff\xd8')
if marker >= 0: if marker >= 0:
buf = data[marker:] data, buf = data[:marker], data[marker:]
else:
counter_dropped_bytes.inc(len(data)) # Remaining data is effectively dropped
counter_dropped_bytes.inc(len(data))
async def client(name, url): async def client(name, url):
print("Opening upstream connection to %s" % url) print("Opening upstream connection to %s" % url)
async with aiohttp.ClientSession() as session: while True:
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp: async with session.get(url) as resp:
try: try:
await client_connect(name, resp) await client_connect(name, resp)