This commit is contained in:
commit
088112bcd4
2
.drone.yml
Normal file
2
.drone.yml
Normal file
@ -0,0 +1,2 @@
|
||||
kind: template
|
||||
load: docker.yaml
|
3
Dockerfile
Normal file
3
Dockerfile
Normal file
@ -0,0 +1,3 @@
|
||||
FROM harbor.k-space.ee/k-space/microservice-base
|
||||
ADD meta-operator.py /
|
||||
ENTRYPOINT /meta-operator.py
|
3
README.md
Normal file
3
README.md
Normal file
@ -0,0 +1,3 @@
|
||||
# Meta operator
|
||||
|
||||
Meta operator allows creating operators without building any binaries
|
199
meta-operator.py
Executable file
199
meta-operator.py
Executable file
@ -0,0 +1,199 @@
|
||||
#!/usr/bin/env python3
|
||||
import asyncio
|
||||
import logging
|
||||
import os
|
||||
import random
|
||||
import string
|
||||
import yaml
|
||||
from kubernetes_asyncio.client.api_client import ApiClient
|
||||
from kubernetes_asyncio import client, config, watch
|
||||
from time import time
|
||||
import useful.logs
|
||||
import argparse
|
||||
|
||||
parser = argparse.ArgumentParser(description="Run handler for ClusterOperator CRD")
|
||||
parser.add_argument("--target", "-t", default="meta")
|
||||
args = parser.parse_args()
|
||||
|
||||
useful.logs.setup(json_fields={"msg": "message", "level": "levelname", "traceback": "exc_text"})
|
||||
logger = logging.getLogger()
|
||||
|
||||
MY_POD_NAMESPACE = os.environ["MY_POD_NAMESPACE"]
|
||||
LABEL_MANAGED_BY = "meta-operator"
|
||||
|
||||
|
||||
async def cleanup(v1, apps_api, label_selector, delay=0):
|
||||
ls = ",".join(label_selector)
|
||||
if delay:
|
||||
await asyncio.sleep(delay)
|
||||
logger.info("Cleaning up %s" % ls)
|
||||
|
||||
# Clean up services
|
||||
resp = await v1.list_namespaced_service("", label_selector=ls)
|
||||
for i in resp.items:
|
||||
logger.info("Removing service: %s/%s", i.metadata.namespace, i.metadata.name)
|
||||
await v1.delete_namespaced_service(i.metadata.name, i.metadata.namespace)
|
||||
|
||||
# Clean up deployments
|
||||
resp = await apps_api.list_namespaced_deployment("", label_selector=ls)
|
||||
for i in resp.items:
|
||||
logger.info("Removing deployment: %s/%s", i.metadata.namespace, i.metadata.name)
|
||||
await apps_api.delete_namespaced_deployment(i.metadata.name, i.metadata.namespace)
|
||||
|
||||
# Clean up statefulsets
|
||||
resp = await apps_api.list_namespaced_stateful_set("", label_selector=ls)
|
||||
for i in resp.items:
|
||||
logger.info("Removing statefulset: %s/%s", i.metadata.namespace, i.metadata.name)
|
||||
await apps_api.delete_namespaced_stateful_set(i.metadata.name, i.metadata.namespace)
|
||||
|
||||
# Clean up configmaps
|
||||
resp = await v1.list_namespaced_config_map("", label_selector=ls)
|
||||
for i in resp.items:
|
||||
logger.info("Removing configmap: %s/%s", i.metadata.namespace, i.metadata.name)
|
||||
await v1.delete_namespaced_config_map(i.metadata.name, i.metadata.namespace)
|
||||
|
||||
# Clean up secrets
|
||||
resp = await v1.list_namespaced_secret("", label_selector=ls)
|
||||
for i in resp.items:
|
||||
logger.info("Removing secret: %s/%s", i.metadata.namespace, i.metadata.name)
|
||||
await v1.delete_namespaced_secret(i.metadata.name, i.metadata.namespace)
|
||||
|
||||
|
||||
async def apply_changes(operator, item, v1, revision, apps_api, api_instance):
|
||||
namespace = item["metadata"].get("namespace", MY_POD_NAMESPACE)
|
||||
name = item["metadata"]["name"]
|
||||
logger.info("Applying changes for %s in namespace %s", name, namespace)
|
||||
if namespace == "meta-operator" and name == "meta":
|
||||
logger.info("Not touching myself")
|
||||
return
|
||||
labels = {
|
||||
"codemowers.io/revision": revision,
|
||||
"codemowers.io/resource-name": name,
|
||||
"codemowers.io/resource-group": operator["spec"]["resource"]["group"],
|
||||
"codemowers.io/resource-version": operator["spec"]["resource"]["version"],
|
||||
"codemowers.io/resource-plural": operator["spec"]["resource"]["plural"],
|
||||
"app.kubernetes.io/managed-by": LABEL_MANAGED_BY,
|
||||
}
|
||||
replicas = item["spec"].get("replicas")
|
||||
if operator["spec"].get("secret", {}).get("enabled"):
|
||||
secret_name = operator["spec"]["secret"]["name"].replace("foobar", item["metadata"]["name"])
|
||||
password = "".join([random.choice(string.ascii_letters + string.digits) for j in range(20)])
|
||||
data = {}
|
||||
for o in operator["spec"]["secret"]["structure"]:
|
||||
data[o["key"]] = o["value"].replace("foobar", item["metadata"]["name"]) % password
|
||||
try:
|
||||
await v1.create_namespaced_secret(namespace, {
|
||||
"metadata": {"name": secret_name},
|
||||
"stringData": data
|
||||
})
|
||||
except client.exceptions.ApiException:
|
||||
logger.info("Secret already generated, adding labels")
|
||||
await v1.patch_namespaced_secret(secret_name, namespace, {
|
||||
"metadata": {"labels": labels}
|
||||
})
|
||||
else:
|
||||
logger.info("Secret generation disabled")
|
||||
|
||||
def transform_template(iterable):
|
||||
for i in iterable:
|
||||
body = yaml.safe_load(yaml.dump(i).replace("foobar", item["metadata"]["name"]))
|
||||
if "labels" not in body["metadata"]:
|
||||
body["metadata"]["labels"] = {}
|
||||
body["metadata"]["labels"].update(labels)
|
||||
yield body["metadata"]["name"], body
|
||||
|
||||
# Generate Services
|
||||
for name, body in transform_template(operator["spec"].get("services", [])):
|
||||
try:
|
||||
await v1.patch_namespaced_service(name, namespace, body)
|
||||
logger.info("Updated service %s/%s", namespace, name)
|
||||
except client.exceptions.ApiException:
|
||||
await v1.create_namespaced_service(namespace, body)
|
||||
logger.info("Created service %s/%s", namespace, name)
|
||||
|
||||
# Generate ConfigMaps
|
||||
for name, body in transform_template(operator["spec"].get("configmaps", [])):
|
||||
try:
|
||||
await v1.patch_namespaced_config_map(name, namespace, body)
|
||||
logger.info("Updated configmap %s/%s", namespace, name)
|
||||
except client.exceptions.ApiException:
|
||||
await v1.create_namespaced_config_map(namespace, body)
|
||||
logger.info("Created configmap %s/%s", namespace, name)
|
||||
|
||||
# Generate Deployments
|
||||
for name, body in transform_template(operator["spec"].get("deployments", [])):
|
||||
if replicas:
|
||||
body["spec"]["replicas"] = replicas
|
||||
try:
|
||||
await apps_api.patch_namespaced_deployment(name, namespace, body)
|
||||
logger.info("Updated deployment %s/%s", namespace, name)
|
||||
except client.exceptions.ApiException:
|
||||
await apps_api.create_namespaced_deployment(namespace, body)
|
||||
logger.info("Created deployment %s/%s", namespace, name)
|
||||
|
||||
# Generate statefulsets
|
||||
for name, body in transform_template(operator["spec"].get("statefulsets", [])):
|
||||
if replicas:
|
||||
body["spec"]["replicas"] = replicas
|
||||
try:
|
||||
await apps_api.patch_namespaced_stateful_set(name, namespace, body)
|
||||
logger.info("Updated statefulset %s/%s", namespace, name)
|
||||
except client.exceptions.ApiException:
|
||||
await apps_api.create_namespaced_stateful_set(namespace, body)
|
||||
logger.info("Created statefulset %s/%s", namespace, name)
|
||||
|
||||
|
||||
async def update_loop(v1, apps_api, api_instance, revision):
|
||||
logger.info("Managing resources for ClusterOperator named %s", repr(args.target))
|
||||
operator = await api_instance.get_cluster_custom_object("codemowers.io", "v1alpha1", "clusteroperators", args.target)
|
||||
label_selector = (
|
||||
"app.kubernetes.io/managed-by==%s" % LABEL_MANAGED_BY,
|
||||
"codemowers.io/resource-group==%s" % operator["spec"]["resource"]["group"],
|
||||
"codemowers.io/resource-version==%s" % operator["spec"]["resource"]["version"],
|
||||
"codemowers.io/resource-plural==%s" % operator["spec"]["resource"]["plural"]
|
||||
)
|
||||
flt = operator["spec"]["resource"]["group"], operator["spec"]["resource"]["version"], \
|
||||
"", operator["spec"]["resource"]["plural"]
|
||||
|
||||
method = api_instance.list_namespaced_custom_object
|
||||
logger.info("Subscribing to %(plural)s.%(group)s/%(version)s updates", operator["spec"]["resource"])
|
||||
w = watch.Watch()
|
||||
latest_version = None
|
||||
|
||||
task = None
|
||||
|
||||
async for event in w.stream(method, *flt, resource_version=latest_version):
|
||||
latest_version = event["object"]["metadata"]["resourceVersion"]
|
||||
if event["type"] == "ADDED":
|
||||
await apply_changes(operator, event["object"], v1, revision, apps_api, api_instance)
|
||||
if task:
|
||||
task.cancel()
|
||||
task = asyncio.create_task(cleanup(v1, apps_api,
|
||||
label_selector + ("codemowers.io/revision!=%s" % revision,),
|
||||
delay=2))
|
||||
elif event["type"] == "DELETED":
|
||||
asyncio.create_task(cleanup(v1, apps_api,
|
||||
label_selector + ("codemowers.io/resource-name==%s" % event["object"]["metadata"]["name"],)))
|
||||
|
||||
|
||||
async def main():
|
||||
if os.getenv("KUBECONFIG"):
|
||||
await config.load_kube_config()
|
||||
else:
|
||||
config.load_incluster_config()
|
||||
async with ApiClient() as api:
|
||||
v1 = client.CoreV1Api(api)
|
||||
apps_api = client.AppsV1Api()
|
||||
api_instance = client.CustomObjectsApi(api)
|
||||
revision = str(time())
|
||||
while True:
|
||||
try:
|
||||
await update_loop(v1, apps_api, api_instance, revision)
|
||||
except asyncio.exceptions.TimeoutError:
|
||||
pass
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
loop = asyncio.new_event_loop()
|
||||
loop.run_until_complete(main())
|
||||
loop.close()
|
Loading…
Reference in New Issue
Block a user