diff --git a/log_shipper.py b/log_shipper.py index 71555eb..5f29df1 100755 --- a/log_shipper.py +++ b/log_shipper.py @@ -37,9 +37,7 @@ args = parser.parse_args() ROOT = "/var/log/containers" app = Sanic("tail") -fhs = dict() tasks = dict() -tails = collections.Counter() with open("/etc/machine-id") as fh: machine_id = fh.read().strip() @@ -167,7 +165,7 @@ async def uploader(coll, queue): class LogFile(object): - def __init__(self, loop, coll, queue, path, namespace_name, pod_name, container_name, container_uid): + def __init__(self, loop, coll, queue, path, namespace_name, pod_name, container_name): self.path = path self.tail = 0 self.more_content = asyncio.Event() @@ -176,13 +174,15 @@ class LogFile(object): self.namespace_name = namespace_name self.pod_name = pod_name self.container_name = container_name - self.container_uid = container_uid self.running = True self.coll = coll self.poke() self.state = "seeking" self.done = False - loop.create_task(self.handler_loop()) + self.loop = loop + + def start(self): + self.loop.create_task(self.handler_loop()) def poke(self): self.tail = self.fh.seek(0, os.SEEK_END) @@ -276,7 +276,6 @@ class LogFile(object): o["kubernetes"] = { "container": { "name": self.container_name, - "id": self.container_uid }, "namespace": self.namespace_name, "pod": { @@ -310,47 +309,76 @@ class LogFile(object): async def watcher(loop, queue, coll): print("Starting watching") with Inotify() as inotify: - async def add_file(path): + def add_file(path, done=False): + if path in log_files: + log_files[path].done = done + return log_files[path] print("Adding file: %s" % path) - m = re.match("(.*)_(.*)_(.*)-([0-9abcdef]{64})\\.log$", os.path.basename(path)) + + m = re.match("/var/log/pods/(.*)_(.*)_.*/(.*)/[0-9]+\\.log$", path) + if not m: + print("Unexpected filename:", path) + raise counter_unexpected_filenames.inc() return - pod_name, namespace_name, container_name, container_uid = m.groups() + namespace_name, pod_name, container_name = m.groups() for prefix in args.exclude_pod_prefixes: if pod_name.startswith(prefix): return if args.namespace and namespace_name != args.namespace: return - # Handle log rotation as /var/logs/containers path is a symlink to - # actual file under /var/logs/pods - path = os.readlink(path) - assert path not in log_files - log_files[path] = LogFile(loop, coll, queue, path, namespace_name, pod_name, container_name, container_uid) + lf = log_files[path] = LogFile(loop, coll, queue, path, namespace_name, pod_name, container_name) + lf.done = done inotify.add_watch(path, Mask.MODIFY | Mask.CLOSE_WRITE | Mask.DELETE_SELF) + return lf inotify.add_watch(ROOT, Mask.CREATE | Mask.ONLYDIR) - for filename in os.listdir(ROOT): - await add_file(os.path.join(ROOT, filename)) + + # Register all existing log files + for pod_dir in os.listdir("/var/log/pods"): + m = re.match("(.*)_(.*)_(.*)$", pod_dir) + if not m: + print("Unexpected directory", pod_dir) + continue + namespace_name, pod_name, pod_id = m.groups() + for container_name in os.listdir(os.path.join("/var/log/pods", pod_dir)): + if not re.match("^(?![0-9]+$)(?!-)[a-zA-Z0-9-]{,63}(?