This commit is contained in:
parent
1c31b19fd2
commit
e73c30689c
2
go.mod
2
go.mod
@ -4,7 +4,7 @@ go 1.19
|
||||
|
||||
require (
|
||||
github.com/fsnotify/fsnotify v1.6.0
|
||||
github.com/jtagcat/util v0.0.0-20221109214318-07460aca28b1
|
||||
github.com/jtagcat/util v0.0.0-20221112215320-924d264211be
|
||||
github.com/prometheus/client_golang v1.14.0
|
||||
github.com/urfave/cli/v2 v2.23.5
|
||||
go.mongodb.org/mongo-driver v1.11.0
|
||||
|
4
go.sum
4
go.sum
@ -147,8 +147,8 @@ github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1
|
||||
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
|
||||
github.com/jtagcat/kubernetes/staging/src/k8s.io/apimachinery v0.0.0-20221027124836-581f57977fff h1:ZcCL47dbIlY58XGBk10Onnig0Ce+w0kWxJhaEDHJfmY=
|
||||
github.com/jtagcat/kubernetes/staging/src/k8s.io/apimachinery v0.0.0-20221027124836-581f57977fff/go.mod h1:C5R3NoUmJXuT6/sTJpOktLUfvCl+H4/7c2QHOp6qwCo=
|
||||
github.com/jtagcat/util v0.0.0-20221109214318-07460aca28b1 h1:hHHc1uSm9meea1HkxE0+FjUsfJp02LerfLmIFRMKu6c=
|
||||
github.com/jtagcat/util v0.0.0-20221109214318-07460aca28b1/go.mod h1:rYF5HxFwMvJBOgdcHyJC0VAJ2RonK1Y03omgM33k1nE=
|
||||
github.com/jtagcat/util v0.0.0-20221112215320-924d264211be h1:WZnF7Cfq7hQ4v/9VXR7UTgIONhmbAJ5RU0jjM3dLw60=
|
||||
github.com/jtagcat/util v0.0.0-20221112215320-924d264211be/go.mod h1:rYF5HxFwMvJBOgdcHyJC0VAJ2RonK1Y03omgM33k1nE=
|
||||
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
|
||||
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
|
||||
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
|
||||
|
@ -12,7 +12,8 @@ import (
|
||||
"git.k-space.ee/k-space/logmower-shipper/pkg/lines"
|
||||
m "git.k-space.ee/k-space/logmower-shipper/pkg/mongo"
|
||||
"git.k-space.ee/k-space/logmower-shipper/pkg/sender"
|
||||
"github.com/jtagcat/util"
|
||||
"github.com/jtagcat/util/std"
|
||||
"github.com/jtagcat/util/tail"
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
mongoOpt "go.mongodb.org/mongo-driver/mongo/options"
|
||||
@ -64,7 +65,7 @@ func (f File) launchChannels(cancel func(), db *mongo.Collection) (_ chan<- line
|
||||
dbQueue := make(chan m.Record, SendQueueLimit)
|
||||
go lines.RawC(lineOut).Process(sctx, dbQueue)
|
||||
|
||||
waitBatchSend := util.GoWg(func() {
|
||||
waitBatchSend := std.GoWg(func() {
|
||||
sender.Queue(dbQueue).Sender(db, f.MetricsName, cancelAll)
|
||||
})
|
||||
|
||||
@ -102,7 +103,7 @@ func (f File) process(ctx context.Context, db *mongo.Collection) error {
|
||||
}
|
||||
startSize := fi.Size()
|
||||
|
||||
lineIn, errChan, err := util.TailFile(sctx, f.Path, dbOffset, io.SeekStart)
|
||||
lineIn, errChan, err := tail.New(sctx, f.Path, dbOffset, io.SeekStart)
|
||||
if err != nil {
|
||||
return fmt.Errorf("tailing file: %w", err)
|
||||
}
|
||||
|
@ -7,7 +7,8 @@ import (
|
||||
"time"
|
||||
|
||||
m "git.k-space.ee/k-space/logmower-shipper/pkg/mongo"
|
||||
"github.com/jtagcat/util"
|
||||
"github.com/jtagcat/util/batch"
|
||||
"github.com/jtagcat/util/retry"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
"go.mongodb.org/mongo-driver/mongo/options"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
@ -56,7 +57,7 @@ func (queue Queue) Sender(db *mongo.Collection, metricsFilename string, cancelOn
|
||||
}
|
||||
}()
|
||||
|
||||
util.Batch(MaxBatchItems, MaxBatchTime, queue, batched)
|
||||
batch.Batch(MaxBatchItems, MaxBatchTime, queue, batched)
|
||||
// returns when sendQueue is closed
|
||||
}()
|
||||
|
||||
@ -71,7 +72,7 @@ func (queue Queue) Sender(db *mongo.Collection, metricsFilename string, cancelOn
|
||||
|
||||
promShipperSynced.WithLabelValues(metricsFilename).Set(0)
|
||||
|
||||
err := util.RetryOnError(backoff(), func() (_ bool, _ error) {
|
||||
err := retry.OnError(backoff(), func() (_ bool, _ error) {
|
||||
result, err := insertManyWithSimulate(db, batch)
|
||||
|
||||
var succeedCount int
|
||||
|
3
vendor/github.com/jtagcat/util/README.md
generated
vendored
3
vendor/github.com/jtagcat/util/README.md
generated
vendored
@ -1,3 +0,0 @@
|
||||
# util
|
||||
|
||||
Functions and primitives I use. Copy code instead of depending on this library.
|
@ -1,4 +1,4 @@
|
||||
package util
|
||||
package batch
|
||||
|
||||
import (
|
||||
"time"
|
@ -1,4 +1,4 @@
|
||||
package util
|
||||
package retry
|
||||
|
||||
import (
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
@ -7,7 +7,7 @@ import (
|
||||
// Do copy this code instead of depending on this library
|
||||
//
|
||||
// similar to "k8s.io/apimachinery/pkg/util/retry"
|
||||
func RetryOnError(backoff wait.Backoff, fn func() (retryable bool, err error)) error {
|
||||
func OnError(backoff wait.Backoff, fn func() (retryable bool, err error)) error {
|
||||
return wait.ExponentialBackoff(backoff, func() (done bool, _ error) {
|
||||
retryable, err := fn()
|
||||
if err == nil || !retryable {
|
2
vendor/github.com/jtagcat/util/go.go → vendor/github.com/jtagcat/util/std/go.go
generated
vendored
2
vendor/github.com/jtagcat/util/go.go → vendor/github.com/jtagcat/util/std/go.go
generated
vendored
@ -1,4 +1,4 @@
|
||||
package util
|
||||
package std
|
||||
|
||||
import "sync"
|
||||
|
@ -1,4 +1,4 @@
|
||||
package util
|
||||
package std
|
||||
|
||||
import "strings"
|
||||
|
143
vendor/github.com/jtagcat/util/tail/shared.go
generated
vendored
Normal file
143
vendor/github.com/jtagcat/util/tail/shared.go
generated
vendored
Normal file
@ -0,0 +1,143 @@
|
||||
package tail
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"os"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type Line struct {
|
||||
Filename *string
|
||||
Bytes []byte
|
||||
EndOffset int64 // io.SeekStart
|
||||
ReachedEOF bool
|
||||
}
|
||||
|
||||
type Tailable struct {
|
||||
Name string
|
||||
// os.Seek() on first open:
|
||||
Offset int64
|
||||
Whence int
|
||||
|
||||
// loop-persisting
|
||||
existed bool
|
||||
|
||||
// copied use
|
||||
wakeup chan struct{}
|
||||
}
|
||||
|
||||
type orderedLines struct {
|
||||
c chan<- *Line
|
||||
sync.Mutex // guarantee ordered lines across multiple files of same name
|
||||
}
|
||||
|
||||
// Handles tailing a file within its lifespan
|
||||
func fileHandle(ctx context.Context, file Tailable, useOffset bool, lineChan *orderedLines, errChan chan<- error) {
|
||||
f, err := os.Open(file.Name)
|
||||
if err != nil && !errors.Is(err, os.ErrNotExist) { // ignore ErrNotExist, as it may have been race deleted
|
||||
errChan <- err
|
||||
return
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
var offset int64
|
||||
if useOffset {
|
||||
offset, err = f.Seek(file.Offset, file.Whence)
|
||||
if err != nil {
|
||||
errChan <- err
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
first, breakNext, b := true, false, bufio.NewReader(f)
|
||||
for {
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
errChan <- ctx.Err()
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
if first {
|
||||
first = false
|
||||
} else {
|
||||
offset, err = detectTruncation(f, offset)
|
||||
if err != nil {
|
||||
errChan <- err
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
offset, err = readToEOF(b, &file.Name, offset, lineChan.c)
|
||||
if err != nil {
|
||||
errChan <- err
|
||||
return
|
||||
}
|
||||
|
||||
if breakNext {
|
||||
return
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case _, ok := <-file.wakeup:
|
||||
if !ok {
|
||||
fs, err := f.Stat()
|
||||
if err != nil && !errors.Is(err, os.ErrNotExist) {
|
||||
errChan <- ctx.Err()
|
||||
return
|
||||
}
|
||||
|
||||
if err != nil || fs.Size() == offset {
|
||||
return
|
||||
}
|
||||
|
||||
breakNext = true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// offset is io.SeekStart
|
||||
func detectTruncation(f *os.File, offset int64) (int64, error) {
|
||||
fs, err := f.Stat()
|
||||
if err != nil && !errors.Is(err, os.ErrNotExist) { // ignore ErrNotExist, as it may have been race deleted
|
||||
return 0, err
|
||||
}
|
||||
|
||||
if fs.Size() < offset {
|
||||
// file has been truncated
|
||||
return f.Seek(0, io.SeekStart)
|
||||
}
|
||||
|
||||
return offset, nil
|
||||
}
|
||||
|
||||
func readToEOF(buf *bufio.Reader, name *string, offset int64, c chan<- *Line) (int64, error) {
|
||||
for {
|
||||
b, err := buf.ReadBytes('\n')
|
||||
offset += int64(len(b))
|
||||
|
||||
if err != nil && !errors.Is(err, io.EOF) {
|
||||
return offset, err
|
||||
}
|
||||
|
||||
if err == nil {
|
||||
b = b[:len(b)-1] // remove \n
|
||||
}
|
||||
|
||||
c <- &Line{
|
||||
Filename: name,
|
||||
Bytes: b,
|
||||
EndOffset: offset,
|
||||
ReachedEOF: err != nil,
|
||||
}
|
||||
|
||||
if err != nil { // EOF
|
||||
return offset, nil
|
||||
}
|
||||
}
|
||||
}
|
148
vendor/github.com/jtagcat/util/tail.go → vendor/github.com/jtagcat/util/tail/tail.go
generated
vendored
148
vendor/github.com/jtagcat/util/tail.go → vendor/github.com/jtagcat/util/tail/tail.go
generated
vendored
@ -1,49 +1,26 @@
|
||||
package util
|
||||
package tail
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
|
||||
"github.com/fsnotify/fsnotify"
|
||||
)
|
||||
|
||||
type Line struct {
|
||||
Filename *string
|
||||
Bytes []byte
|
||||
EndOffset int64 // io.SeekStart
|
||||
ReachedEOF bool
|
||||
}
|
||||
|
||||
// File starts tailing file from offset and whence (os.Seek()).
|
||||
// It follows target file for appends, truncations, and replacements.
|
||||
// Errors abort connected operations.
|
||||
|
||||
type Tailable struct {
|
||||
Name string
|
||||
// os.Seek() on first open:
|
||||
Offset int64
|
||||
Whence int
|
||||
|
||||
// loop-persisting
|
||||
existed bool
|
||||
|
||||
// copied use
|
||||
wakeup chan struct{}
|
||||
}
|
||||
|
||||
var ErrScatteredFiles = errors.New("all Tailable files must be in the same directory")
|
||||
|
||||
// Unstable, beta
|
||||
//
|
||||
// All files must be in the same directory.
|
||||
// Channels will be closed after file is deleted //TODO:
|
||||
func TailFiles(ctx context.Context, files []Tailable) (<-chan *Line, <-chan error, error) {
|
||||
func Files(ctx context.Context, files []Tailable) (<-chan *Line, <-chan error, error) {
|
||||
if len(files) == 0 {
|
||||
return nil, nil, nil
|
||||
}
|
||||
@ -84,13 +61,13 @@ func TailFiles(ctx context.Context, files []Tailable) (<-chan *Line, <-chan erro
|
||||
|
||||
lineChan, errChan := make(chan *Line), make(chan error)
|
||||
|
||||
go tailFiles(ctx, w, &files, lineChan, errChan)
|
||||
go multipleFiles(ctx, w, &files, lineChan, errChan)
|
||||
|
||||
return lineChan, errChan, err
|
||||
}
|
||||
|
||||
// Consumes Watcher
|
||||
func tailFiles(ctx context.Context,
|
||||
func multipleFiles(ctx context.Context,
|
||||
w *fsnotify.Watcher, files *[]Tailable,
|
||||
lineChan chan<- *Line, errChan chan<- error,
|
||||
) {
|
||||
@ -102,7 +79,7 @@ func tailFiles(ctx context.Context,
|
||||
type mapWrap struct {
|
||||
*Tailable
|
||||
seen bool
|
||||
lineChan orderedLineChan
|
||||
lineChan orderedLines
|
||||
}
|
||||
|
||||
names := make(map[string]*mapWrap)
|
||||
@ -111,7 +88,7 @@ func tailFiles(ctx context.Context,
|
||||
c := make(chan *Line)
|
||||
names[filepath.Base(file.Name)] = &mapWrap{
|
||||
Tailable: &file,
|
||||
lineChan: orderedLineChan{c: c},
|
||||
lineChan: orderedLines{c: c},
|
||||
}
|
||||
|
||||
go func() { // relay files
|
||||
@ -186,116 +163,3 @@ func tailFiles(ctx context.Context,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type orderedLineChan struct {
|
||||
c chan<- *Line
|
||||
sync.Mutex // guarantee ordered lines across multiple files of same name
|
||||
}
|
||||
|
||||
// Handles tailing a file within its lifespan
|
||||
func fileHandle(ctx context.Context, file Tailable, useOffset bool, lineChan *orderedLineChan, errChan chan<- error) {
|
||||
f, err := os.Open(file.Name)
|
||||
if err != nil && !errors.Is(err, os.ErrNotExist) { // ignore ErrNotExist, as it may have been race deleted
|
||||
errChan <- err
|
||||
return
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
var offset int64
|
||||
if useOffset {
|
||||
offset, err = f.Seek(file.Offset, file.Whence)
|
||||
if err != nil {
|
||||
errChan <- err
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
first, breakNext, b := true, false, bufio.NewReader(f)
|
||||
for {
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
errChan <- ctx.Err()
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
if first {
|
||||
first = false
|
||||
} else {
|
||||
offset, err = detectTruncation(f, offset)
|
||||
if err != nil {
|
||||
errChan <- err
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
offset, err = readToEOF(b, &file.Name, offset, lineChan.c)
|
||||
if err != nil {
|
||||
errChan <- err
|
||||
return
|
||||
}
|
||||
|
||||
if breakNext {
|
||||
return
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case _, ok := <-file.wakeup:
|
||||
if !ok {
|
||||
fs, err := f.Stat()
|
||||
if err != nil && !errors.Is(err, os.ErrNotExist) {
|
||||
errChan <- ctx.Err()
|
||||
return
|
||||
}
|
||||
|
||||
if err != nil || fs.Size() == offset {
|
||||
return
|
||||
}
|
||||
|
||||
breakNext = true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// offset is io.SeekStart
|
||||
func detectTruncation(f *os.File, offset int64) (int64, error) {
|
||||
fs, err := f.Stat()
|
||||
if err != nil && !errors.Is(err, os.ErrNotExist) { // ignore ErrNotExist, as it may have been race deleted
|
||||
return 0, err
|
||||
}
|
||||
|
||||
if fs.Size() < offset {
|
||||
// file has been truncated
|
||||
return f.Seek(0, io.SeekStart)
|
||||
}
|
||||
|
||||
return offset, nil
|
||||
}
|
||||
|
||||
func readToEOF(buf *bufio.Reader, name *string, offset int64, c chan<- *Line) (int64, error) {
|
||||
for {
|
||||
b, err := buf.ReadBytes('\n')
|
||||
offset += int64(len(b))
|
||||
|
||||
if err != nil && !errors.Is(err, io.EOF) {
|
||||
return offset, err
|
||||
}
|
||||
|
||||
if err == nil {
|
||||
b = b[:len(b)-1] // remove \n
|
||||
}
|
||||
|
||||
c <- &Line{
|
||||
Filename: name,
|
||||
Bytes: b,
|
||||
EndOffset: offset,
|
||||
ReachedEOF: err != nil,
|
||||
}
|
||||
|
||||
if err != nil { // EOF
|
||||
return offset, nil
|
||||
}
|
||||
}
|
||||
}
|
@ -1,4 +1,4 @@
|
||||
package util
|
||||
package tail
|
||||
|
||||
import (
|
||||
"context"
|
||||
@ -8,7 +8,7 @@ import (
|
||||
)
|
||||
|
||||
// Unstable, beta
|
||||
func TailFile(ctx context.Context, name string, offset int64, whence int) (<-chan *Line, <-chan error, error) {
|
||||
func New(ctx context.Context, name string, offset int64, whence int) (<-chan *Line, <-chan error, error) {
|
||||
w, err := fsnotify.NewWatcher()
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
@ -20,7 +20,7 @@ func TailFile(ctx context.Context, name string, offset int64, whence int) (<-cha
|
||||
|
||||
lineChan, errChan := make(chan *Line), make(chan error)
|
||||
|
||||
go tailSingleFile(ctx, w, &Tailable{
|
||||
go singleFile(ctx, w, &Tailable{
|
||||
Name: name, Offset: offset, Whence: whence,
|
||||
}, lineChan, errChan)
|
||||
|
||||
@ -28,7 +28,7 @@ func TailFile(ctx context.Context, name string, offset int64, whence int) (<-cha
|
||||
}
|
||||
|
||||
// assumes file exists
|
||||
func tailSingleFile(ctx context.Context,
|
||||
func singleFile(ctx context.Context,
|
||||
w *fsnotify.Watcher, file *Tailable,
|
||||
lineChan chan<- *Line, errChan chan<- error,
|
||||
) {
|
||||
@ -40,7 +40,7 @@ func tailSingleFile(ctx context.Context,
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
// no need to lock/unlock orderedLineChan, as we only have one same-named file across its life
|
||||
fileHandle(sctx, *file, true, &orderedLineChan{c: lineChan}, errChan)
|
||||
fileHandle(sctx, *file, true, &orderedLines{c: lineChan}, errChan)
|
||||
wg.Done()
|
||||
}()
|
||||
|
7
vendor/modules.txt
vendored
7
vendor/modules.txt
vendored
@ -20,9 +20,12 @@ github.com/golang/protobuf/ptypes/timestamp
|
||||
# github.com/golang/snappy v0.0.4
|
||||
## explicit
|
||||
github.com/golang/snappy
|
||||
# github.com/jtagcat/util v0.0.0-20221109214318-07460aca28b1
|
||||
# github.com/jtagcat/util v0.0.0-20221112215320-924d264211be
|
||||
## explicit; go 1.18
|
||||
github.com/jtagcat/util
|
||||
github.com/jtagcat/util/batch
|
||||
github.com/jtagcat/util/retry
|
||||
github.com/jtagcat/util/std
|
||||
github.com/jtagcat/util/tail
|
||||
# github.com/klauspost/compress v1.15.12
|
||||
## explicit; go 1.17
|
||||
github.com/klauspost/compress
|
||||
|
Loading…
Reference in New Issue
Block a user