217 lines
5.5 KiB
Go
217 lines
5.5 KiB
Go
// Package sql provides SQL implementations of the storage interface.
|
|
package sql
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"regexp"
|
|
"time"
|
|
|
|
"github.com/lib/pq"
|
|
"github.com/sirupsen/logrus"
|
|
|
|
// import third party drivers
|
|
_ "github.com/mattn/go-sqlite3"
|
|
)
|
|
|
|
// flavor represents a specific SQL implementation, and is used to translate query strings
|
|
// between different drivers. Flavors shouldn't aim to translate all possible SQL statements,
|
|
// only the specific queries used by the SQL storages.
|
|
type flavor struct {
|
|
queryReplacers []replacer
|
|
|
|
// Optional function to create and finish a transaction.
|
|
executeTx func(db *sql.DB, fn func(*sql.Tx) error) error
|
|
|
|
// Does the flavor support timezones?
|
|
supportsTimezones bool
|
|
}
|
|
|
|
// A regexp with a replacement string.
|
|
type replacer struct {
|
|
re *regexp.Regexp
|
|
with string
|
|
}
|
|
|
|
// Match a postgres query binds. E.g. "$1", "$12", etc.
|
|
var bindRegexp = regexp.MustCompile(`\$\d+`)
|
|
|
|
func matchLiteral(s string) *regexp.Regexp {
|
|
return regexp.MustCompile(`\b` + regexp.QuoteMeta(s) + `\b`)
|
|
}
|
|
|
|
// Detect a serialization failure, which should trigger retrying the
|
|
// transaction according to PostgreSQL docs:
|
|
//
|
|
// https://www.postgresql.org/docs/current/transaction-iso.html#XACT-SERIALIZABLE
|
|
//
|
|
// "applications using this level must be prepared to retry transactions due to
|
|
// serialization failures"
|
|
func isRetryableSerializationFailure(err error) bool {
|
|
if pqErr, ok := err.(*pq.Error); ok {
|
|
return pqErr.Code.Name() == "serialization_failure"
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
var (
|
|
// The "github.com/lib/pq" driver is the default flavor. All others are
|
|
// translations of this.
|
|
flavorPostgres = flavor{
|
|
// The default behavior for Postgres transactions is consistent reads, not
|
|
// consistent writes. For each transaction opened, ensure it has the
|
|
// correct isolation level.
|
|
//
|
|
// See: https://www.postgresql.org/docs/9.3/static/sql-set-transaction.html
|
|
//
|
|
// Be careful not to wrap sql errors in the callback 'fn', otherwise
|
|
// serialization failures will not be detected and retried.
|
|
executeTx: func(db *sql.DB, fn func(sqlTx *sql.Tx) error) error {
|
|
ctx, cancel := context.WithCancel(context.TODO())
|
|
defer cancel()
|
|
|
|
opts := &sql.TxOptions{
|
|
Isolation: sql.LevelSerializable,
|
|
}
|
|
|
|
for {
|
|
tx, err := db.BeginTx(ctx, opts)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := fn(tx); err != nil {
|
|
if isRetryableSerializationFailure(err) {
|
|
continue
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
err = tx.Commit()
|
|
if err != nil {
|
|
if isRetryableSerializationFailure(err) {
|
|
continue
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
},
|
|
|
|
supportsTimezones: true,
|
|
}
|
|
|
|
flavorSQLite3 = flavor{
|
|
queryReplacers: []replacer{
|
|
{bindRegexp, "?"},
|
|
// Translate for booleans to integers.
|
|
{matchLiteral("true"), "1"},
|
|
{matchLiteral("false"), "0"},
|
|
{matchLiteral("boolean"), "integer"},
|
|
// Translate other types.
|
|
{matchLiteral("bytea"), "blob"},
|
|
{matchLiteral("timestamptz"), "timestamp"},
|
|
// SQLite doesn't have a "now()" method, replace with "date('now')"
|
|
{regexp.MustCompile(`\bnow\(\)`), "date('now')"},
|
|
},
|
|
}
|
|
)
|
|
|
|
func (f flavor) translate(query string) string {
|
|
// TODO(ericchiang): Heavy cashing.
|
|
for _, r := range f.queryReplacers {
|
|
query = r.re.ReplaceAllString(query, r.with)
|
|
}
|
|
return query
|
|
}
|
|
|
|
// translateArgs translates query parameters that may be unique to
|
|
// a specific SQL flavor. For example, standardizing "time.Time"
|
|
// types to UTC for clients that don't provide timezone support.
|
|
func (c *conn) translateArgs(args []interface{}) []interface{} {
|
|
if c.flavor.supportsTimezones {
|
|
return args
|
|
}
|
|
|
|
for i, arg := range args {
|
|
if t, ok := arg.(time.Time); ok {
|
|
args[i] = t.UTC()
|
|
}
|
|
}
|
|
return args
|
|
}
|
|
|
|
// conn is the main database connection.
|
|
type conn struct {
|
|
db *sql.DB
|
|
flavor flavor
|
|
logger logrus.FieldLogger
|
|
alreadyExistsCheck func(err error) bool
|
|
}
|
|
|
|
func (c *conn) Close() error {
|
|
return c.db.Close()
|
|
}
|
|
|
|
// conn implements the same method signatures as encoding/sql.DB.
|
|
|
|
func (c *conn) Exec(query string, args ...interface{}) (sql.Result, error) {
|
|
query = c.flavor.translate(query)
|
|
return c.db.Exec(query, c.translateArgs(args)...)
|
|
}
|
|
|
|
func (c *conn) Query(query string, args ...interface{}) (*sql.Rows, error) {
|
|
query = c.flavor.translate(query)
|
|
return c.db.Query(query, c.translateArgs(args)...)
|
|
}
|
|
|
|
func (c *conn) QueryRow(query string, args ...interface{}) *sql.Row {
|
|
query = c.flavor.translate(query)
|
|
return c.db.QueryRow(query, c.translateArgs(args)...)
|
|
}
|
|
|
|
// ExecTx runs a method which operates on a transaction.
|
|
func (c *conn) ExecTx(fn func(tx *trans) error) error {
|
|
if c.flavor.executeTx != nil {
|
|
return c.flavor.executeTx(c.db, func(sqlTx *sql.Tx) error {
|
|
return fn(&trans{sqlTx, c})
|
|
})
|
|
}
|
|
|
|
sqlTx, err := c.db.Begin()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err := fn(&trans{sqlTx, c}); err != nil {
|
|
sqlTx.Rollback()
|
|
return err
|
|
}
|
|
return sqlTx.Commit()
|
|
}
|
|
|
|
type trans struct {
|
|
tx *sql.Tx
|
|
c *conn
|
|
}
|
|
|
|
// trans implements the same method signatures as encoding/sql.Tx.
|
|
|
|
func (t *trans) Exec(query string, args ...interface{}) (sql.Result, error) {
|
|
query = t.c.flavor.translate(query)
|
|
return t.tx.Exec(query, t.c.translateArgs(args)...)
|
|
}
|
|
|
|
func (t *trans) Query(query string, args ...interface{}) (*sql.Rows, error) {
|
|
query = t.c.flavor.translate(query)
|
|
return t.tx.Query(query, t.c.translateArgs(args)...)
|
|
}
|
|
|
|
func (t *trans) QueryRow(query string, args ...interface{}) *sql.Row {
|
|
query = t.c.flavor.translate(query)
|
|
return t.tx.QueryRow(query, t.c.translateArgs(args)...)
|
|
}
|