Add explicit notify for secondaries
All checks were successful
continuous-integration/drone Build is passing
All checks were successful
continuous-integration/drone Build is passing
This commit is contained in:
parent
7058855cfe
commit
f4befe5aeb
116
bind-sidecar.py
116
bind-sidecar.py
@ -1,78 +1,113 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
import argparse
|
import argparse
|
||||||
import asyncio
|
import asyncio
|
||||||
import ecs_logging
|
|
||||||
import logging
|
|
||||||
import os
|
import os
|
||||||
import signal
|
import signal
|
||||||
import socket
|
import socket
|
||||||
from kubernetes_asyncio.client.api_client import ApiClient
|
from kubernetes_asyncio.client.api_client import ApiClient
|
||||||
|
from kubernetes_asyncio.client.exceptions import ApiException
|
||||||
from kubernetes_asyncio import client, config, watch
|
from kubernetes_asyncio import client, config, watch
|
||||||
from time import time
|
|
||||||
|
|
||||||
parser = argparse.ArgumentParser()
|
|
||||||
parser.add_argument("--replica", action="store_true")
|
|
||||||
args = parser.parse_args()
|
|
||||||
|
|
||||||
# Get the Logger
|
|
||||||
logger = logging.getLogger()
|
|
||||||
logger.setLevel(logging.INFO)
|
|
||||||
|
|
||||||
# Add an ECS formatter to the Handler
|
|
||||||
handler = logging.StreamHandler()
|
|
||||||
handler.setFormatter(ecs_logging.StdlibFormatter())
|
|
||||||
logger.addHandler(handler)
|
|
||||||
|
|
||||||
PATH_CONFIG = "/var/bind/bind.conf"
|
PATH_CONFIG = "/var/bind/bind.conf"
|
||||||
PATH_ZONEFILE = "/var/bind/db.%s"
|
PATH_ZONEFILE = "/var/bind/db.%s"
|
||||||
|
|
||||||
active_zones = set()
|
|
||||||
|
|
||||||
|
class BindSidecar(object):
|
||||||
|
def __init__(self, replica=False):
|
||||||
|
self.replica = replica
|
||||||
|
self.update_task = None
|
||||||
|
self.active_zones = set()
|
||||||
|
self.secondaries = {}
|
||||||
|
|
||||||
def update_config():
|
async def update_config(self):
|
||||||
master = socket.gethostbyname("bind-primary") if args.replica else None
|
await asyncio.sleep(5)
|
||||||
|
master = socket.gethostbyname("bind-primary") if self.replica else None
|
||||||
with open(PATH_CONFIG + ".part", "w") as fh:
|
with open(PATH_CONFIG + ".part", "w") as fh:
|
||||||
for zone in active_zones:
|
for zone in self.active_zones:
|
||||||
path = PATH_ZONEFILE % zone
|
path = PATH_ZONEFILE % zone
|
||||||
if args.replica:
|
if self.replica:
|
||||||
fh.write("zone \"%s\" { type slave; masters { %s key zone-transfer; }; };\n" % (zone, master))
|
fh.write("zone \"%s\" { type slave; masters { %s key zone-transfer; }; };\n" % (zone, master))
|
||||||
else:
|
else:
|
||||||
fh.write("zone \"%s\" { type master; file \"%s\"; };\n" % (zone, path))
|
|
||||||
|
also_notify = ("also-notify { %s };" % (" ".join(["%s;" % j for j in self.secondaries.values()]))) if self.secondaries else ""
|
||||||
|
fh.write("zone \"%s\" { type master; notify explicit; file \"%s\"; %s};\n" % (zone, path, also_notify))
|
||||||
os.rename(PATH_CONFIG + ".part", PATH_CONFIG)
|
os.rename(PATH_CONFIG + ".part", PATH_CONFIG)
|
||||||
logger.debug("File %s updated", PATH_CONFIG)
|
print("File", PATH_CONFIG, "updated")
|
||||||
try:
|
try:
|
||||||
with open("/var/bind/named.pid") as fh:
|
with open("/var/bind/named.pid") as fh:
|
||||||
os.kill(int(fh.read()), signal.SIGHUP)
|
os.kill(int(fh.read()), signal.SIGHUP)
|
||||||
except FileNotFoundError:
|
except FileNotFoundError:
|
||||||
logger.warn("File /var/bind/named.pid not found, assuming Bind not running yet")
|
print("File /var/bind/named.pid not found, assuming Bind not running yet")
|
||||||
|
self.update_task = None
|
||||||
|
|
||||||
|
def reschedule_update(self):
|
||||||
|
if self.update_task:
|
||||||
|
self.update_task.cancel()
|
||||||
|
self.update_task = asyncio.create_task(self.update_config())
|
||||||
|
|
||||||
async def update_loop(v1, apps_api, api_instance, revision):
|
async def watch_bind_zones(self, api_instance):
|
||||||
flt = "codemowers.io", "v1alpha1", "bindzones"
|
flt = "codemowers.io", "v1alpha1", "bindzones"
|
||||||
method = api_instance.list_cluster_custom_object
|
method = api_instance.list_cluster_custom_object
|
||||||
w = watch.Watch()
|
w = watch.Watch()
|
||||||
latest_version = None
|
print("Subscribing to BindZone updates")
|
||||||
logger.info("Subscribing to updates")
|
while True:
|
||||||
async for event in w.stream(method, *flt, resource_version=latest_version):
|
try:
|
||||||
latest_version = event["object"]["metadata"]["resourceVersion"]
|
async for event in w.stream(method, *flt):
|
||||||
if event["type"] == "ADDED":
|
if event["type"] == "ADDED":
|
||||||
zone = event["object"]["metadata"]["name"]
|
zone = event["object"]["metadata"]["name"]
|
||||||
active_zones.add(zone)
|
self.active_zones.add(zone)
|
||||||
if not args.replica:
|
if not self.replica:
|
||||||
path = PATH_ZONEFILE % zone
|
path = PATH_ZONEFILE % zone
|
||||||
logger.info("Adding zone: %s", zone)
|
print("Adding zone: %s" % zone)
|
||||||
if not os.path.exists(path):
|
if not os.path.exists(path):
|
||||||
with open(path + ".part", "w") as fh:
|
with open(path + ".part", "w") as fh:
|
||||||
fh.write("@ IN SOA ns1.%(zone)s. hostmaster.%(zone)s. (1 300 300 300 300)\n" % locals())
|
fh.write("@ IN SOA ns1.%(zone)s. hostmaster.%(zone)s. (1 300 300 300 300)\n" % locals())
|
||||||
fh.write(" NS ns1.%(zone)s.\n" % locals())
|
fh.write(" NS ns1.%(zone)s.\n" % locals())
|
||||||
fh.write("ns1 A 127.0.0.1\n")
|
fh.write("ns1 A 127.0.0.1\n")
|
||||||
os.rename(path + ".part", path)
|
os.rename(path + ".part", path)
|
||||||
update_config()
|
self.reschedule_update()
|
||||||
elif event["type"] == "DELETED":
|
elif event["type"] == "DELETED":
|
||||||
logger.info("Removing zone: %s", zone)
|
print("Removing zone: %s" % zone)
|
||||||
active_zones.remove(zone)
|
self.active_zones.remove(zone)
|
||||||
os.unlink(PATH_ZONEFILE % event["object"]["metadata"]["name"])
|
os.unlink(PATH_ZONEFILE % event["object"]["metadata"]["name"])
|
||||||
|
self.reschedule_update()
|
||||||
|
except ApiException as e:
|
||||||
|
if e.status == 410:
|
||||||
|
print("BindZone watch expired")
|
||||||
|
else:
|
||||||
|
raise
|
||||||
|
except asyncio.exceptions.TimeoutError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
async def watch_bind_replica_pods(self, v1):
|
||||||
|
w = watch.Watch()
|
||||||
|
print("Subscribing to pod updates")
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
async for event in w.stream(v1.list_namespaced_pod, "bind", label_selector="app=bind-secondary"):
|
||||||
|
if not event["object"].status.container_statuses:
|
||||||
|
continue
|
||||||
|
container_id = event["object"].status.container_statuses[0].container_id
|
||||||
|
if event["type"] in ("ADDED", "MODIFIED"):
|
||||||
|
if event["object"].status.pod_ip:
|
||||||
|
self.secondaries[container_id] = event["object"].status.pod_ip
|
||||||
|
print("Pod", event["object"].metadata.name, event["type"].lower(), "; current secondaries:", self.secondaries.values())
|
||||||
|
self.reschedule_update()
|
||||||
|
except ApiException as e:
|
||||||
|
if e.status == 410:
|
||||||
|
print("Pod watch expired")
|
||||||
|
else:
|
||||||
|
raise
|
||||||
|
except asyncio.exceptions.TimeoutError:
|
||||||
|
pass
|
||||||
|
self.secondaries = {}
|
||||||
|
|
||||||
|
async def run(self, v1, api_instance):
|
||||||
|
tasks = [self.watch_bind_zones(api_instance)]
|
||||||
|
if not self.replica:
|
||||||
|
tasks.append(self.watch_bind_replica_pods(v1))
|
||||||
|
await asyncio.gather(*tasks)
|
||||||
|
|
||||||
|
|
||||||
async def main():
|
async def main():
|
||||||
@ -80,16 +115,13 @@ async def main():
|
|||||||
await config.load_kube_config()
|
await config.load_kube_config()
|
||||||
else:
|
else:
|
||||||
config.load_incluster_config()
|
config.load_incluster_config()
|
||||||
|
parser = argparse.ArgumentParser()
|
||||||
|
parser.add_argument("--replica", action="store_true")
|
||||||
|
args = parser.parse_args()
|
||||||
async with ApiClient() as api:
|
async with ApiClient() as api:
|
||||||
v1 = client.CoreV1Api(api)
|
v1 = client.CoreV1Api(api)
|
||||||
apps_api = client.AppsV1Api()
|
|
||||||
api_instance = client.CustomObjectsApi(api)
|
api_instance = client.CustomObjectsApi(api)
|
||||||
revision = str(time())
|
await BindSidecar(args.replica).run(v1, api_instance)
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
await update_loop(v1, apps_api, api_instance, revision)
|
|
||||||
except asyncio.exceptions.TimeoutError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
Loading…
Reference in New Issue
Block a user