This commit is contained in:
		| @@ -10,7 +10,7 @@ from time import time | ||||
| from urllib.parse import urlparse | ||||
| import useful.logs | ||||
|  | ||||
| useful.logs.setup(json_fields={"msg": "message", "level": "levelname"}) | ||||
| useful.logs.setup(json_fields={"msg": "message", "level": "levelname", "traceback": "exc_text"}) | ||||
| logger = logging.getLogger() | ||||
|  | ||||
| NAMESPACE = os.environ["MY_POD_NAMESPACE"] | ||||
| @@ -22,6 +22,36 @@ with open("camera-deployment.yml") as stream: | ||||
|     DEPLOYMENT_BODY = stream.read() | ||||
|  | ||||
|  | ||||
| def object_managed(i, now): | ||||
|     """ | ||||
|     Check if Kubernetes object is managed according to CRD-s | ||||
|     """ | ||||
|     if not i.metadata.labels: | ||||
|         return False | ||||
|     if i.metadata.labels.get("app.kubernetes.io/managed-by") != LABEL_MANAGED_BY: | ||||
|         return False | ||||
|     if i.metadata.labels.get("modified") == now: | ||||
|         return False | ||||
|     return True | ||||
|  | ||||
|  | ||||
| async def cleanup_dangling_objects(v1, apps_api, now): | ||||
|     await asyncio.sleep(2) | ||||
|     logger.info("Cleaning up dangling deployments and services") | ||||
|     resp = await v1.list_namespaced_service(NAMESPACE) | ||||
|     for i in resp.items: | ||||
|         if not object_managed(i, now): | ||||
|             continue | ||||
|         logger.debug("Removing service: %s/%s", NAMESPACE, i.metadata.name) | ||||
|         await v1.delete_namespaced_service(i.metadata.name, NAMESPACE) | ||||
|     resp = await apps_api.list_namespaced_deployment(NAMESPACE) | ||||
|     for i in resp.items: | ||||
|         if not object_managed(i, now): | ||||
|             continue | ||||
|         logger.debug("Removing deployment: %s/%s", NAMESPACE, i.metadata.name) | ||||
|         await apps_api.delete_namespaced_deployment(i.metadata.name, NAMESPACE) | ||||
|  | ||||
|  | ||||
| async def apply_changes(item, v1, now, apps_api): | ||||
|     target = item["spec"]["target"] | ||||
|     replicas = item["spec"].get("replicas") | ||||
| @@ -68,6 +98,37 @@ async def apply_changes(item, v1, now, apps_api): | ||||
|         logger.debug("Created service %s/%s", NAMESPACE, name) | ||||
|  | ||||
|  | ||||
| async def update_loop(v1, apps_api, api_instance): | ||||
|     cleanup_task = None | ||||
|     latest_version = None | ||||
|     now = str(time()) | ||||
|     w = watch.Watch() | ||||
|     args = "k-space.ee", "v1alpha1", NAMESPACE, "cams" | ||||
|     logger.info("Subscribing to updates") | ||||
|     async for event in w.stream(api_instance.list_namespaced_custom_object, *args, resource_version=latest_version): | ||||
|         latest_version = event["object"]["metadata"]["resourceVersion"] | ||||
|         if event["type"] == "ADDED": | ||||
|             await apply_changes(event["object"], v1, now, apps_api) | ||||
|         elif event["type"] == "DELETED": | ||||
|             name = "camera-%s" % event["object"]["metadata"]["name"] | ||||
|             logger.info("Remove %s" % name) | ||||
|             try: | ||||
|                 await v1.delete_namespaced_service(name, NAMESPACE) | ||||
|             except client.exceptions.ApiException: | ||||
|                 pass | ||||
|             else: | ||||
|                 logger.debug("Remove service: %s/%s", NAMESPACE, name) | ||||
|             try: | ||||
|                 await apps_api.delete_namespaced_deployment(name, NAMESPACE) | ||||
|             except client.exceptions.ApiException: | ||||
|                 pass | ||||
|             else: | ||||
|                 logger.debug("Remove deployment: %s/%s", NAMESPACE, name) | ||||
|         if cleanup_task: | ||||
|             cleanup_task.cancel() | ||||
|         cleanup_task = asyncio.create_task(cleanup_dangling_objects(v1, apps_api, now)) | ||||
|  | ||||
|  | ||||
| async def main(): | ||||
|     if os.getenv("KUBECONFIG"): | ||||
|         await config.load_kube_config() | ||||
| @@ -77,57 +138,11 @@ async def main(): | ||||
|         v1 = client.CoreV1Api(api) | ||||
|         apps_api = client.AppsV1Api() | ||||
|         api_instance = client.CustomObjectsApi(api) | ||||
|         now = str(time()) | ||||
|  | ||||
|         w = watch.Watch() | ||||
|         args = "k-space.ee", "v1alpha1", NAMESPACE, "cams" | ||||
|  | ||||
|         resp = await api_instance.list_namespaced_custom_object(*args) | ||||
|         for i in resp["items"]: | ||||
|             await apply_changes(i, v1, now, apps_api) | ||||
|  | ||||
|         logger.info("Cleaning up dangling deployments and services") | ||||
|         resp = await v1.list_namespaced_service(NAMESPACE) | ||||
|         for i in resp.items: | ||||
|             if not i.metadata.labels: | ||||
|                 continue | ||||
|             if i.metadata.labels.get("app.kubernetes.io/managed-by") != LABEL_MANAGED_BY: | ||||
|                 continue | ||||
|             if i.metadata.labels.get("modified") == now: | ||||
|                 continue | ||||
|             logger.debug("Removing service: %s/%s", NAMESPACE, i.metadata.name) | ||||
|             await v1.delete_namespaced_service(i.metadata.name, NAMESPACE) | ||||
|  | ||||
|         resp = await apps_api.list_namespaced_deployment(NAMESPACE) | ||||
|         for i in resp.items: | ||||
|             if not i.metadata.labels: | ||||
|                 continue | ||||
|             if i.metadata.labels.get("app.kubernetes.io/managed-by") != LABEL_MANAGED_BY: | ||||
|                 continue | ||||
|             if i.metadata.labels.get("modified") == now: | ||||
|                 continue | ||||
|             logger.debug("Removing deployment: %s/%s", NAMESPACE, i.metadata.name) | ||||
|             await apps_api.delete_namespaced_deployment(i.metadata.name, NAMESPACE) | ||||
|  | ||||
|         logger.info("Subscribing to updates") | ||||
|  | ||||
|         async for event in w.stream(api_instance.list_namespaced_custom_object, *args): | ||||
|             if event["type"] == "ADDED": | ||||
|                 await apply_changes(event["object"], v1, now, apps_api) | ||||
|             elif event["type"] == "DELETED": | ||||
|                 name = "camera-%s" % event["object"]["metadata"]["name"] | ||||
|                 try: | ||||
|                     await v1.delete_namespaced_service(name, NAMESPACE) | ||||
|                 except client.exceptions.ApiException: | ||||
|                     pass | ||||
|                 else: | ||||
|                     logger.debug("Removed service: %s/%s", NAMESPACE, name) | ||||
|                 try: | ||||
|                     await apps_api.delete_namespaced_deployment(name, NAMESPACE) | ||||
|                 except client.exceptions.ApiException: | ||||
|                     pass | ||||
|                 else: | ||||
|                     logger.debug("Removed deployment: %s/%s", NAMESPACE, name) | ||||
|         while True: | ||||
|             try: | ||||
|                 await update_loop(v1, apps_api, api_instance) | ||||
|             except asyncio.exceptions.TimeoutError: | ||||
|                 pass | ||||
|  | ||||
|  | ||||
| if __name__ == "__main__": | ||||
|   | ||||
		Reference in New Issue
	
	Block a user