9
0
Fork 0

Improved line buffering
continuous-integration/drone Build is passing Details

This commit is contained in:
Lauri Võsandi 2022-11-07 18:18:33 +02:00
parent 2ccd734ba5
commit 4d90244507
1 changed files with 49 additions and 37 deletions

View File

@ -237,21 +237,52 @@ async def uploader(coll, queue):
messages = []
class LogFile(object):
def __init__(self, loop, coll, queue, path, namespace_name, pod_name, container_name, start=False):
class FileTailer(object):
def __init__(self, path, offset=0, finished=False):
self.head = offset
self.tail = offset + 1
self.offset = offset
self.path = path
self.tail = 0
self.buf = b""
self.finished = finished
self.more_content = asyncio.Event()
self.fh = open(path, "rb")
async def __aiter__(self):
with open(self.path, "rb") as fh:
while True:
if not self.finished and self.head >= self.tail:
await self.more_content.wait()
self.more_content.clear()
self.tail = fh.seek(0, os.SEEK_END)
if self.head >= self.tail:
if self.finished:
# TODO: if there is still something in buf?
break
continue
fh.seek(self.head)
chunk = fh.read(min(self.tail - self.head, 4096))
self.buf += chunk
self.head += len(chunk)
while True:
step = self.buf.find(b"\n")
if step == -1:
break
buf = self.buf[:step + 1]
self.buf = self.buf[step + 1:]
yield self.offset, len(buf), buf[:-1].decode("utf-8")
self.offset += step + 1
break
class LogFile(FileTailer):
def __init__(self, loop, coll, queue, path, namespace_name, pod_name, container_name, start=False):
FileTailer.__init__(self, path)
self.queue = queue
self.namespace_name = namespace_name
self.pod_name = pod_name
self.container_name = container_name
self.running = True
self.coll = coll
self.poke()
self.state = "seeking"
self.done = False
self.loop = loop
if start:
self.start()
@ -269,7 +300,6 @@ class LogFile(object):
async def handler_loop(self):
message = ""
record_size = 0
self.head = 0
skip_next = False
if not args.dry_run:
then = time()
@ -279,51 +309,34 @@ class LogFile(object):
}, sort=[("log.offset", -1)])
histogram_database_operation_latency.labels("find-replay-offset").observe(time() - then)
if last_record:
self.head = last_record["log"]["offset"]
self.head = self.offset = last_record["log"]["offset"]
counter_skipped_bytes.inc(self.head)
skip_next = True
self.state = "replaying"
offset = self.head
while self.running:
while self.head >= self.tail:
self.state = "watching"
if self.done:
break
await self.more_content.wait()
self.more_content.clear()
self.tail = self.fh.seek(0, os.SEEK_END)
assert self.head < self.tail
self.fh.seek(self.head)
buf = self.fh.readline()
self.head += len(buf)
record_offset = self.offset
async for line_offset, line_size, line in self:
assert "\n" not in line
try:
reason = "unicode-encoding"
line = buf.decode("utf-8")
if len(line) < 45:
reason = "line-short"
raise ValueError()
if not line[-1] == "\n":
reason = "no-newline %s" % repr((buf))
raise ValueError()
if not re.match("^(.+) (stdout|stderr)( (.))? (.*)$", line[:-1]):
if not re.match("^(.+) (stdout|stderr)( (.))? (.*)$", line):
reason = "no-regex-match"
raise ValueError()
reason = "invalid-timestamp"
event_created = datetime.strptime(line[:23], "%Y-%m-%dT%H:%M:%S.%f")
except ValueError:
print("Failed to parse file %s at offset %d, reason: %s" % (self.path, self.head, reason))
print("Failed to parse file %s at offset %d, reason %s: %s" % (self.path, line_offset, reason, repr(line)))
break
histogram_line_size.observe(len(buf))
record_size += len(buf)
histogram_line_size.observe(line_size)
record_size += line_size
if record_size < args.max_record_size:
# TODO: Support Docker runtime on EKS
message += line[45:-1]
message += line[45:]
state = line[43]
if state == "P":
@ -368,7 +381,7 @@ class LogFile(object):
o["log"]["file"] = {
"path": self.path
}
o["log"]["offset"] = offset
o["log"]["offset"] = record_offset
o["host"] = host_info
o["stream"] = stream
o["event"] = {
@ -393,9 +406,8 @@ class LogFile(object):
await self.queue.put(o)
gauge_queue_entries.set(self.queue.qsize())
skip_next = False
offset = self.head
record_offset = line_offset
self.state = "closing"
self.fh.close()
log_files.pop(self.path)