diff --git a/bd2fs.py b/bd2fs.py new file mode 100644 index 0000000..87221b5 --- /dev/null +++ b/bd2fs.py @@ -0,0 +1,218 @@ +from pathlib import Path + +import grpc + +from csi import csi_pb2, csi_pb2_grpc +from csi.csi_pb2 import ( + NodeStageVolumeRequest, + NodePublishVolumeRequest, + NodeUnpublishVolumeRequest, + NodeUnstageVolumeRequest, + NodeExpandVolumeRequest, + CreateVolumeRequest, +) +from declarative import ( + be_mounted, + be_unmounted, + be_absent, + be_formatted, + be_fs_expanded, + current_fs, +) +from metrics import path_stats, mountpoint_to_dev +from util import log_grpc_request + + +def get_fs(request): + fs_type = request.volume_capability.mount.fs_type + if fs_type == "": + fs_type = "ext4" + return fs_type + + +class Bd2FsIdentityServicer(csi_pb2_grpc.IdentityServicer): + def __init__(self, bds): + self.bds = bds + + @log_grpc_request + def GetPluginInfo(self, request, context): + return self.bds.GetPluginInfo(request, context) + + @log_grpc_request + def GetPluginCapabilities(self, request, context): + return self.bds.GetPluginCapabilities(request, context) + + # @log_grpc_request + def Probe(self, request, context): + return self.bds.Probe(request, context) + + +class Bd2FsNodeServicer(csi_pb2_grpc.NodeServicer): + def __init__(self, bds): + self.bds = bds + + # @log_grpc_request + def NodeGetCapabilities(self, request, context): + return self.bds.NodeGetCapabilities(request, context) + + @log_grpc_request + def NodePublishVolume(self, request, context): + staging_dev = f"{request.staging_target_path}/device" + Path(request.target_path).mkdir(exist_ok=True) + be_mounted(dev=staging_dev, mountpoint=request.target_path) + return csi_pb2.NodePublishVolumeResponse() + + @log_grpc_request + def NodeUnpublishVolume(self, request, context): + be_unmounted(request.target_path) + be_absent(request.target_path) + return csi_pb2.NodeUnpublishVolumeResponse() + + @log_grpc_request + def NodeGetInfo(self, request, context): + return self.bds.NodeGetInfo(request, context) + + @log_grpc_request + def NodeStageVolume(self, request, context): + bd_stage_request = NodeStageVolumeRequest() + bd_stage_request.CopyFrom(request) + bd_stage_request.staging_target_path = f"{request.staging_target_path}/block" + Path(bd_stage_request.staging_target_path).mkdir(exist_ok=True) + self.bds.NodeStageVolume(bd_stage_request, context) + + bd_publish_request = NodePublishVolumeRequest() + bd_publish_request.volume_id = request.volume_id + bd_publish_request.publish_context.update(request.publish_context) + bd_publish_request.staging_target_path = bd_stage_request.staging_target_path + bd_publish_request.target_path = f"{request.staging_target_path}/device" + bd_publish_request.volume_capability.CopyFrom(request.volume_capability) + bd_publish_request.readonly = False + bd_publish_request.secrets.update(request.secrets) + bd_publish_request.volume_context.update(request.volume_context) + + self.bds.NodePublishVolume(bd_publish_request, context) + + mount_path = f"{request.staging_target_path}/mount" + Path(mount_path).mkdir(exist_ok=True) + be_formatted(dev=bd_publish_request.target_path, fs=get_fs(request)) + be_mounted(dev=bd_publish_request.target_path, mountpoint=mount_path) + + return csi_pb2.NodeStageVolumeResponse() + + @log_grpc_request + def NodeUnstageVolume(self, request, context): + mount_path = f"{request.staging_target_path}/mount" + be_unmounted(mount_path) + be_absent(mount_path) + + bd_unpublish_request = NodeUnpublishVolumeRequest() + bd_unpublish_request.volume_id = request.volume_id + bd_unpublish_request.target_path = f"{request.staging_target_path}/device" + self.bds.NodeUnpublishVolume(bd_unpublish_request, context) + + bd_unstage_request = NodeUnstageVolumeRequest() + bd_unstage_request.CopyFrom(request) + bd_unstage_request.staging_target_path = f"{request.staging_target_path}/block" + self.bds.NodeUnstageVolume(bd_unstage_request, context) + be_absent(bd_unstage_request.staging_target_path) + + return csi_pb2.NodeUnstageVolumeResponse() + + # @log_grpc_request + def NodeGetVolumeStats(self, request, context): + volume_path = request.volume_path + stats = path_stats(volume_path) + return csi_pb2.NodeGetVolumeStatsResponse( + usage=[ + csi_pb2.VolumeUsage( + available=stats["fs_free"], + total=stats["fs_size"], + used=stats["fs_size"] - stats["fs_free"], + unit=csi_pb2.VolumeUsage.Unit.BYTES, + ), + csi_pb2.VolumeUsage( + available=stats["fs_files_free"], + total=stats["fs_files"], + used=stats["fs_files"] - stats["fs_files_free"], + unit=csi_pb2.VolumeUsage.Unit.INODES, + ), + ] + ) + + @log_grpc_request + def NodeExpandVolume(self, request, context): + # FIXME: hacky way to determine if `volume_path` is staged path, or the mount itself + # Based on CSI 1.4.0 specifications: + # > The staging_target_path field is not required, for backwards compatibility, but the CO SHOULD supply it. + # Apparently, k8s 1.18 does not supply it. So: + dev_path = mountpoint_to_dev(request.volume_path) + if dev_path is None: + dev_path = f"{request.volume_path}/device" + + bd_request = NodeExpandVolumeRequest() + bd_request.CopyFrom(request) + bd_request.volume_path = dev_path + self.bds.NodeExpandVolume(bd_request, context) + + # Based on CSI 1.4.0 specifications: + # > If volume_capability is omitted the SP MAY determine + # > access_type from given volume_path for the volume and perform + # > node expansion. + # Apparently k8s 1.18 omits this field. + fs_type = current_fs(bd_request.volume_path) + be_fs_expanded(fs_type, bd_request.volume_path, request.volume_path) + + size = request.capacity_range.required_bytes + return csi_pb2.NodeExpandVolumeResponse(capacity_bytes=size) + + +class Bd2FsControllerServicer(csi_pb2_grpc.ControllerServicer): + def __init__(self, bds): + self.bds = bds + + @log_grpc_request + def ControllerGetCapabilities(self, request, context): + return self.bds.ControllerGetCapabilities(request, context) + + @log_grpc_request + def CreateVolume(self, request, context): + # TODO: volume_capabilities + + if len(request.volume_capabilities) != 1: + context.abort( + grpc.StatusCode.INVALID_ARGUMENT, "Exactly one cap is supported" + ) + + volume_capability = request.volume_capabilities[0] + + AccessModeEnum = csi_pb2.VolumeCapability.AccessMode.Mode + if volume_capability.access_mode.mode not in [ + AccessModeEnum.SINGLE_NODE_WRITER + ]: + context.abort( + grpc.StatusCode.INVALID_ARGUMENT, + f"Unsupported access mode: {AccessModeEnum.Name(volume_capability.access_mode.mode)}", + ) + + access_type = volume_capability.WhichOneof("access_type") + assert access_type == "mount" + + bd_request = CreateVolumeRequest() + bd_request.CopyFrom(request) + bd_request.capacity_range.required_bytes = max( + request.capacity_range.required_bytes, 10 * 1024 * 1024 + ) # At least 10MB + # FIXME: update access_type + # bd_request.volume_capabilities[0].block = "" + # bd_request.volume_capabilities[0].mount = None + return self.bds.CreateVolume(bd_request, context) + + @log_grpc_request + def DeleteVolume(self, request, context): + return self.bds.DeleteVolume(request, context) + + @log_grpc_request + def ControllerExpandVolume(self, request, context): + response = self.bds.ControllerExpandVolume(request, context) + assert response.node_expansion_required + return response diff --git a/declarative.py b/declarative.py index 85d91ab..258e9a3 100644 --- a/declarative.py +++ b/declarative.py @@ -1,4 +1,5 @@ import os +import subprocess from pathlib import Path from util import run @@ -46,3 +47,41 @@ def be_unmounted(path): path = Path(path) while path.is_mount(): run(f"umount {path}") + + +def current_fs(device): + res = subprocess.run( + f"blkid -o value -s TYPE {device}", shell=True, capture_output=True + ) + if res.returncode == 2: # specified token was not found + return None + return res.stdout.decode().strip() + + +def be_formatted(dev, fs): + def init_fs(device, filesystem): + if fs == "ext4": + run(f"mkfs.ext4 {device}") + elif fs == "btrfs": + run(f"mkfs.btrfs {device}") + else: + raise Exception(f"Unsupported fs type: {filesystem}") + + dev = Path(dev).resolve() + current = current_fs(dev) + if current is None: + init_fs(dev, fs) + else: + if current != fs: + raise Exception(f"Existing filesystem does not match: {current}/{fs}") + + +def be_fs_expanded(fs, dev, path): + dev = Path(dev).resolve() + path = Path(path).resolve() + if fs == "ext4": + run(f"resize2fs {dev}") + elif fs == "btrfs": + run(f"btrfs filesystem resize max {path}") + else: + raise Exception(f"Unsupported fsType: {fs}") diff --git a/metrics.py b/metrics.py index 1defba7..a40adf1 100644 --- a/metrics.py +++ b/metrics.py @@ -10,6 +10,16 @@ import rawfile_util from rawfile_util import attached_loops +def path_stats(path): + fs_stat = os.statvfs(path) + return { + "fs_size": fs_stat.f_frsize * fs_stat.f_blocks, + "fs_free": fs_stat.f_frsize * fs_stat.f_bfree, + "fs_files": fs_stat.f_files, + "fs_files_free": fs_stat.f_ffree, + } + + def volume_stats(volume_id: str) -> dict: img_file = rawfile_util.img_file(volume_id) dev_stat = img_file.stat() @@ -19,15 +29,7 @@ def volume_stats(volume_id: str) -> dict: } mountpoint = volume_to_mountpoint(img_file) if mountpoint is not None: - fs_stat = os.statvfs(mountpoint) - stats.update( - { - "fs_size": fs_stat.f_frsize * fs_stat.f_blocks, - "fs_free": fs_stat.f_frsize * fs_stat.f_bfree, - "fs_files": fs_stat.f_files, - "fs_files_free": fs_stat.f_ffree, - } - ) + stats.update(path_stats(mountpoint)) return stats @@ -102,6 +104,18 @@ def dev_to_mountpoint(dev_name): return None +def mountpoint_to_dev(mountpoint): + res = subprocess.run( + f"findmnt --json --first-only --mountpoint {mountpoint}", + shell=True, + capture_output=True, + ) + if res.returncode != 0: + return None + data = json.loads(res.stdout.decode().strip()) + return data["filesystems"][0]["source"] + + def expose_metrics(): REGISTRY.register(VolumeStatsCollector()) start_http_server(9100) diff --git a/rawfile.py b/rawfile.py index 1dccc70..5d657ec 100755 --- a/rawfile.py +++ b/rawfile.py @@ -5,6 +5,7 @@ from concurrent import futures import click import grpc +import bd2fs import rawfile_servicer from consts import CONFIG from csi import csi_pb2_grpc @@ -30,13 +31,15 @@ def csi_driver(endpoint, nodeid, enable_metrics): expose_metrics() server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) csi_pb2_grpc.add_IdentityServicer_to_server( - rawfile_servicer.RawFileIdentityServicer(), server + bd2fs.Bd2FsIdentityServicer(rawfile_servicer.RawFileIdentityServicer()), server ) csi_pb2_grpc.add_NodeServicer_to_server( - rawfile_servicer.RawFileNodeServicer(node_name=nodeid), server + bd2fs.Bd2FsNodeServicer(rawfile_servicer.RawFileNodeServicer(node_name=nodeid)), + server, ) csi_pb2_grpc.add_ControllerServicer_to_server( - rawfile_servicer.RawFileControllerServicer(), server + bd2fs.Bd2FsControllerServicer(rawfile_servicer.RawFileControllerServicer()), + server, ) server.add_insecure_port(endpoint) server.start() diff --git a/rawfile_servicer.py b/rawfile_servicer.py index a01c380..39c0672 100644 --- a/rawfile_servicer.py +++ b/rawfile_servicer.py @@ -6,7 +6,7 @@ from google.protobuf.wrappers_pb2 import BoolValue import rawfile_util from consts import PROVISIONER_VERSION, PROVISIONER_NAME from csi import csi_pb2, csi_pb2_grpc -from declarative import be_mounted, be_unmounted, be_symlink, be_absent +from declarative import be_symlink, be_absent from metrics import volume_stats from orchestrator.k8s import volume_to_node, run_on_node from rawfile_util import attach_loop, detach_loops @@ -64,15 +64,16 @@ class RawFileNodeServicer(csi_pb2_grpc.NodeServicer): @log_grpc_request def NodePublishVolume(self, request, context): - mount_path = request.target_path + target_path = request.target_path staging_path = request.staging_target_path - be_mounted(dev=f"{staging_path}/device", mountpoint=mount_path) + staging_dev_path = Path(f"{staging_path}/dev") + be_symlink(path=target_path, to=staging_dev_path) return csi_pb2.NodePublishVolumeResponse() @log_grpc_request def NodeUnpublishVolume(self, request, context): - mount_path = request.target_path - be_unmounted(mount_path) + target_path = request.target_path + be_absent(path=target_path) return csi_pb2.NodeUnpublishVolumeResponse() @log_grpc_request @@ -89,62 +90,37 @@ class RawFileNodeServicer(csi_pb2_grpc.NodeServicer): img_file = rawfile_util.img_file(request.volume_id) loop_file = attach_loop(img_file) staging_path = request.staging_target_path - device_path = Path(f"{staging_path}/device") - be_symlink(path=device_path, to=loop_file) - mount_path = Path(f"{staging_path}/mount") - mount_path.mkdir(exist_ok=True) - be_mounted(dev=device_path, mountpoint=mount_path) + staging_dev_path = Path(f"{staging_path}/dev") + be_symlink(path=staging_dev_path, to=loop_file) return csi_pb2.NodeStageVolumeResponse() @log_grpc_request def NodeUnstageVolume(self, request, context): img_file = rawfile_util.img_file(request.volume_id) staging_path = request.staging_target_path - mount_path = Path(f"{staging_path}/mount") - be_unmounted(mount_path) - be_absent(mount_path) - device_path = Path(f"{staging_path}/device") - be_absent(device_path) + staging_dev_path = Path(f"{staging_path}/dev") + be_absent(staging_dev_path) detach_loops(img_file) return csi_pb2.NodeUnstageVolumeResponse() # @log_grpc_request def NodeGetVolumeStats(self, request, context): volume_id = request.volume_id - stats = volume_stats(volume_id) + stats = volume_stats(volume_id) # FIXME return csi_pb2.NodeGetVolumeStatsResponse( usage=[ csi_pb2.VolumeUsage( - available=stats["fs_free"], - total=stats["fs_size"], - used=stats["fs_size"] - stats["fs_free"], - unit=csi_pb2.VolumeUsage.Unit.BYTES, - ), - csi_pb2.VolumeUsage( - available=stats["fs_files_free"], - total=stats["fs_files"], - used=stats["fs_files"] - stats["fs_files_free"], - unit=csi_pb2.VolumeUsage.Unit.INODES, + total=stats["dev_size"], unit=csi_pb2.VolumeUsage.Unit.BYTES, ), ] ) @log_grpc_request def NodeExpandVolume(self, request, context): - volume_id = request.volume_id volume_path = request.volume_path size = request.capacity_range.required_bytes - fs_type = rawfile_util.metadata(volume_id)["fs_type"] - img_file = rawfile_util.img_file(volume_id) - for dev in rawfile_util.attached_loops(img_file): - run(f"losetup -c {dev}") - if fs_type == "ext4": - run(f"resize2fs {dev}") - elif fs_type == "btrfs": - run(f"btrfs filesystem resize max {volume_path}") - else: - raise Exception(f"Unsupported fsType: {fs_type}") - break + volume_path = Path(volume_path).resolve() + run(f"losetup -c {volume_path}") return csi_pb2.NodeExpandVolumeResponse(capacity_bytes=size) @@ -179,22 +155,17 @@ class RawFileControllerServicer(csi_pb2_grpc.ControllerServicer): f"Unsupported access mode: {AccessModeEnum.Name(volume_capability.access_mode.mode)}", ) - access_type = volume_capability.WhichOneof("access_type") - if access_type == "mount": - fs_type = volume_capability.mount.fs_type - if fs_type == "": - fs_type = "ext4" - elif access_type == "block": - context.abort( - grpc.StatusCode.INVALID_ARGUMENT, "Block mode not supported (yet)" - ) - else: - context.abort( - grpc.StatusCode.INVALID_ARGUMENT, f"Unknown access type: {access_type}" - ) + # FIXME: re-enable access_type after bd2fs is fixed + # access_type = volume_capability.WhichOneof("access_type") + # if access_type == "block": + # pass + # else: + # context.abort( + # grpc.StatusCode.INVALID_ARGUMENT, + # "PANIC! This should be handled by bd2fs!", + # ) size = request.capacity_range.required_bytes - size = max(size, 10 * 1024 * 1024) # At least 10MB try: node_name = request.accessibility_requirements.preferred[0].segments[ @@ -211,8 +182,7 @@ class RawFileControllerServicer(csi_pb2_grpc.ControllerServicer): ) run_on_node( - init_rawfile.as_cmd(volume_id=request.name, size=size, fs_type=fs_type), - node=node_name, + init_rawfile.as_cmd(volume_id=request.name, size=size), node=node_name, ) return csi_pb2.CreateVolumeResponse( diff --git a/remote.py b/remote.py index 90543ca..12c60da 100644 --- a/remote.py +++ b/remote.py @@ -10,7 +10,7 @@ def scrub(volume_id): @remote_fn -def init_rawfile(volume_id, size, fs_type): +def init_rawfile(volume_id, size): import time import rawfile_util from volume_schema import LATEST_SCHEMA_VERSION @@ -31,16 +31,9 @@ def init_rawfile(volume_id, size, fs_type): "created_at": time.time(), "img_file": img_file.as_posix(), "size": size, - "fs_type": fs_type, }, ) run(f"truncate -s {size} {img_file}") - if fs_type == "ext4": - run(f"mkfs.ext4 {img_file}") - elif fs_type == "btrfs": - run(f"mkfs.btrfs {img_file}") - else: - raise Exception(f"Unsupported fsType: {fs_type}") @remote_fn