diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..aab2253 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,6 @@ +repos: +- repo: https://github.com/PyCQA/flake8 + rev: 3.9.2 + hooks: + - id: flake8 + additional_dependencies: [flake8-typing-imports==1.10.0,flake8-quotes==3.2.0] diff --git a/README.md b/README.md new file mode 100644 index 0000000..d0e9496 --- /dev/null +++ b/README.md @@ -0,0 +1,6 @@ +To run locally with suitable `KUBECONFIG`: + +``` +MY_POD_NAMESPACE=camtiler ./camera-operator.py +``` + diff --git a/camera-operator.py b/camera-operator.py index 0fd9f22..a8834bd 100755 --- a/camera-operator.py +++ b/camera-operator.py @@ -6,22 +6,21 @@ import yaml from base64 import b64decode from kubernetes_asyncio.client.api_client import ApiClient from kubernetes_asyncio import client, config, watch -from os import path from time import time from urllib.parse import urlparse import useful.logs -useful.logs.setup( - json_fields={"msg":"message", "level": "levelname"}) +useful.logs.setup(json_fields={"msg": "message", "level": "levelname"}) logger = logging.getLogger() NAMESPACE = os.environ["MY_POD_NAMESPACE"] LABEL_MANAGED_BY = "camera-operator" with open("camera-service.yml") as stream: - SERVICE_BODY = stream.read() + SERVICE_BODY = stream.read() with open("camera-deployment.yml") as stream: - DEPLOYMENT_BODY = stream.read() + DEPLOYMENT_BODY = stream.read() + async def apply_changes(item, v1, now, apps_api): target = item["spec"]["target"] @@ -44,99 +43,94 @@ async def apply_changes(item, v1, now, apps_api): # Generate Deployment body = yaml.safe_load(DEPLOYMENT_BODY.replace("foobar", name)) - body["metadata"]["labels"] ["app.kubernetes.io/managed-by"] = LABEL_MANAGED_BY - body["metadata"]["labels"] ["modified"] = now + body["metadata"]["labels"]["app.kubernetes.io/managed-by"] = LABEL_MANAGED_BY + body["metadata"]["labels"]["modified"] = now body["spec"]["template"]["spec"]["containers"][0]["args"] = [target] if replicas: body["spec"]["replicas"] = replicas try: - await apps_api.replace_namespaced_deployment( - name = name, body = body, namespace=NAMESPACE) + await apps_api.replace_namespaced_deployment(name, NAMESPACE, body) logger.debug("Updated deployment %s/%s", NAMESPACE, name) - except client.exceptions.ApiException as e: - await apps_api.create_namespaced_deployment( - body = body, namespace=NAMESPACE) + except client.exceptions.ApiException: + await apps_api.create_namespaced_deployment(NAMESPACE, body) logger.debug("Created deployment %s/%s", NAMESPACE, name) # Generate Service body = yaml.safe_load(SERVICE_BODY.replace("foobar", name)) - body["metadata"]["labels"] ["app.kubernetes.io/managed-by"] = LABEL_MANAGED_BY - body["metadata"]["labels"] ["modified"] = now + body["metadata"]["labels"]["app.kubernetes.io/managed-by"] = LABEL_MANAGED_BY + body["metadata"]["labels"]["modified"] = now try: - await v1.replace_namespaced_service( - name = name, body = body, namespace=NAMESPACE) + await v1.replace_namespaced_service(name, NAMESPACE, body) logger.debug("Updated service %s/%s", NAMESPACE, name) - except client.exceptions.ApiException as e: - await v1.create_namespaced_service( - body = body, namespace=NAMESPACE) + except client.exceptions.ApiException: + await v1.create_namespaced_service(NAMESPACE, body) logger.debug("Created service %s/%s", NAMESPACE, name) async def main(): - if os.getenv("KUBECONFIG"): - await config.load_kube_config() - else: - config.load_incluster_config() - async with ApiClient() as api: - v1 = client.CoreV1Api(api) - apps_api = client.AppsV1Api() - api_instance = client.CustomObjectsApi(api) - now = str(time()) + if os.getenv("KUBECONFIG"): + await config.load_kube_config() + else: + config.load_incluster_config() + async with ApiClient() as api: + v1 = client.CoreV1Api(api) + apps_api = client.AppsV1Api() + api_instance = client.CustomObjectsApi(api) + now = str(time()) - w = watch.Watch() - args = "k-space.ee", "v1alpha1", NAMESPACE, "cams" + w = watch.Watch() + args = "k-space.ee", "v1alpha1", NAMESPACE, "cams" - resp = await api_instance.list_namespaced_custom_object(*args) - for i in resp["items"]: - await apply_changes(i, v1, now, apps_api) + resp = await api_instance.list_namespaced_custom_object(*args) + for i in resp["items"]: + await apply_changes(i, v1, now, apps_api) - logger.info("Cleaning up dangling deployments and services") - resp = await v1.list_namespaced_service(NAMESPACE) - for i in resp.items: - if not i.metadata.labels: - continue - if i.metadata.labels.get("app.kubernetes.io/managed-by") != LABEL_MANAGED_BY: - continue - if i.metadata.labels.get("modified") == now: - continue - logger.debug("Removing service: %s/%s", NAMESPACE, i.metadata.name) - await v1.delete_namespaced_service(i.metadata.name, NAMESPACE) + logger.info("Cleaning up dangling deployments and services") + resp = await v1.list_namespaced_service(NAMESPACE) + for i in resp.items: + if not i.metadata.labels: + continue + if i.metadata.labels.get("app.kubernetes.io/managed-by") != LABEL_MANAGED_BY: + continue + if i.metadata.labels.get("modified") == now: + continue + logger.debug("Removing service: %s/%s", NAMESPACE, i.metadata.name) + await v1.delete_namespaced_service(i.metadata.name, NAMESPACE) - resp = await apps_api.list_namespaced_deployment(NAMESPACE) - for i in resp.items: - if not i.metadata.labels: - continue - if i.metadata.labels.get("app.kubernetes.io/managed-by") != LABEL_MANAGED_BY: - continue - if i.metadata.labels.get("modified") == now: - continue - logger.debug("Removing deployment: %s/%s", NAMESPACE, i.metadata.name) - await apps_api.delete_namespaced_deployment(i.metadata.name, NAMESPACE) + resp = await apps_api.list_namespaced_deployment(NAMESPACE) + for i in resp.items: + if not i.metadata.labels: + continue + if i.metadata.labels.get("app.kubernetes.io/managed-by") != LABEL_MANAGED_BY: + continue + if i.metadata.labels.get("modified") == now: + continue + logger.debug("Removing deployment: %s/%s", NAMESPACE, i.metadata.name) + await apps_api.delete_namespaced_deployment(i.metadata.name, NAMESPACE) - logger.info("Subscribing to updates") + logger.info("Subscribing to updates") - async for event in w.stream(api_instance.list_namespaced_custom_object, *args): - if event["type"] == "ADDED": - await apply_changes(event["object"], v1, now, apps_api) - elif event["type"] == "DELETED": - name = "camera-%s" % event["object"]["metadata"]["name"] - try: - await v1.delete_namespaced_service(name, NAMESPACE) - except client.exceptions.ApiException as e: - pass - else: - logger.debug("Removed service: %s/%s", NAMESPACE, name) - try: - await apps_api.delete_namespaced_deployment(name, NAMESPACE) - except client.exceptions.ApiException as e: - pass - else: - logger.debug("Removed deployment: %s/%s", NAMESPACE, name) + async for event in w.stream(api_instance.list_namespaced_custom_object, *args): + if event["type"] == "ADDED": + await apply_changes(event["object"], v1, now, apps_api) + elif event["type"] == "DELETED": + name = "camera-%s" % event["object"]["metadata"]["name"] + try: + await v1.delete_namespaced_service(name, NAMESPACE) + except client.exceptions.ApiException: + pass + else: + logger.debug("Removed service: %s/%s", NAMESPACE, name) + try: + await apps_api.delete_namespaced_deployment(name, NAMESPACE) + except client.exceptions.ApiException: + pass + else: + logger.debug("Removed deployment: %s/%s", NAMESPACE, name) -if __name__ == '__main__': +if __name__ == "__main__": loop = asyncio.new_event_loop() loop.run_until_complete(main()) loop.close() -