server: update health check endpoint to query storage periodically
Instead of querying the storage every time a health check is performed query it periodically and save the result.
This commit is contained in:
parent
be171a2a53
commit
8935a1479c
@ -1,6 +1,7 @@
|
|||||||
package server
|
package server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
@ -10,6 +11,7 @@ import (
|
|||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
@ -20,31 +22,85 @@ import (
|
|||||||
"github.com/dexidp/dex/storage"
|
"github.com/dexidp/dex/storage"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) {
|
// newHealthChecker returns the healthz handler. The handler runs until the
|
||||||
start := s.now()
|
// provided context is canceled.
|
||||||
err := func() error {
|
func (s *Server) newHealthChecker(ctx context.Context) http.Handler {
|
||||||
// Instead of trying to introspect health, just try to use the underlying storage.
|
h := &healthChecker{s: s}
|
||||||
|
|
||||||
|
// Perform one health check synchronously so the returned handler returns
|
||||||
|
// valid data immediately.
|
||||||
|
h.runHealthCheck()
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case <-time.After(time.Second * 15):
|
||||||
|
}
|
||||||
|
h.runHealthCheck()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
return h
|
||||||
|
}
|
||||||
|
|
||||||
|
// healthChecker periodically performs health checks on server dependenices.
|
||||||
|
// Currently, it only checks that the storage layer is avialable.
|
||||||
|
type healthChecker struct {
|
||||||
|
s *Server
|
||||||
|
|
||||||
|
// Result of the last health check: any error and the amount of time it took
|
||||||
|
// to query the storage.
|
||||||
|
mu sync.RWMutex
|
||||||
|
// Guarded by the mutex
|
||||||
|
err error
|
||||||
|
passed time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
// runHealthCheck performs a single health check and makes the result available
|
||||||
|
// for any clients performing and HTTP request against the healthChecker.
|
||||||
|
func (h *healthChecker) runHealthCheck() {
|
||||||
|
t := h.s.now()
|
||||||
|
err := checkStorageHealth(h.s.storage, h.s.now)
|
||||||
|
passed := h.s.now().Sub(t)
|
||||||
|
if err != nil {
|
||||||
|
h.s.logger.Errorf("Storage health check failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make sure to only hold the mutex to access the fields, and not while
|
||||||
|
// we're querying the storage object.
|
||||||
|
h.mu.Lock()
|
||||||
|
h.err = err
|
||||||
|
h.passed = passed
|
||||||
|
h.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func checkStorageHealth(s storage.Storage, now func() time.Time) error {
|
||||||
a := storage.AuthRequest{
|
a := storage.AuthRequest{
|
||||||
ID: storage.NewID(),
|
ID: storage.NewID(),
|
||||||
ClientID: storage.NewID(),
|
ClientID: storage.NewID(),
|
||||||
|
|
||||||
// Set a short expiry so if the delete fails this will be cleaned up quickly by garbage collection.
|
// Set a short expiry so if the delete fails this will be cleaned up quickly by garbage collection.
|
||||||
Expiry: s.now().Add(time.Minute),
|
Expiry: now().Add(time.Minute),
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := s.storage.CreateAuthRequest(a); err != nil {
|
if err := s.CreateAuthRequest(a); err != nil {
|
||||||
return fmt.Errorf("create auth request: %v", err)
|
return fmt.Errorf("create auth request: %v", err)
|
||||||
}
|
}
|
||||||
if err := s.storage.DeleteAuthRequest(a.ID); err != nil {
|
if err := s.DeleteAuthRequest(a.ID); err != nil {
|
||||||
return fmt.Errorf("delete auth request: %v", err)
|
return fmt.Errorf("delete auth request: %v", err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}()
|
}
|
||||||
|
|
||||||
|
func (h *healthChecker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
|
h.mu.RLock()
|
||||||
|
err := h.err
|
||||||
|
t := h.passed
|
||||||
|
h.mu.RUnlock()
|
||||||
|
|
||||||
t := s.now().Sub(start)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.logger.Errorf("Storage health check failed: %v", err)
|
h.s.renderError(w, http.StatusInternalServerError, "Health check failed.")
|
||||||
s.renderError(w, http.StatusInternalServerError, "Health check failed.")
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
fmt.Fprintf(w, "Health check passed in %s", t)
|
fmt.Fprintf(w, "Health check passed in %s", t)
|
||||||
|
@ -2,9 +2,12 @@ package server
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/dexidp/dex/storage"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestHandleHealth(t *testing.T) {
|
func TestHandleHealth(t *testing.T) {
|
||||||
@ -15,9 +18,33 @@ func TestHandleHealth(t *testing.T) {
|
|||||||
defer httpServer.Close()
|
defer httpServer.Close()
|
||||||
|
|
||||||
rr := httptest.NewRecorder()
|
rr := httptest.NewRecorder()
|
||||||
server.handleHealth(rr, httptest.NewRequest("GET", "/healthz", nil))
|
server.ServeHTTP(rr, httptest.NewRequest("GET", "/healthz", nil))
|
||||||
if rr.Code != http.StatusOK {
|
if rr.Code != http.StatusOK {
|
||||||
t.Errorf("expected 200 got %d", rr.Code)
|
t.Errorf("expected 200 got %d", rr.Code)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type badStorage struct {
|
||||||
|
storage.Storage
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *badStorage) CreateAuthRequest(r storage.AuthRequest) error {
|
||||||
|
return errors.New("storage unavailable")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHandleHealthFailure(t *testing.T) {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
httpServer, server := newTestServer(ctx, t, func(c *Config) {
|
||||||
|
c.Storage = &badStorage{c.Storage}
|
||||||
|
})
|
||||||
|
defer httpServer.Close()
|
||||||
|
|
||||||
|
rr := httptest.NewRecorder()
|
||||||
|
server.ServeHTTP(rr, httptest.NewRequest("GET", "/healthz", nil))
|
||||||
|
if rr.Code != http.StatusInternalServerError {
|
||||||
|
t.Errorf("expected 500 got %d", rr.Code)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -242,8 +242,11 @@ func newServer(ctx context.Context, c Config, rotationStrategy rotationStrategy)
|
|||||||
}
|
}
|
||||||
|
|
||||||
r := mux.NewRouter()
|
r := mux.NewRouter()
|
||||||
|
handle := func(p string, h http.Handler) {
|
||||||
|
r.Handle(path.Join(issuerURL.Path, p), instrumentHandlerCounter(p, h))
|
||||||
|
}
|
||||||
handleFunc := func(p string, h http.HandlerFunc) {
|
handleFunc := func(p string, h http.HandlerFunc) {
|
||||||
r.HandleFunc(path.Join(issuerURL.Path, p), instrumentHandlerCounter(p, h))
|
handle(p, h)
|
||||||
}
|
}
|
||||||
handlePrefix := func(p string, h http.Handler) {
|
handlePrefix := func(p string, h http.Handler) {
|
||||||
prefix := path.Join(issuerURL.Path, p)
|
prefix := path.Join(issuerURL.Path, p)
|
||||||
@ -284,7 +287,7 @@ func newServer(ctx context.Context, c Config, rotationStrategy rotationStrategy)
|
|||||||
// "authproxy" connector.
|
// "authproxy" connector.
|
||||||
handleFunc("/callback/{connector}", s.handleConnectorCallback)
|
handleFunc("/callback/{connector}", s.handleConnectorCallback)
|
||||||
handleFunc("/approval", s.handleApproval)
|
handleFunc("/approval", s.handleApproval)
|
||||||
handleFunc("/healthz", s.handleHealth)
|
handle("/healthz", s.newHealthChecker(ctx))
|
||||||
handlePrefix("/static", static)
|
handlePrefix("/static", static)
|
||||||
handlePrefix("/theme", theme)
|
handlePrefix("/theme", theme)
|
||||||
s.mux = r
|
s.mux = r
|
||||||
|
Reference in New Issue
Block a user