diff --git a/log_shipper.py b/log_shipper.py
index 847fdd9..87a71da 100755
--- a/log_shipper.py
+++ b/log_shipper.py
@@ -237,21 +237,52 @@ async def uploader(coll, queue):
             messages = []
 
 
-class LogFile(object):
-    def __init__(self, loop, coll, queue, path, namespace_name, pod_name, container_name, start=False):
+class FileTailer(object):
+    def __init__(self, path, offset=0, finished=False):
+        self.head = offset
+        self.tail = offset + 1
+        self.offset = offset
         self.path = path
-        self.tail = 0
+        self.buf = b""
+        self.finished = finished
         self.more_content = asyncio.Event()
-        self.fh = open(path, "rb")
+
+    async def __aiter__(self):
+        with open(self.path, "rb") as fh:
+            while True:
+                if not self.finished and self.head >= self.tail:
+                    await self.more_content.wait()
+                    self.more_content.clear()
+                self.tail = fh.seek(0, os.SEEK_END)
+                if self.head >= self.tail:
+                    if self.finished:
+                        # TODO: if there is still something in buf?
+                        break
+                    continue
+                fh.seek(self.head)
+                chunk = fh.read(min(self.tail - self.head, 4096))
+                self.buf += chunk
+                self.head += len(chunk)
+                while True:
+                    step = self.buf.find(b"\n")
+                    if step == -1:
+                        break
+                    buf = self.buf[:step + 1]
+                    self.buf = self.buf[step + 1:]
+                    yield self.offset, len(buf), buf[:-1].decode("utf-8")
+                    self.offset += step + 1
+                    break
+
+
+class LogFile(FileTailer):
+    def __init__(self, loop, coll, queue, path, namespace_name, pod_name, container_name, start=False):
+        FileTailer.__init__(self, path)
         self.queue = queue
         self.namespace_name = namespace_name
         self.pod_name = pod_name
         self.container_name = container_name
-        self.running = True
         self.coll = coll
-        self.poke()
         self.state = "seeking"
-        self.done = False
         self.loop = loop
         if start:
             self.start()
@@ -269,7 +300,6 @@ class LogFile(object):
     async def handler_loop(self):
         message = ""
         record_size = 0
-        self.head = 0
         skip_next = False
         if not args.dry_run:
             then = time()
@@ -279,51 +309,34 @@ class LogFile(object):
             }, sort=[("log.offset", -1)])
             histogram_database_operation_latency.labels("find-replay-offset").observe(time() - then)
             if last_record:
-                self.head = last_record["log"]["offset"]
+                self.head = self.offset = last_record["log"]["offset"]
                 counter_skipped_bytes.inc(self.head)
                 skip_next = True
 
         self.state = "replaying"
-        offset = self.head
-        while self.running:
-            while self.head >= self.tail:
-                self.state = "watching"
-                if self.done:
-                    break
-                await self.more_content.wait()
-                self.more_content.clear()
-                self.tail = self.fh.seek(0, os.SEEK_END)
-
-            assert self.head < self.tail
-            self.fh.seek(self.head)
-            buf = self.fh.readline()
-            self.head += len(buf)
-
+        record_offset = self.offset
+        async for line_offset, line_size, line in self:
+            assert "\n" not in line
             try:
                 reason = "unicode-encoding"
-                line = buf.decode("utf-8")
                 if len(line) < 45:
                     reason = "line-short"
                     raise ValueError()
-                if not line[-1] == "\n":
-                    reason = "no-newline %s" % repr((buf))
-                    raise ValueError()
-                if not re.match("^(.+) (stdout|stderr)( (.))? (.*)$", line[:-1]):
+                if not re.match("^(.+) (stdout|stderr)( (.))? (.*)$", line):
                     reason = "no-regex-match"
                     raise ValueError()
                 reason = "invalid-timestamp"
                 event_created = datetime.strptime(line[:23], "%Y-%m-%dT%H:%M:%S.%f")
             except ValueError:
-                print("Failed to parse file %s at offset %d, reason: %s" % (self.path, self.head, reason))
+                print("Failed to parse file %s at offset %d, reason %s: %s" % (self.path, line_offset, reason, repr(line)))
                 break
 
-            histogram_line_size.observe(len(buf))
-
-            record_size += len(buf)
+            histogram_line_size.observe(line_size)
+            record_size += line_size
 
             if record_size < args.max_record_size:
                 # TODO: Support Docker runtime on EKS
-                message += line[45:-1]
+                message += line[45:]
 
             state = line[43]
             if state == "P":
@@ -368,7 +381,7 @@ class LogFile(object):
                 o["log"]["file"] = {
                     "path": self.path
                 }
-                o["log"]["offset"] = offset
+                o["log"]["offset"] = record_offset
                 o["host"] = host_info
                 o["stream"] = stream
                 o["event"] = {
@@ -393,9 +406,8 @@ class LogFile(object):
                     await self.queue.put(o)
                     gauge_queue_entries.set(self.queue.qsize())
                 skip_next = False
-                offset = self.head
+                record_offset = line_offset
         self.state = "closing"
-        self.fh.close()
         log_files.pop(self.path)
 
 