mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-11-30 19:08:30 +08:00
Add compaction log (#24976)
Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>
This commit is contained in:
parent
d143682d7d
commit
f57fe6d70b
@ -312,7 +312,10 @@ func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error {
|
||||
if ok {
|
||||
if state == commonpb.CompactionState_Completed {
|
||||
log.Info("compaction completed", zap.Int64("planID", planID), zap.Int64("nodeID", task.dataNodeID))
|
||||
c.completeCompaction(stateResult.GetResult())
|
||||
err := c.completeCompaction(stateResult.GetResult())
|
||||
if err != nil {
|
||||
log.Warn("fail to complete compaction", zap.Int64("planID", planID), zap.Int64("nodeID", task.dataNodeID), zap.Error(err))
|
||||
}
|
||||
continue
|
||||
}
|
||||
// check wether the CompactionPlan is timeout
|
||||
|
@ -73,12 +73,13 @@ func (b *binlogIO) download(ctx context.Context, paths []string) ([]*Blob, error
|
||||
vs [][]byte
|
||||
)
|
||||
|
||||
log.Debug("down load", zap.Strings("path", paths))
|
||||
g, gCtx := errgroup.WithContext(ctx)
|
||||
g.Go(func() error {
|
||||
for err != nil {
|
||||
select {
|
||||
case <-gCtx.Done():
|
||||
log.Warn("ctx done when downloading kvs from blob storage")
|
||||
log.Warn("ctx done when downloading kvs from blob storage", zap.Strings("paths", paths))
|
||||
return errDownloadFromBlobStorage
|
||||
|
||||
default:
|
||||
|
@ -87,7 +87,7 @@ func (c *compactionExecutor) executeTask(task compactor) {
|
||||
c.toCompleteState(task)
|
||||
}()
|
||||
|
||||
log.Info("start to execute compaction", zap.Int64("planID", task.getPlanID()))
|
||||
log.Info("start to execute compaction", zap.Int64("planID", task.getPlanID()), zap.Int64("Collection", task.getCollection()), zap.String("channel", task.getChannelName()))
|
||||
|
||||
result, err := task.compact()
|
||||
if err != nil {
|
||||
|
@ -200,7 +200,7 @@ func (t *compactionTask) mergeDeltalogs(dBlobs map[UniqueID][]*Blob, timetravelT
|
||||
|
||||
dbuff.accumulateEntriesNum(dbuff.delData.RowCount)
|
||||
log.Info("mergeDeltalogs end",
|
||||
zap.Int("number of pks to compact in insert logs", len(pk2ts)),
|
||||
zap.Int("number of deleted pks to compact in insert logs", len(pk2ts)),
|
||||
zap.Float64("elapse in ms", nano2Milli(time.Since(mergeStart))))
|
||||
|
||||
return pk2ts, dbuff, nil
|
||||
@ -392,21 +392,21 @@ func (t *compactionTask) merge(
|
||||
downloadStart := time.Now()
|
||||
data, err := t.download(ctxTimeout, path)
|
||||
if err != nil {
|
||||
log.Warn("download insertlogs wrong", zap.Error(err))
|
||||
log.Warn("download insertlogs wrong", zap.Strings("path", path), zap.Error(err))
|
||||
return nil, nil, 0, err
|
||||
}
|
||||
downloadTimeCost += time.Since(downloadStart)
|
||||
|
||||
iter, err := storage.NewInsertBinlogIterator(data, pkID, pkType)
|
||||
if err != nil {
|
||||
log.Warn("new insert binlogs Itr wrong", zap.Error(err))
|
||||
log.Warn("new insert binlogs Itr wrong", zap.Strings("path", path), zap.Error(err))
|
||||
return nil, nil, 0, err
|
||||
}
|
||||
for iter.HasNext() {
|
||||
vInter, _ := iter.Next()
|
||||
v, ok := vInter.(*storage.Value)
|
||||
if !ok {
|
||||
log.Warn("transfer interface to Value wrong")
|
||||
log.Warn("transfer interface to Value wrong", zap.Strings("path", path))
|
||||
return nil, nil, 0, errors.New("unexpected error")
|
||||
}
|
||||
|
||||
@ -423,7 +423,7 @@ func (t *compactionTask) merge(
|
||||
|
||||
row, ok := v.Value.(map[UniqueID]interface{})
|
||||
if !ok {
|
||||
log.Warn("transfer interface to map wrong")
|
||||
log.Warn("transfer interface to map wrong", zap.Strings("path", path))
|
||||
return nil, nil, 0, errors.New("unexpected error")
|
||||
}
|
||||
|
||||
@ -644,7 +644,7 @@ func (t *compactionTask) compact() (*datapb.CompactionResult, error) {
|
||||
dmu sync.Mutex
|
||||
)
|
||||
|
||||
allPs := make([][]string, 0)
|
||||
allPath := make([][]string, 0)
|
||||
|
||||
downloadStart := time.Now()
|
||||
g, gCtx := errgroup.WithContext(ctxTimeout)
|
||||
@ -669,7 +669,7 @@ func (t *compactionTask) compact() (*datapb.CompactionResult, error) {
|
||||
for _, f := range s.GetFieldBinlogs() {
|
||||
ps = append(ps, f.GetBinlogs()[idx].GetLogPath())
|
||||
}
|
||||
allPs = append(allPs, ps)
|
||||
allPath = append(allPath, ps)
|
||||
}
|
||||
|
||||
segID := s.GetSegmentID()
|
||||
@ -679,7 +679,7 @@ func (t *compactionTask) compact() (*datapb.CompactionResult, error) {
|
||||
g.Go(func() error {
|
||||
bs, err := t.download(gCtx, []string{path})
|
||||
if err != nil {
|
||||
log.Warn("download deltalogs wrong")
|
||||
log.Warn("download deltalogs wrong", zap.String("path", path), zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
@ -709,7 +709,7 @@ func (t *compactionTask) compact() (*datapb.CompactionResult, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
inPaths, statsPaths, numRows, err := t.merge(ctxTimeout, allPs, targetSegID, partID, meta, deltaPk2Ts)
|
||||
inPaths, statsPaths, numRows, err := t.merge(ctxTimeout, allPath, targetSegID, partID, meta, deltaPk2Ts)
|
||||
if err != nil {
|
||||
log.Warn("compact wrong", zap.Int64("planID", t.plan.GetPlanID()), zap.Error(err))
|
||||
return nil, err
|
||||
|
Loading…
Reference in New Issue
Block a user