Allocate new volumes remotely

Mehran Kholdi 2020-04-24 16:18:09 +04:30
parent 73a618228b
commit 9de82a9b47
9 changed files with 199 additions and 10 deletions

@@ -49,3 +49,28 @@ roleRef:
   kind: ClusterRole
   name: rawfile-external-provisioner-runner
   apiGroup: rbac.authorization.k8s.io
+---
+kind: ClusterRole
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+  name: rawfile-handler
+rules:
+  - apiGroups: [""]
+    resources: ["persistentvolumes"]
+    verbs: ["get"]
+  - apiGroups: [""]
+    resources: ["pods"]
+    verbs: ["get", "list", "watch", "create", "update", "patch"]
+---
+kind: ClusterRoleBinding
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+  name: rawfile-handler
+subjects:
+  - kind: ServiceAccount
+    name: rawfile-csi-controller
+    namespace: kube-system
+roleRef:
+  kind: ClusterRole
+  name: rawfile-handler
+  apiGroup: rbac.authorization.k8s.io

orchestrator/__init__.py (new, empty file)

orchestrator/k8s.py (new file, 65 lines)

@@ -0,0 +1,65 @@
import json
import uuid
from pathlib import Path
from time import sleep

import pykube
import yaml
from munch import Munch

api = pykube.HTTPClient(pykube.KubeConfig.from_env())


def volume_to_node(volume_id):
    # Resolve which node a PV is pinned to by reading its nodeAffinity.
    pv = pykube.PersistentVolume.objects(api).get_by_name(name=volume_id)
    pv = Munch.fromDict(pv.obj)
    node_name = pv.spec.nodeAffinity.required.nodeSelectorTerms[0].matchExpressions[0][
        "values"
    ][0]
    expected_node_affinity = yaml.safe_load(
        f"""
        required:
          nodeSelectorTerms:
            - matchExpressions:
                - key: hostname
                  operator: In
                  values:
                    - {node_name}
        """
    )
    assert pv.spec.nodeAffinity == expected_node_affinity
    return node_name


def wait_for(pred, desc=""):
    print(f"Waiting for {desc}", end="", flush=True)
    while not pred():
        print(".", end="", flush=True)
        sleep(0.5)
    print(" done")


def run_on_node(fn, node):
    # Run a serialized command on a specific node via a short-lived task pod.
    name = f"task-{uuid.uuid4()}"
    ctx = {
        "name": name,
        "namespace": "kube-system",  # FIXME
        "nodeSelector": json.dumps({"kubernetes.io/hostname": node}),
        "cmd": json.dumps(fn),
    }
    template = Path("./templates/task.yaml").read_bytes().decode()
    manifest = template.format(**ctx)
    obj = yaml.safe_load(manifest)
    task_pod = pykube.Pod(api, obj)
    task_pod.create()

    def is_finished():
        task_pod.reload()
        status = task_pod.obj["status"]
        return status["phase"] in ["Succeeded", "Failed"]

    wait_for(is_finished, "task to finish")
    if task_pod.obj["status"]["phase"] != "Succeeded":
        raise Exception(f"Task {name} failed")

@@ -4,7 +4,9 @@ from google.protobuf.wrappers_pb2 import BoolValue
 from consts import DATA_DIR
 from csi import csi_pb2, csi_pb2_grpc
+from orchestrator.k8s import volume_to_node, run_on_node
 from util import log_grpc_request, run
+from remote import init_rawfile, scrub

 NODE_NAME_TOPOLOGY_KEY = "hostname"
@@ -48,10 +50,7 @@ class RawFileNodeServicer(csi_pb2_grpc.NodeServicer):
         mount_path = request.target_path
         img_dir = Path(f"{DATA_DIR}/{request.volume_id}")
         img_file = Path(f"{img_dir}/raw.img")
-        img_dir.mkdir(parents=False, exist_ok=True)
-        if not img_file.exists():
-            run(f"truncate -s1G {img_file}")
-            run(f"mkfs.ext4 {img_file}")
         run(f"mount {img_file} {mount_path}")
         return csi_pb2.NodePublishVolumeResponse()
@@ -81,16 +80,20 @@ class RawFileControllerServicer(csi_pb2_grpc.ControllerServicer):
     @log_grpc_request
     def CreateVolume(self, request, context):
         # TODO: capacity_range
         # TODO: volume_capabilities
+        size = request.capacity_range.required_bytes
         node_name = request.accessibility_requirements.preferred[0].segments[
             NODE_NAME_TOPOLOGY_KEY
         ]
+        run_on_node(
+            init_rawfile.as_cmd(volume_id=request.name, size=size), node=node_name
+        )
         return csi_pb2.CreateVolumeResponse(
             volume=csi_pb2.Volume(
                 volume_id=request.name,
-                capacity_bytes=0,
+                capacity_bytes=size,
                 accessible_topology=[
                     csi_pb2.Topology(segments={NODE_NAME_TOPOLOGY_KEY: node_name})
                 ],
@@ -99,6 +102,6 @@ class RawFileControllerServicer(csi_pb2_grpc.ControllerServicer):
     @log_grpc_request
     def DeleteVolume(self, request, context):
-        pv_name = request.volume_id
-        # TODO: Run a pod on that node to scrub the data
+        node_name = volume_to_node(request.volume_id)
+        run_on_node(scrub.as_cmd(volume_id=request.volume_id), node=node_name)
         return csi_pb2.DeleteVolumeResponse()

remote.py (new file, 20 lines)

@@ -0,0 +1,20 @@
from util import remote_fn


@remote_fn
def scrub(volume_id):
    # TODO: stub
    pass


@remote_fn
def init_rawfile(volume_id, size):
    from util import run
    from consts import DATA_DIR
    from pathlib import Path

    img_dir = Path(f"{DATA_DIR}/{volume_id}")
    img_dir.mkdir(parents=False, exist_ok=False)
    img_file = Path(f"{img_dir}/raw.img")
    run(f"truncate -s {size} {img_file}")
    run(f"mkfs.ext4 {img_file}")

requirements.in
@@ -1,3 +1,6 @@
 grpcio-tools
 grpcio
 click
+pyyaml
+pykube-ng
+munch

requirements.txt
@@ -4,11 +4,19 @@
 #
 # pip-compile
 #
+certifi==2020.4.5.1     # via requests
+chardet==3.0.4          # via requests
 click==7.1.1            # via -r requirements.in
 grpcio-tools==1.28.1    # via -r requirements.in
 grpcio==1.28.1          # via -r requirements.in, grpcio-tools
+idna==2.9               # via requests
+munch==2.5.0            # via -r requirements.in
 protobuf==3.11.3        # via grpcio-tools
-six==1.14.0             # via grpcio, protobuf
+pykube-ng==20.4.1       # via -r requirements.in
+pyyaml==5.3.1           # via -r requirements.in, pykube-ng
+requests==2.23.0        # via pykube-ng
+six==1.14.0             # via grpcio, munch, protobuf
+urllib3==1.25.9         # via requests

 # The following packages are considered to be unsafe in a requirements file:
 # setuptools

templates/task.yaml (new file, 37 lines)

@@ -0,0 +1,37 @@
apiVersion: v1
kind: Pod
metadata:
  name: {name}
  namespace: {namespace}
spec:
  # FIXME: hardcoded
  serviceAccount: rawfile-csi-controller
  restartPolicy: Never
  terminationGracePeriodSeconds: 0
  tolerations:
    - operator: Exists
  volumes:
    - name: data-dir
      hostPath:
        path: /var/csi/rawfile
        type: DirectoryOrCreate
  nodeSelector: {nodeSelector}
  containers:
    - name: task
      # FIXME: use immutable tag
      image: registry.hamdocker.ir/hamravesh/rawfile-csi:master
      # FIXME: change to IfNotPresent
      imagePullPolicy: Always
      volumeMounts:
        - name: data-dir
          mountPath: /data
      resources:
        requests: &rsc
          cpu: 100m
          memory: 100Mi
        limits: *rsc
      command:
        - /bin/sh
        - -c
      args:
        - {cmd}
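
The {name}, {namespace}, {nodeSelector} and {cmd} placeholders are plain str.format fields that run_on_node in orchestrator/k8s.py fills in; nodeSelector and cmd go through json.dumps first, so they land in the manifest as single-line, properly quoted YAML values. The &rsc anchor reuses the requests block for limits, which gives the task pod Guaranteed QoS. A sketch of the rendering step in isolation, with illustrative values:

import json
from pathlib import Path

import yaml

template = Path("./templates/task.yaml").read_bytes().decode()
manifest = template.format(
    name="task-1234",              # hypothetical task name
    namespace="kube-system",
    nodeSelector=json.dumps({"kubernetes.io/hostname": "node-1"}),
    cmd=json.dumps("echo hello"),  # json.dumps yields a quoted, YAML-safe scalar
)
obj = yaml.safe_load(manifest)     # ready to hand to pykube.Pod(api, obj)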

util.py (28 lines added)

@@ -1,4 +1,7 @@
+import base64
 import functools
+import inspect
+import pickle
 import subprocess
@@ -24,3 +27,28 @@ def log_grpc_request(func):
 def run(cmd):
     return subprocess.run(cmd, shell=True, check=True)
+
+
+# Wraps a function whose source is shipped to, and executed inside, a task pod.
+class remote_fn(object):
+    def __init__(self, fn):
+        self.fn = fn
+
+    def as_cmd(self, *args, **kwargs):
+        call_data = [inspect.getsource(self.fn).encode(), args, kwargs]
+        call_data_serialized = base64.b64encode(pickle.dumps(call_data))
+        run_cmd = f"""
+python <<EOF
+import base64
+import pickle
+remote_fn = lambda fn: fn  # FIXME: dirty hack
+call_data = pickle.loads(base64.b64decode({call_data_serialized}))
+exec(call_data[0])
+{self.fn.__name__}(*call_data[1], **call_data[2])
+EOF
+"""
+        return run_cmd
+
+    def __call__(self, *args, **kwargs):
+        raise Exception("Should only be run inside pod")