Improve the DescribeIndex RPC about the state and row num (#19528)

Signed-off-by: SimFG <bang.fu@zilliz.com>

Signed-off-by: SimFG <bang.fu@zilliz.com>
This commit is contained in:
SimFG 2022-09-30 10:56:54 +08:00 committed by GitHub
parent 2599a3ece0
commit 45f5007410
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 279 additions and 60 deletions

View File

@ -508,7 +508,7 @@ func (i *IndexCoord) GetIndexState(ctx context.Context, req *indexpb.GetIndexSta
}
for indexID, createTs := range indexID2CreateTs {
indexStates := i.metaTable.GetIndexStates(indexID, createTs)
indexStates, _ := i.metaTable.GetIndexStates(indexID, createTs)
for _, state := range indexStates {
if state.state != commonpb.IndexState_Finished {
ret.State = state.state
@ -568,30 +568,36 @@ func (i *IndexCoord) GetSegmentIndexState(ctx context.Context, req *indexpb.GetS
return ret, nil
}
// CompleteIndexInfo get the building index progress and index state
// completeIndexInfo get the building index progress and index state
func (i *IndexCoord) completeIndexInfo(ctx context.Context, indexInfo *indexpb.IndexInfo) error {
collectionID := indexInfo.CollectionID
indexName := indexInfo.IndexName
log.Info("IndexCoord completeIndexInfo", zap.Int64("collID", collectionID),
zap.String("indexName", indexName))
flushSegments, err := i.dataCoordClient.GetFlushedSegments(ctx, &datapb.GetFlushedSegmentsRequest{
CollectionID: collectionID,
PartitionID: -1,
})
if err != nil {
return err
}
resp, err := i.dataCoordClient.GetSegmentInfo(ctx, &datapb.GetSegmentInfoRequest{
SegmentIDs: flushSegments.Segments,
})
if err != nil {
return err
}
totalRows, indexRows := int64(0), int64(0)
calculateTotalRow := func() (int64, error) {
totalRows := int64(0)
flushSegments, err := i.dataCoordClient.GetFlushedSegments(ctx, &datapb.GetFlushedSegmentsRequest{
CollectionID: collectionID,
PartitionID: -1,
})
if err != nil {
return totalRows, err
}
for _, seg := range resp.Infos {
totalRows += seg.NumOfRows
resp, err := i.dataCoordClient.GetSegmentInfo(ctx, &datapb.GetSegmentInfoRequest{
SegmentIDs: flushSegments.Segments,
})
if err != nil {
return totalRows, err
}
for _, seg := range resp.Infos {
if seg.State == commonpb.SegmentState_Flushed {
totalRows += seg.NumOfRows
}
}
return totalRows, nil
}
indexID2CreateTs := i.metaTable.GetIndexIDByName(collectionID, indexName)
@ -600,31 +606,37 @@ func (i *IndexCoord) completeIndexInfo(ctx context.Context, indexInfo *indexpb.I
return nil
}
for indexID, createTs := range indexID2CreateTs {
indexRows = i.metaTable.GetIndexBuildProgress(indexID, createTs)
var indexID int64
var createTs uint64
// the size of `indexID2CreateTs` map is one
// and we need to get key and value through the `for` statement
for k, v := range indexID2CreateTs {
indexID = k
createTs = v
break
}
indexInfo.IndexedRows = indexRows
indexInfo.TotalRows = totalRows
stateRes := commonpb.IndexState_Finished
failReasonRes := ""
for indexID, createTs := range indexID2CreateTs {
indexStates := i.metaTable.GetIndexStates(indexID, createTs)
for _, state := range indexStates {
if state.state != commonpb.IndexState_Finished {
stateRes = state.state
failReasonRes = state.failReason
break
}
indexStates, indexStateCnt := i.metaTable.GetIndexStates(indexID, createTs)
allCnt := len(indexStates)
switch {
case indexStateCnt.Failed > 0:
indexInfo.State = commonpb.IndexState_Failed
indexInfo.IndexStateFailReason = indexStateCnt.FailReason
case indexStateCnt.Finished == allCnt:
indexInfo.State = commonpb.IndexState_Finished
default:
indexInfo.State = commonpb.IndexState_InProgress
indexInfo.IndexedRows = i.metaTable.GetIndexBuildProgress(indexID, createTs)
totalRow, err := calculateTotalRow()
if err != nil {
return err
}
indexInfo.TotalRows = totalRow
}
indexInfo.State = stateRes
indexInfo.IndexStateFailReason = failReasonRes
log.Debug("IndexCoord completeIndexInfo success", zap.Int64("collID", collectionID),
zap.Int64("totalRows", totalRows), zap.Int64("indexRows", indexRows), zap.Int("seg num", len(resp.Infos)),
zap.Any("state", stateRes), zap.String("failReason", failReasonRes))
zap.Int64("totalRows", indexInfo.TotalRows), zap.Int64("indexRows", indexInfo.IndexedRows),
zap.Any("state", indexInfo.State), zap.String("failReason", indexInfo.IndexStateFailReason))
return nil
}

View File

@ -483,6 +483,7 @@ type mockETCDKV struct {
kv.MetaKv
save func(string, string) error
load func(string) (string, error)
remove func(string) error
multiSave func(map[string]string) error
watchWithRevision func(string, int64) clientv3.WatchChan
@ -499,6 +500,9 @@ func NewMockEtcdKV() *mockETCDKV {
save: func(s string, s2 string) error {
return nil
},
load: func(s string) (string, error) {
return "", nil
},
remove: func(s string) error {
return nil
},
@ -523,6 +527,41 @@ func NewMockEtcdKV() *mockETCDKV {
}
}
func NewMockEtcdKVWithReal(real kv.MetaKv) *mockETCDKV {
return &mockETCDKV{
save: func(s string, s2 string) error {
return real.Save(s, s2)
},
load: func(s string) (string, error) {
return real.Load(s)
},
remove: func(s string) error {
return real.Remove(s)
},
multiSave: func(m map[string]string) error {
return real.MultiSave(m)
},
loadWithRevisionAndVersions: func(s string) ([]string, []string, []int64, int64, error) {
return real.LoadWithRevisionAndVersions(s)
},
compareVersionAndSwap: func(key string, version int64, target string, opts ...clientv3.OpOption) (bool, error) {
return real.CompareVersionAndSwap(key, version, target, opts...)
},
loadWithPrefix: func(key string) ([]string, []string, error) {
return real.LoadWithPrefix(key)
},
loadWithPrefix2: func(key string) ([]string, []string, []int64, error) {
return real.LoadWithPrefix2(key)
},
loadWithRevision: func(key string) ([]string, []string, int64, error) {
return real.LoadWithRevision(key)
},
removeWithPrefix: func(key string) error {
return real.RemoveWithPrefix(key)
},
}
}
func (mk *mockETCDKV) Save(key string, value string) error {
return mk.save(key, value)
}
@ -563,6 +602,10 @@ func (mk *mockETCDKV) RemoveWithPrefix(key string) error {
return mk.removeWithPrefix(key)
}
func (mk *mockETCDKV) Load(key string) (string, error) {
return mk.load(key)
}
type chunkManagerMock struct {
storage.ChunkManager

View File

@ -19,12 +19,15 @@ package indexcoord
import (
"context"
"errors"
"fmt"
"math/rand"
"path"
"strconv"
"testing"
"time"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/api/commonpb"
@ -42,6 +45,51 @@ import (
"github.com/milvus-io/milvus/internal/util/sessionutil"
)
func TestMockEtcd(t *testing.T) {
Params.InitOnce()
Params.EtcdCfg.MetaRootPath = "indexcoord-mock"
etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
assert.NoError(t, err)
etcdKV := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath)
mockEtcd := NewMockEtcdKVWithReal(etcdKV)
key := "foo"
value := "foo-val"
err = mockEtcd.Save(key, value)
assert.NoError(t, err)
fmt.Println(mockEtcd == nil)
loadVal, err := mockEtcd.Load(key)
assert.NoError(t, err)
assert.Equal(t, value, loadVal)
_, _, err = mockEtcd.LoadWithPrefix(key)
assert.NoError(t, err)
_, _, _, err = mockEtcd.LoadWithPrefix2(key)
assert.NoError(t, err)
_, _, _, err = mockEtcd.LoadWithRevision(key)
assert.NoError(t, err)
_, _, _, _, err = mockEtcd.LoadWithRevisionAndVersions(key)
assert.NoError(t, err)
err = mockEtcd.MultiSave(map[string]string{
"TestMockEtcd-1": "mock-val",
"TestMockEtcd-2": "mock-val",
})
assert.NoError(t, err)
err = mockEtcd.RemoveWithPrefix("TestMockEtcd-")
assert.NoError(t, err)
err = mockEtcd.Remove(key)
assert.NoError(t, err)
}
func testIndexCoord(t *testing.T) {
ctx := context.Background()
Params.EtcdCfg.MetaRootPath = "indexcoord-ut"
@ -122,6 +170,10 @@ func testIndexCoord(t *testing.T) {
err = ic.Init()
assert.NoError(t, err)
mockKv := NewMockEtcdKVWithReal(ic.etcdKV)
ic.metaTable, err = NewMetaTable(mockKv)
assert.NoError(t, err)
err = ic.Register()
assert.NoError(t, err)
@ -192,6 +244,120 @@ func testIndexCoord(t *testing.T) {
assert.Equal(t, len(req.SegmentIDs), len(resp.SegmentInfo))
})
getReq := func() *indexpb.DescribeIndexRequest {
return &indexpb.DescribeIndexRequest{
CollectionID: collID,
IndexName: indexName,
}
}
t.Run("DescribeIndex NotExist", func(t *testing.T) {
indexs := ic.metaTable.collectionIndexes
ic.metaTable.collectionIndexes = make(map[UniqueID]map[UniqueID]*model.Index)
defer func() {
fmt.Println("simfg fubang")
ic.metaTable.collectionIndexes = indexs
}()
resp, err := ic.DescribeIndex(ctx, getReq())
assert.NoError(t, err)
assert.Equal(t, resp.Status.ErrorCode, commonpb.ErrorCode_IndexNotExist)
})
t.Run("DescribeIndex State", func(t *testing.T) {
req := getReq()
res := ic.metaTable.GetIndexIDByName(collID, indexName)
var indexIDTest int64
for k := range res {
indexIDTest = k
break
}
indexs := ic.metaTable.segmentIndexes
mockIndexs := make(map[UniqueID]map[UniqueID]*model.SegmentIndex)
progressIndex := &model.SegmentIndex{
IndexState: commonpb.IndexState_InProgress,
}
failedIndex := &model.SegmentIndex{
IndexState: commonpb.IndexState_Failed,
SegmentID: 333,
FailReason: "mock fail",
}
finishedIndex := &model.SegmentIndex{
IndexState: commonpb.IndexState_Finished,
NumRows: 2048,
}
ic.metaTable.segmentIndexes = mockIndexs
defer func() {
ic.metaTable.segmentIndexes = indexs
}()
mockIndexs[111] = make(map[UniqueID]*model.SegmentIndex)
mockIndexs[111][indexIDTest] = finishedIndex
resp, err := ic.DescribeIndex(ctx, req)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
assert.Equal(t, commonpb.IndexState_Finished, resp.IndexInfos[0].State)
originFunc1 := dcm.CallGetFlushedSegment
originFunc2 := dcm.CallGetSegmentInfo
defer func() {
dcm.CallGetFlushedSegment = originFunc1
dcm.SetFunc(func() {
dcm.CallGetSegmentInfo = originFunc2
})
}()
dcm.CallGetFlushedSegment = func(ctx context.Context, req *datapb.GetFlushedSegmentsRequest) (*datapb.GetFlushedSegmentsResponse, error) {
return nil, errors.New("mock error")
}
mockIndexs[222] = make(map[UniqueID]*model.SegmentIndex)
mockIndexs[222][indexIDTest] = progressIndex
resp, err = ic.DescribeIndex(ctx, req)
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
dcm.CallGetFlushedSegment = func(ctx context.Context, req *datapb.GetFlushedSegmentsRequest) (*datapb.GetFlushedSegmentsResponse, error) {
return &datapb.GetFlushedSegmentsResponse{
Segments: []int64{111, 222, 333},
}, nil
}
dcm.SetFunc(func() {
dcm.CallGetSegmentInfo = func(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error) {
return nil, errors.New("mock error")
}
})
resp, err = ic.DescribeIndex(ctx, req)
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
dcm.SetFunc(func() {
dcm.CallGetSegmentInfo = func(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error) {
return &datapb.GetSegmentInfoResponse{
Infos: []*datapb.SegmentInfo{
{State: commonpb.SegmentState_Flushed, NumOfRows: 2048},
{State: commonpb.SegmentState_Flushed, NumOfRows: 2048},
{State: commonpb.SegmentState_Flushed, NumOfRows: 2048},
},
}, nil
}
})
resp, err = ic.DescribeIndex(ctx, req)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
assert.Equal(t, commonpb.IndexState_InProgress, resp.IndexInfos[0].State)
assert.Equal(t, int64(2048), resp.IndexInfos[0].IndexedRows)
assert.Equal(t, int64(2048*3), resp.IndexInfos[0].TotalRows)
mockIndexs[333] = make(map[UniqueID]*model.SegmentIndex)
mockIndexs[333][indexIDTest] = failedIndex
resp, err = ic.DescribeIndex(ctx, req)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
assert.Equal(t, commonpb.IndexState_Failed, resp.IndexInfos[0].State)
})
t.Run("DescribeIndex", func(t *testing.T) {
req := &indexpb.DescribeIndexRequest{
CollectionID: collID,
@ -200,27 +366,6 @@ func testIndexCoord(t *testing.T) {
resp, err := ic.DescribeIndex(ctx, req)
assert.NoError(t, err)
assert.Equal(t, 1, len(resp.IndexInfos))
originFunc1 := dcm.CallGetFlushedSegment
originFunc2 := dcm.CallGetSegmentInfo
dcm.CallGetFlushedSegment = func(ctx context.Context, req *datapb.GetFlushedSegmentsRequest) (*datapb.GetFlushedSegmentsResponse, error) {
return nil, errors.New("mock error")
}
resp, err = ic.DescribeIndex(ctx, req)
assert.NoError(t, err)
assert.Equal(t, resp.Status.ErrorCode, commonpb.ErrorCode_UnexpectedError)
dcm.CallGetFlushedSegment = originFunc1
dcm.SetFunc(func() {
dcm.CallGetSegmentInfo = func(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error) {
return nil, errors.New("mock error")
}
})
resp, err = ic.DescribeIndex(ctx, req)
assert.NoError(t, err)
assert.Equal(t, resp.Status.ErrorCode, commonpb.ErrorCode_UnexpectedError)
dcm.SetFunc(func() {
dcm.CallGetSegmentInfo = originFunc2
})
})
t.Run("FlushedSegmentWatcher", func(t *testing.T) {

View File

@ -585,8 +585,17 @@ type IndexState struct {
failReason string
}
type IndexStateCnt struct {
None int
Unissued int
InProgress int
Finished int
Failed int
FailReason string
}
// GetIndexStates gets the index states for indexID from meta table.
func (mt *metaTable) GetIndexStates(indexID int64, createTs uint64) []*IndexState {
func (mt *metaTable) GetIndexStates(indexID int64, createTs uint64) ([]*IndexState, IndexStateCnt) {
mt.segmentIndexLock.RLock()
defer mt.segmentIndexLock.RUnlock()
@ -597,6 +606,7 @@ func (mt *metaTable) GetIndexStates(indexID int64, createTs uint64) []*IndexStat
cntInProgress = 0
cntFinished = 0
cntFailed = 0
failReason string
)
for _, indexID2SegIdx := range mt.segmentIndexes {
@ -622,6 +632,7 @@ func (mt *metaTable) GetIndexStates(indexID int64, createTs uint64) []*IndexStat
cntFinished++
case commonpb.IndexState_Failed:
cntFailed++
failReason += fmt.Sprintf("%d: %s;", segIdx.SegmentID, segIdx.FailReason)
}
segIndexStates = append(segIndexStates, &IndexState{
state: segIdx.IndexState,
@ -633,7 +644,14 @@ func (mt *metaTable) GetIndexStates(indexID int64, createTs uint64) []*IndexStat
zap.Int("total", len(segIndexStates)), zap.Int("None", cntNone), zap.Int("Unissued", cntUnissued),
zap.Int("InProgress", cntInProgress), zap.Int("Finished", cntFinished), zap.Int("Failed", cntFailed))
return segIndexStates
return segIndexStates, IndexStateCnt{
None: cntNone,
Unissued: cntNone,
InProgress: cntInProgress,
Finished: cntFinished,
Failed: cntFailed,
FailReason: failReason,
}
}
func (mt *metaTable) GetSegmentIndexes(segID UniqueID) []*model.SegmentIndex {

View File

@ -626,8 +626,9 @@ func TestMetaTable_GetIndexNameByID(t *testing.T) {
func TestMetaTable_GetIndexStates(t *testing.T) {
mt := constructMetaTable(&indexcoord.Catalog{})
states := mt.GetIndexStates(indexID, 11)
states, stateCnt := mt.GetIndexStates(indexID, 11)
assert.Equal(t, 1, len(states))
assert.Equal(t, 1, stateCnt.Finished)
}
func TestMetaTable_GetSegmentIndexes(t *testing.T) {