Enable "Storage Capacity Tracking"

This commit is contained in:
Mehran Kholdi 2021-10-02 16:15:17 +03:30
parent 45d1ab1aa3
commit 110dee7d3d
10 changed files with 70 additions and 29 deletions

View File

@ -21,4 +21,7 @@ DriverInfo:
nodeExpansion: true nodeExpansion: true
onlineExpansion: true onlineExpansion: true
volumeLimits: false volumeLimits: false
singleNodeVolume: true
topology: true
capacity: true
InlineVolumes: [] InlineVolumes: []

View File

@ -2,8 +2,8 @@
set -ex set -ex
source .ci/common source .ci/common
K8S_VERSION=1.19.12 K8S_VERSION=1.21.5
MINIKUBE_VERSION=1.19.0 MINIKUBE_VERSION=1.21.0
curl -Lo kubectl https://storage.googleapis.com/kubernetes-release/release/v${K8S_VERSION}/bin/linux/amd64/kubectl && chmod +x kubectl && sudo mv kubectl /usr/local/bin/ curl -Lo kubectl https://storage.googleapis.com/kubernetes-release/release/v${K8S_VERSION}/bin/linux/amd64/kubectl && chmod +x kubectl && sudo mv kubectl /usr/local/bin/
sudo apt update && sudo apt install -y conntrack sudo apt update && sudo apt install -y conntrack
curl -Lo minikube https://storage.googleapis.com/minikube/releases/v${MINIKUBE_VERSION}/minikube-linux-amd64 && chmod +x minikube && sudo mv minikube /usr/local/bin/ curl -Lo minikube https://storage.googleapis.com/minikube/releases/v${MINIKUBE_VERSION}/minikube-linux-amd64 && chmod +x minikube && sudo mv minikube /usr/local/bin/

View File

@ -7,7 +7,7 @@ Kubernetes LocalPVs on Steroids
Prerequisite Prerequisite
--- ---
- Kubernetes: 1.19+ - Kubernetes: 1.21+
Install Install
--- ---

View File

@ -211,6 +211,10 @@ class Bd2FsControllerServicer(csi_pb2_grpc.ControllerServicer):
def DeleteVolume(self, request, context): def DeleteVolume(self, request, context):
return self.bds.DeleteVolume(request, context) return self.bds.DeleteVolume(request, context)
@log_grpc_request
def GetCapacity(self, request, context):
return self.bds.GetCapacity(request, context)
@log_grpc_request @log_grpc_request
def ControllerExpandVolume(self, request, context): def ControllerExpandVolume(self, request, context):
response = self.bds.ControllerExpandVolume(request, context) response = self.bds.ControllerExpandVolume(request, context)

View File

@ -6,5 +6,6 @@ spec:
attachRequired: false attachRequired: false
podInfoOnMount: true podInfoOnMount: true
fsGroupPolicy: File fsGroupPolicy: File
storageCapacity: true
volumeLifecycleModes: volumeLifecycleModes:
- Persistent - Persistent

View File

@ -40,6 +40,15 @@ rules:
- apiGroups: ["storage.k8s.io"] - apiGroups: ["storage.k8s.io"]
resources: ["volumeattachments"] resources: ["volumeattachments"]
verbs: ["get", "list", "watch"] verbs: ["get", "list", "watch"]
- apiGroups: ["storage.k8s.io"]
resources: ["csistoragecapacities"]
verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
- apiGroups: [""]
resources: ["pods"]
verbs: ["get"]
- apiGroups: ["apps"]
resources: ["daemonsets"]
verbs: ["get"]
--- ---
kind: ClusterRoleBinding kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1 apiVersion: rbac.authorization.k8s.io/v1

View File

@ -63,20 +63,6 @@ spec:
containerPort: 9808 containerPort: 9808
resources: resources:
{{- toYaml .Values.controller.resources | nindent 12 }} {{- toYaml .Values.controller.resources | nindent 12 }}
- name: external-provisioner
image: k8s.gcr.io/sig-storage/csi-provisioner:v2.2.2
imagePullPolicy: IfNotPresent
args:
- "--csi-address=$(ADDRESS)"
- "--feature-gates=Topology=true"
- "--strict-topology"
- "--timeout=120s"
env:
- name: ADDRESS
value: /csi/csi.sock
volumeMounts:
- name: socket-dir
mountPath: /csi
- name: external-resizer - name: external-resizer
image: k8s.gcr.io/sig-storage/csi-resizer:v1.2.0 image: k8s.gcr.io/sig-storage/csi-resizer:v1.2.0
imagePullPolicy: IfNotPresent imagePullPolicy: IfNotPresent

View File

@ -123,3 +123,33 @@ spec:
requests: requests:
cpu: 10m cpu: 10m
memory: 100Mi memory: 100Mi
- name: external-provisioner
image: k8s.gcr.io/sig-storage/csi-provisioner:v2.2.2
imagePullPolicy: IfNotPresent
args:
- "--csi-address=$(ADDRESS)"
- "--feature-gates=Topology=true"
- "--strict-topology"
- "--immediate-topology=false"
- "--timeout=120s"
- "--enable-capacity=true"
- "--capacity-ownerref-level=1" # DaemonSet
- "--node-deployment=true"
env:
- name: ADDRESS
value: /csi/csi.sock
- name: NODE_NAME
valueFrom:
fieldRef:
fieldPath: spec.nodeName
- name: NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
volumeMounts:
- name: socket-dir
mountPath: /csi

View File

@ -11,7 +11,7 @@ from declarative import be_symlink, be_absent
from fs_util import device_stats, mountpoint_to_dev from fs_util import device_stats, mountpoint_to_dev
from orchestrator.k8s import volume_to_node, run_on_node from orchestrator.k8s import volume_to_node, run_on_node
from rawfile_util import attach_loop, detach_loops from rawfile_util import attach_loop, detach_loops
from remote import init_rawfile, scrub, expand_rawfile from remote import init_rawfile, scrub, get_capacity, expand_rawfile
from util import log_grpc_request, run from util import log_grpc_request, run
NODE_NAME_TOPOLOGY_KEY = "hostname" NODE_NAME_TOPOLOGY_KEY = "hostname"
@ -134,6 +134,7 @@ class RawFileControllerServicer(csi_pb2_grpc.ControllerServicer):
return csi_pb2.ControllerGetCapabilitiesResponse( return csi_pb2.ControllerGetCapabilitiesResponse(
capabilities=[ capabilities=[
Cap(rpc=Cap.RPC(type=Cap.RPC.CREATE_DELETE_VOLUME)), Cap(rpc=Cap.RPC(type=Cap.RPC.CREATE_DELETE_VOLUME)),
Cap(rpc=Cap.RPC(type=Cap.RPC.GET_CAPACITY)),
Cap(rpc=Cap.RPC(type=Cap.RPC.EXPAND_VOLUME)), Cap(rpc=Cap.RPC(type=Cap.RPC.EXPAND_VOLUME)),
] ]
) )
@ -186,10 +187,7 @@ class RawFileControllerServicer(csi_pb2_grpc.ControllerServicer):
) )
try: try:
run_on_node( init_rawfile(volume_id=request.name, size=size),
init_rawfile.as_cmd(volume_id=request.name, size=size),
node=node_name,
)
except CalledProcessError as exc: except CalledProcessError as exc:
if exc.returncode == RESOURCE_EXHAUSTED_EXIT_CODE: if exc.returncode == RESOURCE_EXHAUSTED_EXIT_CODE:
context.abort( context.abort(
@ -210,10 +208,15 @@ class RawFileControllerServicer(csi_pb2_grpc.ControllerServicer):
@log_grpc_request @log_grpc_request
def DeleteVolume(self, request, context): def DeleteVolume(self, request, context):
node_name = volume_to_node(request.volume_id) scrub(volume_id=request.volume_id)
run_on_node(scrub.as_cmd(volume_id=request.volume_id), node=node_name)
return csi_pb2.DeleteVolumeResponse() return csi_pb2.DeleteVolumeResponse()
@log_grpc_request
def GetCapacity(self, request, context):
return csi_pb2.GetCapacityResponse(
available_capacity=get_capacity(),
)
@log_grpc_request @log_grpc_request
def ControllerExpandVolume(self, request, context): def ControllerExpandVolume(self, request, context):
volume_id = request.volume_id volume_id = request.volume_id

View File

@ -1,7 +1,6 @@
from util import remote_fn from util import remote_fn
@remote_fn
def scrub(volume_id): def scrub(volume_id):
import time import time
import rawfile_util import rawfile_util
@ -17,18 +16,18 @@ def scrub(volume_id):
rawfile_util.gc_if_needed(volume_id, dry_run=False) rawfile_util.gc_if_needed(volume_id, dry_run=False)
@remote_fn
def init_rawfile(volume_id, size): def init_rawfile(volume_id, size):
import time import time
import rawfile_util from subprocess import CalledProcessError
from volume_schema import LATEST_SCHEMA_VERSION
from pathlib import Path from pathlib import Path
import rawfile_util
from volume_schema import LATEST_SCHEMA_VERSION
from util import run from util import run
from consts import RESOURCE_EXHAUSTED_EXIT_CODE from consts import RESOURCE_EXHAUSTED_EXIT_CODE
if rawfile_util.get_capacity() < size: if rawfile_util.get_capacity() < size:
exit(RESOURCE_EXHAUSTED_EXIT_CODE) raise CalledProcessError(returncode=RESOURCE_EXHAUSTED_EXIT_CODE, cmd="")
img_dir = rawfile_util.img_dir(volume_id) img_dir = rawfile_util.img_dir(volume_id)
img_dir.mkdir(exist_ok=True) img_dir.mkdir(exist_ok=True)
@ -48,6 +47,12 @@ def init_rawfile(volume_id, size):
run(f"truncate -s {size} {img_file}") run(f"truncate -s {size} {img_file}")
def get_capacity():
import rawfile_util
return rawfile_util.get_capacity()
@remote_fn @remote_fn
def expand_rawfile(volume_id, size): def expand_rawfile(volume_id, size):
import rawfile_util import rawfile_util