From 2b2f838c20e9b1b18716d45d9a65aa86a91a1ff5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lauri=20V=C3=B5sandi?= Date: Tue, 2 Aug 2022 16:01:21 +0300 Subject: [PATCH] Switch to Kubernetes Watch API --- camera-deployment.yml | 7 +- camera-operator.py | 165 +++++++++++++++++++++++++----------------- 2 files changed, 105 insertions(+), 67 deletions(-) diff --git a/camera-deployment.yml b/camera-deployment.yml index 6d5363b..04166fd 100644 --- a/camera-deployment.yml +++ b/camera-deployment.yml @@ -32,13 +32,16 @@ spec: containers: - name: camdetect image: harbor.k-space.ee/k-space/camera-motion-detect:latest + ports: + - containerPort: 5000 + name: "http" resources: requests: memory: "64Mi" - cpu: "250m" + cpu: "200m" limits: memory: "128Mi" - cpu: "500m" + cpu: "1" securityContext: readOnlyRootFilesystem: true runAsNonRoot: true diff --git a/camera-operator.py b/camera-operator.py index 3c87b76..aabc441 100644 --- a/camera-operator.py +++ b/camera-operator.py @@ -1,100 +1,135 @@ import asyncio +import os import yaml from base64 import b64decode from kubernetes_asyncio.client.api_client import ApiClient -from kubernetes_asyncio import client, config +from kubernetes_asyncio import client, config, watch from os import path from time import time from urllib.parse import urlparse +NAMESPACE = os.environ["MY_POD_NAMESPACE"] + LABEL_MANAGED_BY = "camera-operator" with open("camera-service.yml") as stream: SERVICE_BODY = stream.read() with open("camera-deployment.yml") as stream: DEPLOYMENT_BODY = stream.read() +async def apply_changes(item, v1, now, apps_api): + target = item["spec"]["target"] + replicas = item["spec"].get("replicas") + + # Pull in secrets for the target URL + secret_ref = item["spec"].get("secretRef") + if secret_ref: + secret = await v1.read_namespaced_secret(secret_ref, NAMESPACE) + username = b64decode(secret.data.get("username", b"")).decode("ascii") + password = b64decode(secret.data.get("password", b"")).decode("ascii") + o = urlparse(target) + netloc = o.netloc + if "@" in netloc: + _, netloc = o.netloc.split("@", 1) + target = o._replace(netloc="%s:%s@%s" % (username, password, netloc)).geturl() + + name = "camera-%s" % item["metadata"]["name"] + print("Applying changes for", name, "CRD") + + # Generate Deployment + body = yaml.safe_load(DEPLOYMENT_BODY.replace("foobar", name)) + body["metadata"]["labels"] ["app.kubernetes.io/managed-by"] = LABEL_MANAGED_BY + body["metadata"]["labels"] ["modified"] = now + body["spec"]["template"]["spec"]["containers"][0]["args"] = [target] + + if replicas: + body["spec"]["replicas"] = replicas + try: + await apps_api.replace_namespaced_deployment( + name = name, body = body, namespace=NAMESPACE) + print(" * Updated deployment %s/%s" % (NAMESPACE, name)) + except client.exceptions.ApiException as e: + await apps_api.create_namespaced_deployment( + body = body, namespace=NAMESPACE) + print(" * Created deployment %s/%s" % (NAMESPACE, name)) + + # Generate Service + body = yaml.safe_load(SERVICE_BODY.replace("foobar", name)) + body["metadata"]["labels"] ["app.kubernetes.io/managed-by"] = LABEL_MANAGED_BY + body["metadata"]["labels"] ["modified"] = now + try: + await v1.replace_namespaced_service( + name = name, body = body, namespace=NAMESPACE) + print(" * Updated service %s/%s" % (NAMESPACE, name)) + except client.exceptions.ApiException as e: + await v1.create_namespaced_service( + body = body, namespace=NAMESPACE) + print(" * Created service %s/%s" % (NAMESPACE, name)) + + async def main(): - - await config.load_kube_config() + if os.getenv("KUBECONFIG"): + await config.load_kube_config() + else: + await config.load_incluster_config() async with ApiClient() as api: - v1 = client.CoreV1Api(api) apps_api = client.AppsV1Api() - - print("Listing namespaces") - ret = await v1.list_namespace() api_instance = client.CustomObjectsApi(api) now = str(time()) - for i in ret.items: - try: - resp = await api_instance.list_namespaced_custom_object( - "k-space.ee", "v1alpha1", i.metadata.name, "cams") - except client.exceptions.ApiException: + w = watch.Watch() + args = "k-space.ee", "v1alpha1", NAMESPACE, "cams" + + resp = await api_instance.list_namespaced_custom_object(*args) + for i in resp["items"]: + await apply_changes(i, v1, now, apps_api) + + print("Cleaning up dangling deployments and services") + resp = await v1.list_namespaced_service(NAMESPACE) + for i in resp.items: + if not i.metadata.labels: continue - for item in resp["items"]: - target = item["spec"]["target"] - replicas = item["spec"].get("replicas") + if i.metadata.labels.get("app.kubernetes.io/managed-by") != LABEL_MANAGED_BY: + continue + if i.metadata.labels.get("modified") == now: + continue + print(" * Removing service: %s/%s" % (NAMESPACE, i.metadata.name)) + await v1.delete_namespaced_service(i.metadata.name, NAMESPACE) - # Pull in secrets for the target URL - secret_ref = item["spec"].get("secretRef") - if secret_ref: - secret = await v1.read_namespaced_secret(secret_ref, i.metadata.name) - username = b64decode(secret.data.get("username", b"")).decode("ascii") - password = b64decode(secret.data.get("password", b"")).decode("ascii") - o = urlparse(target) - netloc = o.netloc - if "@" in netloc: - _, netloc = o.netloc.split("@", 1) - target = o._replace(netloc="%s:%s@%s" % (username, password, netloc)).geturl() + resp = await apps_api.list_namespaced_deployment(NAMESPACE) + for i in resp.items: + if not i.metadata.labels: + continue + if i.metadata.labels.get("app.kubernetes.io/managed-by") != LABEL_MANAGED_BY: + continue + if i.metadata.labels.get("modified") == now: + continue + print(" * Removing deployment: %s/%s" % (NAMESPACE, i.metadata.name)) + await apps_api.delete_namespaced_deployment(i.metadata.name, NAMESPACE) - name = "camera-%s" % item["metadata"]["name"] - print("Applying", name) + print("Subscribing to updates") - # Generate Deployment - body = yaml.safe_load(DEPLOYMENT_BODY.replace("foobar", name)) - body["metadata"]["labels"] ["app.kubernetes.io/managed-by"] = LABEL_MANAGED_BY - body["metadata"]["labels"] ["modified"] = now - body["spec"]["template"]["spec"]["containers"][0]["args"] = [target] - - if replicas: - body["spec"]["replicas"] = replicas + async for event in w.stream(api_instance.list_namespaced_custom_object, *args): + if event["type"] == "ADDED": + await apply_changes(event["object"], v1, now, apps_api) + elif event["type"] == "DELETED": + name = "camera-%s" % event["object"]["metadata"]["name"] try: - await apps_api.replace_namespaced_deployment( - name = name, body = body, namespace=i.metadata.name) - print("Updated deployment %s/%s" % (i.metadata.name, name)) + await v1.delete_namespaced_service(name, NAMESPACE) except client.exceptions.ApiException as e: - await apps_api.create_namespaced_deployment( - body = body, namespace=i.metadata.name) - print("Created deployment %s/%s" % (i.metadata.name, name)) - - # Generate Service - body = yaml.safe_load(SERVICE_BODY.replace("foobar", name)) - body["metadata"]["labels"] ["app.kubernetes.io/managed-by"] = LABEL_MANAGED_BY - body["metadata"]["labels"] ["modified"] = now + pass + else: + print("Removed service: %s/%s" % (NAMESPACE, name)) try: - await v1.replace_namespaced_service( - name = name, body = body, namespace=i.metadata.name) - print("Updated service %s/%s" % (i.metadata.name, name)) + await apps_api.delete_namespaced_deployment(name, NAMESPACE) except client.exceptions.ApiException as e: - await v1.create_namespaced_service( - body = body, namespace=i.metadata.name) - print("Created service %s/%s" % (i.metadata.name, name)) + pass + else: + print("Removed deployment: %s/%s" % (NAMESPACE, name)) - deployments = await apps_api.list_deployment_for_all_namespaces() - for dep in deployments.items: - if not dep.metadata.labels: - continue - if dep.metadata.labels.get("app.kubernetes.io/managed-by") != LABEL_MANAGED_BY: - continue - if dep.metadata.labels.get("modified") == now: - continue - print("Removing deployment: %s/%s" % (dep.metadata.namespace, dep.metadata.name)) - await apps_api.delete_namespaced_deployment(name=dep.metadata.name, namespace=dep.metadata.namespace) - print("Done") if __name__ == '__main__': - loop = asyncio.get_event_loop() + loop = asyncio.new_event_loop() loop.run_until_complete(main()) loop.close()