Remove redundant LoadPrefix requests for Catalog ListCollections (#21551)

Signed-off-by: jaime <yun.zhang@zilliz.com>
This commit is contained in:
jaime 2023-01-06 10:09:36 +08:00 committed by GitHub
parent 24d5faf7aa
commit 09e1d4fc83
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 159 additions and 43 deletions

View File

@ -274,11 +274,8 @@ func (kc *Catalog) listFieldsAfter210(ctx context.Context, collectionID typeutil
return fields, nil
}
func (kc *Catalog) GetCollectionByID(ctx context.Context, collectionID typeutil.UniqueID, ts typeutil.Timestamp) (*model.Collection, error) {
collMeta, err := kc.loadCollection(ctx, collectionID, ts)
if err != nil {
return nil, err
}
func (kc *Catalog) appendPartitionAndFieldsInfo(ctx context.Context, collMeta *pb.CollectionInfo,
ts typeutil.Timestamp) (*model.Collection, error) {
collection := model.UnmarshalCollectionModel(collMeta)
@ -286,13 +283,13 @@ func (kc *Catalog) GetCollectionByID(ctx context.Context, collectionID typeutil.
return collection, nil
}
partitions, err := kc.listPartitionsAfter210(ctx, collectionID, ts)
partitions, err := kc.listPartitionsAfter210(ctx, collection.CollectionID, ts)
if err != nil {
return nil, err
}
collection.Partitions = partitions
fields, err := kc.listFieldsAfter210(ctx, collectionID, ts)
fields, err := kc.listFieldsAfter210(ctx, collection.CollectionID, ts)
if err != nil {
return nil, err
}
@ -301,6 +298,16 @@ func (kc *Catalog) GetCollectionByID(ctx context.Context, collectionID typeutil.
return collection, nil
}
func (kc *Catalog) GetCollectionByID(ctx context.Context, collectionID typeutil.UniqueID,
ts typeutil.Timestamp) (*model.Collection, error) {
collMeta, err := kc.loadCollection(ctx, collectionID, ts)
if err != nil {
return nil, err
}
return kc.appendPartitionAndFieldsInfo(ctx, collMeta, ts)
}
func (kc *Catalog) CollectionExists(ctx context.Context, collectionID typeutil.UniqueID, ts typeutil.Timestamp) bool {
_, err := kc.GetCollectionByID(ctx, collectionID, ts)
return err == nil
@ -501,7 +508,7 @@ func (kc *Catalog) ListCollections(ctx context.Context, ts typeutil.Timestamp) (
zap.String("prefix", CollectionMetaPrefix),
zap.Uint64("timestamp", ts),
zap.Error(err))
return nil, nil
return nil, err
}
colls := make(map[string]*model.Collection)
@ -512,7 +519,7 @@ func (kc *Catalog) ListCollections(ctx context.Context, ts typeutil.Timestamp) (
log.Warn("unmarshal collection info failed", zap.Error(err))
continue
}
collection, err := kc.GetCollectionByID(ctx, collMeta.GetID(), ts)
collection, err := kc.appendPartitionAndFieldsInfo(ctx, &collMeta, ts)
if err != nil {
return nil, err
}

View File

@ -31,20 +31,6 @@ import (
"github.com/milvus-io/milvus/internal/util/typeutil"
)
type MockedTxnKV struct {
kv.TxnKV
loadWithPrefixFn func(key string) ([]string, []string, error)
}
func (mc *MockedTxnKV) LoadWithPrefix(key string) ([]string, []string, error) {
return mc.loadWithPrefixFn(key)
}
type MockedSnapShotKV struct {
mock.Mock
kv.SnapShotKV
}
var (
indexName = "idx"
IndexID = 1
@ -58,29 +44,152 @@ var (
}
)
func getStrIndexPb(t *testing.T) string {
idxPB := model.MarshalIndexModel(&index)
msg, err := proto.Marshal(idxPB)
assert.Nil(t, err)
return string(msg)
}
func TestCatalog_ListCollections(t *testing.T) {
ctx := context.Background()
func getStrSegIdxPb(idx model.Index, newSegIdx model.SegmentIndex) (string, error) {
segIdxInfo := &pb.SegmentIndexInfo{
CollectionID: idx.CollectionID,
PartitionID: newSegIdx.PartitionID,
SegmentID: newSegIdx.SegmentID,
BuildID: newSegIdx.BuildID,
//EnableIndex: newSegIdx.EnableIndex,
CreateTime: newSegIdx.CreateTime,
FieldID: idx.FieldID,
IndexID: idx.IndexID,
coll1 := &pb.CollectionInfo{
ID: 1,
PartitionIDs: []int64{100},
PartitionNames: []string{"0"},
PartitionCreatedTimestamps: []uint64{1},
Schema: &schemapb.CollectionSchema{
Name: "c1",
Fields: []*schemapb.FieldSchema{
{
FieldID: 1,
Name: "f1",
},
},
},
}
msg, err := proto.Marshal(segIdxInfo)
if err != nil {
return "", err
coll2 := &pb.CollectionInfo{
ID: 2,
Schema: &schemapb.CollectionSchema{
Name: "c1",
Fields: []*schemapb.FieldSchema{
{},
},
},
}
return string(msg), nil
targetErr := errors.New("fail")
t.Run("load collection with prefix fail", func(t *testing.T) {
kv := mocks.NewSnapShotKV(t)
ts := uint64(1)
kv.On("LoadWithPrefix", CollectionMetaPrefix, ts).
Return(nil, nil, targetErr)
kc := Catalog{Snapshot: kv}
ret, err := kc.ListCollections(ctx, ts)
assert.ErrorIs(t, err, targetErr)
assert.Nil(t, ret)
})
t.Run("list partition fail", func(t *testing.T) {
kv := mocks.NewSnapShotKV(t)
ts := uint64(1)
bColl, err := proto.Marshal(coll2)
assert.NoError(t, err)
kv.On("LoadWithPrefix", CollectionMetaPrefix, ts).
Return(nil, []string{string(bColl)}, nil)
kv.On("LoadWithPrefix", mock.MatchedBy(
func(prefix string) bool {
return strings.HasPrefix(prefix, PartitionMetaPrefix)
}), ts).
Return(nil, nil, targetErr)
kc := Catalog{Snapshot: kv}
ret, err := kc.ListCollections(ctx, ts)
assert.ErrorIs(t, err, targetErr)
assert.Nil(t, ret)
})
t.Run("list fields fail", func(t *testing.T) {
kv := mocks.NewSnapShotKV(t)
ts := uint64(1)
bColl, err := proto.Marshal(coll2)
assert.NoError(t, err)
kv.On("LoadWithPrefix", CollectionMetaPrefix, ts).
Return(nil, []string{string(bColl)}, nil)
partitionMeta := &pb.PartitionInfo{}
pm, err := proto.Marshal(partitionMeta)
assert.NoError(t, err)
kv.On("LoadWithPrefix", mock.MatchedBy(
func(prefix string) bool {
return strings.HasPrefix(prefix, PartitionMetaPrefix)
}), ts).
Return(nil, []string{string(pm)}, nil)
kv.On("LoadWithPrefix", mock.MatchedBy(
func(prefix string) bool {
return strings.HasPrefix(prefix, FieldMetaPrefix)
}), ts).
Return(nil, nil, targetErr)
kc := Catalog{Snapshot: kv}
ret, err := kc.ListCollections(ctx, ts)
assert.ErrorIs(t, err, targetErr)
assert.Nil(t, ret)
})
t.Run("list collection ok for 210 version", func(t *testing.T) {
kv := mocks.NewSnapShotKV(t)
ts := uint64(1)
bColl, err := proto.Marshal(coll1)
assert.NoError(t, err)
kv.On("LoadWithPrefix", CollectionMetaPrefix, ts).
Return(nil, []string{string(bColl)}, nil)
kc := Catalog{Snapshot: kv}
ret, err := kc.ListCollections(ctx, ts)
assert.NoError(t, err)
assert.Equal(t, 1, len(ret))
modCol := ret["c1"]
assert.Equal(t, coll1.ID, modCol.CollectionID)
})
t.Run("list collection ok for the newest version", func(t *testing.T) {
kv := mocks.NewSnapShotKV(t)
ts := uint64(1)
bColl, err := proto.Marshal(coll2)
assert.NoError(t, err)
kv.On("LoadWithPrefix", CollectionMetaPrefix, ts).
Return(nil, []string{string(bColl)}, nil)
partitionMeta := &pb.PartitionInfo{}
pm, err := proto.Marshal(partitionMeta)
assert.NoError(t, err)
kv.On("LoadWithPrefix", mock.MatchedBy(
func(prefix string) bool {
return strings.HasPrefix(prefix, PartitionMetaPrefix)
}), ts).
Return(nil, []string{string(pm)}, nil)
fieldMeta := &schemapb.FieldSchema{}
fm, err := proto.Marshal(fieldMeta)
assert.NoError(t, err)
kv.On("LoadWithPrefix", mock.MatchedBy(
func(prefix string) bool {
return strings.HasPrefix(prefix, FieldMetaPrefix)
}), ts).
Return(nil, []string{string(fm)}, nil)
kc := Catalog{Snapshot: kv}
ret, err := kc.ListCollections(ctx, ts)
assert.NoError(t, err)
assert.NotNil(t, ret)
assert.Equal(t, 1, len(ret))
})
}
func TestCatalog_loadCollection(t *testing.T) {