From e73c30689ccb6cc3017fcae636fe18564d6068e6 Mon Sep 17 00:00:00 2001 From: rasmus Date: Sun, 13 Nov 2022 00:19:19 +0200 Subject: [PATCH] update deps --- go.mod | 2 +- go.sum | 4 +- pkg/file/file.go | 7 +- pkg/sender/sender.go | 7 +- vendor/github.com/jtagcat/util/README.md | 3 - .../jtagcat/util/{ => batch}/batch.go | 2 +- .../jtagcat/util/{ => retry}/retry.go | 4 +- .../github.com/jtagcat/util/{ => std}/go.go | 2 +- .../jtagcat/util/{ => std}/strings.go | 2 +- vendor/github.com/jtagcat/util/tail/shared.go | 143 +++++++++++++++++ .../jtagcat/util/{ => tail}/tail.go | 148 +----------------- .../jtagcat/util/{ => tail}/tail_single.go | 10 +- vendor/modules.txt | 7 +- 13 files changed, 175 insertions(+), 166 deletions(-) delete mode 100644 vendor/github.com/jtagcat/util/README.md rename vendor/github.com/jtagcat/util/{ => batch}/batch.go (98%) rename vendor/github.com/jtagcat/util/{ => retry}/retry.go (77%) rename vendor/github.com/jtagcat/util/{ => std}/go.go (95%) rename vendor/github.com/jtagcat/util/{ => std}/strings.go (95%) create mode 100644 vendor/github.com/jtagcat/util/tail/shared.go rename vendor/github.com/jtagcat/util/{ => tail}/tail.go (51%) rename vendor/github.com/jtagcat/util/{ => tail}/tail_single.go (83%) diff --git a/go.mod b/go.mod index 8a3114b..8e027ad 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.19 require ( github.com/fsnotify/fsnotify v1.6.0 - github.com/jtagcat/util v0.0.0-20221109214318-07460aca28b1 + github.com/jtagcat/util v0.0.0-20221112215320-924d264211be github.com/prometheus/client_golang v1.14.0 github.com/urfave/cli/v2 v2.23.5 go.mongodb.org/mongo-driver v1.11.0 diff --git a/go.sum b/go.sum index ace9ccc..9d0f516 100644 --- a/go.sum +++ b/go.sum @@ -147,8 +147,8 @@ github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1 github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= github.com/jtagcat/kubernetes/staging/src/k8s.io/apimachinery v0.0.0-20221027124836-581f57977fff h1:ZcCL47dbIlY58XGBk10Onnig0Ce+w0kWxJhaEDHJfmY= github.com/jtagcat/kubernetes/staging/src/k8s.io/apimachinery v0.0.0-20221027124836-581f57977fff/go.mod h1:C5R3NoUmJXuT6/sTJpOktLUfvCl+H4/7c2QHOp6qwCo= -github.com/jtagcat/util v0.0.0-20221109214318-07460aca28b1 h1:hHHc1uSm9meea1HkxE0+FjUsfJp02LerfLmIFRMKu6c= -github.com/jtagcat/util v0.0.0-20221109214318-07460aca28b1/go.mod h1:rYF5HxFwMvJBOgdcHyJC0VAJ2RonK1Y03omgM33k1nE= +github.com/jtagcat/util v0.0.0-20221112215320-924d264211be h1:WZnF7Cfq7hQ4v/9VXR7UTgIONhmbAJ5RU0jjM3dLw60= +github.com/jtagcat/util v0.0.0-20221112215320-924d264211be/go.mod h1:rYF5HxFwMvJBOgdcHyJC0VAJ2RonK1Y03omgM33k1nE= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= diff --git a/pkg/file/file.go b/pkg/file/file.go index fdaac0a..7e8cfc0 100644 --- a/pkg/file/file.go +++ b/pkg/file/file.go @@ -12,7 +12,8 @@ import ( "git.k-space.ee/k-space/logmower-shipper/pkg/lines" m "git.k-space.ee/k-space/logmower-shipper/pkg/mongo" "git.k-space.ee/k-space/logmower-shipper/pkg/sender" - "github.com/jtagcat/util" + "github.com/jtagcat/util/std" + "github.com/jtagcat/util/tail" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" mongoOpt "go.mongodb.org/mongo-driver/mongo/options" @@ -64,7 +65,7 @@ func (f File) launchChannels(cancel func(), db *mongo.Collection) (_ chan<- line dbQueue := make(chan m.Record, SendQueueLimit) go lines.RawC(lineOut).Process(sctx, dbQueue) - waitBatchSend := util.GoWg(func() { + waitBatchSend := std.GoWg(func() { sender.Queue(dbQueue).Sender(db, f.MetricsName, cancelAll) }) @@ -102,7 +103,7 @@ func (f File) process(ctx context.Context, db *mongo.Collection) error { } startSize := fi.Size() - lineIn, errChan, err := util.TailFile(sctx, f.Path, dbOffset, io.SeekStart) + lineIn, errChan, err := tail.New(sctx, f.Path, dbOffset, io.SeekStart) if err != nil { return fmt.Errorf("tailing file: %w", err) } diff --git a/pkg/sender/sender.go b/pkg/sender/sender.go index 91c0023..ec8cf51 100644 --- a/pkg/sender/sender.go +++ b/pkg/sender/sender.go @@ -7,7 +7,8 @@ import ( "time" m "git.k-space.ee/k-space/logmower-shipper/pkg/mongo" - "github.com/jtagcat/util" + "github.com/jtagcat/util/batch" + "github.com/jtagcat/util/retry" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" "k8s.io/apimachinery/pkg/util/wait" @@ -56,7 +57,7 @@ func (queue Queue) Sender(db *mongo.Collection, metricsFilename string, cancelOn } }() - util.Batch(MaxBatchItems, MaxBatchTime, queue, batched) + batch.Batch(MaxBatchItems, MaxBatchTime, queue, batched) // returns when sendQueue is closed }() @@ -71,7 +72,7 @@ func (queue Queue) Sender(db *mongo.Collection, metricsFilename string, cancelOn promShipperSynced.WithLabelValues(metricsFilename).Set(0) - err := util.RetryOnError(backoff(), func() (_ bool, _ error) { + err := retry.OnError(backoff(), func() (_ bool, _ error) { result, err := insertManyWithSimulate(db, batch) var succeedCount int diff --git a/vendor/github.com/jtagcat/util/README.md b/vendor/github.com/jtagcat/util/README.md deleted file mode 100644 index 9f266b2..0000000 --- a/vendor/github.com/jtagcat/util/README.md +++ /dev/null @@ -1,3 +0,0 @@ -# util - -Functions and primitives I use. Copy code instead of depending on this library. diff --git a/vendor/github.com/jtagcat/util/batch.go b/vendor/github.com/jtagcat/util/batch/batch.go similarity index 98% rename from vendor/github.com/jtagcat/util/batch.go rename to vendor/github.com/jtagcat/util/batch/batch.go index 6b2f34e..01d7335 100644 --- a/vendor/github.com/jtagcat/util/batch.go +++ b/vendor/github.com/jtagcat/util/batch/batch.go @@ -1,4 +1,4 @@ -package util +package batch import ( "time" diff --git a/vendor/github.com/jtagcat/util/retry.go b/vendor/github.com/jtagcat/util/retry/retry.go similarity index 77% rename from vendor/github.com/jtagcat/util/retry.go rename to vendor/github.com/jtagcat/util/retry/retry.go index 18aa3bd..e0d24a7 100644 --- a/vendor/github.com/jtagcat/util/retry.go +++ b/vendor/github.com/jtagcat/util/retry/retry.go @@ -1,4 +1,4 @@ -package util +package retry import ( "k8s.io/apimachinery/pkg/util/wait" @@ -7,7 +7,7 @@ import ( // Do copy this code instead of depending on this library // // similar to "k8s.io/apimachinery/pkg/util/retry" -func RetryOnError(backoff wait.Backoff, fn func() (retryable bool, err error)) error { +func OnError(backoff wait.Backoff, fn func() (retryable bool, err error)) error { return wait.ExponentialBackoff(backoff, func() (done bool, _ error) { retryable, err := fn() if err == nil || !retryable { diff --git a/vendor/github.com/jtagcat/util/go.go b/vendor/github.com/jtagcat/util/std/go.go similarity index 95% rename from vendor/github.com/jtagcat/util/go.go rename to vendor/github.com/jtagcat/util/std/go.go index 35704ad..c007c7c 100644 --- a/vendor/github.com/jtagcat/util/go.go +++ b/vendor/github.com/jtagcat/util/std/go.go @@ -1,4 +1,4 @@ -package util +package std import "sync" diff --git a/vendor/github.com/jtagcat/util/strings.go b/vendor/github.com/jtagcat/util/std/strings.go similarity index 95% rename from vendor/github.com/jtagcat/util/strings.go rename to vendor/github.com/jtagcat/util/std/strings.go index 6310a1d..40889e7 100644 --- a/vendor/github.com/jtagcat/util/strings.go +++ b/vendor/github.com/jtagcat/util/std/strings.go @@ -1,4 +1,4 @@ -package util +package std import "strings" diff --git a/vendor/github.com/jtagcat/util/tail/shared.go b/vendor/github.com/jtagcat/util/tail/shared.go new file mode 100644 index 0000000..279c555 --- /dev/null +++ b/vendor/github.com/jtagcat/util/tail/shared.go @@ -0,0 +1,143 @@ +package tail + +import ( + "bufio" + "context" + "errors" + "io" + "os" + "sync" +) + +type Line struct { + Filename *string + Bytes []byte + EndOffset int64 // io.SeekStart + ReachedEOF bool +} + +type Tailable struct { + Name string + // os.Seek() on first open: + Offset int64 + Whence int + + // loop-persisting + existed bool + + // copied use + wakeup chan struct{} +} + +type orderedLines struct { + c chan<- *Line + sync.Mutex // guarantee ordered lines across multiple files of same name +} + +// Handles tailing a file within its lifespan +func fileHandle(ctx context.Context, file Tailable, useOffset bool, lineChan *orderedLines, errChan chan<- error) { + f, err := os.Open(file.Name) + if err != nil && !errors.Is(err, os.ErrNotExist) { // ignore ErrNotExist, as it may have been race deleted + errChan <- err + return + } + defer f.Close() + + var offset int64 + if useOffset { + offset, err = f.Seek(file.Offset, file.Whence) + if err != nil { + errChan <- err + return + } + } + + first, breakNext, b := true, false, bufio.NewReader(f) + for { + + select { + case <-ctx.Done(): + errChan <- ctx.Err() + return + default: + } + + if first { + first = false + } else { + offset, err = detectTruncation(f, offset) + if err != nil { + errChan <- err + return + } + } + + offset, err = readToEOF(b, &file.Name, offset, lineChan.c) + if err != nil { + errChan <- err + return + } + + if breakNext { + return + } + select { + case <-ctx.Done(): + case _, ok := <-file.wakeup: + if !ok { + fs, err := f.Stat() + if err != nil && !errors.Is(err, os.ErrNotExist) { + errChan <- ctx.Err() + return + } + + if err != nil || fs.Size() == offset { + return + } + + breakNext = true + } + } + } +} + +// offset is io.SeekStart +func detectTruncation(f *os.File, offset int64) (int64, error) { + fs, err := f.Stat() + if err != nil && !errors.Is(err, os.ErrNotExist) { // ignore ErrNotExist, as it may have been race deleted + return 0, err + } + + if fs.Size() < offset { + // file has been truncated + return f.Seek(0, io.SeekStart) + } + + return offset, nil +} + +func readToEOF(buf *bufio.Reader, name *string, offset int64, c chan<- *Line) (int64, error) { + for { + b, err := buf.ReadBytes('\n') + offset += int64(len(b)) + + if err != nil && !errors.Is(err, io.EOF) { + return offset, err + } + + if err == nil { + b = b[:len(b)-1] // remove \n + } + + c <- &Line{ + Filename: name, + Bytes: b, + EndOffset: offset, + ReachedEOF: err != nil, + } + + if err != nil { // EOF + return offset, nil + } + } +} diff --git a/vendor/github.com/jtagcat/util/tail.go b/vendor/github.com/jtagcat/util/tail/tail.go similarity index 51% rename from vendor/github.com/jtagcat/util/tail.go rename to vendor/github.com/jtagcat/util/tail/tail.go index 21ac53d..0d462d7 100644 --- a/vendor/github.com/jtagcat/util/tail.go +++ b/vendor/github.com/jtagcat/util/tail/tail.go @@ -1,49 +1,26 @@ -package util +package tail import ( - "bufio" "context" "errors" "fmt" - "io" "os" "path/filepath" - "sync" "github.com/fsnotify/fsnotify" ) -type Line struct { - Filename *string - Bytes []byte - EndOffset int64 // io.SeekStart - ReachedEOF bool -} - // File starts tailing file from offset and whence (os.Seek()). // It follows target file for appends, truncations, and replacements. // Errors abort connected operations. -type Tailable struct { - Name string - // os.Seek() on first open: - Offset int64 - Whence int - - // loop-persisting - existed bool - - // copied use - wakeup chan struct{} -} - var ErrScatteredFiles = errors.New("all Tailable files must be in the same directory") // Unstable, beta // // All files must be in the same directory. // Channels will be closed after file is deleted //TODO: -func TailFiles(ctx context.Context, files []Tailable) (<-chan *Line, <-chan error, error) { +func Files(ctx context.Context, files []Tailable) (<-chan *Line, <-chan error, error) { if len(files) == 0 { return nil, nil, nil } @@ -84,13 +61,13 @@ func TailFiles(ctx context.Context, files []Tailable) (<-chan *Line, <-chan erro lineChan, errChan := make(chan *Line), make(chan error) - go tailFiles(ctx, w, &files, lineChan, errChan) + go multipleFiles(ctx, w, &files, lineChan, errChan) return lineChan, errChan, err } // Consumes Watcher -func tailFiles(ctx context.Context, +func multipleFiles(ctx context.Context, w *fsnotify.Watcher, files *[]Tailable, lineChan chan<- *Line, errChan chan<- error, ) { @@ -102,7 +79,7 @@ func tailFiles(ctx context.Context, type mapWrap struct { *Tailable seen bool - lineChan orderedLineChan + lineChan orderedLines } names := make(map[string]*mapWrap) @@ -111,7 +88,7 @@ func tailFiles(ctx context.Context, c := make(chan *Line) names[filepath.Base(file.Name)] = &mapWrap{ Tailable: &file, - lineChan: orderedLineChan{c: c}, + lineChan: orderedLines{c: c}, } go func() { // relay files @@ -186,116 +163,3 @@ func tailFiles(ctx context.Context, } } } - -type orderedLineChan struct { - c chan<- *Line - sync.Mutex // guarantee ordered lines across multiple files of same name -} - -// Handles tailing a file within its lifespan -func fileHandle(ctx context.Context, file Tailable, useOffset bool, lineChan *orderedLineChan, errChan chan<- error) { - f, err := os.Open(file.Name) - if err != nil && !errors.Is(err, os.ErrNotExist) { // ignore ErrNotExist, as it may have been race deleted - errChan <- err - return - } - defer f.Close() - - var offset int64 - if useOffset { - offset, err = f.Seek(file.Offset, file.Whence) - if err != nil { - errChan <- err - return - } - } - - first, breakNext, b := true, false, bufio.NewReader(f) - for { - - select { - case <-ctx.Done(): - errChan <- ctx.Err() - return - default: - } - - if first { - first = false - } else { - offset, err = detectTruncation(f, offset) - if err != nil { - errChan <- err - return - } - } - - offset, err = readToEOF(b, &file.Name, offset, lineChan.c) - if err != nil { - errChan <- err - return - } - - if breakNext { - return - } - select { - case <-ctx.Done(): - case _, ok := <-file.wakeup: - if !ok { - fs, err := f.Stat() - if err != nil && !errors.Is(err, os.ErrNotExist) { - errChan <- ctx.Err() - return - } - - if err != nil || fs.Size() == offset { - return - } - - breakNext = true - } - } - } -} - -// offset is io.SeekStart -func detectTruncation(f *os.File, offset int64) (int64, error) { - fs, err := f.Stat() - if err != nil && !errors.Is(err, os.ErrNotExist) { // ignore ErrNotExist, as it may have been race deleted - return 0, err - } - - if fs.Size() < offset { - // file has been truncated - return f.Seek(0, io.SeekStart) - } - - return offset, nil -} - -func readToEOF(buf *bufio.Reader, name *string, offset int64, c chan<- *Line) (int64, error) { - for { - b, err := buf.ReadBytes('\n') - offset += int64(len(b)) - - if err != nil && !errors.Is(err, io.EOF) { - return offset, err - } - - if err == nil { - b = b[:len(b)-1] // remove \n - } - - c <- &Line{ - Filename: name, - Bytes: b, - EndOffset: offset, - ReachedEOF: err != nil, - } - - if err != nil { // EOF - return offset, nil - } - } -} diff --git a/vendor/github.com/jtagcat/util/tail_single.go b/vendor/github.com/jtagcat/util/tail/tail_single.go similarity index 83% rename from vendor/github.com/jtagcat/util/tail_single.go rename to vendor/github.com/jtagcat/util/tail/tail_single.go index da39677..9d269d4 100644 --- a/vendor/github.com/jtagcat/util/tail_single.go +++ b/vendor/github.com/jtagcat/util/tail/tail_single.go @@ -1,4 +1,4 @@ -package util +package tail import ( "context" @@ -8,7 +8,7 @@ import ( ) // Unstable, beta -func TailFile(ctx context.Context, name string, offset int64, whence int) (<-chan *Line, <-chan error, error) { +func New(ctx context.Context, name string, offset int64, whence int) (<-chan *Line, <-chan error, error) { w, err := fsnotify.NewWatcher() if err != nil { return nil, nil, err @@ -20,7 +20,7 @@ func TailFile(ctx context.Context, name string, offset int64, whence int) (<-cha lineChan, errChan := make(chan *Line), make(chan error) - go tailSingleFile(ctx, w, &Tailable{ + go singleFile(ctx, w, &Tailable{ Name: name, Offset: offset, Whence: whence, }, lineChan, errChan) @@ -28,7 +28,7 @@ func TailFile(ctx context.Context, name string, offset int64, whence int) (<-cha } // assumes file exists -func tailSingleFile(ctx context.Context, +func singleFile(ctx context.Context, w *fsnotify.Watcher, file *Tailable, lineChan chan<- *Line, errChan chan<- error, ) { @@ -40,7 +40,7 @@ func tailSingleFile(ctx context.Context, wg.Add(1) go func() { // no need to lock/unlock orderedLineChan, as we only have one same-named file across its life - fileHandle(sctx, *file, true, &orderedLineChan{c: lineChan}, errChan) + fileHandle(sctx, *file, true, &orderedLines{c: lineChan}, errChan) wg.Done() }() diff --git a/vendor/modules.txt b/vendor/modules.txt index 80eda09..9e339b1 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -20,9 +20,12 @@ github.com/golang/protobuf/ptypes/timestamp # github.com/golang/snappy v0.0.4 ## explicit github.com/golang/snappy -# github.com/jtagcat/util v0.0.0-20221109214318-07460aca28b1 +# github.com/jtagcat/util v0.0.0-20221112215320-924d264211be ## explicit; go 1.18 -github.com/jtagcat/util +github.com/jtagcat/util/batch +github.com/jtagcat/util/retry +github.com/jtagcat/util/std +github.com/jtagcat/util/tail # github.com/klauspost/compress v1.15.12 ## explicit; go 1.17 github.com/klauspost/compress