package lines import ( "context" "log" "sync" "git.k-space.ee/k-space/logmower-shipper/pkg/globals" m "git.k-space.ee/k-space/logmower-shipper/pkg/mongo" ) type ( RawC <-chan Raw Raw struct { *File Offset int64 B []byte } // file.File, but avoiding import cycle File struct { *m.File MetricsName string // filepath.Base() } ) // assumes all lines are from same file func (unparsed RawC) Process(ctx context.Context, parsed chan<- m.Record) { lines := make(chan singleLine) go unparsed.parse(lines) var wg sync.WaitGroup wg.Add(2) stdOut, stdErr := make(chan singleLine), make(chan singleLine) go func() { singleLines(stdOut).process(ctx, parsed) wg.Done() }() go func() { singleLines(stdErr).process(ctx, parsed) wg.Done() }() defer func() { close(stdOut) close(stdErr) wg.Wait() close(parsed) }() // split stdout and stderr for { select { case <-ctx.Done(): return case line, ok := <-lines: if !ok { return } if line.StdErr { stdErr <- line } else { stdOut <- line } } } } func (lines singleLines) process(ctx context.Context, parsed chan<- m.Record) { var firstMetadata *m.ParsedMetadata var buffer []byte for { line, ok := <-lines if !ok { // partial line should always be finished with full line // discard any partial lines without end (full line) return } if len(buffer) == 0 { firstMetadata = &line.ParsedMetadata } buffer = append(buffer, line.B...) if len(buffer) > globals.BufferLimitBytes { promRecordDroppedTooLarge.WithLabelValues(line.MetricsName).Add(1) log.Printf("dropped record: size in bytes exceeds limit of %d", globals.BufferLimitBytes) buffer = nil continue } if !line.partial { select { case <-ctx.Done(): return case parsed <- m.Record{ File: line.File.File, Offset: line.Offset, String: string(buffer), ParsedMetadata: *firstMetadata, }: } buffer = nil } } }