Hotfix: Make side effects idempotent

This commit is contained in:
Mehran Kholdi 2020-06-14 04:17:32 +04:30
parent d1c0d49cf0
commit 8b1be18a15
3 changed files with 65 additions and 15 deletions

47
declarative.py Normal file
View File

@ -0,0 +1,47 @@
import os
from pathlib import Path
from util import run
def be_absent(path):
path = Path(path)
if not path.exists():
return
elif path.is_symlink() or path.is_file():
path.unlink()
elif path.is_dir():
path.rmdir()
# XXX: should we `shutil.rmtree(path)` instead?
else:
raise Exception("Unknown file type")
def be_symlink(path, to):
path = Path(path)
to = Path(to)
if path.exists():
if path.is_symlink():
if os.readlink(path) == str(to):
return
be_absent(path)
path.symlink_to(to)
def be_mounted(dev, mountpoint):
dev = Path(dev).resolve()
mountpoint = Path(mountpoint)
if mountpoint.is_mount():
if True: # TODO: verify that the right device is mounted
return
# noinspection PyUnreachableCode
be_unmounted(mountpoint)
run(f"mount {dev} {mountpoint}")
def be_unmounted(path):
path = Path(path)
while path.is_mount():
run(f"umount {path}")

View File

@ -5,6 +5,7 @@ from google.protobuf.wrappers_pb2 import BoolValue
import rawfile_util import rawfile_util
from csi import csi_pb2, csi_pb2_grpc from csi import csi_pb2, csi_pb2_grpc
from declarative import be_mounted, be_unmounted, be_symlink, be_absent
from orchestrator.k8s import volume_to_node, run_on_node from orchestrator.k8s import volume_to_node, run_on_node
from rawfile_util import attach_loop, detach_loops from rawfile_util import attach_loop, detach_loops
from remote import init_rawfile, scrub, expand_rawfile from remote import init_rawfile, scrub, expand_rawfile
@ -62,13 +63,13 @@ class RawFileNodeServicer(csi_pb2_grpc.NodeServicer):
def NodePublishVolume(self, request, context): def NodePublishVolume(self, request, context):
mount_path = request.target_path mount_path = request.target_path
staging_path = request.staging_target_path staging_path = request.staging_target_path
run(f"mount --bind {staging_path}/mount {mount_path}") be_mounted(dev=f"{staging_path}/device", mountpoint=mount_path)
return csi_pb2.NodePublishVolumeResponse() return csi_pb2.NodePublishVolumeResponse()
@log_grpc_request @log_grpc_request
def NodeUnpublishVolume(self, request, context): def NodeUnpublishVolume(self, request, context):
mount_path = request.target_path mount_path = request.target_path
run(f"umount {mount_path}") be_unmounted(mount_path)
return csi_pb2.NodeUnpublishVolumeResponse() return csi_pb2.NodeUnpublishVolumeResponse()
@log_grpc_request @log_grpc_request
@ -86,12 +87,10 @@ class RawFileNodeServicer(csi_pb2_grpc.NodeServicer):
loop_file = attach_loop(img_file) loop_file = attach_loop(img_file)
staging_path = request.staging_target_path staging_path = request.staging_target_path
device_path = Path(f"{staging_path}/device") device_path = Path(f"{staging_path}/device")
if not device_path.exists(): be_symlink(path=device_path, to=loop_file)
device_path.symlink_to(loop_file)
mount_path = Path(f"{staging_path}/mount") mount_path = Path(f"{staging_path}/mount")
if not mount_path.exists(): mount_path.mkdir(exist_ok=True)
mount_path.mkdir() be_mounted(dev=device_path, mountpoint=mount_path)
run(f"mount {device_path} {mount_path}")
return csi_pb2.NodeStageVolumeResponse() return csi_pb2.NodeStageVolumeResponse()
@log_grpc_request @log_grpc_request
@ -99,12 +98,10 @@ class RawFileNodeServicer(csi_pb2_grpc.NodeServicer):
img_file = rawfile_util.img_file(request.volume_id) img_file = rawfile_util.img_file(request.volume_id)
staging_path = request.staging_target_path staging_path = request.staging_target_path
mount_path = Path(f"{staging_path}/mount") mount_path = Path(f"{staging_path}/mount")
if mount_path.exists(): be_unmounted(mount_path)
run(f"umount {mount_path}") be_absent(mount_path)
mount_path.rmdir()
device_path = Path(f"{staging_path}/device") device_path = Path(f"{staging_path}/device")
if device_path.exists(): be_absent(device_path)
device_path.unlink()
detach_loops(img_file) detach_loops(img_file)
return csi_pb2.NodeUnstageVolumeResponse() return csi_pb2.NodeUnstageVolumeResponse()

View File

@ -13,17 +13,21 @@ def scrub(volume_id):
def init_rawfile(volume_id, size): def init_rawfile(volume_id, size):
import time import time
import rawfile_util import rawfile_util
from pathlib import Path
from util import run from util import run
img_dir = rawfile_util.img_dir(volume_id) img_dir = rawfile_util.img_dir(volume_id)
img_dir.mkdir(parents=False, exist_ok=False) img_dir.mkdir(exist_ok=True)
img_file = f"{img_dir}/disk.img" img_file = Path(f"{img_dir}/disk.img")
if img_file.exists():
return
rawfile_util.patch_metadata( rawfile_util.patch_metadata(
volume_id, volume_id,
{ {
"volume_id": volume_id, "volume_id": volume_id,
"created_at": time.time(), "created_at": time.time(),
"img_file": img_file, "img_file": img_file.as_posix(),
"size": size, "size": size,
}, },
) )
@ -37,6 +41,8 @@ def expand_rawfile(volume_id, size):
from util import run from util import run
img_file = rawfile_util.img_file(volume_id) img_file = rawfile_util.img_file(volume_id)
if rawfile_util.metadata(volume_id)["size"] >= size:
return
rawfile_util.patch_metadata( rawfile_util.patch_metadata(
volume_id, {"size": size}, volume_id, {"size": size},
) )