#!/usr/bin/env python3
"""Handler for a ClusterOperator custom resource.

Watches the custom resources described by the ClusterOperator named by
``--target`` and, for each one, generates the Secrets, Services, ConfigMaps,
Deployments and StatefulSets listed in the ClusterOperator spec. Generated
objects are labelled so they can be garbage collected later.
"""
import argparse
import asyncio
import logging
import os
import random
import secrets
import string
from time import time

import ecs_logging
import prometheus_async
import yaml
from kubernetes_asyncio import client, config, watch
from kubernetes_asyncio.client.api_client import ApiClient
from kubernetes_asyncio.client.exceptions import ApiException
from prometheus_client import Counter, Gauge

# Get the Logger
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)

# Add an ECS formatter to the Handler
handler = logging.StreamHandler()
handler.setFormatter(ecs_logging.StdlibFormatter())
logger.addHandler(handler)

parser = argparse.ArgumentParser(description="Run handler for ClusterOperator CRD")
parser.add_argument("--target", "-t", default="meta")
args = parser.parse_args()

MY_POD_NAMESPACE = os.environ["MY_POD_NAMESPACE"]
LABEL_MANAGED_BY = "meta-operator"

counter_cleanups = Counter(
    "meta_operator_cleanups",
    "Count of executed cleanups",
    ["stage"])
counter_opened_watch_streams = Counter(
    "meta_operator_opened_watch_streams",
    "Count of watch streams opened")
gauge_watch_stream_begin_timestamp = Gauge(
    "meta_operator_watch_stream_begin_timestamp",
    "Timestamp of last watch stream open")
gauge_resource_version = Gauge(
    "meta_operator_resource_version",
    "Last observed resource version",
    ["group", "version", "plural"])

# Strong references to fire-and-forget tasks. The event loop itself only keeps
# weak references, so an otherwise unreferenced task may be garbage collected
# before it finishes.
_background_tasks = set()


def _on_background_task_done(task):
    """Drop the bookkeeping reference and surface any failure of *task*."""
    _background_tasks.discard(task)
    if task.cancelled():
        return
    exc = task.exception()
    if exc is not None:
        logger.error("Background task failed", exc_info=exc)


def _spawn(coro):
    """Schedule *coro* as a task that is kept alive until it completes."""
    task = asyncio.create_task(coro)
    _background_tasks.add(task)
    task.add_done_callback(_on_background_task_done)
    return task


async def cleanup(v1, apps_api, label_selector, delay=0):
    """Delete every generated object matching *label_selector*.

    Services, Deployments, StatefulSets, ConfigMaps and Secrets are removed,
    in that order.

    Args:
        v1: CoreV1Api client used for services, configmaps and secrets.
        apps_api: AppsV1Api client used for deployments and statefulsets.
        label_selector: Iterable of label selector expressions; they are
            joined with commas into a single selector.
        delay: Seconds to sleep before starting, 0 to start immediately.
    """
    ls = ",".join(label_selector)
    if delay:
        await asyncio.sleep(delay)
    logger.info("Cleaning up %s", ls)

    # (log message, list call, delete call) for each kind, in removal order.
    targets = (
        ("Removing service: %s/%s",
         v1.list_namespaced_service, v1.delete_namespaced_service),
        ("Removing deployment: %s/%s",
         apps_api.list_namespaced_deployment, apps_api.delete_namespaced_deployment),
        ("Removing statefulset: %s/%s",
         apps_api.list_namespaced_stateful_set, apps_api.delete_namespaced_stateful_set),
        ("Removing configmap: %s/%s",
         v1.list_namespaced_config_map, v1.delete_namespaced_config_map),
        ("Removing secret: %s/%s",
         v1.list_namespaced_secret, v1.delete_namespaced_secret),
    )
    for message, list_resources, delete_resource in targets:
        # NOTE(review): the empty namespace is presumably meant to list
        # across all namespaces -- confirm against the API server behaviour.
        resp = await list_resources("", label_selector=ls)
        for i in resp.items:
            logger.info(message, i.metadata.namespace, i.metadata.name)
            try:
                await delete_resource(i.metadata.name, i.metadata.namespace)
            except ApiException as e:
                # Already gone (e.g. removed by a concurrent cleanup): the
                # desired state is reached, keep going with the rest.
                if e.status != 404:
                    raise
    counter_cleanups.labels("executed").inc()


async def apply_changes(operator, item, v1, revision, apps_api, api_instance):
    """Create or update the objects generated for one custom resource.

    Args:
        operator: The ClusterOperator object (dict) holding the templates.
        item: The watched custom resource (dict) to generate objects for.
        v1: CoreV1Api client.
        revision: Revision string stamped on every generated object.
        apps_api: AppsV1Api client.
        api_instance: CustomObjectsApi client (not used here).
    """
    namespace = item["metadata"].get("namespace", MY_POD_NAMESPACE)
    name = item["metadata"]["name"]
    logger.info("Applying changes for %s in namespace %s", name, namespace)
    if namespace == "meta-operator" and name == "meta":
        logger.info("Not touching myself")
        return

    resource = operator["spec"]["resource"]
    labels = {
        "codemowers.io/revision": revision,
        "codemowers.io/resource-name": name,
        "codemowers.io/resource-group": resource["group"],
        "codemowers.io/resource-version": resource["version"],
        "codemowers.io/resource-plural": resource["plural"],
        "app.kubernetes.io/managed-by": LABEL_MANAGED_BY,
    }

    # A resource without a spec simply has no replica override.
    replicas = (item.get("spec") or {}).get("replicas")

    if operator["spec"].get("secret", {}).get("enabled"):
        secret_name = operator["spec"]["secret"]["name"].replace("foobar", name)
        # Passwords are credentials: draw them from the CSPRNG.
        alphabet = string.ascii_letters + string.digits
        password = "".join(secrets.choice(alphabet) for _ in range(20))
        data = {}
        for o in operator["spec"]["secret"]["structure"]:
            # Each value is a %-format template that receives the password.
            data[o["key"]] = o["value"].replace("foobar", name) % password
        try:
            await v1.create_namespaced_secret(namespace, {
                "metadata": {"name": secret_name},
                "stringData": data
            })
        except client.exceptions.ApiException:
            logger.info("Secret already generated, adding labels")
        # Labels are (re)applied in every case so the secret carries the
        # current revision and is not matched by the stale-revision cleanup.
        await v1.patch_namespaced_secret(secret_name, namespace, {
            "metadata": {"labels": labels}
        })
    else:
        logger.info("Secret generation disabled")

    def transform_template(iterable):
        """Yield (name, body) per template with "foobar" replaced and labels added."""
        for template in iterable:
            body = yaml.safe_load(yaml.dump(template).replace("foobar", name))
            body["metadata"].setdefault("labels", {}).update(labels)
            yield body["metadata"]["name"], body

    # Generate Services
    for svc_name, body in transform_template(operator["spec"].get("services", [])):
        try:
            await v1.replace_namespaced_service(svc_name, namespace, body)
            logger.info("Updated service %s/%s", namespace, svc_name)
        except client.exceptions.ApiException:
            await v1.create_namespaced_service(namespace, body)
            logger.info("Created service %s/%s", namespace, svc_name)

    # Generate ConfigMaps
    for cm_name, body in transform_template(operator["spec"].get("configmaps", [])):
        try:
            await v1.replace_namespaced_config_map(cm_name, namespace, body)
            logger.info("Updated configmap %s/%s", namespace, cm_name)
        except client.exceptions.ApiException:
            await v1.create_namespaced_config_map(namespace, body)
            logger.info("Created configmap %s/%s", namespace, cm_name)

    # Generate Deployments
    for deploy_name, body in transform_template(operator["spec"].get("deployments", [])):
        # An explicit 0 is a valid request, so only None means "not set".
        if replicas is not None:
            body["spec"]["replicas"] = replicas
        try:
            await apps_api.replace_namespaced_deployment(deploy_name, namespace, body)
            logger.info("Updated deployment %s/%s", namespace, deploy_name)
        except client.exceptions.ApiException:
            await apps_api.create_namespaced_deployment(namespace, body)
            logger.info("Created deployment %s/%s", namespace, deploy_name)

    # Generate statefulsets
    for sts_name, body in transform_template(operator["spec"].get("statefulsets", [])):
        if replicas is not None:
            body["spec"]["replicas"] = replicas
        try:
            await apps_api.replace_namespaced_stateful_set(sts_name, namespace, body)
            logger.info("Updated statefulset %s/%s", namespace, sts_name)
        except client.exceptions.ApiException:
            await apps_api.create_namespaced_stateful_set(namespace, body)
            logger.info("Created statefulset %s/%s", namespace, sts_name)


async def update_loop(v1, apps_api, api_instance, revision):
    """Watch the target custom resources and react to ADDED/DELETED events.

    On ADDED the objects are applied and a delayed cleanup of objects from
    other revisions is (re)scheduled; on DELETED the objects generated for
    that resource are cleaned up. Other event types are ignored.

    Args:
        v1: CoreV1Api client.
        apps_api: AppsV1Api client.
        api_instance: CustomObjectsApi client used to read the
            ClusterOperator and to open the watch stream.
        revision: Revision string of this operator run.
    """
    logger.info("Managing resources for ClusterOperator named %s", repr(args.target))
    operator = await api_instance.get_cluster_custom_object(
        "codemowers.io", "v1alpha1", "clusteroperators", args.target)
    resource = operator["spec"]["resource"]
    group, version, plural = resource["group"], resource["version"], resource["plural"]

    label_selector = (
        "app.kubernetes.io/managed-by==%s" % LABEL_MANAGED_BY,
        "codemowers.io/resource-group==%s" % group,
        "codemowers.io/resource-version==%s" % version,
        "codemowers.io/resource-plural==%s" % plural
    )
    # Positional arguments of list_namespaced_custom_object; namespace is "".
    flt = group, version, "", plural
    method = api_instance.list_namespaced_custom_object

    logger.info("Subscribing to %(plural)s.%(group)s/%(version)s updates", resource)

    w = watch.Watch()
    latest_version = None
    task = None
    counter_opened_watch_streams.inc()
    async for event in w.stream(method, *flt, resource_version=latest_version):
        latest_version = event["object"]["metadata"]["resourceVersion"]
        gauge_resource_version.labels(group, version, plural).set(latest_version)
        if event["type"] == "ADDED":
            await apply_changes(operator, event["object"], v1, revision, apps_api, api_instance)
            # Restart the delayed stale-revision cleanup so it only runs once
            # ADDED events have stopped arriving for the delay period.
            if task:
                task.cancel()
                counter_cleanups.labels("postponed").inc()
            task = _spawn(cleanup(
                v1, apps_api,
                label_selector + ("codemowers.io/revision!=%s" % revision,),
                delay=2))
        elif event["type"] == "DELETED":
            _spawn(cleanup(
                v1, apps_api,
                label_selector + (
                    "codemowers.io/resource-name==%s" % event["object"]["metadata"]["name"],)))


async def main():
    """Load the Kubernetes configuration and run the watch loop forever.

    The watch is reopened after an ApiException with status 410 or an asyncio
    timeout; any other ApiException is re-raised.
    """
    if os.getenv("KUBECONFIG"):
        await config.load_kube_config()
    else:
        config.load_incluster_config()

    async with ApiClient() as api:
        # All API wrappers share the one client that this context closes.
        v1 = client.CoreV1Api(api)
        apps_api = client.AppsV1Api(api)
        api_instance = client.CustomObjectsApi(api)
        revision = str(time())
        while True:
            try:
                gauge_watch_stream_begin_timestamp.set(time())
                await update_loop(v1, apps_api, api_instance, revision)
            except ApiException as e:
                logger.debug("Caught exception: %s", e)
                if e.status != 410:
                    raise
            except asyncio.exceptions.TimeoutError:
                pass


if __name__ == "__main__":
    loop = asyncio.new_event_loop()
    # Keep a reference so the metrics server task cannot be garbage collected.
    metrics_task = loop.create_task(prometheus_async.aio.web.start_http_server(port=5000))
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()