Limit the amount of objects we attempt to GC on each cycle
If something causes the number k8s resources to increase beyond a certain threshold, garbage collection can fail because the query to retrieve those resources will time out, resulting in a perpetual cycle of being unable to garbage collect resources. In lieu of trying to get *every* object each cycle, we can limit the number of resources retrieved per GC cycle to some reasonable number. Signed-off-by: Michael Kelly <mkelly@arista.com>
This commit is contained in:
		| @@ -15,6 +15,7 @@ import ( | |||||||
| 	"io" | 	"io" | ||||||
| 	"net" | 	"net" | ||||||
| 	"net/http" | 	"net/http" | ||||||
|  | 	"net/url" | ||||||
| 	"os" | 	"os" | ||||||
| 	"path" | 	"path" | ||||||
| 	"strconv" | 	"strconv" | ||||||
| @@ -82,7 +83,8 @@ func offlineTokenName(userID string, connID string, h func() hash.Hash) string { | |||||||
| 	return strings.TrimRight(encoding.EncodeToString(hash.Sum(nil)), "=") | 	return strings.TrimRight(encoding.EncodeToString(hash.Sum(nil)), "=") | ||||||
| } | } | ||||||
|  |  | ||||||
| func (cli *client) urlFor(apiVersion, namespace, resource, name string) string { | func (cli *client) urlForWithParams( | ||||||
|  | 	apiVersion, namespace, resource, name string, params url.Values) string { | ||||||
| 	basePath := "apis/" | 	basePath := "apis/" | ||||||
| 	if apiVersion == "v1" { | 	if apiVersion == "v1" { | ||||||
| 		basePath = "api/" | 		basePath = "api/" | ||||||
| @@ -97,7 +99,19 @@ func (cli *client) urlFor(apiVersion, namespace, resource, name string) string { | |||||||
| 	if strings.HasSuffix(cli.baseURL, "/") { | 	if strings.HasSuffix(cli.baseURL, "/") { | ||||||
| 		return cli.baseURL + p | 		return cli.baseURL + p | ||||||
| 	} | 	} | ||||||
| 	return cli.baseURL + "/" + p |  | ||||||
|  | 	r := cli.baseURL + "/" + p | ||||||
|  |  | ||||||
|  | 	encodedParams := params.Encode() | ||||||
|  | 	if len(encodedParams) > 0 { | ||||||
|  | 		return r + "?" + encodedParams | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return r | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (cli *client) urlFor(apiVersion, namespace, resource, name string) string { | ||||||
|  | 	return cli.urlForWithParams(apiVersion, namespace, resource, name, url.Values{}) | ||||||
| } | } | ||||||
|  |  | ||||||
| // Define an error interface so we can get at the underlying status code if it's | // Define an error interface so we can get at the underlying status code if it's | ||||||
| @@ -163,8 +177,7 @@ func (cli *client) get(resource, name string, v interface{}) error { | |||||||
| 	return cli.getResource(cli.apiVersion, cli.namespace, resource, name, v) | 	return cli.getResource(cli.apiVersion, cli.namespace, resource, name, v) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (cli *client) getResource(apiVersion, namespace, resource, name string, v interface{}) error { | func (cli *client) getUrl(url string, v interface{}) error { | ||||||
| 	url := cli.urlFor(apiVersion, namespace, resource, name) |  | ||||||
| 	resp, err := cli.client.Get(url) | 	resp, err := cli.client.Get(url) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return err | 		return err | ||||||
| @@ -176,8 +189,19 @@ func (cli *client) getResource(apiVersion, namespace, resource, name string, v i | |||||||
| 	return json.NewDecoder(resp.Body).Decode(v) | 	return json.NewDecoder(resp.Body).Decode(v) | ||||||
| } | } | ||||||
|  |  | ||||||
|  | func (cli *client) getResource(apiVersion, namespace, resource, name string, v interface{}) error { | ||||||
|  | 	return cli.getUrl(cli.urlFor(apiVersion, namespace, resource, name), v) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (cli *client) listN(resource string, v interface{}, n int) error { | ||||||
|  | 	params := url.Values{} | ||||||
|  | 	params.Add("limit", fmt.Sprintf("%d", n)) | ||||||
|  | 	u := cli.urlForWithParams(cli.apiVersion, cli.namespace, resource, "", params) | ||||||
|  | 	return cli.getUrl(u, v) | ||||||
|  | } | ||||||
|  |  | ||||||
| func (cli *client) list(resource string, v interface{}) error { | func (cli *client) list(resource string, v interface{}) error { | ||||||
| 	return cli.get(resource, "", v) | 	return cli.listN(resource, v, -1) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (cli *client) post(resource string, v interface{}) error { | func (cli *client) post(resource string, v interface{}) error { | ||||||
|   | |||||||
| @@ -40,6 +40,10 @@ const ( | |||||||
| 	resourceDeviceToken     = "devicetokens" | 	resourceDeviceToken     = "devicetokens" | ||||||
| ) | ) | ||||||
|  |  | ||||||
|  | const ( | ||||||
|  | 	gcResultLimit = 10000 | ||||||
|  | ) | ||||||
|  |  | ||||||
| // Config values for the Kubernetes storage type. | // Config values for the Kubernetes storage type. | ||||||
| type Config struct { | type Config struct { | ||||||
| 	InCluster      bool   `json:"inCluster"` | 	InCluster      bool   `json:"inCluster"` | ||||||
| @@ -599,7 +603,7 @@ func (cli *client) UpdateConnector(id string, updater func(a storage.Connector) | |||||||
|  |  | ||||||
| func (cli *client) GarbageCollect(now time.Time) (result storage.GCResult, err error) { | func (cli *client) GarbageCollect(now time.Time) (result storage.GCResult, err error) { | ||||||
| 	var authRequests AuthRequestList | 	var authRequests AuthRequestList | ||||||
| 	if err := cli.list(resourceAuthRequest, &authRequests); err != nil { | 	if err := cli.listN(resourceAuthRequest, &authRequests, gcResultLimit); err != nil { | ||||||
| 		return result, fmt.Errorf("failed to list auth requests: %v", err) | 		return result, fmt.Errorf("failed to list auth requests: %v", err) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| @@ -618,7 +622,7 @@ func (cli *client) GarbageCollect(now time.Time) (result storage.GCResult, err e | |||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	var authCodes AuthCodeList | 	var authCodes AuthCodeList | ||||||
| 	if err := cli.list(resourceAuthCode, &authCodes); err != nil { | 	if err := cli.listN(resourceAuthCode, &authCodes, gcResultLimit); err != nil { | ||||||
| 		return result, fmt.Errorf("failed to list auth codes: %v", err) | 		return result, fmt.Errorf("failed to list auth codes: %v", err) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| @@ -633,7 +637,7 @@ func (cli *client) GarbageCollect(now time.Time) (result storage.GCResult, err e | |||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	var deviceRequests DeviceRequestList | 	var deviceRequests DeviceRequestList | ||||||
| 	if err := cli.list(resourceDeviceRequest, &deviceRequests); err != nil { | 	if err := cli.listN(resourceDeviceRequest, &deviceRequests, gcResultLimit); err != nil { | ||||||
| 		return result, fmt.Errorf("failed to list device requests: %v", err) | 		return result, fmt.Errorf("failed to list device requests: %v", err) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| @@ -648,7 +652,7 @@ func (cli *client) GarbageCollect(now time.Time) (result storage.GCResult, err e | |||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	var deviceTokens DeviceTokenList | 	var deviceTokens DeviceTokenList | ||||||
| 	if err := cli.list(resourceDeviceToken, &deviceTokens); err != nil { | 	if err := cli.listN(resourceDeviceToken, &deviceTokens, gcResultLimit); err != nil { | ||||||
| 		return result, fmt.Errorf("failed to list device tokens: %v", err) | 		return result, fmt.Errorf("failed to list device tokens: %v", err) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user