Merge pull request #1963 from flant/graceful-shutdown
feat: graceful shutdown
This commit is contained in:
		
							
								
								
									
										111
									
								
								cmd/dex/serve.go
									
									
									
									
									
								
							
							
						
						
									
										111
									
								
								cmd/dex/serve.go
									
									
									
									
									
								
							@@ -11,10 +11,12 @@ import (
 | 
				
			|||||||
	"net/http"
 | 
						"net/http"
 | 
				
			||||||
	"os"
 | 
						"os"
 | 
				
			||||||
	"strings"
 | 
						"strings"
 | 
				
			||||||
 | 
						"syscall"
 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"github.com/ghodss/yaml"
 | 
						"github.com/ghodss/yaml"
 | 
				
			||||||
	grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
 | 
						grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
 | 
				
			||||||
 | 
						"github.com/oklog/run"
 | 
				
			||||||
	"github.com/prometheus/client_golang/prometheus"
 | 
						"github.com/prometheus/client_golang/prometheus"
 | 
				
			||||||
	"github.com/prometheus/client_golang/prometheus/promhttp"
 | 
						"github.com/prometheus/client_golang/prometheus/promhttp"
 | 
				
			||||||
	"github.com/sirupsen/logrus"
 | 
						"github.com/sirupsen/logrus"
 | 
				
			||||||
@@ -68,6 +70,27 @@ func commandServe() *cobra.Command {
 | 
				
			|||||||
	return cmd
 | 
						return cmd
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func listenAndShutdownGracefully(logger log.Logger, gr *run.Group, srv *http.Server, name string) error {
 | 
				
			||||||
 | 
						l, err := net.Listen("tcp", srv.Addr)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return fmt.Errorf("listening (%s) on %s: %v", name, srv.Addr, err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						gr.Add(func() error {
 | 
				
			||||||
 | 
							logger.Infof("listening (%s) on %s", name, srv.Addr)
 | 
				
			||||||
 | 
							return srv.Serve(l)
 | 
				
			||||||
 | 
						}, func(err error) {
 | 
				
			||||||
 | 
							ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
 | 
				
			||||||
 | 
							defer cancel()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							logger.Debugf("starting graceful shutdown (%s)", name)
 | 
				
			||||||
 | 
							if err := srv.Shutdown(ctx); err != nil {
 | 
				
			||||||
 | 
								logger.Errorf("graceful shutdown (%s): %v", name, err)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
						return nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func runServe(options serveOptions) error {
 | 
					func runServe(options serveOptions) error {
 | 
				
			||||||
	configFile := options.config
 | 
						configFile := options.config
 | 
				
			||||||
	configData, err := ioutil.ReadFile(configFile)
 | 
						configData, err := ioutil.ReadFile(configFile)
 | 
				
			||||||
@@ -302,21 +325,25 @@ func runServe(options serveOptions) error {
 | 
				
			|||||||
	telemetryServ := http.NewServeMux()
 | 
						telemetryServ := http.NewServeMux()
 | 
				
			||||||
	telemetryServ.Handle("/metrics", promhttp.HandlerFor(prometheusRegistry, promhttp.HandlerOpts{}))
 | 
						telemetryServ.Handle("/metrics", promhttp.HandlerFor(prometheusRegistry, promhttp.HandlerOpts{}))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	errc := make(chan error, 3)
 | 
						var gr run.Group
 | 
				
			||||||
	if c.Telemetry.HTTP != "" {
 | 
						if c.Telemetry.HTTP != "" {
 | 
				
			||||||
		logger.Infof("listening (http/telemetry) on %s", c.Telemetry.HTTP)
 | 
							telemetrySrv := &http.Server{Addr: c.Telemetry.HTTP, Handler: telemetryServ}
 | 
				
			||||||
		go func() {
 | 
					
 | 
				
			||||||
			err := http.ListenAndServe(c.Telemetry.HTTP, telemetryServ)
 | 
							defer telemetrySrv.Close()
 | 
				
			||||||
			errc <- fmt.Errorf("listening on %s failed: %v", c.Telemetry.HTTP, err)
 | 
							if err := listenAndShutdownGracefully(logger, &gr, telemetrySrv, "http/telemetry"); err != nil {
 | 
				
			||||||
		}()
 | 
								return err
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if c.Web.HTTP != "" {
 | 
						if c.Web.HTTP != "" {
 | 
				
			||||||
		logger.Infof("listening (http) on %s", c.Web.HTTP)
 | 
							httpSrv := &http.Server{Addr: c.Web.HTTP, Handler: serv}
 | 
				
			||||||
		go func() {
 | 
					
 | 
				
			||||||
			err := http.ListenAndServe(c.Web.HTTP, serv)
 | 
							defer httpSrv.Close()
 | 
				
			||||||
			errc <- fmt.Errorf("listening on %s failed: %v", c.Web.HTTP, err)
 | 
							if err := listenAndShutdownGracefully(logger, &gr, httpSrv, "http"); err != nil {
 | 
				
			||||||
		}()
 | 
								return err
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if c.Web.HTTPS != "" {
 | 
						if c.Web.HTTPS != "" {
 | 
				
			||||||
		httpsSrv := &http.Server{
 | 
							httpsSrv := &http.Server{
 | 
				
			||||||
			Addr:    c.Web.HTTPS,
 | 
								Addr:    c.Web.HTTPS,
 | 
				
			||||||
@@ -328,34 +355,44 @@ func runServe(options serveOptions) error {
 | 
				
			|||||||
			},
 | 
								},
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		logger.Infof("listening (https) on %s", c.Web.HTTPS)
 | 
							defer httpsSrv.Close()
 | 
				
			||||||
		go func() {
 | 
							if err := listenAndShutdownGracefully(logger, &gr, httpsSrv, "https"); err != nil {
 | 
				
			||||||
			err = httpsSrv.ListenAndServeTLS(c.Web.TLSCert, c.Web.TLSKey)
 | 
								return err
 | 
				
			||||||
			errc <- fmt.Errorf("listening on %s failed: %v", c.Web.HTTPS, err)
 | 
							}
 | 
				
			||||||
		}()
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	if c.GRPC.Addr != "" {
 | 
					 | 
				
			||||||
		logger.Infof("listening (grpc) on %s", c.GRPC.Addr)
 | 
					 | 
				
			||||||
		go func() {
 | 
					 | 
				
			||||||
			errc <- func() error {
 | 
					 | 
				
			||||||
				list, err := net.Listen("tcp", c.GRPC.Addr)
 | 
					 | 
				
			||||||
				if err != nil {
 | 
					 | 
				
			||||||
					return fmt.Errorf("listening on %s failed: %v", c.GRPC.Addr, err)
 | 
					 | 
				
			||||||
				}
 | 
					 | 
				
			||||||
				s := grpc.NewServer(grpcOptions...)
 | 
					 | 
				
			||||||
				api.RegisterDexServer(s, server.NewAPI(serverConfig.Storage, logger))
 | 
					 | 
				
			||||||
				grpcMetrics.InitializeMetrics(s)
 | 
					 | 
				
			||||||
				if c.GRPC.Reflection {
 | 
					 | 
				
			||||||
					logger.Info("enabling reflection in grpc service")
 | 
					 | 
				
			||||||
					reflection.Register(s)
 | 
					 | 
				
			||||||
				}
 | 
					 | 
				
			||||||
				err = s.Serve(list)
 | 
					 | 
				
			||||||
				return fmt.Errorf("listening on %s failed: %v", c.GRPC.Addr, err)
 | 
					 | 
				
			||||||
			}()
 | 
					 | 
				
			||||||
		}()
 | 
					 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return <-errc
 | 
						if c.GRPC.Addr != "" {
 | 
				
			||||||
 | 
							grpcListener, err := net.Listen("tcp", c.GRPC.Addr)
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								return fmt.Errorf("listening (grcp) on %s: %w", c.GRPC.Addr, err)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							grpcSrv := grpc.NewServer(grpcOptions...)
 | 
				
			||||||
 | 
							api.RegisterDexServer(grpcSrv, server.NewAPI(serverConfig.Storage, logger))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							grpcMetrics.InitializeMetrics(grpcSrv)
 | 
				
			||||||
 | 
							if c.GRPC.Reflection {
 | 
				
			||||||
 | 
								logger.Info("enabling reflection in grpc service")
 | 
				
			||||||
 | 
								reflection.Register(grpcSrv)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							gr.Add(func() error {
 | 
				
			||||||
 | 
								logger.Infof("listening (grpc) on %s", c.GRPC.Addr)
 | 
				
			||||||
 | 
								return grpcSrv.Serve(grpcListener)
 | 
				
			||||||
 | 
							}, func(err error) {
 | 
				
			||||||
 | 
								logger.Debugf("starting graceful shutdown (grpc)")
 | 
				
			||||||
 | 
								grpcSrv.GracefulStop()
 | 
				
			||||||
 | 
							})
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						gr.Add(run.SignalHandler(context.Background(), os.Interrupt, syscall.SIGTERM))
 | 
				
			||||||
 | 
						if err := gr.Run(); err != nil {
 | 
				
			||||||
 | 
							if _, ok := err.(run.SignalError); !ok {
 | 
				
			||||||
 | 
								return fmt.Errorf("run groups: %w", err)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							logger.Infof("%v, shutdown now", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
var (
 | 
					var (
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										1
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										1
									
								
								go.mod
									
									
									
									
									
								
							@@ -18,6 +18,7 @@ require (
 | 
				
			|||||||
	github.com/lib/pq v1.9.0
 | 
						github.com/lib/pq v1.9.0
 | 
				
			||||||
	github.com/mattermost/xml-roundtrip-validator v0.0.0-20201219040909-8fd2afad43d1
 | 
						github.com/mattermost/xml-roundtrip-validator v0.0.0-20201219040909-8fd2afad43d1
 | 
				
			||||||
	github.com/mattn/go-sqlite3 v1.14.6
 | 
						github.com/mattn/go-sqlite3 v1.14.6
 | 
				
			||||||
 | 
						github.com/oklog/run v1.1.0
 | 
				
			||||||
	github.com/pkg/errors v0.9.1
 | 
						github.com/pkg/errors v0.9.1
 | 
				
			||||||
	github.com/prometheus/client_golang v1.4.0
 | 
						github.com/prometheus/client_golang v1.4.0
 | 
				
			||||||
	github.com/russellhaering/goxmldsig v1.1.0
 | 
						github.com/russellhaering/goxmldsig v1.1.0
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										2
									
								
								go.sum
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								go.sum
									
									
									
									
									
								
							@@ -253,6 +253,8 @@ github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3Rllmb
 | 
				
			|||||||
github.com/morikuni/aec v0.0.0-20170113033406-39771216ff4c h1:nXxl5PrvVm2L/wCy8dQu6DMTwH4oIuGN8GJDAlqDdVE=
 | 
					github.com/morikuni/aec v0.0.0-20170113033406-39771216ff4c h1:nXxl5PrvVm2L/wCy8dQu6DMTwH4oIuGN8GJDAlqDdVE=
 | 
				
			||||||
github.com/morikuni/aec v0.0.0-20170113033406-39771216ff4c/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc=
 | 
					github.com/morikuni/aec v0.0.0-20170113033406-39771216ff4c/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc=
 | 
				
			||||||
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
 | 
					github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
 | 
				
			||||||
 | 
					github.com/oklog/run v1.1.0 h1:GEenZ1cK0+q0+wsJew9qUg/DyD8k3JzYsZAi5gYi2mA=
 | 
				
			||||||
 | 
					github.com/oklog/run v1.1.0/go.mod h1:sVPdnTZT1zYwAJeCMu2Th4T21pA3FPOQRfWjQlk7DVU=
 | 
				
			||||||
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
 | 
					github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
 | 
				
			||||||
github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo=
 | 
					github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo=
 | 
				
			||||||
github.com/onsi/ginkgo v1.4.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
 | 
					github.com/onsi/ginkgo v1.4.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user