fix mongo structs
This commit is contained in:
parent
ce3f67754a
commit
8961a4237f
@ -28,7 +28,7 @@ var (
|
|||||||
|
|
||||||
type (
|
type (
|
||||||
rawLine struct {
|
rawLine struct {
|
||||||
recordMetadata
|
RecordMetadata
|
||||||
line []byte
|
line []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -76,7 +76,7 @@ func (s *submitter) parseContainerLogLines(unparsed <-chan rawLine, parsed chan<
|
|||||||
s.l.Error("parsing container log line", zap.Error(err), zap.String("file", raw.File))
|
s.l.Error("parsing container log line", zap.Error(err), zap.String("file", raw.File))
|
||||||
}
|
}
|
||||||
|
|
||||||
line.mLog.recordMetadata = raw.recordMetadata
|
line.mLog.RecordMetadata = raw.RecordMetadata
|
||||||
parsed <- line
|
parsed <- line
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -126,7 +126,7 @@ func (s *submitter) parseStdChannel(bufferLimitBytes int, lines <-chan singleLin
|
|||||||
|
|
||||||
flush := func(last *mLog) {
|
flush := func(last *mLog) {
|
||||||
parsed <- mLog{
|
parsed <- mLog{
|
||||||
recordMetadata: last.recordMetadata,
|
RecordMetadata: last.RecordMetadata,
|
||||||
StdErr: last.StdErr,
|
StdErr: last.StdErr,
|
||||||
|
|
||||||
ContainerTime: firstTime,
|
ContainerTime: firstTime,
|
||||||
|
@ -13,7 +13,7 @@ func initializeIndexes(ctx context.Context, col *mongo.Collection) error {
|
|||||||
|
|
||||||
// (does not create duplicates)
|
// (does not create duplicates)
|
||||||
_, err := ind.CreateOne(mongoTimeoutCtx(ctx), mongo.IndexModel{
|
_, err := ind.CreateOne(mongoTimeoutCtx(ctx), mongo.IndexModel{
|
||||||
Keys: bson.D{{Key: mongoKeyFileBasename, Value: 1}, {Key: mongoKeyOffset, Value: -1}},
|
Keys: bson.D{{Key: mLogKeyFileBasename, Value: 1}, {Key: mLogKeyOffset, Value: -1}},
|
||||||
})
|
})
|
||||||
|
|
||||||
return err
|
return err
|
||||||
@ -22,7 +22,7 @@ func initializeIndexes(ctx context.Context, col *mongo.Collection) error {
|
|||||||
// when editing, also edit everything in this file!
|
// when editing, also edit everything in this file!
|
||||||
type (
|
type (
|
||||||
mLog struct {
|
mLog struct {
|
||||||
recordMetadata
|
RecordMetadata
|
||||||
|
|
||||||
Content any
|
Content any
|
||||||
ContainerTime time.Time
|
ContainerTime time.Time
|
||||||
@ -31,7 +31,7 @@ type (
|
|||||||
// added by toBson()
|
// added by toBson()
|
||||||
ShipTime time.Time
|
ShipTime time.Time
|
||||||
}
|
}
|
||||||
recordMetadata struct {
|
RecordMetadata struct {
|
||||||
HostInfo HostInfo
|
HostInfo HostInfo
|
||||||
File string
|
File string
|
||||||
Offset int64 // byte offset where log entry ends at
|
Offset int64 // byte offset where log entry ends at
|
||||||
@ -39,28 +39,80 @@ type (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
mongoKeyHostInfo = "host_info"
|
// used outside fromBson and toBson
|
||||||
mongoKeyId = "id"
|
mLogKeyHostId = mLogKeyHostInfo + "." + mLogKeyId
|
||||||
mongoKeyHostId = mongoKeyHostInfo + "." + mongoKeyId
|
)
|
||||||
mongoKeyFileBasename = "file"
|
|
||||||
mongoKeyOffset = "offset"
|
const (
|
||||||
|
mLogKeyHostInfo = "host_info"
|
||||||
|
mLogKeyId = "id"
|
||||||
|
mLogKeyName = "name"
|
||||||
|
mLogKeyArch = "arch"
|
||||||
|
|
||||||
|
mLogKeyFileBasename = "file"
|
||||||
|
mLogKeyOffset = "offset"
|
||||||
|
mLogKeyContent = "content"
|
||||||
|
mLogKeyContainerTime = "container_time"
|
||||||
|
mLogKeyStderr = "stderr"
|
||||||
|
mLogKeyShipTime = "ship_time"
|
||||||
)
|
)
|
||||||
|
|
||||||
// not using marshal, since it is <0.1x performance
|
// not using marshal, since it is <0.1x performance
|
||||||
func (l *mLog) toBson() bson.M {
|
func (l *mLog) toBson() bson.M {
|
||||||
// DO NOT USE QUOTED STRINGS! Move them to const and use variable instead
|
// DO NOT USE QUOTED STRINGS! Move them to const and use variable instead
|
||||||
return bson.M{
|
return bson.M{
|
||||||
mongoKeyHostInfo: bson.M{
|
mLogKeyHostInfo: bson.M{
|
||||||
mongoKeyId: l.HostInfo.id,
|
mLogKeyId: l.HostInfo.Id,
|
||||||
"name": l.HostInfo.name,
|
mLogKeyName: l.HostInfo.Name,
|
||||||
"arch": l.HostInfo.arch,
|
mLogKeyArch: l.HostInfo.Arch,
|
||||||
},
|
},
|
||||||
mongoKeyFileBasename: l.File,
|
mLogKeyFileBasename: l.File,
|
||||||
mongoKeyOffset: l.Offset,
|
mLogKeyOffset: l.Offset,
|
||||||
"content": l.Content,
|
mLogKeyContent: l.Content,
|
||||||
"container_time": l.ContainerTime,
|
mLogKeyContainerTime: l.ContainerTime,
|
||||||
"stderr": l.StdErr,
|
mLogKeyStderr: l.StdErr,
|
||||||
|
|
||||||
"ship_time": time.Now(),
|
mLogKeyShipTime: time.Now(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// really, mongo should support tagged structs
|
||||||
|
func mLogfromBson(b *bson.Raw) mLog {
|
||||||
|
return mLog{
|
||||||
|
RecordMetadata: RecordMetadata{
|
||||||
|
HostInfo: HostInfo{
|
||||||
|
Id: bsonLookupStringValue(b, mLogKeyHostInfo, mLogKeyId),
|
||||||
|
Name: bsonLookupStringValue(b, mLogKeyHostInfo, mLogKeyName),
|
||||||
|
Arch: bsonLookupStringValue(b, mLogKeyHostInfo, mLogKeyArch),
|
||||||
|
},
|
||||||
|
File: bsonLookupStringValue(b, mLogKeyFileBasename),
|
||||||
|
Offset: bsonLookupInt64(b, mLogKeyOffset),
|
||||||
|
},
|
||||||
|
Content: bsonLookupStringValue(b, mLogKeyContent),
|
||||||
|
ContainerTime: bsonLookupTime(b, mLogKeyContainerTime),
|
||||||
|
StdErr: bsonLookupBoolean(b, mLogKeyStderr),
|
||||||
|
ShipTime: bsonLookupTime(b, mLogKeyShipTime),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// default values without ok
|
||||||
|
|
||||||
|
func bsonLookupBoolean(b *bson.Raw, key ...string) bool {
|
||||||
|
v, _ := b.Lookup(key...).BooleanOK()
|
||||||
|
return v
|
||||||
|
}
|
||||||
|
|
||||||
|
func bsonLookupStringValue(b *bson.Raw, key ...string) string {
|
||||||
|
v, _ := b.Lookup(key...).StringValueOK()
|
||||||
|
return v
|
||||||
|
}
|
||||||
|
|
||||||
|
func bsonLookupInt64(b *bson.Raw, key ...string) int64 {
|
||||||
|
v, _ := b.Lookup(key...).Int64OK()
|
||||||
|
return v
|
||||||
|
}
|
||||||
|
|
||||||
|
func bsonLookupTime(b *bson.Raw, key ...string) time.Time {
|
||||||
|
v, _ := b.Lookup(key...).TimeOK()
|
||||||
|
return v
|
||||||
|
}
|
||||||
|
@ -101,19 +101,17 @@ func (s *submitter) shipFileRoutine(ctx context.Context, name string, sendQueue
|
|||||||
}
|
}
|
||||||
|
|
||||||
// get files with offset
|
// get files with offset
|
||||||
offsetResult, err := mongoWithErr(s.db.FindOne(mongoTimeoutCtx(ctx),
|
offsetResult, _ := mongoWithErr(s.db.FindOne(mongoTimeoutCtx(ctx),
|
||||||
bson.D{{Key: mongoKeyHostId, Value: s.hostInfo.id}, {Key: mongoKeyFileBasename, Value: baseName}},
|
bson.D{{Key: mLogKeyHostId, Value: s.hostInfo.Id}, {Key: mLogKeyFileBasename, Value: baseName}},
|
||||||
&mongoOpt.FindOneOptions{Sort: bson.D{{Key: mongoKeyOffset, Value: -1}}}, // sort descending (get largest)
|
&mongoOpt.FindOneOptions{Sort: bson.D{{Key: mLogKeyOffset, Value: -1}}}, // sort descending (get largest)
|
||||||
))
|
))
|
||||||
|
|
||||||
|
offsetResultBytes, err := offsetResult.DecodeBytes()
|
||||||
if err != nil && !errors.Is(err, mongo.ErrNoDocuments) {
|
if err != nil && !errors.Is(err, mongo.ErrNoDocuments) {
|
||||||
return fmt.Errorf("retrieving offset from database: %w", err)
|
return fmt.Errorf("retrieving offset from database: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var log mLog
|
log := mLogfromBson(&offsetResultBytes)
|
||||||
if err := offsetResult.Decode(&log); err != nil && !errors.Is(err, mongo.ErrNoDocuments) {
|
|
||||||
return fmt.Errorf("decoding offset from database: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
fi, err := os.Stat(name)
|
fi, err := os.Stat(name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -162,7 +160,7 @@ func (s *submitter) shipFileRoutine(ctx context.Context, name string, sendQueue
|
|||||||
}
|
}
|
||||||
|
|
||||||
sendQueue <- rawLine{
|
sendQueue <- rawLine{
|
||||||
recordMetadata: recordMetadata{
|
RecordMetadata: RecordMetadata{
|
||||||
HostInfo: s.hostInfo,
|
HostInfo: s.hostInfo,
|
||||||
File: baseName,
|
File: baseName,
|
||||||
|
|
||||||
|
@ -229,9 +229,9 @@ var App = &cli.App{
|
|||||||
}
|
}
|
||||||
|
|
||||||
type HostInfo struct {
|
type HostInfo struct {
|
||||||
id string
|
Id string
|
||||||
name string
|
Name string
|
||||||
arch string
|
Arch string
|
||||||
}
|
}
|
||||||
|
|
||||||
func getHostInfo(nodeName string) (h HostInfo, err error) {
|
func getHostInfo(nodeName string) (h HostInfo, err error) {
|
||||||
@ -241,16 +241,16 @@ func getHostInfo(nodeName string) (h HostInfo, err error) {
|
|||||||
err = fmt.Errorf("name: hostname: %w", err) // don't exit early
|
err = fmt.Errorf("name: hostname: %w", err) // don't exit early
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
h.name = strings.TrimSpace(nodeName)
|
h.Name = strings.TrimSpace(nodeName)
|
||||||
|
|
||||||
id, errL := os.ReadFile(MachineId)
|
id, errL := os.ReadFile(MachineId)
|
||||||
if errL != nil {
|
if errL != nil {
|
||||||
err = errAppend(err, fmt.Errorf("id: %w", errL))
|
err = errAppend(err, fmt.Errorf("id: %w", errL))
|
||||||
}
|
}
|
||||||
|
|
||||||
h.id = strings.TrimSpace(string(id))
|
h.Id = strings.TrimSpace(string(id))
|
||||||
|
|
||||||
h.arch = runtime.GOARCH
|
h.Arch = runtime.GOARCH
|
||||||
|
|
||||||
return h, err
|
return h, err
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user