from pathlib import Path import grpc from csi import csi_pb2, csi_pb2_grpc from csi.csi_pb2 import ( NodeStageVolumeRequest, NodePublishVolumeRequest, NodeUnpublishVolumeRequest, NodeUnstageVolumeRequest, NodeExpandVolumeRequest, CreateVolumeRequest, ) from google.protobuf.timestamp_pb2 import Timestamp from declarative import ( be_mounted, be_unmounted, be_absent, be_formatted, be_fs_expanded, ) from fs_util import path_stats, mountpoint_to_dev from orchestrator.k8s import volume_to_node, run_on_node from remote import btrfs_create_snapshot, btrfs_delete_snapshot from util import log_grpc_request def get_fs(request): fs_type = request.volume_capability.mount.fs_type if fs_type == "": fs_type = "ext4" return fs_type class Bd2FsIdentityServicer(csi_pb2_grpc.IdentityServicer): def __init__(self, bds: csi_pb2_grpc.IdentityServicer): self.bds = bds @log_grpc_request def GetPluginInfo(self, request, context): return self.bds.GetPluginInfo(request, context) @log_grpc_request def GetPluginCapabilities(self, request, context): return self.bds.GetPluginCapabilities(request, context) # @log_grpc_request def Probe(self, request, context): return self.bds.Probe(request, context) class Bd2FsNodeServicer(csi_pb2_grpc.NodeServicer): def __init__(self, bds: csi_pb2_grpc.NodeServicer): self.bds = bds # @log_grpc_request def NodeGetCapabilities(self, request, context): return self.bds.NodeGetCapabilities(request, context) @log_grpc_request def NodePublishVolume(self, request, context): staging_dev = f"{request.staging_target_path}/device" Path(request.target_path).mkdir(exist_ok=True) be_mounted(dev=staging_dev, mountpoint=request.target_path) return csi_pb2.NodePublishVolumeResponse() @log_grpc_request def NodeUnpublishVolume(self, request, context): be_unmounted(request.target_path) be_absent(request.target_path) return csi_pb2.NodeUnpublishVolumeResponse() @log_grpc_request def NodeGetInfo(self, request, context): return self.bds.NodeGetInfo(request, context) @log_grpc_request def NodeStageVolume(self, request, context): bd_stage_request = NodeStageVolumeRequest() bd_stage_request.CopyFrom(request) bd_stage_request.staging_target_path = f"{request.staging_target_path}/block" Path(bd_stage_request.staging_target_path).mkdir(exist_ok=True) self.bds.NodeStageVolume(bd_stage_request, context) bd_publish_request = NodePublishVolumeRequest() bd_publish_request.volume_id = request.volume_id bd_publish_request.publish_context.update(request.publish_context) bd_publish_request.staging_target_path = bd_stage_request.staging_target_path bd_publish_request.target_path = f"{request.staging_target_path}/device" bd_publish_request.volume_capability.CopyFrom(request.volume_capability) bd_publish_request.readonly = False bd_publish_request.secrets.update(request.secrets) bd_publish_request.volume_context.update(request.volume_context) self.bds.NodePublishVolume(bd_publish_request, context) mount_path = f"{request.staging_target_path}/mount" Path(mount_path).mkdir(exist_ok=True) be_formatted(dev=bd_publish_request.target_path, fs=get_fs(request)) be_mounted(dev=bd_publish_request.target_path, mountpoint=mount_path) return csi_pb2.NodeStageVolumeResponse() @log_grpc_request def NodeUnstageVolume(self, request, context): mount_path = f"{request.staging_target_path}/mount" be_unmounted(mount_path) be_absent(mount_path) bd_unpublish_request = NodeUnpublishVolumeRequest() bd_unpublish_request.volume_id = request.volume_id bd_unpublish_request.target_path = f"{request.staging_target_path}/device" self.bds.NodeUnpublishVolume(bd_unpublish_request, context) bd_unstage_request = NodeUnstageVolumeRequest() bd_unstage_request.CopyFrom(request) bd_unstage_request.staging_target_path = f"{request.staging_target_path}/block" self.bds.NodeUnstageVolume(bd_unstage_request, context) be_absent(bd_unstage_request.staging_target_path) return csi_pb2.NodeUnstageVolumeResponse() # @log_grpc_request def NodeGetVolumeStats(self, request, context): volume_path = request.volume_path stats = path_stats(volume_path) return csi_pb2.NodeGetVolumeStatsResponse( usage=[ csi_pb2.VolumeUsage( available=stats["fs_avail"], total=stats["fs_size"], used=stats["fs_size"] - stats["fs_avail"], unit=csi_pb2.VolumeUsage.Unit.BYTES, ), csi_pb2.VolumeUsage( available=stats["fs_files_avail"], total=stats["fs_files"], used=stats["fs_files"] - stats["fs_files_avail"], unit=csi_pb2.VolumeUsage.Unit.INODES, ), ] ) @log_grpc_request def NodeExpandVolume(self, request, context): # FIXME: hacky way to determine if `volume_path` is staged path, or the mount itself # Based on CSI 1.4.0 specifications: # > The staging_target_path field is not required, for backwards compatibility, but the CO SHOULD supply it. # Apparently, k8s 1.18 does not supply it. So: dev_path = mountpoint_to_dev(request.volume_path) volume_path = request.volume_path if dev_path is None: dev_path = f"{request.volume_path}/device" volume_path = f"{request.volume_path}/mount" bd_request = NodeExpandVolumeRequest() bd_request.CopyFrom(request) bd_request.volume_path = dev_path self.bds.NodeExpandVolume(bd_request, context) # Based on CSI 1.4.0 specifications: # > If volume_capability is omitted the SP MAY determine # > access_type from given volume_path for the volume and perform # > node expansion. # Apparently k8s 1.18 omits this field. be_fs_expanded(bd_request.volume_path, volume_path) size = request.capacity_range.required_bytes return csi_pb2.NodeExpandVolumeResponse(capacity_bytes=size) class Bd2FsControllerServicer(csi_pb2_grpc.ControllerServicer): def __init__(self, bds: csi_pb2_grpc.ControllerServicer): self.bds = bds @log_grpc_request def ControllerGetCapabilities(self, request, context): return self.bds.ControllerGetCapabilities(request, context) @log_grpc_request def CreateVolume(self, request, context): # TODO: volume_capabilities if len(request.volume_capabilities) != 1: context.abort( grpc.StatusCode.INVALID_ARGUMENT, "Exactly one cap is supported" ) volume_capability = request.volume_capabilities[0] AccessModeEnum = csi_pb2.VolumeCapability.AccessMode.Mode if volume_capability.access_mode.mode not in [ AccessModeEnum.SINGLE_NODE_WRITER ]: context.abort( grpc.StatusCode.INVALID_ARGUMENT, f"Unsupported access mode: {AccessModeEnum.Name(volume_capability.access_mode.mode)}", ) access_type = volume_capability.WhichOneof("access_type") assert access_type == "mount" bd_request = CreateVolumeRequest() bd_request.CopyFrom(request) bd_request.capacity_range.required_bytes = max( request.capacity_range.required_bytes, 10 * 1024 * 1024 ) # At least 10MB # FIXME: update access_type # bd_request.volume_capabilities[0].block = "" # bd_request.volume_capabilities[0].mount = None return self.bds.CreateVolume(bd_request, context) @log_grpc_request def DeleteVolume(self, request, context): return self.bds.DeleteVolume(request, context) def GetCapacity(self, request, context): return self.bds.GetCapacity(request, context) @log_grpc_request def ControllerExpandVolume(self, request, context): response = self.bds.ControllerExpandVolume(request, context) assert response.node_expansion_required return response @log_grpc_request def CreateSnapshot(self, request: csi_pb2.CreateSnapshotRequest, context): volume_id = request.source_volume_id name = request.name snapshot_id, creation_time_ns = btrfs_create_snapshot( volume_id=volume_id, name=name ) nano = 10**9 return csi_pb2.CreateSnapshotResponse( snapshot=csi_pb2.Snapshot( size_bytes=0, snapshot_id=snapshot_id, source_volume_id=volume_id, creation_time=Timestamp( seconds=creation_time_ns // nano, nanos=creation_time_ns % nano ), ready_to_use=True, ) ) @log_grpc_request def DeleteSnapshot(self, request: csi_pb2.DeleteSnapshotRequest, context): snapshot_id = request.snapshot_id volume_id, name = snapshot_id.rsplit("/", 1) btrfs_delete_snapshot(volume_id=volume_id, name=name) return csi_pb2.DeleteSnapshotResponse()