diff --git a/cmd/line.go b/cmd/line.go index 6937843..5836d98 100644 --- a/cmd/line.go +++ b/cmd/line.go @@ -28,7 +28,7 @@ var ( type ( rawLine struct { - recordMetadata + RecordMetadata line []byte } @@ -76,7 +76,7 @@ func (s *submitter) parseContainerLogLines(unparsed <-chan rawLine, parsed chan< s.l.Error("parsing container log line", zap.Error(err), zap.String("file", raw.File)) } - line.mLog.recordMetadata = raw.recordMetadata + line.mLog.RecordMetadata = raw.RecordMetadata parsed <- line } } @@ -126,7 +126,7 @@ func (s *submitter) parseStdChannel(bufferLimitBytes int, lines <-chan singleLin flush := func(last *mLog) { parsed <- mLog{ - recordMetadata: last.recordMetadata, + RecordMetadata: last.RecordMetadata, StdErr: last.StdErr, ContainerTime: firstTime, diff --git a/cmd/mongo_struct.go b/cmd/mongo_struct.go index f8e58a9..15d959a 100644 --- a/cmd/mongo_struct.go +++ b/cmd/mongo_struct.go @@ -13,7 +13,7 @@ func initializeIndexes(ctx context.Context, col *mongo.Collection) error { // (does not create duplicates) _, err := ind.CreateOne(mongoTimeoutCtx(ctx), mongo.IndexModel{ - Keys: bson.D{{Key: mongoKeyFileBasename, Value: 1}, {Key: mongoKeyOffset, Value: -1}}, + Keys: bson.D{{Key: mLogKeyFileBasename, Value: 1}, {Key: mLogKeyOffset, Value: -1}}, }) return err @@ -22,7 +22,7 @@ func initializeIndexes(ctx context.Context, col *mongo.Collection) error { // when editing, also edit everything in this file! type ( mLog struct { - recordMetadata + RecordMetadata Content any ContainerTime time.Time @@ -31,7 +31,7 @@ type ( // added by toBson() ShipTime time.Time } - recordMetadata struct { + RecordMetadata struct { HostInfo HostInfo File string Offset int64 // byte offset where log entry ends at @@ -39,28 +39,80 @@ type ( ) const ( - mongoKeyHostInfo = "host_info" - mongoKeyId = "id" - mongoKeyHostId = mongoKeyHostInfo + "." + mongoKeyId - mongoKeyFileBasename = "file" - mongoKeyOffset = "offset" + // used outside fromBson and toBson + mLogKeyHostId = mLogKeyHostInfo + "." + mLogKeyId +) + +const ( + mLogKeyHostInfo = "host_info" + mLogKeyId = "id" + mLogKeyName = "name" + mLogKeyArch = "arch" + + mLogKeyFileBasename = "file" + mLogKeyOffset = "offset" + mLogKeyContent = "content" + mLogKeyContainerTime = "container_time" + mLogKeyStderr = "stderr" + mLogKeyShipTime = "ship_time" ) // not using marshal, since it is <0.1x performance func (l *mLog) toBson() bson.M { // DO NOT USE QUOTED STRINGS! Move them to const and use variable instead return bson.M{ - mongoKeyHostInfo: bson.M{ - mongoKeyId: l.HostInfo.id, - "name": l.HostInfo.name, - "arch": l.HostInfo.arch, + mLogKeyHostInfo: bson.M{ + mLogKeyId: l.HostInfo.Id, + mLogKeyName: l.HostInfo.Name, + mLogKeyArch: l.HostInfo.Arch, }, - mongoKeyFileBasename: l.File, - mongoKeyOffset: l.Offset, - "content": l.Content, - "container_time": l.ContainerTime, - "stderr": l.StdErr, + mLogKeyFileBasename: l.File, + mLogKeyOffset: l.Offset, + mLogKeyContent: l.Content, + mLogKeyContainerTime: l.ContainerTime, + mLogKeyStderr: l.StdErr, - "ship_time": time.Now(), + mLogKeyShipTime: time.Now(), } } + +// really, mongo should support tagged structs +func mLogfromBson(b *bson.Raw) mLog { + return mLog{ + RecordMetadata: RecordMetadata{ + HostInfo: HostInfo{ + Id: bsonLookupStringValue(b, mLogKeyHostInfo, mLogKeyId), + Name: bsonLookupStringValue(b, mLogKeyHostInfo, mLogKeyName), + Arch: bsonLookupStringValue(b, mLogKeyHostInfo, mLogKeyArch), + }, + File: bsonLookupStringValue(b, mLogKeyFileBasename), + Offset: bsonLookupInt64(b, mLogKeyOffset), + }, + Content: bsonLookupStringValue(b, mLogKeyContent), + ContainerTime: bsonLookupTime(b, mLogKeyContainerTime), + StdErr: bsonLookupBoolean(b, mLogKeyStderr), + ShipTime: bsonLookupTime(b, mLogKeyShipTime), + } +} + +// default values without ok + +func bsonLookupBoolean(b *bson.Raw, key ...string) bool { + v, _ := b.Lookup(key...).BooleanOK() + return v +} + +func bsonLookupStringValue(b *bson.Raw, key ...string) string { + v, _ := b.Lookup(key...).StringValueOK() + return v +} + +func bsonLookupInt64(b *bson.Raw, key ...string) int64 { + v, _ := b.Lookup(key...).Int64OK() + return v +} + +func bsonLookupTime(b *bson.Raw, key ...string) time.Time { + v, _ := b.Lookup(key...).TimeOK() + return v +} diff --git a/cmd/submitter.go b/cmd/submitter.go index e1a4168..ffec50c 100644 --- a/cmd/submitter.go +++ b/cmd/submitter.go @@ -101,19 +101,17 @@ func (s *submitter) shipFileRoutine(ctx context.Context, name string, sendQueue } // get files with offset - offsetResult, err := mongoWithErr(s.db.FindOne(mongoTimeoutCtx(ctx), - bson.D{{Key: mongoKeyHostId, Value: s.hostInfo.id}, {Key: mongoKeyFileBasename, Value: baseName}}, - &mongoOpt.FindOneOptions{Sort: bson.D{{Key: mongoKeyOffset, Value: -1}}}, // sort descending (get largest) + offsetResult, _ := mongoWithErr(s.db.FindOne(mongoTimeoutCtx(ctx), + bson.D{{Key: mLogKeyHostId, Value: s.hostInfo.Id}, {Key: mLogKeyFileBasename, Value: baseName}}, + &mongoOpt.FindOneOptions{Sort: bson.D{{Key: mLogKeyOffset, Value: -1}}}, // sort descending (get largest) )) + offsetResultBytes, err := offsetResult.DecodeBytes() if err != nil && !errors.Is(err, mongo.ErrNoDocuments) { return fmt.Errorf("retrieving offset from database: %w", err) } - var log mLog - if err := offsetResult.Decode(&log); err != nil && !errors.Is(err, mongo.ErrNoDocuments) { - return fmt.Errorf("decoding offset from database: %w", err) - } + log := mLogfromBson(&offsetResultBytes) fi, err := os.Stat(name) if err != nil { @@ -162,7 +160,7 @@ func (s *submitter) shipFileRoutine(ctx context.Context, name string, sendQueue } sendQueue <- rawLine{ - recordMetadata: recordMetadata{ + RecordMetadata: RecordMetadata{ HostInfo: s.hostInfo, File: baseName, diff --git a/cmd/watcher.go b/cmd/watcher.go index 7f30960..4972201 100644 --- a/cmd/watcher.go +++ b/cmd/watcher.go @@ -229,9 +229,9 @@ var App = &cli.App{ } type HostInfo struct { - id string - name string - arch string + Id string + Name string + Arch string } func getHostInfo(nodeName string) (h HostInfo, err error) { @@ -241,16 +241,16 @@ func getHostInfo(nodeName string) (h HostInfo, err error) { err = fmt.Errorf("name: hostname: %w", err) // don't exit early } } - h.name = strings.TrimSpace(nodeName) + h.Name = strings.TrimSpace(nodeName) id, errL := os.ReadFile(MachineId) if errL != nil { err = errAppend(err, fmt.Errorf("id: %w", errL)) } - h.id = strings.TrimSpace(string(id)) + h.Id = strings.TrimSpace(string(id)) - h.arch = runtime.GOARCH + h.Arch = runtime.GOARCH return h, err }