Fix override index name

Signed-off-by: neza2017 <yefu.chen@zilliz.com>
This commit is contained in:
neza2017 2021-03-10 16:21:49 +08:00 committed by yefu.chen
parent 317158d4cb
commit aee7c74762
2 changed files with 90 additions and 15 deletions

View File

@ -784,6 +784,51 @@ func TestMasterService(t *testing.T) {
assert.Equal(t, rsp.IndexDescriptions[0].IndexName, Params.DefaultIndexName) assert.Equal(t, rsp.IndexDescriptions[0].IndexName, Params.DefaultIndexName)
}) })
t.Run("over ride index", func(t *testing.T) {
req := &milvuspb.CreateIndexRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_CreateIndex,
MsgID: 211,
Timestamp: 211,
SourceID: 211,
},
DbName: "",
CollectionName: "testColl",
FieldName: "vector",
ExtraParams: []*commonpb.KeyValuePair{
{
Key: "ik3",
Value: "iv3",
},
},
}
collMeta, err := core.MetaTable.GetCollectionByName("testColl")
assert.Nil(t, err)
assert.Equal(t, len(collMeta.FieldIndexes), 1)
oldIdx := collMeta.FieldIndexes[0].IndexID
rsp, err := core.CreateIndex(ctx, req)
assert.Nil(t, err)
assert.Equal(t, rsp.ErrorCode, commonpb.ErrorCode_ERROR_CODE_SUCCESS)
time.Sleep(time.Second)
collMeta, err = core.MetaTable.GetCollectionByName("testColl")
assert.Nil(t, err)
assert.Equal(t, len(collMeta.FieldIndexes), 2)
assert.Equal(t, oldIdx, collMeta.FieldIndexes[0].IndexID)
idxMeta, err := core.MetaTable.GetIndexByID(collMeta.FieldIndexes[1].IndexID)
assert.Nil(t, err)
assert.Equal(t, idxMeta.IndexName, Params.DefaultIndexName)
idxMeta, err = core.MetaTable.GetIndexByID(collMeta.FieldIndexes[0].IndexID)
assert.Nil(t, err)
assert.Equal(t, idxMeta.IndexName, Params.DefaultIndexName+"_bak")
})
t.Run("drop index", func(t *testing.T) { t.Run("drop index", func(t *testing.T) {
req := &milvuspb.DropIndexRequest{ req := &milvuspb.DropIndexRequest{
Base: &commonpb.MsgBase{ Base: &commonpb.MsgBase{

View File

@ -775,6 +775,9 @@ func (mt *metaTable) GetNotIndexedSegments(collName string, fieldName string, id
mt.ddLock.Lock() mt.ddLock.Lock()
defer mt.ddLock.Unlock() defer mt.ddLock.Unlock()
if idxInfo.IndexParams == nil {
return nil, schemapb.FieldSchema{}, fmt.Errorf("index param is nil")
}
collID, ok := mt.collName2ID[collName] collID, ok := mt.collName2ID[collName]
if !ok { if !ok {
return nil, schemapb.FieldSchema{}, fmt.Errorf("collection %s not found", collName) return nil, schemapb.FieldSchema{}, fmt.Errorf("collection %s not found", collName)
@ -788,25 +791,33 @@ func (mt *metaTable) GetNotIndexedSegments(collName string, fieldName string, id
return nil, fieldSchema, err return nil, fieldSchema, err
} }
exist := false var dupIdx typeutil.UniqueID = 0
var existInfo pb.IndexInfo for _, f := range collMeta.FieldIndexes {
if idxInfo.IndexParams != nil { if f.FiledID == fieldSchema.FieldID {
for _, f := range collMeta.FieldIndexes { if info, ok := mt.indexID2Meta[f.IndexID]; ok {
if f.FiledID == fieldSchema.FieldID { if info.IndexName == idxInfo.IndexName {
existInfo, ok = mt.indexID2Meta[f.IndexID] dupIdx = info.IndexID
if !ok {
return nil, schemapb.FieldSchema{}, fmt.Errorf("index id = %d not found", f.IndexID)
}
// (collMeta.IndexNames[i] == indexName)
if EqualKeyPairArray(existInfo.IndexParams, idxInfo.IndexParams) {
exist = true
break break
} }
} }
} }
} }
if !exist && idxInfo.IndexParams != nil {
exist := false
var existInfo pb.IndexInfo
for _, f := range collMeta.FieldIndexes {
if f.FiledID == fieldSchema.FieldID {
existInfo, ok = mt.indexID2Meta[f.IndexID]
if !ok {
return nil, schemapb.FieldSchema{}, fmt.Errorf("index id = %d not found", f.IndexID)
}
if EqualKeyPairArray(existInfo.IndexParams, idxInfo.IndexParams) {
exist = true
break
}
}
}
if !exist {
idx := &pb.FieldIndexInfo{ idx := &pb.FieldIndexInfo{
FiledID: fieldSchema.FieldID, FiledID: fieldSchema.FieldID,
IndexID: idxInfo.IndexID, IndexID: idxInfo.IndexID,
@ -821,6 +832,15 @@ func (mt *metaTable) GetNotIndexedSegments(collName string, fieldName string, id
v2 := proto.MarshalTextString(idxInfo) v2 := proto.MarshalTextString(idxInfo)
meta := map[string]string{k1: v1, k2: v2} meta := map[string]string{k1: v1, k2: v2}
if dupIdx != 0 {
dupInfo := mt.indexID2Meta[dupIdx]
dupInfo.IndexName = dupInfo.IndexName + "_bak"
mt.indexID2Meta[dupIdx] = dupInfo
k := path.Join(IndexMetaPrefix, strconv.FormatInt(dupInfo.IndexID, 10))
v := proto.MarshalTextString(&dupInfo)
meta[k] = v
}
err = mt.client.MultiSave(meta) err = mt.client.MultiSave(meta)
if err != nil { if err != nil {
_ = mt.reloadFromKV() _ = mt.reloadFromKV()
@ -834,7 +854,17 @@ func (mt *metaTable) GetNotIndexedSegments(collName string, fieldName string, id
mt.indexID2Meta[existInfo.IndexID] = existInfo mt.indexID2Meta[existInfo.IndexID] = existInfo
k := path.Join(IndexMetaPrefix, strconv.FormatInt(existInfo.IndexID, 10)) k := path.Join(IndexMetaPrefix, strconv.FormatInt(existInfo.IndexID, 10))
v := proto.MarshalTextString(&existInfo) v := proto.MarshalTextString(&existInfo)
err = mt.client.Save(k, v) meta := map[string]string{k: v}
if dupIdx != 0 {
dupInfo := mt.indexID2Meta[dupIdx]
dupInfo.IndexName = dupInfo.IndexName + "_bak"
mt.indexID2Meta[dupIdx] = dupInfo
k := path.Join(IndexMetaPrefix, strconv.FormatInt(dupInfo.IndexID, 10))
v := proto.MarshalTextString(&dupInfo)
meta[k] = v
}
err = mt.client.MultiSave(meta)
if err != nil { if err != nil {
_ = mt.reloadFromKV() _ = mt.reloadFromKV()
return nil, schemapb.FieldSchema{}, err return nil, schemapb.FieldSchema{}, err