#!/usr/bin/env python3 import asyncio import logging import os import yaml from base64 import b64decode from kubernetes_asyncio.client.api_client import ApiClient from kubernetes_asyncio import client, config, watch from time import time from urllib.parse import urlparse import useful.logs useful.logs.setup(json_fields={"msg": "message", "level": "levelname", "traceback": "exc_text"}) logger = logging.getLogger() NAMESPACE = os.environ["MY_POD_NAMESPACE"] LABEL_MANAGED_BY = "camera-operator" with open("camera-service.yml") as stream: SERVICE_BODY = stream.read() with open("camera-deployment.yml") as stream: DEPLOYMENT_BODY = stream.read() def object_managed(i, now): """ Check if Kubernetes object is managed according to CRD-s """ if not i.metadata.labels: return False if i.metadata.labels.get("app.kubernetes.io/managed-by") != LABEL_MANAGED_BY: return False if i.metadata.labels.get("modified") == now: return False return True async def cleanup_dangling_objects(v1, apps_api, now): await asyncio.sleep(2) logger.info("Cleaning up dangling deployments and services") resp = await v1.list_namespaced_service(NAMESPACE) for i in resp.items: if not object_managed(i, now): continue logger.debug("Removing service: %s/%s", NAMESPACE, i.metadata.name) await v1.delete_namespaced_service(i.metadata.name, NAMESPACE) resp = await apps_api.list_namespaced_deployment(NAMESPACE) for i in resp.items: if not object_managed(i, now): continue logger.debug("Removing deployment: %s/%s", NAMESPACE, i.metadata.name) await apps_api.delete_namespaced_deployment(i.metadata.name, NAMESPACE) async def apply_changes(item, v1, now, apps_api): target = item["spec"]["target"] replicas = item["spec"].get("replicas") # Pull in secrets for the target URL secret_ref = item["spec"].get("secretRef") if secret_ref: secret = await v1.read_namespaced_secret(secret_ref, NAMESPACE) username = b64decode(secret.data.get("username", b"")).decode("ascii") password = b64decode(secret.data.get("password", b"")).decode("ascii") o = urlparse(target) netloc = o.netloc if "@" in netloc: _, netloc = o.netloc.split("@", 1) target = o._replace(netloc="%s:%s@%s" % (username, password, netloc)).geturl() name = "camera-%s" % item["metadata"]["name"] logger.info("Applying changes for %s", name) # Generate Deployment body = yaml.safe_load(DEPLOYMENT_BODY.replace("foobar", name)) body["metadata"]["labels"]["app.kubernetes.io/managed-by"] = LABEL_MANAGED_BY body["metadata"]["labels"]["modified"] = now body["spec"]["template"]["spec"]["containers"][0]["args"] = [target] if replicas: body["spec"]["replicas"] = replicas try: await apps_api.replace_namespaced_deployment(name, NAMESPACE, body) logger.debug("Updated deployment %s/%s", NAMESPACE, name) except client.exceptions.ApiException: await apps_api.create_namespaced_deployment(NAMESPACE, body) logger.debug("Created deployment %s/%s", NAMESPACE, name) # Generate Service body = yaml.safe_load(SERVICE_BODY.replace("foobar", name)) body["metadata"]["labels"]["app.kubernetes.io/managed-by"] = LABEL_MANAGED_BY body["metadata"]["labels"]["modified"] = now try: await v1.replace_namespaced_service(name, NAMESPACE, body) logger.debug("Updated service %s/%s", NAMESPACE, name) except client.exceptions.ApiException: await v1.create_namespaced_service(NAMESPACE, body) logger.debug("Created service %s/%s", NAMESPACE, name) async def update_loop(v1, apps_api, api_instance): cleanup_task = None latest_version = None now = str(time()) w = watch.Watch() args = "k-space.ee", "v1alpha1", NAMESPACE, "cams" logger.info("Subscribing to updates") async for event in w.stream(api_instance.list_namespaced_custom_object, *args, resource_version=latest_version): latest_version = event["object"]["metadata"]["resourceVersion"] if event["type"] == "ADDED": await apply_changes(event["object"], v1, now, apps_api) elif event["type"] == "DELETED": name = "camera-%s" % event["object"]["metadata"]["name"] logger.info("Remove %s" % name) try: await v1.delete_namespaced_service(name, NAMESPACE) except client.exceptions.ApiException: pass else: logger.debug("Remove service: %s/%s", NAMESPACE, name) try: await apps_api.delete_namespaced_deployment(name, NAMESPACE) except client.exceptions.ApiException: pass else: logger.debug("Remove deployment: %s/%s", NAMESPACE, name) if cleanup_task: cleanup_task.cancel() cleanup_task = asyncio.create_task(cleanup_dangling_objects(v1, apps_api, now)) async def main(): if os.getenv("KUBECONFIG"): await config.load_kube_config() else: config.load_incluster_config() async with ApiClient() as api: v1 = client.CoreV1Api(api) apps_api = client.AppsV1Api() api_instance = client.CustomObjectsApi(api) while True: try: await update_loop(v1, apps_api, api_instance) except asyncio.exceptions.TimeoutError: pass if __name__ == "__main__": loop = asyncio.new_event_loop() loop.run_until_complete(main()) loop.close()