diff --git a/camera-operator.py b/camera-operator.py index a8834bd..0154f1f 100755 --- a/camera-operator.py +++ b/camera-operator.py @@ -10,7 +10,7 @@ from time import time from urllib.parse import urlparse import useful.logs -useful.logs.setup(json_fields={"msg": "message", "level": "levelname"}) +useful.logs.setup(json_fields={"msg": "message", "level": "levelname", "traceback": "exc_text"}) logger = logging.getLogger() NAMESPACE = os.environ["MY_POD_NAMESPACE"] @@ -22,6 +22,36 @@ with open("camera-deployment.yml") as stream: DEPLOYMENT_BODY = stream.read() +def object_managed(i, now): + """ + Check if Kubernetes object is managed according to CRD-s + """ + if not i.metadata.labels: + return False + if i.metadata.labels.get("app.kubernetes.io/managed-by") != LABEL_MANAGED_BY: + return False + if i.metadata.labels.get("modified") == now: + return False + return True + + +async def cleanup_dangling_objects(v1, apps_api, now): + await asyncio.sleep(2) + logger.info("Cleaning up dangling deployments and services") + resp = await v1.list_namespaced_service(NAMESPACE) + for i in resp.items: + if not object_managed(i, now): + continue + logger.debug("Removing service: %s/%s", NAMESPACE, i.metadata.name) + await v1.delete_namespaced_service(i.metadata.name, NAMESPACE) + resp = await apps_api.list_namespaced_deployment(NAMESPACE) + for i in resp.items: + if not object_managed(i, now): + continue + logger.debug("Removing deployment: %s/%s", NAMESPACE, i.metadata.name) + await apps_api.delete_namespaced_deployment(i.metadata.name, NAMESPACE) + + async def apply_changes(item, v1, now, apps_api): target = item["spec"]["target"] replicas = item["spec"].get("replicas") @@ -68,6 +98,37 @@ async def apply_changes(item, v1, now, apps_api): logger.debug("Created service %s/%s", NAMESPACE, name) +async def update_loop(v1, apps_api, api_instance): + cleanup_task = None + latest_version = None + now = str(time()) + w = watch.Watch() + args = "k-space.ee", "v1alpha1", NAMESPACE, "cams" + logger.info("Subscribing to updates") + async for event in w.stream(api_instance.list_namespaced_custom_object, *args, resource_version=latest_version): + latest_version = event["object"]["metadata"]["resourceVersion"] + if event["type"] == "ADDED": + await apply_changes(event["object"], v1, now, apps_api) + elif event["type"] == "DELETED": + name = "camera-%s" % event["object"]["metadata"]["name"] + logger.info("Remove %s" % name) + try: + await v1.delete_namespaced_service(name, NAMESPACE) + except client.exceptions.ApiException: + pass + else: + logger.debug("Remove service: %s/%s", NAMESPACE, name) + try: + await apps_api.delete_namespaced_deployment(name, NAMESPACE) + except client.exceptions.ApiException: + pass + else: + logger.debug("Remove deployment: %s/%s", NAMESPACE, name) + if cleanup_task: + cleanup_task.cancel() + cleanup_task = asyncio.create_task(cleanup_dangling_objects(v1, apps_api, now)) + + async def main(): if os.getenv("KUBECONFIG"): await config.load_kube_config() @@ -77,57 +138,11 @@ async def main(): v1 = client.CoreV1Api(api) apps_api = client.AppsV1Api() api_instance = client.CustomObjectsApi(api) - now = str(time()) - - w = watch.Watch() - args = "k-space.ee", "v1alpha1", NAMESPACE, "cams" - - resp = await api_instance.list_namespaced_custom_object(*args) - for i in resp["items"]: - await apply_changes(i, v1, now, apps_api) - - logger.info("Cleaning up dangling deployments and services") - resp = await v1.list_namespaced_service(NAMESPACE) - for i in resp.items: - if not i.metadata.labels: - continue - if i.metadata.labels.get("app.kubernetes.io/managed-by") != LABEL_MANAGED_BY: - continue - if i.metadata.labels.get("modified") == now: - continue - logger.debug("Removing service: %s/%s", NAMESPACE, i.metadata.name) - await v1.delete_namespaced_service(i.metadata.name, NAMESPACE) - - resp = await apps_api.list_namespaced_deployment(NAMESPACE) - for i in resp.items: - if not i.metadata.labels: - continue - if i.metadata.labels.get("app.kubernetes.io/managed-by") != LABEL_MANAGED_BY: - continue - if i.metadata.labels.get("modified") == now: - continue - logger.debug("Removing deployment: %s/%s", NAMESPACE, i.metadata.name) - await apps_api.delete_namespaced_deployment(i.metadata.name, NAMESPACE) - - logger.info("Subscribing to updates") - - async for event in w.stream(api_instance.list_namespaced_custom_object, *args): - if event["type"] == "ADDED": - await apply_changes(event["object"], v1, now, apps_api) - elif event["type"] == "DELETED": - name = "camera-%s" % event["object"]["metadata"]["name"] - try: - await v1.delete_namespaced_service(name, NAMESPACE) - except client.exceptions.ApiException: - pass - else: - logger.debug("Removed service: %s/%s", NAMESPACE, name) - try: - await apps_api.delete_namespaced_deployment(name, NAMESPACE) - except client.exceptions.ApiException: - pass - else: - logger.debug("Removed deployment: %s/%s", NAMESPACE, name) + while True: + try: + await update_loop(v1, apps_api, api_instance) + except asyncio.exceptions.TimeoutError: + pass if __name__ == "__main__":