commit 088112bcd4a1c7c5dc321808866d17accf6e27f8 Author: Lauri Võsandi Date: Sun Aug 28 09:00:00 2022 +0300 Initial commit diff --git a/.drone.yml b/.drone.yml new file mode 100644 index 0000000..e7b5dfa --- /dev/null +++ b/.drone.yml @@ -0,0 +1,2 @@ +kind: template +load: docker.yaml diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..30d1cea --- /dev/null +++ b/Dockerfile @@ -0,0 +1,3 @@ +FROM harbor.k-space.ee/k-space/microservice-base +ADD meta-operator.py / +ENTRYPOINT /meta-operator.py diff --git a/README.md b/README.md new file mode 100644 index 0000000..a93426c --- /dev/null +++ b/README.md @@ -0,0 +1,3 @@ +# Meta operator + +Meta operator allows creating operators without building any binaries diff --git a/meta-operator.py b/meta-operator.py new file mode 100755 index 0000000..b705c71 --- /dev/null +++ b/meta-operator.py @@ -0,0 +1,199 @@ +#!/usr/bin/env python3 +import asyncio +import logging +import os +import random +import string +import yaml +from kubernetes_asyncio.client.api_client import ApiClient +from kubernetes_asyncio import client, config, watch +from time import time +import useful.logs +import argparse + +parser = argparse.ArgumentParser(description="Run handler for ClusterOperator CRD") +parser.add_argument("--target", "-t", default="meta") +args = parser.parse_args() + +useful.logs.setup(json_fields={"msg": "message", "level": "levelname", "traceback": "exc_text"}) +logger = logging.getLogger() + +MY_POD_NAMESPACE = os.environ["MY_POD_NAMESPACE"] +LABEL_MANAGED_BY = "meta-operator" + + +async def cleanup(v1, apps_api, label_selector, delay=0): + ls = ",".join(label_selector) + if delay: + await asyncio.sleep(delay) + logger.info("Cleaning up %s" % ls) + + # Clean up services + resp = await v1.list_namespaced_service("", label_selector=ls) + for i in resp.items: + logger.info("Removing service: %s/%s", i.metadata.namespace, i.metadata.name) + await v1.delete_namespaced_service(i.metadata.name, i.metadata.namespace) + + # Clean up deployments + resp = await apps_api.list_namespaced_deployment("", label_selector=ls) + for i in resp.items: + logger.info("Removing deployment: %s/%s", i.metadata.namespace, i.metadata.name) + await apps_api.delete_namespaced_deployment(i.metadata.name, i.metadata.namespace) + + # Clean up statefulsets + resp = await apps_api.list_namespaced_stateful_set("", label_selector=ls) + for i in resp.items: + logger.info("Removing statefulset: %s/%s", i.metadata.namespace, i.metadata.name) + await apps_api.delete_namespaced_stateful_set(i.metadata.name, i.metadata.namespace) + + # Clean up configmaps + resp = await v1.list_namespaced_config_map("", label_selector=ls) + for i in resp.items: + logger.info("Removing configmap: %s/%s", i.metadata.namespace, i.metadata.name) + await v1.delete_namespaced_config_map(i.metadata.name, i.metadata.namespace) + + # Clean up secrets + resp = await v1.list_namespaced_secret("", label_selector=ls) + for i in resp.items: + logger.info("Removing secret: %s/%s", i.metadata.namespace, i.metadata.name) + await v1.delete_namespaced_secret(i.metadata.name, i.metadata.namespace) + + +async def apply_changes(operator, item, v1, revision, apps_api, api_instance): + namespace = item["metadata"].get("namespace", MY_POD_NAMESPACE) + name = item["metadata"]["name"] + logger.info("Applying changes for %s in namespace %s", name, namespace) + if namespace == "meta-operator" and name == "meta": + logger.info("Not touching myself") + return + labels = { + "codemowers.io/revision": revision, + "codemowers.io/resource-name": name, + "codemowers.io/resource-group": operator["spec"]["resource"]["group"], + "codemowers.io/resource-version": operator["spec"]["resource"]["version"], + "codemowers.io/resource-plural": operator["spec"]["resource"]["plural"], + "app.kubernetes.io/managed-by": LABEL_MANAGED_BY, + } + replicas = item["spec"].get("replicas") + if operator["spec"].get("secret", {}).get("enabled"): + secret_name = operator["spec"]["secret"]["name"].replace("foobar", item["metadata"]["name"]) + password = "".join([random.choice(string.ascii_letters + string.digits) for j in range(20)]) + data = {} + for o in operator["spec"]["secret"]["structure"]: + data[o["key"]] = o["value"].replace("foobar", item["metadata"]["name"]) % password + try: + await v1.create_namespaced_secret(namespace, { + "metadata": {"name": secret_name}, + "stringData": data + }) + except client.exceptions.ApiException: + logger.info("Secret already generated, adding labels") + await v1.patch_namespaced_secret(secret_name, namespace, { + "metadata": {"labels": labels} + }) + else: + logger.info("Secret generation disabled") + + def transform_template(iterable): + for i in iterable: + body = yaml.safe_load(yaml.dump(i).replace("foobar", item["metadata"]["name"])) + if "labels" not in body["metadata"]: + body["metadata"]["labels"] = {} + body["metadata"]["labels"].update(labels) + yield body["metadata"]["name"], body + + # Generate Services + for name, body in transform_template(operator["spec"].get("services", [])): + try: + await v1.patch_namespaced_service(name, namespace, body) + logger.info("Updated service %s/%s", namespace, name) + except client.exceptions.ApiException: + await v1.create_namespaced_service(namespace, body) + logger.info("Created service %s/%s", namespace, name) + + # Generate ConfigMaps + for name, body in transform_template(operator["spec"].get("configmaps", [])): + try: + await v1.patch_namespaced_config_map(name, namespace, body) + logger.info("Updated configmap %s/%s", namespace, name) + except client.exceptions.ApiException: + await v1.create_namespaced_config_map(namespace, body) + logger.info("Created configmap %s/%s", namespace, name) + + # Generate Deployments + for name, body in transform_template(operator["spec"].get("deployments", [])): + if replicas: + body["spec"]["replicas"] = replicas + try: + await apps_api.patch_namespaced_deployment(name, namespace, body) + logger.info("Updated deployment %s/%s", namespace, name) + except client.exceptions.ApiException: + await apps_api.create_namespaced_deployment(namespace, body) + logger.info("Created deployment %s/%s", namespace, name) + + # Generate statefulsets + for name, body in transform_template(operator["spec"].get("statefulsets", [])): + if replicas: + body["spec"]["replicas"] = replicas + try: + await apps_api.patch_namespaced_stateful_set(name, namespace, body) + logger.info("Updated statefulset %s/%s", namespace, name) + except client.exceptions.ApiException: + await apps_api.create_namespaced_stateful_set(namespace, body) + logger.info("Created statefulset %s/%s", namespace, name) + + +async def update_loop(v1, apps_api, api_instance, revision): + logger.info("Managing resources for ClusterOperator named %s", repr(args.target)) + operator = await api_instance.get_cluster_custom_object("codemowers.io", "v1alpha1", "clusteroperators", args.target) + label_selector = ( + "app.kubernetes.io/managed-by==%s" % LABEL_MANAGED_BY, + "codemowers.io/resource-group==%s" % operator["spec"]["resource"]["group"], + "codemowers.io/resource-version==%s" % operator["spec"]["resource"]["version"], + "codemowers.io/resource-plural==%s" % operator["spec"]["resource"]["plural"] + ) + flt = operator["spec"]["resource"]["group"], operator["spec"]["resource"]["version"], \ + "", operator["spec"]["resource"]["plural"] + + method = api_instance.list_namespaced_custom_object + logger.info("Subscribing to %(plural)s.%(group)s/%(version)s updates", operator["spec"]["resource"]) + w = watch.Watch() + latest_version = None + + task = None + + async for event in w.stream(method, *flt, resource_version=latest_version): + latest_version = event["object"]["metadata"]["resourceVersion"] + if event["type"] == "ADDED": + await apply_changes(operator, event["object"], v1, revision, apps_api, api_instance) + if task: + task.cancel() + task = asyncio.create_task(cleanup(v1, apps_api, + label_selector + ("codemowers.io/revision!=%s" % revision,), + delay=2)) + elif event["type"] == "DELETED": + asyncio.create_task(cleanup(v1, apps_api, + label_selector + ("codemowers.io/resource-name==%s" % event["object"]["metadata"]["name"],))) + + +async def main(): + if os.getenv("KUBECONFIG"): + await config.load_kube_config() + else: + config.load_incluster_config() + async with ApiClient() as api: + v1 = client.CoreV1Api(api) + apps_api = client.AppsV1Api() + api_instance = client.CustomObjectsApi(api) + revision = str(time()) + while True: + try: + await update_loop(v1, apps_api, api_instance, revision) + except asyncio.exceptions.TimeoutError: + pass + + +if __name__ == "__main__": + loop = asyncio.new_event_loop() + loop.run_until_complete(main()) + loop.close()