More consistent handling of symlinks
continuous-integration/drone Build is passing
Details
continuous-integration/drone Build is passing
Details
This commit is contained in:
parent
2c5bd944a9
commit
6f9e5de0b6
|
@ -37,9 +37,7 @@ args = parser.parse_args()
|
|||
|
||||
ROOT = "/var/log/containers"
|
||||
app = Sanic("tail")
|
||||
fhs = dict()
|
||||
tasks = dict()
|
||||
tails = collections.Counter()
|
||||
|
||||
with open("/etc/machine-id") as fh:
|
||||
machine_id = fh.read().strip()
|
||||
|
@ -167,7 +165,7 @@ async def uploader(coll, queue):
|
|||
|
||||
|
||||
class LogFile(object):
|
||||
def __init__(self, loop, coll, queue, path, namespace_name, pod_name, container_name, container_uid):
|
||||
def __init__(self, loop, coll, queue, path, namespace_name, pod_name, container_name):
|
||||
self.path = path
|
||||
self.tail = 0
|
||||
self.more_content = asyncio.Event()
|
||||
|
@ -176,13 +174,15 @@ class LogFile(object):
|
|||
self.namespace_name = namespace_name
|
||||
self.pod_name = pod_name
|
||||
self.container_name = container_name
|
||||
self.container_uid = container_uid
|
||||
self.running = True
|
||||
self.coll = coll
|
||||
self.poke()
|
||||
self.state = "seeking"
|
||||
self.done = False
|
||||
loop.create_task(self.handler_loop())
|
||||
self.loop = loop
|
||||
|
||||
def start(self):
|
||||
self.loop.create_task(self.handler_loop())
|
||||
|
||||
def poke(self):
|
||||
self.tail = self.fh.seek(0, os.SEEK_END)
|
||||
|
@ -276,7 +276,6 @@ class LogFile(object):
|
|||
o["kubernetes"] = {
|
||||
"container": {
|
||||
"name": self.container_name,
|
||||
"id": self.container_uid
|
||||
},
|
||||
"namespace": self.namespace_name,
|
||||
"pod": {
|
||||
|
@ -310,47 +309,76 @@ class LogFile(object):
|
|||
async def watcher(loop, queue, coll):
|
||||
print("Starting watching")
|
||||
with Inotify() as inotify:
|
||||
async def add_file(path):
|
||||
def add_file(path, done=False):
|
||||
if path in log_files:
|
||||
log_files[path].done = done
|
||||
return log_files[path]
|
||||
print("Adding file: %s" % path)
|
||||
m = re.match("(.*)_(.*)_(.*)-([0-9abcdef]{64})\\.log$", os.path.basename(path))
|
||||
|
||||
m = re.match("/var/log/pods/(.*)_(.*)_.*/(.*)/[0-9]+\\.log$", path)
|
||||
|
||||
if not m:
|
||||
print("Unexpected filename:", path)
|
||||
raise
|
||||
counter_unexpected_filenames.inc()
|
||||
return
|
||||
pod_name, namespace_name, container_name, container_uid = m.groups()
|
||||
namespace_name, pod_name, container_name = m.groups()
|
||||
for prefix in args.exclude_pod_prefixes:
|
||||
if pod_name.startswith(prefix):
|
||||
return
|
||||
if args.namespace and namespace_name != args.namespace:
|
||||
return
|
||||
# Handle log rotation as /var/logs/containers path is a symlink to
|
||||
# actual file under /var/logs/pods
|
||||
path = os.readlink(path)
|
||||
assert path not in log_files
|
||||
log_files[path] = LogFile(loop, coll, queue, path, namespace_name, pod_name, container_name, container_uid)
|
||||
lf = log_files[path] = LogFile(loop, coll, queue, path, namespace_name, pod_name, container_name)
|
||||
lf.done = done
|
||||
inotify.add_watch(path, Mask.MODIFY | Mask.CLOSE_WRITE | Mask.DELETE_SELF)
|
||||
return lf
|
||||
|
||||
inotify.add_watch(ROOT, Mask.CREATE | Mask.ONLYDIR)
|
||||
for filename in os.listdir(ROOT):
|
||||
await add_file(os.path.join(ROOT, filename))
|
||||
|
||||
# Register all existing log files
|
||||
for pod_dir in os.listdir("/var/log/pods"):
|
||||
m = re.match("(.*)_(.*)_(.*)$", pod_dir)
|
||||
if not m:
|
||||
print("Unexpected directory", pod_dir)
|
||||
continue
|
||||
namespace_name, pod_name, pod_id = m.groups()
|
||||
for container_name in os.listdir(os.path.join("/var/log/pods", pod_dir)):
|
||||
if not re.match("^(?![0-9]+$)(?!-)[a-zA-Z0-9-]{,63}(?<!-)$", container_name):
|
||||
print("Unexpected directory:", container_name)
|
||||
continue
|
||||
for filename in os.listdir(os.path.join("/var/log/pods", pod_dir, container_name)):
|
||||
m = re.match("[0-9]+\\.log$", filename)
|
||||
if not m:
|
||||
print("Unexpected filename:", filename)
|
||||
continue
|
||||
path = os.path.join("/var/log/pods", pod_dir, container_name, filename)
|
||||
add_file(path, done=True)
|
||||
|
||||
# Inspect currently running containers
|
||||
for filename in os.listdir("/var/log/containers"):
|
||||
path = os.path.realpath(os.path.join(os.path.join("/var/log/containers", filename)))
|
||||
add_file(path, done=False)
|
||||
|
||||
for log_file in log_files.values():
|
||||
log_file.start()
|
||||
|
||||
async for event in inotify:
|
||||
# Events for /var/log/pods
|
||||
if event.mask & Mask.CREATE:
|
||||
counter_inotify_events.labels("create").inc()
|
||||
await add_file(str(event.path))
|
||||
elif event.mask & Mask.DELETE_SELF:
|
||||
print("File deleted: %s" % event.path)
|
||||
counter_inotify_events.labels("delete_self").inc()
|
||||
log_file = log_files.get(str(event.path))
|
||||
if log_file:
|
||||
log_file.close()
|
||||
add_file(os.path.realpath(event.path))
|
||||
|
||||
# Events for /var/log/pods
|
||||
elif event.mask & Mask.CLOSE_WRITE:
|
||||
print("File closed: %s" % event.path)
|
||||
counter_inotify_events.labels("close_write").inc()
|
||||
# TODO: Close opened file already now instead waiting for deletion
|
||||
log_file = log_files.get(str(event.path))
|
||||
log_file.close()
|
||||
elif event.mask & Mask.MODIFY:
|
||||
counter_inotify_events.labels("modify").inc()
|
||||
log_file = log_files.get(str(event.path))
|
||||
if log_file:
|
||||
log_file.poke()
|
||||
assert not log_file.done
|
||||
log_file.poke()
|
||||
elif event.mask & Mask.IGNORED:
|
||||
counter_inotify_events.labels("ignored").inc()
|
||||
else:
|
||||
|
|
Loading…
Reference in New Issue