Switch to Kubernetes Watch API
All checks were successful
continuous-integration/drone Build is passing

This commit is contained in:
Lauri Võsandi 2022-08-02 16:01:21 +03:00
parent 02606b9e3d
commit 2b2f838c20
2 changed files with 105 additions and 67 deletions

View File

@ -32,13 +32,16 @@ spec:
containers: containers:
- name: camdetect - name: camdetect
image: harbor.k-space.ee/k-space/camera-motion-detect:latest image: harbor.k-space.ee/k-space/camera-motion-detect:latest
ports:
- containerPort: 5000
name: "http"
resources: resources:
requests: requests:
memory: "64Mi" memory: "64Mi"
cpu: "250m" cpu: "200m"
limits: limits:
memory: "128Mi" memory: "128Mi"
cpu: "500m" cpu: "1"
securityContext: securityContext:
readOnlyRootFilesystem: true readOnlyRootFilesystem: true
runAsNonRoot: true runAsNonRoot: true

View File

@ -1,45 +1,29 @@
import asyncio import asyncio
import os
import yaml import yaml
from base64 import b64decode from base64 import b64decode
from kubernetes_asyncio.client.api_client import ApiClient from kubernetes_asyncio.client.api_client import ApiClient
from kubernetes_asyncio import client, config from kubernetes_asyncio import client, config, watch
from os import path from os import path
from time import time from time import time
from urllib.parse import urlparse from urllib.parse import urlparse
NAMESPACE = os.environ["MY_POD_NAMESPACE"]
LABEL_MANAGED_BY = "camera-operator" LABEL_MANAGED_BY = "camera-operator"
with open("camera-service.yml") as stream: with open("camera-service.yml") as stream:
SERVICE_BODY = stream.read() SERVICE_BODY = stream.read()
with open("camera-deployment.yml") as stream: with open("camera-deployment.yml") as stream:
DEPLOYMENT_BODY = stream.read() DEPLOYMENT_BODY = stream.read()
async def main(): async def apply_changes(item, v1, now, apps_api):
await config.load_kube_config()
async with ApiClient() as api:
v1 = client.CoreV1Api(api)
apps_api = client.AppsV1Api()
print("Listing namespaces")
ret = await v1.list_namespace()
api_instance = client.CustomObjectsApi(api)
now = str(time())
for i in ret.items:
try:
resp = await api_instance.list_namespaced_custom_object(
"k-space.ee", "v1alpha1", i.metadata.name, "cams")
except client.exceptions.ApiException:
continue
for item in resp["items"]:
target = item["spec"]["target"] target = item["spec"]["target"]
replicas = item["spec"].get("replicas") replicas = item["spec"].get("replicas")
# Pull in secrets for the target URL # Pull in secrets for the target URL
secret_ref = item["spec"].get("secretRef") secret_ref = item["spec"].get("secretRef")
if secret_ref: if secret_ref:
secret = await v1.read_namespaced_secret(secret_ref, i.metadata.name) secret = await v1.read_namespaced_secret(secret_ref, NAMESPACE)
username = b64decode(secret.data.get("username", b"")).decode("ascii") username = b64decode(secret.data.get("username", b"")).decode("ascii")
password = b64decode(secret.data.get("password", b"")).decode("ascii") password = b64decode(secret.data.get("password", b"")).decode("ascii")
o = urlparse(target) o = urlparse(target)
@ -49,7 +33,7 @@ async def main():
target = o._replace(netloc="%s:%s@%s" % (username, password, netloc)).geturl() target = o._replace(netloc="%s:%s@%s" % (username, password, netloc)).geturl()
name = "camera-%s" % item["metadata"]["name"] name = "camera-%s" % item["metadata"]["name"]
print("Applying", name) print("Applying changes for", name, "CRD")
# Generate Deployment # Generate Deployment
body = yaml.safe_load(DEPLOYMENT_BODY.replace("foobar", name)) body = yaml.safe_load(DEPLOYMENT_BODY.replace("foobar", name))
@ -61,12 +45,12 @@ async def main():
body["spec"]["replicas"] = replicas body["spec"]["replicas"] = replicas
try: try:
await apps_api.replace_namespaced_deployment( await apps_api.replace_namespaced_deployment(
name = name, body = body, namespace=i.metadata.name) name = name, body = body, namespace=NAMESPACE)
print("Updated deployment %s/%s" % (i.metadata.name, name)) print(" * Updated deployment %s/%s" % (NAMESPACE, name))
except client.exceptions.ApiException as e: except client.exceptions.ApiException as e:
await apps_api.create_namespaced_deployment( await apps_api.create_namespaced_deployment(
body = body, namespace=i.metadata.name) body = body, namespace=NAMESPACE)
print("Created deployment %s/%s" % (i.metadata.name, name)) print(" * Created deployment %s/%s" % (NAMESPACE, name))
# Generate Service # Generate Service
body = yaml.safe_load(SERVICE_BODY.replace("foobar", name)) body = yaml.safe_load(SERVICE_BODY.replace("foobar", name))
@ -74,27 +58,78 @@ async def main():
body["metadata"]["labels"] ["modified"] = now body["metadata"]["labels"] ["modified"] = now
try: try:
await v1.replace_namespaced_service( await v1.replace_namespaced_service(
name = name, body = body, namespace=i.metadata.name) name = name, body = body, namespace=NAMESPACE)
print("Updated service %s/%s" % (i.metadata.name, name)) print(" * Updated service %s/%s" % (NAMESPACE, name))
except client.exceptions.ApiException as e: except client.exceptions.ApiException as e:
await v1.create_namespaced_service( await v1.create_namespaced_service(
body = body, namespace=i.metadata.name) body = body, namespace=NAMESPACE)
print("Created service %s/%s" % (i.metadata.name, name)) print(" * Created service %s/%s" % (NAMESPACE, name))
deployments = await apps_api.list_deployment_for_all_namespaces()
for dep in deployments.items: async def main():
if not dep.metadata.labels: if os.getenv("KUBECONFIG"):
await config.load_kube_config()
else:
await config.load_incluster_config()
async with ApiClient() as api:
v1 = client.CoreV1Api(api)
apps_api = client.AppsV1Api()
api_instance = client.CustomObjectsApi(api)
now = str(time())
w = watch.Watch()
args = "k-space.ee", "v1alpha1", NAMESPACE, "cams"
resp = await api_instance.list_namespaced_custom_object(*args)
for i in resp["items"]:
await apply_changes(i, v1, now, apps_api)
print("Cleaning up dangling deployments and services")
resp = await v1.list_namespaced_service(NAMESPACE)
for i in resp.items:
if not i.metadata.labels:
continue continue
if dep.metadata.labels.get("app.kubernetes.io/managed-by") != LABEL_MANAGED_BY: if i.metadata.labels.get("app.kubernetes.io/managed-by") != LABEL_MANAGED_BY:
continue continue
if dep.metadata.labels.get("modified") == now: if i.metadata.labels.get("modified") == now:
continue continue
print("Removing deployment: %s/%s" % (dep.metadata.namespace, dep.metadata.name)) print(" * Removing service: %s/%s" % (NAMESPACE, i.metadata.name))
await apps_api.delete_namespaced_deployment(name=dep.metadata.name, namespace=dep.metadata.namespace) await v1.delete_namespaced_service(i.metadata.name, NAMESPACE)
print("Done")
resp = await apps_api.list_namespaced_deployment(NAMESPACE)
for i in resp.items:
if not i.metadata.labels:
continue
if i.metadata.labels.get("app.kubernetes.io/managed-by") != LABEL_MANAGED_BY:
continue
if i.metadata.labels.get("modified") == now:
continue
print(" * Removing deployment: %s/%s" % (NAMESPACE, i.metadata.name))
await apps_api.delete_namespaced_deployment(i.metadata.name, NAMESPACE)
print("Subscribing to updates")
async for event in w.stream(api_instance.list_namespaced_custom_object, *args):
if event["type"] == "ADDED":
await apply_changes(event["object"], v1, now, apps_api)
elif event["type"] == "DELETED":
name = "camera-%s" % event["object"]["metadata"]["name"]
try:
await v1.delete_namespaced_service(name, NAMESPACE)
except client.exceptions.ApiException as e:
pass
else:
print("Removed service: %s/%s" % (NAMESPACE, name))
try:
await apps_api.delete_namespaced_deployment(name, NAMESPACE)
except client.exceptions.ApiException as e:
pass
else:
print("Removed deployment: %s/%s" % (NAMESPACE, name))
if __name__ == '__main__': if __name__ == '__main__':
loop = asyncio.get_event_loop() loop = asyncio.new_event_loop()
loop.run_until_complete(main()) loop.run_until_complete(main())
loop.close() loop.close()