package kubernetes

import (
	"context"
	"errors"
	"fmt"
	"math/rand"
	"net/http"
	"strings"
	"time"

	"github.com/dexidp/dex/pkg/log"
	"github.com/dexidp/dex/storage"
	"github.com/dexidp/dex/storage/kubernetes/k8sapi"
)

const (
	kindAuthCode        = "AuthCode"
	kindAuthRequest     = "AuthRequest"
	kindClient          = "OAuth2Client"
	kindRefreshToken    = "RefreshToken"
	kindKeys            = "SigningKey"
	kindPassword        = "Password"
	kindOfflineSessions = "OfflineSessions"
	kindConnector       = "Connector"
	kindDeviceRequest   = "DeviceRequest"
	kindDeviceToken     = "DeviceToken"
)

const (
	resourceAuthCode        = "authcodes"
	resourceAuthRequest     = "authrequests"
	resourceClient          = "oauth2clients"
	resourceRefreshToken    = "refreshtokens"
	resourceKeys            = "signingkeies" // Kubernetes attempts to pluralize.
	resourcePassword        = "passwords"
	resourceOfflineSessions = "offlinesessionses" // Again attempts to pluralize.
	resourceConnector       = "connectors"
	resourceDeviceRequest   = "devicerequests"
	resourceDeviceToken     = "devicetokens"
)

// Config values for the Kubernetes storage type.
type Config struct {
	InCluster      bool   `json:"inCluster"`
	KubeConfigFile string `json:"kubeConfigFile"`
}

// Open returns a storage using Kubernetes third party resource.
func (c *Config) Open(logger log.Logger) (storage.Storage, error) {
	cli, err := c.open(logger, false)
	if err != nil {
		return nil, err
	}
	return cli, nil
}

// open returns a kubernetes client, initializing the third party resources used
// by dex.
//
// waitForResources controls if errors creating the resources cause this method to return
// immediately (used during testing), or if the client will asynchronously retry.
func (c *Config) open(logger log.Logger, waitForResources bool) (*client, error) {
	if c.InCluster && (c.KubeConfigFile != "") {
		return nil, errors.New("cannot specify both 'inCluster' and 'kubeConfigFile'")
	}
	if !c.InCluster && (c.KubeConfigFile == "") {
		return nil, errors.New("must specify either 'inCluster' or 'kubeConfigFile'")
	}

	var (
		cluster   k8sapi.Cluster
		user      k8sapi.AuthInfo
		namespace string
		err       error
	)
	if c.InCluster {
		cluster, user, namespace, err = inClusterConfig()
	} else {
		cluster, user, namespace, err = loadKubeConfig(c.KubeConfigFile)
	}
	if err != nil {
		return nil, err
	}

	cli, err := newClient(cluster, user, namespace, logger, c.InCluster)
	if err != nil {
		return nil, fmt.Errorf("create client: %v", err)
	}

	if err = cli.detectKubernetesVersion(); err != nil {
		return nil, fmt.Errorf("cannot get kubernetes version: %v", err)
	}

	ctx, cancel := context.WithCancel(context.Background())

	logger.Info("creating custom Kubernetes resources")
	if !cli.registerCustomResources() {
		if waitForResources {
			cancel()
			return nil, fmt.Errorf("failed creating custom resources")
		}

		// Try to synchronously create the custom resources once. This doesn't mean
		// they'll immediately be available, but ensures that the client will actually try
		// once.
		go func() {
			for {
				if cli.registerCustomResources() {
					return
				}

				select {
				case <-ctx.Done():
					return
				case <-time.After(30 * time.Second):
				}
			}
		}()
	}

	if waitForResources {
		if err := cli.waitForCRDs(ctx); err != nil {
			cancel()
			return nil, err
		}
	}

	// If the client is closed, stop trying to create resources.
	cli.cancel = cancel
	return cli, nil
}

// registerCustomResources attempts to create the custom resources dex
// requires or identifies that they're already enabled. This function creates
// custom resource definitions(CRDs)
// It logs all errors, returning true if the resources were created successfully.
//
// Creating a custom resource does not mean that they'll be immediately available.
func (cli *client) registerCustomResources() (ok bool) {
	ok = true

	definitions := customResourceDefinitions(cli.crdAPIVersion)
	length := len(definitions)

	for i := 0; i < length; i++ {
		var err error
		var resourceName string

		r := definitions[i]
		var i interface{}
		cli.logger.Infof("checking if custom resource %s has already been created...", r.ObjectMeta.Name)
		if err := cli.list(r.Spec.Names.Plural, &i); err == nil {
			cli.logger.Infof("The custom resource %s already available, skipping create", r.ObjectMeta.Name)
			continue
		} else {
			cli.logger.Infof("failed to list custom resource %s, attempting to create: %v", r.ObjectMeta.Name, err)
		}

		err = cli.postResource(cli.crdAPIVersion, "", "customresourcedefinitions", r)
		resourceName = r.ObjectMeta.Name

		if err != nil {
			switch err {
			case storage.ErrAlreadyExists:
				cli.logger.Infof("custom resource already created %s", resourceName)
			case storage.ErrNotFound:
				cli.logger.Errorf("custom resources not found, please enable the respective API group")
				ok = false
			default:
				cli.logger.Errorf("creating custom resource %s: %v", resourceName, err)
				ok = false
			}
			continue
		}
		cli.logger.Errorf("create custom resource %s", resourceName)
	}
	return ok
}

// waitForCRDs waits for all CRDs to be in a ready state, and is used
// by the tests to synchronize before running conformance.
func (cli *client) waitForCRDs(ctx context.Context) error {
	ctx, cancel := context.WithTimeout(ctx, time.Second*30)
	defer cancel()

	for _, crd := range customResourceDefinitions(cli.crdAPIVersion) {
		for {
			err := cli.isCRDReady(crd.Name)
			if err == nil {
				break
			}

			cli.logger.Errorf("checking CRD: %v", err)

			select {
			case <-ctx.Done():
				return errors.New("timed out waiting for CRDs to be available")
			case <-time.After(time.Millisecond * 100):
			}
		}
	}
	return nil
}

// isCRDReady determines if a CRD is ready by inspecting its conditions.
func (cli *client) isCRDReady(name string) error {
	var r k8sapi.CustomResourceDefinition
	err := cli.getResource(cli.crdAPIVersion, "", "customresourcedefinitions", name, &r)
	if err != nil {
		return fmt.Errorf("get crd %s: %v", name, err)
	}

	conds := make(map[string]string) // For debugging, keep the conditions around.
	for _, c := range r.Status.Conditions {
		if c.Type == k8sapi.Established && c.Status == k8sapi.ConditionTrue {
			return nil
		}
		conds[string(c.Type)] = string(c.Status)
	}
	return fmt.Errorf("crd %s not ready %#v", name, conds)
}

func (cli *client) Close() error {
	if cli.cancel != nil {
		cli.cancel()
	}
	return nil
}

func (cli *client) CreateAuthRequest(a storage.AuthRequest) error {
	return cli.post(resourceAuthRequest, cli.fromStorageAuthRequest(a))
}

func (cli *client) CreateClient(c storage.Client) error {
	return cli.post(resourceClient, cli.fromStorageClient(c))
}

func (cli *client) CreateAuthCode(c storage.AuthCode) error {
	return cli.post(resourceAuthCode, cli.fromStorageAuthCode(c))
}

func (cli *client) CreatePassword(p storage.Password) error {
	return cli.post(resourcePassword, cli.fromStoragePassword(p))
}

func (cli *client) CreateRefresh(r storage.RefreshToken) error {
	return cli.post(resourceRefreshToken, cli.fromStorageRefreshToken(r))
}

func (cli *client) CreateOfflineSessions(o storage.OfflineSessions) error {
	return cli.post(resourceOfflineSessions, cli.fromStorageOfflineSessions(o))
}

func (cli *client) CreateConnector(c storage.Connector) error {
	return cli.post(resourceConnector, cli.fromStorageConnector(c))
}

func (cli *client) GetAuthRequest(id string) (storage.AuthRequest, error) {
	var req AuthRequest
	if err := cli.get(resourceAuthRequest, id, &req); err != nil {
		return storage.AuthRequest{}, err
	}
	return toStorageAuthRequest(req), nil
}

func (cli *client) GetAuthCode(id string) (storage.AuthCode, error) {
	var code AuthCode
	if err := cli.get(resourceAuthCode, id, &code); err != nil {
		return storage.AuthCode{}, err
	}
	return toStorageAuthCode(code), nil
}

func (cli *client) GetClient(id string) (storage.Client, error) {
	c, err := cli.getClient(id)
	if err != nil {
		return storage.Client{}, err
	}
	return toStorageClient(c), nil
}

func (cli *client) getClient(id string) (Client, error) {
	var c Client
	name := cli.idToName(id)
	if err := cli.get(resourceClient, name, &c); err != nil {
		return Client{}, err
	}
	if c.ID != id {
		return Client{}, fmt.Errorf("get client: ID %q mapped to client with ID %q", id, c.ID)
	}
	return c, nil
}

func (cli *client) GetPassword(email string) (storage.Password, error) {
	p, err := cli.getPassword(email)
	if err != nil {
		return storage.Password{}, err
	}
	return toStoragePassword(p), nil
}

func (cli *client) getPassword(email string) (Password, error) {
	// TODO(ericchiang): Figure out whose job it is to lowercase emails.
	email = strings.ToLower(email)
	var p Password
	name := cli.idToName(email)
	if err := cli.get(resourcePassword, name, &p); err != nil {
		return Password{}, err
	}
	if email != p.Email {
		return Password{}, fmt.Errorf("get email: email %q mapped to password with email %q", email, p.Email)
	}
	return p, nil
}

func (cli *client) GetKeys() (storage.Keys, error) {
	var keys Keys
	if err := cli.get(resourceKeys, keysName, &keys); err != nil {
		return storage.Keys{}, err
	}
	return toStorageKeys(keys), nil
}

func (cli *client) GetRefresh(id string) (storage.RefreshToken, error) {
	r, err := cli.getRefreshToken(id)
	if err != nil {
		return storage.RefreshToken{}, err
	}
	return toStorageRefreshToken(r), nil
}

func (cli *client) getRefreshToken(id string) (r RefreshToken, err error) {
	err = cli.get(resourceRefreshToken, id, &r)
	return
}

func (cli *client) GetOfflineSessions(userID string, connID string) (storage.OfflineSessions, error) {
	o, err := cli.getOfflineSessions(userID, connID)
	if err != nil {
		return storage.OfflineSessions{}, err
	}
	return toStorageOfflineSessions(o), nil
}

func (cli *client) getOfflineSessions(userID string, connID string) (o OfflineSessions, err error) {
	name := cli.offlineTokenName(userID, connID)
	if err = cli.get(resourceOfflineSessions, name, &o); err != nil {
		return OfflineSessions{}, err
	}
	if userID != o.UserID || connID != o.ConnID {
		return OfflineSessions{}, fmt.Errorf("get offline session: wrong session retrieved")
	}
	return o, nil
}

func (cli *client) GetConnector(id string) (storage.Connector, error) {
	var c Connector
	if err := cli.get(resourceConnector, id, &c); err != nil {
		return storage.Connector{}, err
	}
	return toStorageConnector(c), nil
}

func (cli *client) ListClients() ([]storage.Client, error) {
	return nil, errors.New("not implemented")
}

func (cli *client) ListRefreshTokens() ([]storage.RefreshToken, error) {
	return nil, errors.New("not implemented")
}

func (cli *client) ListPasswords() (passwords []storage.Password, err error) {
	var passwordList PasswordList
	if err = cli.list(resourcePassword, &passwordList); err != nil {
		return passwords, fmt.Errorf("failed to list passwords: %v", err)
	}

	for _, password := range passwordList.Passwords {
		p := storage.Password{
			Email:    password.Email,
			Hash:     password.Hash,
			Username: password.Username,
			UserID:   password.UserID,
		}
		passwords = append(passwords, p)
	}

	return
}

func (cli *client) ListConnectors() (connectors []storage.Connector, err error) {
	var connectorList ConnectorList
	if err = cli.list(resourceConnector, &connectorList); err != nil {
		return connectors, fmt.Errorf("failed to list connectors: %v", err)
	}

	connectors = make([]storage.Connector, len(connectorList.Connectors))
	for i, connector := range connectorList.Connectors {
		connectors[i] = toStorageConnector(connector)
	}

	return
}

func (cli *client) DeleteAuthRequest(id string) error {
	return cli.delete(resourceAuthRequest, id)
}

func (cli *client) DeleteAuthCode(code string) error {
	return cli.delete(resourceAuthCode, code)
}

func (cli *client) DeleteClient(id string) error {
	// Check for hash collision.
	c, err := cli.getClient(id)
	if err != nil {
		return err
	}
	return cli.delete(resourceClient, c.ObjectMeta.Name)
}

func (cli *client) DeleteRefresh(id string) error {
	return cli.delete(resourceRefreshToken, id)
}

func (cli *client) DeletePassword(email string) error {
	// Check for hash collision.
	p, err := cli.getPassword(email)
	if err != nil {
		return err
	}
	return cli.delete(resourcePassword, p.ObjectMeta.Name)
}

func (cli *client) DeleteOfflineSessions(userID string, connID string) error {
	// Check for hash collision.
	o, err := cli.getOfflineSessions(userID, connID)
	if err != nil {
		return err
	}
	return cli.delete(resourceOfflineSessions, o.ObjectMeta.Name)
}

func (cli *client) DeleteConnector(id string) error {
	return cli.delete(resourceConnector, id)
}

func (cli *client) UpdateRefreshToken(id string, updater func(old storage.RefreshToken) (storage.RefreshToken, error)) error {
	return retryOnConflict(context.TODO(), func() error {
		r, err := cli.getRefreshToken(id)
		if err != nil {
			return err
		}
		updated, err := updater(toStorageRefreshToken(r))
		if err != nil {
			return err
		}
		updated.ID = id

		newToken := cli.fromStorageRefreshToken(updated)
		newToken.ObjectMeta = r.ObjectMeta
		return cli.put(resourceRefreshToken, r.ObjectMeta.Name, newToken)
	})
}

func (cli *client) UpdateClient(id string, updater func(old storage.Client) (storage.Client, error)) error {
	c, err := cli.getClient(id)
	if err != nil {
		return err
	}

	updated, err := updater(toStorageClient(c))
	if err != nil {
		return err
	}
	updated.ID = c.ID

	newClient := cli.fromStorageClient(updated)
	newClient.ObjectMeta = c.ObjectMeta
	return cli.put(resourceClient, c.ObjectMeta.Name, newClient)
}

func (cli *client) UpdatePassword(email string, updater func(old storage.Password) (storage.Password, error)) error {
	p, err := cli.getPassword(email)
	if err != nil {
		return err
	}

	updated, err := updater(toStoragePassword(p))
	if err != nil {
		return err
	}
	updated.Email = p.Email

	newPassword := cli.fromStoragePassword(updated)
	newPassword.ObjectMeta = p.ObjectMeta
	return cli.put(resourcePassword, p.ObjectMeta.Name, newPassword)
}

func (cli *client) UpdateOfflineSessions(userID string, connID string, updater func(old storage.OfflineSessions) (storage.OfflineSessions, error)) error {
	return retryOnConflict(context.TODO(), func() error {
		o, err := cli.getOfflineSessions(userID, connID)
		if err != nil {
			return err
		}

		updated, err := updater(toStorageOfflineSessions(o))
		if err != nil {
			return err
		}

		newOfflineSessions := cli.fromStorageOfflineSessions(updated)
		newOfflineSessions.ObjectMeta = o.ObjectMeta
		return cli.put(resourceOfflineSessions, o.ObjectMeta.Name, newOfflineSessions)
	})
}

func (cli *client) UpdateKeys(updater func(old storage.Keys) (storage.Keys, error)) error {
	firstUpdate := false
	var keys Keys
	if err := cli.get(resourceKeys, keysName, &keys); err != nil {
		if err != storage.ErrNotFound {
			return err
		}
		firstUpdate = true
	}

	var oldKeys storage.Keys
	if !firstUpdate {
		oldKeys = toStorageKeys(keys)
	}

	updated, err := updater(oldKeys)
	if err != nil {
		return err
	}

	newKeys := cli.fromStorageKeys(updated)
	if firstUpdate {
		err = cli.post(resourceKeys, newKeys)
		if err != nil && errors.Is(err, storage.ErrAlreadyExists) {
			// We need to tolerate conflicts here in case of HA mode.
			cli.logger.Debugf("Keys creation failed: %v. It is possible that keys have already been created by another dex instance.", err)
			return errors.New("keys already created by another server instance")
		}

		return err
	}

	newKeys.ObjectMeta = keys.ObjectMeta

	err = cli.put(resourceKeys, keysName, newKeys)
	if isKubernetesAPIConflictError(err) {
		// We need to tolerate conflicts here in case of HA mode.
		// Dex instances run keys rotation at the same time because they use SigningKey.nextRotation CR field as a trigger.
		cli.logger.Debugf("Keys rotation failed: %v. It is possible that keys have already been rotated by another dex instance.", err)
		return errors.New("keys already rotated by another server instance")
	}

	return err
}

func (cli *client) UpdateAuthRequest(id string, updater func(a storage.AuthRequest) (storage.AuthRequest, error)) error {
	var req AuthRequest
	err := cli.get(resourceAuthRequest, id, &req)
	if err != nil {
		return err
	}

	updated, err := updater(toStorageAuthRequest(req))
	if err != nil {
		return err
	}

	newReq := cli.fromStorageAuthRequest(updated)
	newReq.ObjectMeta = req.ObjectMeta
	return cli.put(resourceAuthRequest, id, newReq)
}

func (cli *client) UpdateConnector(id string, updater func(a storage.Connector) (storage.Connector, error)) error {
	return retryOnConflict(context.TODO(), func() error {
		var c Connector
		err := cli.get(resourceConnector, id, &c)
		if err != nil {
			return err
		}

		updated, err := updater(toStorageConnector(c))
		if err != nil {
			return err
		}

		newConn := cli.fromStorageConnector(updated)
		newConn.ObjectMeta = c.ObjectMeta
		return cli.put(resourceConnector, id, newConn)
	})
}

func (cli *client) GarbageCollect(now time.Time) (result storage.GCResult, err error) {
	var authRequests AuthRequestList
	if err := cli.list(resourceAuthRequest, &authRequests); err != nil {
		return result, fmt.Errorf("failed to list auth requests: %v", err)
	}

	var delErr error
	for _, authRequest := range authRequests.AuthRequests {
		if now.After(authRequest.Expiry) {
			if err := cli.delete(resourceAuthRequest, authRequest.ObjectMeta.Name); err != nil {
				cli.logger.Errorf("failed to delete auth request: %v", err)
				delErr = fmt.Errorf("failed to delete auth request: %v", err)
			}
			result.AuthRequests++
		}
	}
	if delErr != nil {
		return result, delErr
	}

	var authCodes AuthCodeList
	if err := cli.list(resourceAuthCode, &authCodes); err != nil {
		return result, fmt.Errorf("failed to list auth codes: %v", err)
	}

	for _, authCode := range authCodes.AuthCodes {
		if now.After(authCode.Expiry) {
			if err := cli.delete(resourceAuthCode, authCode.ObjectMeta.Name); err != nil {
				cli.logger.Errorf("failed to delete auth code %v", err)
				delErr = fmt.Errorf("failed to delete auth code: %v", err)
			}
			result.AuthCodes++
		}
	}

	var deviceRequests DeviceRequestList
	if err := cli.list(resourceDeviceRequest, &deviceRequests); err != nil {
		return result, fmt.Errorf("failed to list device requests: %v", err)
	}

	for _, deviceRequest := range deviceRequests.DeviceRequests {
		if now.After(deviceRequest.Expiry) {
			if err := cli.delete(resourceDeviceRequest, deviceRequest.ObjectMeta.Name); err != nil {
				cli.logger.Errorf("failed to delete device request: %v", err)
				delErr = fmt.Errorf("failed to delete device request: %v", err)
			}
			result.DeviceRequests++
		}
	}

	var deviceTokens DeviceTokenList
	if err := cli.list(resourceDeviceToken, &deviceTokens); err != nil {
		return result, fmt.Errorf("failed to list device tokens: %v", err)
	}

	for _, deviceToken := range deviceTokens.DeviceTokens {
		if now.After(deviceToken.Expiry) {
			if err := cli.delete(resourceDeviceToken, deviceToken.ObjectMeta.Name); err != nil {
				cli.logger.Errorf("failed to delete device token: %v", err)
				delErr = fmt.Errorf("failed to delete device token: %v", err)
			}
			result.DeviceTokens++
		}
	}

	if delErr != nil {
		return result, delErr
	}
	return result, delErr
}

func (cli *client) CreateDeviceRequest(d storage.DeviceRequest) error {
	return cli.post(resourceDeviceRequest, cli.fromStorageDeviceRequest(d))
}

func (cli *client) GetDeviceRequest(userCode string) (storage.DeviceRequest, error) {
	var req DeviceRequest
	if err := cli.get(resourceDeviceRequest, strings.ToLower(userCode), &req); err != nil {
		return storage.DeviceRequest{}, err
	}
	return toStorageDeviceRequest(req), nil
}

func (cli *client) CreateDeviceToken(t storage.DeviceToken) error {
	return cli.post(resourceDeviceToken, cli.fromStorageDeviceToken(t))
}

func (cli *client) GetDeviceToken(deviceCode string) (storage.DeviceToken, error) {
	var token DeviceToken
	if err := cli.get(resourceDeviceToken, deviceCode, &token); err != nil {
		return storage.DeviceToken{}, err
	}
	return toStorageDeviceToken(token), nil
}

func (cli *client) getDeviceToken(deviceCode string) (t DeviceToken, err error) {
	err = cli.get(resourceDeviceToken, deviceCode, &t)
	return
}

func (cli *client) UpdateDeviceToken(deviceCode string, updater func(old storage.DeviceToken) (storage.DeviceToken, error)) error {
	return retryOnConflict(context.TODO(), func() error {
		r, err := cli.getDeviceToken(deviceCode)
		if err != nil {
			return err
		}
		updated, err := updater(toStorageDeviceToken(r))
		if err != nil {
			return err
		}
		updated.DeviceCode = deviceCode

		newToken := cli.fromStorageDeviceToken(updated)
		newToken.ObjectMeta = r.ObjectMeta
		return cli.put(resourceDeviceToken, r.ObjectMeta.Name, newToken)
	})
}

func isKubernetesAPIConflictError(err error) bool {
	if httpErr, ok := err.(httpError); ok {
		if httpErr.StatusCode() == http.StatusConflict {
			return true
		}
	}
	return false
}

func retryOnConflict(ctx context.Context, action func() error) error {
	policy := []int{10, 20, 100, 300, 600}

	attempts := 0
	getNextStep := func() time.Duration {
		step := policy[attempts]
		return time.Duration(step*5+rand.Intn(step)) * time.Microsecond
	}

	if err := action(); err == nil || !isKubernetesAPIConflictError(err) {
		return err
	}

	for {
		select {
		case <-time.After(getNextStep()):
			err := action()
			if err == nil || !isKubernetesAPIConflictError(err) {
				return err
			}

			attempts++
			if attempts >= 4 {
				return fmt.Errorf("maximum timeout reached while retrying a conflicted request: %w", err)
			}
		case <-ctx.Done():
			return errors.New("canceled")
		}
	}
}