166 lines
6.0 KiB
Go
166 lines
6.0 KiB
Go
package grpc_prometheus
|
|
|
|
import (
|
|
"io"
|
|
|
|
prom "github.com/prometheus/client_golang/prometheus"
|
|
"golang.org/x/net/context"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/codes"
|
|
)
|
|
|
|
// ClientMetrics represents a collection of metrics to be registered on a
|
|
// Prometheus metrics registry for a gRPC client.
|
|
type ClientMetrics struct {
|
|
clientStartedCounter *prom.CounterVec
|
|
clientHandledCounter *prom.CounterVec
|
|
clientStreamMsgReceived *prom.CounterVec
|
|
clientStreamMsgSent *prom.CounterVec
|
|
clientHandledHistogramEnabled bool
|
|
clientHandledHistogramOpts prom.HistogramOpts
|
|
clientHandledHistogram *prom.HistogramVec
|
|
}
|
|
|
|
// NewClientMetrics returns a ClientMetrics object. Use a new instance of
|
|
// ClientMetrics when not using the default Prometheus metrics registry, for
|
|
// example when wanting to control which metrics are added to a registry as
|
|
// opposed to automatically adding metrics via init functions.
|
|
func NewClientMetrics() *ClientMetrics {
|
|
return &ClientMetrics{
|
|
clientStartedCounter: prom.NewCounterVec(
|
|
prom.CounterOpts{
|
|
Name: "grpc_client_started_total",
|
|
Help: "Total number of RPCs started on the client.",
|
|
}, []string{"grpc_type", "grpc_service", "grpc_method"}),
|
|
|
|
clientHandledCounter: prom.NewCounterVec(
|
|
prom.CounterOpts{
|
|
Name: "grpc_client_handled_total",
|
|
Help: "Total number of RPCs completed by the client, regardless of success or failure.",
|
|
}, []string{"grpc_type", "grpc_service", "grpc_method", "grpc_code"}),
|
|
|
|
clientStreamMsgReceived: prom.NewCounterVec(
|
|
prom.CounterOpts{
|
|
Name: "grpc_client_msg_received_total",
|
|
Help: "Total number of RPC stream messages received by the client.",
|
|
}, []string{"grpc_type", "grpc_service", "grpc_method"}),
|
|
|
|
clientStreamMsgSent: prom.NewCounterVec(
|
|
prom.CounterOpts{
|
|
Name: "grpc_client_msg_sent_total",
|
|
Help: "Total number of gRPC stream messages sent by the client.",
|
|
}, []string{"grpc_type", "grpc_service", "grpc_method"}),
|
|
|
|
clientHandledHistogramEnabled: false,
|
|
clientHandledHistogramOpts: prom.HistogramOpts{
|
|
Name: "grpc_client_handling_seconds",
|
|
Help: "Histogram of response latency (seconds) of the gRPC until it is finished by the application.",
|
|
Buckets: prom.DefBuckets,
|
|
},
|
|
clientHandledHistogram: nil,
|
|
}
|
|
}
|
|
|
|
// Describe sends the super-set of all possible descriptors of metrics
|
|
// collected by this Collector to the provided channel and returns once
|
|
// the last descriptor has been sent.
|
|
func (m *ClientMetrics) Describe(ch chan<- *prom.Desc) {
|
|
m.clientStartedCounter.Describe(ch)
|
|
m.clientHandledCounter.Describe(ch)
|
|
m.clientStreamMsgReceived.Describe(ch)
|
|
m.clientStreamMsgSent.Describe(ch)
|
|
if m.clientHandledHistogramEnabled {
|
|
m.clientHandledHistogram.Describe(ch)
|
|
}
|
|
}
|
|
|
|
// Collect is called by the Prometheus registry when collecting
|
|
// metrics. The implementation sends each collected metric via the
|
|
// provided channel and returns once the last metric has been sent.
|
|
func (m *ClientMetrics) Collect(ch chan<- prom.Metric) {
|
|
m.clientStartedCounter.Collect(ch)
|
|
m.clientHandledCounter.Collect(ch)
|
|
m.clientStreamMsgReceived.Collect(ch)
|
|
m.clientStreamMsgSent.Collect(ch)
|
|
if m.clientHandledHistogramEnabled {
|
|
m.clientHandledHistogram.Collect(ch)
|
|
}
|
|
}
|
|
|
|
// EnableClientHandlingTimeHistogram turns on recording of handling time of RPCs.
|
|
// Histogram metrics can be very expensive for Prometheus to retain and query.
|
|
func (m *ClientMetrics) EnableClientHandlingTimeHistogram(opts ...HistogramOption) {
|
|
for _, o := range opts {
|
|
o(&m.clientHandledHistogramOpts)
|
|
}
|
|
if !m.clientHandledHistogramEnabled {
|
|
m.clientHandledHistogram = prom.NewHistogramVec(
|
|
m.clientHandledHistogramOpts,
|
|
[]string{"grpc_type", "grpc_service", "grpc_method"},
|
|
)
|
|
}
|
|
m.clientHandledHistogramEnabled = true
|
|
}
|
|
|
|
// UnaryClientInterceptor is a gRPC client-side interceptor that provides Prometheus monitoring for Unary RPCs.
|
|
func (m *ClientMetrics) UnaryClientInterceptor() func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
|
|
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
|
|
monitor := newClientReporter(m, Unary, method)
|
|
monitor.SentMessage()
|
|
err := invoker(ctx, method, req, reply, cc, opts...)
|
|
if err != nil {
|
|
monitor.ReceivedMessage()
|
|
}
|
|
monitor.Handled(grpc.Code(err))
|
|
return err
|
|
}
|
|
}
|
|
|
|
// StreamServerInterceptor is a gRPC client-side interceptor that provides Prometheus monitoring for Streaming RPCs.
|
|
func (m *ClientMetrics) StreamClientInterceptor() func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
|
|
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
|
|
monitor := newClientReporter(m, clientStreamType(desc), method)
|
|
clientStream, err := streamer(ctx, desc, cc, method, opts...)
|
|
if err != nil {
|
|
monitor.Handled(grpc.Code(err))
|
|
return nil, err
|
|
}
|
|
return &monitoredClientStream{clientStream, monitor}, nil
|
|
}
|
|
}
|
|
|
|
func clientStreamType(desc *grpc.StreamDesc) grpcType {
|
|
if desc.ClientStreams && !desc.ServerStreams {
|
|
return ClientStream
|
|
} else if !desc.ClientStreams && desc.ServerStreams {
|
|
return ServerStream
|
|
}
|
|
return BidiStream
|
|
}
|
|
|
|
// monitoredClientStream wraps grpc.ClientStream allowing each Sent/Recv of message to increment counters.
|
|
type monitoredClientStream struct {
|
|
grpc.ClientStream
|
|
monitor *clientReporter
|
|
}
|
|
|
|
func (s *monitoredClientStream) SendMsg(m interface{}) error {
|
|
err := s.ClientStream.SendMsg(m)
|
|
if err == nil {
|
|
s.monitor.SentMessage()
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (s *monitoredClientStream) RecvMsg(m interface{}) error {
|
|
err := s.ClientStream.RecvMsg(m)
|
|
if err == nil {
|
|
s.monitor.ReceivedMessage()
|
|
} else if err == io.EOF {
|
|
s.monitor.Handled(codes.OK)
|
|
} else {
|
|
s.monitor.Handled(grpc.Code(err))
|
|
}
|
|
return err
|
|
}
|