This commit is contained in:
parent
e276e71456
commit
43764d3365
6
.pre-commit-config.yaml
Normal file
6
.pre-commit-config.yaml
Normal file
@ -0,0 +1,6 @@
|
||||
repos:
|
||||
- repo: https://github.com/PyCQA/flake8
|
||||
rev: 3.9.2
|
||||
hooks:
|
||||
- id: flake8
|
||||
additional_dependencies: [flake8-typing-imports==1.10.0,flake8-quotes==3.2.0]
|
6
README.md
Normal file
6
README.md
Normal file
@ -0,0 +1,6 @@
|
||||
To run locally with suitable `KUBECONFIG`:
|
||||
|
||||
```
|
||||
MY_POD_NAMESPACE=camtiler ./camera-operator.py
|
||||
```
|
||||
|
@ -6,22 +6,21 @@ import yaml
|
||||
from base64 import b64decode
|
||||
from kubernetes_asyncio.client.api_client import ApiClient
|
||||
from kubernetes_asyncio import client, config, watch
|
||||
from os import path
|
||||
from time import time
|
||||
from urllib.parse import urlparse
|
||||
import useful.logs
|
||||
|
||||
useful.logs.setup(
|
||||
json_fields={"msg":"message", "level": "levelname"})
|
||||
useful.logs.setup(json_fields={"msg": "message", "level": "levelname"})
|
||||
logger = logging.getLogger()
|
||||
|
||||
NAMESPACE = os.environ["MY_POD_NAMESPACE"]
|
||||
|
||||
LABEL_MANAGED_BY = "camera-operator"
|
||||
with open("camera-service.yml") as stream:
|
||||
SERVICE_BODY = stream.read()
|
||||
SERVICE_BODY = stream.read()
|
||||
with open("camera-deployment.yml") as stream:
|
||||
DEPLOYMENT_BODY = stream.read()
|
||||
DEPLOYMENT_BODY = stream.read()
|
||||
|
||||
|
||||
async def apply_changes(item, v1, now, apps_api):
|
||||
target = item["spec"]["target"]
|
||||
@ -44,99 +43,94 @@ async def apply_changes(item, v1, now, apps_api):
|
||||
|
||||
# Generate Deployment
|
||||
body = yaml.safe_load(DEPLOYMENT_BODY.replace("foobar", name))
|
||||
body["metadata"]["labels"] ["app.kubernetes.io/managed-by"] = LABEL_MANAGED_BY
|
||||
body["metadata"]["labels"] ["modified"] = now
|
||||
body["metadata"]["labels"]["app.kubernetes.io/managed-by"] = LABEL_MANAGED_BY
|
||||
body["metadata"]["labels"]["modified"] = now
|
||||
body["spec"]["template"]["spec"]["containers"][0]["args"] = [target]
|
||||
|
||||
if replicas:
|
||||
body["spec"]["replicas"] = replicas
|
||||
try:
|
||||
await apps_api.replace_namespaced_deployment(
|
||||
name = name, body = body, namespace=NAMESPACE)
|
||||
await apps_api.replace_namespaced_deployment(name, NAMESPACE, body)
|
||||
logger.debug("Updated deployment %s/%s", NAMESPACE, name)
|
||||
except client.exceptions.ApiException as e:
|
||||
await apps_api.create_namespaced_deployment(
|
||||
body = body, namespace=NAMESPACE)
|
||||
except client.exceptions.ApiException:
|
||||
await apps_api.create_namespaced_deployment(NAMESPACE, body)
|
||||
logger.debug("Created deployment %s/%s", NAMESPACE, name)
|
||||
|
||||
# Generate Service
|
||||
body = yaml.safe_load(SERVICE_BODY.replace("foobar", name))
|
||||
body["metadata"]["labels"] ["app.kubernetes.io/managed-by"] = LABEL_MANAGED_BY
|
||||
body["metadata"]["labels"] ["modified"] = now
|
||||
body["metadata"]["labels"]["app.kubernetes.io/managed-by"] = LABEL_MANAGED_BY
|
||||
body["metadata"]["labels"]["modified"] = now
|
||||
try:
|
||||
await v1.replace_namespaced_service(
|
||||
name = name, body = body, namespace=NAMESPACE)
|
||||
await v1.replace_namespaced_service(name, NAMESPACE, body)
|
||||
logger.debug("Updated service %s/%s", NAMESPACE, name)
|
||||
except client.exceptions.ApiException as e:
|
||||
await v1.create_namespaced_service(
|
||||
body = body, namespace=NAMESPACE)
|
||||
except client.exceptions.ApiException:
|
||||
await v1.create_namespaced_service(NAMESPACE, body)
|
||||
logger.debug("Created service %s/%s", NAMESPACE, name)
|
||||
|
||||
|
||||
async def main():
|
||||
if os.getenv("KUBECONFIG"):
|
||||
await config.load_kube_config()
|
||||
else:
|
||||
config.load_incluster_config()
|
||||
async with ApiClient() as api:
|
||||
v1 = client.CoreV1Api(api)
|
||||
apps_api = client.AppsV1Api()
|
||||
api_instance = client.CustomObjectsApi(api)
|
||||
now = str(time())
|
||||
if os.getenv("KUBECONFIG"):
|
||||
await config.load_kube_config()
|
||||
else:
|
||||
config.load_incluster_config()
|
||||
async with ApiClient() as api:
|
||||
v1 = client.CoreV1Api(api)
|
||||
apps_api = client.AppsV1Api()
|
||||
api_instance = client.CustomObjectsApi(api)
|
||||
now = str(time())
|
||||
|
||||
w = watch.Watch()
|
||||
args = "k-space.ee", "v1alpha1", NAMESPACE, "cams"
|
||||
w = watch.Watch()
|
||||
args = "k-space.ee", "v1alpha1", NAMESPACE, "cams"
|
||||
|
||||
resp = await api_instance.list_namespaced_custom_object(*args)
|
||||
for i in resp["items"]:
|
||||
await apply_changes(i, v1, now, apps_api)
|
||||
resp = await api_instance.list_namespaced_custom_object(*args)
|
||||
for i in resp["items"]:
|
||||
await apply_changes(i, v1, now, apps_api)
|
||||
|
||||
logger.info("Cleaning up dangling deployments and services")
|
||||
resp = await v1.list_namespaced_service(NAMESPACE)
|
||||
for i in resp.items:
|
||||
if not i.metadata.labels:
|
||||
continue
|
||||
if i.metadata.labels.get("app.kubernetes.io/managed-by") != LABEL_MANAGED_BY:
|
||||
continue
|
||||
if i.metadata.labels.get("modified") == now:
|
||||
continue
|
||||
logger.debug("Removing service: %s/%s", NAMESPACE, i.metadata.name)
|
||||
await v1.delete_namespaced_service(i.metadata.name, NAMESPACE)
|
||||
logger.info("Cleaning up dangling deployments and services")
|
||||
resp = await v1.list_namespaced_service(NAMESPACE)
|
||||
for i in resp.items:
|
||||
if not i.metadata.labels:
|
||||
continue
|
||||
if i.metadata.labels.get("app.kubernetes.io/managed-by") != LABEL_MANAGED_BY:
|
||||
continue
|
||||
if i.metadata.labels.get("modified") == now:
|
||||
continue
|
||||
logger.debug("Removing service: %s/%s", NAMESPACE, i.metadata.name)
|
||||
await v1.delete_namespaced_service(i.metadata.name, NAMESPACE)
|
||||
|
||||
resp = await apps_api.list_namespaced_deployment(NAMESPACE)
|
||||
for i in resp.items:
|
||||
if not i.metadata.labels:
|
||||
continue
|
||||
if i.metadata.labels.get("app.kubernetes.io/managed-by") != LABEL_MANAGED_BY:
|
||||
continue
|
||||
if i.metadata.labels.get("modified") == now:
|
||||
continue
|
||||
logger.debug("Removing deployment: %s/%s", NAMESPACE, i.metadata.name)
|
||||
await apps_api.delete_namespaced_deployment(i.metadata.name, NAMESPACE)
|
||||
resp = await apps_api.list_namespaced_deployment(NAMESPACE)
|
||||
for i in resp.items:
|
||||
if not i.metadata.labels:
|
||||
continue
|
||||
if i.metadata.labels.get("app.kubernetes.io/managed-by") != LABEL_MANAGED_BY:
|
||||
continue
|
||||
if i.metadata.labels.get("modified") == now:
|
||||
continue
|
||||
logger.debug("Removing deployment: %s/%s", NAMESPACE, i.metadata.name)
|
||||
await apps_api.delete_namespaced_deployment(i.metadata.name, NAMESPACE)
|
||||
|
||||
logger.info("Subscribing to updates")
|
||||
logger.info("Subscribing to updates")
|
||||
|
||||
async for event in w.stream(api_instance.list_namespaced_custom_object, *args):
|
||||
if event["type"] == "ADDED":
|
||||
await apply_changes(event["object"], v1, now, apps_api)
|
||||
elif event["type"] == "DELETED":
|
||||
name = "camera-%s" % event["object"]["metadata"]["name"]
|
||||
try:
|
||||
await v1.delete_namespaced_service(name, NAMESPACE)
|
||||
except client.exceptions.ApiException as e:
|
||||
pass
|
||||
else:
|
||||
logger.debug("Removed service: %s/%s", NAMESPACE, name)
|
||||
try:
|
||||
await apps_api.delete_namespaced_deployment(name, NAMESPACE)
|
||||
except client.exceptions.ApiException as e:
|
||||
pass
|
||||
else:
|
||||
logger.debug("Removed deployment: %s/%s", NAMESPACE, name)
|
||||
async for event in w.stream(api_instance.list_namespaced_custom_object, *args):
|
||||
if event["type"] == "ADDED":
|
||||
await apply_changes(event["object"], v1, now, apps_api)
|
||||
elif event["type"] == "DELETED":
|
||||
name = "camera-%s" % event["object"]["metadata"]["name"]
|
||||
try:
|
||||
await v1.delete_namespaced_service(name, NAMESPACE)
|
||||
except client.exceptions.ApiException:
|
||||
pass
|
||||
else:
|
||||
logger.debug("Removed service: %s/%s", NAMESPACE, name)
|
||||
try:
|
||||
await apps_api.delete_namespaced_deployment(name, NAMESPACE)
|
||||
except client.exceptions.ApiException:
|
||||
pass
|
||||
else:
|
||||
logger.debug("Removed deployment: %s/%s", NAMESPACE, name)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
if __name__ == "__main__":
|
||||
loop = asyncio.new_event_loop()
|
||||
loop.run_until_complete(main())
|
||||
loop.close()
|
||||
|
||||
|
Reference in New Issue
Block a user