mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-11-30 02:48:45 +08:00
Fixbug:index is not loaded in some cases (#5633)
* Fixbug:index is not created in some cases Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com> * Delete unecessary print Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com> * fix create index Signed-off-by: yefu.chen <yefu.chen@zilliz.com> Co-authored-by: yefu.chen <yefu.chen@zilliz.com>
This commit is contained in:
parent
8ead67ef95
commit
bc46e4780d
@ -66,7 +66,7 @@ type IndexNode struct {
|
||||
}
|
||||
|
||||
func NewIndexNode(ctx context.Context) (*IndexNode, error) {
|
||||
log.Debug("new index node ...")
|
||||
log.Debug("New IndexNode ...")
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
ctx1, cancel := context.WithCancel(ctx)
|
||||
b := &IndexNode{
|
||||
@ -192,7 +192,7 @@ func (i *IndexNode) SetIndexServiceClient(serviceClient types.IndexService) {
|
||||
func (i *IndexNode) CreateIndex(ctx context.Context, request *indexpb.CreateIndexRequest) (*commonpb.Status, error) {
|
||||
log.Debug("IndexNode building index ...",
|
||||
zap.Int64("IndexBuildID", request.IndexBuildID),
|
||||
zap.String("Indexname", request.IndexName),
|
||||
zap.String("IndexName", request.IndexName),
|
||||
zap.Int64("IndexID", request.IndexID),
|
||||
zap.Int64("Version", request.Version),
|
||||
zap.String("MetaPath", request.MetaPath),
|
||||
@ -253,7 +253,7 @@ func (i *IndexNode) GetComponentStates(ctx context.Context) (*internalpb.Compone
|
||||
},
|
||||
}
|
||||
|
||||
log.Debug("IndexNode compoents states",
|
||||
log.Debug("IndexNode Component states",
|
||||
zap.Any("State", ret.State),
|
||||
zap.Any("Status", ret.Status),
|
||||
zap.Any("SubcomponentStates", ret.SubcomponentStates))
|
||||
@ -261,7 +261,7 @@ func (i *IndexNode) GetComponentStates(ctx context.Context) (*internalpb.Compone
|
||||
}
|
||||
|
||||
func (i *IndexNode) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
|
||||
log.Debug("get indexnode time tick channel ...")
|
||||
log.Debug("get IndexNode time tick channel ...")
|
||||
|
||||
return &milvuspb.StringResponse{
|
||||
Status: &commonpb.Status{
|
||||
@ -271,7 +271,7 @@ func (i *IndexNode) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringRes
|
||||
}
|
||||
|
||||
func (i *IndexNode) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
|
||||
log.Debug("get indexnode statistics channel ...")
|
||||
log.Debug("get IndexNode statistics channel ...")
|
||||
return &milvuspb.StringResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
|
@ -113,7 +113,7 @@ func (bt *BaseTask) Name() string {
|
||||
|
||||
func (it *IndexBuildTask) OnEnqueue() error {
|
||||
it.SetID(it.req.IndexBuildID)
|
||||
log.Debug("indexnode", zap.Int64("[IndexBuilderTask] Enqueue TaskID", it.ID()))
|
||||
log.Debug("IndexNode IndexBuilderTask Enqueue", zap.Int64("TaskID", it.ID()))
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -122,17 +122,19 @@ func (it *IndexBuildTask) checkIndexMeta(pre bool) error {
|
||||
indexMeta := indexpb.IndexMeta{}
|
||||
_, values, versions, err := it.etcdKV.LoadWithPrefix2(it.req.MetaPath)
|
||||
if err != nil {
|
||||
log.Debug("IndexService", zap.Any("load meta error with path", it.req.MetaPath))
|
||||
log.Debug("IndexService", zap.Any("Load meta error", err))
|
||||
log.Debug("IndexNode checkIndexMeta", zap.Any("load meta error with path", it.req.MetaPath),
|
||||
zap.Error(err), zap.Any("pre", pre))
|
||||
return err
|
||||
}
|
||||
log.Debug("IndexNode checkIndexMeta load meta success", zap.Any("path", it.req.MetaPath), zap.Any("pre", pre))
|
||||
err = proto.UnmarshalText(values[0], &indexMeta)
|
||||
if err != nil {
|
||||
log.Debug("IndexService", zap.Any("Unmarshal error", err))
|
||||
log.Debug("IndexNode checkIndexMeta Unmarshal", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
log.Debug("IndexNode checkIndexMeta Unmarshal success", zap.Any("IndexMeta", indexMeta))
|
||||
if indexMeta.Version > it.req.Version || indexMeta.State == commonpb.IndexState_Finished {
|
||||
log.Debug("IndexNode", zap.Any("Notify build index", "This version is not the latest version"))
|
||||
log.Debug("IndexNode checkIndexMeta Notify build index this version is not the latest version", zap.Any("version", it.req.Version))
|
||||
return nil
|
||||
}
|
||||
if indexMeta.MarkDeleted {
|
||||
@ -152,24 +154,25 @@ func (it *IndexBuildTask) checkIndexMeta(pre bool) error {
|
||||
if it.err != nil {
|
||||
indexMeta.State = commonpb.IndexState_Failed
|
||||
}
|
||||
log.Debug("IndexNode", zap.Any("MetaPath", it.req.MetaPath))
|
||||
err = it.etcdKV.CompareVersionAndSwap(it.req.MetaPath, versions[0],
|
||||
proto.MarshalTextString(&indexMeta))
|
||||
log.Debug("IndexNode checkIndexMeta CompareVersionAndSwap", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
err := retry.Retry(3, time.Millisecond*200, fn)
|
||||
log.Debug("IndexNode checkIndexMeta final", zap.Error(err))
|
||||
return err
|
||||
|
||||
}
|
||||
|
||||
func (it *IndexBuildTask) PreExecute(ctx context.Context) error {
|
||||
log.Debug("preExecute...")
|
||||
log.Debug("IndexNode IndexBuildTask preExecute...")
|
||||
return it.checkIndexMeta(true)
|
||||
}
|
||||
|
||||
func (it *IndexBuildTask) PostExecute(ctx context.Context) error {
|
||||
log.Debug("PostExecute...")
|
||||
log.Debug("IndexNode IndexBuildTask PostExecute...")
|
||||
|
||||
defer func() {
|
||||
if it.err != nil {
|
||||
@ -178,8 +181,8 @@ func (it *IndexBuildTask) PostExecute(ctx context.Context) error {
|
||||
}()
|
||||
|
||||
if it.serviceClient == nil {
|
||||
err := errors.New("IndexBuildTask, serviceClient is nil")
|
||||
log.Debug("[IndexBuildTask][PostExecute] serviceClient is nil")
|
||||
err := errors.New("IndexNode IndexBuildTask PostExecute, serviceClient is nil")
|
||||
log.Error("", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
@ -187,7 +190,7 @@ func (it *IndexBuildTask) PostExecute(ctx context.Context) error {
|
||||
}
|
||||
|
||||
func (it *IndexBuildTask) Execute(ctx context.Context) error {
|
||||
log.Debug("start build index ...")
|
||||
log.Debug("IndexNode IndexBuildTask Execute ...")
|
||||
var err error
|
||||
|
||||
typeParams := make(map[string]string)
|
||||
@ -232,13 +235,13 @@ func (it *IndexBuildTask) Execute(ctx context.Context) error {
|
||||
|
||||
it.index, err = NewCIndex(typeParams, indexParams)
|
||||
if err != nil {
|
||||
log.Error("indexnode", zap.String("NewCIndex err:", err.Error()))
|
||||
log.Error("IndexNode IndexBuildTask Execute NewCIndex failed", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
err = it.index.Delete()
|
||||
if err != nil {
|
||||
log.Warn("CIndexDelete Failed")
|
||||
log.Warn("IndexNode IndexBuildTask Execute CIndexDelete Failed", zap.Error(err))
|
||||
}
|
||||
}()
|
||||
|
||||
@ -305,7 +308,7 @@ func (it *IndexBuildTask) Execute(ctx context.Context) error {
|
||||
if fOk {
|
||||
err = it.index.BuildFloatVecIndexWithoutIds(floatVectorFieldData.Data)
|
||||
if err != nil {
|
||||
log.Error("indexnode", zap.String("BuildFloatVecIndexWithoutIds error", err.Error()))
|
||||
log.Error("IndexNode BuildFloatVecIndexWithoutIds failed", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
}
|
||||
@ -314,7 +317,7 @@ func (it *IndexBuildTask) Execute(ctx context.Context) error {
|
||||
if bOk {
|
||||
err = it.index.BuildBinaryVecIndexWithoutIds(binaryVectorFieldData.Data)
|
||||
if err != nil {
|
||||
log.Error("indexnode", zap.String("BuildBinaryVecIndexWithoutIds err", err.Error()))
|
||||
log.Error("IndexNode BuildBinaryVecIndexWithoutIds failed", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
}
|
||||
@ -325,7 +328,7 @@ func (it *IndexBuildTask) Execute(ctx context.Context) error {
|
||||
|
||||
indexBlobs, err := it.index.Serialize()
|
||||
if err != nil {
|
||||
log.Error("indexnode", zap.String("serialize err", err.Error()))
|
||||
log.Error("IndexNode index Serialize failed", zap.Error(err))
|
||||
|
||||
return err
|
||||
}
|
||||
@ -354,23 +357,25 @@ func (it *IndexBuildTask) Execute(ctx context.Context) error {
|
||||
saveIndexFileFn := func() error {
|
||||
v, err := it.etcdKV.Load(it.req.MetaPath)
|
||||
if err != nil {
|
||||
log.Debug("IndexService", zap.Any("load meta error with path", it.req.MetaPath))
|
||||
log.Debug("IndexService", zap.Any("Load meta error", err))
|
||||
log.Debug("IndexNode load meta failed", zap.Any("path", it.req.MetaPath), zap.Error(err))
|
||||
return err
|
||||
}
|
||||
indexMeta := indexpb.IndexMeta{}
|
||||
err = proto.UnmarshalText(v, &indexMeta)
|
||||
if err != nil {
|
||||
log.Debug("IndexService", zap.Any("Unmarshal error", err))
|
||||
log.Debug("IndexNode Unmarshal indexMeta error ", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
log.Debug("IndexNode Unmarshal indexMeta success ", zap.Any("meta", indexMeta))
|
||||
if indexMeta.Version > it.req.Version {
|
||||
log.Debug("IndexNode", zap.Any("Notify build index", "This version is not the latest version"))
|
||||
log.Debug("IndexNode try saveIndexFile failed req.Version is low", zap.Any("req.Version", it.req.Version),
|
||||
zap.Any("indexMeta.Version", indexMeta.Version))
|
||||
return errors.New("This task has been reassigned ")
|
||||
}
|
||||
return saveBlob(savePath, value)
|
||||
}
|
||||
err := retry.Retry(5, time.Millisecond*200, saveIndexFileFn)
|
||||
log.Debug("IndexNode try saveIndexFile final", zap.Error(err), zap.Any("savePath", savePath))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -398,7 +403,7 @@ func (it *IndexBuildTask) Rollback() error {
|
||||
|
||||
err := it.kv.MultiRemove(it.savePaths)
|
||||
if err != nil {
|
||||
log.Warn("indexnode", zap.String("IndexBuildTask Rollback Failed", err.Error()))
|
||||
log.Warn("IndexNode IndexBuildTask Rollback Failed", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
|
@ -70,7 +70,7 @@ func (queue *BaseTaskQueue) addUnissuedTask(t task) error {
|
||||
defer queue.utLock.Unlock()
|
||||
|
||||
if queue.utFull() {
|
||||
return errors.New("task queue is full")
|
||||
return errors.New("IndexNode task queue is full")
|
||||
}
|
||||
queue.unissuedTasks.PushBack(t)
|
||||
queue.utBufChan <- 1
|
||||
@ -82,7 +82,7 @@ func (queue *BaseTaskQueue) FrontUnissuedTask() task {
|
||||
defer queue.utLock.Unlock()
|
||||
|
||||
if queue.unissuedTasks.Len() <= 0 {
|
||||
log.Debug("FrontUnissuedTask sorry, but the unissued task list is empty!")
|
||||
log.Debug("IndexNode FrontUnissuedTask sorry, but the unissued task list is empty!")
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -94,7 +94,7 @@ func (queue *BaseTaskQueue) PopUnissuedTask() task {
|
||||
defer queue.utLock.Unlock()
|
||||
|
||||
if queue.unissuedTasks.Len() <= 0 {
|
||||
log.Debug("PopUnissued task sorry, but the unissued task list is empty!")
|
||||
log.Debug("IndexNode PopUnissued task sorry, but the unissued task list is empty!")
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -111,7 +111,7 @@ func (queue *BaseTaskQueue) AddActiveTask(t task) {
|
||||
tID := t.ID()
|
||||
_, ok := queue.activeTasks[tID]
|
||||
if ok {
|
||||
log.Debug("indexnode", zap.Int64("task with ID %v already in active task list!", tID))
|
||||
log.Debug("IndexNode task already in activate task list", zap.Any("TaskID", tID))
|
||||
}
|
||||
|
||||
queue.activeTasks[tID] = t
|
||||
@ -126,7 +126,7 @@ func (queue *BaseTaskQueue) PopActiveTask(tID UniqueID) task {
|
||||
delete(queue.activeTasks, tID)
|
||||
return t
|
||||
}
|
||||
log.Debug("indexnode", zap.Int64("sorry, but the ID was not found in the active task list!", tID))
|
||||
log.Debug("IndexNode the task was not found in the active task list", zap.Any("TaskID", tID))
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -201,7 +201,7 @@ func NewTaskScheduler(ctx context.Context,
|
||||
|
||||
func (sched *TaskScheduler) setParallelism(parallel int) {
|
||||
if parallel <= 0 {
|
||||
log.Debug("can not set parallelism to less than zero!")
|
||||
log.Debug("IndexNode can not set parallelism to less than zero!")
|
||||
return
|
||||
}
|
||||
sched.buildParallel = parallel
|
||||
@ -258,7 +258,7 @@ func (sched *TaskScheduler) processTask(t task, q TaskQueue) {
|
||||
}
|
||||
|
||||
func (sched *TaskScheduler) indexBuildLoop() {
|
||||
log.Debug("index build loop ...")
|
||||
log.Debug("IndexNode TaskScheduler start build loop ...")
|
||||
defer sched.wg.Done()
|
||||
for {
|
||||
select {
|
||||
|
@ -346,8 +346,7 @@ func (i *IndexService) GetIndexStates(ctx context.Context, req *indexpb.GetIndex
|
||||
},
|
||||
States: indexStates,
|
||||
}
|
||||
log.Debug("get index states success")
|
||||
log.Debug("get index states",
|
||||
log.Debug("IndexService get index states success",
|
||||
zap.Any("index status", ret.Status),
|
||||
zap.Any("index states", ret.States))
|
||||
|
||||
@ -355,7 +354,7 @@ func (i *IndexService) GetIndexStates(ctx context.Context, req *indexpb.GetIndex
|
||||
}
|
||||
|
||||
func (i *IndexService) DropIndex(ctx context.Context, req *indexpb.DropIndexRequest) (*commonpb.Status, error) {
|
||||
log.Debug("IndexService", zap.Int64("Drop Index ID", req.IndexID))
|
||||
log.Debug("IndexService DropIndex", zap.Any("IndexID", req.IndexID))
|
||||
|
||||
ret := &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
@ -376,12 +375,12 @@ func (i *IndexService) DropIndex(ctx context.Context, req *indexpb.DropIndexRequ
|
||||
}()
|
||||
}()
|
||||
|
||||
log.Debug("IndexService", zap.Int64("DropIndex success by ID", req.IndexID))
|
||||
log.Debug("IndexService DropIndex success", zap.Any("IndexID", req.IndexID))
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
func (i *IndexService) GetIndexFilePaths(ctx context.Context, req *indexpb.GetIndexFilePathsRequest) (*indexpb.GetIndexFilePathsResponse, error) {
|
||||
log.Debug("IndexService", zap.Int64s("get index file paths", req.IndexBuildIDs))
|
||||
log.Debug("IndexService GetIndexFilePaths", zap.Int64s("IndexBuildIds", req.IndexBuildIDs))
|
||||
var indexPaths []*indexpb.IndexFilePathInfo = nil
|
||||
|
||||
for _, indexID := range req.IndexBuildIDs {
|
||||
@ -391,7 +390,7 @@ func (i *IndexService) GetIndexFilePaths(ctx context.Context, req *indexpb.GetIn
|
||||
}
|
||||
indexPaths = append(indexPaths, indexPathInfo)
|
||||
}
|
||||
log.Debug("IndexService, get index file paths success")
|
||||
log.Debug("IndexService GetIndexFilePaths success")
|
||||
|
||||
ret := &indexpb.GetIndexFilePathsResponse{
|
||||
Status: &commonpb.Status{
|
||||
@ -399,7 +398,7 @@ func (i *IndexService) GetIndexFilePaths(ctx context.Context, req *indexpb.GetIn
|
||||
},
|
||||
FilePaths: indexPaths,
|
||||
}
|
||||
log.Debug("IndexService", zap.Any("index file paths", ret.FilePaths))
|
||||
log.Debug("IndexService GetIndexFilePaths ", zap.Any("FilePaths", ret.FilePaths))
|
||||
|
||||
return ret, nil
|
||||
}
|
||||
@ -414,12 +413,12 @@ func (i *IndexService) tsLoop() {
|
||||
select {
|
||||
case <-tsoTicker.C:
|
||||
if err := i.idAllocator.UpdateID(); err != nil {
|
||||
log.Debug("IndexService", zap.String("failed to update id", err.Error()))
|
||||
log.Debug("IndexService tsLoop UpdateID failed", zap.Error(err))
|
||||
return
|
||||
}
|
||||
case <-ctx.Done():
|
||||
// Server is closed and it should return nil.
|
||||
log.Debug("tsLoop is closed")
|
||||
log.Debug("IndexService tsLoop is closed")
|
||||
return
|
||||
}
|
||||
}
|
||||
@ -432,7 +431,7 @@ func (i *IndexService) recycleUnusedIndexFiles() {
|
||||
defer i.loopWg.Done()
|
||||
|
||||
timeTicker := time.NewTicker(durationInterval)
|
||||
log.Debug("IndexService start recycle unused index files loop")
|
||||
log.Debug("IndexService start recycleUnusedIndexFiles loop")
|
||||
|
||||
for {
|
||||
select {
|
||||
@ -444,18 +443,20 @@ func (i *IndexService) recycleUnusedIndexFiles() {
|
||||
if meta.indexMeta.MarkDeleted {
|
||||
unusedIndexFilePathPrefix := strconv.Itoa(int(meta.indexMeta.IndexBuildID))
|
||||
if err := i.kv.RemoveWithPrefix(unusedIndexFilePathPrefix); err != nil {
|
||||
log.Debug("IndexService", zap.String("Remove index files error", err.Error()))
|
||||
log.Debug("IndexService recycleUnusedIndexFiles Remove index files failed",
|
||||
zap.Any("MarkDeleted", true), zap.Error(err))
|
||||
}
|
||||
i.metaTable.DeleteIndex(meta.indexMeta.IndexBuildID)
|
||||
} else {
|
||||
for j := 1; j < int(meta.indexMeta.Version); j++ {
|
||||
unusedIndexFilePathPrefix := strconv.Itoa(int(meta.indexMeta.IndexBuildID)) + "/" + strconv.Itoa(j)
|
||||
if err := i.kv.RemoveWithPrefix(unusedIndexFilePathPrefix); err != nil {
|
||||
log.Debug("IndexService", zap.String("Remove index files error", err.Error()))
|
||||
log.Debug("IndexService recycleUnusedIndexFiles Remove index files failed",
|
||||
zap.Any("MarkDeleted", false), zap.Error(err))
|
||||
}
|
||||
}
|
||||
if err := i.metaTable.UpdateRecycleState(meta.indexMeta.IndexBuildID); err != nil {
|
||||
log.Debug("IndexService", zap.String("Remove index files error", err.Error()))
|
||||
log.Debug("IndexService recycleUnusedIndexFiles UpdateRecycleState failed", zap.Error(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -469,7 +470,7 @@ func (i *IndexService) assignmentTasksLoop() {
|
||||
defer cancel()
|
||||
defer i.loopWg.Done()
|
||||
|
||||
log.Debug("IndexService start assign tasks loop")
|
||||
log.Debug("IndexService start assignmentTasksLoop start")
|
||||
|
||||
for {
|
||||
select {
|
||||
@ -478,16 +479,16 @@ func (i *IndexService) assignmentTasksLoop() {
|
||||
case indexBuildIDs := <-i.assignChan:
|
||||
for _, indexBuildID := range indexBuildIDs {
|
||||
meta := i.metaTable.GetIndexMeta(indexBuildID)
|
||||
log.Debug("IndexService", zap.Any("Meta", meta))
|
||||
log.Debug("IndexService assignmentTasksLoop ", zap.Any("Meta", meta))
|
||||
if meta.indexMeta.State == commonpb.IndexState_Finished {
|
||||
continue
|
||||
}
|
||||
if err := i.metaTable.UpdateVersion(indexBuildID); err != nil {
|
||||
log.Debug("IndexService", zap.String("build index update version err", err.Error()))
|
||||
log.Debug("IndexService assignmentTasksLoop metaTable.UpdateVersion failed", zap.Error(err))
|
||||
}
|
||||
nodeID, builderClient := i.nodeClients.PeekClient()
|
||||
if builderClient == nil {
|
||||
log.Debug("IndexService has no available IndexNode")
|
||||
log.Debug("IndexService assignmentTasksLoop can not find available IndexNode")
|
||||
i.assignChan <- []UniqueID{indexBuildID}
|
||||
continue
|
||||
}
|
||||
@ -503,13 +504,13 @@ func (i *IndexService) assignmentTasksLoop() {
|
||||
}
|
||||
resp, err := builderClient.CreateIndex(ctx, req)
|
||||
if err != nil {
|
||||
log.Debug("IndexService", zap.String("build index err", err.Error()))
|
||||
}
|
||||
if err = i.metaTable.BuildIndex(indexBuildID, nodeID); err != nil {
|
||||
log.Debug("IndexService", zap.String("update meta table error", err.Error()))
|
||||
log.Debug("IndexService assignmentTasksLoop builderClient.CreateIndex failed", zap.Error(err))
|
||||
}
|
||||
if resp.ErrorCode != commonpb.ErrorCode_Success {
|
||||
log.Debug("IndexService", zap.String("build index err", resp.Reason))
|
||||
log.Debug("IndexService assignmentTasksLoop builderClient.CreateIndex failed", zap.String("Reason", resp.Reason))
|
||||
}
|
||||
if err = i.metaTable.BuildIndex(indexBuildID, nodeID); err != nil {
|
||||
log.Debug("IndexService assignmentTasksLoop metaTable.BuildIndex failed", zap.Error(err))
|
||||
}
|
||||
i.nodeClients.IncPriority(nodeID, 1)
|
||||
}
|
||||
@ -522,7 +523,7 @@ func (i *IndexService) watchNodeLoop() {
|
||||
|
||||
defer cancel()
|
||||
defer i.loopWg.Done()
|
||||
log.Debug("IndexService start watch node loop")
|
||||
log.Debug("IndexService watchNodeLoop start")
|
||||
|
||||
for {
|
||||
select {
|
||||
@ -532,10 +533,10 @@ func (i *IndexService) watchNodeLoop() {
|
||||
switch event.EventType {
|
||||
case sessionutil.SessionAddEvent:
|
||||
serverID := event.Session.ServerID
|
||||
log.Debug("IndexService", zap.Any("Add IndexNode, session serverID", serverID))
|
||||
log.Debug("IndexService watchNodeLoop SessionAddEvent", zap.Any("serverID", serverID))
|
||||
case sessionutil.SessionDelEvent:
|
||||
serverID := event.Session.ServerID
|
||||
log.Debug("IndexService", zap.Any("The IndexNode crashed with ID", serverID))
|
||||
log.Debug("IndexService watchNodeLoop SessionDelEvent ", zap.Any("serverID", serverID))
|
||||
indexBuildIDs := i.nodeTasks.getTasksByNodeID(serverID)
|
||||
i.assignChan <- indexBuildIDs
|
||||
i.nodeTasks.delete(serverID)
|
||||
@ -549,7 +550,7 @@ func (i *IndexService) watchMetaLoop() {
|
||||
|
||||
defer cancel()
|
||||
defer i.loopWg.Done()
|
||||
log.Debug("IndexService start watch meta loop")
|
||||
log.Debug("IndexService watchMetaLoop start")
|
||||
|
||||
watchChan := i.metaTable.client.WatchWithPrefix("indexes")
|
||||
|
||||
@ -558,21 +559,19 @@ func (i *IndexService) watchMetaLoop() {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case resp := <-watchChan:
|
||||
log.Debug("meta updated.")
|
||||
log.Debug("IndexService watchMetaLoop find meta updated.")
|
||||
for _, event := range resp.Events {
|
||||
eventRevision := event.Kv.Version
|
||||
indexMeta := &indexpb.IndexMeta{}
|
||||
err := proto.UnmarshalText(string(event.Kv.Value), indexMeta)
|
||||
if err != nil {
|
||||
log.Debug("IndexService", zap.Any("Unmarshal error", err))
|
||||
}
|
||||
indexBuildID := indexMeta.IndexBuildID
|
||||
log.Debug("IndexService watchMetaLoop", zap.Any("event.Key", event.Kv.Key),
|
||||
zap.Any("event.V", indexMeta), zap.Any("IndexBuildID", indexBuildID), zap.Error(err))
|
||||
switch event.Type {
|
||||
case mvccpb.PUT:
|
||||
//TODO: get indexBuildID fast
|
||||
log.Debug("IndexService", zap.Any("Meta need load by IndexBuildID", indexBuildID))
|
||||
|
||||
reload := i.metaTable.LoadMetaFromETCD(indexBuildID, eventRevision)
|
||||
log.Debug("IndexService watchMetaLoop PUT", zap.Any("IndexBuildID", indexBuildID), zap.Any("reload", reload))
|
||||
if reload {
|
||||
i.nodeTasks.finishTask(indexBuildID)
|
||||
}
|
||||
|
@ -56,7 +56,7 @@ func NewMetaTable(kv *etcdkv.EtcdKV) (*metaTable, error) {
|
||||
func (mt *metaTable) reloadFromKV() error {
|
||||
mt.indexBuildID2Meta = make(map[UniqueID]Meta)
|
||||
key := "indexes"
|
||||
log.Debug("LoadWithPrefix ", zap.String("prefix", key))
|
||||
log.Debug("IndexService metaTable LoadWithPrefix ", zap.String("prefix", key))
|
||||
|
||||
_, values, versions, err := mt.client.LoadWithPrefix2(key)
|
||||
if err != nil {
|
||||
@ -84,15 +84,14 @@ func (mt *metaTable) saveIndexMeta(meta *Meta) error {
|
||||
value := proto.MarshalTextString(meta.indexMeta)
|
||||
|
||||
key := "indexes/" + strconv.FormatInt(meta.indexMeta.IndexBuildID, 10)
|
||||
log.Debug("LoadWithPrefix ", zap.String("prefix", key))
|
||||
|
||||
err := mt.client.CompareVersionAndSwap(key, meta.revision, value)
|
||||
log.Debug("IndexService metaTable saveIndexMeta ", zap.String("key", key), zap.Error(err))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
meta.revision = meta.revision + 1
|
||||
mt.indexBuildID2Meta[meta.indexMeta.IndexBuildID] = *meta
|
||||
log.Debug("IndexService metaTable saveIndexMeta success", zap.Any("meta.revision", meta.revision))
|
||||
|
||||
return nil
|
||||
}
|
||||
@ -101,6 +100,7 @@ func (mt *metaTable) reloadMeta(indexBuildID UniqueID) (*Meta, error) {
|
||||
key := "indexes/" + strconv.FormatInt(indexBuildID, 10)
|
||||
|
||||
_, values, version, err := mt.client.LoadWithPrefix2(key)
|
||||
log.Debug("IndexService reloadMeta mt.client.LoadWithPrefix2", zap.Any("indexBuildID", indexBuildID), zap.Error(err))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -109,9 +109,9 @@ func (mt *metaTable) reloadMeta(indexBuildID UniqueID) (*Meta, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if im.State == commonpb.IndexState_Finished {
|
||||
return nil, nil
|
||||
}
|
||||
//if im.State == commonpb.IndexState_Finished {
|
||||
// return nil, nil
|
||||
//}
|
||||
m := &Meta{
|
||||
revision: version[0],
|
||||
indexMeta: im,
|
||||
@ -123,8 +123,8 @@ func (mt *metaTable) reloadMeta(indexBuildID UniqueID) (*Meta, error) {
|
||||
func (mt *metaTable) AddIndex(indexBuildID UniqueID, req *indexpb.BuildIndexRequest) error {
|
||||
mt.lock.Lock()
|
||||
defer mt.lock.Unlock()
|
||||
log.Debug("indexservice add index ...")
|
||||
_, ok := mt.indexBuildID2Meta[indexBuildID]
|
||||
log.Debug("IndexService metaTable AddIndex", zap.Any(" index already exist", ok))
|
||||
if ok {
|
||||
return fmt.Errorf("index already exists with ID = %d", indexBuildID)
|
||||
}
|
||||
@ -144,17 +144,20 @@ func (mt *metaTable) AddIndex(indexBuildID UniqueID, req *indexpb.BuildIndexRequ
|
||||
func (mt *metaTable) BuildIndex(indexBuildID UniqueID, nodeID int64) error {
|
||||
mt.lock.Lock()
|
||||
defer mt.lock.Unlock()
|
||||
log.Debug("IndexService update index meta")
|
||||
log.Debug("IndexService metaTable BuildIndex")
|
||||
|
||||
meta, ok := mt.indexBuildID2Meta[indexBuildID]
|
||||
if !ok {
|
||||
log.Debug("IndexService metaTable BuildIndex index not exists", zap.Any("indexBuildID", indexBuildID))
|
||||
return fmt.Errorf("index not exists with ID = %d", indexBuildID)
|
||||
}
|
||||
|
||||
if meta.indexMeta.State != commonpb.IndexState_Unissued {
|
||||
return fmt.Errorf("can not set lease key, index with ID = %d state is %d", indexBuildID, meta.indexMeta.State)
|
||||
}
|
||||
//if meta.indexMeta.State != commonpb.IndexState_Unissued {
|
||||
// return fmt.Errorf("can not set lease key, index with ID = %d state is %d", indexBuildID, meta.indexMeta.State)
|
||||
//}
|
||||
|
||||
meta.indexMeta.NodeID = nodeID
|
||||
meta.indexMeta.State = commonpb.IndexState_InProgress
|
||||
|
||||
err := mt.saveIndexMeta(&meta)
|
||||
if err != nil {
|
||||
@ -178,17 +181,20 @@ func (mt *metaTable) BuildIndex(indexBuildID UniqueID, nodeID int64) error {
|
||||
func (mt *metaTable) UpdateVersion(indexBuildID UniqueID) error {
|
||||
mt.lock.Lock()
|
||||
defer mt.lock.Unlock()
|
||||
log.Debug("IndexService update index version")
|
||||
log.Debug("IndexService metaTable update UpdateVersion", zap.Any("IndexBuildId", indexBuildID))
|
||||
meta, ok := mt.indexBuildID2Meta[indexBuildID]
|
||||
if !ok {
|
||||
log.Debug("IndexService metaTable update UpdateVersion indexBuildID not exists", zap.Any("IndexBuildId", indexBuildID))
|
||||
return fmt.Errorf("index not exists with ID = %d", indexBuildID)
|
||||
}
|
||||
|
||||
if meta.indexMeta.State != commonpb.IndexState_Unissued {
|
||||
return fmt.Errorf("can not set lease key, index with ID = %d state is %d", indexBuildID, meta.indexMeta.State)
|
||||
}
|
||||
//if meta.indexMeta.State != commonpb.IndexState_Unissued {
|
||||
// return fmt.Errorf("can not set lease key, index with ID = %d state is %d", indexBuildID, meta.indexMeta.State)
|
||||
//}
|
||||
|
||||
meta.indexMeta.Version = meta.indexMeta.Version + 1
|
||||
log.Debug("IndexService metaTable update UpdateVersion", zap.Any("IndexBuildId", indexBuildID),
|
||||
zap.Any("Version", meta.indexMeta.Version))
|
||||
|
||||
err := mt.saveIndexMeta(&meta)
|
||||
if err != nil {
|
||||
@ -212,13 +218,13 @@ func (mt *metaTable) MarkIndexAsDeleted(indexID UniqueID) error {
|
||||
mt.lock.Lock()
|
||||
defer mt.lock.Unlock()
|
||||
|
||||
log.Debug("indexservice", zap.Int64("mark index is deleted", indexID))
|
||||
log.Debug("IndexService metaTable MarkIndexAsDeleted ", zap.Int64("indexID", indexID))
|
||||
|
||||
for _, meta := range mt.indexBuildID2Meta {
|
||||
if meta.indexMeta.Req.IndexID == indexID {
|
||||
meta.indexMeta.MarkDeleted = true
|
||||
if err := mt.saveIndexMeta(&meta); err != nil {
|
||||
log.Debug("IndexService", zap.Any("Meta table mark deleted err", err.Error()))
|
||||
log.Debug("IndexService metaTable MarkIndexAsDeleted saveIndexMeta failed", zap.Error(err))
|
||||
fn := func() error {
|
||||
m, err := mt.reloadMeta(meta.indexMeta.IndexBuildID)
|
||||
if m == nil {
|
||||
@ -284,9 +290,7 @@ func (mt *metaTable) DeleteIndex(indexBuildID UniqueID) {
|
||||
key := "indexes/" + strconv.FormatInt(indexBuildID, 10)
|
||||
|
||||
err := mt.client.Remove(key)
|
||||
if err != nil {
|
||||
log.Debug("IndexService", zap.Any("Delete IndexMeta in etcd error", err))
|
||||
}
|
||||
log.Debug("IndexService metaTable DeleteIndex", zap.Error(err))
|
||||
}
|
||||
|
||||
func (mt *metaTable) UpdateRecycleState(indexBuildID UniqueID) error {
|
||||
@ -294,10 +298,16 @@ func (mt *metaTable) UpdateRecycleState(indexBuildID UniqueID) error {
|
||||
defer mt.lock.Unlock()
|
||||
|
||||
meta, ok := mt.indexBuildID2Meta[indexBuildID]
|
||||
log.Debug("IndexService metaTable UpdateRecycleState", zap.Any("indexBuildID", indexBuildID),
|
||||
zap.Any("exists", ok))
|
||||
if !ok {
|
||||
return fmt.Errorf("index not exists with ID = %d", indexBuildID)
|
||||
}
|
||||
|
||||
if meta.indexMeta.Recycled {
|
||||
return nil
|
||||
}
|
||||
|
||||
meta.indexMeta.Recycled = true
|
||||
if err := mt.saveIndexMeta(&meta); err != nil {
|
||||
fn := func() error {
|
||||
@ -311,6 +321,8 @@ func (mt *metaTable) UpdateRecycleState(indexBuildID UniqueID) error {
|
||||
}
|
||||
err2 := retry.Retry(5, time.Millisecond*200, fn)
|
||||
if err2 != nil {
|
||||
meta.indexMeta.Recycled = false
|
||||
log.Debug("IndexService metaTable UpdateRecycleState failed", zap.Error(err2))
|
||||
return err2
|
||||
}
|
||||
}
|
||||
@ -340,10 +352,8 @@ func (mt *metaTable) GetIndexMeta(indexBuildID UniqueID) Meta {
|
||||
defer mt.lock.Unlock()
|
||||
|
||||
meta, ok := mt.indexBuildID2Meta[indexBuildID]
|
||||
if !ok {
|
||||
log.Debug("IndexService", zap.Any("Meta table does not have the meta with indexBuildID", indexBuildID))
|
||||
}
|
||||
|
||||
log.Debug("IndexService metaTable GetIndexMeta", zap.Any("indexBuildID", indexBuildID),
|
||||
zap.Any("exist", ok))
|
||||
return meta
|
||||
}
|
||||
|
||||
@ -371,51 +381,6 @@ func (mt *metaTable) GetUnassignedTasks(nodeIDs []int64) [][]UniqueID {
|
||||
return tasks
|
||||
}
|
||||
|
||||
func compare2Array(arr1, arr2 interface{}) bool {
|
||||
p1, ok := arr1.([]*commonpb.KeyValuePair)
|
||||
if ok {
|
||||
p2, ok1 := arr2.([]*commonpb.KeyValuePair)
|
||||
if ok1 {
|
||||
for _, param1 := range p1 {
|
||||
sameParams := false
|
||||
for _, param2 := range p2 {
|
||||
if param1.Key == param2.Key && param1.Value == param2.Value {
|
||||
sameParams = true
|
||||
}
|
||||
}
|
||||
if !sameParams {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
log.Error("indexservice", zap.Any("type error", "arr2 type should be commonpb.KeyValuePair"))
|
||||
return false
|
||||
}
|
||||
v1, ok2 := arr1.([]string)
|
||||
if ok2 {
|
||||
v2, ok3 := arr2.([]string)
|
||||
if ok3 {
|
||||
for _, s1 := range v1 {
|
||||
sameParams := false
|
||||
for _, s2 := range v2 {
|
||||
if s1 == s2 {
|
||||
sameParams = true
|
||||
}
|
||||
}
|
||||
if !sameParams {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
log.Error("indexservice", zap.Any("type error", "arr2 type should be string array"))
|
||||
return false
|
||||
}
|
||||
log.Error("indexservice", zap.Any("type error", "param type should be commonpb.KeyValuePair or string array"))
|
||||
return false
|
||||
}
|
||||
|
||||
func (mt *metaTable) HasSameReq(req *indexpb.BuildIndexRequest) (bool, UniqueID) {
|
||||
mt.lock.Lock()
|
||||
defer mt.lock.Unlock()
|
||||
@ -442,9 +407,14 @@ LOOP:
|
||||
func (mt *metaTable) LoadMetaFromETCD(indexBuildID int64, revision int64) bool {
|
||||
mt.lock.Lock()
|
||||
defer mt.lock.Unlock()
|
||||
|
||||
meta, ok := mt.indexBuildID2Meta[indexBuildID]
|
||||
log.Debug("IndexService metaTable LoadMetaFromETCD", zap.Any("indexBuildID", indexBuildID),
|
||||
zap.Any("revision", revision), zap.Any("ok", ok))
|
||||
if ok {
|
||||
log.Debug("IndexService metaTable LoadMetaFromETCD",
|
||||
zap.Any("meta.revision", meta.revision),
|
||||
zap.Any("revision", revision))
|
||||
|
||||
if meta.revision >= revision {
|
||||
return false
|
||||
}
|
||||
@ -452,12 +422,12 @@ func (mt *metaTable) LoadMetaFromETCD(indexBuildID int64, revision int64) bool {
|
||||
|
||||
m, err := mt.reloadMeta(meta.indexMeta.IndexBuildID)
|
||||
if m == nil {
|
||||
log.Debug("IndexService", zap.Any("Load meta from etcd error", err))
|
||||
log.Debug("IndexService metaTable reloadMeta failed", zap.Error(err))
|
||||
return false
|
||||
}
|
||||
|
||||
log.Debug("IndexService", zap.Any("IndexMeta", m))
|
||||
mt.indexBuildID2Meta[indexBuildID] = *m
|
||||
log.Debug("IndexService LoadMetaFromETCD success", zap.Any("IndexMeta", m))
|
||||
|
||||
return true
|
||||
}
|
||||
|
@ -11,7 +11,10 @@
|
||||
|
||||
package indexservice
|
||||
|
||||
import "github.com/milvus-io/milvus/internal/proto/commonpb"
|
||||
import (
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
||||
)
|
||||
|
||||
func CompareAddress(a *commonpb.Address, b *commonpb.Address) bool {
|
||||
if a == b {
|
||||
@ -19,3 +22,48 @@ func CompareAddress(a *commonpb.Address, b *commonpb.Address) bool {
|
||||
}
|
||||
return a.Ip == b.Ip && a.Port == b.Port
|
||||
}
|
||||
|
||||
func compare2Array(arr1, arr2 interface{}) bool {
|
||||
p1, ok := arr1.([]*commonpb.KeyValuePair)
|
||||
if ok {
|
||||
p2, ok1 := arr2.([]*commonpb.KeyValuePair)
|
||||
if ok1 {
|
||||
for _, param1 := range p1 {
|
||||
sameParams := false
|
||||
for _, param2 := range p2 {
|
||||
if param1.Key == param2.Key && param1.Value == param2.Value {
|
||||
sameParams = true
|
||||
}
|
||||
}
|
||||
if !sameParams {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
log.Error("IndexService compare2Array arr2 should be commonpb.KeyValuePair")
|
||||
return false
|
||||
}
|
||||
v1, ok2 := arr1.([]string)
|
||||
if ok2 {
|
||||
v2, ok3 := arr2.([]string)
|
||||
if ok3 {
|
||||
for _, s1 := range v1 {
|
||||
sameParams := false
|
||||
for _, s2 := range v2 {
|
||||
if s1 == s2 {
|
||||
sameParams = true
|
||||
}
|
||||
}
|
||||
if !sameParams {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
log.Error("IndexService compare2Array arr2 type should be string array")
|
||||
return false
|
||||
}
|
||||
log.Error("IndexService compare2Array param type should be commonpb.KeyValuePair or string array")
|
||||
return false
|
||||
}
|
||||
|
@ -435,7 +435,7 @@ func (c *Core) startDataNodeFlushedSegmentLoop() {
|
||||
EnableIndex: false,
|
||||
}
|
||||
info.BuildID, err = c.BuildIndex(segID, fieldSch, idxInfo, true)
|
||||
if err == nil {
|
||||
if err == nil && info.BuildID != 0 {
|
||||
info.EnableIndex = true
|
||||
} else {
|
||||
log.Error("build index fail", zap.String("error", err.Error()))
|
||||
|
@ -697,6 +697,8 @@ func (t *DescribeSegmentReqTask) Execute(ctx context.Context) error {
|
||||
}
|
||||
//TODO, get filed_id and index_name from request
|
||||
segIdxInfo, err := t.core.MetaTable.GetSegmentIndexInfoByID(t.Req.SegmentID, -1, "")
|
||||
log.Debug("MasterService DescribeSegmentReqTask, MetaTable.GetSegmentIndexInfoByID", zap.Any("SegmentID", t.Req.SegmentID),
|
||||
zap.Any("segIdxInfo", segIdxInfo), zap.Error(err))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -793,10 +795,14 @@ func (t *CreateIndexReqTask) Execute(ctx context.Context) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if info.BuildID != 0 {
|
||||
info.EnableIndex = true
|
||||
}
|
||||
segIdxInfos = append(segIdxInfos, &info)
|
||||
}
|
||||
|
||||
_, err = t.core.MetaTable.AddIndex(segIdxInfos, "", "")
|
||||
log.Debug("MasterService CreateIndexReq", zap.Any("segIdxInfos", segIdxInfos), zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -297,6 +297,7 @@ func (loader *indexLoader) setIndexInfo(collectionID UniqueID, segment *Segment,
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.Debug("QueryNode IndexLoader setIndexInfo", zap.Any("Req", req), zap.Any("response", response))
|
||||
if response.Status.ErrorCode != commonpb.ErrorCode_Success {
|
||||
return errors.New(response.Status.Reason)
|
||||
}
|
||||
|
@ -172,7 +172,7 @@ func (s *loadService) loadSegmentInternal(collectionID UniqueID, segment *Segmen
|
||||
for _, vecFieldID := range vectorFieldIDs {
|
||||
err = s.segLoader.indexLoader.setIndexInfo(collectionID, segment, vecFieldID)
|
||||
if err != nil {
|
||||
log.Warn(err.Error())
|
||||
log.Warn("QueryNode load_service", zap.Any("SegmentID", segment.segmentID), zap.Error(err))
|
||||
continue
|
||||
}
|
||||
loadIndexFieldIDs = append(loadIndexFieldIDs, vecFieldID)
|
||||
|
Loading…
Reference in New Issue
Block a user