From bc46e4780dea468d9a45a1f0a970b4955d98e0a5 Mon Sep 17 00:00:00 2001 From: "zhenshan.cao" Date: Sun, 6 Jun 2021 09:41:35 +0800 Subject: [PATCH] Fixbug:index is not loaded in some cases (#5633) * Fixbug:index is not created in some cases Signed-off-by: zhenshan.cao * Delete unnecessary print Signed-off-by: zhenshan.cao * fix create index Signed-off-by: yefu.chen Co-authored-by: yefu.chen --- internal/indexnode/indexnode.go | 10 +- internal/indexnode/task.go | 47 +++++---- internal/indexnode/task_scheduler.go | 14 +-- internal/indexservice/indexservice.go | 63 ++++++------ internal/indexservice/meta_table.go | 116 +++++++++-------------- internal/indexservice/util.go | 50 +++++++++- internal/masterservice/master_service.go | 2 +- internal/masterservice/task.go | 6 ++ internal/querynode/index_loader.go | 1 + internal/querynode/load_service.go | 2 +- 10 files changed, 170 insertions(+), 141 deletions(-) diff --git a/internal/indexnode/indexnode.go b/internal/indexnode/indexnode.go index 930c559093..0216b56ff6 100644 --- a/internal/indexnode/indexnode.go +++ b/internal/indexnode/indexnode.go @@ -66,7 +66,7 @@ type IndexNode struct { } func NewIndexNode(ctx context.Context) (*IndexNode, error) { - log.Debug("new index node ...") + log.Debug("New IndexNode ...") rand.Seed(time.Now().UnixNano()) ctx1, cancel := context.WithCancel(ctx) b := &IndexNode{ @@ -192,7 +192,7 @@ func (i *IndexNode) SetIndexServiceClient(serviceClient types.IndexService) { func (i *IndexNode) CreateIndex(ctx context.Context, request *indexpb.CreateIndexRequest) (*commonpb.Status, error) { log.Debug("IndexNode building index ...", zap.Int64("IndexBuildID", request.IndexBuildID), - zap.String("Indexname", request.IndexName), + zap.String("IndexName", request.IndexName), zap.Int64("IndexID", request.IndexID), zap.Int64("Version", request.Version), zap.String("MetaPath", request.MetaPath), @@ -253,7 +253,7 @@ func (i *IndexNode) GetComponentStates(ctx context.Context) (*internalpb.Compone }, } - 
log.Debug("IndexNode compoents states", + log.Debug("IndexNode Component states", zap.Any("State", ret.State), zap.Any("Status", ret.Status), zap.Any("SubcomponentStates", ret.SubcomponentStates)) @@ -261,7 +261,7 @@ func (i *IndexNode) GetComponentStates(ctx context.Context) (*internalpb.Compone } func (i *IndexNode) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) { - log.Debug("get indexnode time tick channel ...") + log.Debug("get IndexNode time tick channel ...") return &milvuspb.StringResponse{ Status: &commonpb.Status{ @@ -271,7 +271,7 @@ func (i *IndexNode) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringRes } func (i *IndexNode) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) { - log.Debug("get indexnode statistics channel ...") + log.Debug("get IndexNode statistics channel ...") return &milvuspb.StringResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, diff --git a/internal/indexnode/task.go b/internal/indexnode/task.go index 506394872d..380b097eb5 100644 --- a/internal/indexnode/task.go +++ b/internal/indexnode/task.go @@ -113,7 +113,7 @@ func (bt *BaseTask) Name() string { func (it *IndexBuildTask) OnEnqueue() error { it.SetID(it.req.IndexBuildID) - log.Debug("indexnode", zap.Int64("[IndexBuilderTask] Enqueue TaskID", it.ID())) + log.Debug("IndexNode IndexBuilderTask Enqueue", zap.Int64("TaskID", it.ID())) return nil } @@ -122,17 +122,19 @@ func (it *IndexBuildTask) checkIndexMeta(pre bool) error { indexMeta := indexpb.IndexMeta{} _, values, versions, err := it.etcdKV.LoadWithPrefix2(it.req.MetaPath) if err != nil { - log.Debug("IndexService", zap.Any("load meta error with path", it.req.MetaPath)) - log.Debug("IndexService", zap.Any("Load meta error", err)) + log.Debug("IndexNode checkIndexMeta", zap.Any("load meta error with path", it.req.MetaPath), + zap.Error(err), zap.Any("pre", pre)) return err } + log.Debug("IndexNode checkIndexMeta load meta success", 
zap.Any("path", it.req.MetaPath), zap.Any("pre", pre)) err = proto.UnmarshalText(values[0], &indexMeta) if err != nil { - log.Debug("IndexService", zap.Any("Unmarshal error", err)) + log.Debug("IndexNode checkIndexMeta Unmarshal", zap.Error(err)) return err } + log.Debug("IndexNode checkIndexMeta Unmarshal success", zap.Any("IndexMeta", indexMeta)) if indexMeta.Version > it.req.Version || indexMeta.State == commonpb.IndexState_Finished { - log.Debug("IndexNode", zap.Any("Notify build index", "This version is not the latest version")) + log.Debug("IndexNode checkIndexMeta Notify build index this version is not the latest version", zap.Any("version", it.req.Version)) return nil } if indexMeta.MarkDeleted { @@ -152,24 +154,25 @@ func (it *IndexBuildTask) checkIndexMeta(pre bool) error { if it.err != nil { indexMeta.State = commonpb.IndexState_Failed } - log.Debug("IndexNode", zap.Any("MetaPath", it.req.MetaPath)) err = it.etcdKV.CompareVersionAndSwap(it.req.MetaPath, versions[0], proto.MarshalTextString(&indexMeta)) + log.Debug("IndexNode checkIndexMeta CompareVersionAndSwap", zap.Error(err)) return err } err := retry.Retry(3, time.Millisecond*200, fn) + log.Debug("IndexNode checkIndexMeta final", zap.Error(err)) return err } func (it *IndexBuildTask) PreExecute(ctx context.Context) error { - log.Debug("preExecute...") + log.Debug("IndexNode IndexBuildTask preExecute...") return it.checkIndexMeta(true) } func (it *IndexBuildTask) PostExecute(ctx context.Context) error { - log.Debug("PostExecute...") + log.Debug("IndexNode IndexBuildTask PostExecute...") defer func() { if it.err != nil { @@ -178,8 +181,8 @@ func (it *IndexBuildTask) PostExecute(ctx context.Context) error { }() if it.serviceClient == nil { - err := errors.New("IndexBuildTask, serviceClient is nil") - log.Debug("[IndexBuildTask][PostExecute] serviceClient is nil") + err := errors.New("IndexNode IndexBuildTask PostExecute, serviceClient is nil") + log.Error("", zap.Error(err)) return err } @@ -187,7 
+190,7 @@ func (it *IndexBuildTask) PostExecute(ctx context.Context) error { } func (it *IndexBuildTask) Execute(ctx context.Context) error { - log.Debug("start build index ...") + log.Debug("IndexNode IndexBuildTask Execute ...") var err error typeParams := make(map[string]string) @@ -232,13 +235,13 @@ func (it *IndexBuildTask) Execute(ctx context.Context) error { it.index, err = NewCIndex(typeParams, indexParams) if err != nil { - log.Error("indexnode", zap.String("NewCIndex err:", err.Error())) + log.Error("IndexNode IndexBuildTask Execute NewCIndex failed", zap.Error(err)) return err } defer func() { err = it.index.Delete() if err != nil { - log.Warn("CIndexDelete Failed") + log.Warn("IndexNode IndexBuildTask Execute CIndexDelete Failed", zap.Error(err)) } }() @@ -305,7 +308,7 @@ func (it *IndexBuildTask) Execute(ctx context.Context) error { if fOk { err = it.index.BuildFloatVecIndexWithoutIds(floatVectorFieldData.Data) if err != nil { - log.Error("indexnode", zap.String("BuildFloatVecIndexWithoutIds error", err.Error())) + log.Error("IndexNode BuildFloatVecIndexWithoutIds failed", zap.Error(err)) return err } } @@ -314,7 +317,7 @@ func (it *IndexBuildTask) Execute(ctx context.Context) error { if bOk { err = it.index.BuildBinaryVecIndexWithoutIds(binaryVectorFieldData.Data) if err != nil { - log.Error("indexnode", zap.String("BuildBinaryVecIndexWithoutIds err", err.Error())) + log.Error("IndexNode BuildBinaryVecIndexWithoutIds failed", zap.Error(err)) return err } } @@ -325,7 +328,7 @@ func (it *IndexBuildTask) Execute(ctx context.Context) error { indexBlobs, err := it.index.Serialize() if err != nil { - log.Error("indexnode", zap.String("serialize err", err.Error())) + log.Error("IndexNode index Serialize failed", zap.Error(err)) return err } @@ -354,23 +357,25 @@ func (it *IndexBuildTask) Execute(ctx context.Context) error { saveIndexFileFn := func() error { v, err := it.etcdKV.Load(it.req.MetaPath) if err != nil { - log.Debug("IndexService", zap.Any("load 
meta error with path", it.req.MetaPath)) - log.Debug("IndexService", zap.Any("Load meta error", err)) + log.Debug("IndexNode load meta failed", zap.Any("path", it.req.MetaPath), zap.Error(err)) return err } indexMeta := indexpb.IndexMeta{} err = proto.UnmarshalText(v, &indexMeta) if err != nil { - log.Debug("IndexService", zap.Any("Unmarshal error", err)) + log.Debug("IndexNode Unmarshal indexMeta error ", zap.Error(err)) return err } + log.Debug("IndexNode Unmarshal indexMeta success ", zap.Any("meta", indexMeta)) if indexMeta.Version > it.req.Version { - log.Debug("IndexNode", zap.Any("Notify build index", "This version is not the latest version")) + log.Debug("IndexNode try saveIndexFile failed req.Version is low", zap.Any("req.Version", it.req.Version), + zap.Any("indexMeta.Version", indexMeta.Version)) return errors.New("This task has been reassigned ") } return saveBlob(savePath, value) } err := retry.Retry(5, time.Millisecond*200, saveIndexFileFn) + log.Debug("IndexNode try saveIndexFile final", zap.Error(err), zap.Any("savePath", savePath)) if err != nil { return err } @@ -398,7 +403,7 @@ func (it *IndexBuildTask) Rollback() error { err := it.kv.MultiRemove(it.savePaths) if err != nil { - log.Warn("indexnode", zap.String("IndexBuildTask Rollback Failed", err.Error())) + log.Warn("IndexNode IndexBuildTask Rollback Failed", zap.Error(err)) return err } return nil diff --git a/internal/indexnode/task_scheduler.go b/internal/indexnode/task_scheduler.go index e99e4813b2..2730bf62b4 100644 --- a/internal/indexnode/task_scheduler.go +++ b/internal/indexnode/task_scheduler.go @@ -70,7 +70,7 @@ func (queue *BaseTaskQueue) addUnissuedTask(t task) error { defer queue.utLock.Unlock() if queue.utFull() { - return errors.New("task queue is full") + return errors.New("IndexNode task queue is full") } queue.unissuedTasks.PushBack(t) queue.utBufChan <- 1 @@ -82,7 +82,7 @@ func (queue *BaseTaskQueue) FrontUnissuedTask() task { defer queue.utLock.Unlock() if 
queue.unissuedTasks.Len() <= 0 { - log.Debug("FrontUnissuedTask sorry, but the unissued task list is empty!") + log.Debug("IndexNode FrontUnissuedTask sorry, but the unissued task list is empty!") return nil } @@ -94,7 +94,7 @@ func (queue *BaseTaskQueue) PopUnissuedTask() task { defer queue.utLock.Unlock() if queue.unissuedTasks.Len() <= 0 { - log.Debug("PopUnissued task sorry, but the unissued task list is empty!") + log.Debug("IndexNode PopUnissued task sorry, but the unissued task list is empty!") return nil } @@ -111,7 +111,7 @@ func (queue *BaseTaskQueue) AddActiveTask(t task) { tID := t.ID() _, ok := queue.activeTasks[tID] if ok { - log.Debug("indexnode", zap.Int64("task with ID %v already in active task list!", tID)) + log.Debug("IndexNode task already in activate task list", zap.Any("TaskID", tID)) } queue.activeTasks[tID] = t @@ -126,7 +126,7 @@ func (queue *BaseTaskQueue) PopActiveTask(tID UniqueID) task { delete(queue.activeTasks, tID) return t } - log.Debug("indexnode", zap.Int64("sorry, but the ID was not found in the active task list!", tID)) + log.Debug("IndexNode the task was not found in the active task list", zap.Any("TaskID", tID)) return nil } @@ -201,7 +201,7 @@ func NewTaskScheduler(ctx context.Context, func (sched *TaskScheduler) setParallelism(parallel int) { if parallel <= 0 { - log.Debug("can not set parallelism to less than zero!") + log.Debug("IndexNode can not set parallelism to less than zero!") return } sched.buildParallel = parallel @@ -258,7 +258,7 @@ func (sched *TaskScheduler) processTask(t task, q TaskQueue) { } func (sched *TaskScheduler) indexBuildLoop() { - log.Debug("index build loop ...") + log.Debug("IndexNode TaskScheduler start build loop ...") defer sched.wg.Done() for { select { diff --git a/internal/indexservice/indexservice.go b/internal/indexservice/indexservice.go index 990167dd1b..c471d97134 100644 --- a/internal/indexservice/indexservice.go +++ b/internal/indexservice/indexservice.go @@ -346,8 +346,7 @@ func (i 
*IndexService) GetIndexStates(ctx context.Context, req *indexpb.GetIndex }, States: indexStates, } - log.Debug("get index states success") - log.Debug("get index states", + log.Debug("IndexService get index states success", zap.Any("index status", ret.Status), zap.Any("index states", ret.States)) @@ -355,7 +354,7 @@ func (i *IndexService) GetIndexStates(ctx context.Context, req *indexpb.GetIndex } func (i *IndexService) DropIndex(ctx context.Context, req *indexpb.DropIndexRequest) (*commonpb.Status, error) { - log.Debug("IndexService", zap.Int64("Drop Index ID", req.IndexID)) + log.Debug("IndexService DropIndex", zap.Any("IndexID", req.IndexID)) ret := &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, @@ -376,12 +375,12 @@ func (i *IndexService) DropIndex(ctx context.Context, req *indexpb.DropIndexRequ }() }() - log.Debug("IndexService", zap.Int64("DropIndex success by ID", req.IndexID)) + log.Debug("IndexService DropIndex success", zap.Any("IndexID", req.IndexID)) return ret, nil } func (i *IndexService) GetIndexFilePaths(ctx context.Context, req *indexpb.GetIndexFilePathsRequest) (*indexpb.GetIndexFilePathsResponse, error) { - log.Debug("IndexService", zap.Int64s("get index file paths", req.IndexBuildIDs)) + log.Debug("IndexService GetIndexFilePaths", zap.Int64s("IndexBuildIds", req.IndexBuildIDs)) var indexPaths []*indexpb.IndexFilePathInfo = nil for _, indexID := range req.IndexBuildIDs { @@ -391,7 +390,7 @@ func (i *IndexService) GetIndexFilePaths(ctx context.Context, req *indexpb.GetIn } indexPaths = append(indexPaths, indexPathInfo) } - log.Debug("IndexService, get index file paths success") + log.Debug("IndexService GetIndexFilePaths success") ret := &indexpb.GetIndexFilePathsResponse{ Status: &commonpb.Status{ @@ -399,7 +398,7 @@ func (i *IndexService) GetIndexFilePaths(ctx context.Context, req *indexpb.GetIn }, FilePaths: indexPaths, } - log.Debug("IndexService", zap.Any("index file paths", ret.FilePaths)) + log.Debug("IndexService 
GetIndexFilePaths ", zap.Any("FilePaths", ret.FilePaths)) return ret, nil } @@ -414,12 +413,12 @@ func (i *IndexService) tsLoop() { select { case <-tsoTicker.C: if err := i.idAllocator.UpdateID(); err != nil { - log.Debug("IndexService", zap.String("failed to update id", err.Error())) + log.Debug("IndexService tsLoop UpdateID failed", zap.Error(err)) return } case <-ctx.Done(): // Server is closed and it should return nil. - log.Debug("tsLoop is closed") + log.Debug("IndexService tsLoop is closed") return } } @@ -432,7 +431,7 @@ func (i *IndexService) recycleUnusedIndexFiles() { defer i.loopWg.Done() timeTicker := time.NewTicker(durationInterval) - log.Debug("IndexService start recycle unused index files loop") + log.Debug("IndexService start recycleUnusedIndexFiles loop") for { select { @@ -444,18 +443,20 @@ func (i *IndexService) recycleUnusedIndexFiles() { if meta.indexMeta.MarkDeleted { unusedIndexFilePathPrefix := strconv.Itoa(int(meta.indexMeta.IndexBuildID)) if err := i.kv.RemoveWithPrefix(unusedIndexFilePathPrefix); err != nil { - log.Debug("IndexService", zap.String("Remove index files error", err.Error())) + log.Debug("IndexService recycleUnusedIndexFiles Remove index files failed", + zap.Any("MarkDeleted", true), zap.Error(err)) } i.metaTable.DeleteIndex(meta.indexMeta.IndexBuildID) } else { for j := 1; j < int(meta.indexMeta.Version); j++ { unusedIndexFilePathPrefix := strconv.Itoa(int(meta.indexMeta.IndexBuildID)) + "/" + strconv.Itoa(j) if err := i.kv.RemoveWithPrefix(unusedIndexFilePathPrefix); err != nil { - log.Debug("IndexService", zap.String("Remove index files error", err.Error())) + log.Debug("IndexService recycleUnusedIndexFiles Remove index files failed", + zap.Any("MarkDeleted", false), zap.Error(err)) } } if err := i.metaTable.UpdateRecycleState(meta.indexMeta.IndexBuildID); err != nil { - log.Debug("IndexService", zap.String("Remove index files error", err.Error())) + log.Debug("IndexService recycleUnusedIndexFiles UpdateRecycleState 
failed", zap.Error(err)) } } } @@ -469,7 +470,7 @@ func (i *IndexService) assignmentTasksLoop() { defer cancel() defer i.loopWg.Done() - log.Debug("IndexService start assign tasks loop") + log.Debug("IndexService start assignmentTasksLoop start") for { select { @@ -478,16 +479,16 @@ func (i *IndexService) assignmentTasksLoop() { case indexBuildIDs := <-i.assignChan: for _, indexBuildID := range indexBuildIDs { meta := i.metaTable.GetIndexMeta(indexBuildID) - log.Debug("IndexService", zap.Any("Meta", meta)) + log.Debug("IndexService assignmentTasksLoop ", zap.Any("Meta", meta)) if meta.indexMeta.State == commonpb.IndexState_Finished { continue } if err := i.metaTable.UpdateVersion(indexBuildID); err != nil { - log.Debug("IndexService", zap.String("build index update version err", err.Error())) + log.Debug("IndexService assignmentTasksLoop metaTable.UpdateVersion failed", zap.Error(err)) } nodeID, builderClient := i.nodeClients.PeekClient() if builderClient == nil { - log.Debug("IndexService has no available IndexNode") + log.Debug("IndexService assignmentTasksLoop can not find available IndexNode") i.assignChan <- []UniqueID{indexBuildID} continue } @@ -503,13 +504,13 @@ func (i *IndexService) assignmentTasksLoop() { } resp, err := builderClient.CreateIndex(ctx, req) if err != nil { - log.Debug("IndexService", zap.String("build index err", err.Error())) - } - if err = i.metaTable.BuildIndex(indexBuildID, nodeID); err != nil { - log.Debug("IndexService", zap.String("update meta table error", err.Error())) + log.Debug("IndexService assignmentTasksLoop builderClient.CreateIndex failed", zap.Error(err)) } if resp.ErrorCode != commonpb.ErrorCode_Success { - log.Debug("IndexService", zap.String("build index err", resp.Reason)) + log.Debug("IndexService assignmentTasksLoop builderClient.CreateIndex failed", zap.String("Reason", resp.Reason)) + } + if err = i.metaTable.BuildIndex(indexBuildID, nodeID); err != nil { + log.Debug("IndexService assignmentTasksLoop 
metaTable.BuildIndex failed", zap.Error(err)) } i.nodeClients.IncPriority(nodeID, 1) } @@ -522,7 +523,7 @@ func (i *IndexService) watchNodeLoop() { defer cancel() defer i.loopWg.Done() - log.Debug("IndexService start watch node loop") + log.Debug("IndexService watchNodeLoop start") for { select { @@ -532,10 +533,10 @@ func (i *IndexService) watchNodeLoop() { switch event.EventType { case sessionutil.SessionAddEvent: serverID := event.Session.ServerID - log.Debug("IndexService", zap.Any("Add IndexNode, session serverID", serverID)) + log.Debug("IndexService watchNodeLoop SessionAddEvent", zap.Any("serverID", serverID)) case sessionutil.SessionDelEvent: serverID := event.Session.ServerID - log.Debug("IndexService", zap.Any("The IndexNode crashed with ID", serverID)) + log.Debug("IndexService watchNodeLoop SessionDelEvent ", zap.Any("serverID", serverID)) indexBuildIDs := i.nodeTasks.getTasksByNodeID(serverID) i.assignChan <- indexBuildIDs i.nodeTasks.delete(serverID) @@ -549,7 +550,7 @@ func (i *IndexService) watchMetaLoop() { defer cancel() defer i.loopWg.Done() - log.Debug("IndexService start watch meta loop") + log.Debug("IndexService watchMetaLoop start") watchChan := i.metaTable.client.WatchWithPrefix("indexes") @@ -558,21 +559,19 @@ func (i *IndexService) watchMetaLoop() { case <-ctx.Done(): return case resp := <-watchChan: - log.Debug("meta updated.") + log.Debug("IndexService watchMetaLoop find meta updated.") for _, event := range resp.Events { eventRevision := event.Kv.Version indexMeta := &indexpb.IndexMeta{} err := proto.UnmarshalText(string(event.Kv.Value), indexMeta) - if err != nil { - log.Debug("IndexService", zap.Any("Unmarshal error", err)) - } indexBuildID := indexMeta.IndexBuildID + log.Debug("IndexService watchMetaLoop", zap.Any("event.Key", event.Kv.Key), + zap.Any("event.V", indexMeta), zap.Any("IndexBuildID", indexBuildID), zap.Error(err)) switch event.Type { case mvccpb.PUT: //TODO: get indexBuildID fast - log.Debug("IndexService", 
zap.Any("Meta need load by IndexBuildID", indexBuildID)) - reload := i.metaTable.LoadMetaFromETCD(indexBuildID, eventRevision) + log.Debug("IndexService watchMetaLoop PUT", zap.Any("IndexBuildID", indexBuildID), zap.Any("reload", reload)) if reload { i.nodeTasks.finishTask(indexBuildID) } diff --git a/internal/indexservice/meta_table.go b/internal/indexservice/meta_table.go index 9f5e04990a..dc8fc75c43 100644 --- a/internal/indexservice/meta_table.go +++ b/internal/indexservice/meta_table.go @@ -56,7 +56,7 @@ func NewMetaTable(kv *etcdkv.EtcdKV) (*metaTable, error) { func (mt *metaTable) reloadFromKV() error { mt.indexBuildID2Meta = make(map[UniqueID]Meta) key := "indexes" - log.Debug("LoadWithPrefix ", zap.String("prefix", key)) + log.Debug("IndexService metaTable LoadWithPrefix ", zap.String("prefix", key)) _, values, versions, err := mt.client.LoadWithPrefix2(key) if err != nil { @@ -84,15 +84,14 @@ func (mt *metaTable) saveIndexMeta(meta *Meta) error { value := proto.MarshalTextString(meta.indexMeta) key := "indexes/" + strconv.FormatInt(meta.indexMeta.IndexBuildID, 10) - log.Debug("LoadWithPrefix ", zap.String("prefix", key)) - err := mt.client.CompareVersionAndSwap(key, meta.revision, value) + log.Debug("IndexService metaTable saveIndexMeta ", zap.String("key", key), zap.Error(err)) if err != nil { return err } - meta.revision = meta.revision + 1 mt.indexBuildID2Meta[meta.indexMeta.IndexBuildID] = *meta + log.Debug("IndexService metaTable saveIndexMeta success", zap.Any("meta.revision", meta.revision)) return nil } @@ -101,6 +100,7 @@ func (mt *metaTable) reloadMeta(indexBuildID UniqueID) (*Meta, error) { key := "indexes/" + strconv.FormatInt(indexBuildID, 10) _, values, version, err := mt.client.LoadWithPrefix2(key) + log.Debug("IndexService reloadMeta mt.client.LoadWithPrefix2", zap.Any("indexBuildID", indexBuildID), zap.Error(err)) if err != nil { return nil, err } @@ -109,9 +109,9 @@ func (mt *metaTable) reloadMeta(indexBuildID UniqueID) (*Meta, error) { 
if err != nil { return nil, err } - if im.State == commonpb.IndexState_Finished { - return nil, nil - } + //if im.State == commonpb.IndexState_Finished { + // return nil, nil + //} m := &Meta{ revision: version[0], indexMeta: im, @@ -123,8 +123,8 @@ func (mt *metaTable) reloadMeta(indexBuildID UniqueID) (*Meta, error) { func (mt *metaTable) AddIndex(indexBuildID UniqueID, req *indexpb.BuildIndexRequest) error { mt.lock.Lock() defer mt.lock.Unlock() - log.Debug("indexservice add index ...") _, ok := mt.indexBuildID2Meta[indexBuildID] + log.Debug("IndexService metaTable AddIndex", zap.Any(" index already exist", ok)) if ok { return fmt.Errorf("index already exists with ID = %d", indexBuildID) } @@ -144,17 +144,20 @@ func (mt *metaTable) AddIndex(indexBuildID UniqueID, req *indexpb.BuildIndexRequ func (mt *metaTable) BuildIndex(indexBuildID UniqueID, nodeID int64) error { mt.lock.Lock() defer mt.lock.Unlock() - log.Debug("IndexService update index meta") + log.Debug("IndexService metaTable BuildIndex") meta, ok := mt.indexBuildID2Meta[indexBuildID] if !ok { + log.Debug("IndexService metaTable BuildIndex index not exists", zap.Any("indexBuildID", indexBuildID)) return fmt.Errorf("index not exists with ID = %d", indexBuildID) } - if meta.indexMeta.State != commonpb.IndexState_Unissued { - return fmt.Errorf("can not set lease key, index with ID = %d state is %d", indexBuildID, meta.indexMeta.State) - } + //if meta.indexMeta.State != commonpb.IndexState_Unissued { + // return fmt.Errorf("can not set lease key, index with ID = %d state is %d", indexBuildID, meta.indexMeta.State) + //} + meta.indexMeta.NodeID = nodeID + meta.indexMeta.State = commonpb.IndexState_InProgress err := mt.saveIndexMeta(&meta) if err != nil { @@ -178,17 +181,20 @@ func (mt *metaTable) BuildIndex(indexBuildID UniqueID, nodeID int64) error { func (mt *metaTable) UpdateVersion(indexBuildID UniqueID) error { mt.lock.Lock() defer mt.lock.Unlock() - log.Debug("IndexService update index version") + 
log.Debug("IndexService metaTable update UpdateVersion", zap.Any("IndexBuildId", indexBuildID)) meta, ok := mt.indexBuildID2Meta[indexBuildID] if !ok { + log.Debug("IndexService metaTable update UpdateVersion indexBuildID not exists", zap.Any("IndexBuildId", indexBuildID)) return fmt.Errorf("index not exists with ID = %d", indexBuildID) } - if meta.indexMeta.State != commonpb.IndexState_Unissued { - return fmt.Errorf("can not set lease key, index with ID = %d state is %d", indexBuildID, meta.indexMeta.State) - } + //if meta.indexMeta.State != commonpb.IndexState_Unissued { + // return fmt.Errorf("can not set lease key, index with ID = %d state is %d", indexBuildID, meta.indexMeta.State) + //} meta.indexMeta.Version = meta.indexMeta.Version + 1 + log.Debug("IndexService metaTable update UpdateVersion", zap.Any("IndexBuildId", indexBuildID), + zap.Any("Version", meta.indexMeta.Version)) err := mt.saveIndexMeta(&meta) if err != nil { @@ -212,13 +218,13 @@ func (mt *metaTable) MarkIndexAsDeleted(indexID UniqueID) error { mt.lock.Lock() defer mt.lock.Unlock() - log.Debug("indexservice", zap.Int64("mark index is deleted", indexID)) + log.Debug("IndexService metaTable MarkIndexAsDeleted ", zap.Int64("indexID", indexID)) for _, meta := range mt.indexBuildID2Meta { if meta.indexMeta.Req.IndexID == indexID { meta.indexMeta.MarkDeleted = true if err := mt.saveIndexMeta(&meta); err != nil { - log.Debug("IndexService", zap.Any("Meta table mark deleted err", err.Error())) + log.Debug("IndexService metaTable MarkIndexAsDeleted saveIndexMeta failed", zap.Error(err)) fn := func() error { m, err := mt.reloadMeta(meta.indexMeta.IndexBuildID) if m == nil { @@ -284,9 +290,7 @@ func (mt *metaTable) DeleteIndex(indexBuildID UniqueID) { key := "indexes/" + strconv.FormatInt(indexBuildID, 10) err := mt.client.Remove(key) - if err != nil { - log.Debug("IndexService", zap.Any("Delete IndexMeta in etcd error", err)) - } + log.Debug("IndexService metaTable DeleteIndex", zap.Error(err)) } func 
(mt *metaTable) UpdateRecycleState(indexBuildID UniqueID) error { @@ -294,10 +298,16 @@ func (mt *metaTable) UpdateRecycleState(indexBuildID UniqueID) error { defer mt.lock.Unlock() meta, ok := mt.indexBuildID2Meta[indexBuildID] + log.Debug("IndexService metaTable UpdateRecycleState", zap.Any("indexBuildID", indexBuildID), + zap.Any("exists", ok)) if !ok { return fmt.Errorf("index not exists with ID = %d", indexBuildID) } + if meta.indexMeta.Recycled { + return nil + } + meta.indexMeta.Recycled = true if err := mt.saveIndexMeta(&meta); err != nil { fn := func() error { @@ -311,6 +321,8 @@ func (mt *metaTable) UpdateRecycleState(indexBuildID UniqueID) error { } err2 := retry.Retry(5, time.Millisecond*200, fn) if err2 != nil { + meta.indexMeta.Recycled = false + log.Debug("IndexService metaTable UpdateRecycleState failed", zap.Error(err2)) return err2 } } @@ -340,10 +352,8 @@ func (mt *metaTable) GetIndexMeta(indexBuildID UniqueID) Meta { defer mt.lock.Unlock() meta, ok := mt.indexBuildID2Meta[indexBuildID] - if !ok { - log.Debug("IndexService", zap.Any("Meta table does not have the meta with indexBuildID", indexBuildID)) - } - + log.Debug("IndexService metaTable GetIndexMeta", zap.Any("indexBuildID", indexBuildID), + zap.Any("exist", ok)) return meta } @@ -371,51 +381,6 @@ func (mt *metaTable) GetUnassignedTasks(nodeIDs []int64) [][]UniqueID { return tasks } -func compare2Array(arr1, arr2 interface{}) bool { - p1, ok := arr1.([]*commonpb.KeyValuePair) - if ok { - p2, ok1 := arr2.([]*commonpb.KeyValuePair) - if ok1 { - for _, param1 := range p1 { - sameParams := false - for _, param2 := range p2 { - if param1.Key == param2.Key && param1.Value == param2.Value { - sameParams = true - } - } - if !sameParams { - return false - } - } - return true - } - log.Error("indexservice", zap.Any("type error", "arr2 type should be commonpb.KeyValuePair")) - return false - } - v1, ok2 := arr1.([]string) - if ok2 { - v2, ok3 := arr2.([]string) - if ok3 { - for _, s1 := range v1 { - 
sameParams := false - for _, s2 := range v2 { - if s1 == s2 { - sameParams = true - } - } - if !sameParams { - return false - } - } - return true - } - log.Error("indexservice", zap.Any("type error", "arr2 type should be string array")) - return false - } - log.Error("indexservice", zap.Any("type error", "param type should be commonpb.KeyValuePair or string array")) - return false -} - func (mt *metaTable) HasSameReq(req *indexpb.BuildIndexRequest) (bool, UniqueID) { mt.lock.Lock() defer mt.lock.Unlock() @@ -442,9 +407,14 @@ LOOP: func (mt *metaTable) LoadMetaFromETCD(indexBuildID int64, revision int64) bool { mt.lock.Lock() defer mt.lock.Unlock() - meta, ok := mt.indexBuildID2Meta[indexBuildID] + log.Debug("IndexService metaTable LoadMetaFromETCD", zap.Any("indexBuildID", indexBuildID), + zap.Any("revision", revision), zap.Any("ok", ok)) if ok { + log.Debug("IndexService metaTable LoadMetaFromETCD", + zap.Any("meta.revision", meta.revision), + zap.Any("revision", revision)) + if meta.revision >= revision { return false } @@ -452,12 +422,12 @@ func (mt *metaTable) LoadMetaFromETCD(indexBuildID int64, revision int64) bool { m, err := mt.reloadMeta(meta.indexMeta.IndexBuildID) if m == nil { - log.Debug("IndexService", zap.Any("Load meta from etcd error", err)) + log.Debug("IndexService metaTable reloadMeta failed", zap.Error(err)) return false } - log.Debug("IndexService", zap.Any("IndexMeta", m)) mt.indexBuildID2Meta[indexBuildID] = *m + log.Debug("IndexService LoadMetaFromETCD success", zap.Any("IndexMeta", m)) return true } diff --git a/internal/indexservice/util.go b/internal/indexservice/util.go index 582cf2ae7d..4d44a91747 100644 --- a/internal/indexservice/util.go +++ b/internal/indexservice/util.go @@ -11,7 +11,10 @@ package indexservice -import "github.com/milvus-io/milvus/internal/proto/commonpb" +import ( + "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/proto/commonpb" +) func CompareAddress(a *commonpb.Address, b 
*commonpb.Address) bool { if a == b { @@ -19,3 +22,48 @@ func CompareAddress(a *commonpb.Address, b *commonpb.Address) bool { } return a.Ip == b.Ip && a.Port == b.Port } + +func compare2Array(arr1, arr2 interface{}) bool { + p1, ok := arr1.([]*commonpb.KeyValuePair) + if ok { + p2, ok1 := arr2.([]*commonpb.KeyValuePair) + if ok1 { + for _, param1 := range p1 { + sameParams := false + for _, param2 := range p2 { + if param1.Key == param2.Key && param1.Value == param2.Value { + sameParams = true + } + } + if !sameParams { + return false + } + } + return true + } + log.Error("IndexService compare2Array arr2 should be commonpb.KeyValuePair") + return false + } + v1, ok2 := arr1.([]string) + if ok2 { + v2, ok3 := arr2.([]string) + if ok3 { + for _, s1 := range v1 { + sameParams := false + for _, s2 := range v2 { + if s1 == s2 { + sameParams = true + } + } + if !sameParams { + return false + } + } + return true + } + log.Error("IndexService compare2Array arr2 type should be string array") + return false + } + log.Error("IndexService compare2Array param type should be commonpb.KeyValuePair or string array") + return false +} diff --git a/internal/masterservice/master_service.go b/internal/masterservice/master_service.go index ebe96ff5a5..8868977da9 100644 --- a/internal/masterservice/master_service.go +++ b/internal/masterservice/master_service.go @@ -435,7 +435,7 @@ func (c *Core) startDataNodeFlushedSegmentLoop() { EnableIndex: false, } info.BuildID, err = c.BuildIndex(segID, fieldSch, idxInfo, true) - if err == nil { + if err == nil && info.BuildID != 0 { info.EnableIndex = true } else { log.Error("build index fail", zap.String("error", err.Error())) diff --git a/internal/masterservice/task.go b/internal/masterservice/task.go index f64faf5c7b..ee891c2ccb 100644 --- a/internal/masterservice/task.go +++ b/internal/masterservice/task.go @@ -697,6 +697,8 @@ func (t *DescribeSegmentReqTask) Execute(ctx context.Context) error { } //TODO, get filed_id and index_name from 
request segIdxInfo, err := t.core.MetaTable.GetSegmentIndexInfoByID(t.Req.SegmentID, -1, "") + log.Debug("MasterService DescribeSegmentReqTask, MetaTable.GetSegmentIndexInfoByID", zap.Any("SegmentID", t.Req.SegmentID), + zap.Any("segIdxInfo", segIdxInfo), zap.Error(err)) if err != nil { return err } @@ -793,10 +795,14 @@ func (t *CreateIndexReqTask) Execute(ctx context.Context) error { if err != nil { return err } + if info.BuildID != 0 { + info.EnableIndex = true + } segIdxInfos = append(segIdxInfos, &info) } _, err = t.core.MetaTable.AddIndex(segIdxInfos, "", "") + log.Debug("MasterService CreateIndexReq", zap.Any("segIdxInfos", segIdxInfos), zap.Error(err)) return err } diff --git a/internal/querynode/index_loader.go b/internal/querynode/index_loader.go index 4bf2037360..acb640fc46 100644 --- a/internal/querynode/index_loader.go +++ b/internal/querynode/index_loader.go @@ -297,6 +297,7 @@ func (loader *indexLoader) setIndexInfo(collectionID UniqueID, segment *Segment, if err != nil { return err } + log.Debug("QueryNode IndexLoader setIndexInfo", zap.Any("Req", req), zap.Any("response", response)) if response.Status.ErrorCode != commonpb.ErrorCode_Success { return errors.New(response.Status.Reason) } diff --git a/internal/querynode/load_service.go b/internal/querynode/load_service.go index be9127553b..f80b956a78 100644 --- a/internal/querynode/load_service.go +++ b/internal/querynode/load_service.go @@ -172,7 +172,7 @@ func (s *loadService) loadSegmentInternal(collectionID UniqueID, segment *Segmen for _, vecFieldID := range vectorFieldIDs { err = s.segLoader.indexLoader.setIndexInfo(collectionID, segment, vecFieldID) if err != nil { - log.Warn(err.Error()) + log.Warn("QueryNode load_service", zap.Any("SegmentID", segment.segmentID), zap.Error(err)) continue } loadIndexFieldIDs = append(loadIndexFieldIDs, vecFieldID)