feat: Add MySQL ent-based storage driver
Signed-off-by: m.nabokikh <maksim.nabokikh@flant.com>
This commit is contained in:
parent
e472fe668e
commit
eae3219e4d
14
.github/workflows/ci.yaml
vendored
14
.github/workflows/ci.yaml
vendored
@ -35,6 +35,15 @@ jobs:
|
|||||||
- 3306
|
- 3306
|
||||||
options: --health-cmd "mysql -proot -e \"show databases;\"" --health-interval 10s --health-timeout 5s --health-retries 5
|
options: --health-cmd "mysql -proot -e \"show databases;\"" --health-interval 10s --health-timeout 5s --health-retries 5
|
||||||
|
|
||||||
|
mysql-ent:
|
||||||
|
image: mysql:5.7
|
||||||
|
env:
|
||||||
|
MYSQL_ROOT_PASSWORD: root
|
||||||
|
MYSQL_DATABASE: dex
|
||||||
|
ports:
|
||||||
|
- 3306
|
||||||
|
options: --health-cmd "mysql -proot -e \"show databases;\"" --health-interval 10s --health-timeout 5s --health-retries 5
|
||||||
|
|
||||||
etcd:
|
etcd:
|
||||||
image: gcr.io/etcd-development/etcd:v3.5.0
|
image: gcr.io/etcd-development/etcd:v3.5.0
|
||||||
ports:
|
ports:
|
||||||
@ -77,6 +86,11 @@ jobs:
|
|||||||
DEX_MYSQL_PASSWORD: root
|
DEX_MYSQL_PASSWORD: root
|
||||||
DEX_MYSQL_HOST: 127.0.0.1
|
DEX_MYSQL_HOST: 127.0.0.1
|
||||||
DEX_MYSQL_PORT: ${{ job.services.mysql.ports[3306] }}
|
DEX_MYSQL_PORT: ${{ job.services.mysql.ports[3306] }}
|
||||||
|
DEX_MYSQL_ENT_DATABASE: dex
|
||||||
|
DEX_MYSQL_ENT_USER: root
|
||||||
|
DEX_MYSQL_ENT_PASSWORD: root
|
||||||
|
DEX_MYSQL_ENT_HOST: 127.0.0.1
|
||||||
|
DEX_MYSQL_ENT_PORT: ${{ job.services.mysql-ent.ports[3306] }}
|
||||||
DEX_POSTGRES_DATABASE: postgres
|
DEX_POSTGRES_DATABASE: postgres
|
||||||
DEX_POSTGRES_USER: postgres
|
DEX_POSTGRES_USER: postgres
|
||||||
DEX_POSTGRES_PASSWORD: postgres
|
DEX_POSTGRES_PASSWORD: postgres
|
||||||
|
@ -183,6 +183,7 @@ var (
|
|||||||
_ StorageConfig = (*sql.MySQL)(nil)
|
_ StorageConfig = (*sql.MySQL)(nil)
|
||||||
_ StorageConfig = (*ent.SQLite3)(nil)
|
_ StorageConfig = (*ent.SQLite3)(nil)
|
||||||
_ StorageConfig = (*ent.Postgres)(nil)
|
_ StorageConfig = (*ent.Postgres)(nil)
|
||||||
|
_ StorageConfig = (*ent.MySQL)(nil)
|
||||||
)
|
)
|
||||||
|
|
||||||
func getORMBasedSQLStorage(normal, entBased StorageConfig) func() StorageConfig {
|
func getORMBasedSQLStorage(normal, entBased StorageConfig) func() StorageConfig {
|
||||||
@ -200,9 +201,9 @@ var storages = map[string]func() StorageConfig{
|
|||||||
"etcd": func() StorageConfig { return new(etcd.Etcd) },
|
"etcd": func() StorageConfig { return new(etcd.Etcd) },
|
||||||
"kubernetes": func() StorageConfig { return new(kubernetes.Config) },
|
"kubernetes": func() StorageConfig { return new(kubernetes.Config) },
|
||||||
"memory": func() StorageConfig { return new(memory.Config) },
|
"memory": func() StorageConfig { return new(memory.Config) },
|
||||||
"mysql": func() StorageConfig { return new(sql.MySQL) },
|
|
||||||
"sqlite3": getORMBasedSQLStorage(&sql.SQLite3{}, &ent.SQLite3{}),
|
"sqlite3": getORMBasedSQLStorage(&sql.SQLite3{}, &ent.SQLite3{}),
|
||||||
"postgres": getORMBasedSQLStorage(&sql.Postgres{}, &ent.Postgres{}),
|
"postgres": getORMBasedSQLStorage(&sql.Postgres{}, &ent.Postgres{}),
|
||||||
|
"mysql": getORMBasedSQLStorage(&sql.MySQL{}, &ent.MySQL{}),
|
||||||
}
|
}
|
||||||
|
|
||||||
// isExpandEnvEnabled returns if os.ExpandEnv should be used for each storage and connector config.
|
// isExpandEnvEnabled returns if os.ExpandEnv should be used for each storage and connector config.
|
||||||
|
162
storage/ent/mysql.go
Normal file
162
storage/ent/mysql.go
Normal file
@ -0,0 +1,162 @@
|
|||||||
|
package ent
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"crypto/sha256"
|
||||||
|
"crypto/tls"
|
||||||
|
"crypto/x509"
|
||||||
|
"database/sql"
|
||||||
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
|
"net"
|
||||||
|
"strconv"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
entSQL "entgo.io/ent/dialect/sql"
|
||||||
|
"github.com/go-sql-driver/mysql"
|
||||||
|
|
||||||
|
// Register postgres driver.
|
||||||
|
_ "github.com/lib/pq"
|
||||||
|
|
||||||
|
"github.com/dexidp/dex/pkg/log"
|
||||||
|
"github.com/dexidp/dex/storage"
|
||||||
|
"github.com/dexidp/dex/storage/ent/client"
|
||||||
|
"github.com/dexidp/dex/storage/ent/db"
|
||||||
|
)
|
||||||
|
|
||||||
|
// nolint
|
||||||
|
const (
|
||||||
|
// MySQL SSL modes
|
||||||
|
mysqlSSLTrue = "true"
|
||||||
|
mysqlSSLFalse = "false"
|
||||||
|
mysqlSSLSkipVerify = "skip-verify"
|
||||||
|
mysqlSSLCustom = "custom"
|
||||||
|
)
|
||||||
|
|
||||||
|
// MySQL options for creating an SQL db.
|
||||||
|
type MySQL struct {
|
||||||
|
NetworkDB
|
||||||
|
|
||||||
|
SSL SSL `json:"ssl"`
|
||||||
|
|
||||||
|
params map[string]string
|
||||||
|
}
|
||||||
|
|
||||||
|
// Open always returns a new in sqlite3 storage.
|
||||||
|
func (m *MySQL) Open(logger log.Logger) (storage.Storage, error) {
|
||||||
|
logger.Debug("experimental ent-based storage driver is enabled")
|
||||||
|
drv, err := m.driver()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
databaseClient := client.NewDatabase(
|
||||||
|
client.WithClient(db.NewClient(db.Driver(drv))),
|
||||||
|
client.WithHasher(sha256.New),
|
||||||
|
// Set tx isolation leve for each transaction as dex does for postgres
|
||||||
|
client.WithTxIsolationLevel(sql.LevelSerializable),
|
||||||
|
)
|
||||||
|
|
||||||
|
if err := databaseClient.Schema().Create(context.TODO()); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return databaseClient, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MySQL) driver() (*entSQL.Driver, error) {
|
||||||
|
var tlsConfig string
|
||||||
|
|
||||||
|
switch {
|
||||||
|
case m.SSL.CAFile != "" || m.SSL.CertFile != "" || m.SSL.KeyFile != "":
|
||||||
|
if err := m.makeTLSConfig(); err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to make TLS config: %v", err)
|
||||||
|
}
|
||||||
|
tlsConfig = mysqlSSLCustom
|
||||||
|
case m.SSL.Mode == "":
|
||||||
|
tlsConfig = mysqlSSLTrue
|
||||||
|
default:
|
||||||
|
tlsConfig = m.SSL.Mode
|
||||||
|
}
|
||||||
|
|
||||||
|
drv, err := entSQL.Open("mysql", m.dsn(tlsConfig))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if m.MaxIdleConns == 0 {
|
||||||
|
/* Override default behaviour to fix https://github.com/dexidp/dex/issues/1608 */
|
||||||
|
drv.DB().SetMaxIdleConns(0)
|
||||||
|
} else {
|
||||||
|
drv.DB().SetMaxIdleConns(m.MaxIdleConns)
|
||||||
|
}
|
||||||
|
|
||||||
|
return drv, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MySQL) dsn(tlsConfig string) string {
|
||||||
|
cfg := mysql.Config{
|
||||||
|
User: m.User,
|
||||||
|
Passwd: m.Password,
|
||||||
|
DBName: m.Database,
|
||||||
|
AllowNativePasswords: true,
|
||||||
|
|
||||||
|
Timeout: time.Second * time.Duration(m.ConnectionTimeout),
|
||||||
|
|
||||||
|
TLSConfig: tlsConfig,
|
||||||
|
|
||||||
|
ParseTime: true,
|
||||||
|
Params: make(map[string]string),
|
||||||
|
}
|
||||||
|
|
||||||
|
if m.Host != "" {
|
||||||
|
if m.Host[0] != '/' {
|
||||||
|
cfg.Net = "tcp"
|
||||||
|
cfg.Addr = m.Host
|
||||||
|
|
||||||
|
if m.Port != 0 {
|
||||||
|
cfg.Addr = net.JoinHostPort(m.Host, strconv.Itoa(int(m.Port)))
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
cfg.Net = "unix"
|
||||||
|
cfg.Addr = m.Host
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for k, v := range m.params {
|
||||||
|
cfg.Params[k] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
return cfg.FormatDSN()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MySQL) makeTLSConfig() error {
|
||||||
|
cfg := &tls.Config{}
|
||||||
|
|
||||||
|
if m.SSL.CAFile != "" {
|
||||||
|
rootCertPool := x509.NewCertPool()
|
||||||
|
|
||||||
|
pem, err := ioutil.ReadFile(m.SSL.CAFile)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if ok := rootCertPool.AppendCertsFromPEM(pem); !ok {
|
||||||
|
return fmt.Errorf("failed to append PEM")
|
||||||
|
}
|
||||||
|
cfg.RootCAs = rootCertPool
|
||||||
|
}
|
||||||
|
|
||||||
|
if m.SSL.CertFile != "" && m.SSL.KeyFile != "" {
|
||||||
|
clientCert := make([]tls.Certificate, 0, 1)
|
||||||
|
certs, err := tls.LoadX509KeyPair(m.SSL.CertFile, m.SSL.KeyFile)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
clientCert = append(clientCert, certs)
|
||||||
|
cfg.Certificates = clientCert
|
||||||
|
}
|
||||||
|
|
||||||
|
mysql.RegisterTLSConfig(mysqlSSLCustom, cfg)
|
||||||
|
return nil
|
||||||
|
}
|
183
storage/ent/mysql_test.go
Normal file
183
storage/ent/mysql_test.go
Normal file
@ -0,0 +1,183 @@
|
|||||||
|
package ent
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os"
|
||||||
|
"strconv"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"github.com/dexidp/dex/storage"
|
||||||
|
"github.com/dexidp/dex/storage/conformance"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
MySQLEntHostEnv = "DEX_MYSQL_ENT_HOST"
|
||||||
|
MySQLEntPortEnv = "DEX_MYSQL_ENT_PORT"
|
||||||
|
MySQLEntDatabaseEnv = "DEX_MYSQL_ENT_DATABASE"
|
||||||
|
MySQLEntUserEnv = "DEX_MYSQL_ENT_USER"
|
||||||
|
MySQLEntPasswordEnv = "DEX_MYSQL_ENT_PASSWORD"
|
||||||
|
)
|
||||||
|
|
||||||
|
func mysqlTestConfig(host string, port uint64) *MySQL {
|
||||||
|
return &MySQL{
|
||||||
|
NetworkDB: NetworkDB{
|
||||||
|
Database: getenv(MySQLEntDatabaseEnv, "mysql"),
|
||||||
|
User: getenv(MySQLEntUserEnv, "mysql"),
|
||||||
|
Password: getenv(MySQLEntPasswordEnv, "mysql"),
|
||||||
|
Host: host,
|
||||||
|
Port: uint16(port),
|
||||||
|
},
|
||||||
|
SSL: SSL{
|
||||||
|
Mode: mysqlSSLSkipVerify,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newMySQLStorage(host string, port uint64) storage.Storage {
|
||||||
|
logger := &logrus.Logger{
|
||||||
|
Out: os.Stderr,
|
||||||
|
Formatter: &logrus.TextFormatter{DisableColors: true},
|
||||||
|
Level: logrus.DebugLevel,
|
||||||
|
}
|
||||||
|
|
||||||
|
cfg := mysqlTestConfig(host, port)
|
||||||
|
s, err := cfg.Open(logger)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMySQL(t *testing.T) {
|
||||||
|
host := os.Getenv(MySQLEntHostEnv)
|
||||||
|
if host == "" {
|
||||||
|
t.Skipf("test environment variable %s not set, skipping", MySQLEntHostEnv)
|
||||||
|
}
|
||||||
|
|
||||||
|
port := uint64(3306)
|
||||||
|
if rawPort := os.Getenv(MySQLEntPortEnv); rawPort != "" {
|
||||||
|
var err error
|
||||||
|
|
||||||
|
port, err = strconv.ParseUint(rawPort, 10, 32)
|
||||||
|
require.NoError(t, err, "invalid mysql port %q: %s", rawPort, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
newStorage := func() storage.Storage {
|
||||||
|
return newMySQLStorage(host, port)
|
||||||
|
}
|
||||||
|
conformance.RunTests(t, newStorage)
|
||||||
|
conformance.RunTransactionTests(t, newStorage)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMySQLDSN(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
cfg *MySQL
|
||||||
|
desiredDSN string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "Host port",
|
||||||
|
cfg: &MySQL{
|
||||||
|
NetworkDB: NetworkDB{
|
||||||
|
Host: "localhost",
|
||||||
|
Port: uint16(3306),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
desiredDSN: "tcp(localhost:3306)/?checkConnLiveness=false&parseTime=true&tls=false&maxAllowedPacket=0",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Host with port",
|
||||||
|
cfg: &MySQL{
|
||||||
|
NetworkDB: NetworkDB{
|
||||||
|
Host: "localhost:3306",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
desiredDSN: "tcp(localhost:3306)/?checkConnLiveness=false&parseTime=true&tls=false&maxAllowedPacket=0",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Host ipv6 with port",
|
||||||
|
cfg: &MySQL{
|
||||||
|
NetworkDB: NetworkDB{
|
||||||
|
Host: "[a:b:c:d]:3306",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
desiredDSN: "tcp([a:b:c:d]:3306)/?checkConnLiveness=false&parseTime=true&tls=false&maxAllowedPacket=0",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Credentials and timeout",
|
||||||
|
cfg: &MySQL{
|
||||||
|
NetworkDB: NetworkDB{
|
||||||
|
Database: "test",
|
||||||
|
User: "test",
|
||||||
|
Password: "test",
|
||||||
|
ConnectionTimeout: 5,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
desiredDSN: "test:test@/test?checkConnLiveness=false&parseTime=true&timeout=5s&tls=false&maxAllowedPacket=0",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "SSL",
|
||||||
|
cfg: &MySQL{
|
||||||
|
SSL: SSL{
|
||||||
|
CAFile: "/ca.crt",
|
||||||
|
KeyFile: "/cert.crt",
|
||||||
|
CertFile: "/cert.key",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
desiredDSN: "/?checkConnLiveness=false&parseTime=true&tls=false&maxAllowedPacket=0",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
require.Equal(t, tt.desiredDSN, tt.cfg.dsn(mysqlSSLFalse))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMySQLDriver(t *testing.T) {
|
||||||
|
host := os.Getenv(MySQLEntHostEnv)
|
||||||
|
if host == "" {
|
||||||
|
t.Skipf("test environment variable %s not set, skipping", MySQLEntHostEnv)
|
||||||
|
}
|
||||||
|
|
||||||
|
port := uint64(3306)
|
||||||
|
if rawPort := os.Getenv(MySQLEntPortEnv); rawPort != "" {
|
||||||
|
var err error
|
||||||
|
|
||||||
|
port, err = strconv.ParseUint(rawPort, 10, 32)
|
||||||
|
require.NoError(t, err, "invalid mysql port %q: %s", rawPort, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
cfg func() *MySQL
|
||||||
|
desiredConns int
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "Defaults",
|
||||||
|
cfg: func() *MySQL { return mysqlTestConfig(host, port) },
|
||||||
|
desiredConns: 5,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Tune",
|
||||||
|
cfg: func() *MySQL {
|
||||||
|
cfg := mysqlTestConfig(host, port)
|
||||||
|
cfg.MaxOpenConns = 101
|
||||||
|
return cfg
|
||||||
|
},
|
||||||
|
desiredConns: 101,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
drv, err := tt.cfg().driver()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.Equal(t, tt.desiredConns, drv.DB().Stats().MaxOpenConnections)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
@ -20,13 +20,6 @@ const (
|
|||||||
PostgresEntPasswordEnv = "DEX_POSTGRES_ENT_PASSWORD"
|
PostgresEntPasswordEnv = "DEX_POSTGRES_ENT_PASSWORD"
|
||||||
)
|
)
|
||||||
|
|
||||||
func getenv(key, defaultVal string) string {
|
|
||||||
if val := os.Getenv(key); val != "" {
|
|
||||||
return val
|
|
||||||
}
|
|
||||||
return defaultVal
|
|
||||||
}
|
|
||||||
|
|
||||||
func postgresTestConfig(host string, port uint64) *Postgres {
|
func postgresTestConfig(host string, port uint64) *Postgres {
|
||||||
return &Postgres{
|
return &Postgres{
|
||||||
NetworkDB: NetworkDB{
|
NetworkDB: NetworkDB{
|
||||||
|
@ -33,12 +33,10 @@ func (s *SQLite3) Open(logger log.Logger) (storage.Storage, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// always allow only one connection to sqlite3, any other thread/go-routine
|
||||||
|
// attempting concurrent access will have to wait
|
||||||
pool := drv.DB()
|
pool := drv.DB()
|
||||||
if s.File == ":memory:" {
|
|
||||||
// sqlite3 uses file locks to coordinate concurrent access. In memory
|
|
||||||
// doesn't support this, so limit the number of connections to 1.
|
|
||||||
pool.SetMaxOpenConns(1)
|
pool.SetMaxOpenConns(1)
|
||||||
}
|
|
||||||
|
|
||||||
databaseClient := client.NewDatabase(
|
databaseClient := client.NewDatabase(
|
||||||
client.WithClient(db.NewClient(db.Driver(drv))),
|
client.WithClient(db.NewClient(db.Driver(drv))),
|
||||||
|
10
storage/ent/utils.go
Normal file
10
storage/ent/utils.go
Normal file
@ -0,0 +1,10 @@
|
|||||||
|
package ent
|
||||||
|
|
||||||
|
import "os"
|
||||||
|
|
||||||
|
func getenv(key, defaultVal string) string {
|
||||||
|
if val := os.Getenv(key); val != "" {
|
||||||
|
return val
|
||||||
|
}
|
||||||
|
return defaultVal
|
||||||
|
}
|
Reference in New Issue
Block a user