From 94f09662254f24b64c030dd950ab1cb7aa0cf55b Mon Sep 17 00:00:00 2001 From: godchen Date: Wed, 28 Apr 2021 11:15:28 +0800 Subject: [PATCH] Add get index build progress interface implementation (#5067) Add get index build progress interface implementation. Signed-off-by: godchen --- internal/proxynode/impl.go | 53 ++++++++- internal/proxynode/task.go | 232 +++++++++++++++++++++++++++++++++++-- 2 files changed, 276 insertions(+), 9 deletions(-) diff --git a/internal/proxynode/impl.go b/internal/proxynode/impl.go index 3885a0b96b..5fb2308562 100644 --- a/internal/proxynode/impl.go +++ b/internal/proxynode/impl.go @@ -886,7 +886,58 @@ func (node *ProxyNode) DropIndex(ctx context.Context, request *milvuspb.DropInde // GetIndexBuildProgress gets index build progress with filed_name and index_name. // IndexRows is the num of indexed rows. And TotalRows is the total number of segment rows. func (node *ProxyNode) GetIndexBuildProgress(ctx context.Context, request *milvuspb.GetIndexBuildProgressRequest) (*milvuspb.GetIndexBuildProgressResponse, error) { - return nil, nil + gibpt := &GetIndexBuildProgressTask{ + ctx: ctx, + Condition: NewTaskCondition(ctx), + GetIndexBuildProgressRequest: request, + indexService: node.indexService, + masterService: node.masterService, + dataService: node.dataService, + } + + err := node.sched.DdQueue.Enqueue(gibpt) + if err != nil { + return &milvuspb.GetIndexBuildProgressResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + Reason: err.Error(), + }, + }, nil + } + + log.Debug("GetIndexBuildProgress", + zap.String("role", Params.RoleName), + zap.Int64("msgID", request.Base.MsgID), + zap.Uint64("timestamp", request.Base.Timestamp), + zap.String("db", request.DbName), + zap.String("collection", request.CollectionName), + zap.String("field", request.FieldName), + zap.String("index name", request.IndexName)) + defer func() { + log.Debug("GetIndexBuildProgress Done", + zap.Error(err), + zap.String("role", Params.RoleName), + zap.Int64("msgID", request.Base.MsgID), + zap.Uint64("timestamp", request.Base.Timestamp), + zap.String("db", request.DbName), + zap.String("collection", request.CollectionName), + zap.String("field", request.FieldName), + zap.String("index name", request.IndexName)) + }() + + err = gibpt.WaitToFinish() + if err != nil { + return &milvuspb.GetIndexBuildProgressResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + Reason: err.Error(), + }, + }, nil + } + log.Debug("progress", zap.Any("result", gibpt.result)) + log.Debug("progress", zap.Any("status", gibpt.result.Status)) + + return gibpt.result, nil } func (node *ProxyNode) GetIndexState(ctx context.Context, request *milvuspb.GetIndexStateRequest) (*milvuspb.GetIndexStateResponse, error) { diff --git a/internal/proxynode/task.go b/internal/proxynode/task.go index ad0b542879..e6f2e8f5e3 100644 --- a/internal/proxynode/task.go +++ b/internal/proxynode/task.go @@ -57,6 +57,7 @@ const ( DescribeIndexTaskName = "DescribeIndexTask" DropIndexTaskName = "DropIndexTask" GetIndexStateTaskName = "GetIndexStateTask" + GetIndexBuildProgressTaskName = "GetIndexBuildProgressTask" FlushTaskName = "FlushTask" LoadCollectionTaskName = "LoadCollectionTask" ReleaseCollectionTaskName = "ReleaseCollectionTask" @@ -1723,6 +1724,228 @@ func (dit *DropIndexTask) PostExecute(ctx context.Context) error { return nil } +type GetIndexBuildProgressTask struct { + Condition + *milvuspb.GetIndexBuildProgressRequest + ctx context.Context + indexService types.IndexService + masterService types.MasterService + dataService types.DataService + result *milvuspb.GetIndexBuildProgressResponse +} + +func (gibpt *GetIndexBuildProgressTask) TraceCtx() context.Context { + return gibpt.ctx +} + +func (gibpt *GetIndexBuildProgressTask) ID() UniqueID { + return gibpt.Base.MsgID +} + +func (gibpt *GetIndexBuildProgressTask) SetID(uid UniqueID) { + gibpt.Base.MsgID = uid +} + +func (gibpt *GetIndexBuildProgressTask) Name() string { + return GetIndexBuildProgressTaskName +} + +func (gibpt *GetIndexBuildProgressTask) Type() commonpb.MsgType { + return gibpt.Base.MsgType +} + +func (gibpt *GetIndexBuildProgressTask) BeginTs() Timestamp { + return gibpt.Base.Timestamp +} + +func (gibpt *GetIndexBuildProgressTask) EndTs() Timestamp { + return gibpt.Base.Timestamp +} + +func (gibpt *GetIndexBuildProgressTask) SetTs(ts Timestamp) { + gibpt.Base.Timestamp = ts +} + +func (gibpt *GetIndexBuildProgressTask) OnEnqueue() error { + gibpt.Base = &commonpb.MsgBase{} + return nil +} + +func (gibpt *GetIndexBuildProgressTask) PreExecute(ctx context.Context) error { + gibpt.Base.MsgType = commonpb.MsgType_GetIndexBuildProgress + gibpt.Base.SourceID = Params.ProxyID + + if err := ValidateCollectionName(gibpt.CollectionName); err != nil { + return err + } + + return nil +} + +func (gibpt *GetIndexBuildProgressTask) Execute(ctx context.Context) error { + collectionName := gibpt.CollectionName + collectionID, err := globalMetaCache.GetCollectionID(ctx, collectionName) + if err != nil { // err is not nil if collection not exists + return err + } + + showPartitionRequest := &milvuspb.ShowPartitionsRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_ShowPartitions, + MsgID: gibpt.Base.MsgID, + Timestamp: gibpt.Base.Timestamp, + SourceID: Params.ProxyID, + }, + DbName: gibpt.DbName, + CollectionName: collectionName, + CollectionID: collectionID, + } + partitions, err := gibpt.masterService.ShowPartitions(ctx, showPartitionRequest) + if err != nil { + return err + } + + if gibpt.IndexName == "" { + gibpt.IndexName = Params.DefaultIndexName + } + + describeIndexReq := milvuspb.DescribeIndexRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_DescribeIndex, + MsgID: gibpt.Base.MsgID, + Timestamp: gibpt.Base.Timestamp, + SourceID: Params.ProxyID, + }, + DbName: gibpt.DbName, + CollectionName: gibpt.CollectionName, + // IndexName: gibpt.IndexName, + } + + indexDescriptionResp, err2 := gibpt.masterService.DescribeIndex(ctx, &describeIndexReq) + if err2 != nil { + return err2 + } + + matchIndexID := int64(-1) + foundIndexID := false + for _, desc := range indexDescriptionResp.IndexDescriptions { + if desc.IndexName == gibpt.IndexName { + matchIndexID = desc.IndexID + foundIndexID = true + break + } + } + if !foundIndexID { + return errors.New(fmt.Sprint("Can't found IndexID for indexName", gibpt.IndexName)) + } + + var allSegmentIDs []UniqueID + for _, partitionID := range partitions.PartitionIDs { + showSegmentsRequest := &milvuspb.ShowSegmentsRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_ShowSegments, + MsgID: gibpt.Base.MsgID, + Timestamp: gibpt.Base.Timestamp, + SourceID: Params.ProxyID, + }, + CollectionID: collectionID, + PartitionID: partitionID, + } + segments, err := gibpt.masterService.ShowSegments(ctx, showSegmentsRequest) + if err != nil { + return err + } + if segments.Status.ErrorCode != commonpb.ErrorCode_Success { + return errors.New(segments.Status.Reason) + } + allSegmentIDs = append(allSegmentIDs, segments.SegmentIDs...) + } + + getIndexStatesRequest := &indexpb.GetIndexStatesRequest{ + IndexBuildIDs: make([]UniqueID, 0), + } + + buildIndexMap := make(map[int64]int64) + for _, segmentID := range allSegmentIDs { + describeSegmentRequest := &milvuspb.DescribeSegmentRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_DescribeSegment, + MsgID: gibpt.Base.MsgID, + Timestamp: gibpt.Base.Timestamp, + SourceID: Params.ProxyID, + }, + CollectionID: collectionID, + SegmentID: segmentID, + } + segmentDesc, err := gibpt.masterService.DescribeSegment(ctx, describeSegmentRequest) + if err != nil { + return err + } + if segmentDesc.IndexID == matchIndexID { + if segmentDesc.EnableIndex { + getIndexStatesRequest.IndexBuildIDs = append(getIndexStatesRequest.IndexBuildIDs, segmentDesc.BuildID) + buildIndexMap[segmentID] = segmentDesc.BuildID + } + } + } + + states, err := gibpt.indexService.GetIndexStates(ctx, getIndexStatesRequest) + if err != nil { + return err + } + + if states.Status.ErrorCode != commonpb.ErrorCode_Success { + gibpt.result = &milvuspb.GetIndexBuildProgressResponse{ + Status: states.Status, + } + } + + buildFinishMap := make(map[int64]bool) + for _, state := range states.States { + if state.State == commonpb.IndexState_Finished { + buildFinishMap[state.IndexBuildID] = true + } + } + + infoResp, err := gibpt.dataService.GetSegmentInfo(ctx, &datapb.GetSegmentInfoRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_SegmentInfo, + MsgID: 0, + Timestamp: 0, + SourceID: Params.ProxyID, + }, + SegmentIDs: allSegmentIDs, + }) + if err != nil { + return err + } + + total := int64(0) + indexed := int64(0) + + for _, info := range infoResp.Infos { + total += info.NumRows + if buildFinishMap[buildIndexMap[info.ID]] { + indexed += info.NumRows + } + } + + gibpt.result = &milvuspb.GetIndexBuildProgressResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + Reason: "", + }, + TotalRows: total, + IndexedRows: indexed, + } + + return nil +} + +func (gibpt *GetIndexBuildProgressTask) PostExecute(ctx context.Context) error { + return nil +} + type GetIndexStateTask struct { Condition *milvuspb.GetIndexStateRequest @@ -1773,13 +1996,7 @@ func (gist *GetIndexStateTask) PreExecute(ctx context.Context) error { gist.Base.MsgType = commonpb.MsgType_GetIndexState gist.Base.SourceID = Params.ProxyID - collName, fieldName := gist.CollectionName, gist.FieldName - - if err := ValidateCollectionName(collName); err != nil { - return err - } - - if err := ValidateFieldName(fieldName); err != nil { + if err := ValidateCollectionName(gist.CollectionName); err != nil { return err } @@ -1822,7 +2039,6 @@ func (gist *GetIndexStateTask) Execute(ctx context.Context) error { }, DbName: gist.DbName, CollectionName: gist.CollectionName, - FieldName: gist.FieldName, IndexName: gist.IndexName, }