Implement metadata schema migration

Summary: Trying to add custom `fsType`s, new metadata fields need to be stored. We need a mechanism to migrate existing volume metadata.

Test Plan:
- Install the chart using an older image tag like `36fc480`
- Create and use a pvc
- Verify that the volume's metadata file, located at `/var/csi/rawfile/pvc-.../disk.meta` does not contain the `schema_version` field
- Upgrade the chart to use the image tag `feature-schema-migration`
- Wait until all node pods are upgraded
- Verify that the volume's metadata file contains the new `schema_version` field

Reviewers: bghadiri, h.marvi, mhyousefi, sina_rad

Reviewed By: bghadiri, h.marvi, mhyousefi

Differential Revision: https://phab.hamravesh.ir/D832
This commit is contained in:
Mehran Kholdi 2020-07-16 19:54:04 +04:30
parent d606f8f064
commit 0095c0e83a
4 changed files with 44 additions and 3 deletions

View File

@ -9,6 +9,7 @@ import rawfile_servicer
from consts import CONFIG from consts import CONFIG
from csi import csi_pb2_grpc from csi import csi_pb2_grpc
from metrics import expose_metrics from metrics import expose_metrics
from rawfile_util import migrate_all_volume_schemas
@click.group() @click.group()
@ -24,6 +25,7 @@ def cli(image_repository, image_tag):
@click.option("--nodeid", envvar="NODE_ID") @click.option("--nodeid", envvar="NODE_ID")
@click.option("--enable-metrics/--disable-metrics", default=True) @click.option("--enable-metrics/--disable-metrics", default=True)
def csi_driver(endpoint, nodeid, enable_metrics): def csi_driver(endpoint, nodeid, enable_metrics):
migrate_all_volume_schemas()
if enable_metrics: if enable_metrics:
expose_metrics() expose_metrics()
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))

View File

@ -4,6 +4,7 @@ from os.path import basename, dirname
from pathlib import Path from pathlib import Path
from consts import DATA_DIR from consts import DATA_DIR
from volume_schema import migrate_to, LATEST_SCHEMA_VERSION
from util import run, run_out from util import run, run_out
@ -26,11 +27,21 @@ def img_file(volume_id):
return Path(metadata(volume_id)["img_file"]) return Path(metadata(volume_id)["img_file"])
def patch_metadata(volume_id, obj): def update_metadata(volume_id: str, obj: dict) -> dict:
meta_file(volume_id).write_text(json.dumps(obj))
return obj
def patch_metadata(volume_id: str, obj: dict) -> dict:
old_data = metadata(volume_id) old_data = metadata(volume_id)
new_data = {**old_data, **obj} new_data = {**old_data, **obj}
meta_file(volume_id).write_text(json.dumps(new_data)) return update_metadata(volume_id, new_data)
return new_data
def migrate_metadata(volume_id, target_version):
old_data = metadata(volume_id)
new_data = migrate_to(old_data, target_version)
return update_metadata(volume_id, new_data)
def attached_loops(file: str) -> [str]: def attached_loops(file: str) -> [str]:
@ -66,3 +77,9 @@ def detach_loops(file) -> None:
def list_all_volumes(): def list_all_volumes():
metas = glob.glob(f"{DATA_DIR}/*/disk.meta") metas = glob.glob(f"{DATA_DIR}/*/disk.meta")
return [basename(dirname(meta)) for meta in metas] return [basename(dirname(meta)) for meta in metas]
def migrate_all_volume_schemas():
target_version = LATEST_SCHEMA_VERSION
for volume_id in list_all_volumes():
migrate_metadata(volume_id, target_version)

View File

@ -13,6 +13,7 @@ def scrub(volume_id):
def init_rawfile(volume_id, size): def init_rawfile(volume_id, size):
import time import time
import rawfile_util import rawfile_util
from volume_schema import LATEST_SCHEMA_VERSION
from pathlib import Path from pathlib import Path
from util import run from util import run
@ -25,6 +26,7 @@ def init_rawfile(volume_id, size):
rawfile_util.patch_metadata( rawfile_util.patch_metadata(
volume_id, volume_id,
{ {
"schema_version": LATEST_SCHEMA_VERSION,
"volume_id": volume_id, "volume_id": volume_id,
"created_at": time.time(), "created_at": time.time(),
"img_file": img_file.as_posix(), "img_file": img_file.as_posix(),

20
volume_schema.py Normal file
View File

@ -0,0 +1,20 @@
import sys
LATEST_SCHEMA_VERSION = 1 # type: int
def migrate_0_to_1(data: dict) -> dict:
data["schema_version"] = 1
return data
def migrate_to(data: dict, version: int) -> dict:
current = data.get("schema_version", 0)
if current > version:
raise Exception(
f"Current schema version ({current}) is newer than target schema version ({version})"
)
for i in range(current, version):
migrate_fn = getattr(sys.modules[__name__], f"migrate_{i}_to_{i + 1}")
data = migrate_fn(data)
return data