diff --git a/meta-operator.py b/meta-operator.py index 6372390..2ca8886 100755 --- a/meta-operator.py +++ b/meta-operator.py @@ -3,9 +3,11 @@ import asyncio import ecs_logging import logging import os +import prometheus_async import random import string import yaml +from prometheus_client import Counter, Gauge from kubernetes_asyncio.client.api_client import ApiClient from kubernetes_asyncio import client, config, watch from time import time @@ -27,11 +29,27 @@ args = parser.parse_args() MY_POD_NAMESPACE = os.environ["MY_POD_NAMESPACE"] LABEL_MANAGED_BY = "meta-operator" +counter_cleanups = Counter( + "meta_operator_cleanups", + "Count of executed cleanups", + ["stage"]) +counter_opened_watch_streams = Counter( + "meta_operator_opened_watch_streams", + "Count of watch streams opened") +gauge_watch_stream_begin_timestamp = Gauge( + "meta_operator_watch_stream_begin_timestamp", + "Timestamp of last watch stream open") +gauge_resource_version = Gauge( + "meta_operator_resource_version", + "Last observed resource version", + ["group", "version", "plural"]) + async def cleanup(v1, apps_api, label_selector, delay=0): ls = ",".join(label_selector) if delay: await asyncio.sleep(delay) + logger.info("Cleaning up %s" % ls) # Clean up services @@ -63,6 +81,7 @@ async def cleanup(v1, apps_api, label_selector, delay=0): for i in resp.items: logger.info("Removing secret: %s/%s", i.metadata.namespace, i.metadata.name) await v1.delete_namespaced_secret(i.metadata.name, i.metadata.namespace) + counter_cleanups.labels("executed").inc() async def apply_changes(operator, item, v1, revision, apps_api, api_instance): @@ -130,6 +149,7 @@ async def apply_changes(operator, item, v1, revision, apps_api, api_instance): for name, body in transform_template(operator["spec"].get("deployments", [])): if replicas: body["spec"]["replicas"] = replicas + try: await apps_api.replace_namespaced_deployment(name, namespace, body) logger.info("Updated deployment %s/%s", namespace, name) @@ -165,15 +185,20 @@ async def update_loop(v1, apps_api, api_instance, revision): logger.info("Subscribing to %(plural)s.%(group)s/%(version)s updates", operator["spec"]["resource"]) w = watch.Watch() latest_version = None - task = None + counter_opened_watch_streams.inc() async for event in w.stream(method, *flt, resource_version=latest_version): latest_version = event["object"]["metadata"]["resourceVersion"] + gauge_resource_version.labels( + operator["spec"]["resource"]["group"], + operator["spec"]["resource"]["version"], + operator["spec"]["resource"]["plural"]).set(latest_version) if event["type"] == "ADDED": await apply_changes(operator, event["object"], v1, revision, apps_api, api_instance) if task: task.cancel() + counter_cleanups.labels("postponed").inc() task = asyncio.create_task(cleanup(v1, apps_api, label_selector + ("codemowers.io/revision!=%s" % revision,), delay=2)) @@ -194,6 +219,7 @@ async def main(): revision = str(time()) while True: try: + gauge_watch_stream_begin_timestamp.set(time()) await update_loop(v1, apps_api, api_instance, revision) except asyncio.exceptions.TimeoutError: pass @@ -201,5 +227,6 @@ async def main(): if __name__ == "__main__": loop = asyncio.new_event_loop() + loop.create_task(prometheus_async.aio.web.start_http_server(port=5000)) loop.run_until_complete(main()) loop.close()