From f82c217e1222ac82682d440ad555ade78d1f52a4 Mon Sep 17 00:00:00 2001 From: "m.nabokikh" Date: Tue, 26 Jan 2021 12:16:30 +0400 Subject: [PATCH 1/3] feat: graceful shutdown Signed-off-by: m.nabokikh --- cmd/dex/serve.go | 113 ++++++++++++++++++++++++++++++++--------------- go.mod | 1 + go.sum | 2 + 3 files changed, 80 insertions(+), 36 deletions(-) diff --git a/cmd/dex/serve.go b/cmd/dex/serve.go index 634e2606..4ec48956 100644 --- a/cmd/dex/serve.go +++ b/cmd/dex/serve.go @@ -10,11 +10,14 @@ import ( "net" "net/http" "os" + "os/signal" "strings" + "syscall" "time" "github.com/ghodss/yaml" grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + "github.com/oklog/run" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/sirupsen/logrus" @@ -68,6 +71,28 @@ func commandServe() *cobra.Command { return cmd } +func listenAndShutdownGracefully(logger log.Logger, gr *run.Group, srv *http.Server, name string) error { + l, err := net.Listen("tcp", srv.Addr) + if err != nil { + return fmt.Errorf("listening (%s) on %s: %v", name, srv.Addr, err) + } + + gr.Add(func() error { + logger.Infof("listening (%s) on %s", name, srv.Addr) + return srv.Serve(l) + }, func(err error) { + logger.Debugf("starting gracefully shutdown (%s)", name) + if err := l.Close(); err != nil { + logger.Errorf("gracefully close (%s) listener: %v", name, err) + } + + if err := srv.Shutdown(context.Background()); err != nil { + logger.Errorf("gracefully shutdown (%s): %v", name, err) + } + }) + return nil +} + func runServe(options serveOptions) error { configFile := options.config configData, err := ioutil.ReadFile(configFile) @@ -302,21 +327,21 @@ func runServe(options serveOptions) error { telemetryServ := http.NewServeMux() telemetryServ.Handle("/metrics", promhttp.HandlerFor(prometheusRegistry, promhttp.HandlerOpts{})) - errc := make(chan error, 3) + var gr run.Group if c.Telemetry.HTTP != "" { - logger.Infof("listening (http/telemetry) on %s", c.Telemetry.HTTP) - go func() { - err := http.ListenAndServe(c.Telemetry.HTTP, telemetryServ) - errc <- fmt.Errorf("listening on %s failed: %v", c.Telemetry.HTTP, err) - }() + telemetrySrv := &http.Server{Addr: c.Telemetry.HTTP, Handler: telemetryServ} + if err := listenAndShutdownGracefully(logger, &gr, telemetrySrv, "http/telemetry"); err != nil { + return err + } } + if c.Web.HTTP != "" { - logger.Infof("listening (http) on %s", c.Web.HTTP) - go func() { - err := http.ListenAndServe(c.Web.HTTP, serv) - errc <- fmt.Errorf("listening on %s failed: %v", c.Web.HTTP, err) - }() + httpSrv := &http.Server{Addr: c.Web.HTTP, Handler: serv} + if err := listenAndShutdownGracefully(logger, &gr, httpSrv, "http"); err != nil { + return err + } } + if c.Web.HTTPS != "" { httpsSrv := &http.Server{ Addr: c.Web.HTTPS, @@ -327,35 +352,51 @@ func runServe(options serveOptions) error { MinVersion: tls.VersionTLS12, }, } - - logger.Infof("listening (https) on %s", c.Web.HTTPS) - go func() { - err = httpsSrv.ListenAndServeTLS(c.Web.TLSCert, c.Web.TLSKey) - errc <- fmt.Errorf("listening on %s failed: %v", c.Web.HTTPS, err) - }() + if err := listenAndShutdownGracefully(logger, &gr, httpsSrv, "https"); err != nil { + return err + } } + if c.GRPC.Addr != "" { - logger.Infof("listening (grpc) on %s", c.GRPC.Addr) - go func() { - errc <- func() error { - list, err := net.Listen("tcp", c.GRPC.Addr) - if err != nil { - return fmt.Errorf("listening on %s failed: %v", c.GRPC.Addr, err) - } - s := grpc.NewServer(grpcOptions...) - api.RegisterDexServer(s, server.NewAPI(serverConfig.Storage, logger)) - grpcMetrics.InitializeMetrics(s) - if c.GRPC.Reflection { - logger.Info("enabling reflection in grpc service") - reflection.Register(s) - } - err = s.Serve(list) - return fmt.Errorf("listening on %s failed: %v", c.GRPC.Addr, err) - }() - }() + grpcListener, err := net.Listen("tcp", c.GRPC.Addr) + if err != nil { + return fmt.Errorf("listening (grcp) on %s: %w", c.GRPC.Addr, err) + } + + grpcSrv := grpc.NewServer(grpcOptions...) + api.RegisterDexServer(grpcSrv, server.NewAPI(serverConfig.Storage, logger)) + grpcMetrics.InitializeMetrics(grpcSrv) + if c.GRPC.Reflection { + logger.Info("enabling reflection in grpc service") + reflection.Register(grpcSrv) + } + + gr.Add(func() error { + logger.Infof("listening (grpc) on %s", c.GRPC.Addr) + return grpcSrv.Serve(grpcListener) + }, func(err error) { + logger.Debugf("starting gracefully shutdown (grpc)") + if err := grpcListener.Close(); err != nil { + logger.Errorf("failed to gracefully close (grpc) listener: %v", err) + } + grpcSrv.GracefulStop() + }) } - return <-errc + sig := make(chan os.Signal) + gr.Add(func() error { + signal.Notify(sig, os.Interrupt, syscall.SIGTERM) + receivedSig := <-sig + logger.Infof("received %s signal, shutting down", receivedSig) + return nil + }, func(err error) { + close(sig) + }) + + if err := gr.Run(); err != nil { + return fmt.Errorf("run groups: %w", err) + } + return nil } var ( diff --git a/go.mod b/go.mod index 8a73117c..2b279879 100644 --- a/go.mod +++ b/go.mod @@ -18,6 +18,7 @@ require ( github.com/lib/pq v1.9.0 github.com/mattermost/xml-roundtrip-validator v0.0.0-20201219040909-8fd2afad43d1 github.com/mattn/go-sqlite3 v1.14.6 + github.com/oklog/run v1.1.0 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.4.0 github.com/russellhaering/goxmldsig v1.1.0 diff --git a/go.sum b/go.sum index b5c0804e..1147f6e5 100644 --- a/go.sum +++ b/go.sum @@ -253,6 +253,8 @@ github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3Rllmb github.com/morikuni/aec v0.0.0-20170113033406-39771216ff4c h1:nXxl5PrvVm2L/wCy8dQu6DMTwH4oIuGN8GJDAlqDdVE= github.com/morikuni/aec v0.0.0-20170113033406-39771216ff4c/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= +github.com/oklog/run v1.1.0 h1:GEenZ1cK0+q0+wsJew9qUg/DyD8k3JzYsZAi5gYi2mA= +github.com/oklog/run v1.1.0/go.mod h1:sVPdnTZT1zYwAJeCMu2Th4T21pA3FPOQRfWjQlk7DVU= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo= github.com/onsi/ginkgo v1.4.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= From 65a8bf2af37a57a00b1407be7dc367b976e6c69c Mon Sep 17 00:00:00 2001 From: Maksim Nabokikh <32434187+nabokihms@users.noreply.github.com> Date: Tue, 26 Jan 2021 14:30:35 +0400 Subject: [PATCH 2/3] feat: graceful shutdown fixes Signed-off-by: m.nabokikh --- cmd/dex/serve.go | 36 ++++++++++++++++-------------------- 1 file changed, 16 insertions(+), 20 deletions(-) diff --git a/cmd/dex/serve.go b/cmd/dex/serve.go index 4ec48956..d6934ea4 100644 --- a/cmd/dex/serve.go +++ b/cmd/dex/serve.go @@ -10,7 +10,6 @@ import ( "net" "net/http" "os" - "os/signal" "strings" "syscall" "time" @@ -81,12 +80,11 @@ func listenAndShutdownGracefully(logger log.Logger, gr *run.Group, srv *http.Ser logger.Infof("listening (%s) on %s", name, srv.Addr) return srv.Serve(l) }, func(err error) { - logger.Debugf("starting gracefully shutdown (%s)", name) - if err := l.Close(); err != nil { - logger.Errorf("gracefully close (%s) listener: %v", name, err) - } + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() - if err := srv.Shutdown(context.Background()); err != nil { + logger.Debugf("starting gracefully shutdown (%s)", name) + if err := srv.Shutdown(ctx); err != nil { logger.Errorf("gracefully shutdown (%s): %v", name, err) } }) @@ -330,6 +328,8 @@ func runServe(options serveOptions) error { var gr run.Group if c.Telemetry.HTTP != "" { telemetrySrv := &http.Server{Addr: c.Telemetry.HTTP, Handler: telemetryServ} + + defer telemetrySrv.Close() if err := listenAndShutdownGracefully(logger, &gr, telemetrySrv, "http/telemetry"); err != nil { return err } @@ -337,6 +337,8 @@ func runServe(options serveOptions) error { if c.Web.HTTP != "" { httpSrv := &http.Server{Addr: c.Web.HTTP, Handler: serv} + + defer httpSrv.Close() if err := listenAndShutdownGracefully(logger, &gr, httpSrv, "http"); err != nil { return err } @@ -352,6 +354,8 @@ func runServe(options serveOptions) error { MinVersion: tls.VersionTLS12, }, } + + defer httpsSrv.Close() if err := listenAndShutdownGracefully(logger, &gr, httpsSrv, "https"); err != nil { return err } @@ -365,6 +369,7 @@ func runServe(options serveOptions) error { grpcSrv := grpc.NewServer(grpcOptions...) api.RegisterDexServer(grpcSrv, server.NewAPI(serverConfig.Storage, logger)) + grpcMetrics.InitializeMetrics(grpcSrv) if c.GRPC.Reflection { logger.Info("enabling reflection in grpc service") @@ -376,25 +381,16 @@ func runServe(options serveOptions) error { return grpcSrv.Serve(grpcListener) }, func(err error) { logger.Debugf("starting gracefully shutdown (grpc)") - if err := grpcListener.Close(); err != nil { - logger.Errorf("failed to gracefully close (grpc) listener: %v", err) - } grpcSrv.GracefulStop() }) } - sig := make(chan os.Signal) - gr.Add(func() error { - signal.Notify(sig, os.Interrupt, syscall.SIGTERM) - receivedSig := <-sig - logger.Infof("received %s signal, shutting down", receivedSig) - return nil - }, func(err error) { - close(sig) - }) - + gr.Add(run.SignalHandler(context.Background(), os.Interrupt, syscall.SIGTERM)) if err := gr.Run(); err != nil { - return fmt.Errorf("run groups: %w", err) + if _, ok := err.(run.SignalError); !ok { + return fmt.Errorf("run groups: %w", err) + } + logger.Infof("%v, shutdown now", err) } return nil } From 6664b5702dbf57d3e86cb27236d84d601aabfb7a Mon Sep 17 00:00:00 2001 From: Maksim Nabokikh <32434187+nabokihms@users.noreply.github.com> Date: Fri, 5 Feb 2021 13:15:09 +0400 Subject: [PATCH 3/3] Apply suggestions from code review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Márk Sági-Kazár Signed-off-by: m.nabokikh --- cmd/dex/serve.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cmd/dex/serve.go b/cmd/dex/serve.go index d6934ea4..7afb8851 100644 --- a/cmd/dex/serve.go +++ b/cmd/dex/serve.go @@ -83,9 +83,9 @@ func listenAndShutdownGracefully(logger log.Logger, gr *run.Group, srv *http.Ser ctx, cancel := context.WithTimeout(context.Background(), time.Minute) defer cancel() - logger.Debugf("starting gracefully shutdown (%s)", name) + logger.Debugf("starting graceful shutdown (%s)", name) if err := srv.Shutdown(ctx); err != nil { - logger.Errorf("gracefully shutdown (%s): %v", name, err) + logger.Errorf("graceful shutdown (%s): %v", name, err) } }) return nil @@ -380,7 +380,7 @@ func runServe(options serveOptions) error { logger.Infof("listening (grpc) on %s", c.GRPC.Addr) return grpcSrv.Serve(grpcListener) }, func(err error) { - logger.Debugf("starting gracefully shutdown (grpc)") + logger.Debugf("starting graceful shutdown (grpc)") grpcSrv.GracefulStop() }) }