From 1c489a85b26d1834f88f87b32ce2f34fa154911b Mon Sep 17 00:00:00 2001 From: "cai.zhang" Date: Fri, 23 Sep 2022 09:58:51 +0800 Subject: [PATCH] Add ut for IndexCoord (#19349) Signed-off-by: cai.zhang Signed-off-by: cai.zhang --- internal/indexcoord/garbage_collector.go | 6 - internal/indexcoord/garbage_collector_test.go | 383 +++++++++++- internal/indexcoord/index_builder.go | 60 +- internal/indexcoord/index_builder_test.go | 563 ++++++++++++++++-- internal/indexcoord/meta_table.go | 21 +- internal/indexcoord/task_state_test.go | 32 + 6 files changed, 925 insertions(+), 140 deletions(-) create mode 100644 internal/indexcoord/task_state_test.go diff --git a/internal/indexcoord/garbage_collector.go b/internal/indexcoord/garbage_collector.go index b669960044..0136bc4aa5 100644 --- a/internal/indexcoord/garbage_collector.go +++ b/internal/indexcoord/garbage_collector.go @@ -106,9 +106,6 @@ func (gc *garbageCollector) recycleUnusedIndexes() { // wait for releasing reference lock continue } - if !gc.metaTable.IsExpire(segIdx.BuildID) { - continue - } if err := gc.metaTable.RemoveSegmentIndex(segIdx.CollectionID, segIdx.PartitionID, segIdx.SegmentID, segIdx.BuildID); err != nil { log.Warn("delete index meta from etcd failed, wait to retry", zap.Int64("buildID", segIdx.BuildID), zap.Int64("nodeID", segIdx.NodeID), zap.Error(err)) @@ -173,9 +170,6 @@ func (gc *garbageCollector) recycleSegIndexesMeta() { // wait for releasing reference lock continue } - if !gc.metaTable.IsExpire(meta.BuildID) { - continue - } if err := gc.metaTable.RemoveSegmentIndex(meta.CollectionID, meta.PartitionID, meta.SegmentID, meta.BuildID); err != nil { log.Warn("delete index meta from etcd failed, wait to retry", zap.Int64("buildID", meta.BuildID), zap.Int64("nodeID", meta.NodeID), zap.Error(err)) diff --git a/internal/indexcoord/garbage_collector_test.go b/internal/indexcoord/garbage_collector_test.go index 14f9e97bd6..4149ef05fe 100644 --- a/internal/indexcoord/garbage_collector_test.go +++ b/internal/indexcoord/garbage_collector_test.go @@ -18,37 +18,412 @@ package indexcoord import ( "context" + "errors" "fmt" + "strconv" "sync" "testing" "time" "github.com/stretchr/testify/assert" + "github.com/milvus-io/milvus/api/commonpb" + "github.com/milvus-io/milvus/internal/metastore" "github.com/milvus-io/milvus/internal/metastore/kv/indexcoord" "github.com/milvus-io/milvus/internal/metastore/model" + "github.com/milvus-io/milvus/internal/proto/datapb" ) -func TestGarbageCollector_Start(t *testing.T) { - gc := newGarbageCollector(context.Background(), &metaTable{}, &chunkManagerMock{ +func createGarbageCollectorMetaTable(catalog metastore.IndexCoordCatalog) *metaTable { + return &metaTable{ + catalog: catalog, + indexLock: sync.RWMutex{}, + segmentIndexLock: sync.RWMutex{}, + collectionIndexes: map[UniqueID]map[UniqueID]*model.Index{ + collID: { + indexID: { + TenantID: "", + CollectionID: collID, + FieldID: fieldID, + IndexID: indexID, + IndexName: indexName, + IsDeleted: true, + CreateTime: 1, + TypeParams: nil, + IndexParams: nil, + }, + indexID + 1: { + TenantID: "", + CollectionID: collID, + FieldID: fieldID + 1, + IndexID: indexID + 1, + IndexName: "indexName2", + IsDeleted: true, + CreateTime: 0, + TypeParams: nil, + IndexParams: nil, + }, + indexID + 2: { + TenantID: "", + CollectionID: collID, + FieldID: fieldID + 2, + IndexID: indexID + 2, + IndexName: "indexName3", + IsDeleted: false, + CreateTime: 0, + TypeParams: nil, + IndexParams: nil, + }, + }, + }, + segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{ + segID: { + indexID: { + SegmentID: segID, + CollectionID: collID, + PartitionID: partID, + NumRows: 1025, + IndexID: indexID, + BuildID: buildID, + NodeID: 0, + IndexVersion: 1, + IndexState: 3, + FailReason: "", + IsDeleted: false, + CreateTime: 1, + IndexFilePaths: nil, + IndexSize: 100, + WriteHandoff: false, + }, + }, + segID + 1: { + indexID: { + SegmentID: segID + 1, + CollectionID: collID, + PartitionID: partID, + NumRows: 1025, + IndexID: indexID, + BuildID: buildID + 1, + NodeID: 0, + IndexVersion: 1, + IndexState: 2, + FailReason: "", + IsDeleted: false, + CreateTime: 1, + IndexFilePaths: nil, + IndexSize: 100, + WriteHandoff: false, + }, + }, + segID + 2: { + indexID + 2: { + SegmentID: segID + 2, + CollectionID: collID, + PartitionID: partID, + NumRows: 10000, + IndexID: indexID + 2, + BuildID: buildID + 2, + NodeID: 0, + IndexVersion: 1, + IndexState: 1, + FailReason: "", + IsDeleted: true, + CreateTime: 1, + IndexFilePaths: nil, + IndexSize: 0, + WriteHandoff: false, + }, + }, + segID + 3: { + indexID + 2: { + SegmentID: segID + 3, + CollectionID: collID, + PartitionID: partID, + NumRows: 10000, + IndexID: indexID + 2, + BuildID: buildID + 3, + NodeID: 0, + IndexVersion: 1, + IndexState: 1, + FailReason: "", + IsDeleted: false, + CreateTime: 1, + IndexFilePaths: []string{"file1", "file2"}, + IndexSize: 0, + WriteHandoff: false, + }, + }, + segID + 4: { + indexID + 2: { + SegmentID: segID + 4, + CollectionID: collID, + PartitionID: partID, + NumRows: 10000, + IndexID: indexID + 2, + BuildID: buildID + 4, + NodeID: 0, + IndexVersion: 1, + IndexState: 2, + FailReason: "", + IsDeleted: false, + CreateTime: 1, + IndexFilePaths: []string{}, + IndexSize: 0, + WriteHandoff: false, + }, + }, + }, + buildID2SegmentIndex: map[UniqueID]*model.SegmentIndex{ + buildID: { + SegmentID: segID, + CollectionID: collID, + PartitionID: partID, + NumRows: 1025, + IndexID: indexID, + BuildID: buildID, + NodeID: 0, + IndexVersion: 1, + IndexState: 3, + FailReason: "", + IsDeleted: false, + CreateTime: 1, + IndexFilePaths: nil, + IndexSize: 100, + WriteHandoff: false, + }, + buildID + 1: { + SegmentID: segID + 1, + CollectionID: collID, + PartitionID: partID, + NumRows: 1025, + IndexID: indexID, + BuildID: buildID + 1, + NodeID: 0, + IndexVersion: 1, + IndexState: 2, + FailReason: "", + IsDeleted: false, + CreateTime: 1, + IndexFilePaths: nil, + IndexSize: 100, + WriteHandoff: false, + }, + buildID + 2: { + SegmentID: segID + 2, + CollectionID: collID, + PartitionID: partID, + NumRows: 10000, + IndexID: indexID + 2, + BuildID: buildID + 2, + NodeID: 0, + IndexVersion: 1, + IndexState: 1, + FailReason: "", + IsDeleted: true, + CreateTime: 1, + IndexFilePaths: nil, + IndexSize: 0, + WriteHandoff: false, + }, + buildID + 3: { + SegmentID: segID + 3, + CollectionID: collID, + PartitionID: partID, + NumRows: 10000, + IndexID: indexID + 2, + BuildID: buildID + 3, + NodeID: 0, + IndexVersion: 1, + IndexState: 3, + FailReason: "", + IsDeleted: false, + CreateTime: 1, + IndexFilePaths: []string{"file1", "file2"}, + IndexSize: 0, + WriteHandoff: false, + }, + buildID + 4: { + SegmentID: segID + 4, + CollectionID: collID, + PartitionID: partID, + NumRows: 10000, + IndexID: indexID + 2, + BuildID: buildID + 4, + NodeID: 0, + IndexVersion: 1, + IndexState: 2, + FailReason: "", + IsDeleted: false, + CreateTime: 1, + IndexFilePaths: []string{}, + IndexSize: 0, + WriteHandoff: false, + }, + }, + } +} + +func TestGarbageCollector(t *testing.T) { + meta := createGarbageCollectorMetaTable(&indexcoord.Catalog{Txn: NewMockEtcdKV()}) + gc := newGarbageCollector(context.Background(), meta, &chunkManagerMock{ removeWithPrefix: func(s string) error { return nil }, listWithPrefix: func(s string, recursive bool) ([]string, []time.Time, error) { - return []string{}, []time.Time{}, nil + return []string{strconv.FormatInt(buildID, 10), strconv.FormatInt(buildID+1, 10), + strconv.FormatInt(buildID+3, 10), strconv.FormatInt(buildID+4, 10)}, []time.Time{}, nil }, remove: func(s string) error { return nil }, - }, &IndexCoord{}) + }, &IndexCoord{ + dataCoordClient: &DataCoordMock{ + CallGetFlushedSegment: func(ctx context.Context, req *datapb.GetFlushedSegmentsRequest) (*datapb.GetFlushedSegmentsResponse, error) { + return &datapb.GetFlushedSegmentsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, + Segments: []int64{segID, segID + 1, segID + 2, segID + 3, segID + 4}, + }, nil + }, + }, + }) gc.gcMetaDuration = time.Millisecond * 300 gc.gcFileDuration = time.Millisecond * 300 gc.Start() + time.Sleep(time.Second * 2) + err := gc.metaTable.MarkSegmentsIndexAsDeletedByBuildID([]UniqueID{buildID + 3, buildID + 4}) + assert.NoError(t, err) + segIndexes := gc.metaTable.GetAllSegIndexes() + for len(segIndexes) != 0 { + time.Sleep(time.Second) + segIndexes = gc.metaTable.GetAllSegIndexes() + } gc.Stop() } +func TestGarbageCollector_error(t *testing.T) { + meta := createGarbageCollectorMetaTable(&indexcoord.Catalog{ + Txn: &mockETCDKV{ + multiSave: func(m map[string]string) error { + return errors.New("error") + }, + remove: func(s string) error { + return errors.New("error") + }, + }, + }) + gc := newGarbageCollector(context.Background(), meta, &chunkManagerMock{ + removeWithPrefix: func(s string) error { + return errors.New("error") + }, + listWithPrefix: func(s string, recursive bool) ([]string, []time.Time, error) { + return nil, nil, errors.New("error") + }, + remove: func(s string) error { + return errors.New("error") + }, + }, &IndexCoord{ + dataCoordClient: &DataCoordMock{ + CallGetFlushedSegment: func(ctx context.Context, req *datapb.GetFlushedSegmentsRequest) (*datapb.GetFlushedSegmentsResponse, error) { + return &datapb.GetFlushedSegmentsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, + Segments: []int64{segID, segID + 1, segID + 2, segID + 3, segID + 4}, + }, nil + }, + }, + }) + + gc.gcMetaDuration = time.Millisecond * 300 + gc.gcFileDuration = time.Millisecond * 300 + + gc.Start() + time.Sleep(time.Second * 3) + gc.Stop() +} + +func TestGarbageCollectorGetFlushedSegment_error(t *testing.T) { + t.Run("error", func(t *testing.T) { + meta := createGarbageCollectorMetaTable(&indexcoord.Catalog{ + Txn: &mockETCDKV{ + multiSave: func(m map[string]string) error { + return errors.New("error") + }, + remove: func(s string) error { + return errors.New("error") + }, + }, + }) + gc := newGarbageCollector(context.Background(), meta, &chunkManagerMock{ + removeWithPrefix: func(s string) error { + return errors.New("error") + }, + listWithPrefix: func(s string, recursive bool) ([]string, []time.Time, error) { + return nil, nil, errors.New("error") + }, + remove: func(s string) error { + return errors.New("error") + }, + }, &IndexCoord{ + dataCoordClient: &DataCoordMock{ + CallGetFlushedSegment: func(ctx context.Context, req *datapb.GetFlushedSegmentsRequest) (*datapb.GetFlushedSegmentsResponse, error) { + return &datapb.GetFlushedSegmentsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, + Segments: []int64{}, + }, errors.New("error") + }, + }, + }) + + gc.recycleSegIndexesMeta() + }) + + t.Run("fail", func(t *testing.T) { + meta := createGarbageCollectorMetaTable(&indexcoord.Catalog{ + Txn: &mockETCDKV{ + multiSave: func(m map[string]string) error { + return errors.New("error") + }, + remove: func(s string) error { + return errors.New("error") + }, + }, + }) + gc := newGarbageCollector(context.Background(), meta, &chunkManagerMock{ + removeWithPrefix: func(s string) error { + return errors.New("error") + }, + listWithPrefix: func(s string, recursive bool) ([]string, []time.Time, error) { + return nil, nil, errors.New("error") + }, + remove: func(s string) error { + return errors.New("error") + }, + }, &IndexCoord{ + dataCoordClient: &DataCoordMock{ + CallGetFlushedSegment: func(ctx context.Context, req *datapb.GetFlushedSegmentsRequest) (*datapb.GetFlushedSegmentsResponse, error) { + return &datapb.GetFlushedSegmentsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + Reason: "fail reason", + }, + Segments: []int64{}, + }, nil + }, + }, + }) + + gc.recycleSegIndexesMeta() + }) + +} + func TestGarbageCollector_recycleUnusedIndexFiles(t *testing.T) { t.Run("index not in meta and remove with prefix failed", func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) diff --git a/internal/indexcoord/index_builder.go b/internal/indexcoord/index_builder.go index 9e3b4e1dd9..c28a8baae5 100644 --- a/internal/indexcoord/index_builder.go +++ b/internal/indexcoord/index_builder.go @@ -18,6 +18,7 @@ package indexcoord import ( "context" + "errors" "path" "sort" "sync" @@ -28,7 +29,6 @@ import ( "github.com/milvus-io/milvus/api/commonpb" "github.com/milvus-io/milvus/internal/common" "github.com/milvus-io/milvus/internal/log" - "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/indexpb" ) @@ -243,22 +243,11 @@ func (ib *indexBuilder) process(buildID UniqueID) { updateStateFunc(buildID, indexTaskRetry) return } - segmentsInfo, err := ib.ic.dataCoordClient.GetSegmentInfo(ib.ctx, &datapb.GetSegmentInfoRequest{ - SegmentIDs: []UniqueID{meta.SegmentID}, - IncludeUnHealthy: false, - }) - + info, err := ib.ic.pullSegmentInfo(ib.ctx, meta.SegmentID) if err != nil { log.Error("IndexCoord get segment info from DataCoord fail", zap.Int64("segID", meta.SegmentID), zap.Int64("buildID", buildID), zap.Error(err)) - updateStateFunc(buildID, indexTaskRetry) - return - } - if segmentsInfo.Status.ErrorCode != commonpb.ErrorCode_Success { - log.Error("IndexCoord get segment info from DataCoord fail", zap.Int64("segID", meta.SegmentID), - zap.Int64("buildID", buildID), zap.String("failReason", segmentsInfo.Status.Reason)) - // TODO: delete after QueryCoordV2 - if segmentsInfo.Status.GetReason() == msgSegmentNotFound(meta.SegmentID) { + if errors.Is(err, ErrSegmentNotFound) { updateStateFunc(buildID, indexTaskDeleted) return } @@ -266,20 +255,14 @@ func (ib *indexBuilder) process(buildID UniqueID) { return } binLogs := make([]string, 0) - for _, segmentInfo := range segmentsInfo.Infos { - if segmentInfo.ID != meta.SegmentID || segmentInfo.State != commonpb.SegmentState_Flushed { - continue - } - fieldID := ib.meta.GetFieldIDByIndexID(meta.CollectionID, meta.IndexID) - for _, fieldBinLog := range segmentInfo.GetBinlogs() { - if fieldBinLog.GetFieldID() == fieldID { - for _, binLog := range fieldBinLog.GetBinlogs() { - binLogs = append(binLogs, binLog.LogPath) - } - break + fieldID := ib.meta.GetFieldIDByIndexID(meta.CollectionID, meta.IndexID) + for _, fieldBinLog := range info.GetBinlogs() { + if fieldBinLog.GetFieldID() == fieldID { + for _, binLog := range fieldBinLog.GetBinlogs() { + binLogs = append(binLogs, binLog.LogPath) } + break } - break } typeParams := ib.meta.GetTypeParams(meta.CollectionID, meta.IndexID) @@ -372,6 +355,8 @@ func (ib *indexBuilder) process(buildID UniqueID) { } if meta.NodeID != 0 { if !ib.dropIndexTask(buildID, meta.NodeID) { + log.Ctx(ib.ctx).Warn("index task state is deleted and drop index job for node fail", zap.Int64("build", buildID), + zap.Int64("nodeID", meta.NodeID)) return } if err := ib.releaseLockAndResetNode(buildID, meta.NodeID); err != nil { @@ -499,21 +484,6 @@ func (ib *indexBuilder) releaseLockAndResetTask(buildID UniqueID, nodeID UniqueI return nil } -func (ib *indexBuilder) markTasksAsDeleted(buildIDs []UniqueID) error { - defer ib.notify() - - ib.taskMutex.Lock() - defer ib.taskMutex.Unlock() - - for _, buildID := range buildIDs { - if _, ok := ib.tasks[buildID]; ok { - ib.tasks[buildID] = indexTaskDeleted - log.Debug("index task has been deleted", zap.Int64("buildID", buildID)) - } - } - return nil -} - func (ib *indexBuilder) nodeDown(nodeID UniqueID) { defer ib.notify() @@ -528,11 +498,3 @@ func (ib *indexBuilder) nodeDown(nodeID UniqueID) { } } } - -func (ib *indexBuilder) hasTask(buildID UniqueID) bool { - ib.taskMutex.RLock() - defer ib.taskMutex.RUnlock() - - _, ok := ib.tasks[buildID] - return ok -} diff --git a/internal/indexcoord/index_builder_test.go b/internal/indexcoord/index_builder_test.go index 64c7813e42..39a04abe1c 100644 --- a/internal/indexcoord/index_builder_test.go +++ b/internal/indexcoord/index_builder_test.go @@ -19,6 +19,7 @@ package indexcoord import ( "context" "errors" + "sync" "testing" "time" @@ -227,7 +228,7 @@ func createMetaTable(catalog metastore.IndexCoordCatalog) *metaTable { IndexSize: 0, }, }, - buildID + 9: { + segID + 9: { indexID: { SegmentID: segID + 9, CollectionID: collID, @@ -245,6 +246,24 @@ func createMetaTable(catalog metastore.IndexCoordCatalog) *metaTable { IndexSize: 0, }, }, + segID + 10: { + indexID: { + SegmentID: segID + 10, + CollectionID: collID, + PartitionID: partID, + NumRows: 500, + IndexID: indexID, + BuildID: buildID + 10, + NodeID: nodeID, + IndexVersion: 0, + IndexState: commonpb.IndexState_Unissued, + FailReason: "", + IsDeleted: false, + CreateTime: 1111, + IndexFilePaths: nil, + IndexSize: 0, + }, + }, }, buildID2SegmentIndex: map[UniqueID]*model.SegmentIndex{ buildID: { @@ -407,6 +426,22 @@ func createMetaTable(catalog metastore.IndexCoordCatalog) *metaTable { IndexFilePaths: nil, IndexSize: 0, }, + buildID + 10: { + SegmentID: segID + 10, + CollectionID: collID, + PartitionID: partID, + NumRows: 500, + IndexID: indexID, + BuildID: buildID + 10, + NodeID: nodeID, + IndexVersion: 0, + IndexState: commonpb.IndexState_Unissued, + FailReason: "", + IsDeleted: false, + CreateTime: 1111, + IndexFilePaths: nil, + IndexSize: 0, + }, }, } } @@ -444,7 +479,7 @@ func TestIndexBuilder(t *testing.T) { }, }), []UniqueID{nodeID}) - assert.Equal(t, 7, len(ib.tasks)) + assert.Equal(t, 8, len(ib.tasks)) assert.Equal(t, indexTaskInit, ib.tasks[buildID]) assert.Equal(t, indexTaskInProgress, ib.tasks[buildID+1]) assert.Equal(t, indexTaskDeleted, ib.tasks[buildID+2]) @@ -452,6 +487,7 @@ func TestIndexBuilder(t *testing.T) { assert.Equal(t, indexTaskDone, ib.tasks[buildID+4]) assert.Equal(t, indexTaskRetry, ib.tasks[buildID+8]) assert.Equal(t, indexTaskInit, ib.tasks[buildID+9]) + assert.Equal(t, indexTaskRetry, ib.tasks[buildID+10]) ib.scheduleDuration = time.Millisecond * 500 ib.Start() @@ -521,43 +557,18 @@ func TestIndexBuilder_Error(t *testing.T) { ib.process(buildID + 100) }) + t.Run("init no need to build index", func(t *testing.T) { + ib.tasks[buildID] = indexTaskInit + ib.meta.collectionIndexes[collID][indexID].IsDeleted = true + ib.process(buildID) + ib.meta.collectionIndexes[collID][indexID].IsDeleted = false + }) + t.Run("finish few rows task fail", func(t *testing.T) { ib.tasks[buildID+9] = indexTaskInit ib.process(buildID + 9) }) - //t.Run("getSegmentInfo fail", func(t *testing.T) { - // ib.ic = &IndexCoord{ - // dataCoordClient: &DataCoordMock{ - // CallGetSegmentInfo: func(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error) { - // return &datapb.GetSegmentInfoResponse{}, errors.New("error") - // }, - // }, - // } - // ib.tasks = map[int64]*indexTask{ - // buildID: { - // buildID: buildID, - // state: indexTaskInit, - // segmentInfo: nil, - // }, - // } - // ib.process(ib.tasks[buildID]) - // - // ib.ic = &IndexCoord{ - // dataCoordClient: &DataCoordMock{ - // CallGetSegmentInfo: func(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error) { - // return &datapb.GetSegmentInfoResponse{ - // Status: &commonpb.Status{ - // ErrorCode: commonpb.ErrorCode_UnexpectedError, - // Reason: "get segment info fail", - // }, - // }, nil - // }, - // }, - // } - // ib.process(ib.tasks[buildID]) - //}) - t.Run("peek client fail", func(t *testing.T) { ib.ic.nodeManager = &NodeManager{nodeClients: map[UniqueID]types.IndexNode{}} ib.ic.dataCoordClient = NewDataCoordMock() @@ -575,11 +586,8 @@ func TestIndexBuilder_Error(t *testing.T) { t.Run("acquire lock fail", func(t *testing.T) { ib.tasks[buildID] = indexTaskInit ib.meta = createMetaTable(&indexcoord.Catalog{ - Txn: &mockETCDKV{ - multiSave: func(m map[string]string) error { - return nil - }, - }}) + Txn: NewMockEtcdKV(), + }) dataMock := NewDataCoordMock() dataMock.CallAcquireSegmentLock = func(ctx context.Context, req *datapb.AcquireSegmentLockRequest) (*commonpb.Status, error) { return nil, errors.New("error") @@ -588,15 +596,122 @@ func TestIndexBuilder_Error(t *testing.T) { ib.process(buildID) }) + t.Run("get segment info error", func(t *testing.T) { + ib.tasks[buildID] = indexTaskInit + ib.meta = createMetaTable(&indexcoord.Catalog{ + Txn: NewMockEtcdKV(), + }) + dataMock := NewDataCoordMock() + dataMock.CallGetSegmentInfo = func(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error) { + return nil, errors.New("error") + } + ib.ic.dataCoordClient = dataMock + ib.process(buildID) + }) + + t.Run("get segment info fail", func(t *testing.T) { + ib.tasks[buildID] = indexTaskInit + ib.meta = createMetaTable(&indexcoord.Catalog{ + Txn: NewMockEtcdKV(), + }) + dataMock := NewDataCoordMock() + dataMock.CallGetSegmentInfo = func(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error) { + return &datapb.GetSegmentInfoResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + Reason: "", + }, + }, nil + } + ib.ic.dataCoordClient = dataMock + ib.process(buildID) + }) + t.Run("assign task fail", func(t *testing.T) { + Params.CommonCfg.StorageType = "local" ib.tasks[buildID] = indexTaskInit ib.ic.dataCoordClient = NewDataCoordMock() + ib.meta = createMetaTable(&indexcoord.Catalog{ + Txn: NewMockEtcdKV(), + }) + ib.ic = &IndexCoord{ + loopCtx: context.Background(), + reqTimeoutInterval: time.Second, + nodeManager: &NodeManager{ + ctx: context.Background(), + nodeClients: map[UniqueID]types.IndexNode{ + 1: &indexnode.Mock{ + CallCreateJob: func(ctx context.Context, req *indexpb.CreateJobRequest) (*commonpb.Status, error) { + return nil, errors.New("error") + }, + CallGetJobStats: func(ctx context.Context, in *indexpb.GetJobStatsRequest) (*indexpb.GetJobStatsResponse, error) { + return &indexpb.GetJobStatsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + Reason: "", + }, + TaskSlots: 1, + }, nil + }, + }, + }, + }, + chunkManager: &chunkManagerMock{}, + dataCoordClient: NewDataCoordMock(), + } + ib.process(buildID) + }) + + t.Run("update index state inProgress fail", func(t *testing.T) { + ib.tasks[buildID] = indexTaskInit + ib.meta = createMetaTable(&indexcoord.Catalog{ + Txn: NewMockEtcdKV(), + }) ib.ic.nodeManager = &NodeManager{ ctx: context.Background(), nodeClients: map[UniqueID]types.IndexNode{ 1: &indexnode.Mock{ CallCreateJob: func(ctx context.Context, req *indexpb.CreateJobRequest) (*commonpb.Status, error) { - return nil, errors.New("error") + err := ib.meta.MarkSegmentsIndexAsDeletedByBuildID([]UniqueID{buildID}) + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, err + }, + CallGetJobStats: func(ctx context.Context, in *indexpb.GetJobStatsRequest) (*indexpb.GetJobStatsResponse, error) { + return &indexpb.GetJobStatsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + Reason: "", + }, + TaskSlots: 1, + }, nil + }, + }, + }, + } + ib.process(buildID) + }) + + t.Run("update index state inProgress error", func(t *testing.T) { + ib.tasks[buildID] = indexTaskInit + ib.meta = createMetaTable(&indexcoord.Catalog{ + Txn: NewMockEtcdKV(), + }) + ib.ic.nodeManager = &NodeManager{ + ctx: context.Background(), + nodeClients: map[UniqueID]types.IndexNode{ + 1: &indexnode.Mock{ + CallCreateJob: func(ctx context.Context, req *indexpb.CreateJobRequest) (*commonpb.Status, error) { + ib.meta = createMetaTable(&indexcoord.Catalog{ + Txn: &mockETCDKV{ + multiSave: func(m map[string]string) error { + return errors.New("error") + }, + }, + }) + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, nil }, CallGetJobStats: func(ctx context.Context, in *indexpb.GetJobStatsRequest) (*indexpb.GetJobStatsResponse, error) { return &indexpb.GetJobStatsResponse{ @@ -614,12 +729,194 @@ func TestIndexBuilder_Error(t *testing.T) { }) t.Run("no need to build index", func(t *testing.T) { + ib.tasks[buildID] = indexTaskDone + ib.meta = createMetaTable(&indexcoord.Catalog{ + Txn: NewMockEtcdKV(), + }) ib.meta.collectionIndexes[collID][indexID].IsDeleted = true ib.process(buildID) }) + t.Run("drop index job error", func(t *testing.T) { + ib.tasks[buildID] = indexTaskDone + ib.meta = createMetaTable(&indexcoord.Catalog{ + Txn: NewMockEtcdKV(), + }) + err := ib.meta.UpdateVersion(buildID, nodeID) + assert.NoError(t, err) + ib.ic.nodeManager = &NodeManager{ + ctx: context.Background(), + nodeClients: map[UniqueID]types.IndexNode{ + nodeID: &indexnode.Mock{ + CallDropJobs: func(ctx context.Context, in *indexpb.DropJobsRequest) (*commonpb.Status, error) { + return nil, errors.New("error") + }, + }, + }, + } + ib.process(buildID) + }) + + t.Run("drop index job fail", func(t *testing.T) { + ib.tasks[buildID] = indexTaskDone + ib.meta = createMetaTable(&indexcoord.Catalog{ + Txn: NewMockEtcdKV(), + }) + err := ib.meta.UpdateVersion(buildID, nodeID) + assert.NoError(t, err) + ib.ic.nodeManager = &NodeManager{ + ctx: context.Background(), + nodeClients: map[UniqueID]types.IndexNode{ + nodeID: &indexnode.Mock{ + CallDropJobs: func(ctx context.Context, in *indexpb.DropJobsRequest) (*commonpb.Status, error) { + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + Reason: "fail reason", + }, nil + }, + }, + }, + } + ib.process(buildID) + }) + + t.Run("release lock fail", func(t *testing.T) { + ib.tasks[buildID] = indexTaskDone + ib.meta = createMetaTable(&indexcoord.Catalog{ + Txn: &mockETCDKV{ + multiSave: func(m map[string]string) error { + return errors.New("error") + }, + }, + }) + ib.ic.nodeManager = &NodeManager{ + ctx: context.Background(), + nodeClients: map[UniqueID]types.IndexNode{ + nodeID: &indexnode.Mock{ + CallDropJobs: func(ctx context.Context, in *indexpb.DropJobsRequest) (*commonpb.Status, error) { + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + Reason: "", + }, nil + }, + }, + }, + } + ib.process(buildID) + }) + + t.Run("retry no need to build index", func(t *testing.T) { + ib.tasks[buildID] = indexTaskRetry + ib.meta = createMetaTable(&indexcoord.Catalog{ + Txn: NewMockEtcdKV(), + }) + err := ib.meta.MarkIndexAsDeleted(collID, []UniqueID{indexID}) + assert.NoError(t, err) + ib.process(buildID) + }) + + t.Run("retry release lock fail", func(t *testing.T) { + ib.tasks[buildID] = indexTaskRetry + ib.meta = createMetaTable(&indexcoord.Catalog{ + Txn: &mockETCDKV{ + multiSave: func(m map[string]string) error { + return errors.New("error") + }, + }, + }) + ib.process(buildID) + }) + + t.Run("delete mark fail", func(t *testing.T) { + ib.tasks[buildID] = indexTaskDeleted + ib.meta = createMetaTable(&indexcoord.Catalog{ + Txn: &mockETCDKV{ + multiSave: func(m map[string]string) error { + return errors.New("error") + }, + }, + }) + ib.process(buildID) + }) + + t.Run("delete drop index job fail", func(t *testing.T) { + ib.tasks[buildID] = indexTaskDeleted + ib.meta = createMetaTable(&indexcoord.Catalog{ + Txn: &mockETCDKV{ + multiSave: func(m map[string]string) error { + return nil + }, + }, + }) + err := ib.meta.UpdateVersion(buildID, nodeID) + assert.NoError(t, err) + ib.ic.nodeManager = &NodeManager{ + ctx: context.Background(), + nodeClients: map[UniqueID]types.IndexNode{ + nodeID: &indexnode.Mock{ + CallDropJobs: func(ctx context.Context, in *indexpb.DropJobsRequest) (*commonpb.Status, error) { + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + Reason: "", + }, errors.New("error") + }, + }, + }, + } + ib.process(buildID) + }) + + t.Run("delete release lock fail", func(t *testing.T) { + ib.tasks[buildID] = indexTaskDeleted + ib.meta = createMetaTable(&indexcoord.Catalog{ + Txn: &mockETCDKV{ + multiSave: func(m map[string]string) error { + return nil + }, + }, + }) + err := ib.meta.UpdateVersion(buildID, nodeID) + assert.NoError(t, err) + ib.ic.nodeManager = &NodeManager{ + ctx: context.Background(), + nodeClients: map[UniqueID]types.IndexNode{ + nodeID: &indexnode.Mock{ + CallDropJobs: func(ctx context.Context, in *indexpb.DropJobsRequest) (*commonpb.Status, error) { + ib.meta.catalog = &indexcoord.Catalog{ + Txn: &mockETCDKV{ + multiSave: func(m map[string]string) error { + return errors.New("error") + }, + }, + } + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + Reason: "", + }, nil + }, + }, + }, + } + ib.process(buildID) + }) + + t.Run("deleted remove task", func(t *testing.T) { + ib.tasks[buildID] = indexTaskDeleted + ib.meta = createMetaTable(&indexcoord.Catalog{ + Txn: NewMockEtcdKV(), + }) + ib.process(buildID) + }) + t.Run("finish task fail", func(t *testing.T) { ib.tasks[buildID] = indexTaskInProgress + ib.meta = createMetaTable(&indexcoord.Catalog{ + Txn: &mockETCDKV{ + multiSave: func(m map[string]string) error { + return errors.New("error") + }, + }, + }) ib.ic.dataCoordClient = NewDataCoordMock() ib.ic.nodeManager = &NodeManager{ ctx: context.Background(), @@ -645,16 +942,43 @@ func TestIndexBuilder_Error(t *testing.T) { }, }, } - ib.ic.metaTable = &metaTable{ - catalog: &indexcoord.Catalog{ - Txn: &mockETCDKV{ - multiSave: func(m map[string]string) error { - return errors.New("error") - }, + ib.getTaskState(buildID, 1) + }) + + t.Run("inProgress no need to build index", func(t *testing.T) { + ib.tasks[buildID] = indexTaskInProgress + ib.meta = createMetaTable(&indexcoord.Catalog{Txn: NewMockEtcdKV()}) + err := ib.meta.MarkIndexAsDeleted(collID, []UniqueID{indexID}) + assert.NoError(t, err) + ib.process(buildID) + }) +} + +func Test_indexBuilder_getTaskState(t *testing.T) { + Params.Init() + ib := &indexBuilder{ + tasks: map[int64]indexTaskState{ + buildID: indexTaskInit, + }, + meta: createMetaTable(&indexcoord.Catalog{Txn: NewMockEtcdKV()}), + ic: &IndexCoord{ + dataCoordClient: &DataCoordMock{ + CallGetSegmentInfo: func(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error) { + return &datapb.GetSegmentInfoResponse{}, errors.New("error") }, }, + }, + } + + t.Run("node not exist", func(t *testing.T) { + ib.tasks[buildID] = indexTaskInit + ib.ic.dataCoordClient = NewDataCoordMock() + ib.ic.nodeManager = &NodeManager{ + ctx: context.Background(), + nodeClients: map[UniqueID]types.IndexNode{}, } - ib.getTaskState(buildID, 1) + + ib.getTaskState(buildID, nodeID) }) t.Run("get state retry", func(t *testing.T) { @@ -663,7 +987,7 @@ func TestIndexBuilder_Error(t *testing.T) { ib.ic.nodeManager = &NodeManager{ ctx: context.Background(), nodeClients: map[UniqueID]types.IndexNode{ - 1: &indexnode.Mock{ + nodeID: &indexnode.Mock{ CallQueryJobs: func(ctx context.Context, in *indexpb.QueryJobsRequest) (*indexpb.QueryJobsResponse, error) { return &indexpb.QueryJobsResponse{ Status: &commonpb.Status{ @@ -684,16 +1008,7 @@ func TestIndexBuilder_Error(t *testing.T) { }, }, } - ib.ic.metaTable = &metaTable{ - catalog: &indexcoord.Catalog{ - Txn: &mockETCDKV{ - multiSave: func(m map[string]string) error { - return nil - }, - }, - }, - } - ib.getTaskState(buildID, 1) + ib.getTaskState(buildID, nodeID) }) t.Run("get state not exist", func(t *testing.T) { @@ -702,7 +1017,7 @@ func TestIndexBuilder_Error(t *testing.T) { ib.ic.nodeManager = &NodeManager{ ctx: context.Background(), nodeClients: map[UniqueID]types.IndexNode{ - 1: &indexnode.Mock{ + nodeID: &indexnode.Mock{ CallQueryJobs: func(ctx context.Context, in *indexpb.QueryJobsRequest) (*indexpb.QueryJobsResponse, error) { return &indexpb.QueryJobsResponse{ Status: &commonpb.Status{ @@ -715,15 +1030,133 @@ func TestIndexBuilder_Error(t *testing.T) { }, }, } - ib.ic.metaTable = &metaTable{ - catalog: &indexcoord.Catalog{ - Txn: &mockETCDKV{ - multiSave: func(m map[string]string) error { - return nil + ib.getTaskState(buildID, nodeID) + }) + + t.Run("query jobs error", func(t *testing.T) { + ib.tasks[buildID] = indexTaskInit + ib.ic.dataCoordClient = NewDataCoordMock() + ib.ic.nodeManager = &NodeManager{ + ctx: context.Background(), + nodeClients: map[UniqueID]types.IndexNode{ + nodeID: &indexnode.Mock{ + CallQueryJobs: func(ctx context.Context, in *indexpb.QueryJobsRequest) (*indexpb.QueryJobsResponse, error) { + return nil, errors.New("error") }, }, }, } - ib.getTaskState(buildID, 1) + ib.getTaskState(buildID, nodeID) + }) + + t.Run("query jobs fail", func(t *testing.T) { + ib.tasks[buildID] = indexTaskInit + ib.ic.dataCoordClient = NewDataCoordMock() + ib.ic.nodeManager = &NodeManager{ + ctx: context.Background(), + nodeClients: map[UniqueID]types.IndexNode{ + nodeID: &indexnode.Mock{ + CallQueryJobs: func(ctx context.Context, in *indexpb.QueryJobsRequest) (*indexpb.QueryJobsResponse, error) { + return &indexpb.QueryJobsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + Reason: "fail reason", + }, + }, nil + }, + }, + }, + } + ib.getTaskState(buildID, nodeID) + }) + + t.Run("job is InProgress", func(t *testing.T) { + ib.tasks[buildID] = indexTaskInit + ib.ic.dataCoordClient = NewDataCoordMock() + ib.ic.nodeManager = &NodeManager{ + ctx: context.Background(), + nodeClients: map[UniqueID]types.IndexNode{ + nodeID: &indexnode.Mock{ + CallQueryJobs: func(ctx context.Context, in *indexpb.QueryJobsRequest) (*indexpb.QueryJobsResponse, error) { + return &indexpb.QueryJobsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + Reason: "", + }, + IndexInfos: []*indexpb.IndexTaskInfo{ + { + BuildID: buildID, + State: commonpb.IndexState_InProgress, + IndexFiles: nil, + SerializedSize: 0, + FailReason: "", + }, + }, + }, nil + }, + }, + }, + } + ib.getTaskState(buildID, nodeID) }) } + +func Test_indexBuilder_releaseLockAndResetNode_error(t *testing.T) { + Params.Init() + ctx, cancel := context.WithCancel(context.Background()) + ib := &indexBuilder{ + ctx: ctx, + cancel: cancel, + tasks: map[int64]indexTaskState{ + buildID: indexTaskInit, + }, + meta: createMetaTable(&indexcoord.Catalog{Txn: NewMockEtcdKV()}), + ic: &IndexCoord{ + dataCoordClient: &DataCoordMock{ + CallReleaseSegmentLock: func(ctx context.Context, req *datapb.ReleaseSegmentLockRequest) (*commonpb.Status, error) { + return nil, errors.New("error") + }, + }, + }, + } + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + err := ib.releaseLockAndResetNode(buildID, nodeID) + assert.Error(t, err) + }() + time.Sleep(time.Second) + ib.cancel() + wg.Wait() +} + +func Test_indexBuilder_releaseLockAndResetTask_error(t *testing.T) { + Params.Init() + ctx, cancel := context.WithCancel(context.Background()) + ib := &indexBuilder{ + ctx: ctx, + cancel: cancel, + tasks: map[int64]indexTaskState{ + buildID: indexTaskInit, + }, + meta: createMetaTable(&indexcoord.Catalog{Txn: NewMockEtcdKV()}), + ic: &IndexCoord{ + dataCoordClient: &DataCoordMock{ + CallReleaseSegmentLock: func(ctx context.Context, req *datapb.ReleaseSegmentLockRequest) (*commonpb.Status, error) { + return nil, errors.New("error") + }, + }, + }, + } + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + err := ib.releaseLockAndResetTask(buildID, nodeID) + assert.Error(t, err) + }() + time.Sleep(time.Second) + ib.cancel() + wg.Wait() +} diff --git a/internal/indexcoord/meta_table.go b/internal/indexcoord/meta_table.go index e3fb679f99..23dc06f1b6 100644 --- a/internal/indexcoord/meta_table.go +++ b/internal/indexcoord/meta_table.go @@ -21,7 +21,6 @@ import ( "errors" "fmt" "sync" - "time" "github.com/golang/protobuf/proto" "go.uber.org/zap" @@ -34,7 +33,6 @@ import ( "github.com/milvus-io/milvus/internal/metastore/model" "github.com/milvus-io/milvus/internal/metrics" "github.com/milvus-io/milvus/internal/proto/indexpb" - "github.com/milvus-io/milvus/internal/util/tsoutil" ) // metaTable maintains index-related information @@ -369,7 +367,7 @@ func (mt *metaTable) BuildIndex(buildID UniqueID) error { } segIdx.IndexState = commonpb.IndexState_InProgress - err := mt.saveSegmentIndexMeta(segIdx) + err := mt.alterSegmentIndexes([]*model.SegmentIndex{segIdx}) if err != nil { log.Error("IndexCoord metaTable BuildIndex fail", zap.Int64("buildID", segIdx.BuildID), zap.Error(err)) return err @@ -421,19 +419,6 @@ func (mt *metaTable) CanCreateIndex(req *indexpb.CreateIndexRequest) bool { return true } -func (mt *metaTable) IsExpire(buildID UniqueID) bool { - mt.segmentIndexLock.RLock() - defer mt.segmentIndexLock.RUnlock() - - segIdx, ok := mt.buildID2SegmentIndex[buildID] - if !ok { - return true - } - - pTs, _ := tsoutil.ParseTS(segIdx.CreateTime) - return time.Since(pTs) > time.Minute*10 -} - func (mt *metaTable) checkParams(fieldIndex *model.Index, req *indexpb.CreateIndexRequest) bool { if fieldIndex.IndexName != req.IndexName { return false @@ -910,13 +895,17 @@ func (mt *metaTable) GetBuildIDsFromSegIDs(segIDs []UniqueID) []UniqueID { func (mt *metaTable) RemoveIndex(collID, indexID UniqueID) error { mt.indexLock.Lock() defer mt.indexLock.Unlock() + log.Info("IndexCoord meta table remove index", zap.Int64("collID", collID), zap.Int64("indexID", indexID)) err := mt.catalog.DropIndex(context.Background(), collID, indexID) if err != nil { + log.Info("IndexCoord meta table remove index fail", zap.Int64("collID", collID), + zap.Int64("indexID", indexID), zap.Error(err)) return err } delete(mt.collectionIndexes[collID], indexID) + log.Info("IndexCoord meta table remove index success", zap.Int64("collID", collID), zap.Int64("indexID", indexID)) return nil } diff --git a/internal/indexcoord/task_state_test.go b/internal/indexcoord/task_state_test.go new file mode 100644 index 0000000000..630b1fe1eb --- /dev/null +++ b/internal/indexcoord/task_state_test.go @@ -0,0 +1,32 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package indexcoord + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestIndexTaskState_String(t *testing.T) { + assert.Equal(t, indexTaskInit.String(), "Init") + assert.Equal(t, indexTaskInProgress.String(), "InProgress") + assert.Equal(t, indexTaskDone.String(), "Done") + assert.Equal(t, indexTaskDeleted.String(), "Deleted") + assert.Equal(t, indexTaskPrepare.String(), "Prepare") + assert.Equal(t, indexTaskRetry.String(), "Retry") +}