diff --git a/log_shipper.py b/log_shipper.py index d45f396..83ea61e 100755 --- a/log_shipper.py +++ b/log_shipper.py @@ -169,7 +169,7 @@ class LogFile(object): self.path = path self.tail = 0 self.more_content = asyncio.Event() - self.fh = open(path) + self.fh = open(path, "rb") self.queue = queue self.namespace_name = namespace_name self.pod_name = pod_name @@ -221,16 +221,23 @@ class LogFile(object): assert self.head < self.tail self.fh.seek(self.head) buf = self.fh.readline() + try: - if len(buf) < 45: + reason = "unicode-encoding" + line = buf.decode("utf-8") + if len(line) < 45: + reason = "line-short" raise ValueError() - if not buf[-1] == "\n": + if not line[-1] == "\n": + reason = "no-newline %s" % repr((buf)) raise ValueError() - if not re.match("^(.+) (stdout|stderr)( (.))? (.*)$", buf[:-1]): + if not re.match("^(.+) (stdout|stderr)( (.))? (.*)$", line[:-1]): + reason = "no-regex-match" raise ValueError() - event_created = datetime.strptime(buf[:23], "%Y-%m-%dT%H:%M:%S.%f") + reason = "invalid-timestamp" + event_created = datetime.strptime(line[:23], "%Y-%m-%dT%H:%M:%S.%f") except ValueError: - print("Failed to parse file %s at offset %d" % (self.path, self.head)) + print("Failed to parse file %s at offset %d, reason: %s" % (self.path, self.head, reason)) break histogram_line_size.observe(len(buf)) @@ -239,9 +246,9 @@ class LogFile(object): if record_size < args.max_record_size: # TODO: Support Docker runtime on EKS - message += buf[45:-1] + message += line[45:-1] - state = buf[43] + state = line[43] if state == "P": # This is partial message continue @@ -257,7 +264,7 @@ class LogFile(object): # TODO: Log portion of the message continue - stream = buf[36:42].strip() + stream = line[36:42].strip() if message.startswith("{\""): # TODO: Follow Filebeat hints try: