From 45f500741050cfc2c651c33ac237b7b24fc29b9c Mon Sep 17 00:00:00 2001 From: SimFG Date: Fri, 30 Sep 2022 10:56:54 +0800 Subject: [PATCH] Improve the `DescribeIndex` RPC about the state and row num (#19528) Signed-off-by: SimFG Signed-off-by: SimFG --- internal/indexcoord/index_coord.go | 84 ++++++----- internal/indexcoord/index_coord_mock.go | 43 ++++++ internal/indexcoord/index_coord_test.go | 187 +++++++++++++++++++++--- internal/indexcoord/meta_table.go | 22 ++- internal/indexcoord/meta_table_test.go | 3 +- 5 files changed, 279 insertions(+), 60 deletions(-) diff --git a/internal/indexcoord/index_coord.go b/internal/indexcoord/index_coord.go index bc29507a14..1c0e6d81b2 100644 --- a/internal/indexcoord/index_coord.go +++ b/internal/indexcoord/index_coord.go @@ -508,7 +508,7 @@ func (i *IndexCoord) GetIndexState(ctx context.Context, req *indexpb.GetIndexSta } for indexID, createTs := range indexID2CreateTs { - indexStates := i.metaTable.GetIndexStates(indexID, createTs) + indexStates, _ := i.metaTable.GetIndexStates(indexID, createTs) for _, state := range indexStates { if state.state != commonpb.IndexState_Finished { ret.State = state.state @@ -568,30 +568,36 @@ func (i *IndexCoord) GetSegmentIndexState(ctx context.Context, req *indexpb.GetS return ret, nil } -// CompleteIndexInfo get the building index progress and index state +// completeIndexInfo get the building index progress and index state func (i *IndexCoord) completeIndexInfo(ctx context.Context, indexInfo *indexpb.IndexInfo) error { collectionID := indexInfo.CollectionID indexName := indexInfo.IndexName log.Info("IndexCoord completeIndexInfo", zap.Int64("collID", collectionID), zap.String("indexName", indexName)) - flushSegments, err := i.dataCoordClient.GetFlushedSegments(ctx, &datapb.GetFlushedSegmentsRequest{ - CollectionID: collectionID, - PartitionID: -1, - }) - if err != nil { - return err - } - resp, err := i.dataCoordClient.GetSegmentInfo(ctx, &datapb.GetSegmentInfoRequest{ - SegmentIDs: 
flushSegments.Segments, - }) - if err != nil { - return err - } - totalRows, indexRows := int64(0), int64(0) + calculateTotalRow := func() (int64, error) { + totalRows := int64(0) + flushSegments, err := i.dataCoordClient.GetFlushedSegments(ctx, &datapb.GetFlushedSegmentsRequest{ + CollectionID: collectionID, + PartitionID: -1, + }) + if err != nil { + return totalRows, err + } - for _, seg := range resp.Infos { - totalRows += seg.NumOfRows + resp, err := i.dataCoordClient.GetSegmentInfo(ctx, &datapb.GetSegmentInfoRequest{ + SegmentIDs: flushSegments.Segments, + }) + if err != nil { + return totalRows, err + } + + for _, seg := range resp.Infos { + if seg.State == commonpb.SegmentState_Flushed { + totalRows += seg.NumOfRows + } + } + return totalRows, nil } indexID2CreateTs := i.metaTable.GetIndexIDByName(collectionID, indexName) @@ -600,31 +606,37 @@ func (i *IndexCoord) completeIndexInfo(ctx context.Context, indexInfo *indexpb.I return nil } - for indexID, createTs := range indexID2CreateTs { - indexRows = i.metaTable.GetIndexBuildProgress(indexID, createTs) + var indexID int64 + var createTs uint64 + // the size of `indexID2CreateTs` map is one + // and we need to get key and value through the `for` statement + for k, v := range indexID2CreateTs { + indexID = k + createTs = v break } - indexInfo.IndexedRows = indexRows - indexInfo.TotalRows = totalRows - stateRes := commonpb.IndexState_Finished - failReasonRes := "" - for indexID, createTs := range indexID2CreateTs { - indexStates := i.metaTable.GetIndexStates(indexID, createTs) - for _, state := range indexStates { - if state.state != commonpb.IndexState_Finished { - stateRes = state.state - failReasonRes = state.failReason - break - } + indexStates, indexStateCnt := i.metaTable.GetIndexStates(indexID, createTs) + allCnt := len(indexStates) + switch { + case indexStateCnt.Failed > 0: + indexInfo.State = commonpb.IndexState_Failed + indexInfo.IndexStateFailReason = indexStateCnt.FailReason + case 
indexStateCnt.Finished == allCnt: + indexInfo.State = commonpb.IndexState_Finished + default: + indexInfo.State = commonpb.IndexState_InProgress + indexInfo.IndexedRows = i.metaTable.GetIndexBuildProgress(indexID, createTs) + totalRow, err := calculateTotalRow() + if err != nil { + return err } + indexInfo.TotalRows = totalRow } - indexInfo.State = stateRes - indexInfo.IndexStateFailReason = failReasonRes log.Debug("IndexCoord completeIndexInfo success", zap.Int64("collID", collectionID), - zap.Int64("totalRows", totalRows), zap.Int64("indexRows", indexRows), zap.Int("seg num", len(resp.Infos)), - zap.Any("state", stateRes), zap.String("failReason", failReasonRes)) + zap.Int64("totalRows", indexInfo.TotalRows), zap.Int64("indexRows", indexInfo.IndexedRows), + zap.Any("state", indexInfo.State), zap.String("failReason", indexInfo.IndexStateFailReason)) return nil } diff --git a/internal/indexcoord/index_coord_mock.go b/internal/indexcoord/index_coord_mock.go index a4c0e7cbca..3c66fb837e 100644 --- a/internal/indexcoord/index_coord_mock.go +++ b/internal/indexcoord/index_coord_mock.go @@ -483,6 +483,7 @@ type mockETCDKV struct { kv.MetaKv save func(string, string) error + load func(string) (string, error) remove func(string) error multiSave func(map[string]string) error watchWithRevision func(string, int64) clientv3.WatchChan @@ -499,6 +500,9 @@ func NewMockEtcdKV() *mockETCDKV { save: func(s string, s2 string) error { return nil }, + load: func(s string) (string, error) { + return "", nil + }, remove: func(s string) error { return nil }, @@ -523,6 +527,41 @@ func NewMockEtcdKV() *mockETCDKV { } } +func NewMockEtcdKVWithReal(real kv.MetaKv) *mockETCDKV { + return &mockETCDKV{ + save: func(s string, s2 string) error { + return real.Save(s, s2) + }, + load: func(s string) (string, error) { + return real.Load(s) + }, + remove: func(s string) error { + return real.Remove(s) + }, + multiSave: func(m map[string]string) error { + return real.MultiSave(m) + }, + 
loadWithRevisionAndVersions: func(s string) ([]string, []string, []int64, int64, error) { + return real.LoadWithRevisionAndVersions(s) + }, + compareVersionAndSwap: func(key string, version int64, target string, opts ...clientv3.OpOption) (bool, error) { + return real.CompareVersionAndSwap(key, version, target, opts...) + }, + loadWithPrefix: func(key string) ([]string, []string, error) { + return real.LoadWithPrefix(key) + }, + loadWithPrefix2: func(key string) ([]string, []string, []int64, error) { + return real.LoadWithPrefix2(key) + }, + loadWithRevision: func(key string) ([]string, []string, int64, error) { + return real.LoadWithRevision(key) + }, + removeWithPrefix: func(key string) error { + return real.RemoveWithPrefix(key) + }, + } +} + func (mk *mockETCDKV) Save(key string, value string) error { return mk.save(key, value) } @@ -563,6 +602,10 @@ func (mk *mockETCDKV) RemoveWithPrefix(key string) error { return mk.removeWithPrefix(key) } +func (mk *mockETCDKV) Load(key string) (string, error) { + return mk.load(key) +} + type chunkManagerMock struct { storage.ChunkManager diff --git a/internal/indexcoord/index_coord_test.go b/internal/indexcoord/index_coord_test.go index 5e76eaf3ac..a026aa7678 100644 --- a/internal/indexcoord/index_coord_test.go +++ b/internal/indexcoord/index_coord_test.go @@ -19,12 +19,15 @@ package indexcoord import ( "context" "errors" + "fmt" "math/rand" "path" "strconv" "testing" "time" + "github.com/milvus-io/milvus/internal/metastore/model" + "github.com/stretchr/testify/assert" "github.com/milvus-io/milvus/api/commonpb" @@ -42,6 +45,51 @@ import ( "github.com/milvus-io/milvus/internal/util/sessionutil" ) +func TestMockEtcd(t *testing.T) { + Params.InitOnce() + Params.EtcdCfg.MetaRootPath = "indexcoord-mock" + + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) + assert.NoError(t, err) + etcdKV := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath) + + mockEtcd := NewMockEtcdKVWithReal(etcdKV) + key := "foo" + value := 
"foo-val" + err = mockEtcd.Save(key, value) + assert.NoError(t, err) + + fmt.Println(mockEtcd == nil) + loadVal, err := mockEtcd.Load(key) + assert.NoError(t, err) + assert.Equal(t, value, loadVal) + + _, _, err = mockEtcd.LoadWithPrefix(key) + assert.NoError(t, err) + + _, _, _, err = mockEtcd.LoadWithPrefix2(key) + assert.NoError(t, err) + + _, _, _, err = mockEtcd.LoadWithRevision(key) + assert.NoError(t, err) + + _, _, _, _, err = mockEtcd.LoadWithRevisionAndVersions(key) + assert.NoError(t, err) + + err = mockEtcd.MultiSave(map[string]string{ + "TestMockEtcd-1": "mock-val", + "TestMockEtcd-2": "mock-val", + }) + assert.NoError(t, err) + + err = mockEtcd.RemoveWithPrefix("TestMockEtcd-") + assert.NoError(t, err) + + err = mockEtcd.Remove(key) + assert.NoError(t, err) + +} + func testIndexCoord(t *testing.T) { ctx := context.Background() Params.EtcdCfg.MetaRootPath = "indexcoord-ut" @@ -122,6 +170,10 @@ func testIndexCoord(t *testing.T) { err = ic.Init() assert.NoError(t, err) + mockKv := NewMockEtcdKVWithReal(ic.etcdKV) + ic.metaTable, err = NewMetaTable(mockKv) + assert.NoError(t, err) + err = ic.Register() assert.NoError(t, err) @@ -192,6 +244,120 @@ func testIndexCoord(t *testing.T) { assert.Equal(t, len(req.SegmentIDs), len(resp.SegmentInfo)) }) + getReq := func() *indexpb.DescribeIndexRequest { + return &indexpb.DescribeIndexRequest{ + CollectionID: collID, + IndexName: indexName, + } + } + + t.Run("DescribeIndex NotExist", func(t *testing.T) { + indexs := ic.metaTable.collectionIndexes + ic.metaTable.collectionIndexes = make(map[UniqueID]map[UniqueID]*model.Index) + defer func() { + fmt.Println("simfg fubang") + ic.metaTable.collectionIndexes = indexs + }() + + resp, err := ic.DescribeIndex(ctx, getReq()) + assert.NoError(t, err) + assert.Equal(t, resp.Status.ErrorCode, commonpb.ErrorCode_IndexNotExist) + }) + + t.Run("DescribeIndex State", func(t *testing.T) { + req := getReq() + res := ic.metaTable.GetIndexIDByName(collID, indexName) + var indexIDTest 
int64 + for k := range res { + indexIDTest = k + break + } + + indexs := ic.metaTable.segmentIndexes + mockIndexs := make(map[UniqueID]map[UniqueID]*model.SegmentIndex) + progressIndex := &model.SegmentIndex{ + IndexState: commonpb.IndexState_InProgress, + } + failedIndex := &model.SegmentIndex{ + IndexState: commonpb.IndexState_Failed, + SegmentID: 333, + FailReason: "mock fail", + } + finishedIndex := &model.SegmentIndex{ + IndexState: commonpb.IndexState_Finished, + NumRows: 2048, + } + ic.metaTable.segmentIndexes = mockIndexs + defer func() { + ic.metaTable.segmentIndexes = indexs + }() + + mockIndexs[111] = make(map[UniqueID]*model.SegmentIndex) + mockIndexs[111][indexIDTest] = finishedIndex + + resp, err := ic.DescribeIndex(ctx, req) + assert.NoError(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + assert.Equal(t, commonpb.IndexState_Finished, resp.IndexInfos[0].State) + + originFunc1 := dcm.CallGetFlushedSegment + originFunc2 := dcm.CallGetSegmentInfo + defer func() { + dcm.CallGetFlushedSegment = originFunc1 + dcm.SetFunc(func() { + dcm.CallGetSegmentInfo = originFunc2 + }) + }() + dcm.CallGetFlushedSegment = func(ctx context.Context, req *datapb.GetFlushedSegmentsRequest) (*datapb.GetFlushedSegmentsResponse, error) { + return nil, errors.New("mock error") + } + + mockIndexs[222] = make(map[UniqueID]*model.SegmentIndex) + mockIndexs[222][indexIDTest] = progressIndex + resp, err = ic.DescribeIndex(ctx, req) + assert.NoError(t, err) + assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + + dcm.CallGetFlushedSegment = func(ctx context.Context, req *datapb.GetFlushedSegmentsRequest) (*datapb.GetFlushedSegmentsResponse, error) { + return &datapb.GetFlushedSegmentsResponse{ + Segments: []int64{111, 222, 333}, + }, nil + } + dcm.SetFunc(func() { + dcm.CallGetSegmentInfo = func(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error) { + return nil, errors.New("mock error") + } + 
}) + resp, err = ic.DescribeIndex(ctx, req) + assert.NoError(t, err) + assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + + dcm.SetFunc(func() { + dcm.CallGetSegmentInfo = func(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error) { + return &datapb.GetSegmentInfoResponse{ + Infos: []*datapb.SegmentInfo{ + {State: commonpb.SegmentState_Flushed, NumOfRows: 2048}, + {State: commonpb.SegmentState_Flushed, NumOfRows: 2048}, + {State: commonpb.SegmentState_Flushed, NumOfRows: 2048}, + }, + }, nil + } + }) + resp, err = ic.DescribeIndex(ctx, req) + assert.NoError(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + assert.Equal(t, commonpb.IndexState_InProgress, resp.IndexInfos[0].State) + assert.Equal(t, int64(2048), resp.IndexInfos[0].IndexedRows) + assert.Equal(t, int64(2048*3), resp.IndexInfos[0].TotalRows) + + mockIndexs[333] = make(map[UniqueID]*model.SegmentIndex) + mockIndexs[333][indexIDTest] = failedIndex + resp, err = ic.DescribeIndex(ctx, req) + assert.NoError(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + assert.Equal(t, commonpb.IndexState_Failed, resp.IndexInfos[0].State) + }) + t.Run("DescribeIndex", func(t *testing.T) { req := &indexpb.DescribeIndexRequest{ CollectionID: collID, @@ -200,27 +366,6 @@ func testIndexCoord(t *testing.T) { resp, err := ic.DescribeIndex(ctx, req) assert.NoError(t, err) assert.Equal(t, 1, len(resp.IndexInfos)) - - originFunc1 := dcm.CallGetFlushedSegment - originFunc2 := dcm.CallGetSegmentInfo - dcm.CallGetFlushedSegment = func(ctx context.Context, req *datapb.GetFlushedSegmentsRequest) (*datapb.GetFlushedSegmentsResponse, error) { - return nil, errors.New("mock error") - } - resp, err = ic.DescribeIndex(ctx, req) - assert.NoError(t, err) - assert.Equal(t, resp.Status.ErrorCode, commonpb.ErrorCode_UnexpectedError) - dcm.CallGetFlushedSegment = originFunc1 - dcm.SetFunc(func() { - dcm.CallGetSegmentInfo = 
func(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error) { - return nil, errors.New("mock error") - } - }) - resp, err = ic.DescribeIndex(ctx, req) - assert.NoError(t, err) - assert.Equal(t, resp.Status.ErrorCode, commonpb.ErrorCode_UnexpectedError) - dcm.SetFunc(func() { - dcm.CallGetSegmentInfo = originFunc2 - }) }) t.Run("FlushedSegmentWatcher", func(t *testing.T) { diff --git a/internal/indexcoord/meta_table.go b/internal/indexcoord/meta_table.go index 0cade2e210..5222fda4ce 100644 --- a/internal/indexcoord/meta_table.go +++ b/internal/indexcoord/meta_table.go @@ -585,8 +585,17 @@ type IndexState struct { failReason string } +type IndexStateCnt struct { + None int + Unissued int + InProgress int + Finished int + Failed int + FailReason string +} + // GetIndexStates gets the index states for indexID from meta table. -func (mt *metaTable) GetIndexStates(indexID int64, createTs uint64) []*IndexState { +func (mt *metaTable) GetIndexStates(indexID int64, createTs uint64) ([]*IndexState, IndexStateCnt) { mt.segmentIndexLock.RLock() defer mt.segmentIndexLock.RUnlock() @@ -597,6 +606,7 @@ func (mt *metaTable) GetIndexStates(indexID int64, createTs uint64) []*IndexStat cntInProgress = 0 cntFinished = 0 cntFailed = 0 + failReason string ) for _, indexID2SegIdx := range mt.segmentIndexes { @@ -622,6 +632,7 @@ func (mt *metaTable) GetIndexStates(indexID int64, createTs uint64) []*IndexStat cntFinished++ case commonpb.IndexState_Failed: cntFailed++ + failReason += fmt.Sprintf("%d: %s;", segIdx.SegmentID, segIdx.FailReason) } segIndexStates = append(segIndexStates, &IndexState{ state: segIdx.IndexState, @@ -633,7 +644,14 @@ func (mt *metaTable) GetIndexStates(indexID int64, createTs uint64) []*IndexStat zap.Int("total", len(segIndexStates)), zap.Int("None", cntNone), zap.Int("Unissued", cntUnissued), zap.Int("InProgress", cntInProgress), zap.Int("Finished", cntFinished), zap.Int("Failed", cntFailed)) - return segIndexStates + 
return segIndexStates, IndexStateCnt{ + None: cntNone, + Unissued: cntUnissued, + InProgress: cntInProgress, + Finished: cntFinished, + Failed: cntFailed, + FailReason: failReason, + } } func (mt *metaTable) GetSegmentIndexes(segID UniqueID) []*model.SegmentIndex { diff --git a/internal/indexcoord/meta_table_test.go b/internal/indexcoord/meta_table_test.go index fb482bf69a..9b7f17bcf2 100644 --- a/internal/indexcoord/meta_table_test.go +++ b/internal/indexcoord/meta_table_test.go @@ -626,8 +626,9 @@ func TestMetaTable_GetIndexNameByID(t *testing.T) { func TestMetaTable_GetIndexStates(t *testing.T) { mt := constructMetaTable(&indexcoord.Catalog{}) - states := mt.GetIndexStates(indexID, 11) + states, stateCnt := mt.GetIndexStates(indexID, 11) assert.Equal(t, 1, len(states)) + assert.Equal(t, 1, stateCnt.Finished) } func TestMetaTable_GetSegmentIndexes(t *testing.T) {