camera-operator/camera-operator.py

152 lines
5.5 KiB
Python
Raw Permalink Normal View History

2022-08-02 13:08:19 +00:00
#!/usr/bin/env python3
2022-08-01 16:25:56 +00:00
import asyncio
2022-08-02 17:41:16 +00:00
import logging
2022-08-02 13:01:21 +00:00
import os
2022-08-01 16:25:56 +00:00
import yaml
2022-08-01 17:29:53 +00:00
from base64 import b64decode
2022-08-01 16:25:56 +00:00
from kubernetes_asyncio.client.api_client import ApiClient
2022-08-02 13:01:21 +00:00
from kubernetes_asyncio import client, config, watch
2022-08-01 16:25:56 +00:00
from time import time
2022-08-01 17:29:53 +00:00
from urllib.parse import urlparse
2022-08-02 17:41:16 +00:00
import useful.logs
2022-08-23 07:02:16 +00:00
# Configure logging through useful.logs; the mapping pairs each emitted JSON
# field name with the log record attribute it is filled from.
useful.logs.setup(json_fields={"msg": "message", "level": "levelname", "traceback": "exc_text"})
logger = logging.getLogger()

# Value written to (and matched against) the "app.kubernetes.io/managed-by"
# label on every object this operator owns.
LABEL_MANAGED_BY = "camera-operator"

# Namespace the operator works in; a missing variable raises KeyError at
# import time.
NAMESPACE = os.environ["MY_POD_NAMESPACE"]

# The manifests are kept as raw text because apply_changes substitutes the
# "foobar" placeholder in them before parsing the YAML.
with open("camera-service.yml") as stream:
    SERVICE_BODY = stream.read()
with open("camera-deployment.yml") as stream:
    DEPLOYMENT_BODY = stream.read()
2022-08-01 16:25:56 +00:00
2022-08-23 07:02:16 +00:00
def object_managed(i, now):
    """
    Tell whether a Kubernetes object is owned by this operator but stale.

    Returns True only when the object carries the operator's
    "app.kubernetes.io/managed-by" label and its "modified" label differs
    from `now`. Objects without labels, objects owned by something else,
    and objects already stamped with `now` all yield False.
    """
    labels = i.metadata.labels
    if not labels:
        return False
    owned = labels.get("app.kubernetes.io/managed-by") == LABEL_MANAGED_BY
    refreshed = labels.get("modified") == now
    return owned and not refreshed
async def cleanup_dangling_objects(v1, apps_api, now):
    """
    Delete operator-owned services and deployments not stamped with `now`.

    v1 is used to list and delete services, apps_api to list and delete
    deployments, both within NAMESPACE. `now` is the marker compared
    against each object's "modified" label by object_managed.
    """
    # Wait before doing anything; update_loop cancels and reschedules this
    # coroutine on every event, so the sweep only runs once events pause.
    await asyncio.sleep(2)
    logger.info("Cleaning up dangling deployments and services")

    services = await v1.list_namespaced_service(NAMESPACE)
    for svc in services.items:
        if object_managed(svc, now):
            logger.debug("Removing service: %s/%s", NAMESPACE, svc.metadata.name)
            await v1.delete_namespaced_service(svc.metadata.name, NAMESPACE)

    deployments = await apps_api.list_namespaced_deployment(NAMESPACE)
    for deploy in deployments.items:
        if object_managed(deploy, now):
            logger.debug("Removing deployment: %s/%s", NAMESPACE, deploy.metadata.name)
            await apps_api.delete_namespaced_deployment(deploy.metadata.name, NAMESPACE)
2022-08-02 13:01:21 +00:00
async def _replace_or_create(replace, create, name, body):
    """
    Replace the object called `name`, creating it if it does not exist yet.

    `replace` is called as replace(name, NAMESPACE, body) and `create` as
    create(NAMESPACE, body). Returns True when the object was created and
    False when an existing one was replaced. Any ApiException other than
    a 404 from `replace` is re-raised, so that permission or validation
    errors are not masked by a follow-up "already exists" from `create`.
    """
    try:
        await replace(name, NAMESPACE, body)
    except client.exceptions.ApiException as e:
        if e.status != 404:
            raise
    else:
        return False
    await create(NAMESPACE, body)
    return True


async def apply_changes(item, v1, now, apps_api):
    """
    Create or update the Deployment and Service backing one camera object.

    item is the custom object as a dict; metadata.name and spec.target are
    required, spec.replicas and spec.secretRef are optional. v1 is used to
    read the secret and manage the service, apps_api to manage the
    deployment. `now` is stored in the "modified" label of both objects.

    Raises ApiException when the Kubernetes API rejects a request.
    """
    spec = item["spec"]
    target = spec["target"]
    replicas = spec.get("replicas")

    # Pull in secrets for the target URL
    secret_ref = spec.get("secretRef")
    if secret_ref:
        secret = await v1.read_namespaced_secret(secret_ref, NAMESPACE)
        # Guard against a secret whose data is unset
        data = secret.data or {}
        username = b64decode(data.get("username", b"")).decode("ascii")
        password = b64decode(data.get("password", b"")).decode("ascii")
        o = urlparse(target)
        # Drop credentials already embedded in the target; the host part
        # starts after the last "@" of the netloc.
        netloc = o.netloc.rpartition("@")[2]
        # NOTE(review): credentials are inserted verbatim, so characters such
        # as "@", ":" or "/" must already be percent-encoded in the secret.
        target = o._replace(netloc="%s:%s@%s" % (username, password, netloc)).geturl()

    name = "camera-%s" % item["metadata"]["name"]
    logger.info("Applying changes for %s", name)

    # Generate Deployment
    body = yaml.safe_load(DEPLOYMENT_BODY.replace("foobar", name))
    labels = body["metadata"].setdefault("labels", {})
    labels["app.kubernetes.io/managed-by"] = LABEL_MANAGED_BY
    labels["modified"] = now
    body["spec"]["template"]["spec"]["containers"][0]["args"] = [target]
    # Compare against None so that an explicit "replicas: 0" is honoured
    if replicas is not None:
        body["spec"]["replicas"] = replicas
    created = await _replace_or_create(
        apps_api.replace_namespaced_deployment,
        apps_api.create_namespaced_deployment,
        name,
        body,
    )
    if created:
        logger.debug("Created deployment %s/%s", NAMESPACE, name)
    else:
        logger.debug("Updated deployment %s/%s", NAMESPACE, name)

    # Generate Service
    body = yaml.safe_load(SERVICE_BODY.replace("foobar", name))
    labels = body["metadata"].setdefault("labels", {})
    labels["app.kubernetes.io/managed-by"] = LABEL_MANAGED_BY
    labels["modified"] = now
    created = await _replace_or_create(
        v1.replace_namespaced_service,
        v1.create_namespaced_service,
        name,
        body,
    )
    if created:
        logger.debug("Created service %s/%s", NAMESPACE, name)
    else:
        logger.debug("Updated service %s/%s", NAMESPACE, name)
2022-08-02 13:01:21 +00:00
2022-08-23 07:02:16 +00:00
async def update_loop(v1, apps_api, api_instance):
    """
    Watch the "cams" custom objects and mirror them into Kubernetes objects.

    ADDED and MODIFIED events are passed to apply_changes; DELETED events
    remove the matching service and deployment. After every event a
    delayed cleanup_dangling_objects task is (re)scheduled.

    v1 and apps_api are handed to apply_changes and the delete calls,
    api_instance provides list_namespaced_custom_object for the watch.
    Exceptions raised by the watch stream propagate to the caller.
    """
    cleanup_task = None
    # Marker for this watch session; everything applied below is stamped
    # with it and cleanup removes owned objects that lack it.
    now = str(time())
    w = watch.Watch()
    args = "k-space.ee", "v1alpha1", NAMESPACE, "cams"
    logger.info("Subscribing to updates")
    async for event in w.stream(api_instance.list_namespaced_custom_object, *args, resource_version=None):
        event_type = event["type"]
        if event_type in ("ADDED", "MODIFIED"):
            # MODIFIED has to be applied too, otherwise spec edits would
            # only take effect after the operator restarts.
            await apply_changes(event["object"], v1, now, apps_api)
        elif event_type == "DELETED":
            name = "camera-%s" % event["object"]["metadata"]["name"]
            logger.info("Remove %s", name)
            # Deletion is best effort: API errors are ignored on purpose
            try:
                await v1.delete_namespaced_service(name, NAMESPACE)
            except client.exceptions.ApiException:
                pass
            else:
                logger.debug("Remove service: %s/%s", NAMESPACE, name)
            try:
                await apps_api.delete_namespaced_deployment(name, NAMESPACE)
            except client.exceptions.ApiException:
                pass
            else:
                logger.debug("Remove deployment: %s/%s", NAMESPACE, name)
        # Restart the cleanup delay so the sweep only runs once events pause
        if cleanup_task:
            cleanup_task.cancel()
        cleanup_task = asyncio.create_task(cleanup_dangling_objects(v1, apps_api, now))
2022-08-02 13:01:21 +00:00
async def main():
    """
    Load the Kubernetes configuration and run update_loop forever.

    The kubeconfig file is used when KUBECONFIG is set, otherwise the
    in-cluster configuration. update_loop is restarted whenever it ends
    with a timeout; any other exception propagates.
    """
    if os.getenv("KUBECONFIG"):
        await config.load_kube_config()
    else:
        config.load_incluster_config()
    async with ApiClient() as api:
        # All API objects share the one client managed by this context
        v1 = client.CoreV1Api(api)
        apps_api = client.AppsV1Api(api)
        api_instance = client.CustomObjectsApi(api)
        while True:
            try:
                await update_loop(v1, apps_api, api_instance)
            except asyncio.exceptions.TimeoutError:
                # The watch timed out; start a fresh update_loop
                logger.debug("Watch timed out, resubscribing")
2022-08-03 04:50:30 +00:00
if __name__ == "__main__":
    # asyncio.run closes the event loop even when main() raises, which the
    # manual new_event_loop / run_until_complete / close sequence did not.
    asyncio.run(main())