Optimize IndexCoord scheduler loop (#19375)

Signed-off-by: cai.zhang <cai.zhang@zilliz.com>

Authored by cai.zhang on 2022-09-24 13:40:52 +08:00, committed by GitHub
parent 4e521438c3
commit 9e18645363
8 changed files with 175 additions and 208 deletions

View File

@ -33,7 +33,6 @@ import (
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/util"
"github.com/milvus-io/milvus/internal/util/logutil"
)
type flushedSegmentWatcher struct {
@ -72,7 +71,7 @@ func newFlushSegmentWatcher(ctx context.Context, kv kv.MetaKv, meta *metaTable,
kvClient: kv,
wg: sync.WaitGroup{},
internalTaskMutex: sync.RWMutex{},
scheduleDuration: time.Second,
scheduleDuration: time.Second * 3,
internalNotify: make(chan struct{}, 1),
meta: meta,
builder: builder,
@ -87,22 +86,23 @@ func newFlushSegmentWatcher(ctx context.Context, kv kv.MetaKv, meta *metaTable,
}
func (fsw *flushedSegmentWatcher) reloadFromKV() error {
log.Info("flushSegmentWatcher reloadFromKV")
log.Ctx(fsw.ctx).Info("flushSegmentWatcher reloadFromKV")
fsw.internalTasks = make(map[UniqueID]*internalTask)
_, values, version, err := fsw.kvClient.LoadWithRevision(util.FlushedSegmentPrefix)
if err != nil {
log.Error("flushSegmentWatcher reloadFromKV fail", zap.String("prefix", util.FlushedSegmentPrefix), zap.Error(err))
log.Ctx(fsw.ctx).Error("flushSegmentWatcher reloadFromKV fail", zap.String("prefix", util.FlushedSegmentPrefix), zap.Error(err))
return err
}
for _, value := range values {
segID, err := strconv.ParseInt(value, 10, 64)
if err != nil {
log.Error("flushSegmentWatcher parse segmentID fail", zap.String("value", value), zap.Error(err))
log.Ctx(fsw.ctx).Error("flushSegmentWatcher parse segmentID fail", zap.String("value", value), zap.Error(err))
return err
}
fsw.enqueueInternalTask(segID)
}
fsw.etcdRevision = version
log.Ctx(fsw.ctx).Info("flushSegmentWatcher reloadFromKV success", zap.Int64("etcdRevision", version))
return nil
}
@ -121,7 +121,7 @@ func (fsw *flushedSegmentWatcher) enqueueInternalTask(segmentID UniqueID) {
fsw.internalTaskMutex.Lock()
defer fsw.internalTaskMutex.Unlock()
logutil.Logger(fsw.ctx).Info("flushedSegmentWatcher enqueueInternalTask", zap.Int64("segmentID", segmentID))
log.Ctx(fsw.ctx).Info("flushedSegmentWatcher enqueueInternalTask", zap.Int64("segmentID", segmentID))
if _, ok := fsw.internalTasks[segmentID]; !ok {
fsw.internalTasks[segmentID] = &internalTask{
@ -129,11 +129,12 @@ func (fsw *flushedSegmentWatcher) enqueueInternalTask(segmentID UniqueID) {
segmentInfo: nil,
}
}
logutil.Logger(fsw.ctx).Info("flushedSegmentWatcher enqueueInternalTask success", zap.Int64("segmentID", segmentID))
log.Ctx(fsw.ctx).Info("flushedSegmentWatcher already have the task success", zap.Int64("segmentID", segmentID))
}
func (fsw *flushedSegmentWatcher) internalScheduler() {
log.Info("IndexCoord flushedSegmentWatcher internalScheduler start...")
log.Ctx(fsw.ctx).Info("IndexCoord flushedSegmentWatcher internalScheduler start...")
defer fsw.wg.Done()
ticker := time.NewTicker(fsw.scheduleDuration)
@ -142,7 +143,7 @@ func (fsw *flushedSegmentWatcher) internalScheduler() {
for {
select {
case <-fsw.ctx.Done():
log.Warn("IndexCoord flushedSegmentWatcher context done")
log.Ctx(fsw.ctx).Warn("IndexCoord flushedSegmentWatcher context done")
return
case <-ticker.C:
fsw.internalRun()
@ -156,7 +157,8 @@ func (fsw *flushedSegmentWatcher) internalRun() {
fsw.internalTaskMutex.RLock()
segmentIDs := make([]UniqueID, 0, len(fsw.internalTasks))
if len(fsw.internalTasks) > 0 {
log.Debug("IndexCoord flushedSegmentWatcher schedule internal tasks", zap.Int("internal task num", len(fsw.internalTasks)))
log.Ctx(fsw.ctx).Debug("IndexCoord flushedSegmentWatcher schedule internal tasks",
zap.Int("internal task num", len(fsw.internalTasks)))
for segID := range fsw.internalTasks {
segmentIDs = append(segmentIDs, segID)
}
@ -188,7 +190,8 @@ func (fsw *flushedSegmentWatcher) Len() int {
func (fsw *flushedSegmentWatcher) updateInternalTaskState(segID UniqueID, state indexTaskState) {
fsw.internalTaskMutex.Lock()
defer fsw.internalTaskMutex.Unlock()
log.Debug("flushedSegmentWatcher updateInternalTaskState", zap.Int64("segID", segID), zap.String("state", state.String()))
log.Ctx(fsw.ctx).Debug("flushedSegmentWatcher updateInternalTaskState", zap.Int64("segID", segID),
zap.String("state", state.String()))
if _, ok := fsw.internalTasks[segID]; ok {
fsw.internalTasks[segID].state = state
}
@ -199,7 +202,7 @@ func (fsw *flushedSegmentWatcher) deleteInternalTask(segID UniqueID) {
defer fsw.internalTaskMutex.Unlock()
delete(fsw.internalTasks, segID)
log.Debug("flushedSegmentWatcher delete the internal task", zap.Int64("segID", segID))
log.Ctx(fsw.ctx).Debug("flushedSegmentWatcher delete the internal task", zap.Int64("segID", segID))
}
func (fsw *flushedSegmentWatcher) getInternalTask(segID UniqueID) *internalTask {
@ -219,24 +222,24 @@ func (fsw *flushedSegmentWatcher) setInternalTaskSegmentInfo(segID UniqueID, seg
if _, ok := fsw.internalTasks[segID]; ok {
fsw.internalTasks[segID].segmentInfo = segInfo
}
log.Debug("flushedSegmentWatcher set internal task segment info success", zap.Int64("segID", segID))
log.Ctx(fsw.ctx).Debug("flushedSegmentWatcher set internal task segment info success", zap.Int64("segID", segID))
}
func (fsw *flushedSegmentWatcher) internalProcess(segID UniqueID) {
t := fsw.getInternalTask(segID)
log.Debug("IndexCoord flushedSegmentWatcher process internal task", zap.Int64("segID", segID),
log.Ctx(fsw.ctx).RatedDebug(10, "flushedSegmentWatcher process internal task", zap.Int64("segID", segID),
zap.String("state", t.state.String()))
switch t.state {
case indexTaskPrepare:
if err := fsw.prepare(segID); err != nil {
log.Error("flushedSegmentWatcher prepare internal task fail", zap.Int64("segID", segID), zap.Error(err))
log.Ctx(fsw.ctx).RatedWarn(10, "flushedSegmentWatcher prepare internal task fail", zap.Int64("segID", segID), zap.Error(err))
return
}
fsw.updateInternalTaskState(segID, indexTaskInit)
case indexTaskInit:
if err := fsw.constructTask(t); err != nil {
log.Error("flushedSegmentWatcher construct task fail", zap.Int64("segID", segID), zap.Error(err))
log.Ctx(fsw.ctx).RatedWarn(10, "flushedSegmentWatcher construct task fail", zap.Int64("segID", segID), zap.Error(err))
return
}
fsw.updateInternalTaskState(segID, indexTaskInProgress)
@ -248,24 +251,22 @@ func (fsw *flushedSegmentWatcher) internalProcess(segID UniqueID) {
}
case indexTaskDone:
if err := fsw.removeFlushedSegment(t); err != nil {
log.Error("IndexCoord flushSegmentWatcher removeFlushedSegment fail",
log.Ctx(fsw.ctx).RatedWarn(10, "IndexCoord flushSegmentWatcher removeFlushedSegment fail",
zap.Int64("segID", segID), zap.Error(err))
return
}
fsw.deleteInternalTask(segID)
fsw.internalNotifyFunc()
default:
log.Debug("IndexCoord flushedSegmentWatcher internal task get invalid state", zap.Int64("segID", segID),
log.Info("IndexCoord flushedSegmentWatcher internal task get invalid state", zap.Int64("segID", segID),
zap.String("state", t.state.String()))
}
}
func (fsw *flushedSegmentWatcher) constructTask(t *internalTask) error {
log.Debug("IndexCoord flushedSegmentWatcher construct tasks by segment info", zap.Int64("segID", t.segmentInfo.ID),
zap.Int64s("compactionFrom", t.segmentInfo.CompactionFrom))
fieldIndexes := fsw.meta.GetIndexesForCollection(t.segmentInfo.CollectionID, "")
if len(fieldIndexes) == 0 {
log.Debug("segment no need to build index", zap.Int64("segmentID", t.segmentInfo.ID),
log.Ctx(fsw.ctx).Debug("segment no need to build index", zap.Int64("segmentID", t.segmentInfo.ID),
zap.Int64("num of rows", t.segmentInfo.NumOfRows), zap.Int("collection indexes num", len(fieldIndexes)))
// no need to build index
return nil
@ -285,7 +286,7 @@ func (fsw *flushedSegmentWatcher) constructTask(t *internalTask) error {
// send to indexBuilder
have, buildID, err := fsw.ic.createIndexForSegment(segIdx)
if err != nil {
log.Warn("IndexCoord create index for segment fail", zap.Int64("segID", t.segmentInfo.ID),
log.Ctx(fsw.ctx).Warn("IndexCoord create index for segment fail", zap.Int64("segID", t.segmentInfo.ID),
zap.Int64("indexID", index.IndexID), zap.Error(err))
return err
}
@ -294,7 +295,7 @@ func (fsw *flushedSegmentWatcher) constructTask(t *internalTask) error {
}
}
fsw.handoff.enqueue(t.segmentInfo.ID)
log.Debug("flushedSegmentWatcher construct children task success", zap.Int64("segID", t.segmentInfo.ID),
log.Ctx(fsw.ctx).Debug("flushedSegmentWatcher construct task success", zap.Int64("segID", t.segmentInfo.ID),
zap.Int("tasks num", len(fieldIndexes)))
return nil
}
@ -303,11 +304,11 @@ func (fsw *flushedSegmentWatcher) removeFlushedSegment(t *internalTask) error {
deletedKeys := fmt.Sprintf("%s/%d/%d/%d", util.FlushedSegmentPrefix, t.segmentInfo.CollectionID, t.segmentInfo.PartitionID, t.segmentInfo.ID)
err := fsw.kvClient.RemoveWithPrefix(deletedKeys)
if err != nil {
log.Error("IndexCoord remove flushed segment fail", zap.Int64("collID", t.segmentInfo.CollectionID),
log.Ctx(fsw.ctx).Warn("IndexCoord remove flushed segment fail", zap.Int64("collID", t.segmentInfo.CollectionID),
zap.Int64("partID", t.segmentInfo.PartitionID), zap.Int64("segID", t.segmentInfo.ID), zap.Error(err))
return err
}
log.Info("IndexCoord remove flushed segment success", zap.Int64("collID", t.segmentInfo.CollectionID),
log.Ctx(fsw.ctx).Info("IndexCoord remove flushed segment success", zap.Int64("collID", t.segmentInfo.CollectionID),
zap.Int64("partID", t.segmentInfo.PartitionID), zap.Int64("segID", t.segmentInfo.ID))
return nil
}
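
The hunks above change flushedSegmentWatcher to log through the component context (log.Ctx(fsw.ctx), with some messages rate-limited) and slow the scheduling tick from 1s to 3s. For readers unfamiliar with the pattern, here is a minimal, self-contained Go sketch of the ticker-plus-notify scheduler loop this watcher runs; the names are illustrative, not the Milvus code itself:

```go
// Sketch of a scheduler loop: a ticker drives periodic passes, a buffered
// notify channel triggers an immediate pass, and the owning context stops it.
package main

import (
	"context"
	"fmt"
	"time"
)

type watcher struct {
	ctx              context.Context
	scheduleDuration time.Duration
	internalNotify   chan struct{}
}

func (w *watcher) internalScheduler(run func()) {
	ticker := time.NewTicker(w.scheduleDuration)
	defer ticker.Stop()
	for {
		select {
		case <-w.ctx.Done():
			return // shut down together with the owning component
		case <-ticker.C: // periodic pass, e.g. every 3s after this change
			run()
		case <-w.internalNotify: // immediate pass when a task is enqueued
			run()
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second)
	defer cancel()
	w := &watcher{ctx: ctx, scheduleDuration: 3 * time.Second, internalNotify: make(chan struct{}, 1)}
	go w.internalScheduler(func() { fmt.Println("run one scheduling pass") })
	w.internalNotify <- struct{}{} // never blocks: the channel is buffered with size 1
	<-ctx.Done()
}
```

The buffered channel of size 1 is what makes enqueue-side notification non-blocking while still coalescing bursts of notifications into one extra pass.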

View File

@ -76,14 +76,14 @@ func (gc *garbageCollector) Stop() {
func (gc *garbageCollector) recycleUnusedIndexes() {
defer gc.wg.Done()
log.Info("IndexCoord garbageCollector recycleUnusedIndexes start")
log.Ctx(gc.ctx).Info("IndexCoord garbageCollector recycleUnusedIndexes start")
ticker := time.NewTicker(gc.gcMetaDuration)
defer ticker.Stop()
for {
select {
case <-gc.ctx.Done():
log.Info("IndexCoord garbageCollector recycleUnusedMetaLoop context has done")
log.Ctx(gc.ctx).Info("IndexCoord garbageCollector recycleUnusedMetaLoop context has done")
return
case <-ticker.C:
deletedIndexes := gc.metaTable.GetDeletedIndexes()
@ -91,7 +91,7 @@ func (gc *garbageCollector) recycleUnusedIndexes() {
buildIDs := gc.metaTable.GetBuildIDsFromIndexID(index.IndexID)
if len(buildIDs) == 0 {
if err := gc.metaTable.RemoveIndex(index.CollectionID, index.IndexID); err != nil {
log.Warn("IndexCoord remove index on collection fail", zap.Int64("collID", index.CollectionID),
log.Ctx(gc.ctx).Warn("IndexCoord remove index on collection fail", zap.Int64("collID", index.CollectionID),
zap.Int64("indexID", index.IndexID), zap.Error(err))
continue
}
@ -99,7 +99,7 @@ func (gc *garbageCollector) recycleUnusedIndexes() {
for _, buildID := range buildIDs {
segIdx, ok := gc.metaTable.GetMeta(buildID)
if !ok {
log.Debug("IndexCoord get segment index is not exist", zap.Int64("buildID", buildID))
log.Ctx(gc.ctx).Debug("IndexCoord get segment index is not exist", zap.Int64("buildID", buildID))
continue
}
if segIdx.NodeID != 0 {
@ -107,13 +107,15 @@ func (gc *garbageCollector) recycleUnusedIndexes() {
continue
}
if err := gc.metaTable.RemoveSegmentIndex(segIdx.CollectionID, segIdx.PartitionID, segIdx.SegmentID, segIdx.BuildID); err != nil {
log.Warn("delete index meta from etcd failed, wait to retry", zap.Int64("buildID", segIdx.BuildID),
log.Ctx(gc.ctx).Warn("delete index meta from etcd failed, wait to retry", zap.Int64("buildID", segIdx.BuildID),
zap.Int64("nodeID", segIdx.NodeID), zap.Error(err))
continue
}
log.Info("IndexCoord remove segment index meta success", zap.Int64("buildID", segIdx.BuildID),
log.Ctx(gc.ctx).Info("IndexCoord remove segment index meta success", zap.Int64("buildID", segIdx.BuildID),
zap.Int64("nodeID", segIdx.NodeID))
}
log.Ctx(gc.ctx).Info("garbageCollector remove index success", zap.Int64("collID", index.CollectionID),
zap.Int64("indexID", index.IndexID))
}
}
}
@ -135,12 +137,12 @@ func (gc *garbageCollector) recycleSegIndexesMeta() {
PartitionID: -1,
})
if err != nil {
log.Warn("IndexCoord garbageCollector get flushed segments from DataCoord fail",
log.Ctx(gc.ctx).Warn("IndexCoord garbageCollector get flushed segments from DataCoord fail",
zap.Int64("collID", collID), zap.Error(err))
return
}
if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
log.Warn("IndexCoord garbageCollector get flushed segments from DataCoord fail", zap.Int64("collID", collID),
log.Ctx(gc.ctx).Warn("IndexCoord garbageCollector get flushed segments from DataCoord fail", zap.Int64("collID", collID),
zap.String("fail reason", resp.Status.Reason))
return
}
@ -153,7 +155,7 @@ func (gc *garbageCollector) recycleSegIndexesMeta() {
continue
}
if _, ok := flushedSegments[segID]; !ok {
log.Debug("segment is already not exist, mark it deleted", zap.Int64("collID", collID),
log.Ctx(gc.ctx).Debug("segment is already not exist, mark it deleted", zap.Int64("collID", collID),
zap.Int64("segID", segID))
if err := gc.metaTable.MarkSegmentsIndexAsDeleted(func(segIndex *model.SegmentIndex) bool {
return segIndex.SegmentID == segID
@ -171,18 +173,18 @@ func (gc *garbageCollector) recycleSegIndexesMeta() {
continue
}
if err := gc.metaTable.RemoveSegmentIndex(meta.CollectionID, meta.PartitionID, meta.SegmentID, meta.BuildID); err != nil {
log.Warn("delete index meta from etcd failed, wait to retry", zap.Int64("buildID", meta.BuildID),
log.Ctx(gc.ctx).Warn("delete index meta from etcd failed, wait to retry", zap.Int64("buildID", meta.BuildID),
zap.Int64("nodeID", meta.NodeID), zap.Error(err))
continue
}
log.Debug("index meta recycle success", zap.Int64("buildID", meta.BuildID))
log.Ctx(gc.ctx).Debug("index meta recycle success", zap.Int64("buildID", meta.BuildID))
}
}
}
func (gc *garbageCollector) recycleUnusedSegIndexes() {
defer gc.wg.Done()
log.Info("IndexCoord garbageCollector recycleUnusedSegIndexes start")
log.Ctx(gc.ctx).Info("IndexCoord garbageCollector recycleUnusedSegIndexes start")
ticker := time.NewTicker(gc.gcMetaDuration)
defer ticker.Stop()
@ -190,7 +192,7 @@ func (gc *garbageCollector) recycleUnusedSegIndexes() {
for {
select {
case <-gc.ctx.Done():
log.Info("IndexCoord garbageCollector recycleUnusedMetaLoop context has done")
log.Ctx(gc.ctx).Info("IndexCoord garbageCollector recycleUnusedMetaLoop context has done")
return
case <-ticker.C:
gc.recycleSegIndexesMeta()
@ -201,7 +203,7 @@ func (gc *garbageCollector) recycleUnusedSegIndexes() {
// recycleUnusedIndexFiles is used to delete those index files that no longer exist in the meta.
func (gc *garbageCollector) recycleUnusedIndexFiles() {
defer gc.wg.Done()
log.Info("IndexCoord garbageCollector start recycleUnusedIndexFiles loop")
log.Ctx(gc.ctx).Info("IndexCoord garbageCollector start recycleUnusedIndexFiles loop")
ticker := time.NewTicker(gc.gcFileDuration)
defer ticker.Stop()
@ -215,35 +217,35 @@ func (gc *garbageCollector) recycleUnusedIndexFiles() {
// list dir first
keys, _, err := gc.chunkManager.ListWithPrefix(prefix, false)
if err != nil {
log.Error("IndexCoord garbageCollector recycleUnusedIndexFiles list keys from chunk manager failed", zap.Error(err))
log.Ctx(gc.ctx).Error("IndexCoord garbageCollector recycleUnusedIndexFiles list keys from chunk manager failed", zap.Error(err))
continue
}
for _, key := range keys {
log.Debug("indexFiles keys", zap.String("key", key))
log.Ctx(gc.ctx).Debug("indexFiles keys", zap.String("key", key))
buildID, err := parseBuildIDFromFilePath(key)
if err != nil {
log.Error("IndexCoord garbageCollector recycleUnusedIndexFiles parseIndexFileKey", zap.String("key", key), zap.Error(err))
log.Ctx(gc.ctx).Error("IndexCoord garbageCollector recycleUnusedIndexFiles parseIndexFileKey", zap.String("key", key), zap.Error(err))
continue
}
log.Info("IndexCoord garbageCollector will recycle index files", zap.Int64("buildID", buildID))
log.Ctx(gc.ctx).Info("IndexCoord garbageCollector will recycle index files", zap.Int64("buildID", buildID))
if !gc.metaTable.HasBuildID(buildID) {
// buildID no longer exists in meta, remove all index files
log.Info("IndexCoord garbageCollector recycleUnusedIndexFiles find meta has not exist, remove index files",
log.Ctx(gc.ctx).Info("IndexCoord garbageCollector recycleUnusedIndexFiles find meta has not exist, remove index files",
zap.Int64("buildID", buildID))
err = gc.chunkManager.RemoveWithPrefix(key)
if err != nil {
log.Warn("IndexCoord garbageCollector recycleUnusedIndexFiles remove index files failed",
log.Ctx(gc.ctx).Warn("IndexCoord garbageCollector recycleUnusedIndexFiles remove index files failed",
zap.Int64("buildID", buildID), zap.String("prefix", key), zap.Error(err))
continue
}
continue
}
log.Info("index meta can be recycled, recycle index files", zap.Int64("buildID", buildID))
log.Ctx(gc.ctx).Info("index meta can be recycled, recycle index files", zap.Int64("buildID", buildID))
canRecycle, indexFilePaths := gc.metaTable.GetIndexFilePathByBuildID(buildID)
if !canRecycle {
// Even if the index is marked as deleted, the index file will not be recycled, wait for the next gc,
// and delete all index files about the buildID at one time.
log.Warn("IndexCoord garbageCollector can not recycle index files", zap.Int64("buildID", buildID))
log.Ctx(gc.ctx).Warn("IndexCoord garbageCollector can not recycle index files", zap.Int64("buildID", buildID))
continue
}
filesMap := make(map[string]bool)
@ -252,24 +254,24 @@ func (gc *garbageCollector) recycleUnusedIndexFiles() {
}
files, _, err := gc.chunkManager.ListWithPrefix(key, true)
if err != nil {
log.Warn("IndexCoord garbageCollector recycleUnusedIndexFiles list files failed",
log.Ctx(gc.ctx).Warn("IndexCoord garbageCollector recycleUnusedIndexFiles list files failed",
zap.Int64("buildID", buildID), zap.String("prefix", key), zap.Error(err))
continue
}
log.Info("recycle index files", zap.Int64("buildID", buildID), zap.Int("meta files num", len(filesMap)),
log.Ctx(gc.ctx).Info("recycle index files", zap.Int64("buildID", buildID), zap.Int("meta files num", len(filesMap)),
zap.Int("chunkManager files num", len(files)))
deletedFilesNum := 0
for _, file := range files {
if _, ok := filesMap[file]; !ok {
if err = gc.chunkManager.Remove(file); err != nil {
log.Warn("IndexCoord garbageCollector recycleUnusedIndexFiles remove file failed",
log.Ctx(gc.ctx).Warn("IndexCoord garbageCollector recycleUnusedIndexFiles remove file failed",
zap.Int64("buildID", buildID), zap.String("file", file), zap.Error(err))
continue
}
deletedFilesNum++
}
}
log.Info("index files recycle success", zap.Int64("buildID", buildID),
log.Ctx(gc.ctx).Info("index files recycle success", zap.Int64("buildID", buildID),
zap.Int("delete index files num", deletedFilesNum))
}
}
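
Most of the garbage-collector changes swap package-level log.Info/Warn/Debug calls for log.Ctx(gc.ctx)..., so every message carries whatever fields the component's context holds. As a rough illustration of that idea only (this is a generic sketch built on zap, not Milvus's internal/log package):

```go
// Sketch of context-scoped logging: store a logger in the context once,
// retrieve it at every call site so request/component fields travel with it.
package main

import (
	"context"

	"go.uber.org/zap"
)

type ctxLogKey struct{}

// WithLogger returns a child context carrying the given logger.
func WithLogger(ctx context.Context, l *zap.Logger) context.Context {
	return context.WithValue(ctx, ctxLogKey{}, l)
}

// Ctx retrieves the logger from the context, falling back to the global one.
func Ctx(ctx context.Context) *zap.Logger {
	if l, ok := ctx.Value(ctxLogKey{}).(*zap.Logger); ok {
		return l
	}
	return zap.L()
}

func main() {
	base, _ := zap.NewDevelopment()
	ctx := WithLogger(context.Background(), base.With(zap.String("role", "indexcoord")))

	// Call sites pull the context-scoped logger instead of the global one,
	// mirroring the log.Ctx(gc.ctx) pattern in the diff above.
	Ctx(ctx).Info("garbage collection pass finished", zap.Int("recycled", 3))
}
```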

View File

@ -137,14 +137,14 @@ func (ib *indexBuilder) enqueue(buildID UniqueID) {
func (ib *indexBuilder) schedule() {
// receive notifyChan
// time ticker
log.Info("index builder schedule loop start")
log.Ctx(ib.ctx).Info("index builder schedule loop start")
defer ib.wg.Done()
ticker := time.NewTicker(ib.scheduleDuration)
defer ticker.Stop()
for {
select {
case <-ib.ctx.Done():
log.Warn("index builder ctx done")
log.Ctx(ib.ctx).Warn("index builder ctx done")
return
case _, ok := <-ib.notifyChan:
if ok {
@ -169,14 +169,18 @@ func (ib *indexBuilder) run() {
return buildIDs[i] < buildIDs[j]
})
if len(buildIDs) > 0 {
log.Info("index builder task schedule", zap.Int("task num", len(buildIDs)))
log.Ctx(ib.ctx).Info("index builder task schedule", zap.Int("task num", len(buildIDs)))
}
for _, buildID := range buildIDs {
ib.process(buildID)
ok := ib.process(buildID)
if !ok {
log.Ctx(ib.ctx).Debug("there is no IndexNode available or etcd is not serviceable, wait a minute...")
break
}
}
}
func (ib *indexBuilder) process(buildID UniqueID) {
func (ib *indexBuilder) process(buildID UniqueID) bool {
ib.taskMutex.RLock()
state := ib.tasks[buildID]
ib.taskMutex.RUnlock()
@ -193,23 +197,24 @@ func (ib *indexBuilder) process(buildID UniqueID) {
delete(ib.tasks, buildID)
}
log.Info("index task is processing", zap.Int64("buildID", buildID), zap.String("task state", state.String()))
log.Ctx(ib.ctx).RatedDebug(10, "index task is processing", zap.Int64("buildID", buildID), zap.String("task state", state.String()))
meta, exist := ib.meta.GetMeta(buildID)
if !exist {
log.Debug("index task has not exist in meta table, remove task", zap.Int64("buildID", buildID))
log.Ctx(ib.ctx).RatedDebug(10, "index task has not exist in meta table, remove task", zap.Int64("buildID", buildID))
deleteFunc(buildID)
return
return true
}
switch state {
case indexTaskInit:
if !ib.meta.NeedIndex(meta.CollectionID, meta.IndexID) {
log.Ctx(ib.ctx).Debug("task is no need to build index, remove it", zap.Int64("buildID", buildID))
deleteFunc(buildID)
return
return true
}
log.Debug("task state is init, build index ...", zap.Int64("buildID", buildID),
zap.Int64("segID", meta.SegmentID), zap.Int64("num rows", meta.NumRows))
if meta.NumRows < Params.IndexCoordCfg.MinSegmentNumRowsToEnableIndex {
log.Ctx(ib.ctx).Debug("segment num rows is too few, no need to build index", zap.Int64("buildID", buildID),
zap.Int64("segID", meta.SegmentID), zap.Int64("num rows", meta.NumRows))
if err := ib.meta.FinishTask(&indexpb.IndexTaskInfo{
BuildID: buildID,
State: commonpb.IndexState_Finished,
@ -217,42 +222,42 @@ func (ib *indexBuilder) process(buildID UniqueID) {
SerializedSize: 0,
FailReason: "",
}); err != nil {
log.Error("IndexCoord update index state fail", zap.Int64("buildID", buildID), zap.Error(err))
return
log.Ctx(ib.ctx).RatedWarn(10, "IndexCoord update index state fail", zap.Int64("buildID", buildID), zap.Error(err))
return false
}
updateStateFunc(buildID, indexTaskDone)
return
return true
}
// peek client
// if all IndexNodes are executing task, wait for one of them to finish the task.
nodeID, client := ib.ic.nodeManager.PeekClient(meta)
if client == nil {
log.RatedDebug(30, "index builder peek client error, there is no available")
return
log.Ctx(ib.ctx).RatedDebug(10, "index builder peek client error, there is no available")
return false
}
// update version and set nodeID
if err := ib.meta.UpdateVersion(buildID, nodeID); err != nil {
log.Error("index builder update index version failed", zap.Int64("build", buildID), zap.Error(err))
return
log.Ctx(ib.ctx).RatedWarn(10, "index builder update index version failed", zap.Int64("build", buildID), zap.Error(err))
return false
}
// acquire lock
if err := ib.ic.tryAcquireSegmentReferLock(ib.ctx, buildID, nodeID, []UniqueID{meta.SegmentID}); err != nil {
log.Error("index builder acquire segment reference lock failed", zap.Int64("buildID", buildID),
log.Ctx(ib.ctx).RatedWarn(10, "index builder acquire segment reference lock failed", zap.Int64("buildID", buildID),
zap.Int64("nodeID", nodeID), zap.Error(err))
updateStateFunc(buildID, indexTaskRetry)
return
return false
}
info, err := ib.ic.pullSegmentInfo(ib.ctx, meta.SegmentID)
if err != nil {
log.Error("IndexCoord get segment info from DataCoord fail", zap.Int64("segID", meta.SegmentID),
log.Ctx(ib.ctx).RatedWarn(10, "IndexCoord get segment info from DataCoord fail", zap.Int64("segID", meta.SegmentID),
zap.Int64("buildID", buildID), zap.Error(err))
if errors.Is(err, ErrSegmentNotFound) {
updateStateFunc(buildID, indexTaskDeleted)
return
return true
}
updateStateFunc(buildID, indexTaskRetry)
return
return false
}
binLogs := make([]string, 0)
fieldID := ib.meta.GetFieldIDByIndexID(meta.CollectionID, meta.IndexID)
@ -298,89 +303,91 @@ func (ib *indexBuilder) process(buildID UniqueID) {
TypeParams: typeParams,
NumRows: meta.NumRows,
}
log.Debug("assign task to indexNode", zap.Int64("buildID", buildID), zap.Int64("nodeID", nodeID))
log.Ctx(ib.ctx).RatedDebug(10, "assign task to indexNode", zap.Int64("buildID", buildID), zap.Int64("nodeID", nodeID))
if err := ib.ic.assignTask(client, req); err != nil {
// need to release lock then reassign, so set task state to retry
log.Error("index builder assign task to IndexNode failed", zap.Int64("buildID", buildID),
log.Ctx(ib.ctx).RatedWarn(10, "index builder assign task to IndexNode failed", zap.Int64("buildID", buildID),
zap.Int64("nodeID", nodeID), zap.Error(err))
updateStateFunc(buildID, indexTaskRetry)
return
return false
}
// update index meta state to InProgress
if err := ib.meta.BuildIndex(buildID); err != nil {
// need to release lock then reassign, so set task state to retry
log.Error("index builder update index meta to InProgress failed", zap.Int64("buildID", buildID),
log.Ctx(ib.ctx).RatedWarn(10, "index builder update index meta to InProgress failed", zap.Int64("buildID", buildID),
zap.Int64("nodeID", nodeID), zap.Error(err))
updateStateFunc(buildID, indexTaskRetry)
return
return false
}
log.Debug("index task assigned success", zap.Int64("buildID", buildID), zap.Int64("nodeID", nodeID))
log.Ctx(ib.ctx).RatedDebug(10, "index task assigned success", zap.Int64("buildID", buildID), zap.Int64("nodeID", nodeID))
updateStateFunc(buildID, indexTaskInProgress)
case indexTaskDone:
log.Debug("index task has done", zap.Int64("buildID", buildID))
log.Ctx(ib.ctx).RatedDebug(10, "index task has done", zap.Int64("buildID", buildID))
if !ib.meta.NeedIndex(meta.CollectionID, meta.IndexID) {
log.Ctx(ib.ctx).Debug("task is no need to build index, remove it", zap.Int64("buildID", buildID))
updateStateFunc(buildID, indexTaskDeleted)
return
return true
}
if !ib.dropIndexTask(buildID, meta.NodeID) {
return
return true
}
if err := ib.releaseLockAndResetNode(buildID, meta.NodeID); err != nil {
// release lock failed, no need to modify state, wait to retry
log.Error("index builder try to release reference lock failed", zap.Error(err))
return
log.Ctx(ib.ctx).RatedWarn(10, "index builder try to release reference lock failed", zap.Error(err))
return false
}
deleteFunc(buildID)
case indexTaskRetry:
log.Debug("index task state is retry, try to release reference lock", zap.Int64("buildID", buildID))
if !ib.meta.NeedIndex(meta.CollectionID, meta.IndexID) {
log.Ctx(ib.ctx).Debug("task is no need to build index, remove it", zap.Int64("buildID", buildID))
updateStateFunc(buildID, indexTaskDeleted)
return
return true
}
if err := ib.releaseLockAndResetTask(buildID, meta.NodeID); err != nil {
// release lock failed, no need to modify state, wait to retry
log.Error("index builder try to release reference lock failed", zap.Error(err))
return
log.Ctx(ib.ctx).RatedWarn(10, "index builder try to release reference lock failed", zap.Error(err))
return false
}
updateStateFunc(buildID, indexTaskInit)
case indexTaskDeleted:
log.Debug("index task state is deleted, try to release reference lock", zap.Int64("buildID", buildID))
log.Ctx(ib.ctx).Debug("index task state is deleted, try to release reference lock", zap.Int64("buildID", buildID))
// TODO: delete after QueryCoordV2
if err := ib.meta.MarkSegmentsIndexAsDeletedByBuildID([]int64{buildID}); err != nil {
return
return false
}
if meta.NodeID != 0 {
if !ib.dropIndexTask(buildID, meta.NodeID) {
log.Ctx(ib.ctx).Warn("index task state is deleted and drop index job for node fail", zap.Int64("build", buildID),
zap.Int64("nodeID", meta.NodeID))
return
return true
}
if err := ib.releaseLockAndResetNode(buildID, meta.NodeID); err != nil {
// release lock failed, no need to modify state, wait to retry
log.Error("index builder try to release reference lock failed", zap.Error(err))
return
log.Ctx(ib.ctx).RatedWarn(10, "index builder try to release reference lock failed", zap.Error(err))
return false
}
}
// reset nodeID success, remove task.
deleteFunc(buildID)
default:
log.Debug("index task is in progress", zap.Int64("buildID", buildID),
log.Ctx(ib.ctx).Debug("index task is in progress", zap.Int64("buildID", buildID),
zap.String("state", meta.IndexState.String()))
if !ib.meta.NeedIndex(meta.CollectionID, meta.IndexID) {
log.Ctx(ib.ctx).Debug("task is no need to build index, remove it", zap.Int64("buildID", buildID))
updateStateFunc(buildID, indexTaskDeleted)
return
return true
}
updateStateFunc(buildID, ib.getTaskState(buildID, meta.NodeID))
}
return true
}
func (ib *indexBuilder) getTaskState(buildID, nodeID UniqueID) indexTaskState {
log.Info("IndexCoord indexBuilder get index task state", zap.Int64("buildID", buildID), zap.Int64("nodeID", nodeID))
log.Ctx(ib.ctx).Info("IndexCoord indexBuilder get index task state", zap.Int64("buildID", buildID), zap.Int64("nodeID", nodeID))
client, exist := ib.ic.nodeManager.GetClientByID(nodeID)
if exist {
response, err := client.QueryJobs(ib.ctx, &indexpb.QueryJobsRequest{
@ -388,12 +395,12 @@ func (ib *indexBuilder) getTaskState(buildID, nodeID UniqueID) indexTaskState {
BuildIDs: []int64{buildID},
})
if err != nil {
log.Error("IndexCoord get jobs info from IndexNode fail", zap.Int64("nodeID", nodeID),
log.Ctx(ib.ctx).Error("IndexCoord get jobs info from IndexNode fail", zap.Int64("nodeID", nodeID),
zap.Error(err))
return indexTaskInProgress
}
if response.Status.ErrorCode != commonpb.ErrorCode_Success {
log.Error("IndexCoord get jobs info from IndexNode fail", zap.Int64("nodeID", nodeID),
log.Ctx(ib.ctx).Error("IndexCoord get jobs info from IndexNode fail", zap.Int64("nodeID", nodeID),
zap.Int64("buildID", buildID), zap.String("fail reason", response.Status.Reason))
return indexTaskInProgress
}
@ -402,22 +409,22 @@ func (ib *indexBuilder) getTaskState(buildID, nodeID UniqueID) indexTaskState {
for _, info := range response.IndexInfos {
if info.BuildID == buildID {
if info.State == commonpb.IndexState_Failed || info.State == commonpb.IndexState_Finished {
log.Info("this task has been finished", zap.Int64("buildID", info.BuildID),
log.Ctx(ib.ctx).Info("this task has been finished", zap.Int64("buildID", info.BuildID),
zap.String("index state", info.State.String()))
if err := ib.meta.FinishTask(info); err != nil {
log.Error("IndexCoord update index state fail", zap.Int64("buildID", info.BuildID),
log.Ctx(ib.ctx).Error("IndexCoord update index state fail", zap.Int64("buildID", info.BuildID),
zap.String("index state", info.State.String()), zap.Error(err))
return indexTaskInProgress
}
return indexTaskDone
} else if info.State == commonpb.IndexState_Retry || info.State == commonpb.IndexState_IndexStateNone {
log.Info("this task should be retry", zap.Int64("buildID", buildID))
log.Ctx(ib.ctx).Info("this task should be retry", zap.Int64("buildID", buildID), zap.String("fail reason", info.FailReason))
return indexTaskRetry
}
return indexTaskInProgress
}
}
log.Info("this task should be retry, indexNode does not have this task", zap.Int64("buildID", buildID),
log.Ctx(ib.ctx).Info("this task should be retry, indexNode does not have this task", zap.Int64("buildID", buildID),
zap.Int64("nodeID", nodeID))
return indexTaskRetry
}
@ -426,7 +433,7 @@ func (ib *indexBuilder) getTaskState(buildID, nodeID UniqueID) indexTaskState {
}
func (ib *indexBuilder) dropIndexTask(buildID, nodeID UniqueID) bool {
log.Info("IndexCoord notify IndexNode drop the index task", zap.Int64("buildID", buildID), zap.Int64("nodeID", nodeID))
log.Ctx(ib.ctx).Info("IndexCoord notify IndexNode drop the index task", zap.Int64("buildID", buildID), zap.Int64("nodeID", nodeID))
client, exist := ib.ic.nodeManager.GetClientByID(nodeID)
if exist {
status, err := client.DropJobs(ib.ctx, &indexpb.DropJobsRequest{
@ -434,12 +441,12 @@ func (ib *indexBuilder) dropIndexTask(buildID, nodeID UniqueID) bool {
BuildIDs: []UniqueID{buildID},
})
if err != nil {
log.Warn("IndexCoord notify IndexNode drop the index task fail", zap.Int64("buildID", buildID),
log.Ctx(ib.ctx).Warn("IndexCoord notify IndexNode drop the index task fail", zap.Int64("buildID", buildID),
zap.Int64("nodeID", nodeID), zap.Error(err))
return false
}
if status.ErrorCode != commonpb.ErrorCode_Success {
log.Warn("IndexCoord notify IndexNode drop the index task fail", zap.Int64("buildID", buildID),
log.Ctx(ib.ctx).Warn("IndexCoord notify IndexNode drop the index task fail", zap.Int64("buildID", buildID),
zap.Int64("nodeID", nodeID), zap.String("fail reason", status.Reason))
return false
}
@ -449,37 +456,37 @@ func (ib *indexBuilder) dropIndexTask(buildID, nodeID UniqueID) bool {
}
func (ib *indexBuilder) releaseLockAndResetNode(buildID UniqueID, nodeID UniqueID) error {
log.Info("release segment reference lock and reset nodeID", zap.Int64("buildID", buildID),
log.Ctx(ib.ctx).Info("release segment reference lock and reset nodeID", zap.Int64("buildID", buildID),
zap.Int64("nodeID", nodeID))
if err := ib.ic.tryReleaseSegmentReferLock(ib.ctx, buildID, nodeID); err != nil {
// release lock failed, no need to modify state, wait to retry
log.Error("index builder try to release reference lock failed", zap.Error(err))
log.Ctx(ib.ctx).Error("index builder try to release reference lock failed", zap.Error(err))
return err
}
if err := ib.meta.ResetNodeID(buildID); err != nil {
log.Error("index builder try to reset nodeID failed", zap.Error(err))
log.Ctx(ib.ctx).Error("index builder try to reset nodeID failed", zap.Error(err))
return err
}
log.Info("release segment reference lock and reset nodeID success", zap.Int64("buildID", buildID),
log.Ctx(ib.ctx).Info("release segment reference lock and reset nodeID success", zap.Int64("buildID", buildID),
zap.Int64("nodeID", nodeID))
return nil
}
func (ib *indexBuilder) releaseLockAndResetTask(buildID UniqueID, nodeID UniqueID) error {
log.Info("release segment reference lock and reset task", zap.Int64("buildID", buildID),
log.Ctx(ib.ctx).Info("release segment reference lock and reset task", zap.Int64("buildID", buildID),
zap.Int64("nodeID", nodeID))
if nodeID != 0 {
if err := ib.ic.tryReleaseSegmentReferLock(ib.ctx, buildID, nodeID); err != nil {
// release lock failed, no need to modify state, wait to retry
log.Error("index builder try to release reference lock failed", zap.Error(err))
log.Ctx(ib.ctx).Error("index builder try to release reference lock failed", zap.Error(err))
return err
}
}
if err := ib.meta.ResetMeta(buildID); err != nil {
log.Error("index builder try to reset task failed", zap.Error(err))
log.Ctx(ib.ctx).Error("index builder try to reset task failed", zap.Error(err))
return err
}
log.Info("release segment reference lock and reset task success", zap.Int64("buildID", buildID),
log.Ctx(ib.ctx).Info("release segment reference lock and reset task success", zap.Int64("buildID", buildID),
zap.Int64("nodeID", nodeID))
return nil
}
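
This file carries the core of the scheduler-loop optimization: process now returns a bool, and run stops iterating the sorted buildIDs as soon as scheduling cannot make progress (no IndexNode available, meta/etcd not serviceable), instead of attempting every remaining task and logging the same failure for each. A minimal sketch of that control flow, with placeholder names rather than the real indexBuilder types:

```go
// Sketch of the run/process early-break pattern introduced above.
package main

import (
	"fmt"
	"sort"
)

// tryAssign stands in for PeekClient + assignTask; it is a placeholder only.
func tryAssign(buildID int64, nodeFree bool) bool {
	if !nodeFree {
		return false // nothing can be scheduled right now, stop this pass
	}
	fmt.Println("assigned build", buildID)
	return true
}

func runOnePass(tasks map[int64]struct{}, nodeFree bool) {
	buildIDs := make([]int64, 0, len(tasks))
	for id := range tasks {
		buildIDs = append(buildIDs, id)
	}
	// process in ascending buildID order, as the original run() does
	sort.Slice(buildIDs, func(i, j int) bool { return buildIDs[i] < buildIDs[j] })

	for _, id := range buildIDs {
		if ok := tryAssign(id, nodeFree); !ok {
			// No IndexNode available: break and wait for the next tick
			// rather than failing identically for every queued task.
			break
		}
	}
}

func main() {
	tasks := map[int64]struct{}{3: {}, 1: {}, 2: {}}
	runOnePass(tasks, true)  // all three get assigned
	runOnePass(tasks, false) // breaks immediately, no per-task error spam
}
```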

View File

@ -104,7 +104,6 @@ func (nm *NodeManager) AddNode(nodeID UniqueID, address string) error {
// PeekClient peeks the client with the least load.
func (nm *NodeManager) PeekClient(meta *model.SegmentIndex) (UniqueID, types.IndexNode) {
log.Info("IndexCoord peek client")
allClients := nm.GetAllClients()
if len(allClients) == 0 {
log.Error("there is no IndexNode online")
@ -155,7 +154,7 @@ func (nm *NodeManager) PeekClient(meta *model.SegmentIndex) (UniqueID, types.Ind
return peekNodeID, allClients[peekNodeID]
}
log.Warn("IndexCoord peek client fail")
log.RatedDebug(30, "IndexCoord peek client fail")
return 0, nil
}
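
PeekClient's failure path is also downgraded from a Warn on every call to log.RatedDebug(30, ...), since with the early-break behaviour above the message would otherwise repeat on every pass while no node is free. The sketch below shows the general rate-limited-logging idea only; it is not the implementation of Milvus's RatedDebug, and the per-message interval-in-seconds semantics are an assumption here:

```go
// Sketch of rate-limited logging: emit a given message at most once per interval.
package main

import (
	"fmt"
	"sync"
	"time"
)

type ratedLogger struct {
	mu   sync.Mutex
	last map[string]time.Time
}

// RatedInfo logs msg at most once every `every` seconds per message key.
func (r *ratedLogger) RatedInfo(every float64, msg string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if t, ok := r.last[msg]; ok && time.Since(t) < time.Duration(every*float64(time.Second)) {
		return // suppressed: this message was logged too recently
	}
	r.last[msg] = time.Now()
	fmt.Println(time.Now().Format(time.RFC3339), msg)
}

func main() {
	r := &ratedLogger{last: map[string]time.Time{}}
	for i := 0; i < 5; i++ {
		r.RatedInfo(30, "IndexCoord peek client fail") // prints only once in this loop
		time.Sleep(10 * time.Millisecond)
	}
}
```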

View File

@ -5,8 +5,6 @@ import (
"fmt"
"strconv"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/golang/protobuf/proto"
"go.uber.org/zap"
@ -16,7 +14,6 @@ import (
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/util/logutil"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/milvus-io/milvus/internal/util/timerecord"
"github.com/milvus-io/milvus/internal/util/trace"
@ -25,13 +22,13 @@ import (
func (i *IndexNode) CreateJob(ctx context.Context, req *indexpb.CreateJobRequest) (*commonpb.Status, error) {
stateCode := i.stateCode.Load().(internalpb.StateCode)
if stateCode != internalpb.StateCode_Healthy {
log.Warn("index node not ready", zap.Int32("state", int32(stateCode)), zap.String("ClusterID", req.ClusterID), zap.Int64("IndexBuildID", req.BuildID))
log.Ctx(ctx).Warn("index node not ready", zap.Int32("state", int32(stateCode)), zap.String("ClusterID", req.ClusterID), zap.Int64("IndexBuildID", req.BuildID))
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "state code is not healthy",
}, nil
}
log.Info("IndexNode building index ...",
log.Ctx(ctx).Info("IndexNode building index ...",
zap.String("ClusterID", req.ClusterID),
zap.Int64("IndexBuildID", req.BuildID),
zap.Int64("IndexID", req.IndexID),
@ -46,13 +43,12 @@ func (i *IndexNode) CreateJob(ctx context.Context, req *indexpb.CreateJobRequest
sp.SetTag("IndexBuildID", strconv.FormatInt(req.BuildID, 10))
sp.SetTag("ClusterID", req.ClusterID)
metrics.IndexNodeBuildIndexTaskCounter.WithLabelValues(strconv.FormatInt(Params.IndexNodeCfg.GetNodeID(), 10), metrics.TotalLabel).Inc()
taskCtx := logutil.WithModule(i.loopCtx, typeutil.IndexNodeRole)
taskCtx, taskCancel := context.WithCancel(taskCtx)
taskCtx, taskCancel := context.WithCancel(i.loopCtx)
if oldInfo := i.loadOrStoreTask(req.ClusterID, req.BuildID, &taskInfo{
cancel: taskCancel,
state: commonpb.IndexState_InProgress}); oldInfo != nil {
log.Warn("duplicated index build task", zap.String("ClusterID", req.ClusterID), zap.Int64("BuildID", req.BuildID))
log.Ctx(ctx).Warn("duplicated index build task", zap.String("ClusterID", req.ClusterID), zap.Int64("BuildID", req.BuildID))
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_BuildIndexError,
Reason: "duplicated index build task",
@ -60,7 +56,7 @@ func (i *IndexNode) CreateJob(ctx context.Context, req *indexpb.CreateJobRequest
}
cm, err := i.storageFactory.NewChunkManager(i.loopCtx, req.StorageConfig)
if err != nil {
log.Error("create chunk manager failed", zap.String("Bucket", req.StorageConfig.BucketName),
log.Ctx(ctx).Error("create chunk manager failed", zap.String("Bucket", req.StorageConfig.BucketName),
zap.String("AccessKey", req.StorageConfig.AccessKeyID),
zap.String("ClusterID", req.ClusterID), zap.Int64("IndexBuildID", req.BuildID))
return &commonpb.Status{
@ -86,20 +82,20 @@ func (i *IndexNode) CreateJob(ctx context.Context, req *indexpb.CreateJobRequest
Reason: "",
}
if err := i.sched.IndexBuildQueue.Enqueue(task); err != nil {
log.Warn("IndexNode failed to schedule", zap.Int64("IndexBuildID", req.BuildID), zap.String("ClusterID", req.ClusterID), zap.Error(err))
log.Ctx(ctx).Warn("IndexNode failed to schedule", zap.Int64("IndexBuildID", req.BuildID), zap.String("ClusterID", req.ClusterID), zap.Error(err))
ret.ErrorCode = commonpb.ErrorCode_UnexpectedError
ret.Reason = err.Error()
metrics.IndexNodeBuildIndexTaskCounter.WithLabelValues(strconv.FormatInt(Params.IndexNodeCfg.GetNodeID(), 10), metrics.FailLabel).Inc()
return ret, nil
}
log.Info("IndexNode successfully scheduled", zap.Int64("IndexBuildID", req.BuildID), zap.String("ClusterID", req.ClusterID), zap.String("indexName", req.IndexName))
log.Ctx(ctx).Info("IndexNode successfully scheduled", zap.Int64("IndexBuildID", req.BuildID), zap.String("ClusterID", req.ClusterID), zap.String("indexName", req.IndexName))
return ret, nil
}
func (i *IndexNode) QueryJobs(ctx context.Context, req *indexpb.QueryJobsRequest) (*indexpb.QueryJobsResponse, error) {
stateCode := i.stateCode.Load().(internalpb.StateCode)
if stateCode != internalpb.StateCode_Healthy {
log.Warn("index node not ready", zap.Int32("state", int32(stateCode)), zap.String("ClusterID", req.ClusterID))
log.Ctx(ctx).Warn("index node not ready", zap.Int32("state", int32(stateCode)), zap.String("ClusterID", req.ClusterID))
return &indexpb.QueryJobsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
@ -107,7 +103,6 @@ func (i *IndexNode) QueryJobs(ctx context.Context, req *indexpb.QueryJobsRequest
},
}, nil
}
log.Debug("querying index build task", zap.String("ClusterID", req.ClusterID), zap.Int64s("IndexBuildIDs", req.BuildIDs))
infos := make(map[UniqueID]*taskInfo)
i.foreachTaskInfo(func(ClusterID string, buildID UniqueID, info *taskInfo) {
if ClusterID == req.ClusterID {
@ -139,16 +134,19 @@ func (i *IndexNode) QueryJobs(ctx context.Context, req *indexpb.QueryJobsRequest
ret.IndexInfos[i].IndexFiles = info.indexFiles
ret.IndexInfos[i].SerializedSize = info.serializedSize
ret.IndexInfos[i].FailReason = info.failReason
log.Ctx(ctx).Debug("querying index build task", zap.String("ClusterID", req.ClusterID),
zap.Int64("IndexBuildID", buildID), zap.String("state", info.state.String()),
zap.String("fail reason", info.failReason))
}
}
return ret, nil
}
func (i *IndexNode) DropJobs(ctx context.Context, req *indexpb.DropJobsRequest) (*commonpb.Status, error) {
log.Debug("drop index build jobs", zap.String("ClusterID", req.ClusterID), zap.Int64s("IndexBuildIDs", req.BuildIDs))
log.Ctx(ctx).Debug("drop index build jobs", zap.String("ClusterID", req.ClusterID), zap.Int64s("IndexBuildIDs", req.BuildIDs))
stateCode := i.stateCode.Load().(internalpb.StateCode)
if stateCode != internalpb.StateCode_Healthy {
log.Warn("index node not ready", zap.Int32("state", int32(stateCode)), zap.String("ClusterID", req.ClusterID))
log.Ctx(ctx).Warn("index node not ready", zap.Int32("state", int32(stateCode)), zap.String("ClusterID", req.ClusterID))
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "state code is not healthy",
@ -164,6 +162,8 @@ func (i *IndexNode) DropJobs(ctx context.Context, req *indexpb.DropJobsRequest)
info.cancel()
}
}
log.Ctx(ctx).Debug("drop index build jobs success", zap.String("ClusterID", req.ClusterID),
zap.Int64s("IndexBuildIDs", req.BuildIDs))
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
@ -173,7 +173,7 @@ func (i *IndexNode) DropJobs(ctx context.Context, req *indexpb.DropJobsRequest)
func (i *IndexNode) GetJobStats(ctx context.Context, req *indexpb.GetJobStatsRequest) (*indexpb.GetJobStatsResponse, error) {
stateCode := i.stateCode.Load().(internalpb.StateCode)
if stateCode != internalpb.StateCode_Healthy {
log.Warn("index node not ready", zap.Int32("state", int32(stateCode)))
log.Ctx(ctx).Warn("index node not ready", zap.Int32("state", int32(stateCode)))
return &indexpb.GetJobStatsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
@ -192,7 +192,7 @@ func (i *IndexNode) GetJobStats(ctx context.Context, req *indexpb.GetJobStatsReq
if i.sched.buildParallel > unissued+active {
slots = i.sched.buildParallel - unissued - active
}
logutil.Logger(ctx).Info("Get Index Job Stats", zap.Int("Unissued", unissued), zap.Int("Active", active), zap.Int("Slot", slots))
log.Ctx(ctx).Info("Get Index Job Stats", zap.Int("Unissued", unissued), zap.Int("Active", active), zap.Int("Slot", slots))
return &indexpb.GetJobStatsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
@ -210,7 +210,7 @@ func (i *IndexNode) GetJobStats(ctx context.Context, req *indexpb.GetJobStatsReq
// TODO(dragondriver): cache the Metrics and set a retention to the cache
func (i *IndexNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
if !i.isHealthy() {
log.Warn("IndexNode.GetMetrics failed",
log.Ctx(ctx).Warn("IndexNode.GetMetrics failed",
zap.Int64("node_id", Params.IndexNodeCfg.GetNodeID()),
zap.String("req", req.Request),
zap.Error(errIndexNodeIsUnhealthy(Params.IndexNodeCfg.GetNodeID())))
@ -226,7 +226,7 @@ func (i *IndexNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequ
metricType, err := metricsinfo.ParseMetricType(req.Request)
if err != nil {
log.Warn("IndexNode.GetMetrics failed to parse metric type",
log.Ctx(ctx).Warn("IndexNode.GetMetrics failed to parse metric type",
zap.Int64("node_id", Params.IndexNodeCfg.GetNodeID()),
zap.String("req", req.Request),
zap.Error(err))
@ -243,7 +243,7 @@ func (i *IndexNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequ
if metricType == metricsinfo.SystemInfoMetrics {
metrics, err := getSystemInfoMetrics(ctx, req, i)
log.Debug("IndexNode.GetMetrics",
log.Ctx(ctx).Debug("IndexNode.GetMetrics",
zap.Int64("node_id", Params.IndexNodeCfg.GetNodeID()),
zap.String("req", req.Request),
zap.String("metric_type", metricType),
@ -252,7 +252,7 @@ func (i *IndexNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequ
return metrics, nil
}
log.Warn("IndexNode.GetMetrics failed, request metric type is not implemented yet",
log.Ctx(ctx).Warn("IndexNode.GetMetrics failed, request metric type is not implemented yet",
zap.Int64("node_id", Params.IndexNodeCfg.GetNodeID()),
zap.String("req", req.Request),
zap.String("metric_type", metricType))

View File

@ -29,6 +29,7 @@ import (
"github.com/milvus-io/milvus/api/commonpb"
"github.com/milvus-io/milvus/api/schemapb"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/storage"
@ -136,7 +137,7 @@ func (it *indexBuildTask) GetState() commonpb.IndexState {
func (it *indexBuildTask) OnEnqueue(ctx context.Context) error {
it.statistic.StartTime = time.Now().UnixMicro()
it.statistic.PodID = it.node.GetNodeID()
logutil.Logger(ctx).Debug("IndexNode IndexBuilderTask Enqueue")
log.Ctx(ctx).Debug("IndexNode IndexBuilderTask Enqueue")
return nil
}
@ -193,7 +194,7 @@ func (it *indexBuildTask) Prepare(ctx context.Context) error {
var err error
it.statistic.Dim, err = strconv.ParseInt(dimStr, 10, 64)
if err != nil {
logutil.Logger(ctx).Error("parse dimesion failed", zap.Error(err))
log.Ctx(ctx).Error("parse dimesion failed", zap.Error(err))
// ignore error
}
}
@ -207,7 +208,7 @@ func (it *indexBuildTask) Prepare(ctx context.Context) error {
// var err error
// it.cm, err = factory.NewVectorStorageChunkManager(ctx)
// if err != nil {
// logutil.Logger(ctx).Error("init chunk manager failed", zap.Error(err), zap.String("BucketName", it.req.BucketName), zap.String("StorageAccessKey", it.req.StorageAccessKey))
// log.Ctx(ctx).Error("init chunk manager failed", zap.Error(err), zap.String("BucketName", it.req.BucketName), zap.String("StorageAccessKey", it.req.StorageAccessKey))
// return err
// }
return nil
@ -253,11 +254,11 @@ func (it *indexBuildTask) LoadData(ctx context.Context) error {
// gomaxproc will be set by `automaxproc`, passing 0 will just retrieve the value
err := funcutil.ProcessFuncParallel(len(toLoadDataPaths), runtime.GOMAXPROCS(0), loadKey, "loadKey")
if err != nil {
logutil.Logger(it.ctx).Warn("loadKey failed", zap.Error(err))
log.Ctx(ctx).Warn("loadKey failed", zap.Error(err))
return err
}
loadVectorDuration := it.tr.RecordSpan().Milliseconds()
logutil.Logger(ctx).Debug("indexnode load data success")
log.Ctx(ctx).Debug("indexnode load data success")
it.tr.Record("load field data done")
metrics.IndexNodeLoadFieldLatency.WithLabelValues(strconv.FormatInt(Params.IndexNodeCfg.GetNodeID(), 10)).Observe(float64(loadVectorDuration))
@ -283,13 +284,13 @@ func (it *indexBuildTask) BuildIndex(ctx context.Context) error {
if dType != schemapb.DataType_None {
it.index, err = indexcgowrapper.NewCgoIndex(dType, it.newTypeParams, it.newIndexParams)
if err != nil {
logutil.Logger(ctx).Error("failed to create index", zap.Error(err))
log.Ctx(ctx).Error("failed to create index", zap.Error(err))
return err
}
err = it.index.Build(dataset)
if err != nil {
logutil.Logger(ctx).Error("failed to build index", zap.Error(err))
log.Ctx(ctx).Error("failed to build index", zap.Error(err))
return err
}
}
@ -298,7 +299,7 @@ func (it *indexBuildTask) BuildIndex(ctx context.Context) error {
it.tr.Record("build index done")
indexBlobs, err := it.index.Serialize()
if err != nil {
logutil.Logger(ctx).Error("IndexNode index Serialize failed", zap.Error(err))
log.Ctx(ctx).Error("IndexNode index Serialize failed", zap.Error(err))
return err
}
it.tr.Record("index serialize done")
@ -311,7 +312,7 @@ func (it *indexBuildTask) BuildIndex(ctx context.Context) error {
// early release index for gc, and we can ensure that Delete is idempotent.
if err := it.index.Delete(); err != nil {
logutil.Logger(it.ctx).Error("IndexNode indexBuildTask Execute CIndexDelete failed", zap.Error(err))
log.Ctx(ctx).Error("IndexNode indexBuildTask Execute CIndexDelete failed", zap.Error(err))
}
var serializedIndexBlobs []*storage.Blob
@ -341,7 +342,7 @@ func (it *indexBuildTask) BuildIndex(ctx context.Context) error {
func (it *indexBuildTask) BuildDiskAnnIndex(ctx context.Context) error {
// check index node support disk index
if !Params.IndexNodeCfg.EnableDisk {
logutil.Logger(ctx).Error("IndexNode don't support build disk index",
log.Ctx(ctx).Error("IndexNode don't support build disk index",
zap.String("index type", it.newIndexParams["index_type"]),
zap.Bool("enable disk", Params.IndexNodeCfg.EnableDisk))
return errors.New("index node don't support build disk index")
@ -350,7 +351,7 @@ func (it *indexBuildTask) BuildDiskAnnIndex(ctx context.Context) error {
// check load size and size of field data
localUsedSize, err := indexcgowrapper.GetLocalUsedSize()
if err != nil {
logutil.Logger(ctx).Error("IndexNode get local used size failed")
log.Ctx(ctx).Error("IndexNode get local used size failed")
return errors.New("index node get local used size failed")
}
@ -358,7 +359,7 @@ func (it *indexBuildTask) BuildDiskAnnIndex(ctx context.Context) error {
maxUsedLocalSize := int64(float64(Params.IndexNodeCfg.DiskCapacityLimit) * Params.IndexNodeCfg.MaxDiskUsagePercentage)
if usedLocalSizeWhenBuild > maxUsedLocalSize {
logutil.Logger(ctx).Error("IndexNode don't has enough disk size to build disk ann index",
log.Ctx(ctx).Error("IndexNode don't has enough disk size to build disk ann index",
zap.Int64("usedLocalSizeWhenBuild", usedLocalSizeWhenBuild),
zap.Int64("maxUsedLocalSize", maxUsedLocalSize))
return errors.New("index node don't has enough disk size to build disk ann index")
@ -378,18 +379,18 @@ func (it *indexBuildTask) BuildDiskAnnIndex(ctx context.Context) error {
it.index, err = indexcgowrapper.NewCgoIndex(dType, it.newTypeParams, it.newIndexParams)
if err != nil {
logutil.Logger(ctx).Error("failed to create index", zap.Error(err))
log.Ctx(ctx).Error("failed to create index", zap.Error(err))
return err
}
err = it.index.Build(dataset)
if err != nil {
if it.index.CleanLocalData() != nil {
logutil.Logger(ctx).Error("failed to clean cached data on disk after build index failed",
log.Ctx(ctx).Error("failed to clean cached data on disk after build index failed",
zap.Int64("buildID", it.BuildID),
zap.Int64("index version", it.req.GetIndexVersion()))
}
logutil.Logger(ctx).Error("failed to build index", zap.Error(err))
log.Ctx(ctx).Error("failed to build index", zap.Error(err))
return err
}
}
@ -400,7 +401,7 @@ func (it *indexBuildTask) BuildDiskAnnIndex(ctx context.Context) error {
indexBlobs, err := it.index.SerializeDiskIndex()
if err != nil {
logutil.Logger(ctx).Error("IndexNode index Serialize failed", zap.Error(err))
log.Ctx(ctx).Error("IndexNode index Serialize failed", zap.Error(err))
return err
}
it.tr.Record("index serialize done")
@ -413,7 +414,7 @@ func (it *indexBuildTask) BuildDiskAnnIndex(ctx context.Context) error {
// early release index for gc, and we can ensure that Delete is idempotent.
if err := it.index.Delete(); err != nil {
logutil.Logger(it.ctx).Error("IndexNode indexBuildTask Execute CIndexDelete failed", zap.Error(err))
log.Ctx(it.ctx).Error("IndexNode indexBuildTask Execute CIndexDelete failed", zap.Error(err))
}
encodeIndexFileDur := it.tr.Record("index codec serialize done")
@ -443,7 +444,7 @@ func (it *indexBuildTask) SaveIndexFiles(ctx context.Context) error {
return it.cm.Write(savePath, blob.Value)
}
if err := retry.Do(ctx, saveFn, retry.Attempts(5)); err != nil {
logutil.Logger(ctx).Warn("index node save index file failed", zap.Error(err), zap.String("savePath", savePath))
log.Ctx(ctx).Warn("index node save index file failed", zap.Error(err), zap.String("savePath", savePath))
return err
}
savePaths[idx] = savePath
@ -452,17 +453,17 @@ func (it *indexBuildTask) SaveIndexFiles(ctx context.Context) error {
// If an error occurs, return the error that the task state will be set to retry.
if err := funcutil.ProcessFuncParallel(blobCnt, runtime.NumCPU(), saveIndexFile, "saveIndexFile"); err != nil {
logutil.Logger(it.ctx).Error("saveIndexFile fail")
log.Ctx(ctx).Error("saveIndexFile fail")
return err
}
it.savePaths = savePaths
it.statistic.EndTime = time.Now().UnixMicro()
it.node.storeIndexFilesAndStatistic(it.ClusterID, it.BuildID, savePaths, it.serializedSize, &it.statistic)
logutil.Logger(ctx).Debug("save index files done", zap.Strings("IndexFiles", savePaths))
log.Ctx(ctx).Debug("save index files done", zap.Strings("IndexFiles", savePaths))
saveIndexFileDur := it.tr.Record("index file save done")
metrics.IndexNodeSaveIndexFileLatency.WithLabelValues(strconv.FormatInt(Params.IndexNodeCfg.GetNodeID(), 10)).Observe(float64(saveIndexFileDur.Milliseconds()))
it.tr.Elapse("index building all done")
logutil.Logger(ctx).Info("Successfully save index files", zap.Int64("buildID", it.BuildID), zap.Int64("Collection", it.collectionID),
log.Ctx(ctx).Info("Successfully save index files", zap.Int64("buildID", it.BuildID), zap.Int64("Collection", it.collectionID),
zap.Int64("partition", it.partitionID), zap.Int64("SegmentId", it.segmentID))
return nil
}
@ -501,7 +502,7 @@ func (it *indexBuildTask) SaveDiskAnnIndexFiles(ctx context.Context) error {
return it.cm.Write(indexParamPath, indexParamBlob.Value)
}
if err := retry.Do(ctx, saveFn, retry.Attempts(5)); err != nil {
logutil.Logger(ctx).Warn("index node save index param file failed", zap.Error(err), zap.String("savePath", indexParamPath))
log.Ctx(ctx).Warn("index node save index param file failed", zap.Error(err), zap.String("savePath", indexParamPath))
return err
}
@ -510,11 +511,11 @@ func (it *indexBuildTask) SaveDiskAnnIndexFiles(ctx context.Context) error {
it.statistic.EndTime = time.Now().UnixMicro()
it.node.storeIndexFilesAndStatistic(it.ClusterID, it.BuildID, savePaths, it.serializedSize, &it.statistic)
logutil.Logger(ctx).Debug("save index files done", zap.Strings("IndexFiles", savePaths))
log.Ctx(ctx).Debug("save index files done", zap.Strings("IndexFiles", savePaths))
saveIndexFileDur := it.tr.Record("index file save done")
metrics.IndexNodeSaveIndexFileLatency.WithLabelValues(strconv.FormatInt(Params.IndexNodeCfg.GetNodeID(), 10)).Observe(float64(saveIndexFileDur.Milliseconds()))
it.tr.Elapse("index building all done")
logutil.Logger(ctx).Info("IndexNode CreateIndex successfully ", zap.Int64("collect", it.collectionID),
log.Ctx(ctx).Info("IndexNode CreateIndex successfully ", zap.Int64("collect", it.collectionID),
zap.Int64("partition", it.partitionID), zap.Int64("segment", it.segmentID))
return nil
}
@ -535,7 +536,7 @@ func (it *indexBuildTask) decodeBlobs(ctx context.Context, blobs []*storage.Blob
it.partitionID = partitionID
it.segmentID = segmentID
logutil.Logger(ctx).Debug("indexnode deserialize data success",
log.Ctx(ctx).Debug("indexnode deserialize data success",
zap.Int64("index id", it.req.IndexID),
zap.String("index name", it.req.IndexName),
zap.Int64("collectionID", it.collectionID),
@ -557,45 +558,3 @@ func (it *indexBuildTask) decodeBlobs(ctx context.Context, blobs []*storage.Blob
it.fieldData = data
return nil
}
// Execute actually performs the task of building an index.
//func (it *indexBuildTask) Execute(ctx context.Context) error {
// logutil.Logger(it.ctx).Debug("IndexNode indexBuildTask Execute ...")
// sp, _ := trace.StartSpanFromContextWithOperationName(ctx, "CreateIndex-Execute")
// defer sp.Finish()
// select {
// case <-ctx.Done():
// logutil.Logger(it.ctx).Warn("build task was canceled")
// return errCancel
// default:
// if err := it.prepareParams(ctx); err != nil {
// it.SetState(commonpb.IndexState_Failed)
// logutil.Logger(it.ctx).Error("IndexNode indexBuildTask Execute prepareParams failed", zap.Error(err))
// return err
// }
// defer it.releaseMemory()
// blobs, err := it.buildIndex(ctx)
// if err != nil {
// if errors.Is(err, ErrNoSuchKey) {
// it.SetState(commonpb.IndexState_Failed)
// logutil.Logger(it.ctx).Error("IndexNode indexBuildTask Execute buildIndex failed", zap.Error(err))
// return err
// }
// it.SetState(commonpb.IndexState_Unissued)
// logutil.Logger(it.ctx).Error("IndexNode indexBuildTask Execute buildIndex failed, need to retry", zap.Error(err))
// return err
// }
// if err = it.saveIndex(ctx, blobs); err != nil {
// logutil.Logger(it.ctx).Warn("save index file failed", zap.Error(err))
// it.SetState(commonpb.IndexState_Unissued)
// return err
// }
// it.SetState(commonpb.IndexState_Finished)
// saveIndexFileDur := it.tr.Record("index file save done")
// metrics.IndexNodeSaveIndexFileLatency.WithLabelValues(strconv.FormatInt(Params.IndexNodeCfg.GetNodeID(), 10)).Observe(float64(saveIndexFileDur.Milliseconds()))
// it.tr.Elapse("index building all done")
// logutil.Logger(it.ctx).Info("IndexNode CreateIndex successfully ", zap.Int64("collect", it.collectionID),
// zap.Int64("partition", it.partitionID), zap.Int64("segment", it.segmentID))
// return nil
// }
//}

View File

@ -27,7 +27,6 @@ import (
"github.com/milvus-io/milvus/api/commonpb"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/logutil"
)
// TaskQueue is a queue used to store tasks.
@ -217,7 +216,7 @@ func (sched *TaskScheduler) processTask(t task, q TaskQueue) {
for _, fn := range pipelines {
if err := wrap(fn); err != nil {
if err == errCancel {
logutil.Logger(t.Ctx()).Warn("index build task canceled", zap.String("task", t.Name()))
log.Ctx(t.Ctx()).Warn("index build task canceled", zap.String("task", t.Name()))
t.SetState(commonpb.IndexState_Failed, err.Error())
} else if errors.Is(err, ErrNoSuchKey) {
t.SetState(commonpb.IndexState_Failed, err.Error())

View File

@ -2,10 +2,10 @@ package indexnode
import (
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
"github.com/milvus-io/milvus/api/commonpb"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/indexpb"
)