milvus/internal/datacoord/index_meta_test.go

1291 lines
32 KiB
Go
Raw Normal View History

// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package datacoord contains core functions in datacoord
package datacoord
import (
"context"
"sync"
"testing"
"time"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/kv/mocks"
mockkv "github.com/milvus-io/milvus/internal/kv/mocks"
"github.com/milvus-io/milvus/internal/metastore/kv/datacoord"
catalogmocks "github.com/milvus-io/milvus/internal/metastore/mocks"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/pkg/common"
)
func TestMeta_CanCreateIndex(t *testing.T) {
var (
collID = UniqueID(1)
// partID = UniqueID(2)
indexID = UniqueID(10)
fieldID = UniqueID(100)
indexName = "_default_idx"
typeParams = []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: "128",
},
}
indexParams = []*commonpb.KeyValuePair{
{
Key: common.IndexTypeKey,
Value: "FLAT",
},
}
)
catalog := catalogmocks.NewDataCoordCatalog(t)
catalog.On("CreateIndex",
mock.Anything,
mock.Anything,
).Return(nil)
m := &meta{
RWMutex: sync.RWMutex{},
ctx: context.Background(),
catalog: catalog,
collections: nil,
segments: nil,
channelCPs: nil,
chunkManager: nil,
indexes: map[UniqueID]map[UniqueID]*model.Index{},
buildID2SegmentIndex: map[UniqueID]*model.SegmentIndex{},
}
req := &indexpb.CreateIndexRequest{
CollectionID: collID,
FieldID: fieldID,
IndexName: indexName,
TypeParams: typeParams,
IndexParams: indexParams,
Timestamp: 0,
IsAutoIndex: false,
UserIndexParams: nil,
}
t.Run("can create index", func(t *testing.T) {
tmpIndexID, err := m.CanCreateIndex(req)
assert.NoError(t, err)
assert.Equal(t, int64(0), tmpIndexID)
index := &model.Index{
TenantID: "",
CollectionID: collID,
FieldID: fieldID,
IndexID: indexID,
IndexName: indexName,
IsDeleted: false,
CreateTime: 0,
TypeParams: typeParams,
IndexParams: indexParams,
IsAutoIndex: false,
UserIndexParams: nil,
}
err = m.CreateIndex(index)
assert.NoError(t, err)
tmpIndexID, err = m.CanCreateIndex(req)
assert.NoError(t, err)
assert.Equal(t, indexID, tmpIndexID)
})
t.Run("params not consistent", func(t *testing.T) {
req.TypeParams = append(req.TypeParams, &commonpb.KeyValuePair{Key: "primary_key", Value: "false"})
tmpIndexID, err := m.CanCreateIndex(req)
assert.Error(t, err)
assert.Equal(t, int64(0), tmpIndexID)
req.TypeParams = []*commonpb.KeyValuePair{{Key: common.DimKey, Value: "64"}}
tmpIndexID, err = m.CanCreateIndex(req)
assert.Error(t, err)
assert.Equal(t, int64(0), tmpIndexID)
req.TypeParams = typeParams
req.IndexParams = append(req.IndexParams, &commonpb.KeyValuePair{Key: "metrics_type", Value: "L2"})
tmpIndexID, err = m.CanCreateIndex(req)
assert.Error(t, err)
assert.Equal(t, int64(0), tmpIndexID)
req.IndexParams = []*commonpb.KeyValuePair{{Key: common.IndexTypeKey, Value: "HNSW"}}
tmpIndexID, err = m.CanCreateIndex(req)
assert.Error(t, err)
assert.Equal(t, int64(0), tmpIndexID)
req.IndexParams = indexParams
req.FieldID++
tmpIndexID, err = m.CanCreateIndex(req)
assert.Error(t, err)
assert.Equal(t, int64(0), tmpIndexID)
})
t.Run("multiple indexes", func(t *testing.T) {
req.IndexName = "_default_idx_2"
req.FieldID = fieldID
tmpIndexID, err := m.CanCreateIndex(req)
assert.Error(t, err)
assert.Equal(t, int64(0), tmpIndexID)
})
t.Run("index has been deleted", func(t *testing.T) {
m.indexes[collID][indexID].IsDeleted = true
tmpIndexID, err := m.CanCreateIndex(req)
assert.NoError(t, err)
assert.Equal(t, int64(0), tmpIndexID)
})
}
func TestMeta_HasSameReq(t *testing.T) {
var (
collID = UniqueID(1)
// partID = UniqueID(2)
indexID = UniqueID(10)
fieldID = UniqueID(100)
indexName = "_default_idx"
typeParams = []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: "128",
},
}
indexParams = []*commonpb.KeyValuePair{
{
Key: common.IndexTypeKey,
Value: "FLAT",
},
}
)
m := &meta{
RWMutex: sync.RWMutex{},
ctx: context.Background(),
catalog: catalogmocks.NewDataCoordCatalog(t),
collections: nil,
segments: nil,
channelCPs: nil,
chunkManager: nil,
indexes: map[UniqueID]map[UniqueID]*model.Index{},
buildID2SegmentIndex: map[UniqueID]*model.SegmentIndex{},
}
req := &indexpb.CreateIndexRequest{
CollectionID: collID,
FieldID: fieldID,
IndexName: indexName,
TypeParams: typeParams,
IndexParams: indexParams,
Timestamp: 0,
IsAutoIndex: false,
UserIndexParams: nil,
}
t.Run("no indexes", func(t *testing.T) {
has, _ := m.HasSameReq(req)
assert.False(t, has)
})
t.Run("has same req", func(t *testing.T) {
m.indexes[collID] = map[UniqueID]*model.Index{
indexID: {
TenantID: "",
CollectionID: collID,
FieldID: fieldID,
IndexID: indexID,
IndexName: indexName,
IsDeleted: false,
CreateTime: 10,
TypeParams: typeParams,
IndexParams: indexParams,
IsAutoIndex: false,
UserIndexParams: nil,
},
}
has, _ := m.HasSameReq(req)
assert.True(t, has)
})
t.Run("params not consistent", func(t *testing.T) {
req.TypeParams = []*commonpb.KeyValuePair{{}}
has, _ := m.HasSameReq(req)
assert.False(t, has)
})
t.Run("index has been deleted", func(t *testing.T) {
m.indexes[collID][indexID].IsDeleted = true
has, _ := m.HasSameReq(req)
assert.False(t, has)
})
}
func TestMeta_CreateIndex(t *testing.T) {
index := &model.Index{
TenantID: "",
CollectionID: 1,
FieldID: 2,
IndexID: 3,
IndexName: "_default_idx",
IsDeleted: false,
CreateTime: 12,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: "128",
},
},
IndexParams: []*commonpb.KeyValuePair{
{
Key: common.IndexTypeKey,
Value: "FLAT",
},
},
IsAutoIndex: false,
UserIndexParams: nil,
}
t.Run("success", func(t *testing.T) {
sc := catalogmocks.NewDataCoordCatalog(t)
sc.On("CreateIndex",
mock.Anything,
mock.Anything,
).Return(nil)
m := &meta{
RWMutex: sync.RWMutex{},
ctx: context.Background(),
catalog: sc,
indexes: make(map[UniqueID]map[UniqueID]*model.Index),
buildID2SegmentIndex: make(map[UniqueID]*model.SegmentIndex),
}
err := m.CreateIndex(index)
assert.NoError(t, err)
})
t.Run("save fail", func(t *testing.T) {
ec := catalogmocks.NewDataCoordCatalog(t)
ec.On("CreateIndex",
mock.Anything,
mock.Anything,
).Return(errors.New("fail"))
m := &meta{
RWMutex: sync.RWMutex{},
ctx: context.Background(),
catalog: ec,
indexes: make(map[UniqueID]map[UniqueID]*model.Index),
buildID2SegmentIndex: make(map[UniqueID]*model.SegmentIndex),
}
err := m.CreateIndex(index)
assert.Error(t, err)
})
}
func TestMeta_AddSegmentIndex(t *testing.T) {
sc := catalogmocks.NewDataCoordCatalog(t)
sc.On("CreateSegmentIndex",
mock.Anything,
mock.Anything,
).Return(nil)
ec := catalogmocks.NewDataCoordCatalog(t)
ec.On("CreateSegmentIndex",
mock.Anything,
mock.Anything,
).Return(errors.New("fail"))
m := &meta{
RWMutex: sync.RWMutex{},
ctx: context.Background(),
catalog: ec,
indexes: make(map[UniqueID]map[UniqueID]*model.Index),
buildID2SegmentIndex: make(map[UniqueID]*model.SegmentIndex),
segments: &SegmentsInfo{
segments: map[UniqueID]*SegmentInfo{
1: {
SegmentInfo: nil,
segmentIndexes: map[UniqueID]*model.SegmentIndex{},
currRows: 0,
allocations: nil,
lastFlushTime: time.Time{},
isCompacting: false,
lastWrittenTime: time.Time{},
},
},
},
}
segmentIndex := &model.SegmentIndex{
SegmentID: 1,
CollectionID: 2,
PartitionID: 3,
NumRows: 10240,
IndexID: 4,
BuildID: 5,
NodeID: 6,
IndexVersion: 0,
IndexState: 0,
FailReason: "",
IsDeleted: false,
CreateTime: 12,
IndexFileKeys: nil,
IndexSize: 0,
WriteHandoff: false,
}
t.Run("save meta fail", func(t *testing.T) {
err := m.AddSegmentIndex(segmentIndex)
assert.Error(t, err)
})
t.Run("success", func(t *testing.T) {
m.catalog = sc
err := m.AddSegmentIndex(segmentIndex)
assert.NoError(t, err)
})
}
func TestMeta_GetIndexIDByName(t *testing.T) {
var (
collID = UniqueID(1)
// partID = UniqueID(2)
indexID = UniqueID(10)
fieldID = UniqueID(100)
indexName = "_default_idx"
typeParams = []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: "128",
},
}
indexParams = []*commonpb.KeyValuePair{
{
Key: common.IndexTypeKey,
Value: "FLAT",
},
}
)
metakv := mockkv.NewMetaKv(t)
metakv.EXPECT().Save(mock.Anything, mock.Anything).Return(errors.New("failed")).Maybe()
metakv.EXPECT().MultiSave(mock.Anything).Return(errors.New("failed")).Maybe()
metakv.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, nil, nil).Maybe()
m := &meta{
RWMutex: sync.RWMutex{},
ctx: context.Background(),
catalog: &datacoord.Catalog{MetaKv: metakv},
indexes: make(map[UniqueID]map[UniqueID]*model.Index),
buildID2SegmentIndex: make(map[UniqueID]*model.SegmentIndex),
}
t.Run("no indexes", func(t *testing.T) {
indexID2CreateTS := m.GetIndexIDByName(collID, indexName)
assert.Equal(t, 0, len(indexID2CreateTS))
})
t.Run("success", func(t *testing.T) {
m.indexes[collID] = map[UniqueID]*model.Index{
indexID: {
TenantID: "",
CollectionID: collID,
FieldID: fieldID,
IndexID: indexID,
IndexName: indexName,
IsDeleted: false,
CreateTime: 12,
TypeParams: typeParams,
IndexParams: indexParams,
IsAutoIndex: false,
UserIndexParams: nil,
},
}
indexID2CreateTS := m.GetIndexIDByName(collID, indexName)
assert.Contains(t, indexID2CreateTS, indexID)
})
}
func TestMeta_GetSegmentIndexState(t *testing.T) {
var (
collID = UniqueID(1)
partID = UniqueID(2)
indexID = UniqueID(10)
fieldID = UniqueID(100)
segID = UniqueID(1000)
buildID = UniqueID(10000)
indexName = "_default_idx"
typeParams = []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: "128",
},
}
indexParams = []*commonpb.KeyValuePair{
{
Key: common.IndexTypeKey,
Value: "FLAT",
},
}
)
metakv := mockkv.NewMetaKv(t)
metakv.EXPECT().Save(mock.Anything, mock.Anything).Return(errors.New("failed")).Maybe()
metakv.EXPECT().MultiSave(mock.Anything).Return(errors.New("failed")).Maybe()
metakv.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, nil, nil).Maybe()
m := &meta{
RWMutex: sync.RWMutex{},
ctx: context.Background(),
catalog: &datacoord.Catalog{MetaKv: metakv},
indexes: map[UniqueID]map[UniqueID]*model.Index{},
buildID2SegmentIndex: make(map[UniqueID]*model.SegmentIndex),
segments: &SegmentsInfo{
segments: map[UniqueID]*SegmentInfo{
segID: {
SegmentInfo: nil,
segmentIndexes: map[UniqueID]*model.SegmentIndex{},
currRows: 0,
allocations: nil,
lastFlushTime: time.Time{},
isCompacting: false,
lastWrittenTime: time.Time{},
},
},
},
}
t.Run("segment has no index", func(t *testing.T) {
state := m.GetSegmentIndexState(collID, segID)
assert.Equal(t, commonpb.IndexState_IndexStateNone, state.state)
})
t.Run("meta not saved yet", func(t *testing.T) {
m.indexes[collID] = map[UniqueID]*model.Index{
indexID: {
TenantID: "",
CollectionID: collID,
FieldID: fieldID,
IndexID: indexID,
IndexName: indexName,
IsDeleted: false,
CreateTime: 12,
TypeParams: typeParams,
IndexParams: indexParams,
IsAutoIndex: false,
UserIndexParams: nil,
},
}
state := m.GetSegmentIndexState(collID, segID)
assert.Equal(t, commonpb.IndexState_Unissued, state.state)
})
t.Run("segment not exist", func(t *testing.T) {
state := m.GetSegmentIndexState(collID, segID+1)
assert.Equal(t, commonpb.IndexState_IndexStateNone, state.state)
})
t.Run("unissued", func(t *testing.T) {
m.segments.SetSegmentIndex(segID, &model.SegmentIndex{
SegmentID: segID,
CollectionID: collID,
PartitionID: partID,
NumRows: 10250,
IndexID: indexID,
BuildID: buildID,
NodeID: 1,
IndexVersion: 0,
IndexState: commonpb.IndexState_Unissued,
FailReason: "",
IsDeleted: false,
CreateTime: 12,
IndexFileKeys: nil,
IndexSize: 0,
WriteHandoff: false,
})
state := m.GetSegmentIndexState(collID, segID)
assert.Equal(t, commonpb.IndexState_Unissued, state.state)
})
t.Run("finish", func(t *testing.T) {
m.segments.SetSegmentIndex(segID, &model.SegmentIndex{
SegmentID: segID,
CollectionID: collID,
PartitionID: partID,
NumRows: 10250,
IndexID: indexID,
BuildID: buildID,
NodeID: 1,
IndexVersion: 0,
IndexState: commonpb.IndexState_Finished,
FailReason: "",
IsDeleted: false,
CreateTime: 12,
IndexFileKeys: nil,
IndexSize: 0,
WriteHandoff: false,
})
state := m.GetSegmentIndexState(collID, segID)
assert.Equal(t, commonpb.IndexState_Finished, state.state)
})
}
func TestMeta_GetSegmentIndexStateOnField(t *testing.T) {
var (
collID = UniqueID(1)
partID = UniqueID(2)
indexID = UniqueID(10)
fieldID = UniqueID(100)
segID = UniqueID(1000)
buildID = UniqueID(10000)
indexName = "_default_idx"
typeParams = []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: "128",
},
}
indexParams = []*commonpb.KeyValuePair{
{
Key: common.IndexTypeKey,
Value: "FLAT",
},
}
)
m := &meta{
RWMutex: sync.RWMutex{},
ctx: context.Background(),
catalog: nil,
collections: nil,
segments: &SegmentsInfo{
segments: map[UniqueID]*SegmentInfo{
segID: {
SegmentInfo: &datapb.SegmentInfo{},
segmentIndexes: map[UniqueID]*model.SegmentIndex{
indexID: {
SegmentID: segID,
CollectionID: collID,
PartitionID: partID,
NumRows: 1025,
IndexID: indexID,
BuildID: buildID,
NodeID: nodeID,
IndexVersion: 1,
IndexState: commonpb.IndexState_Finished,
FailReason: "",
IsDeleted: false,
CreateTime: 10,
IndexFileKeys: nil,
IndexSize: 0,
WriteHandoff: false,
},
},
},
},
},
channelCPs: nil,
chunkManager: nil,
indexes: map[UniqueID]map[UniqueID]*model.Index{
collID: {
indexID: {
TenantID: "",
CollectionID: collID,
FieldID: fieldID,
IndexID: indexID,
IndexName: indexName,
IsDeleted: false,
CreateTime: 10,
TypeParams: typeParams,
IndexParams: indexParams,
IsAutoIndex: false,
UserIndexParams: nil,
},
},
},
buildID2SegmentIndex: map[UniqueID]*model.SegmentIndex{
buildID: {
SegmentID: segID,
CollectionID: collID,
PartitionID: partID,
NumRows: 1025,
IndexID: indexID,
BuildID: buildID,
NodeID: nodeID,
IndexVersion: 1,
IndexState: commonpb.IndexState_Finished,
FailReason: "",
IsDeleted: false,
CreateTime: 10,
IndexFileKeys: nil,
IndexSize: 0,
WriteHandoff: false,
},
},
}
t.Run("success", func(t *testing.T) {
state := m.GetSegmentIndexStateOnField(collID, segID, fieldID)
assert.Equal(t, commonpb.IndexState_Finished, state.state)
})
t.Run("no index on field", func(t *testing.T) {
state := m.GetSegmentIndexStateOnField(collID, segID, fieldID+1)
assert.Equal(t, commonpb.IndexState_IndexStateNone, state.state)
})
t.Run("no index", func(t *testing.T) {
state := m.GetSegmentIndexStateOnField(collID+1, segID, fieldID+1)
assert.Equal(t, commonpb.IndexState_IndexStateNone, state.state)
})
t.Run("segment not exist", func(t *testing.T) {
state := m.GetSegmentIndexStateOnField(collID, segID+1, fieldID)
assert.Equal(t, commonpb.IndexState_IndexStateNone, state.state)
})
}
func TestMeta_MarkIndexAsDeleted(t *testing.T) {
sc := catalogmocks.NewDataCoordCatalog(t)
sc.On("AlterIndexes",
mock.Anything,
mock.Anything,
).Return(nil)
ec := catalogmocks.NewDataCoordCatalog(t)
ec.On("AlterIndexes",
mock.Anything,
mock.Anything,
).Return(errors.New("fail"))
m := &meta{
catalog: sc,
indexes: map[UniqueID]map[UniqueID]*model.Index{
collID: {
indexID: {
TenantID: "",
CollectionID: collID,
FieldID: fieldID,
IndexID: indexID,
IndexName: indexName,
IsDeleted: false,
CreateTime: 10,
TypeParams: nil,
IndexParams: nil,
IsAutoIndex: false,
UserIndexParams: nil,
},
indexID + 1: {
TenantID: "",
CollectionID: collID,
FieldID: fieldID + 1,
IndexID: indexID + 1,
IndexName: "_default_idx_102",
IsDeleted: true,
CreateTime: 1,
TypeParams: nil,
IndexParams: nil,
IsAutoIndex: false,
UserIndexParams: nil,
},
},
},
}
t.Run("fail", func(t *testing.T) {
m.catalog = ec
err := m.MarkIndexAsDeleted(collID, []UniqueID{indexID, indexID + 1, indexID + 2})
assert.Error(t, err)
})
t.Run("success", func(t *testing.T) {
m.catalog = sc
err := m.MarkIndexAsDeleted(collID, []UniqueID{indexID, indexID + 1, indexID + 2})
assert.NoError(t, err)
err = m.MarkIndexAsDeleted(collID, []UniqueID{indexID, indexID + 1, indexID + 2})
assert.NoError(t, err)
err = m.MarkIndexAsDeleted(collID+1, []UniqueID{indexID, indexID + 1, indexID + 2})
assert.NoError(t, err)
})
}
func TestMeta_GetSegmentIndexes(t *testing.T) {
m := createMetaTable(&datacoord.Catalog{MetaKv: mocks.NewMetaKv(t)})
t.Run("success", func(t *testing.T) {
segIndexes := m.GetSegmentIndexes(segID)
assert.Equal(t, 1, len(segIndexes))
})
t.Run("segment not exist", func(t *testing.T) {
segIndexes := m.GetSegmentIndexes(segID + 100)
assert.Equal(t, 0, len(segIndexes))
})
t.Run("no index exist", func(t *testing.T) {
m = &meta{
RWMutex: sync.RWMutex{},
segments: &SegmentsInfo{
segments: map[UniqueID]*SegmentInfo{
segID: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
CollectionID: collID,
PartitionID: partID,
NumOfRows: 0,
State: commonpb.SegmentState_Flushed,
},
},
},
},
indexes: nil,
buildID2SegmentIndex: nil,
}
segIndexes := m.GetSegmentIndexes(segID)
assert.Equal(t, 0, len(segIndexes))
})
}
func TestMeta_GetFieldIDByIndexID(t *testing.T) {
m := &meta{
indexes: map[UniqueID]map[UniqueID]*model.Index{
collID: {
indexID: {
TenantID: "",
CollectionID: collID,
FieldID: fieldID,
IndexID: indexID,
IndexName: indexName,
IsDeleted: false,
CreateTime: 0,
TypeParams: nil,
IndexParams: nil,
IsAutoIndex: false,
UserIndexParams: nil,
},
},
},
}
t.Run("success", func(t *testing.T) {
fID := m.GetFieldIDByIndexID(collID, indexID)
assert.Equal(t, fieldID, fID)
})
t.Run("fail", func(t *testing.T) {
fID := m.GetFieldIDByIndexID(collID, indexID+1)
assert.Equal(t, UniqueID(0), fID)
})
}
func TestMeta_GetIndexNameByID(t *testing.T) {
m := &meta{
indexes: map[UniqueID]map[UniqueID]*model.Index{
collID: {
indexID: {
TenantID: "",
CollectionID: collID,
FieldID: fieldID,
IndexID: indexID,
IndexName: indexName,
IsDeleted: false,
CreateTime: 0,
TypeParams: nil,
IndexParams: nil,
IsAutoIndex: false,
UserIndexParams: nil,
},
},
},
}
t.Run("success", func(t *testing.T) {
iName := m.GetIndexNameByID(collID, indexID)
assert.Equal(t, indexName, iName)
})
t.Run("fail", func(t *testing.T) {
iName := m.GetIndexNameByID(collID, indexID+1)
assert.Equal(t, "", iName)
})
}
func TestMeta_GetTypeParams(t *testing.T) {
m := &meta{
indexes: map[UniqueID]map[UniqueID]*model.Index{
collID: {
indexID: {
TenantID: "",
CollectionID: collID,
FieldID: fieldID,
IndexID: indexID,
IndexName: indexName,
IsDeleted: false,
CreateTime: 0,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: "128",
},
},
IndexParams: []*commonpb.KeyValuePair{
{
Key: common.IndexTypeKey,
Value: "HNSW",
},
},
IsAutoIndex: false,
UserIndexParams: nil,
},
},
},
}
t.Run("success", func(t *testing.T) {
tp := m.GetTypeParams(collID, indexID)
assert.Equal(t, 1, len(tp))
})
t.Run("not exist", func(t *testing.T) {
tp := m.GetTypeParams(collID, indexID+1)
assert.Equal(t, 0, len(tp))
tp = m.GetTypeParams(collID+1, indexID)
assert.Equal(t, 0, len(tp))
})
}
func TestMeta_GetIndexParams(t *testing.T) {
m := &meta{
indexes: map[UniqueID]map[UniqueID]*model.Index{
collID: {
indexID: {
TenantID: "",
CollectionID: collID,
FieldID: fieldID,
IndexID: indexID,
IndexName: indexName,
IsDeleted: false,
CreateTime: 0,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: "128",
},
},
IndexParams: []*commonpb.KeyValuePair{
{
Key: common.IndexTypeKey,
Value: "HNSW",
},
},
IsAutoIndex: false,
UserIndexParams: nil,
},
},
},
}
t.Run("success", func(t *testing.T) {
ip := m.GetIndexParams(collID, indexID)
assert.Equal(t, 1, len(ip))
})
t.Run("not exist", func(t *testing.T) {
ip := m.GetIndexParams(collID, indexID+1)
assert.Equal(t, 0, len(ip))
ip = m.GetIndexParams(collID+1, indexID)
assert.Equal(t, 0, len(ip))
})
}
func TestMeta_GetIndexJob(t *testing.T) {
m := &meta{
buildID2SegmentIndex: map[UniqueID]*model.SegmentIndex{
buildID: {
SegmentID: segID,
CollectionID: collID,
PartitionID: partID,
NumRows: 1025,
IndexID: indexID,
BuildID: buildID,
NodeID: 1,
IndexVersion: 1,
IndexState: commonpb.IndexState_Unissued,
FailReason: "",
IsDeleted: false,
CreateTime: 0,
IndexFileKeys: nil,
IndexSize: 0,
WriteHandoff: false,
},
},
}
t.Run("exist", func(t *testing.T) {
segIndex, exist := m.GetIndexJob(buildID)
assert.True(t, exist)
assert.NotNil(t, segIndex)
})
t.Run("not exist", func(t *testing.T) {
segIndex, exist := m.GetIndexJob(buildID + 1)
assert.False(t, exist)
assert.Nil(t, segIndex)
})
}
func TestMeta_IsIndexExist(t *testing.T) {
m := &meta{
indexes: map[UniqueID]map[UniqueID]*model.Index{
collID: {
indexID: {
TenantID: "",
CollectionID: collID,
FieldID: fieldID,
IndexID: indexID,
IndexName: indexName,
IsDeleted: false,
CreateTime: 0,
TypeParams: nil,
IndexParams: nil,
IsAutoIndex: false,
UserIndexParams: nil,
},
indexID + 1: {
TenantID: "",
CollectionID: collID,
FieldID: fieldID,
IndexID: indexID + 1,
IndexName: "index2",
IsDeleted: true,
CreateTime: 0,
TypeParams: nil,
IndexParams: nil,
IsAutoIndex: false,
UserIndexParams: nil,
},
},
},
}
t.Run("exist", func(t *testing.T) {
exist := m.IsIndexExist(collID, indexID)
assert.True(t, exist)
})
t.Run("not exist", func(t *testing.T) {
exist := m.IsIndexExist(collID, indexID+1)
assert.False(t, exist)
exist = m.IsIndexExist(collID, indexID+2)
assert.False(t, exist)
exist = m.IsIndexExist(collID+1, indexID)
assert.False(t, exist)
})
}
func updateSegmentIndexMeta(t *testing.T) *meta {
sc := catalogmocks.NewDataCoordCatalog(t)
sc.On("AlterSegmentIndexes",
mock.Anything,
mock.Anything,
).Return(nil)
return &meta{
catalog: sc,
segments: &SegmentsInfo{
segments: map[UniqueID]*SegmentInfo{
segID: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "",
NumOfRows: 1025,
State: commonpb.SegmentState_Flushed,
},
segmentIndexes: map[UniqueID]*model.SegmentIndex{
indexID: {
SegmentID: segID,
CollectionID: collID,
PartitionID: partID,
NumRows: 1025,
IndexID: indexID,
BuildID: buildID,
NodeID: 0,
IndexVersion: 0,
IndexState: commonpb.IndexState_Unissued,
FailReason: "",
IsDeleted: false,
CreateTime: 0,
IndexFileKeys: nil,
IndexSize: 0,
WriteHandoff: false,
},
},
},
},
},
indexes: map[UniqueID]map[UniqueID]*model.Index{
collID: {
indexID: {
TenantID: "",
CollectionID: collID,
FieldID: fieldID,
IndexID: indexID,
IndexName: indexName,
IsDeleted: false,
CreateTime: 0,
TypeParams: nil,
IndexParams: nil,
IsAutoIndex: false,
UserIndexParams: nil,
},
},
},
buildID2SegmentIndex: map[UniqueID]*model.SegmentIndex{
buildID: {
SegmentID: segID,
CollectionID: collID,
PartitionID: partID,
NumRows: 1025,
IndexID: indexID,
BuildID: buildID,
NodeID: 0,
IndexVersion: 0,
IndexState: commonpb.IndexState_Unissued,
FailReason: "",
IsDeleted: false,
CreateTime: 0,
IndexFileKeys: nil,
IndexSize: 0,
WriteHandoff: false,
},
},
}
}
func TestMeta_UpdateVersion(t *testing.T) {
m := updateSegmentIndexMeta(t)
ec := catalogmocks.NewDataCoordCatalog(t)
ec.On("AlterSegmentIndexes",
mock.Anything,
mock.Anything,
).Return(errors.New("fail"))
t.Run("success", func(t *testing.T) {
err := m.UpdateVersion(buildID, nodeID)
assert.NoError(t, err)
})
t.Run("fail", func(t *testing.T) {
m.catalog = ec
err := m.UpdateVersion(buildID, nodeID)
assert.Error(t, err)
})
t.Run("not exist", func(t *testing.T) {
err := m.UpdateVersion(buildID+1, nodeID)
assert.Error(t, err)
})
}
func TestMeta_FinishTask(t *testing.T) {
m := updateSegmentIndexMeta(t)
t.Run("success", func(t *testing.T) {
err := m.FinishTask(&indexpb.IndexTaskInfo{
BuildID: buildID,
State: commonpb.IndexState_Finished,
IndexFileKeys: []string{"file1", "file2"},
SerializedSize: 1024,
FailReason: "",
})
assert.NoError(t, err)
})
t.Run("fail", func(t *testing.T) {
metakv := mockkv.NewMetaKv(t)
metakv.EXPECT().Save(mock.Anything, mock.Anything).Return(errors.New("failed")).Maybe()
metakv.EXPECT().MultiSave(mock.Anything).Return(errors.New("failed")).Maybe()
m.catalog = &datacoord.Catalog{
MetaKv: metakv,
}
err := m.FinishTask(&indexpb.IndexTaskInfo{
BuildID: buildID,
State: commonpb.IndexState_Finished,
IndexFileKeys: []string{"file1", "file2"},
SerializedSize: 1024,
FailReason: "",
})
assert.Error(t, err)
})
t.Run("not exist", func(t *testing.T) {
err := m.FinishTask(&indexpb.IndexTaskInfo{
BuildID: buildID + 1,
State: commonpb.IndexState_Finished,
IndexFileKeys: []string{"file1", "file2"},
SerializedSize: 1024,
FailReason: "",
})
assert.NoError(t, err)
})
}
func TestMeta_BuildIndex(t *testing.T) {
m := updateSegmentIndexMeta(t)
ec := catalogmocks.NewDataCoordCatalog(t)
ec.On("AlterSegmentIndexes",
mock.Anything,
mock.Anything,
).Return(errors.New("fail"))
t.Run("success", func(t *testing.T) {
err := m.BuildIndex(buildID)
assert.NoError(t, err)
})
t.Run("fail", func(t *testing.T) {
m.catalog = ec
err := m.BuildIndex(buildID)
assert.Error(t, err)
})
t.Run("not exist", func(t *testing.T) {
err := m.BuildIndex(buildID + 1)
assert.Error(t, err)
})
}
func TestMeta_GetHasUnindexTaskSegments(t *testing.T) {
m := &meta{
segments: &SegmentsInfo{
segments: map[UniqueID]*SegmentInfo{
segID: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "",
NumOfRows: 1025,
State: commonpb.SegmentState_Flushed,
},
},
segID + 1: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID + 1,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "",
NumOfRows: 1025,
State: commonpb.SegmentState_Growing,
},
},
segID + 2: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID + 2,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "",
NumOfRows: 1025,
State: commonpb.SegmentState_Dropped,
},
},
},
},
indexes: map[UniqueID]map[UniqueID]*model.Index{
collID: {
indexID: {
TenantID: "",
CollectionID: collID,
FieldID: fieldID,
IndexID: indexID,
IndexName: indexName,
IsDeleted: false,
CreateTime: 0,
TypeParams: nil,
IndexParams: nil,
IsAutoIndex: false,
UserIndexParams: nil,
},
},
},
}
t.Run("normal", func(t *testing.T) {
segments := m.GetHasUnindexTaskSegments()
assert.Equal(t, 1, len(segments))
assert.Equal(t, segID, segments[0].ID)
})
}
// see also: https://github.com/milvus-io/milvus/issues/21660
func TestUpdateSegmentIndexNotExists(t *testing.T) {
m := &meta{
segments: &SegmentsInfo{
segments: map[UniqueID]*SegmentInfo{},
},
indexes: map[UniqueID]map[UniqueID]*model.Index{},
buildID2SegmentIndex: make(map[UniqueID]*model.SegmentIndex),
}
assert.NotPanics(t, func() {
m.updateSegmentIndex(&model.SegmentIndex{
SegmentID: 1,
IndexID: 2,
})
})
}
func TestMeta_DeleteTask_Error(t *testing.T) {
m := &meta{buildID2SegmentIndex: make(map[UniqueID]*model.SegmentIndex)}
t.Run("segment index not found", func(t *testing.T) {
err := m.DeleteTask(buildID)
assert.NoError(t, err)
})
t.Run("segment update failed", func(t *testing.T) {
ec := catalogmocks.NewDataCoordCatalog(t)
ec.On("AlterSegmentIndexes",
mock.Anything,
mock.Anything,
).Return(errors.New("fail"))
m.catalog = ec
m.buildID2SegmentIndex[buildID] = &model.SegmentIndex{
SegmentID: segID,
PartitionID: partID,
CollectionID: collID,
}
err := m.DeleteTask(buildID)
assert.Error(t, err)
})
}