From f4befe5aeb0907f251e0dc3ac4c217f310770d36 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lauri=20V=C3=B5sandi?= Date: Tue, 1 Nov 2022 15:51:26 +0200 Subject: [PATCH] Add explicit notify for secondaries --- bind-sidecar.py | 164 +++++++++++++++++++++++++++++------------------- 1 file changed, 98 insertions(+), 66 deletions(-) diff --git a/bind-sidecar.py b/bind-sidecar.py index 0831768..5ecb432 100755 --- a/bind-sidecar.py +++ b/bind-sidecar.py @@ -1,78 +1,113 @@ #!/usr/bin/env python3 import argparse import asyncio -import ecs_logging -import logging import os import signal import socket from kubernetes_asyncio.client.api_client import ApiClient +from kubernetes_asyncio.client.exceptions import ApiException from kubernetes_asyncio import client, config, watch -from time import time -parser = argparse.ArgumentParser() -parser.add_argument("--replica", action="store_true") -args = parser.parse_args() - -# Get the Logger -logger = logging.getLogger() -logger.setLevel(logging.INFO) - -# Add an ECS formatter to the Handler -handler = logging.StreamHandler() -handler.setFormatter(ecs_logging.StdlibFormatter()) -logger.addHandler(handler) PATH_CONFIG = "/var/bind/bind.conf" PATH_ZONEFILE = "/var/bind/db.%s" -active_zones = set() +class BindSidecar(object): + def __init__(self, replica=False): + self.replica = replica + self.update_task = None + self.active_zones = set() + self.secondaries = {} -def update_config(): - master = socket.gethostbyname("bind-primary") if args.replica else None - - with open(PATH_CONFIG + ".part", "w") as fh: - for zone in active_zones: - path = PATH_ZONEFILE % zone - if args.replica: - fh.write("zone \"%s\" { type slave; masters { %s key zone-transfer; }; };\n" % (zone, master)) - else: - fh.write("zone \"%s\" { type master; file \"%s\"; };\n" % (zone, path)) - os.rename(PATH_CONFIG + ".part", PATH_CONFIG) - logger.debug("File %s updated", PATH_CONFIG) - try: - with open("/var/bind/named.pid") as fh: - os.kill(int(fh.read()), signal.SIGHUP) - except FileNotFoundError: - logger.warn("File /var/bind/named.pid not found, assuming Bind not running yet") - - -async def update_loop(v1, apps_api, api_instance, revision): - flt = "codemowers.io", "v1alpha1", "bindzones" - method = api_instance.list_cluster_custom_object - w = watch.Watch() - latest_version = None - logger.info("Subscribing to updates") - async for event in w.stream(method, *flt, resource_version=latest_version): - latest_version = event["object"]["metadata"]["resourceVersion"] - if event["type"] == "ADDED": - zone = event["object"]["metadata"]["name"] - active_zones.add(zone) - if not args.replica: + async def update_config(self): + await asyncio.sleep(5) + master = socket.gethostbyname("bind-primary") if self.replica else None + with open(PATH_CONFIG + ".part", "w") as fh: + for zone in self.active_zones: path = PATH_ZONEFILE % zone - logger.info("Adding zone: %s", zone) - if not os.path.exists(path): - with open(path + ".part", "w") as fh: - fh.write("@ IN SOA ns1.%(zone)s. hostmaster.%(zone)s. (1 300 300 300 300)\n" % locals()) - fh.write(" NS ns1.%(zone)s.\n" % locals()) - fh.write("ns1 A 127.0.0.1\n") - os.rename(path + ".part", path) - update_config() - elif event["type"] == "DELETED": - logger.info("Removing zone: %s", zone) - active_zones.remove(zone) - os.unlink(PATH_ZONEFILE % event["object"]["metadata"]["name"]) + if self.replica: + fh.write("zone \"%s\" { type slave; masters { %s key zone-transfer; }; };\n" % (zone, master)) + else: + + also_notify = ("also-notify { %s };" % (" ".join(["%s;" % j for j in self.secondaries.values()]))) if self.secondaries else "" + fh.write("zone \"%s\" { type master; notify explicit; file \"%s\"; %s};\n" % (zone, path, also_notify)) + os.rename(PATH_CONFIG + ".part", PATH_CONFIG) + print("File", PATH_CONFIG, "updated") + try: + with open("/var/bind/named.pid") as fh: + os.kill(int(fh.read()), signal.SIGHUP) + except FileNotFoundError: + print("File /var/bind/named.pid not found, assuming Bind not running yet") + self.update_task = None + + def reschedule_update(self): + if self.update_task: + self.update_task.cancel() + self.update_task = asyncio.create_task(self.update_config()) + + async def watch_bind_zones(self, api_instance): + flt = "codemowers.io", "v1alpha1", "bindzones" + method = api_instance.list_cluster_custom_object + w = watch.Watch() + print("Subscribing to BindZone updates") + while True: + try: + async for event in w.stream(method, *flt): + if event["type"] == "ADDED": + zone = event["object"]["metadata"]["name"] + self.active_zones.add(zone) + if not self.replica: + path = PATH_ZONEFILE % zone + print("Adding zone: %s" % zone) + if not os.path.exists(path): + with open(path + ".part", "w") as fh: + fh.write("@ IN SOA ns1.%(zone)s. hostmaster.%(zone)s. (1 300 300 300 300)\n" % locals()) + fh.write(" NS ns1.%(zone)s.\n" % locals()) + fh.write("ns1 A 127.0.0.1\n") + os.rename(path + ".part", path) + self.reschedule_update() + elif event["type"] == "DELETED": + print("Removing zone: %s" % zone) + self.active_zones.remove(zone) + os.unlink(PATH_ZONEFILE % event["object"]["metadata"]["name"]) + self.reschedule_update() + except ApiException as e: + if e.status == 410: + print("BindZone watch expired") + else: + raise + except asyncio.exceptions.TimeoutError: + pass + + async def watch_bind_replica_pods(self, v1): + w = watch.Watch() + print("Subscribing to pod updates") + while True: + try: + async for event in w.stream(v1.list_namespaced_pod, "bind", label_selector="app=bind-secondary"): + if not event["object"].status.container_statuses: + continue + container_id = event["object"].status.container_statuses[0].container_id + if event["type"] in ("ADDED", "MODIFIED"): + if event["object"].status.pod_ip: + self.secondaries[container_id] = event["object"].status.pod_ip + print("Pod", event["object"].metadata.name, event["type"].lower(), "; current secondaries:", self.secondaries.values()) + self.reschedule_update() + except ApiException as e: + if e.status == 410: + print("Pod watch expired") + else: + raise + except asyncio.exceptions.TimeoutError: + pass + self.secondaries = {} + + async def run(self, v1, api_instance): + tasks = [self.watch_bind_zones(api_instance)] + if not self.replica: + tasks.append(self.watch_bind_replica_pods(v1)) + await asyncio.gather(*tasks) async def main(): @@ -80,16 +115,13 @@ async def main(): await config.load_kube_config() else: config.load_incluster_config() + parser = argparse.ArgumentParser() + parser.add_argument("--replica", action="store_true") + args = parser.parse_args() async with ApiClient() as api: v1 = client.CoreV1Api(api) - apps_api = client.AppsV1Api() api_instance = client.CustomObjectsApi(api) - revision = str(time()) - while True: - try: - await update_loop(v1, apps_api, api_instance, revision) - except asyncio.exceptions.TimeoutError: - pass + await BindSidecar(args.replica).run(v1, api_instance) if __name__ == "__main__":