This repository has been archived on 2023-08-14. You can view files and clone it, but cannot push or open issues or pull requests.
dex/storage/kubernetes/lock.go

125 lines
3.0 KiB
Go

package kubernetes
import (
"fmt"
"time"
)
const (
// lockAnnotation is the annotation key under which the lock is stored on the
// refresh token resource. Its value is the lock's expiry timestamp.
lockAnnotation = "dexidp.com/resource-lock"
// lockTimeFormat is the layout used both to format the expiry timestamp
// written into the annotation and to parse it back.
lockTimeFormat = time.RFC3339
)
var (
// lockTimeout is added to the current time to compute the expiry written
// into the lock annotation; a lock whose expiry has passed may be broken.
lockTimeout = 10 * time.Second
// lockCheckPeriod is how long Lock sleeps between attempts while waiting.
lockCheckPeriod = 100 * time.Millisecond
)
// refreshTokenLock is an implementation of annotation-based optimistic locking.
//
// A refresh token contains the data needed to refresh an identity in an external
// authentication system. The refresh must be performed only once at a time, for
// several reasons:
//   - Some OIDC providers use refresh token rotation, which requires that a
//     given refresh token is used only once.
//   - Providers can rate-limit requests to the token endpoint, so many concurrent
//     requests would result in errors.
type refreshTokenLock struct {
// cli is the client used to read and write the refresh token resource.
cli *client
// waitingState records that this instance found the lock held by another
// holder (or lost a write conflict while trying to take it). While it is
// true, Unlock returns without touching the resource.
waitingState bool
}
// newRefreshTokenLock builds a lock bound to the given client. The returned
// lock starts out not in the waiting state.
func newRefreshTokenLock(cli *client) *refreshTokenLock {
	lock := new(refreshTokenLock)
	lock.cli = cli
	return lock
}
// Lock repeatedly calls setLockAnnotation for the refresh token id until it
// reports that the caller may proceed.
//
// It returns nil as soon as an attempt reports no need to wait, returns the
// attempt's error immediately if one occurs, and returns a timeout error once
// all 61 attempts have asked the caller to wait. lockCheckPeriod is slept
// after every attempt that asks to wait.
func (l *refreshTokenLock) Lock(id string) error {
	for remaining := 61; remaining > 0; remaining-- {
		mustWait, err := l.setLockAnnotation(id)
		switch {
		case err != nil:
			return err
		case !mustWait:
			return nil
		}

		time.Sleep(lockCheckPeriod)
	}

	return fmt.Errorf("timeout waiting for refresh token %s lock", id)
}
// Unlock releases the lock on the refresh token id by removing the lock
// annotation from the resource and writing the resource back.
//
// It is best-effort: failures to read or write the resource are logged at
// debug level and otherwise ignored. If this instance is in the waiting
// state, Unlock does nothing.
func (l *refreshTokenLock) Unlock(id string) {
	if l.waitingState {
		// Waiting goroutines do not need to unlock, because they have not set the lock.
		return
	}

	r, err := l.cli.getRefreshToken(id)
	if err != nil {
		l.cli.logger.Debugf("failed to get resource to release lock for refresh token %s: %v", id, err)
		return
	}

	// Remove only the lock annotation so that unrelated annotations on the
	// resource survive the release. delete on a nil map is a no-op.
	delete(r.Annotations, lockAnnotation)
	if len(r.Annotations) == 0 {
		// Keep the written object identical to clearing the map outright
		// when the lock was the only annotation.
		r.Annotations = nil
	}

	if err := l.cli.put(resourceRefreshToken, r.ObjectMeta.Name, r); err != nil {
		l.cli.logger.Debugf("failed to release lock for refresh token %s: %v", id, err)
	}
}
// setLockAnnotation makes a single attempt to take the lock on the refresh
// token id.
//
// The boolean result tells the caller whether it has to wait and retry:
//   - (false, nil): the caller may proceed. Either this instance wrote the
//     lock annotation, or it had been waiting and the annotation is now gone.
//   - (true, nil): the lock is held and not yet expired, or the write lost a
//     Kubernetes API conflict; waitingState is set.
//   - (false, err): reading the resource, parsing the annotation, or writing
//     the resource failed with a non-conflict error.
func (l *refreshTokenLock) setLockAnnotation(id string) (bool, error) {
	r, err := l.cli.getRefreshToken(id)
	if err != nil {
		return false, err
	}

	currentTime := time.Now()
	expiry := currentTime.Add(lockTimeout).Format(lockTimeFormat)

	// writeLock stamps the lock annotation onto the fetched resource and
	// writes it back. Only the lock key is set, so any other annotations
	// already present on the resource are kept.
	writeLock := func() error {
		if r.Annotations == nil {
			r.Annotations = make(map[string]string, 1)
		}
		r.Annotations[lockAnnotation] = expiry
		return l.cli.put(resourceRefreshToken, r.ObjectMeta.Name, r)
	}

	val, ok := r.Annotations[lockAnnotation]
	if !ok {
		if l.waitingState {
			// We were waiting and the annotation has disappeared: proceed
			// without taking the lock ourselves.
			return false, nil
		}

		err := writeLock()
		if err == nil {
			return false, nil
		}

		if isKubernetesAPIConflictError(err) {
			// Someone else modified the resource first; wait for them.
			l.waitingState = true
			return true, nil
		}
		return false, err
	}

	until, err := time.Parse(lockTimeFormat, val)
	if err != nil {
		return false, fmt.Errorf("lock annotation value is malformed: %v", err)
	}

	if !currentTime.After(until) {
		// The lock is still valid: wait for it to be released.
		l.waitingState = true
		return true, nil
	}

	// The lock has expired: break it and take it over.
	err = writeLock()
	if err == nil {
		// This instance now owns the lock even if it had been waiting
		// before, so clear waitingState; otherwise Unlock would skip the
		// release and leave the annotation in place until it expires.
		l.waitingState = false
		return false, nil
	}

	l.cli.logger.Debugf("break lock annotation error: %v", err)
	if isKubernetesAPIConflictError(err) {
		// Another instance broke the lock first; wait for it to be released.
		l.waitingState = true
		return true, nil
	}
	return false, err
}