Rewrite: Change the way we're exposing volume metrics

Summary:
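Formerly, we were refreshing the metrics manually from a background timer (the removed code re-ran `collect_stats()` via `threading.Timer(10, ...)`, i.e. every 10 seconds). This manual approach had a couple of issues: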

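- Outdated metrics after a one-time crash: if a single collection run raised, the timer was never re-armed and the last values kept being served
- Metrics still being exposed for deleted PVs, because their labelled series were never removed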

Instead of fixing the bugs one by one, I preferred to do it the right way. As per the Prometheus Python client (`prometheus_client`) docs:

>  Sometimes it is not possible to directly instrument code, as it is not in your control. This requires you to proxy metrics from other systems. To do so you need to create a custom collector...

Test Plan:
- Deploy on a cluster with existing rawfile PVs
- Send a request to `:9100/metrics` and assert that the metrics are exposed
- Delete a PV, and assert that its metrics disappear

Reviewers: h.marvi, bghadiri, sina_rad, mhyousefi

Reviewed By: h.marvi, bghadiri, sina_rad

Differential Revision: https://phab.hamravesh.ir/D815
This commit is contained in:
Mehran Kholdi 2020-06-12 15:55:32 +04:30
parent 523ccd510f
commit 8e5cb5de78

View File

@ -1,59 +1,74 @@
import json
import os
import threading
from os.path import basename
import subprocess
from prometheus_client import Gauge
from prometheus_client.core import REGISTRY
from prometheus_client.exposition import start_http_server
from prometheus_client.metrics_core import GaugeMetricFamily
import rawfile_util
from rawfile_util import attached_loops
from util import run_out
VOLUME_ID = "volume_id"
fs_size = Gauge(
"rawfile_filesystem_size_bytes", "Filesystem size in bytes.", [VOLUME_ID]
)
fs_free = Gauge(
"rawfile_filesystem_avail_bytes", "Filesystem free space in bytes", [VOLUME_ID]
)
dev_size = Gauge("rawfile_device_size_bytes", "Device size in bytes.", [VOLUME_ID])
dev_free = Gauge(
"rawfile_device_free_bytes", "Device free space in bytes.", [VOLUME_ID]
)
def collect_stats():
blockdevices = json.loads(run_out("lsblk --json").stdout.decode())["blockdevices"]
class VolumeStatsCollector(object):
def collect(self):
VOLUME_ID = "volume_id"
fs_size = GaugeMetricFamily(
"rawfile_filesystem_size_bytes",
"Filesystem size in bytes.",
labels=[VOLUME_ID],
)
fs_free = GaugeMetricFamily(
"rawfile_filesystem_avail_bytes",
"Filesystem free space in bytes",
labels=[VOLUME_ID],
)
dev_size = GaugeMetricFamily(
"rawfile_device_size_bytes", "Device size in bytes.", labels=[VOLUME_ID]
)
dev_free = GaugeMetricFamily(
"rawfile_device_free_bytes",
"Device free space in bytes.",
labels=[VOLUME_ID],
)
def dev_to_mountpoint(dev_name):
dev_name = basename(dev_name)
matches = list(filter(lambda bd: bd["name"] == dev_name, blockdevices))
if len(matches) == 0:
return None
return matches[0]["mountpoint"]
for volume_id in rawfile_util.list_all_volumes():
img_file = rawfile_util.img_file(volume_id)
labels = [volume_id]
dev_stat = img_file.stat()
dev_size.add_metric(labels, dev_stat.st_size)
dev_free.add_metric(labels, dev_stat.st_size - dev_stat.st_blocks * 512)
mountpoint = volume_to_mountpoint(img_file)
if mountpoint is not None:
fs_stat = os.statvfs(mountpoint)
fs_size.add_metric(labels, fs_stat.f_frsize * fs_stat.f_blocks)
fs_free.add_metric(labels, fs_stat.f_frsize * fs_stat.f_bfree)
for volume_id in rawfile_util.list_all_volumes():
img_file = rawfile_util.img_file(volume_id)
labels = {VOLUME_ID: volume_id}
dev_stat = img_file.stat()
dev_size.labels(**labels).set(dev_stat.st_size)
dev_free.labels(**labels).set(dev_stat.st_size - dev_stat.st_blocks * 512)
for dev in attached_loops(img_file):
mountpoint = dev_to_mountpoint(dev)
if mountpoint is None:
continue
fs_stat = os.statvfs(mountpoint)
fs_size.labels(**labels).set(fs_stat.f_frsize * fs_stat.f_blocks)
fs_free.labels(**labels).set(fs_stat.f_frsize * fs_stat.f_bfree)
break
return [fs_size, fs_free, dev_size, dev_free]
def volume_to_mountpoint(img_file):
    """Return the first mountpoint found among the loop devices of ``img_file``.

    Each device yielded by ``attached_loops(img_file)`` is looked up with
    ``dev_to_mountpoint`` in order, and the search stops at the first device
    for which a mountpoint is reported.

    Args:
        img_file: Image file whose attached loop devices are inspected.

    Returns:
        The first non-``None`` mountpoint, or ``None`` if no device has one.
    """
    # Generator expressions keep the lookup lazy: no further devices are
    # queried once a mountpoint has been found.
    candidates = (
        dev_to_mountpoint(loop_dev) for loop_dev in attached_loops(img_file)
    )
    return next((found for found in candidates if found is not None), None)
def dev_to_mountpoint(dev_name):
    """Return the mountpoint of a device, or ``None`` if the lookup fails.

    Runs ``findmnt --json --first-only <dev_name>`` and returns the ``target``
    field of the first entry under ``filesystems`` in its JSON output.

    Args:
        dev_name: Device to look up; handed to ``findmnt`` as one argument.

    Returns:
        The mount target reported by ``findmnt``, or ``None`` when the command
        exits with a non-zero status or cannot be executed.
    """
    try:
        # Use an argument list rather than a shell string: a device name that
        # contains spaces or shell metacharacters must stay a single argument
        # and must never be interpreted (or executed) by a shell.
        output = subprocess.run(
            ["findmnt", "--json", "--first-only", str(dev_name)],
            check=True,
            capture_output=True,
        ).stdout.decode()
    except (subprocess.CalledProcessError, FileNotFoundError):
        # CalledProcessError: findmnt exited non-zero.
        # FileNotFoundError: findmnt is not installed. The former shell-based
        # invocation surfaced that as a non-zero exit as well, so keep
        # returning None in that case.
        return None
    data = json.loads(output)
    return data["filesystems"][0]["target"]
def expose_metrics():
def collector_loop():
collect_stats()
threading.Timer(10, collector_loop).start()
collector_loop()
REGISTRY.register(VolumeStatsCollector())
start_http_server(9100)