use util/retry instead of wait fork directly
This commit is contained in:
		
							
								
								
									
										6
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										6
									
								
								go.mod
									
									
									
									
									
								
							| @@ -4,7 +4,7 @@ go 1.19 | ||||
|  | ||||
| require ( | ||||
| 	github.com/fsnotify/fsnotify v1.6.0 | ||||
| 	github.com/jtagcat/util v0.0.0-20221112215320-924d264211be | ||||
| 	github.com/jtagcat/util v0.0.0-20221216121757-90da8d3bc885 | ||||
| 	github.com/prometheus/client_golang v1.14.0 | ||||
| 	github.com/urfave/cli/v2 v2.23.5 | ||||
| 	go.mongodb.org/mongo-driver v1.11.0 | ||||
| @@ -40,7 +40,3 @@ require ( | ||||
| 	k8s.io/klog/v2 v2.80.1 // indirect | ||||
| 	k8s.io/utils v0.0.0-20221108210102-8e77b1f39fe2 // indirect | ||||
| ) | ||||
|  | ||||
| // https://github.com/kubernetes/kubernetes/pull/113398 | ||||
| // go get github.com/jtagcat/kubernetes/staging/src/k8s.io/apimachinery@ManagedExponentialBackoff | ||||
| replace k8s.io/apimachinery => github.com/jtagcat/kubernetes/staging/src/k8s.io/apimachinery v0.0.0-20221027124836-581f57977fff | ||||
|   | ||||
							
								
								
									
										12
									
								
								go.sum
									
									
									
									
									
								
							
							
						
						
									
										12
									
								
								go.sum
									
									
									
									
									
								
							| @@ -121,7 +121,7 @@ github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ | ||||
| github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= | ||||
| github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= | ||||
| github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= | ||||
| github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= | ||||
| github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= | ||||
| github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= | ||||
| github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= | ||||
| github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= | ||||
| @@ -145,10 +145,10 @@ github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/ | ||||
| github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= | ||||
| github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= | ||||
| github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= | ||||
| github.com/jtagcat/kubernetes/staging/src/k8s.io/apimachinery v0.0.0-20221027124836-581f57977fff h1:ZcCL47dbIlY58XGBk10Onnig0Ce+w0kWxJhaEDHJfmY= | ||||
| github.com/jtagcat/kubernetes/staging/src/k8s.io/apimachinery v0.0.0-20221027124836-581f57977fff/go.mod h1:C5R3NoUmJXuT6/sTJpOktLUfvCl+H4/7c2QHOp6qwCo= | ||||
| github.com/jtagcat/util v0.0.0-20221112215320-924d264211be h1:WZnF7Cfq7hQ4v/9VXR7UTgIONhmbAJ5RU0jjM3dLw60= | ||||
| github.com/jtagcat/util v0.0.0-20221112215320-924d264211be/go.mod h1:rYF5HxFwMvJBOgdcHyJC0VAJ2RonK1Y03omgM33k1nE= | ||||
| github.com/jtagcat/util v0.0.0-20221216121415-66bc180475ec h1:FSkPhnaMOnfv317DRa+Kpkeomkmy8UcdB+0u/qhS6AI= | ||||
| github.com/jtagcat/util v0.0.0-20221216121415-66bc180475ec/go.mod h1:gqFwvwa7Sw5NiJVFETjpEuEG1icAjvOCaj6PF7LzyHU= | ||||
| github.com/jtagcat/util v0.0.0-20221216121757-90da8d3bc885 h1:bFeLiYDpOvog4gBGERRF6Duv5zuH2FkgigqgbBvVXHQ= | ||||
| github.com/jtagcat/util v0.0.0-20221216121757-90da8d3bc885/go.mod h1:gqFwvwa7Sw5NiJVFETjpEuEG1icAjvOCaj6PF7LzyHU= | ||||
| github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= | ||||
| github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= | ||||
| github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= | ||||
| @@ -535,6 +535,8 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh | ||||
| honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= | ||||
| honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= | ||||
| honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= | ||||
| k8s.io/apimachinery v0.25.4 h1:CtXsuaitMESSu339tfhVXhQrPET+EiWnIY1rcurKnAc= | ||||
| k8s.io/apimachinery v0.25.4/go.mod h1:jaF9C/iPNM1FuLl7Zuy5b9v+n35HGSh6AQ4HYRkCqwo= | ||||
| k8s.io/klog/v2 v2.80.1 h1:atnLQ121W371wYYFawwYx1aEY2eUfs4l3J72wtgAwV4= | ||||
| k8s.io/klog/v2 v2.80.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= | ||||
| k8s.io/utils v0.0.0-20221108210102-8e77b1f39fe2 h1:GfD9OzL11kvZN5iArC6oTS7RTj7oJOIfnislxYlqTj8= | ||||
|   | ||||
| @@ -12,6 +12,7 @@ import ( | ||||
| 	"git.k-space.ee/k-space/logmower-shipper/pkg/lines" | ||||
| 	m "git.k-space.ee/k-space/logmower-shipper/pkg/mongo" | ||||
| 	"git.k-space.ee/k-space/logmower-shipper/pkg/sender" | ||||
| 	"github.com/jtagcat/util/retry" | ||||
| 	"github.com/jtagcat/util/std" | ||||
| 	"github.com/jtagcat/util/tail" | ||||
| 	"go.mongodb.org/mongo-driver/bson" | ||||
| @@ -39,17 +40,14 @@ type File struct { | ||||
|  | ||||
| // TODO: caller could call duplicate shipFile of same name on file replace: sends might not work properly | ||||
| func (f File) Process(ctx context.Context, db *mongo.Collection) { | ||||
| 	_ = wait.ManagedExponentialBackoffWithContext(ctx, backoff(), func() (done bool, _ error) { | ||||
| 	_ = retry.OnErrorManagedBackoff(ctx, backoff(), func() (retryable bool, _ error) { | ||||
| 		err := f.process(ctx, db) | ||||
| 		if err == nil { | ||||
| 			return true, nil | ||||
| 		if err != nil { | ||||
| 			promFileErr.WithLabelValues(f.MetricsName).Add(1) | ||||
| 			log.Printf("processing file %q: %e", f.MetricsName, err) | ||||
| 		} | ||||
|  | ||||
| 		promFileErr.WithLabelValues(f.MetricsName).Add(1) | ||||
| 		log.Printf("processing file %q: %e", f.MetricsName, err) | ||||
|  | ||||
| 		// nil: loop and keep retrying indefinitely | ||||
| 		return false, nil | ||||
| 		return true, err | ||||
| 	}) | ||||
| } | ||||
|  | ||||
|   | ||||
							
								
								
									
										15
									
								
								vendor/github.com/jtagcat/util/retry/retry.go
									
									
									
										generated
									
									
										vendored
									
									
								
							
							
						
						
									
										15
									
								
								vendor/github.com/jtagcat/util/retry/retry.go
									
									
									
										generated
									
									
										vendored
									
									
								
							| @@ -1,6 +1,8 @@ | ||||
| package retry | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
|  | ||||
| 	"k8s.io/apimachinery/pkg/util/wait" | ||||
| ) | ||||
|  | ||||
| @@ -11,7 +13,18 @@ func OnError(backoff wait.Backoff, fn func() (retryable bool, err error)) error | ||||
| 	return wait.ExponentialBackoff(backoff, func() (done bool, _ error) { | ||||
| 		retryable, err := fn() | ||||
| 		if err == nil || !retryable { | ||||
| 			return true, err | ||||
| 			return true, nil | ||||
| 		} | ||||
| 		return false, nil | ||||
| 	}) | ||||
| } | ||||
|  | ||||
| // error is only returned by context | ||||
| func OnErrorManagedBackoff(ctx context.Context, backoff wait.Backoff, fn func() (retryable bool, err error)) error { | ||||
| 	return wait.ManagedExponentialBackoffWithContext(ctx, backoff, func() (done bool, _ error) { | ||||
| 		retryable, err := fn() | ||||
| 		if err == nil || !retryable { | ||||
| 			return true, nil | ||||
| 		} | ||||
| 		return false, nil | ||||
| 	}) | ||||
|   | ||||
							
								
								
									
										21
									
								
								vendor/github.com/jtagcat/util/std/error.go
									
									
									
										generated
									
									
										vendored
									
									
										Normal file
									
								
							
							
						
						
									
										21
									
								
								vendor/github.com/jtagcat/util/std/error.go
									
									
									
										generated
									
									
										vendored
									
									
										Normal file
									
								
							| @@ -0,0 +1,21 @@ | ||||
| package std | ||||
|  | ||||
| import "errors" | ||||
|  | ||||
| // for errors.Is(err, ERr) | ||||
| type GenericErr struct { | ||||
| 	Err     error | ||||
| 	Wrapped error | ||||
| } | ||||
|  | ||||
| func (a GenericErr) Is(target error) bool { | ||||
| 	return errors.Is(a.Err, target) | ||||
| } | ||||
|  | ||||
| func (a GenericErr) Unwrap() error { | ||||
| 	return a.Wrapped | ||||
| } | ||||
|  | ||||
| func (a GenericErr) Error() string { | ||||
| 	return a.Err.Error() + ": " + a.Wrapped.Error() | ||||
| } | ||||
							
								
								
									
										32
									
								
								vendor/github.com/jtagcat/util/std/exec.go
									
									
									
										generated
									
									
										vendored
									
									
										Normal file
									
								
							
							
						
						
									
										32
									
								
								vendor/github.com/jtagcat/util/std/exec.go
									
									
									
										generated
									
									
										vendored
									
									
										Normal file
									
								
							| @@ -0,0 +1,32 @@ | ||||
| package std | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"fmt" | ||||
| 	"os/exec" | ||||
| ) | ||||
|  | ||||
| func RunCmdWithCtx(ctx context.Context, cmd *exec.Cmd) error { | ||||
| 	select { | ||||
| 	case <-ctx.Done(): | ||||
| 		return ctx.Err() | ||||
| 	default: | ||||
| 	} | ||||
|  | ||||
| 	if err := cmd.Start(); err != nil { | ||||
| 		return fmt.Errorf("starting command: %w", err) | ||||
| 	} | ||||
|  | ||||
| 	wait := make(chan error) | ||||
| 	go func() { | ||||
| 		wait <- cmd.Wait() | ||||
| 		close(wait) | ||||
| 	}() | ||||
|  | ||||
| 	select { | ||||
| 	case <-ctx.Done(): | ||||
| 		return ctx.Err() | ||||
| 	case err := <-wait: | ||||
| 		return err | ||||
| 	} | ||||
| } | ||||
							
								
								
									
										60
									
								
								vendor/github.com/jtagcat/util/std/url.go
									
									
									
										generated
									
									
										vendored
									
									
										Normal file
									
								
							
							
						
						
									
										60
									
								
								vendor/github.com/jtagcat/util/std/url.go
									
									
									
										generated
									
									
										vendored
									
									
										Normal file
									
								
							| @@ -0,0 +1,60 @@ | ||||
| package std | ||||
|  | ||||
| import ( | ||||
| 	"net/url" | ||||
| 	"path" | ||||
| 	"strings" | ||||
| ) | ||||
|  | ||||
| // k8s.io/helm/pkg/urlutil | ||||
| // mod: supports // and / | ||||
| // in case of multiple absolute paths, last is used | ||||
| func URLJoin(baseURL string, paths ...string) (string, error) { | ||||
| 	// mod: | ||||
| 	// base is replaced by first with // | ||||
| 	newBase := -1 | ||||
| 	for i, p := range paths { | ||||
| 		if strings.HasPrefix(p, "//") { | ||||
| 			newBase = i | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	u, err := url.Parse(baseURL) | ||||
| 	if err != nil { | ||||
| 		return "", err | ||||
| 	} | ||||
| 	if newBase > -1 { | ||||
| 		old := u | ||||
| 		u, err = url.Parse(paths[newBase]) | ||||
| 		if err != nil { | ||||
| 			return "", err | ||||
| 		} | ||||
|  | ||||
| 		u.Scheme = old.Scheme | ||||
| 		if u.User == nil { | ||||
| 			u.User = old.User | ||||
| 		} | ||||
|  | ||||
| 		paths = paths[newBase+1:] | ||||
| 	} | ||||
|  | ||||
| 	// mod: | ||||
| 	// allow rooting to domain with / | ||||
| 	absPath := -1 | ||||
| 	for i, p := range paths { | ||||
| 		if strings.HasPrefix(p, "/") { | ||||
| 			absPath = i | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	// We want path instead of filepath because path always uses /. | ||||
| 	if absPath > -1 { | ||||
| 		u.Path = path.Join(paths[absPath:]...) | ||||
| 	} else { | ||||
| 		all := []string{u.Path} | ||||
| 		all = append(all, paths...) | ||||
| 		u.Path = path.Join(all...) | ||||
| 	} | ||||
|  | ||||
| 	return u.String(), nil | ||||
| } | ||||
							
								
								
									
										36
									
								
								vendor/k8s.io/apimachinery/pkg/util/wait/wait.go
									
									
									
										generated
									
									
										vendored
									
									
								
							
							
						
						
									
										36
									
								
								vendor/k8s.io/apimachinery/pkg/util/wait/wait.go
									
									
									
										generated
									
									
										vendored
									
									
								
							| @@ -276,7 +276,7 @@ func (b *Backoff) Step() time.Duration { | ||||
| 	duration := b.Duration | ||||
|  | ||||
| 	// calculate the next step | ||||
| 	if b.Factor != 0 && b.Steps != 0 { | ||||
| 	if b.Factor != 0 { | ||||
| 		b.Duration = time.Duration(float64(b.Duration) * b.Factor) | ||||
| 		if b.Cap > 0 && b.Duration > b.Cap { | ||||
| 			b.Duration = b.Cap | ||||
| @@ -431,17 +431,6 @@ func ExponentialBackoff(backoff Backoff, condition ConditionFunc) error { | ||||
| 	return ErrWaitTimeout | ||||
| } | ||||
|  | ||||
| // ManagedExponentialBackoff, unlike ExponentialBackoff does not return ErrWaitTimeout. | ||||
| // Instead the loop continues indefinitely with Sleep being maxDuration (by Steps or Cap) + Jitter | ||||
| func ManagedExponentialBackoff(backoff Backoff, condition ConditionFunc) error { | ||||
| 	for { | ||||
| 		if ok, err := runConditionWithCrashProtection(condition); err != nil || ok { | ||||
| 			return err | ||||
| 		} | ||||
| 		time.Sleep(backoff.Step()) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // Poll tries a condition func until it returns true, an error, or the timeout | ||||
| // is reached. | ||||
| // | ||||
| @@ -766,26 +755,3 @@ func ExponentialBackoffWithContext(ctx context.Context, backoff Backoff, conditi | ||||
|  | ||||
| 	return ErrWaitTimeout | ||||
| } | ||||
|  | ||||
| // ManagedExponentialBackoffWithContext works with a request context and a Backoff. It ensures that the retry wait never | ||||
| // exceeds the deadline specified by the request context. | ||||
| func ManagedExponentialBackoffWithContext(ctx context.Context, backoff Backoff, condition ConditionFunc) error { | ||||
| 	for { | ||||
| 		select { | ||||
| 		case <-ctx.Done(): | ||||
| 			return ctx.Err() | ||||
| 		default: | ||||
| 		} | ||||
|  | ||||
| 		if ok, err := runConditionWithCrashProtection(condition); err != nil || ok { | ||||
| 			return err | ||||
| 		} | ||||
|  | ||||
| 		waitBeforeRetry := backoff.Step() | ||||
| 		select { | ||||
| 		case <-ctx.Done(): | ||||
| 			return ctx.Err() | ||||
| 		case <-time.After(waitBeforeRetry): | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|   | ||||
							
								
								
									
										5
									
								
								vendor/modules.txt
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										5
									
								
								vendor/modules.txt
									
									
									
									
										vendored
									
									
								
							| @@ -20,7 +20,7 @@ github.com/golang/protobuf/ptypes/timestamp | ||||
| # github.com/golang/snappy v0.0.4 | ||||
| ## explicit | ||||
| github.com/golang/snappy | ||||
| # github.com/jtagcat/util v0.0.0-20221112215320-924d264211be | ||||
| # github.com/jtagcat/util v0.0.0-20221216121757-90da8d3bc885 | ||||
| ## explicit; go 1.18 | ||||
| github.com/jtagcat/util/batch | ||||
| github.com/jtagcat/util/retry | ||||
| @@ -170,7 +170,7 @@ google.golang.org/protobuf/runtime/protoiface | ||||
| google.golang.org/protobuf/runtime/protoimpl | ||||
| google.golang.org/protobuf/types/descriptorpb | ||||
| google.golang.org/protobuf/types/known/timestamppb | ||||
| # k8s.io/apimachinery v0.25.4 => github.com/jtagcat/kubernetes/staging/src/k8s.io/apimachinery v0.0.0-20221027124836-581f57977fff | ||||
| # k8s.io/apimachinery v0.25.4 | ||||
| ## explicit; go 1.19 | ||||
| k8s.io/apimachinery/pkg/util/runtime | ||||
| k8s.io/apimachinery/pkg/util/wait | ||||
| @@ -185,4 +185,3 @@ k8s.io/klog/v2/internal/severity | ||||
| # k8s.io/utils v0.0.0-20221108210102-8e77b1f39fe2 | ||||
| ## explicit; go 1.18 | ||||
| k8s.io/utils/clock | ||||
| # k8s.io/apimachinery => github.com/jtagcat/kubernetes/staging/src/k8s.io/apimachinery v0.0.0-20221027124836-581f57977fff | ||||
|   | ||||
		Reference in New Issue
	
	Block a user