Extract blockdevice-to-filesystem logic from rawfile servicer

Summary: So that it's possible to use it with any other blockdevice provider.

Test Plan: N/A

Reviewers: sina_rad, h.marvi, mhyousefi, s.afshari

Differential Revision: https://phab.hamravesh.ir/D870
This commit is contained in:
Mehran Kholdi 2020-11-06 18:14:47 +03:30
parent 01a35354b6
commit c58dd14bf7
6 changed files with 311 additions and 74 deletions

218
bd2fs.py Normal file
View File

@ -0,0 +1,218 @@
from pathlib import Path
import grpc
from csi import csi_pb2, csi_pb2_grpc
from csi.csi_pb2 import (
NodeStageVolumeRequest,
NodePublishVolumeRequest,
NodeUnpublishVolumeRequest,
NodeUnstageVolumeRequest,
NodeExpandVolumeRequest,
CreateVolumeRequest,
)
from declarative import (
be_mounted,
be_unmounted,
be_absent,
be_formatted,
be_fs_expanded,
current_fs,
)
from metrics import path_stats, mountpoint_to_dev
from util import log_grpc_request
def get_fs(request):
fs_type = request.volume_capability.mount.fs_type
if fs_type == "":
fs_type = "ext4"
return fs_type
class Bd2FsIdentityServicer(csi_pb2_grpc.IdentityServicer):
def __init__(self, bds):
self.bds = bds
@log_grpc_request
def GetPluginInfo(self, request, context):
return self.bds.GetPluginInfo(request, context)
@log_grpc_request
def GetPluginCapabilities(self, request, context):
return self.bds.GetPluginCapabilities(request, context)
# @log_grpc_request
def Probe(self, request, context):
return self.bds.Probe(request, context)
class Bd2FsNodeServicer(csi_pb2_grpc.NodeServicer):
def __init__(self, bds):
self.bds = bds
# @log_grpc_request
def NodeGetCapabilities(self, request, context):
return self.bds.NodeGetCapabilities(request, context)
@log_grpc_request
def NodePublishVolume(self, request, context):
staging_dev = f"{request.staging_target_path}/device"
Path(request.target_path).mkdir(exist_ok=True)
be_mounted(dev=staging_dev, mountpoint=request.target_path)
return csi_pb2.NodePublishVolumeResponse()
@log_grpc_request
def NodeUnpublishVolume(self, request, context):
be_unmounted(request.target_path)
be_absent(request.target_path)
return csi_pb2.NodeUnpublishVolumeResponse()
@log_grpc_request
def NodeGetInfo(self, request, context):
return self.bds.NodeGetInfo(request, context)
@log_grpc_request
def NodeStageVolume(self, request, context):
bd_stage_request = NodeStageVolumeRequest()
bd_stage_request.CopyFrom(request)
bd_stage_request.staging_target_path = f"{request.staging_target_path}/block"
Path(bd_stage_request.staging_target_path).mkdir(exist_ok=True)
self.bds.NodeStageVolume(bd_stage_request, context)
bd_publish_request = NodePublishVolumeRequest()
bd_publish_request.volume_id = request.volume_id
bd_publish_request.publish_context.update(request.publish_context)
bd_publish_request.staging_target_path = bd_stage_request.staging_target_path
bd_publish_request.target_path = f"{request.staging_target_path}/device"
bd_publish_request.volume_capability.CopyFrom(request.volume_capability)
bd_publish_request.readonly = False
bd_publish_request.secrets.update(request.secrets)
bd_publish_request.volume_context.update(request.volume_context)
self.bds.NodePublishVolume(bd_publish_request, context)
mount_path = f"{request.staging_target_path}/mount"
Path(mount_path).mkdir(exist_ok=True)
be_formatted(dev=bd_publish_request.target_path, fs=get_fs(request))
be_mounted(dev=bd_publish_request.target_path, mountpoint=mount_path)
return csi_pb2.NodeStageVolumeResponse()
@log_grpc_request
def NodeUnstageVolume(self, request, context):
mount_path = f"{request.staging_target_path}/mount"
be_unmounted(mount_path)
be_absent(mount_path)
bd_unpublish_request = NodeUnpublishVolumeRequest()
bd_unpublish_request.volume_id = request.volume_id
bd_unpublish_request.target_path = f"{request.staging_target_path}/device"
self.bds.NodeUnpublishVolume(bd_unpublish_request, context)
bd_unstage_request = NodeUnstageVolumeRequest()
bd_unstage_request.CopyFrom(request)
bd_unstage_request.staging_target_path = f"{request.staging_target_path}/block"
self.bds.NodeUnstageVolume(bd_unstage_request, context)
be_absent(bd_unstage_request.staging_target_path)
return csi_pb2.NodeUnstageVolumeResponse()
# @log_grpc_request
def NodeGetVolumeStats(self, request, context):
volume_path = request.volume_path
stats = path_stats(volume_path)
return csi_pb2.NodeGetVolumeStatsResponse(
usage=[
csi_pb2.VolumeUsage(
available=stats["fs_free"],
total=stats["fs_size"],
used=stats["fs_size"] - stats["fs_free"],
unit=csi_pb2.VolumeUsage.Unit.BYTES,
),
csi_pb2.VolumeUsage(
available=stats["fs_files_free"],
total=stats["fs_files"],
used=stats["fs_files"] - stats["fs_files_free"],
unit=csi_pb2.VolumeUsage.Unit.INODES,
),
]
)
@log_grpc_request
def NodeExpandVolume(self, request, context):
# FIXME: hacky way to determine if `volume_path` is staged path, or the mount itself
# Based on CSI 1.4.0 specifications:
# > The staging_target_path field is not required, for backwards compatibility, but the CO SHOULD supply it.
# Apparently, k8s 1.18 does not supply it. So:
dev_path = mountpoint_to_dev(request.volume_path)
if dev_path is None:
dev_path = f"{request.volume_path}/device"
bd_request = NodeExpandVolumeRequest()
bd_request.CopyFrom(request)
bd_request.volume_path = dev_path
self.bds.NodeExpandVolume(bd_request, context)
# Based on CSI 1.4.0 specifications:
# > If volume_capability is omitted the SP MAY determine
# > access_type from given volume_path for the volume and perform
# > node expansion.
# Apparently k8s 1.18 omits this field.
fs_type = current_fs(bd_request.volume_path)
be_fs_expanded(fs_type, bd_request.volume_path, request.volume_path)
size = request.capacity_range.required_bytes
return csi_pb2.NodeExpandVolumeResponse(capacity_bytes=size)
class Bd2FsControllerServicer(csi_pb2_grpc.ControllerServicer):
def __init__(self, bds):
self.bds = bds
@log_grpc_request
def ControllerGetCapabilities(self, request, context):
return self.bds.ControllerGetCapabilities(request, context)
@log_grpc_request
def CreateVolume(self, request, context):
# TODO: volume_capabilities
if len(request.volume_capabilities) != 1:
context.abort(
grpc.StatusCode.INVALID_ARGUMENT, "Exactly one cap is supported"
)
volume_capability = request.volume_capabilities[0]
AccessModeEnum = csi_pb2.VolumeCapability.AccessMode.Mode
if volume_capability.access_mode.mode not in [
AccessModeEnum.SINGLE_NODE_WRITER
]:
context.abort(
grpc.StatusCode.INVALID_ARGUMENT,
f"Unsupported access mode: {AccessModeEnum.Name(volume_capability.access_mode.mode)}",
)
access_type = volume_capability.WhichOneof("access_type")
assert access_type == "mount"
bd_request = CreateVolumeRequest()
bd_request.CopyFrom(request)
bd_request.capacity_range.required_bytes = max(
request.capacity_range.required_bytes, 10 * 1024 * 1024
) # At least 10MB
# FIXME: update access_type
# bd_request.volume_capabilities[0].block = ""
# bd_request.volume_capabilities[0].mount = None
return self.bds.CreateVolume(bd_request, context)
@log_grpc_request
def DeleteVolume(self, request, context):
return self.bds.DeleteVolume(request, context)
@log_grpc_request
def ControllerExpandVolume(self, request, context):
response = self.bds.ControllerExpandVolume(request, context)
assert response.node_expansion_required
return response

View File

@ -1,4 +1,5 @@
import os
import subprocess
from pathlib import Path
from util import run
@ -46,3 +47,41 @@ def be_unmounted(path):
path = Path(path)
while path.is_mount():
run(f"umount {path}")
def current_fs(device):
res = subprocess.run(
f"blkid -o value -s TYPE {device}", shell=True, capture_output=True
)
if res.returncode == 2: # specified token was not found
return None
return res.stdout.decode().strip()
def be_formatted(dev, fs):
def init_fs(device, filesystem):
if fs == "ext4":
run(f"mkfs.ext4 {device}")
elif fs == "btrfs":
run(f"mkfs.btrfs {device}")
else:
raise Exception(f"Unsupported fs type: {filesystem}")
dev = Path(dev).resolve()
current = current_fs(dev)
if current is None:
init_fs(dev, fs)
else:
if current != fs:
raise Exception(f"Existing filesystem does not match: {current}/{fs}")
def be_fs_expanded(fs, dev, path):
dev = Path(dev).resolve()
path = Path(path).resolve()
if fs == "ext4":
run(f"resize2fs {dev}")
elif fs == "btrfs":
run(f"btrfs filesystem resize max {path}")
else:
raise Exception(f"Unsupported fsType: {fs}")

View File

@ -10,6 +10,16 @@ import rawfile_util
from rawfile_util import attached_loops
def path_stats(path):
fs_stat = os.statvfs(path)
return {
"fs_size": fs_stat.f_frsize * fs_stat.f_blocks,
"fs_free": fs_stat.f_frsize * fs_stat.f_bfree,
"fs_files": fs_stat.f_files,
"fs_files_free": fs_stat.f_ffree,
}
def volume_stats(volume_id: str) -> dict:
img_file = rawfile_util.img_file(volume_id)
dev_stat = img_file.stat()
@ -19,15 +29,7 @@ def volume_stats(volume_id: str) -> dict:
}
mountpoint = volume_to_mountpoint(img_file)
if mountpoint is not None:
fs_stat = os.statvfs(mountpoint)
stats.update(
{
"fs_size": fs_stat.f_frsize * fs_stat.f_blocks,
"fs_free": fs_stat.f_frsize * fs_stat.f_bfree,
"fs_files": fs_stat.f_files,
"fs_files_free": fs_stat.f_ffree,
}
)
stats.update(path_stats(mountpoint))
return stats
@ -102,6 +104,18 @@ def dev_to_mountpoint(dev_name):
return None
def mountpoint_to_dev(mountpoint):
res = subprocess.run(
f"findmnt --json --first-only --mountpoint {mountpoint}",
shell=True,
capture_output=True,
)
if res.returncode != 0:
return None
data = json.loads(res.stdout.decode().strip())
return data["filesystems"][0]["source"]
def expose_metrics():
REGISTRY.register(VolumeStatsCollector())
start_http_server(9100)

View File

@ -5,6 +5,7 @@ from concurrent import futures
import click
import grpc
import bd2fs
import rawfile_servicer
from consts import CONFIG
from csi import csi_pb2_grpc
@ -30,13 +31,15 @@ def csi_driver(endpoint, nodeid, enable_metrics):
expose_metrics()
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
csi_pb2_grpc.add_IdentityServicer_to_server(
rawfile_servicer.RawFileIdentityServicer(), server
bd2fs.Bd2FsIdentityServicer(rawfile_servicer.RawFileIdentityServicer()), server
)
csi_pb2_grpc.add_NodeServicer_to_server(
rawfile_servicer.RawFileNodeServicer(node_name=nodeid), server
bd2fs.Bd2FsNodeServicer(rawfile_servicer.RawFileNodeServicer(node_name=nodeid)),
server,
)
csi_pb2_grpc.add_ControllerServicer_to_server(
rawfile_servicer.RawFileControllerServicer(), server
bd2fs.Bd2FsControllerServicer(rawfile_servicer.RawFileControllerServicer()),
server,
)
server.add_insecure_port(endpoint)
server.start()

View File

@ -6,7 +6,7 @@ from google.protobuf.wrappers_pb2 import BoolValue
import rawfile_util
from consts import PROVISIONER_VERSION, PROVISIONER_NAME
from csi import csi_pb2, csi_pb2_grpc
from declarative import be_mounted, be_unmounted, be_symlink, be_absent
from declarative import be_symlink, be_absent
from metrics import volume_stats
from orchestrator.k8s import volume_to_node, run_on_node
from rawfile_util import attach_loop, detach_loops
@ -64,15 +64,16 @@ class RawFileNodeServicer(csi_pb2_grpc.NodeServicer):
@log_grpc_request
def NodePublishVolume(self, request, context):
mount_path = request.target_path
target_path = request.target_path
staging_path = request.staging_target_path
be_mounted(dev=f"{staging_path}/device", mountpoint=mount_path)
staging_dev_path = Path(f"{staging_path}/dev")
be_symlink(path=target_path, to=staging_dev_path)
return csi_pb2.NodePublishVolumeResponse()
@log_grpc_request
def NodeUnpublishVolume(self, request, context):
mount_path = request.target_path
be_unmounted(mount_path)
target_path = request.target_path
be_absent(path=target_path)
return csi_pb2.NodeUnpublishVolumeResponse()
@log_grpc_request
@ -89,62 +90,37 @@ class RawFileNodeServicer(csi_pb2_grpc.NodeServicer):
img_file = rawfile_util.img_file(request.volume_id)
loop_file = attach_loop(img_file)
staging_path = request.staging_target_path
device_path = Path(f"{staging_path}/device")
be_symlink(path=device_path, to=loop_file)
mount_path = Path(f"{staging_path}/mount")
mount_path.mkdir(exist_ok=True)
be_mounted(dev=device_path, mountpoint=mount_path)
staging_dev_path = Path(f"{staging_path}/dev")
be_symlink(path=staging_dev_path, to=loop_file)
return csi_pb2.NodeStageVolumeResponse()
@log_grpc_request
def NodeUnstageVolume(self, request, context):
img_file = rawfile_util.img_file(request.volume_id)
staging_path = request.staging_target_path
mount_path = Path(f"{staging_path}/mount")
be_unmounted(mount_path)
be_absent(mount_path)
device_path = Path(f"{staging_path}/device")
be_absent(device_path)
staging_dev_path = Path(f"{staging_path}/dev")
be_absent(staging_dev_path)
detach_loops(img_file)
return csi_pb2.NodeUnstageVolumeResponse()
# @log_grpc_request
def NodeGetVolumeStats(self, request, context):
volume_id = request.volume_id
stats = volume_stats(volume_id)
stats = volume_stats(volume_id) # FIXME
return csi_pb2.NodeGetVolumeStatsResponse(
usage=[
csi_pb2.VolumeUsage(
available=stats["fs_free"],
total=stats["fs_size"],
used=stats["fs_size"] - stats["fs_free"],
unit=csi_pb2.VolumeUsage.Unit.BYTES,
),
csi_pb2.VolumeUsage(
available=stats["fs_files_free"],
total=stats["fs_files"],
used=stats["fs_files"] - stats["fs_files_free"],
unit=csi_pb2.VolumeUsage.Unit.INODES,
total=stats["dev_size"], unit=csi_pb2.VolumeUsage.Unit.BYTES,
),
]
)
@log_grpc_request
def NodeExpandVolume(self, request, context):
volume_id = request.volume_id
volume_path = request.volume_path
size = request.capacity_range.required_bytes
fs_type = rawfile_util.metadata(volume_id)["fs_type"]
img_file = rawfile_util.img_file(volume_id)
for dev in rawfile_util.attached_loops(img_file):
run(f"losetup -c {dev}")
if fs_type == "ext4":
run(f"resize2fs {dev}")
elif fs_type == "btrfs":
run(f"btrfs filesystem resize max {volume_path}")
else:
raise Exception(f"Unsupported fsType: {fs_type}")
break
volume_path = Path(volume_path).resolve()
run(f"losetup -c {volume_path}")
return csi_pb2.NodeExpandVolumeResponse(capacity_bytes=size)
@ -179,22 +155,17 @@ class RawFileControllerServicer(csi_pb2_grpc.ControllerServicer):
f"Unsupported access mode: {AccessModeEnum.Name(volume_capability.access_mode.mode)}",
)
access_type = volume_capability.WhichOneof("access_type")
if access_type == "mount":
fs_type = volume_capability.mount.fs_type
if fs_type == "":
fs_type = "ext4"
elif access_type == "block":
context.abort(
grpc.StatusCode.INVALID_ARGUMENT, "Block mode not supported (yet)"
)
else:
context.abort(
grpc.StatusCode.INVALID_ARGUMENT, f"Unknown access type: {access_type}"
)
# FIXME: re-enable access_type after bd2fs is fixed
# access_type = volume_capability.WhichOneof("access_type")
# if access_type == "block":
# pass
# else:
# context.abort(
# grpc.StatusCode.INVALID_ARGUMENT,
# "PANIC! This should be handled by bd2fs!",
# )
size = request.capacity_range.required_bytes
size = max(size, 10 * 1024 * 1024) # At least 10MB
try:
node_name = request.accessibility_requirements.preferred[0].segments[
@ -211,8 +182,7 @@ class RawFileControllerServicer(csi_pb2_grpc.ControllerServicer):
)
run_on_node(
init_rawfile.as_cmd(volume_id=request.name, size=size, fs_type=fs_type),
node=node_name,
init_rawfile.as_cmd(volume_id=request.name, size=size), node=node_name,
)
return csi_pb2.CreateVolumeResponse(

View File

@ -10,7 +10,7 @@ def scrub(volume_id):
@remote_fn
def init_rawfile(volume_id, size, fs_type):
def init_rawfile(volume_id, size):
import time
import rawfile_util
from volume_schema import LATEST_SCHEMA_VERSION
@ -31,16 +31,9 @@ def init_rawfile(volume_id, size, fs_type):
"created_at": time.time(),
"img_file": img_file.as_posix(),
"size": size,
"fs_type": fs_type,
},
)
run(f"truncate -s {size} {img_file}")
if fs_type == "ext4":
run(f"mkfs.ext4 {img_file}")
elif fs_type == "btrfs":
run(f"mkfs.btrfs {img_file}")
else:
raise Exception(f"Unsupported fsType: {fs_type}")
@remote_fn