Add Prometheus metrics
All checks were successful
continuous-integration/drone Build is passing

This commit is contained in:
Lauri Võsandi 2022-10-21 12:06:51 +03:00
parent 32981b2c19
commit 9c8e73b04f

View File

@ -3,9 +3,11 @@ import asyncio
import ecs_logging import ecs_logging
import logging import logging
import os import os
import prometheus_async
import random import random
import string import string
import yaml import yaml
from prometheus_client import Counter, Gauge
from kubernetes_asyncio.client.api_client import ApiClient from kubernetes_asyncio.client.api_client import ApiClient
from kubernetes_asyncio import client, config, watch from kubernetes_asyncio import client, config, watch
from time import time from time import time
@ -27,11 +29,27 @@ args = parser.parse_args()
MY_POD_NAMESPACE = os.environ["MY_POD_NAMESPACE"] MY_POD_NAMESPACE = os.environ["MY_POD_NAMESPACE"]
LABEL_MANAGED_BY = "meta-operator" LABEL_MANAGED_BY = "meta-operator"
counter_cleanups = Counter(
"meta_operator_cleanups",
"Count of executed cleanups",
["stage"])
counter_opened_watch_streams = Counter(
"meta_operator_opened_watch_streams",
"Count of watch streams opened")
gauge_watch_stream_begin_timestamp = Gauge(
"meta_operator_watch_stream_begin_timestamp",
"Timestamp of last watch stream open")
gauge_resource_version = Gauge(
"meta_operator_resource_version",
"Last observed resource version",
["group", "version", "plural"])
async def cleanup(v1, apps_api, label_selector, delay=0): async def cleanup(v1, apps_api, label_selector, delay=0):
ls = ",".join(label_selector) ls = ",".join(label_selector)
if delay: if delay:
await asyncio.sleep(delay) await asyncio.sleep(delay)
logger.info("Cleaning up %s" % ls) logger.info("Cleaning up %s" % ls)
# Clean up services # Clean up services
@ -63,6 +81,7 @@ async def cleanup(v1, apps_api, label_selector, delay=0):
for i in resp.items: for i in resp.items:
logger.info("Removing secret: %s/%s", i.metadata.namespace, i.metadata.name) logger.info("Removing secret: %s/%s", i.metadata.namespace, i.metadata.name)
await v1.delete_namespaced_secret(i.metadata.name, i.metadata.namespace) await v1.delete_namespaced_secret(i.metadata.name, i.metadata.namespace)
counter_cleanups.labels("executed").inc()
async def apply_changes(operator, item, v1, revision, apps_api, api_instance): async def apply_changes(operator, item, v1, revision, apps_api, api_instance):
@ -130,6 +149,7 @@ async def apply_changes(operator, item, v1, revision, apps_api, api_instance):
for name, body in transform_template(operator["spec"].get("deployments", [])): for name, body in transform_template(operator["spec"].get("deployments", [])):
if replicas: if replicas:
body["spec"]["replicas"] = replicas body["spec"]["replicas"] = replicas
try: try:
await apps_api.replace_namespaced_deployment(name, namespace, body) await apps_api.replace_namespaced_deployment(name, namespace, body)
logger.info("Updated deployment %s/%s", namespace, name) logger.info("Updated deployment %s/%s", namespace, name)
@ -165,15 +185,20 @@ async def update_loop(v1, apps_api, api_instance, revision):
logger.info("Subscribing to %(plural)s.%(group)s/%(version)s updates", operator["spec"]["resource"]) logger.info("Subscribing to %(plural)s.%(group)s/%(version)s updates", operator["spec"]["resource"])
w = watch.Watch() w = watch.Watch()
latest_version = None latest_version = None
task = None task = None
counter_opened_watch_streams.inc()
async for event in w.stream(method, *flt, resource_version=latest_version): async for event in w.stream(method, *flt, resource_version=latest_version):
latest_version = event["object"]["metadata"]["resourceVersion"] latest_version = event["object"]["metadata"]["resourceVersion"]
gauge_resource_version.labels(
operator["spec"]["resource"]["group"],
operator["spec"]["resource"]["version"],
operator["spec"]["resource"]["plural"]).set(latest_version)
if event["type"] == "ADDED": if event["type"] == "ADDED":
await apply_changes(operator, event["object"], v1, revision, apps_api, api_instance) await apply_changes(operator, event["object"], v1, revision, apps_api, api_instance)
if task: if task:
task.cancel() task.cancel()
counter_cleanups.labels("postponed").inc()
task = asyncio.create_task(cleanup(v1, apps_api, task = asyncio.create_task(cleanup(v1, apps_api,
label_selector + ("codemowers.io/revision!=%s" % revision,), label_selector + ("codemowers.io/revision!=%s" % revision,),
delay=2)) delay=2))
@ -194,6 +219,7 @@ async def main():
revision = str(time()) revision = str(time())
while True: while True:
try: try:
gauge_watch_stream_begin_timestamp.set(time())
await update_loop(v1, apps_api, api_instance, revision) await update_loop(v1, apps_api, api_instance, revision)
except asyncio.exceptions.TimeoutError: except asyncio.exceptions.TimeoutError:
pass pass
@ -201,5 +227,6 @@ async def main():
if __name__ == "__main__": if __name__ == "__main__":
loop = asyncio.new_event_loop() loop = asyncio.new_event_loop()
loop.create_task(prometheus_async.aio.web.start_http_server(port=5000))
loop.run_until_complete(main()) loop.run_until_complete(main())
loop.close() loop.close()