From 0f33c8a5446c153312d3f9a15845eea2d3fcf98d Mon Sep 17 00:00:00 2001 From: rasmus Date: Fri, 16 Dec 2022 14:21:11 +0200 Subject: [PATCH] use util/retry instead of wait fork directly --- go.mod | 6 +- go.sum | 12 ++-- pkg/file/file.go | 14 ++--- vendor/github.com/jtagcat/util/retry/retry.go | 15 ++++- vendor/github.com/jtagcat/util/std/error.go | 21 +++++++ vendor/github.com/jtagcat/util/std/exec.go | 32 ++++++++++ vendor/github.com/jtagcat/util/std/url.go | 60 +++++++++++++++++++ .../k8s.io/apimachinery/pkg/util/wait/wait.go | 36 +---------- vendor/modules.txt | 5 +- 9 files changed, 144 insertions(+), 57 deletions(-) create mode 100644 vendor/github.com/jtagcat/util/std/error.go create mode 100644 vendor/github.com/jtagcat/util/std/exec.go create mode 100644 vendor/github.com/jtagcat/util/std/url.go diff --git a/go.mod b/go.mod index 8e027ad..5df5a29 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.19 require ( github.com/fsnotify/fsnotify v1.6.0 - github.com/jtagcat/util v0.0.0-20221112215320-924d264211be + github.com/jtagcat/util v0.0.0-20221216121757-90da8d3bc885 github.com/prometheus/client_golang v1.14.0 github.com/urfave/cli/v2 v2.23.5 go.mongodb.org/mongo-driver v1.11.0 @@ -40,7 +40,3 @@ require ( k8s.io/klog/v2 v2.80.1 // indirect k8s.io/utils v0.0.0-20221108210102-8e77b1f39fe2 // indirect ) - -// https://github.com/kubernetes/kubernetes/pull/113398 -// go get github.com/jtagcat/kubernetes/staging/src/k8s.io/apimachinery@ManagedExponentialBackoff -replace k8s.io/apimachinery => github.com/jtagcat/kubernetes/staging/src/k8s.io/apimachinery v0.0.0-20221027124836-581f57977fff diff --git a/go.sum b/go.sum index 9d0f516..b326100 100644 --- a/go.sum +++ b/go.sum @@ -121,7 +121,7 @@ github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= @@ -145,10 +145,10 @@ github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/ github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= -github.com/jtagcat/kubernetes/staging/src/k8s.io/apimachinery v0.0.0-20221027124836-581f57977fff h1:ZcCL47dbIlY58XGBk10Onnig0Ce+w0kWxJhaEDHJfmY= -github.com/jtagcat/kubernetes/staging/src/k8s.io/apimachinery v0.0.0-20221027124836-581f57977fff/go.mod h1:C5R3NoUmJXuT6/sTJpOktLUfvCl+H4/7c2QHOp6qwCo= -github.com/jtagcat/util v0.0.0-20221112215320-924d264211be h1:WZnF7Cfq7hQ4v/9VXR7UTgIONhmbAJ5RU0jjM3dLw60= -github.com/jtagcat/util v0.0.0-20221112215320-924d264211be/go.mod h1:rYF5HxFwMvJBOgdcHyJC0VAJ2RonK1Y03omgM33k1nE= +github.com/jtagcat/util v0.0.0-20221216121415-66bc180475ec h1:FSkPhnaMOnfv317DRa+Kpkeomkmy8UcdB+0u/qhS6AI= +github.com/jtagcat/util v0.0.0-20221216121415-66bc180475ec/go.mod h1:gqFwvwa7Sw5NiJVFETjpEuEG1icAjvOCaj6PF7LzyHU= +github.com/jtagcat/util v0.0.0-20221216121757-90da8d3bc885 h1:bFeLiYDpOvog4gBGERRF6Duv5zuH2FkgigqgbBvVXHQ= +github.com/jtagcat/util v0.0.0-20221216121757-90da8d3bc885/go.mod h1:gqFwvwa7Sw5NiJVFETjpEuEG1icAjvOCaj6PF7LzyHU= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= @@ -535,6 +535,8 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= +k8s.io/apimachinery v0.25.4 h1:CtXsuaitMESSu339tfhVXhQrPET+EiWnIY1rcurKnAc= +k8s.io/apimachinery v0.25.4/go.mod h1:jaF9C/iPNM1FuLl7Zuy5b9v+n35HGSh6AQ4HYRkCqwo= k8s.io/klog/v2 v2.80.1 h1:atnLQ121W371wYYFawwYx1aEY2eUfs4l3J72wtgAwV4= k8s.io/klog/v2 v2.80.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= k8s.io/utils v0.0.0-20221108210102-8e77b1f39fe2 h1:GfD9OzL11kvZN5iArC6oTS7RTj7oJOIfnislxYlqTj8= diff --git a/pkg/file/file.go b/pkg/file/file.go index 7e8cfc0..e8c97e6 100644 --- a/pkg/file/file.go +++ b/pkg/file/file.go @@ -12,6 +12,7 @@ import ( "git.k-space.ee/k-space/logmower-shipper/pkg/lines" m "git.k-space.ee/k-space/logmower-shipper/pkg/mongo" "git.k-space.ee/k-space/logmower-shipper/pkg/sender" + "github.com/jtagcat/util/retry" "github.com/jtagcat/util/std" "github.com/jtagcat/util/tail" "go.mongodb.org/mongo-driver/bson" @@ -39,17 +40,14 @@ type File struct { // TODO: caller could call duplicate shipFile of same name on file replace: sends might not work properly func (f File) Process(ctx context.Context, db *mongo.Collection) { - _ = wait.ManagedExponentialBackoffWithContext(ctx, backoff(), func() (done bool, _ error) { + _ = retry.OnErrorManagedBackoff(ctx, backoff(), func() (retryable bool, _ error) { err := f.process(ctx, db) - if err == nil { - return true, nil + if err != nil { + promFileErr.WithLabelValues(f.MetricsName).Add(1) + log.Printf("processing file %q: %e", f.MetricsName, err) } - promFileErr.WithLabelValues(f.MetricsName).Add(1) - log.Printf("processing file %q: %e", f.MetricsName, err) - - // nil: loop and keep retrying indefinitely - return false, nil + return true, err }) } diff --git a/vendor/github.com/jtagcat/util/retry/retry.go b/vendor/github.com/jtagcat/util/retry/retry.go index e0d24a7..e0b18f6 100644 --- a/vendor/github.com/jtagcat/util/retry/retry.go +++ b/vendor/github.com/jtagcat/util/retry/retry.go @@ -1,6 +1,8 @@ package retry import ( + "context" + "k8s.io/apimachinery/pkg/util/wait" ) @@ -11,7 +13,18 @@ func OnError(backoff wait.Backoff, fn func() (retryable bool, err error)) error return wait.ExponentialBackoff(backoff, func() (done bool, _ error) { retryable, err := fn() if err == nil || !retryable { - return true, err + return true, nil + } + return false, nil + }) +} + +// error is only returned by context +func OnErrorManagedBackoff(ctx context.Context, backoff wait.Backoff, fn func() (retryable bool, err error)) error { + return wait.ManagedExponentialBackoffWithContext(ctx, backoff, func() (done bool, _ error) { + retryable, err := fn() + if err == nil || !retryable { + return true, nil } return false, nil }) diff --git a/vendor/github.com/jtagcat/util/std/error.go b/vendor/github.com/jtagcat/util/std/error.go new file mode 100644 index 0000000..02e263d --- /dev/null +++ b/vendor/github.com/jtagcat/util/std/error.go @@ -0,0 +1,21 @@ +package std + +import "errors" + +// for errors.Is(err, ERr) +type GenericErr struct { + Err error + Wrapped error +} + +func (a GenericErr) Is(target error) bool { + return errors.Is(a.Err, target) +} + +func (a GenericErr) Unwrap() error { + return a.Wrapped +} + +func (a GenericErr) Error() string { + return a.Err.Error() + ": " + a.Wrapped.Error() +} diff --git a/vendor/github.com/jtagcat/util/std/exec.go b/vendor/github.com/jtagcat/util/std/exec.go new file mode 100644 index 0000000..6528081 --- /dev/null +++ b/vendor/github.com/jtagcat/util/std/exec.go @@ -0,0 +1,32 @@ +package std + +import ( + "context" + "fmt" + "os/exec" +) + +func RunCmdWithCtx(ctx context.Context, cmd *exec.Cmd) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + if err := cmd.Start(); err != nil { + return fmt.Errorf("starting command: %w", err) + } + + wait := make(chan error) + go func() { + wait <- cmd.Wait() + close(wait) + }() + + select { + case <-ctx.Done(): + return ctx.Err() + case err := <-wait: + return err + } +} diff --git a/vendor/github.com/jtagcat/util/std/url.go b/vendor/github.com/jtagcat/util/std/url.go new file mode 100644 index 0000000..6c1d4c4 --- /dev/null +++ b/vendor/github.com/jtagcat/util/std/url.go @@ -0,0 +1,60 @@ +package std + +import ( + "net/url" + "path" + "strings" +) + +// k8s.io/helm/pkg/urlutil +// mod: supports // and / +// in case of multiple absolute paths, last is used +func URLJoin(baseURL string, paths ...string) (string, error) { + // mod: + // base is replaced by first with // + newBase := -1 + for i, p := range paths { + if strings.HasPrefix(p, "//") { + newBase = i + } + } + + u, err := url.Parse(baseURL) + if err != nil { + return "", err + } + if newBase > -1 { + old := u + u, err = url.Parse(paths[newBase]) + if err != nil { + return "", err + } + + u.Scheme = old.Scheme + if u.User == nil { + u.User = old.User + } + + paths = paths[newBase+1:] + } + + // mod: + // allow rooting to domain with / + absPath := -1 + for i, p := range paths { + if strings.HasPrefix(p, "/") { + absPath = i + } + } + + // We want path instead of filepath because path always uses /. + if absPath > -1 { + u.Path = path.Join(paths[absPath:]...) + } else { + all := []string{u.Path} + all = append(all, paths...) + u.Path = path.Join(all...) + } + + return u.String(), nil +} diff --git a/vendor/k8s.io/apimachinery/pkg/util/wait/wait.go b/vendor/k8s.io/apimachinery/pkg/util/wait/wait.go index 4471adb..137627b 100644 --- a/vendor/k8s.io/apimachinery/pkg/util/wait/wait.go +++ b/vendor/k8s.io/apimachinery/pkg/util/wait/wait.go @@ -276,7 +276,7 @@ func (b *Backoff) Step() time.Duration { duration := b.Duration // calculate the next step - if b.Factor != 0 && b.Steps != 0 { + if b.Factor != 0 { b.Duration = time.Duration(float64(b.Duration) * b.Factor) if b.Cap > 0 && b.Duration > b.Cap { b.Duration = b.Cap @@ -431,17 +431,6 @@ func ExponentialBackoff(backoff Backoff, condition ConditionFunc) error { return ErrWaitTimeout } -// ManagedExponentialBackoff, unlike ExponentialBackoff does not return ErrWaitTimeout. -// Instead the loop continues indefinitely with Sleep being maxDuration (by Steps or Cap) + Jitter -func ManagedExponentialBackoff(backoff Backoff, condition ConditionFunc) error { - for { - if ok, err := runConditionWithCrashProtection(condition); err != nil || ok { - return err - } - time.Sleep(backoff.Step()) - } -} - // Poll tries a condition func until it returns true, an error, or the timeout // is reached. // @@ -766,26 +755,3 @@ func ExponentialBackoffWithContext(ctx context.Context, backoff Backoff, conditi return ErrWaitTimeout } - -// ManagedExponentialBackoffWithContext works with a request context and a Backoff. It ensures that the retry wait never -// exceeds the deadline specified by the request context. -func ManagedExponentialBackoffWithContext(ctx context.Context, backoff Backoff, condition ConditionFunc) error { - for { - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - - if ok, err := runConditionWithCrashProtection(condition); err != nil || ok { - return err - } - - waitBeforeRetry := backoff.Step() - select { - case <-ctx.Done(): - return ctx.Err() - case <-time.After(waitBeforeRetry): - } - } -} diff --git a/vendor/modules.txt b/vendor/modules.txt index 9e339b1..14ce3b8 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -20,7 +20,7 @@ github.com/golang/protobuf/ptypes/timestamp # github.com/golang/snappy v0.0.4 ## explicit github.com/golang/snappy -# github.com/jtagcat/util v0.0.0-20221112215320-924d264211be +# github.com/jtagcat/util v0.0.0-20221216121757-90da8d3bc885 ## explicit; go 1.18 github.com/jtagcat/util/batch github.com/jtagcat/util/retry @@ -170,7 +170,7 @@ google.golang.org/protobuf/runtime/protoiface google.golang.org/protobuf/runtime/protoimpl google.golang.org/protobuf/types/descriptorpb google.golang.org/protobuf/types/known/timestamppb -# k8s.io/apimachinery v0.25.4 => github.com/jtagcat/kubernetes/staging/src/k8s.io/apimachinery v0.0.0-20221027124836-581f57977fff +# k8s.io/apimachinery v0.25.4 ## explicit; go 1.19 k8s.io/apimachinery/pkg/util/runtime k8s.io/apimachinery/pkg/util/wait @@ -185,4 +185,3 @@ k8s.io/klog/v2/internal/severity # k8s.io/utils v0.0.0-20221108210102-8e77b1f39fe2 ## explicit; go 1.18 k8s.io/utils/clock -# k8s.io/apimachinery => github.com/jtagcat/kubernetes/staging/src/k8s.io/apimachinery v0.0.0-20221027124836-581f57977fff