9
0
Fork 0
bind-sidecar/bind-sidecar.py

131 lines
5.3 KiB
Python
Executable File

#!/usr/bin/env python3
import argparse
import asyncio
import os
import signal
import socket
from kubernetes_asyncio.client.api_client import ApiClient
from kubernetes_asyncio.client.exceptions import ApiException
from kubernetes_asyncio import client, config, watch
# Zone configuration file generated by BindSidecar.update_config(); it is
# written to "<path>.part" first and then renamed over this path.
PATH_CONFIG = "/var/bind/bind.conf"
# Template for a per-zone file path; "%s" is replaced with the zone name.
PATH_ZONEFILE = "/var/bind/db.%s"
class BindSidecar(object):
    """Keep a BIND zone configuration in sync with Kubernetes objects.

    The sidecar watches cluster-wide ``bindzones`` custom objects
    (group ``codemowers.io``, version ``v1alpha1``) and, when not running
    as a replica, also the pods labelled ``app=bind-secondary`` in the
    ``bind`` namespace. Every relevant change schedules a rewrite of
    ``PATH_CONFIG`` followed by a SIGHUP to the process whose PID is stored
    in ``/var/bind/named.pid``.
    """

    def __init__(self, replica=False):
        """Create a sidecar.

        Args:
            replica: When true, zones are rendered as ``type slave`` stanzas
                pointing at the address of ``bind-primary``; otherwise they
                are rendered as ``type master`` stanzas and stub zone files
                are created for new zones.
        """
        self.replica = replica
        # Pending debounced update_config() task, or None when idle.
        self.update_task = None
        # Names of the BindZone objects currently known.
        self.active_zones = set()
        # Maps container id of a secondary pod -> pod IP.
        self.secondaries = {}

    def _zone_stanza(self, zone, master=None):
        """Return the configuration line (newline-terminated) for one zone.

        Args:
            zone: Zone name.
            master: Address of the primary; only used in replica mode.
        """
        if self.replica:
            return 'zone "%s" { type slave; masters { %s key zone-transfer; }; };\n' % (zone, master)
        if self.secondaries:
            also_notify = "also-notify { %s };" % " ".join(
                "%s;" % ip for ip in self.secondaries.values())
        else:
            also_notify = ""
        return 'zone "%s" { type master; notify explicit; file "%s"; %s};\n' % (
            zone, PATH_ZONEFILE % zone, also_notify)

    async def update_config(self):
        """Rewrite ``PATH_CONFIG`` and signal the name server to reload.

        Sleeps five seconds first; because reschedule_update() cancels a
        pending task before creating a new one, a burst of events collapses
        into a single rewrite.
        """
        await asyncio.sleep(5)
        # NOTE: gethostbyname() is a blocking call made from a coroutine.
        master = socket.gethostbyname("bind-primary") if self.replica else None
        partial_path = PATH_CONFIG + ".part"
        with open(partial_path, "w") as fh:
            for zone in self.active_zones:
                fh.write(self._zone_stanza(zone, master))
        # Rename only after the file is closed so the complete content
        # replaces the old configuration in one step.
        os.rename(partial_path, PATH_CONFIG)
        print("File", PATH_CONFIG, "updated")
        try:
            with open("/var/bind/named.pid") as fh:
                os.kill(int(fh.read()), signal.SIGHUP)
        except FileNotFoundError:
            print("File /var/bind/named.pid not found, assuming Bind not running yet")
        except ProcessLookupError:
            # Stale PID file: nothing to signal. Without this the exception
            # would be lost inside the background task.
            print("Process listed in /var/bind/named.pid is not running, skipping reload")
        # Deliberately not in a ``finally``: when this task is cancelled,
        # reschedule_update() has already stored its replacement and that
        # reference must not be overwritten.
        self.update_task = None

    def reschedule_update(self):
        """Cancel any pending update and schedule a fresh update_config()."""
        if self.update_task:
            self.update_task.cancel()
        self.update_task = asyncio.create_task(self.update_config())

    async def watch_bind_zones(self, api_instance):
        """Track BindZone objects forever, updating ``active_zones``.

        Args:
            api_instance: Object whose ``list_cluster_custom_object`` method
                is passed to the watch stream.

        Raises:
            ApiException: For any API error other than status 410, which is
                treated as an expired watch and simply restarted.
        """
        flt = "codemowers.io", "v1alpha1", "bindzones"
        method = api_instance.list_cluster_custom_object
        w = watch.Watch()
        print("Subscribing to BindZone updates")
        while True:
            try:
                async for event in w.stream(method, *flt):
                    # Resolve the name up front so every branch uses the
                    # zone of *this* event.
                    zone = event["object"]["metadata"]["name"]
                    if event["type"] == "ADDED":
                        self.active_zones.add(zone)
                        if not self.replica:
                            path = PATH_ZONEFILE % zone
                            print("Adding zone: %s" % zone)
                            if not os.path.exists(path):
                                # Seed a minimal zone file; an existing one
                                # is left untouched.
                                fields = {"zone": zone}
                                with open(path + ".part", "w") as fh:
                                    fh.write("@ IN SOA ns1.%(zone)s. hostmaster.%(zone)s. (1 300 300 2592000 300)\n" % fields)
                                    fh.write(" NS ns1.%(zone)s.\n" % fields)
                                    fh.write("ns1 A 127.0.0.1\n")
                                os.rename(path + ".part", path)
                        self.reschedule_update()
                    elif event["type"] == "DELETED":
                        print("Removing zone: %s" % zone)
                        # discard(): the zone may be unknown to us.
                        self.active_zones.discard(zone)
                        try:
                            os.unlink(PATH_ZONEFILE % zone)
                        except FileNotFoundError:
                            # No zone file to remove (replicas never create
                            # one); nothing to clean up.
                            pass
                        self.reschedule_update()
            except ApiException as e:
                if e.status == 410:
                    print("BindZone watch expired")
                else:
                    raise
            except asyncio.exceptions.TimeoutError:
                pass

    def _track_secondary(self, event):
        """Apply one pod watch event to ``self.secondaries``.

        Returns:
            False when the pod reports no container statuses and the event
            was ignored, True otherwise.
        """
        pod_status = event["object"].status
        if not pod_status.container_statuses:
            return False
        container_id = pod_status.container_statuses[0].container_id
        if event["type"] in ("ADDED", "MODIFIED"):
            if pod_status.pod_ip:
                self.secondaries[container_id] = pod_status.pod_ip
        elif event["type"] == "DELETED":
            # Drop the pod so it no longer appears in also-notify.
            self.secondaries.pop(container_id, None)
        return True

    async def watch_bind_replica_pods(self, v1):
        """Track secondary pods forever, updating ``self.secondaries``.

        Args:
            v1: Object whose ``list_namespaced_pod`` method is passed to the
                watch stream.

        Raises:
            ApiException: For any API error other than status 410.
        """
        w = watch.Watch()
        print("Subscribing to pod updates")
        while True:
            try:
                async for event in w.stream(v1.list_namespaced_pod, "bind", label_selector="app=bind-secondary"):
                    if not self._track_secondary(event):
                        continue
                    print("Pod", event["object"].metadata.name, event["type"].lower(), "; current secondaries:", self.secondaries.values())
                    self.reschedule_update()
            except ApiException as e:
                if e.status == 410:
                    print("Pod watch expired")
                else:
                    raise
            except asyncio.exceptions.TimeoutError:
                pass
            # The stream ended; start from an empty map before re-watching.
            self.secondaries = {}

    async def run(self, v1, api_instance):
        """Run the zone watcher and, unless a replica, the pod watcher.

        Args:
            v1: Passed to watch_bind_replica_pods().
            api_instance: Passed to watch_bind_zones().
        """
        tasks = [self.watch_bind_zones(api_instance)]
        if not self.replica:
            tasks.append(self.watch_bind_replica_pods(v1))
        await asyncio.gather(*tasks)
async def main():
    """Load Kubernetes credentials, parse arguments and run the sidecar.

    When the KUBECONFIG environment variable is set to a non-empty value the
    kube config is loaded; otherwise the in-cluster configuration is used.
    The only command-line option is ``--replica``, forwarded to BindSidecar.
    """
    running_outside_cluster = bool(os.getenv("KUBECONFIG"))
    if not running_outside_cluster:
        config.load_incluster_config()
    else:
        # The kube-config loader is a coroutine, unlike the in-cluster one.
        await config.load_kube_config()

    cli = argparse.ArgumentParser()
    cli.add_argument("--replica", action="store_true")
    options = cli.parse_args()

    async with ApiClient() as api_client:
        core_api = client.CoreV1Api(api_client)
        custom_objects_api = client.CustomObjectsApi(api_client)
        sidecar = BindSidecar(options.replica)
        await sidecar.run(core_api, custom_objects_api)
if __name__ == "__main__":
    # asyncio.run() creates a fresh event loop, runs main() to completion,
    # cancels any tasks still pending and closes the loop even when main()
    # raises; the previous manual loop handling skipped close() on error.
    asyncio.run(main())