Move meta store to kv catalog (#25915)

Signed-off-by: sunby <sunbingyi1992@gmail.com>
This commit is contained in:
Bingyi Sun 2023-07-31 13:57:04 +08:00 committed by GitHub
parent 45def2a31f
commit a3e22786ed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
32 changed files with 1053 additions and 1204 deletions

View File

@ -6,12 +6,10 @@ import (
"github.com/cockroachdb/errors"
"github.com/golang/protobuf/proto"
"github.com/samber/lo"
clientv3 "go.etcd.io/etcd/client/v3"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/pkg/util"
)
var (
@ -24,10 +22,9 @@ const (
ReplicaPrefix = "querycoord-replica"
CollectionMetaPrefixV1 = "queryCoord-collectionMeta"
ReplicaMetaPrefixV1 = "queryCoord-ReplicaMeta"
ResourceGroupPrefix = "queryCoord-ResourceGroup"
)
type WatchStoreChan = clientv3.WatchChan
type Catalog struct {
cli kv.MetaKv
}
@ -38,13 +35,23 @@ func NewCatalog(cli kv.MetaKv) Catalog {
}
}
func (s Catalog) SaveCollection(info *querypb.CollectionLoadInfo) error {
k := EncodeCollectionLoadInfoKey(info.GetCollectionID())
v, err := proto.Marshal(info)
func (s Catalog) SaveCollection(collection *querypb.CollectionLoadInfo, partitions ...*querypb.PartitionLoadInfo) error {
k := EncodeCollectionLoadInfoKey(collection.GetCollectionID())
v, err := proto.Marshal(collection)
if err != nil {
return err
}
return s.cli.Save(k, string(v))
kvs := make(map[string]string)
for _, partition := range partitions {
key := EncodePartitionLoadInfoKey(partition.GetCollectionID(), partition.GetPartitionID())
value, err := proto.Marshal(partition)
if err != nil {
return err
}
kvs[key] = string(value)
}
kvs[k] = string(v)
return s.cli.MultiSave(kvs)
}
func (s Catalog) SavePartition(info ...*querypb.PartitionLoadInfo) error {
@ -61,7 +68,7 @@ func (s Catalog) SavePartition(info ...*querypb.PartitionLoadInfo) error {
}
func (s Catalog) SaveReplica(replica *querypb.Replica) error {
key := EncodeReplicaKey(replica.GetCollectionID(), replica.GetID())
key := encodeReplicaKey(replica.GetCollectionID(), replica.GetID())
value, err := proto.Marshal(replica)
if err != nil {
return err
@ -69,6 +76,26 @@ func (s Catalog) SaveReplica(replica *querypb.Replica) error {
return s.cli.Save(key, string(value))
}
func (s Catalog) SaveResourceGroup(rgs ...*querypb.ResourceGroup) error {
ret := make(map[string]string)
for _, rg := range rgs {
key := encodeResourceGroupKey(rg.GetName())
value, err := proto.Marshal(rg)
if err != nil {
return err
}
ret[key] = string(value)
}
return s.cli.MultiSave(ret)
}
func (s Catalog) RemoveResourceGroup(rgName string) error {
key := encodeResourceGroupKey(rgName)
return s.cli.Remove(key)
}
func (s Catalog) GetCollections() ([]*querypb.CollectionLoadInfo, error) {
_, values, err := s.cli.LoadWithPrefix(CollectionLoadInfoPrefix)
if err != nil {
@ -149,9 +176,46 @@ func (s Catalog) getReplicasFromV1() ([]*querypb.Replica, error) {
return ret, nil
}
func (s Catalog) ReleaseCollection(id int64) error {
k := EncodeCollectionLoadInfoKey(id)
return s.cli.Remove(k)
func (s Catalog) GetResourceGroups() ([]*querypb.ResourceGroup, error) {
_, rgs, err := s.cli.LoadWithPrefix(ResourceGroupPrefix)
if err != nil {
return nil, err
}
ret := make([]*querypb.ResourceGroup, 0, len(rgs))
for _, value := range rgs {
rg := &querypb.ResourceGroup{}
err := proto.Unmarshal([]byte(value), rg)
if err != nil {
return nil, err
}
ret = append(ret, rg)
}
return ret, nil
}
func (s Catalog) ReleaseCollection(collection int64) error {
// obtain partitions of this collection
_, values, err := s.cli.LoadWithPrefix(fmt.Sprintf("%s/%d", PartitionLoadInfoPrefix, collection))
if err != nil {
return err
}
partitions := make([]*querypb.PartitionLoadInfo, 0)
for _, v := range values {
info := querypb.PartitionLoadInfo{}
if err = proto.Unmarshal([]byte(v), &info); err != nil {
return err
}
partitions = append(partitions, &info)
}
// remove collection and obtained partitions
keys := lo.Map(partitions, func(partition *querypb.PartitionLoadInfo, _ int) string {
return EncodePartitionLoadInfoKey(collection, partition.GetPartitionID())
})
k := EncodeCollectionLoadInfoKey(collection)
keys = append(keys, k)
return s.cli.MultiRemove(keys)
}
func (s Catalog) ReleasePartition(collection int64, partitions ...int64) error {
@ -162,17 +226,12 @@ func (s Catalog) ReleasePartition(collection int64, partitions ...int64) error {
}
func (s Catalog) ReleaseReplicas(collectionID int64) error {
key := EncodeCollectionReplicaKey(collectionID)
key := encodeCollectionReplicaKey(collectionID)
return s.cli.RemoveWithPrefix(key)
}
func (s Catalog) ReleaseReplica(collection, replica int64) error {
key := EncodeReplicaKey(collection, replica)
return s.cli.Remove(key)
}
func (s Catalog) RemoveHandoffEvent(info *querypb.SegmentInfo) error {
key := EncodeHandoffEventKey(info.CollectionID, info.PartitionID, info.SegmentID)
key := encodeReplicaKey(collection, replica)
return s.cli.Remove(key)
}
@ -184,14 +243,14 @@ func EncodePartitionLoadInfoKey(collection, partition int64) string {
return fmt.Sprintf("%s/%d/%d", PartitionLoadInfoPrefix, collection, partition)
}
func EncodeReplicaKey(collection, replica int64) string {
func encodeReplicaKey(collection, replica int64) string {
return fmt.Sprintf("%s/%d/%d", ReplicaPrefix, collection, replica)
}
func EncodeCollectionReplicaKey(collection int64) string {
func encodeCollectionReplicaKey(collection int64) string {
return fmt.Sprintf("%s/%d", ReplicaPrefix, collection)
}
func EncodeHandoffEventKey(collection, partition, segment int64) string {
return fmt.Sprintf("%s/%d/%d/%d", util.HandoffSegmentPrefix, collection, partition, segment)
func encodeResourceGroupKey(rgName string) string {
return fmt.Sprintf("%s/%s", ResourceGroupPrefix, rgName)
}

View File

@ -1,27 +1,190 @@
package querycoord
import (
"sort"
"testing"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/proto/querypb"
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/pkg/util/etcd"
)
type StoreTestSuite struct {
type CatalogTestSuite struct {
suite.Suite
store Catalog
kv kv.MetaKv
catalog Catalog
}
func (suite *StoreTestSuite) SetupTest() {
//kv := memkv.NewMemoryKV()
//suite.store = NewMetaStore(kv)
func (suite *CatalogTestSuite) SetupSuite() {
Params.Init()
}
func (suite *StoreTestSuite) TearDownTest() {}
func (suite *CatalogTestSuite) SetupTest() {
config := GenerateEtcdConfig()
cli, err := etcd.GetEtcdClient(
config.UseEmbedEtcd.GetAsBool(),
config.EtcdUseSSL.GetAsBool(),
config.Endpoints.GetAsStrings(),
config.EtcdTLSCert.GetValue(),
config.EtcdTLSKey.GetValue(),
config.EtcdTLSCACert.GetValue(),
config.EtcdTLSMinVersion.GetValue())
suite.Require().NoError(err)
suite.kv = etcdkv.NewEtcdKV(cli, config.MetaRootPath.GetValue())
suite.catalog = NewCatalog(suite.kv)
}
func (suite *StoreTestSuite) TestLoadRelease() {
func (suite *CatalogTestSuite) TearDownTest() {
if suite.kv != nil {
suite.kv.Close()
}
}
func (suite *CatalogTestSuite) TestCollection() {
suite.catalog.SaveCollection(&querypb.CollectionLoadInfo{
CollectionID: 1,
})
suite.catalog.SaveCollection(&querypb.CollectionLoadInfo{
CollectionID: 2,
})
suite.catalog.SaveCollection(&querypb.CollectionLoadInfo{
CollectionID: 3,
})
suite.catalog.ReleaseCollection(1)
suite.catalog.ReleaseCollection(2)
collections, err := suite.catalog.GetCollections()
suite.NoError(err)
suite.Len(collections, 1)
}
func (suite *CatalogTestSuite) TestCollectionWithPartition() {
suite.catalog.SaveCollection(&querypb.CollectionLoadInfo{
CollectionID: 1,
})
suite.catalog.SaveCollection(&querypb.CollectionLoadInfo{
CollectionID: 2,
}, &querypb.PartitionLoadInfo{
CollectionID: 2,
PartitionID: 102,
})
suite.catalog.SaveCollection(&querypb.CollectionLoadInfo{
CollectionID: 3,
}, &querypb.PartitionLoadInfo{
CollectionID: 3,
PartitionID: 103,
})
suite.catalog.ReleaseCollection(1)
suite.catalog.ReleaseCollection(2)
collections, err := suite.catalog.GetCollections()
suite.NoError(err)
suite.Len(collections, 1)
suite.Equal(int64(3), collections[0].GetCollectionID())
partitions, err := suite.catalog.GetPartitions()
suite.NoError(err)
suite.Len(partitions, 1)
suite.Len(partitions[int64(3)], 1)
suite.Equal(int64(103), partitions[int64(3)][0].GetPartitionID())
}
func (suite *CatalogTestSuite) TestPartition() {
suite.catalog.SavePartition(&querypb.PartitionLoadInfo{
PartitionID: 1,
})
suite.catalog.SavePartition(&querypb.PartitionLoadInfo{
PartitionID: 2,
})
suite.catalog.SavePartition(&querypb.PartitionLoadInfo{
PartitionID: 3,
})
suite.catalog.ReleasePartition(1)
suite.catalog.ReleasePartition(2)
partitions, err := suite.catalog.GetPartitions()
suite.NoError(err)
suite.Len(partitions, 1)
}
func (suite *CatalogTestSuite) TestReplica() {
suite.catalog.SaveReplica(&querypb.Replica{
CollectionID: 1,
ID: 1,
})
suite.catalog.SaveReplica(&querypb.Replica{
CollectionID: 1,
ID: 2,
})
suite.catalog.SaveReplica(&querypb.Replica{
CollectionID: 1,
ID: 3,
})
suite.catalog.ReleaseReplica(1, 1)
suite.catalog.ReleaseReplica(1, 2)
replicas, err := suite.catalog.GetReplicas()
suite.NoError(err)
suite.Len(replicas, 1)
}
func (suite *CatalogTestSuite) TestResourceGroup() {
suite.catalog.SaveResourceGroup(&querypb.ResourceGroup{
Name: "rg1",
Capacity: 3,
Nodes: []int64{1, 2, 3},
})
suite.catalog.SaveResourceGroup(&querypb.ResourceGroup{
Name: "rg2",
Capacity: 3,
Nodes: []int64{4, 5},
})
suite.catalog.SaveResourceGroup(&querypb.ResourceGroup{
Name: "rg3",
Capacity: 0,
Nodes: []int64{},
})
suite.catalog.RemoveResourceGroup("rg3")
groups, err := suite.catalog.GetResourceGroups()
suite.NoError(err)
suite.Len(groups, 2)
sort.Slice(groups, func(i, j int) bool {
return groups[i].GetName() < groups[j].GetName()
})
suite.Equal("rg1", groups[0].GetName())
suite.Equal(int32(3), groups[0].GetCapacity())
suite.Equal([]int64{1, 2, 3}, groups[0].GetNodes())
suite.Equal("rg2", groups[1].GetName())
suite.Equal(int32(3), groups[1].GetCapacity())
suite.Equal([]int64{4, 5}, groups[1].GetNodes())
}
func (suite *CatalogTestSuite) TestLoadRelease() {
// TODO(sunby): add ut
}
func TestStoreSuite(t *testing.T) {
suite.Run(t, new(StoreTestSuite))
func TestCatalogSuite(t *testing.T) {
suite.Run(t, new(CatalogTestSuite))
}

View File

@ -0,0 +1,682 @@
// Code generated by mockery v2.32.0. DO NOT EDIT.
package mocks
import (
querypb "github.com/milvus-io/milvus/internal/proto/querypb"
mock "github.com/stretchr/testify/mock"
)
// QueryCoordCatalog is an autogenerated mock type for the QueryCoordCatalog type
type QueryCoordCatalog struct {
mock.Mock
}
type QueryCoordCatalog_Expecter struct {
mock *mock.Mock
}
func (_m *QueryCoordCatalog) EXPECT() *QueryCoordCatalog_Expecter {
return &QueryCoordCatalog_Expecter{mock: &_m.Mock}
}
// GetCollections provides a mock function with given fields:
func (_m *QueryCoordCatalog) GetCollections() ([]*querypb.CollectionLoadInfo, error) {
ret := _m.Called()
var r0 []*querypb.CollectionLoadInfo
var r1 error
if rf, ok := ret.Get(0).(func() ([]*querypb.CollectionLoadInfo, error)); ok {
return rf()
}
if rf, ok := ret.Get(0).(func() []*querypb.CollectionLoadInfo); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]*querypb.CollectionLoadInfo)
}
}
if rf, ok := ret.Get(1).(func() error); ok {
r1 = rf()
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// QueryCoordCatalog_GetCollections_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetCollections'
type QueryCoordCatalog_GetCollections_Call struct {
*mock.Call
}
// GetCollections is a helper method to define mock.On call
func (_e *QueryCoordCatalog_Expecter) GetCollections() *QueryCoordCatalog_GetCollections_Call {
return &QueryCoordCatalog_GetCollections_Call{Call: _e.mock.On("GetCollections")}
}
func (_c *QueryCoordCatalog_GetCollections_Call) Run(run func()) *QueryCoordCatalog_GetCollections_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *QueryCoordCatalog_GetCollections_Call) Return(_a0 []*querypb.CollectionLoadInfo, _a1 error) *QueryCoordCatalog_GetCollections_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *QueryCoordCatalog_GetCollections_Call) RunAndReturn(run func() ([]*querypb.CollectionLoadInfo, error)) *QueryCoordCatalog_GetCollections_Call {
_c.Call.Return(run)
return _c
}
// GetPartitions provides a mock function with given fields:
func (_m *QueryCoordCatalog) GetPartitions() (map[int64][]*querypb.PartitionLoadInfo, error) {
ret := _m.Called()
var r0 map[int64][]*querypb.PartitionLoadInfo
var r1 error
if rf, ok := ret.Get(0).(func() (map[int64][]*querypb.PartitionLoadInfo, error)); ok {
return rf()
}
if rf, ok := ret.Get(0).(func() map[int64][]*querypb.PartitionLoadInfo); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(map[int64][]*querypb.PartitionLoadInfo)
}
}
if rf, ok := ret.Get(1).(func() error); ok {
r1 = rf()
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// QueryCoordCatalog_GetPartitions_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetPartitions'
type QueryCoordCatalog_GetPartitions_Call struct {
*mock.Call
}
// GetPartitions is a helper method to define mock.On call
func (_e *QueryCoordCatalog_Expecter) GetPartitions() *QueryCoordCatalog_GetPartitions_Call {
return &QueryCoordCatalog_GetPartitions_Call{Call: _e.mock.On("GetPartitions")}
}
func (_c *QueryCoordCatalog_GetPartitions_Call) Run(run func()) *QueryCoordCatalog_GetPartitions_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *QueryCoordCatalog_GetPartitions_Call) Return(_a0 map[int64][]*querypb.PartitionLoadInfo, _a1 error) *QueryCoordCatalog_GetPartitions_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *QueryCoordCatalog_GetPartitions_Call) RunAndReturn(run func() (map[int64][]*querypb.PartitionLoadInfo, error)) *QueryCoordCatalog_GetPartitions_Call {
_c.Call.Return(run)
return _c
}
// GetReplicas provides a mock function with given fields:
func (_m *QueryCoordCatalog) GetReplicas() ([]*querypb.Replica, error) {
ret := _m.Called()
var r0 []*querypb.Replica
var r1 error
if rf, ok := ret.Get(0).(func() ([]*querypb.Replica, error)); ok {
return rf()
}
if rf, ok := ret.Get(0).(func() []*querypb.Replica); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]*querypb.Replica)
}
}
if rf, ok := ret.Get(1).(func() error); ok {
r1 = rf()
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// QueryCoordCatalog_GetReplicas_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetReplicas'
type QueryCoordCatalog_GetReplicas_Call struct {
*mock.Call
}
// GetReplicas is a helper method to define mock.On call
func (_e *QueryCoordCatalog_Expecter) GetReplicas() *QueryCoordCatalog_GetReplicas_Call {
return &QueryCoordCatalog_GetReplicas_Call{Call: _e.mock.On("GetReplicas")}
}
func (_c *QueryCoordCatalog_GetReplicas_Call) Run(run func()) *QueryCoordCatalog_GetReplicas_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *QueryCoordCatalog_GetReplicas_Call) Return(_a0 []*querypb.Replica, _a1 error) *QueryCoordCatalog_GetReplicas_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *QueryCoordCatalog_GetReplicas_Call) RunAndReturn(run func() ([]*querypb.Replica, error)) *QueryCoordCatalog_GetReplicas_Call {
_c.Call.Return(run)
return _c
}
// GetResourceGroups provides a mock function with given fields:
func (_m *QueryCoordCatalog) GetResourceGroups() ([]*querypb.ResourceGroup, error) {
ret := _m.Called()
var r0 []*querypb.ResourceGroup
var r1 error
if rf, ok := ret.Get(0).(func() ([]*querypb.ResourceGroup, error)); ok {
return rf()
}
if rf, ok := ret.Get(0).(func() []*querypb.ResourceGroup); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]*querypb.ResourceGroup)
}
}
if rf, ok := ret.Get(1).(func() error); ok {
r1 = rf()
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// QueryCoordCatalog_GetResourceGroups_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetResourceGroups'
type QueryCoordCatalog_GetResourceGroups_Call struct {
*mock.Call
}
// GetResourceGroups is a helper method to define mock.On call
func (_e *QueryCoordCatalog_Expecter) GetResourceGroups() *QueryCoordCatalog_GetResourceGroups_Call {
return &QueryCoordCatalog_GetResourceGroups_Call{Call: _e.mock.On("GetResourceGroups")}
}
func (_c *QueryCoordCatalog_GetResourceGroups_Call) Run(run func()) *QueryCoordCatalog_GetResourceGroups_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *QueryCoordCatalog_GetResourceGroups_Call) Return(_a0 []*querypb.ResourceGroup, _a1 error) *QueryCoordCatalog_GetResourceGroups_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *QueryCoordCatalog_GetResourceGroups_Call) RunAndReturn(run func() ([]*querypb.ResourceGroup, error)) *QueryCoordCatalog_GetResourceGroups_Call {
_c.Call.Return(run)
return _c
}
// ReleaseCollection provides a mock function with given fields: collection
func (_m *QueryCoordCatalog) ReleaseCollection(collection int64) error {
ret := _m.Called(collection)
var r0 error
if rf, ok := ret.Get(0).(func(int64) error); ok {
r0 = rf(collection)
} else {
r0 = ret.Error(0)
}
return r0
}
// QueryCoordCatalog_ReleaseCollection_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ReleaseCollection'
type QueryCoordCatalog_ReleaseCollection_Call struct {
*mock.Call
}
// ReleaseCollection is a helper method to define mock.On call
// - collection int64
func (_e *QueryCoordCatalog_Expecter) ReleaseCollection(collection interface{}) *QueryCoordCatalog_ReleaseCollection_Call {
return &QueryCoordCatalog_ReleaseCollection_Call{Call: _e.mock.On("ReleaseCollection", collection)}
}
func (_c *QueryCoordCatalog_ReleaseCollection_Call) Run(run func(collection int64)) *QueryCoordCatalog_ReleaseCollection_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64))
})
return _c
}
func (_c *QueryCoordCatalog_ReleaseCollection_Call) Return(_a0 error) *QueryCoordCatalog_ReleaseCollection_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *QueryCoordCatalog_ReleaseCollection_Call) RunAndReturn(run func(int64) error) *QueryCoordCatalog_ReleaseCollection_Call {
_c.Call.Return(run)
return _c
}
// ReleasePartition provides a mock function with given fields: collection, partitions
func (_m *QueryCoordCatalog) ReleasePartition(collection int64, partitions ...int64) error {
_va := make([]interface{}, len(partitions))
for _i := range partitions {
_va[_i] = partitions[_i]
}
var _ca []interface{}
_ca = append(_ca, collection)
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
var r0 error
if rf, ok := ret.Get(0).(func(int64, ...int64) error); ok {
r0 = rf(collection, partitions...)
} else {
r0 = ret.Error(0)
}
return r0
}
// QueryCoordCatalog_ReleasePartition_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ReleasePartition'
type QueryCoordCatalog_ReleasePartition_Call struct {
*mock.Call
}
// ReleasePartition is a helper method to define mock.On call
// - collection int64
// - partitions ...int64
func (_e *QueryCoordCatalog_Expecter) ReleasePartition(collection interface{}, partitions ...interface{}) *QueryCoordCatalog_ReleasePartition_Call {
return &QueryCoordCatalog_ReleasePartition_Call{Call: _e.mock.On("ReleasePartition",
append([]interface{}{collection}, partitions...)...)}
}
func (_c *QueryCoordCatalog_ReleasePartition_Call) Run(run func(collection int64, partitions ...int64)) *QueryCoordCatalog_ReleasePartition_Call {
_c.Call.Run(func(args mock.Arguments) {
variadicArgs := make([]int64, len(args)-1)
for i, a := range args[1:] {
if a != nil {
variadicArgs[i] = a.(int64)
}
}
run(args[0].(int64), variadicArgs...)
})
return _c
}
func (_c *QueryCoordCatalog_ReleasePartition_Call) Return(_a0 error) *QueryCoordCatalog_ReleasePartition_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *QueryCoordCatalog_ReleasePartition_Call) RunAndReturn(run func(int64, ...int64) error) *QueryCoordCatalog_ReleasePartition_Call {
_c.Call.Return(run)
return _c
}
// ReleaseReplica provides a mock function with given fields: collection, replica
func (_m *QueryCoordCatalog) ReleaseReplica(collection int64, replica int64) error {
ret := _m.Called(collection, replica)
var r0 error
if rf, ok := ret.Get(0).(func(int64, int64) error); ok {
r0 = rf(collection, replica)
} else {
r0 = ret.Error(0)
}
return r0
}
// QueryCoordCatalog_ReleaseReplica_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ReleaseReplica'
type QueryCoordCatalog_ReleaseReplica_Call struct {
*mock.Call
}
// ReleaseReplica is a helper method to define mock.On call
// - collection int64
// - replica int64
func (_e *QueryCoordCatalog_Expecter) ReleaseReplica(collection interface{}, replica interface{}) *QueryCoordCatalog_ReleaseReplica_Call {
return &QueryCoordCatalog_ReleaseReplica_Call{Call: _e.mock.On("ReleaseReplica", collection, replica)}
}
func (_c *QueryCoordCatalog_ReleaseReplica_Call) Run(run func(collection int64, replica int64)) *QueryCoordCatalog_ReleaseReplica_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64), args[1].(int64))
})
return _c
}
func (_c *QueryCoordCatalog_ReleaseReplica_Call) Return(_a0 error) *QueryCoordCatalog_ReleaseReplica_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *QueryCoordCatalog_ReleaseReplica_Call) RunAndReturn(run func(int64, int64) error) *QueryCoordCatalog_ReleaseReplica_Call {
_c.Call.Return(run)
return _c
}
// ReleaseReplicas provides a mock function with given fields: collectionID
func (_m *QueryCoordCatalog) ReleaseReplicas(collectionID int64) error {
ret := _m.Called(collectionID)
var r0 error
if rf, ok := ret.Get(0).(func(int64) error); ok {
r0 = rf(collectionID)
} else {
r0 = ret.Error(0)
}
return r0
}
// QueryCoordCatalog_ReleaseReplicas_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ReleaseReplicas'
type QueryCoordCatalog_ReleaseReplicas_Call struct {
*mock.Call
}
// ReleaseReplicas is a helper method to define mock.On call
// - collectionID int64
func (_e *QueryCoordCatalog_Expecter) ReleaseReplicas(collectionID interface{}) *QueryCoordCatalog_ReleaseReplicas_Call {
return &QueryCoordCatalog_ReleaseReplicas_Call{Call: _e.mock.On("ReleaseReplicas", collectionID)}
}
func (_c *QueryCoordCatalog_ReleaseReplicas_Call) Run(run func(collectionID int64)) *QueryCoordCatalog_ReleaseReplicas_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64))
})
return _c
}
func (_c *QueryCoordCatalog_ReleaseReplicas_Call) Return(_a0 error) *QueryCoordCatalog_ReleaseReplicas_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *QueryCoordCatalog_ReleaseReplicas_Call) RunAndReturn(run func(int64) error) *QueryCoordCatalog_ReleaseReplicas_Call {
_c.Call.Return(run)
return _c
}
// RemoveResourceGroup provides a mock function with given fields: rgName
func (_m *QueryCoordCatalog) RemoveResourceGroup(rgName string) error {
ret := _m.Called(rgName)
var r0 error
if rf, ok := ret.Get(0).(func(string) error); ok {
r0 = rf(rgName)
} else {
r0 = ret.Error(0)
}
return r0
}
// QueryCoordCatalog_RemoveResourceGroup_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RemoveResourceGroup'
type QueryCoordCatalog_RemoveResourceGroup_Call struct {
*mock.Call
}
// RemoveResourceGroup is a helper method to define mock.On call
// - rgName string
func (_e *QueryCoordCatalog_Expecter) RemoveResourceGroup(rgName interface{}) *QueryCoordCatalog_RemoveResourceGroup_Call {
return &QueryCoordCatalog_RemoveResourceGroup_Call{Call: _e.mock.On("RemoveResourceGroup", rgName)}
}
func (_c *QueryCoordCatalog_RemoveResourceGroup_Call) Run(run func(rgName string)) *QueryCoordCatalog_RemoveResourceGroup_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(string))
})
return _c
}
func (_c *QueryCoordCatalog_RemoveResourceGroup_Call) Return(_a0 error) *QueryCoordCatalog_RemoveResourceGroup_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *QueryCoordCatalog_RemoveResourceGroup_Call) RunAndReturn(run func(string) error) *QueryCoordCatalog_RemoveResourceGroup_Call {
_c.Call.Return(run)
return _c
}
// SaveCollection provides a mock function with given fields: collection, partitions
func (_m *QueryCoordCatalog) SaveCollection(collection *querypb.CollectionLoadInfo, partitions ...*querypb.PartitionLoadInfo) error {
_va := make([]interface{}, len(partitions))
for _i := range partitions {
_va[_i] = partitions[_i]
}
var _ca []interface{}
_ca = append(_ca, collection)
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
var r0 error
if rf, ok := ret.Get(0).(func(*querypb.CollectionLoadInfo, ...*querypb.PartitionLoadInfo) error); ok {
r0 = rf(collection, partitions...)
} else {
r0 = ret.Error(0)
}
return r0
}
// QueryCoordCatalog_SaveCollection_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SaveCollection'
type QueryCoordCatalog_SaveCollection_Call struct {
*mock.Call
}
// SaveCollection is a helper method to define mock.On call
// - collection *querypb.CollectionLoadInfo
// - partitions ...*querypb.PartitionLoadInfo
func (_e *QueryCoordCatalog_Expecter) SaveCollection(collection interface{}, partitions ...interface{}) *QueryCoordCatalog_SaveCollection_Call {
return &QueryCoordCatalog_SaveCollection_Call{Call: _e.mock.On("SaveCollection",
append([]interface{}{collection}, partitions...)...)}
}
func (_c *QueryCoordCatalog_SaveCollection_Call) Run(run func(collection *querypb.CollectionLoadInfo, partitions ...*querypb.PartitionLoadInfo)) *QueryCoordCatalog_SaveCollection_Call {
_c.Call.Run(func(args mock.Arguments) {
variadicArgs := make([]*querypb.PartitionLoadInfo, len(args)-1)
for i, a := range args[1:] {
if a != nil {
variadicArgs[i] = a.(*querypb.PartitionLoadInfo)
}
}
run(args[0].(*querypb.CollectionLoadInfo), variadicArgs...)
})
return _c
}
func (_c *QueryCoordCatalog_SaveCollection_Call) Return(_a0 error) *QueryCoordCatalog_SaveCollection_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *QueryCoordCatalog_SaveCollection_Call) RunAndReturn(run func(*querypb.CollectionLoadInfo, ...*querypb.PartitionLoadInfo) error) *QueryCoordCatalog_SaveCollection_Call {
_c.Call.Return(run)
return _c
}
// SavePartition provides a mock function with given fields: info
func (_m *QueryCoordCatalog) SavePartition(info ...*querypb.PartitionLoadInfo) error {
_va := make([]interface{}, len(info))
for _i := range info {
_va[_i] = info[_i]
}
var _ca []interface{}
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
var r0 error
if rf, ok := ret.Get(0).(func(...*querypb.PartitionLoadInfo) error); ok {
r0 = rf(info...)
} else {
r0 = ret.Error(0)
}
return r0
}
// QueryCoordCatalog_SavePartition_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SavePartition'
type QueryCoordCatalog_SavePartition_Call struct {
*mock.Call
}
// SavePartition is a helper method to define mock.On call
// - info ...*querypb.PartitionLoadInfo
func (_e *QueryCoordCatalog_Expecter) SavePartition(info ...interface{}) *QueryCoordCatalog_SavePartition_Call {
return &QueryCoordCatalog_SavePartition_Call{Call: _e.mock.On("SavePartition",
append([]interface{}{}, info...)...)}
}
func (_c *QueryCoordCatalog_SavePartition_Call) Run(run func(info ...*querypb.PartitionLoadInfo)) *QueryCoordCatalog_SavePartition_Call {
_c.Call.Run(func(args mock.Arguments) {
variadicArgs := make([]*querypb.PartitionLoadInfo, len(args)-0)
for i, a := range args[0:] {
if a != nil {
variadicArgs[i] = a.(*querypb.PartitionLoadInfo)
}
}
run(variadicArgs...)
})
return _c
}
func (_c *QueryCoordCatalog_SavePartition_Call) Return(_a0 error) *QueryCoordCatalog_SavePartition_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *QueryCoordCatalog_SavePartition_Call) RunAndReturn(run func(...*querypb.PartitionLoadInfo) error) *QueryCoordCatalog_SavePartition_Call {
_c.Call.Return(run)
return _c
}
// SaveReplica provides a mock function with given fields: replica
func (_m *QueryCoordCatalog) SaveReplica(replica *querypb.Replica) error {
ret := _m.Called(replica)
var r0 error
if rf, ok := ret.Get(0).(func(*querypb.Replica) error); ok {
r0 = rf(replica)
} else {
r0 = ret.Error(0)
}
return r0
}
// QueryCoordCatalog_SaveReplica_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SaveReplica'
type QueryCoordCatalog_SaveReplica_Call struct {
*mock.Call
}
// SaveReplica is a helper method to define mock.On call
// - replica *querypb.Replica
func (_e *QueryCoordCatalog_Expecter) SaveReplica(replica interface{}) *QueryCoordCatalog_SaveReplica_Call {
return &QueryCoordCatalog_SaveReplica_Call{Call: _e.mock.On("SaveReplica", replica)}
}
func (_c *QueryCoordCatalog_SaveReplica_Call) Run(run func(replica *querypb.Replica)) *QueryCoordCatalog_SaveReplica_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(*querypb.Replica))
})
return _c
}
func (_c *QueryCoordCatalog_SaveReplica_Call) Return(_a0 error) *QueryCoordCatalog_SaveReplica_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *QueryCoordCatalog_SaveReplica_Call) RunAndReturn(run func(*querypb.Replica) error) *QueryCoordCatalog_SaveReplica_Call {
_c.Call.Return(run)
return _c
}
// SaveResourceGroup provides a mock function with given fields: rgs
func (_m *QueryCoordCatalog) SaveResourceGroup(rgs ...*querypb.ResourceGroup) error {
_va := make([]interface{}, len(rgs))
for _i := range rgs {
_va[_i] = rgs[_i]
}
var _ca []interface{}
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
var r0 error
if rf, ok := ret.Get(0).(func(...*querypb.ResourceGroup) error); ok {
r0 = rf(rgs...)
} else {
r0 = ret.Error(0)
}
return r0
}
// QueryCoordCatalog_SaveResourceGroup_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SaveResourceGroup'
type QueryCoordCatalog_SaveResourceGroup_Call struct {
*mock.Call
}
// SaveResourceGroup is a helper method to define mock.On call
// - rgs ...*querypb.ResourceGroup
func (_e *QueryCoordCatalog_Expecter) SaveResourceGroup(rgs ...interface{}) *QueryCoordCatalog_SaveResourceGroup_Call {
return &QueryCoordCatalog_SaveResourceGroup_Call{Call: _e.mock.On("SaveResourceGroup",
append([]interface{}{}, rgs...)...)}
}
func (_c *QueryCoordCatalog_SaveResourceGroup_Call) Run(run func(rgs ...*querypb.ResourceGroup)) *QueryCoordCatalog_SaveResourceGroup_Call {
_c.Call.Run(func(args mock.Arguments) {
variadicArgs := make([]*querypb.ResourceGroup, len(args)-0)
for i, a := range args[0:] {
if a != nil {
variadicArgs[i] = a.(*querypb.ResourceGroup)
}
}
run(variadicArgs...)
})
return _c
}
func (_c *QueryCoordCatalog_SaveResourceGroup_Call) Return(_a0 error) *QueryCoordCatalog_SaveResourceGroup_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *QueryCoordCatalog_SaveResourceGroup_Call) RunAndReturn(run func(...*querypb.ResourceGroup) error) *QueryCoordCatalog_SaveResourceGroup_Call {
_c.Call.Return(run)
return _c
}
// NewQueryCoordCatalog creates a new instance of QueryCoordCatalog. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewQueryCoordCatalog(t interface {
mock.TestingT
Cleanup(func())
}) *QueryCoordCatalog {
mock := &QueryCoordCatalog{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@ -24,6 +24,7 @@ import (
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/metastore/kv/querycoord"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
@ -61,7 +62,7 @@ func (suite *RowCountBasedBalancerTestSuite) SetupTest() {
suite.kv = etcdkv.NewEtcdKV(cli, config.MetaRootPath.GetValue())
suite.broker = meta.NewMockBroker(suite.T())
store := meta.NewMetaStore(suite.kv)
store := querycoord.NewCatalog(suite.kv)
idAllocator := RandomIncrementIDAllocator()
nodeManager := session.NewNodeManager()
testMeta := meta.NewMeta(idAllocator, store, nodeManager)

View File

@ -23,6 +23,7 @@ import (
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/metastore/kv/querycoord"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
@ -60,7 +61,7 @@ func (suite *ScoreBasedBalancerTestSuite) SetupTest() {
suite.kv = etcdkv.NewEtcdKV(cli, config.MetaRootPath.GetValue())
suite.broker = meta.NewMockBroker(suite.T())
store := meta.NewMetaStore(suite.kv)
store := querycoord.NewCatalog(suite.kv)
idAllocator := RandomIncrementIDAllocator()
nodeManager := session.NewNodeManager()
testMeta := meta.NewMeta(idAllocator, store, nodeManager)

View File

@ -22,6 +22,7 @@ import (
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/metastore/kv/querycoord"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/balance"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
@ -66,7 +67,7 @@ func (suite *BalanceCheckerTestSuite) SetupTest() {
suite.kv = etcdkv.NewEtcdKV(cli, config.MetaRootPath.GetValue())
// meta
store := meta.NewMetaStore(suite.kv)
store := querycoord.NewCatalog(suite.kv)
idAllocator := RandomIncrementIDAllocator()
suite.nodeMgr = session.NewNodeManager()
suite.meta = meta.NewMeta(idAllocator, store, suite.nodeMgr)

View File

@ -25,6 +25,7 @@ import (
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/metastore/kv/querycoord"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/querycoordv2/balance"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
@ -64,7 +65,7 @@ func (suite *ChannelCheckerTestSuite) SetupTest() {
suite.kv = etcdkv.NewEtcdKV(cli, config.MetaRootPath.GetValue())
// meta
store := meta.NewMetaStore(suite.kv)
store := querycoord.NewCatalog(suite.kv)
idAllocator := RandomIncrementIDAllocator()
suite.nodeMgr = session.NewNodeManager()
suite.meta = meta.NewMeta(idAllocator, store, suite.nodeMgr)

View File

@ -23,6 +23,7 @@ import (
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/metastore/kv/querycoord"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/querycoordv2/balance"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
@ -69,7 +70,7 @@ func (suite *CheckerControllerSuite) SetupTest() {
suite.kv = etcdkv.NewEtcdKV(cli, config.MetaRootPath.GetValue())
// meta
store := meta.NewMetaStore(suite.kv)
store := querycoord.NewCatalog(suite.kv)
idAllocator := RandomIncrementIDAllocator()
suite.nodeMgr = session.NewNodeManager()
suite.meta = meta.NewMeta(idAllocator, store, suite.nodeMgr)

View File

@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/errors"
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/metastore/kv/querycoord"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/internal/querycoordv2/params"
@ -62,7 +63,7 @@ func (suite *IndexCheckerSuite) SetupTest() {
suite.kv = etcdkv.NewEtcdKV(cli, config.MetaRootPath.GetValue())
// meta
store := meta.NewMetaStore(suite.kv)
store := querycoord.NewCatalog(suite.kv)
idAllocator := params.RandomIncrementIDAllocator()
suite.nodeMgr = session.NewNodeManager()
suite.meta = meta.NewMeta(idAllocator, store, suite.nodeMgr)

View File

@ -27,6 +27,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/metastore/kv/querycoord"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/querycoordv2/balance"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
@ -65,7 +66,7 @@ func (suite *SegmentCheckerTestSuite) SetupTest() {
suite.kv = etcdkv.NewEtcdKV(cli, config.MetaRootPath.GetValue())
// meta
store := meta.NewMetaStore(suite.kv)
store := querycoord.NewCatalog(suite.kv)
idAllocator := RandomIncrementIDAllocator()
suite.nodeMgr = session.NewNodeManager()
suite.meta = meta.NewMeta(idAllocator, store, suite.nodeMgr)

View File

@ -27,6 +27,7 @@ import (
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/metastore/kv/querycoord"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
@ -65,7 +66,7 @@ func (suite *DistControllerTestSuite) SetupTest() {
suite.kv = etcdkv.NewEtcdKV(cli, config.MetaRootPath.GetValue())
// meta
store := meta.NewMetaStore(suite.kv)
store := querycoord.NewCatalog(suite.kv)
idAllocator := RandomIncrementIDAllocator()
suite.meta = meta.NewMeta(idAllocator, store, session.NewNodeManager())

View File

@ -28,6 +28,9 @@ import (
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/metastore"
"github.com/milvus-io/milvus/internal/metastore/kv/querycoord"
"github.com/milvus-io/milvus/internal/metastore/mocks"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/checkers"
@ -56,7 +59,7 @@ type JobSuite struct {
// Dependencies
kv kv.MetaKv
store meta.Store
store metastore.QueryCoordCatalog
dist *meta.DistributionManager
meta *meta.Meta
cluster *session.MockCluster
@ -149,7 +152,7 @@ func (suite *JobSuite) SetupTest() {
suite.Require().NoError(err)
suite.kv = etcdkv.NewEtcdKV(cli, config.MetaRootPath.GetValue())
suite.store = meta.NewMetaStore(suite.kv)
suite.store = querycoord.NewCatalog(suite.kv)
suite.dist = meta.NewDistributionManager()
suite.nodeMgr = session.NewNodeManager()
suite.meta = meta.NewMeta(RandomIncrementIDAllocator(), suite.store, suite.nodeMgr)
@ -1071,7 +1074,7 @@ func (suite *JobSuite) TestDynamicRelease() {
func (suite *JobSuite) TestLoadCollectionStoreFailed() {
// Store collection failed
store := meta.NewMockStore(suite.T())
store := mocks.NewQueryCoordCatalog(suite.T())
suite.meta = meta.NewMeta(RandomIncrementIDAllocator(), store, suite.nodeMgr)
store.EXPECT().SaveResourceGroup(mock.Anything, mock.Anything).Return(nil)
@ -1114,7 +1117,7 @@ func (suite *JobSuite) TestLoadCollectionStoreFailed() {
func (suite *JobSuite) TestLoadPartitionStoreFailed() {
// Store partition failed
store := meta.NewMockStore(suite.T())
store := mocks.NewQueryCoordCatalog(suite.T())
suite.meta = meta.NewMeta(RandomIncrementIDAllocator(), store, suite.nodeMgr)
store.EXPECT().SaveResourceGroup(mock.Anything, mock.Anything).Return(nil)

View File

@ -27,6 +27,7 @@ import (
"github.com/samber/lo"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/metastore"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/pkg/eventlog"
"github.com/milvus-io/milvus/pkg/log"
@ -99,25 +100,25 @@ type CollectionManager struct {
collections map[UniqueID]*Collection
partitions map[UniqueID]*Partition
store Store
catalog metastore.QueryCoordCatalog
}
func NewCollectionManager(store Store) *CollectionManager {
func NewCollectionManager(catalog metastore.QueryCoordCatalog) *CollectionManager {
return &CollectionManager{
collections: make(map[int64]*Collection),
partitions: make(map[int64]*Partition),
store: store,
catalog: catalog,
}
}
// Recover recovers collections from kv store,
// panics if failed
func (m *CollectionManager) Recover(broker Broker) error {
collections, err := m.store.GetCollections()
collections, err := m.catalog.GetCollections()
if err != nil {
return err
}
partitions, err := m.store.GetPartitions()
partitions, err := m.catalog.GetPartitions()
if err != nil {
return err
}
@ -127,7 +128,7 @@ func (m *CollectionManager) Recover(broker Broker) error {
_, err = broker.GetCollectionSchema(context.Background(), collection.GetCollectionID())
if errors.Is(err, merr.ErrCollectionNotFound) {
log.Info("skip dropped collection during recovery", zap.Int64("collection", collection.GetCollectionID()))
m.store.ReleaseCollection(collection.GetCollectionID())
m.catalog.ReleaseCollection(collection.GetCollectionID())
continue
}
if err != nil {
@ -140,7 +141,7 @@ func (m *CollectionManager) Recover(broker Broker) error {
zap.String("status", collection.GetStatus().String()),
zap.Int32("replicaNumber", collection.GetReplicaNumber()),
)
m.store.ReleaseCollection(collection.GetCollectionID())
m.catalog.ReleaseCollection(collection.GetCollectionID())
continue
}
m.collections[collection.CollectionID] = &Collection{
@ -152,7 +153,7 @@ func (m *CollectionManager) Recover(broker Broker) error {
existPartitions, err := broker.GetPartitions(context.Background(), collection)
if errors.Is(err, merr.ErrCollectionNotFound) {
log.Info("skip dropped collection during recovery", zap.Int64("collection", collection))
m.store.ReleaseCollection(collection)
m.catalog.ReleaseCollection(collection)
continue
}
if err != nil {
@ -169,7 +170,7 @@ func (m *CollectionManager) Recover(broker Broker) error {
if len(omitPartitions) > 0 {
log.Info("skip dropped partitions during recovery",
zap.Int64("collection", collection), zap.Int64s("partitions", omitPartitions))
m.store.ReleasePartition(collection, omitPartitions...)
m.catalog.ReleasePartition(collection, omitPartitions...)
}
sawLoaded := false
@ -181,7 +182,7 @@ func (m *CollectionManager) Recover(broker Broker) error {
zap.Int64("partitionID", partition.GetPartitionID()),
zap.String("status", partition.GetStatus().String()),
)
m.store.ReleasePartition(collection, partition.GetPartitionID())
m.catalog.ReleasePartition(collection, partition.GetPartitionID())
continue
}
@ -192,7 +193,7 @@ func (m *CollectionManager) Recover(broker Broker) error {
}
if !sawLoaded {
m.store.ReleaseCollection(collection)
m.catalog.ReleaseCollection(collection)
}
}
@ -426,7 +427,7 @@ func (m *CollectionManager) putCollection(withSave bool, collection *Collection,
partitionInfos := lo.Map(partitions, func(partition *Partition, _ int) *querypb.PartitionLoadInfo {
return partition.PartitionLoadInfo
})
err := m.store.SaveCollection(collection.CollectionLoadInfo, partitionInfos...)
err := m.catalog.SaveCollection(collection.CollectionLoadInfo, partitionInfos...)
if err != nil {
return err
}
@ -460,7 +461,7 @@ func (m *CollectionManager) putPartition(partitions []*Partition, withSave bool)
loadInfos := lo.Map(partitions, func(partition *Partition, _ int) *querypb.PartitionLoadInfo {
return partition.PartitionLoadInfo
})
err := m.store.SavePartition(loadInfos...)
err := m.catalog.SavePartition(loadInfos...)
if err != nil {
return err
}
@ -535,7 +536,7 @@ func (m *CollectionManager) RemoveCollection(collectionID UniqueID) error {
_, ok := m.collections[collectionID]
if ok {
err := m.store.ReleaseCollection(collectionID)
err := m.catalog.ReleaseCollection(collectionID)
if err != nil {
return err
}
@ -562,7 +563,7 @@ func (m *CollectionManager) RemovePartition(ids ...UniqueID) error {
func (m *CollectionManager) removePartition(ids ...UniqueID) error {
partition := m.partitions[ids[0]]
err := m.store.ReleasePartition(partition.CollectionID, ids...)
err := m.catalog.ReleasePartition(partition.CollectionID, ids...)
if err != nil {
return err
}

View File

@ -28,6 +28,8 @@ import (
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/metastore"
"github.com/milvus-io/milvus/internal/metastore/kv/querycoord"
"github.com/milvus-io/milvus/internal/proto/querypb"
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/pkg/util/etcd"
@ -46,9 +48,9 @@ type CollectionManagerSuite struct {
parLoadPercent map[int64][]int32
// Mocks
kv kv.MetaKv
store Store
broker *MockBroker
kv kv.MetaKv
catalog metastore.QueryCoordCatalog
broker *MockBroker
// Test object
mgr *CollectionManager
@ -93,10 +95,10 @@ func (suite *CollectionManagerSuite) SetupTest() {
config.EtcdTLSMinVersion.GetValue())
suite.Require().NoError(err)
suite.kv = etcdkv.NewEtcdKV(cli, config.MetaRootPath.GetValue())
suite.store = NewMetaStore(suite.kv)
suite.catalog = querycoord.NewCatalog(suite.kv)
suite.broker = NewMockBroker(suite.T())
suite.mgr = NewCollectionManager(suite.store)
suite.mgr = NewCollectionManager(suite.catalog)
suite.loadAll()
}

View File

@ -16,7 +16,10 @@
package meta
import "github.com/milvus-io/milvus/internal/querycoordv2/session"
import (
"github.com/milvus-io/milvus/internal/metastore"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
)
type Meta struct {
*CollectionManager
@ -26,12 +29,12 @@ type Meta struct {
func NewMeta(
idAllocator func() (int64, error),
store Store,
catalog metastore.QueryCoordCatalog,
nodeMgr *session.NodeManager,
) *Meta {
return &Meta{
NewCollectionManager(store),
NewReplicaManager(idAllocator, store),
NewResourceManager(store, nodeMgr),
NewCollectionManager(catalog),
NewReplicaManager(idAllocator, catalog),
NewResourceManager(catalog, nodeMgr),
}
}

View File

@ -1,606 +0,0 @@
// Code generated by mockery v2.16.0. DO NOT EDIT.
package meta
import (
querypb "github.com/milvus-io/milvus/internal/proto/querypb"
mock "github.com/stretchr/testify/mock"
)
// MockStore is an autogenerated mock type for the Store type
type MockStore struct {
mock.Mock
}
type MockStore_Expecter struct {
mock *mock.Mock
}
func (_m *MockStore) EXPECT() *MockStore_Expecter {
return &MockStore_Expecter{mock: &_m.Mock}
}
// GetCollections provides a mock function with given fields:
func (_m *MockStore) GetCollections() ([]*querypb.CollectionLoadInfo, error) {
ret := _m.Called()
var r0 []*querypb.CollectionLoadInfo
if rf, ok := ret.Get(0).(func() []*querypb.CollectionLoadInfo); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]*querypb.CollectionLoadInfo)
}
}
var r1 error
if rf, ok := ret.Get(1).(func() error); ok {
r1 = rf()
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockStore_GetCollections_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetCollections'
type MockStore_GetCollections_Call struct {
*mock.Call
}
// GetCollections is a helper method to define mock.On call
func (_e *MockStore_Expecter) GetCollections() *MockStore_GetCollections_Call {
return &MockStore_GetCollections_Call{Call: _e.mock.On("GetCollections")}
}
func (_c *MockStore_GetCollections_Call) Run(run func()) *MockStore_GetCollections_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockStore_GetCollections_Call) Return(_a0 []*querypb.CollectionLoadInfo, _a1 error) *MockStore_GetCollections_Call {
_c.Call.Return(_a0, _a1)
return _c
}
// GetPartitions provides a mock function with given fields:
func (_m *MockStore) GetPartitions() (map[int64][]*querypb.PartitionLoadInfo, error) {
ret := _m.Called()
var r0 map[int64][]*querypb.PartitionLoadInfo
if rf, ok := ret.Get(0).(func() map[int64][]*querypb.PartitionLoadInfo); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(map[int64][]*querypb.PartitionLoadInfo)
}
}
var r1 error
if rf, ok := ret.Get(1).(func() error); ok {
r1 = rf()
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockStore_GetPartitions_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetPartitions'
type MockStore_GetPartitions_Call struct {
*mock.Call
}
// GetPartitions is a helper method to define mock.On call
func (_e *MockStore_Expecter) GetPartitions() *MockStore_GetPartitions_Call {
return &MockStore_GetPartitions_Call{Call: _e.mock.On("GetPartitions")}
}
func (_c *MockStore_GetPartitions_Call) Run(run func()) *MockStore_GetPartitions_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockStore_GetPartitions_Call) Return(_a0 map[int64][]*querypb.PartitionLoadInfo, _a1 error) *MockStore_GetPartitions_Call {
_c.Call.Return(_a0, _a1)
return _c
}
// GetReplicas provides a mock function with given fields:
func (_m *MockStore) GetReplicas() ([]*querypb.Replica, error) {
ret := _m.Called()
var r0 []*querypb.Replica
if rf, ok := ret.Get(0).(func() []*querypb.Replica); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]*querypb.Replica)
}
}
var r1 error
if rf, ok := ret.Get(1).(func() error); ok {
r1 = rf()
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockStore_GetReplicas_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetReplicas'
type MockStore_GetReplicas_Call struct {
*mock.Call
}
// GetReplicas is a helper method to define mock.On call
func (_e *MockStore_Expecter) GetReplicas() *MockStore_GetReplicas_Call {
return &MockStore_GetReplicas_Call{Call: _e.mock.On("GetReplicas")}
}
func (_c *MockStore_GetReplicas_Call) Run(run func()) *MockStore_GetReplicas_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockStore_GetReplicas_Call) Return(_a0 []*querypb.Replica, _a1 error) *MockStore_GetReplicas_Call {
_c.Call.Return(_a0, _a1)
return _c
}
// GetResourceGroups provides a mock function with given fields:
func (_m *MockStore) GetResourceGroups() ([]*querypb.ResourceGroup, error) {
ret := _m.Called()
var r0 []*querypb.ResourceGroup
if rf, ok := ret.Get(0).(func() []*querypb.ResourceGroup); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]*querypb.ResourceGroup)
}
}
var r1 error
if rf, ok := ret.Get(1).(func() error); ok {
r1 = rf()
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockStore_GetResourceGroups_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetResourceGroups'
type MockStore_GetResourceGroups_Call struct {
*mock.Call
}
// GetResourceGroups is a helper method to define mock.On call
func (_e *MockStore_Expecter) GetResourceGroups() *MockStore_GetResourceGroups_Call {
return &MockStore_GetResourceGroups_Call{Call: _e.mock.On("GetResourceGroups")}
}
func (_c *MockStore_GetResourceGroups_Call) Run(run func()) *MockStore_GetResourceGroups_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockStore_GetResourceGroups_Call) Return(_a0 []*querypb.ResourceGroup, _a1 error) *MockStore_GetResourceGroups_Call {
_c.Call.Return(_a0, _a1)
return _c
}
// ReleaseCollection provides a mock function with given fields: collection
func (_m *MockStore) ReleaseCollection(collection int64) error {
ret := _m.Called(collection)
var r0 error
if rf, ok := ret.Get(0).(func(int64) error); ok {
r0 = rf(collection)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockStore_ReleaseCollection_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ReleaseCollection'
type MockStore_ReleaseCollection_Call struct {
*mock.Call
}
// ReleaseCollection is a helper method to define mock.On call
// - collection int64
func (_e *MockStore_Expecter) ReleaseCollection(collection interface{}) *MockStore_ReleaseCollection_Call {
return &MockStore_ReleaseCollection_Call{Call: _e.mock.On("ReleaseCollection", collection)}
}
func (_c *MockStore_ReleaseCollection_Call) Run(run func(collection int64)) *MockStore_ReleaseCollection_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64))
})
return _c
}
func (_c *MockStore_ReleaseCollection_Call) Return(_a0 error) *MockStore_ReleaseCollection_Call {
_c.Call.Return(_a0)
return _c
}
// ReleasePartition provides a mock function with given fields: collection, partitions
func (_m *MockStore) ReleasePartition(collection int64, partitions ...int64) error {
_va := make([]interface{}, len(partitions))
for _i := range partitions {
_va[_i] = partitions[_i]
}
var _ca []interface{}
_ca = append(_ca, collection)
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
var r0 error
if rf, ok := ret.Get(0).(func(int64, ...int64) error); ok {
r0 = rf(collection, partitions...)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockStore_ReleasePartition_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ReleasePartition'
type MockStore_ReleasePartition_Call struct {
*mock.Call
}
// ReleasePartition is a helper method to define mock.On call
// - collection int64
// - partitions ...int64
func (_e *MockStore_Expecter) ReleasePartition(collection interface{}, partitions ...interface{}) *MockStore_ReleasePartition_Call {
return &MockStore_ReleasePartition_Call{Call: _e.mock.On("ReleasePartition",
append([]interface{}{collection}, partitions...)...)}
}
func (_c *MockStore_ReleasePartition_Call) Run(run func(collection int64, partitions ...int64)) *MockStore_ReleasePartition_Call {
_c.Call.Run(func(args mock.Arguments) {
variadicArgs := make([]int64, len(args)-1)
for i, a := range args[1:] {
if a != nil {
variadicArgs[i] = a.(int64)
}
}
run(args[0].(int64), variadicArgs...)
})
return _c
}
func (_c *MockStore_ReleasePartition_Call) Return(_a0 error) *MockStore_ReleasePartition_Call {
_c.Call.Return(_a0)
return _c
}
// ReleaseReplica provides a mock function with given fields: collection, replica
func (_m *MockStore) ReleaseReplica(collection int64, replica int64) error {
ret := _m.Called(collection, replica)
var r0 error
if rf, ok := ret.Get(0).(func(int64, int64) error); ok {
r0 = rf(collection, replica)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockStore_ReleaseReplica_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ReleaseReplica'
type MockStore_ReleaseReplica_Call struct {
*mock.Call
}
// ReleaseReplica is a helper method to define mock.On call
// - collection int64
// - replica int64
func (_e *MockStore_Expecter) ReleaseReplica(collection interface{}, replica interface{}) *MockStore_ReleaseReplica_Call {
return &MockStore_ReleaseReplica_Call{Call: _e.mock.On("ReleaseReplica", collection, replica)}
}
func (_c *MockStore_ReleaseReplica_Call) Run(run func(collection int64, replica int64)) *MockStore_ReleaseReplica_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64), args[1].(int64))
})
return _c
}
func (_c *MockStore_ReleaseReplica_Call) Return(_a0 error) *MockStore_ReleaseReplica_Call {
_c.Call.Return(_a0)
return _c
}
// ReleaseReplicas provides a mock function with given fields: collectionID
func (_m *MockStore) ReleaseReplicas(collectionID int64) error {
ret := _m.Called(collectionID)
var r0 error
if rf, ok := ret.Get(0).(func(int64) error); ok {
r0 = rf(collectionID)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockStore_ReleaseReplicas_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ReleaseReplicas'
type MockStore_ReleaseReplicas_Call struct {
*mock.Call
}
// ReleaseReplicas is a helper method to define mock.On call
// - collectionID int64
func (_e *MockStore_Expecter) ReleaseReplicas(collectionID interface{}) *MockStore_ReleaseReplicas_Call {
return &MockStore_ReleaseReplicas_Call{Call: _e.mock.On("ReleaseReplicas", collectionID)}
}
func (_c *MockStore_ReleaseReplicas_Call) Run(run func(collectionID int64)) *MockStore_ReleaseReplicas_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64))
})
return _c
}
func (_c *MockStore_ReleaseReplicas_Call) Return(_a0 error) *MockStore_ReleaseReplicas_Call {
_c.Call.Return(_a0)
return _c
}
// RemoveResourceGroup provides a mock function with given fields: rgName
func (_m *MockStore) RemoveResourceGroup(rgName string) error {
ret := _m.Called(rgName)
var r0 error
if rf, ok := ret.Get(0).(func(string) error); ok {
r0 = rf(rgName)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockStore_RemoveResourceGroup_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RemoveResourceGroup'
type MockStore_RemoveResourceGroup_Call struct {
*mock.Call
}
// RemoveResourceGroup is a helper method to define mock.On call
// - rgName string
func (_e *MockStore_Expecter) RemoveResourceGroup(rgName interface{}) *MockStore_RemoveResourceGroup_Call {
return &MockStore_RemoveResourceGroup_Call{Call: _e.mock.On("RemoveResourceGroup", rgName)}
}
func (_c *MockStore_RemoveResourceGroup_Call) Run(run func(rgName string)) *MockStore_RemoveResourceGroup_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(string))
})
return _c
}
func (_c *MockStore_RemoveResourceGroup_Call) Return(_a0 error) *MockStore_RemoveResourceGroup_Call {
_c.Call.Return(_a0)
return _c
}
// SaveCollection provides a mock function with given fields: collection, partitions
func (_m *MockStore) SaveCollection(collection *querypb.CollectionLoadInfo, partitions ...*querypb.PartitionLoadInfo) error {
_va := make([]interface{}, len(partitions))
for _i := range partitions {
_va[_i] = partitions[_i]
}
var _ca []interface{}
_ca = append(_ca, collection)
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
var r0 error
if rf, ok := ret.Get(0).(func(*querypb.CollectionLoadInfo, ...*querypb.PartitionLoadInfo) error); ok {
r0 = rf(collection, partitions...)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockStore_SaveCollection_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SaveCollection'
type MockStore_SaveCollection_Call struct {
*mock.Call
}
// SaveCollection is a helper method to define mock.On call
// - collection *querypb.CollectionLoadInfo
// - partitions ...*querypb.PartitionLoadInfo
func (_e *MockStore_Expecter) SaveCollection(collection interface{}, partitions ...interface{}) *MockStore_SaveCollection_Call {
return &MockStore_SaveCollection_Call{Call: _e.mock.On("SaveCollection",
append([]interface{}{collection}, partitions...)...)}
}
func (_c *MockStore_SaveCollection_Call) Run(run func(collection *querypb.CollectionLoadInfo, partitions ...*querypb.PartitionLoadInfo)) *MockStore_SaveCollection_Call {
_c.Call.Run(func(args mock.Arguments) {
variadicArgs := make([]*querypb.PartitionLoadInfo, len(args)-1)
for i, a := range args[1:] {
if a != nil {
variadicArgs[i] = a.(*querypb.PartitionLoadInfo)
}
}
run(args[0].(*querypb.CollectionLoadInfo), variadicArgs...)
})
return _c
}
func (_c *MockStore_SaveCollection_Call) Return(_a0 error) *MockStore_SaveCollection_Call {
_c.Call.Return(_a0)
return _c
}
// SavePartition provides a mock function with given fields: info
func (_m *MockStore) SavePartition(info ...*querypb.PartitionLoadInfo) error {
_va := make([]interface{}, len(info))
for _i := range info {
_va[_i] = info[_i]
}
var _ca []interface{}
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
var r0 error
if rf, ok := ret.Get(0).(func(...*querypb.PartitionLoadInfo) error); ok {
r0 = rf(info...)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockStore_SavePartition_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SavePartition'
type MockStore_SavePartition_Call struct {
*mock.Call
}
// SavePartition is a helper method to define mock.On call
// - info ...*querypb.PartitionLoadInfo
func (_e *MockStore_Expecter) SavePartition(info ...interface{}) *MockStore_SavePartition_Call {
return &MockStore_SavePartition_Call{Call: _e.mock.On("SavePartition",
append([]interface{}{}, info...)...)}
}
func (_c *MockStore_SavePartition_Call) Run(run func(info ...*querypb.PartitionLoadInfo)) *MockStore_SavePartition_Call {
_c.Call.Run(func(args mock.Arguments) {
variadicArgs := make([]*querypb.PartitionLoadInfo, len(args)-0)
for i, a := range args[0:] {
if a != nil {
variadicArgs[i] = a.(*querypb.PartitionLoadInfo)
}
}
run(variadicArgs...)
})
return _c
}
func (_c *MockStore_SavePartition_Call) Return(_a0 error) *MockStore_SavePartition_Call {
_c.Call.Return(_a0)
return _c
}
// SaveReplica provides a mock function with given fields: replica
func (_m *MockStore) SaveReplica(replica *querypb.Replica) error {
ret := _m.Called(replica)
var r0 error
if rf, ok := ret.Get(0).(func(*querypb.Replica) error); ok {
r0 = rf(replica)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockStore_SaveReplica_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SaveReplica'
type MockStore_SaveReplica_Call struct {
*mock.Call
}
// SaveReplica is a helper method to define mock.On call
// - replica *querypb.Replica
func (_e *MockStore_Expecter) SaveReplica(replica interface{}) *MockStore_SaveReplica_Call {
return &MockStore_SaveReplica_Call{Call: _e.mock.On("SaveReplica", replica)}
}
func (_c *MockStore_SaveReplica_Call) Run(run func(replica *querypb.Replica)) *MockStore_SaveReplica_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(*querypb.Replica))
})
return _c
}
func (_c *MockStore_SaveReplica_Call) Return(_a0 error) *MockStore_SaveReplica_Call {
_c.Call.Return(_a0)
return _c
}
// SaveResourceGroup provides a mock function with given fields: rgs
func (_m *MockStore) SaveResourceGroup(rgs ...*querypb.ResourceGroup) error {
_va := make([]interface{}, len(rgs))
for _i := range rgs {
_va[_i] = rgs[_i]
}
var _ca []interface{}
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
var r0 error
if rf, ok := ret.Get(0).(func(...*querypb.ResourceGroup) error); ok {
r0 = rf(rgs...)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockStore_SaveResourceGroup_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SaveResourceGroup'
type MockStore_SaveResourceGroup_Call struct {
*mock.Call
}
// SaveResourceGroup is a helper method to define mock.On call
// - rgs ...*querypb.ResourceGroup
func (_e *MockStore_Expecter) SaveResourceGroup(rgs ...interface{}) *MockStore_SaveResourceGroup_Call {
return &MockStore_SaveResourceGroup_Call{Call: _e.mock.On("SaveResourceGroup",
append([]interface{}{}, rgs...)...)}
}
func (_c *MockStore_SaveResourceGroup_Call) Run(run func(rgs ...*querypb.ResourceGroup)) *MockStore_SaveResourceGroup_Call {
_c.Call.Run(func(args mock.Arguments) {
variadicArgs := make([]*querypb.ResourceGroup, len(args)-0)
for i, a := range args[0:] {
if a != nil {
variadicArgs[i] = a.(*querypb.ResourceGroup)
}
}
run(variadicArgs...)
})
return _c
}
func (_c *MockStore_SaveResourceGroup_Call) Return(_a0 error) *MockStore_SaveResourceGroup_Call {
_c.Call.Return(_a0)
return _c
}
type mockConstructorTestingTNewMockStore interface {
mock.TestingT
Cleanup(func())
}
// NewMockStore creates a new instance of MockStore. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockStore(t mockConstructorTestingTNewMockStore) *MockStore {
mock := &MockStore{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@ -23,6 +23,7 @@ import (
"github.com/golang/protobuf/proto"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/metastore"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/merr"
@ -100,20 +101,20 @@ type ReplicaManager struct {
idAllocator func() (int64, error)
replicas map[UniqueID]*Replica
store Store
catalog metastore.QueryCoordCatalog
}
func NewReplicaManager(idAllocator func() (int64, error), store Store) *ReplicaManager {
func NewReplicaManager(idAllocator func() (int64, error), catalog metastore.QueryCoordCatalog) *ReplicaManager {
return &ReplicaManager{
idAllocator: idAllocator,
replicas: make(map[int64]*Replica),
store: store,
catalog: catalog,
}
}
// Recover recovers the replicas for given collections from meta store
func (m *ReplicaManager) Recover(collections []int64) error {
replicas, err := m.store.GetReplicas()
replicas, err := m.catalog.GetReplicas()
if err != nil {
return fmt.Errorf("failed to recover replicas, err=%w", err)
}
@ -135,7 +136,7 @@ func (m *ReplicaManager) Recover(collections []int64) error {
zap.Int64s("nodes", replica.GetNodes()),
)
} else {
err := m.store.ReleaseReplica(replica.GetCollectionID(), replica.GetID())
err := m.catalog.ReleaseReplica(replica.GetCollectionID(), replica.GetID())
if err != nil {
return err
}
@ -196,7 +197,7 @@ func (m *ReplicaManager) spawn(collectionID UniqueID, rgName string) (*Replica,
func (m *ReplicaManager) put(replicas ...*Replica) error {
for _, replica := range replicas {
err := m.store.SaveReplica(replica.Replica)
err := m.catalog.SaveReplica(replica.Replica)
if err != nil {
return err
}
@ -211,7 +212,7 @@ func (m *ReplicaManager) RemoveCollection(collectionID UniqueID) error {
m.rwmutex.Lock()
defer m.rwmutex.Unlock()
err := m.store.ReleaseReplicas(collectionID)
err := m.catalog.ReleaseReplicas(collectionID)
if err != nil {
return err
}

View File

@ -25,6 +25,8 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/metastore"
"github.com/milvus-io/milvus/internal/metastore/kv/querycoord"
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/pkg/util/etcd"
)
@ -37,7 +39,7 @@ type ReplicaManagerSuite struct {
replicaNumbers []int32
idAllocator func() (int64, error)
kv kv.MetaKv
store Store
catalog metastore.QueryCoordCatalog
mgr *ReplicaManager
}
@ -62,10 +64,10 @@ func (suite *ReplicaManagerSuite) SetupTest() {
config.EtcdTLSMinVersion.GetValue())
suite.Require().NoError(err)
suite.kv = etcdkv.NewEtcdKV(cli, config.MetaRootPath.GetValue())
suite.store = NewMetaStore(suite.kv)
suite.catalog = querycoord.NewCatalog(suite.kv)
suite.idAllocator = RandomIncrementIDAllocator()
suite.mgr = NewReplicaManager(suite.idAllocator, suite.store)
suite.mgr = NewReplicaManager(suite.idAllocator, suite.catalog)
suite.spawnAndPutAll()
}
@ -130,7 +132,7 @@ func (suite *ReplicaManagerSuite) TestRecover() {
}
value, err := proto.Marshal(&replicaInfo)
suite.NoError(err)
suite.kv.Save(ReplicaMetaPrefixV1+"/2100", string(value))
suite.kv.Save(querycoord.ReplicaMetaPrefixV1+"/2100", string(value))
suite.clearMemory()
mgr.Recover(append(suite.collections, 1000))
@ -214,7 +216,7 @@ func (suite *ReplicaManagerSuite) spawnAndPutAll() {
}
func (suite *ReplicaManagerSuite) TestResourceGroup() {
mgr := NewReplicaManager(suite.idAllocator, suite.store)
mgr := NewReplicaManager(suite.idAllocator, suite.catalog)
replica1, err := mgr.spawn(int64(1000), DefaultResourceGroupName)
replica1.AddNode(1)
suite.NoError(err)

View File

@ -23,6 +23,7 @@ import (
"github.com/samber/lo"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/metastore"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/pkg/log"
@ -111,18 +112,18 @@ func (rg *ResourceGroup) GetCapacity() int {
type ResourceManager struct {
groups map[string]*ResourceGroup
store Store
catalog metastore.QueryCoordCatalog
nodeMgr *session.NodeManager
rwmutex sync.RWMutex
}
func NewResourceManager(store Store, nodeMgr *session.NodeManager) *ResourceManager {
func NewResourceManager(catalog metastore.QueryCoordCatalog, nodeMgr *session.NodeManager) *ResourceManager {
groupMap := make(map[string]*ResourceGroup)
groupMap[DefaultResourceGroupName] = NewResourceGroup(DefaultResourceGroupCapacity)
return &ResourceManager{
groups: groupMap,
store: store,
catalog: catalog,
nodeMgr: nodeMgr,
}
}
@ -142,7 +143,7 @@ func (rm *ResourceManager) AddResourceGroup(rgName string) error {
return ErrRGLimit
}
err := rm.store.SaveResourceGroup(&querypb.ResourceGroup{
err := rm.catalog.SaveResourceGroup(&querypb.ResourceGroup{
Name: rgName,
Capacity: 0,
})
@ -177,7 +178,7 @@ func (rm *ResourceManager) RemoveResourceGroup(rgName string) error {
return ErrDeleteNonEmptyRG
}
err := rm.store.RemoveResourceGroup(rgName)
err := rm.catalog.RemoveResourceGroup(rgName)
if err != nil {
log.Info("failed to remove resource group",
zap.String("rgName", rgName),
@ -224,7 +225,7 @@ func (rm *ResourceManager) assignNode(rgName string, node int64) error {
// default rg capacity won't be changed
deltaCapacity = 0
}
err := rm.store.SaveResourceGroup(&querypb.ResourceGroup{
err := rm.catalog.SaveResourceGroup(&querypb.ResourceGroup{
Name: rgName,
Capacity: int32(rm.groups[rgName].GetCapacity() + deltaCapacity),
Nodes: newNodes,
@ -291,7 +292,7 @@ func (rm *ResourceManager) unassignNode(rgName string, node int64) error {
deltaCapacity = 0
}
err := rm.store.SaveResourceGroup(&querypb.ResourceGroup{
err := rm.catalog.SaveResourceGroup(&querypb.ResourceGroup{
Name: rgName,
Capacity: int32(rm.groups[rgName].GetCapacity() + deltaCapacity),
Nodes: newNodes,
@ -452,7 +453,7 @@ func (rm *ResourceManager) HandleNodeUp(node int64) (string, error) {
// assign new node to default rg
newNodes := rm.groups[DefaultResourceGroupName].GetNodes()
newNodes = append(newNodes, node)
err = rm.store.SaveResourceGroup(&querypb.ResourceGroup{
err = rm.catalog.SaveResourceGroup(&querypb.ResourceGroup{
Name: DefaultResourceGroupName,
Capacity: int32(rm.groups[DefaultResourceGroupName].GetCapacity()),
Nodes: newNodes,
@ -489,7 +490,7 @@ func (rm *ResourceManager) HandleNodeDown(node int64) (string, error) {
newNodes = append(newNodes, nid)
}
}
err = rm.store.SaveResourceGroup(&querypb.ResourceGroup{
err = rm.catalog.SaveResourceGroup(&querypb.ResourceGroup{
Name: rgName,
Capacity: int32(rm.groups[rgName].GetCapacity()),
Nodes: newNodes,
@ -607,7 +608,7 @@ func (rm *ResourceManager) transferNodeInStore(from string, to string, numNode i
Nodes: toNodeList,
}
return movedNodes, rm.store.SaveResourceGroup(fromRG, toRG)
return movedNodes, rm.catalog.SaveResourceGroup(fromRG, toRG)
}
// auto recover rg, return recover used node num
@ -655,7 +656,7 @@ func (rm *ResourceManager) AutoRecoverResourceGroup(rgName string) ([]int64, err
func (rm *ResourceManager) Recover() error {
rm.rwmutex.Lock()
defer rm.rwmutex.Unlock()
rgs, err := rm.store.GetResourceGroups()
rgs, err := rm.catalog.GetResourceGroups()
if err != nil {
return ErrRecoverResourceGroupToStore
}

View File

@ -24,6 +24,8 @@ import (
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/metastore/kv/querycoord"
"github.com/milvus-io/milvus/internal/metastore/mocks"
"github.com/milvus-io/milvus/internal/proto/querypb"
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
@ -56,7 +58,7 @@ func (suite *ResourceManagerSuite) SetupTest() {
suite.Require().NoError(err)
suite.kv = etcdkv.NewEtcdKV(cli, config.MetaRootPath.GetValue())
store := NewMetaStore(suite.kv)
store := querycoord.NewCatalog(suite.kv)
suite.manager = NewResourceManager(store, session.NewNodeManager())
}
@ -380,7 +382,7 @@ func (suite *ResourceManagerSuite) TestDefaultResourceGroup() {
}
func (suite *ResourceManagerSuite) TestStoreFailed() {
store := NewMockStore(suite.T())
store := mocks.NewQueryCoordCatalog(suite.T())
nodeMgr := session.NewNodeManager()
manager := NewResourceManager(store, nodeMgr)

View File

@ -1,281 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package meta
import (
"fmt"
"github.com/cockroachdb/errors"
"github.com/golang/protobuf/proto"
"github.com/samber/lo"
clientv3 "go.etcd.io/etcd/client/v3"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/metastore"
"github.com/milvus-io/milvus/internal/proto/querypb"
)
var (
ErrInvalidKey = errors.New("invalid load info key")
)
const (
CollectionLoadInfoPrefix = "querycoord-collection-loadinfo"
PartitionLoadInfoPrefix = "querycoord-partition-loadinfo"
ReplicaPrefix = "querycoord-replica"
CollectionMetaPrefixV1 = "queryCoord-collectionMeta"
ReplicaMetaPrefixV1 = "queryCoord-ReplicaMeta"
ResourceGroupPrefix = "queryCoord-ResourceGroup"
)
type WatchStoreChan = clientv3.WatchChan
// Store is used to save and get from object storage.
type Store interface {
metastore.QueryCoordCatalog
}
type metaStore struct {
cli kv.MetaKv
}
func NewMetaStore(cli kv.MetaKv) metaStore {
return metaStore{
cli: cli,
}
}
func (s metaStore) SaveCollection(collection *querypb.CollectionLoadInfo, partitions ...*querypb.PartitionLoadInfo) error {
k := encodeCollectionLoadInfoKey(collection.GetCollectionID())
v, err := proto.Marshal(collection)
if err != nil {
return err
}
kvs := make(map[string]string)
for _, partition := range partitions {
key := encodePartitionLoadInfoKey(partition.GetCollectionID(), partition.GetPartitionID())
value, err := proto.Marshal(partition)
if err != nil {
return err
}
kvs[key] = string(value)
}
kvs[k] = string(v)
return s.cli.MultiSave(kvs)
}
func (s metaStore) SavePartition(info ...*querypb.PartitionLoadInfo) error {
kvs := make(map[string]string)
for _, partition := range info {
key := encodePartitionLoadInfoKey(partition.GetCollectionID(), partition.GetPartitionID())
value, err := proto.Marshal(partition)
if err != nil {
return err
}
kvs[key] = string(value)
}
return s.cli.MultiSave(kvs)
}
func (s metaStore) SaveReplica(replica *querypb.Replica) error {
key := encodeReplicaKey(replica.GetCollectionID(), replica.GetID())
value, err := proto.Marshal(replica)
if err != nil {
return err
}
return s.cli.Save(key, string(value))
}
func (s metaStore) SaveResourceGroup(rgs ...*querypb.ResourceGroup) error {
ret := make(map[string]string)
for _, rg := range rgs {
key := encodeResourceGroupKey(rg.GetName())
value, err := proto.Marshal(rg)
if err != nil {
return err
}
ret[key] = string(value)
}
return s.cli.MultiSave(ret)
}
func (s metaStore) RemoveResourceGroup(rgName string) error {
key := encodeResourceGroupKey(rgName)
return s.cli.Remove(key)
}
func (s metaStore) GetCollections() ([]*querypb.CollectionLoadInfo, error) {
_, values, err := s.cli.LoadWithPrefix(CollectionLoadInfoPrefix)
if err != nil {
return nil, err
}
ret := make([]*querypb.CollectionLoadInfo, 0, len(values))
for _, v := range values {
info := querypb.CollectionLoadInfo{}
if err := proto.Unmarshal([]byte(v), &info); err != nil {
return nil, err
}
ret = append(ret, &info)
}
return ret, nil
}
func (s metaStore) GetPartitions() (map[int64][]*querypb.PartitionLoadInfo, error) {
_, values, err := s.cli.LoadWithPrefix(PartitionLoadInfoPrefix)
if err != nil {
return nil, err
}
ret := make(map[int64][]*querypb.PartitionLoadInfo)
for _, v := range values {
info := querypb.PartitionLoadInfo{}
if err := proto.Unmarshal([]byte(v), &info); err != nil {
return nil, err
}
ret[info.GetCollectionID()] = append(ret[info.GetCollectionID()], &info)
}
return ret, nil
}
func (s metaStore) GetReplicas() ([]*querypb.Replica, error) {
_, values, err := s.cli.LoadWithPrefix(ReplicaPrefix)
if err != nil {
return nil, err
}
ret := make([]*querypb.Replica, 0, len(values))
for _, v := range values {
info := querypb.Replica{}
if err := proto.Unmarshal([]byte(v), &info); err != nil {
return nil, err
}
ret = append(ret, &info)
}
replicasV1, err := s.getReplicasFromV1()
if err != nil {
return nil, err
}
ret = append(ret, replicasV1...)
return ret, nil
}
func (s metaStore) getReplicasFromV1() ([]*querypb.Replica, error) {
_, replicaValues, err := s.cli.LoadWithPrefix(ReplicaMetaPrefixV1)
if err != nil {
return nil, err
}
ret := make([]*querypb.Replica, 0, len(replicaValues))
for _, value := range replicaValues {
replicaInfo := milvuspb.ReplicaInfo{}
err = proto.Unmarshal([]byte(value), &replicaInfo)
if err != nil {
return nil, err
}
ret = append(ret, &querypb.Replica{
ID: replicaInfo.GetReplicaID(),
CollectionID: replicaInfo.GetCollectionID(),
Nodes: replicaInfo.GetNodeIds(),
})
}
return ret, nil
}
func (s metaStore) GetResourceGroups() ([]*querypb.ResourceGroup, error) {
_, rgs, err := s.cli.LoadWithPrefix(ResourceGroupPrefix)
if err != nil {
return nil, err
}
ret := make([]*querypb.ResourceGroup, 0, len(rgs))
for _, value := range rgs {
rg := &querypb.ResourceGroup{}
err := proto.Unmarshal([]byte(value), rg)
if err != nil {
return nil, err
}
ret = append(ret, rg)
}
return ret, nil
}
func (s metaStore) ReleaseCollection(collection int64) error {
// obtain partitions of this collection
_, values, err := s.cli.LoadWithPrefix(fmt.Sprintf("%s/%d", PartitionLoadInfoPrefix, collection))
if err != nil {
return err
}
partitions := make([]*querypb.PartitionLoadInfo, 0)
for _, v := range values {
info := querypb.PartitionLoadInfo{}
if err = proto.Unmarshal([]byte(v), &info); err != nil {
return err
}
partitions = append(partitions, &info)
}
// remove collection and obtained partitions
keys := lo.Map(partitions, func(partition *querypb.PartitionLoadInfo, _ int) string {
return encodePartitionLoadInfoKey(collection, partition.GetPartitionID())
})
k := encodeCollectionLoadInfoKey(collection)
keys = append(keys, k)
return s.cli.MultiRemove(keys)
}
func (s metaStore) ReleasePartition(collection int64, partitions ...int64) error {
keys := lo.Map(partitions, func(partition int64, _ int) string {
return encodePartitionLoadInfoKey(collection, partition)
})
return s.cli.MultiRemove(keys)
}
func (s metaStore) ReleaseReplicas(collectionID int64) error {
key := encodeCollectionReplicaKey(collectionID)
return s.cli.RemoveWithPrefix(key)
}
func (s metaStore) ReleaseReplica(collection, replica int64) error {
key := encodeReplicaKey(collection, replica)
return s.cli.Remove(key)
}
func encodeCollectionLoadInfoKey(collection int64) string {
return fmt.Sprintf("%s/%d", CollectionLoadInfoPrefix, collection)
}
func encodePartitionLoadInfoKey(collection, partition int64) string {
return fmt.Sprintf("%s/%d/%d", PartitionLoadInfoPrefix, collection, partition)
}
func encodeReplicaKey(collection, replica int64) string {
return fmt.Sprintf("%s/%d/%d", ReplicaPrefix, collection, replica)
}
func encodeCollectionReplicaKey(collection int64) string {
return fmt.Sprintf("%s/%d", ReplicaPrefix, collection)
}
func encodeResourceGroupKey(rgName string) string {
return fmt.Sprintf("%s/%s", ResourceGroupPrefix, rgName)
}

View File

@ -1,206 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package meta
import (
"sort"
"testing"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/proto/querypb"
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/pkg/util/etcd"
)
type StoreTestSuite struct {
suite.Suite
kv kv.MetaKv
store metaStore
}
func (suite *StoreTestSuite) SetupSuite() {
Params.Init()
}
func (suite *StoreTestSuite) SetupTest() {
config := GenerateEtcdConfig()
cli, err := etcd.GetEtcdClient(
config.UseEmbedEtcd.GetAsBool(),
config.EtcdUseSSL.GetAsBool(),
config.Endpoints.GetAsStrings(),
config.EtcdTLSCert.GetValue(),
config.EtcdTLSKey.GetValue(),
config.EtcdTLSCACert.GetValue(),
config.EtcdTLSMinVersion.GetValue())
suite.Require().NoError(err)
suite.kv = etcdkv.NewEtcdKV(cli, config.MetaRootPath.GetValue())
suite.store = NewMetaStore(suite.kv)
}
func (suite *StoreTestSuite) TearDownTest() {
if suite.kv != nil {
suite.kv.Close()
}
}
func (suite *StoreTestSuite) TestCollection() {
suite.store.SaveCollection(&querypb.CollectionLoadInfo{
CollectionID: 1,
})
suite.store.SaveCollection(&querypb.CollectionLoadInfo{
CollectionID: 2,
})
suite.store.SaveCollection(&querypb.CollectionLoadInfo{
CollectionID: 3,
})
suite.store.ReleaseCollection(1)
suite.store.ReleaseCollection(2)
collections, err := suite.store.GetCollections()
suite.NoError(err)
suite.Len(collections, 1)
}
func (suite *StoreTestSuite) TestCollectionWithPartition() {
suite.store.SaveCollection(&querypb.CollectionLoadInfo{
CollectionID: 1,
})
suite.store.SaveCollection(&querypb.CollectionLoadInfo{
CollectionID: 2,
}, &querypb.PartitionLoadInfo{
CollectionID: 2,
PartitionID: 102,
})
suite.store.SaveCollection(&querypb.CollectionLoadInfo{
CollectionID: 3,
}, &querypb.PartitionLoadInfo{
CollectionID: 3,
PartitionID: 103,
})
suite.store.ReleaseCollection(1)
suite.store.ReleaseCollection(2)
collections, err := suite.store.GetCollections()
suite.NoError(err)
suite.Len(collections, 1)
suite.Equal(int64(3), collections[0].GetCollectionID())
partitions, err := suite.store.GetPartitions()
suite.NoError(err)
suite.Len(partitions, 1)
suite.Len(partitions[int64(3)], 1)
suite.Equal(int64(103), partitions[int64(3)][0].GetPartitionID())
}
func (suite *StoreTestSuite) TestPartition() {
suite.store.SavePartition(&querypb.PartitionLoadInfo{
PartitionID: 1,
})
suite.store.SavePartition(&querypb.PartitionLoadInfo{
PartitionID: 2,
})
suite.store.SavePartition(&querypb.PartitionLoadInfo{
PartitionID: 3,
})
suite.store.ReleasePartition(1)
suite.store.ReleasePartition(2)
partitions, err := suite.store.GetPartitions()
suite.NoError(err)
suite.Len(partitions, 1)
}
func (suite *StoreTestSuite) TestReplica() {
suite.store.SaveReplica(&querypb.Replica{
CollectionID: 1,
ID: 1,
})
suite.store.SaveReplica(&querypb.Replica{
CollectionID: 1,
ID: 2,
})
suite.store.SaveReplica(&querypb.Replica{
CollectionID: 1,
ID: 3,
})
suite.store.ReleaseReplica(1, 1)
suite.store.ReleaseReplica(1, 2)
replicas, err := suite.store.GetReplicas()
suite.NoError(err)
suite.Len(replicas, 1)
}
func (suite *StoreTestSuite) TestResourceGroup() {
suite.store.SaveResourceGroup(&querypb.ResourceGroup{
Name: "rg1",
Capacity: 3,
Nodes: []int64{1, 2, 3},
})
suite.store.SaveResourceGroup(&querypb.ResourceGroup{
Name: "rg2",
Capacity: 3,
Nodes: []int64{4, 5},
})
suite.store.SaveResourceGroup(&querypb.ResourceGroup{
Name: "rg3",
Capacity: 0,
Nodes: []int64{},
})
suite.store.RemoveResourceGroup("rg3")
groups, err := suite.store.GetResourceGroups()
suite.NoError(err)
suite.Len(groups, 2)
sort.Slice(groups, func(i, j int) bool {
return groups[i].GetName() < groups[j].GetName()
})
suite.Equal("rg1", groups[0].GetName())
suite.Equal(int32(3), groups[0].GetCapacity())
suite.Equal([]int64{1, 2, 3}, groups[0].GetNodes())
suite.Equal("rg2", groups[1].GetName())
suite.Equal(int32(3), groups[1].GetCapacity())
suite.Equal([]int64{4, 5}, groups[1].GetNodes())
}
func (suite *StoreTestSuite) TestLoadRelease() {
// TODO(sunby): add ut
}
func TestStoreSuite(t *testing.T) {
suite.Run(t, new(StoreTestSuite))
}

View File

@ -29,6 +29,7 @@ import (
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/metastore/kv/querycoord"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
@ -105,7 +106,7 @@ func (suite *TargetManagerSuite) SetupTest() {
suite.kv = etcdkv.NewEtcdKV(cli, config.MetaRootPath.GetValue())
// meta
store := NewMetaStore(suite.kv)
store := querycoord.NewCatalog(suite.kv)
idAllocator := RandomIncrementIDAllocator()
suite.meta = NewMeta(idAllocator, store, session.NewNodeManager())
suite.broker = NewMockBroker(suite.T())

View File

@ -29,6 +29,8 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/metastore"
"github.com/milvus-io/milvus/internal/metastore/kv/querycoord"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/checkers"
@ -56,7 +58,7 @@ type CollectionObserverSuite struct {
idAllocator func() (int64, error)
etcd *clientv3.Client
kv kv.MetaKv
store meta.Store
store metastore.QueryCoordCatalog
broker *meta.MockBroker
// Dependencies
@ -180,7 +182,7 @@ func (suite *CollectionObserverSuite) SetupTest() {
suite.kv = etcdkv.NewEtcdKV(client, Params.EtcdCfg.MetaRootPath.GetValue()+"-"+RandomMetaRootPath())
suite.Require().NoError(err)
log.Debug("create meta store...")
suite.store = meta.NewMetaStore(suite.kv)
suite.store = querycoord.NewCatalog(suite.kv)
// Dependencies
suite.dist = meta.NewDistributionManager()

View File

@ -29,6 +29,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/metastore/kv/querycoord"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
@ -67,7 +68,7 @@ func (suite *LeaderObserverTestSuite) SetupTest() {
suite.kv = etcdkv.NewEtcdKV(cli, config.MetaRootPath.GetValue())
// meta
store := meta.NewMetaStore(suite.kv)
store := querycoord.NewCatalog(suite.kv)
idAllocator := RandomIncrementIDAllocator()
suite.meta = meta.NewMeta(idAllocator, store, session.NewNodeManager())
suite.broker = meta.NewMockBroker(suite.T())

View File

@ -24,6 +24,7 @@ import (
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/metastore/kv/querycoord"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
@ -69,7 +70,7 @@ func (suite *ReplicaObserverSuite) SetupTest() {
suite.kv = etcdkv.NewEtcdKV(cli, config.MetaRootPath.GetValue())
// meta
store := meta.NewMetaStore(suite.kv)
store := querycoord.NewCatalog(suite.kv)
idAllocator := RandomIncrementIDAllocator()
suite.nodeMgr = session.NewNodeManager()
suite.meta = meta.NewMeta(idAllocator, store, suite.nodeMgr)

View File

@ -26,6 +26,7 @@ import (
"github.com/milvus-io/milvus/internal/kv"
etcdKV "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/metastore/mocks"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
@ -41,7 +42,7 @@ type ResourceObserverSuite struct {
kv kv.MetaKv
//dependency
store *meta.MockStore
store *mocks.QueryCoordCatalog
meta *meta.Meta
observer *ResourceObserver
nodeMgr *session.NodeManager
@ -70,7 +71,7 @@ func (suite *ResourceObserverSuite) SetupTest() {
suite.kv = etcdKV.NewEtcdKV(cli, config.MetaRootPath.GetValue())
// meta
suite.store = meta.NewMockStore(suite.T())
suite.store = mocks.NewQueryCoordCatalog(suite.T())
idAllocator := RandomIncrementIDAllocator()
suite.nodeMgr = session.NewNodeManager()
suite.meta = meta.NewMeta(idAllocator, suite.store, suite.nodeMgr)

View File

@ -26,6 +26,7 @@ import (
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/metastore/kv/querycoord"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
@ -74,7 +75,7 @@ func (suite *TargetObserverSuite) SetupTest() {
suite.kv = etcdkv.NewEtcdKV(cli, config.MetaRootPath.GetValue())
// meta
store := meta.NewMetaStore(suite.kv)
store := querycoord.NewCatalog(suite.kv)
idAllocator := RandomIncrementIDAllocator()
suite.meta = meta.NewMeta(idAllocator, store, session.NewNodeManager())

View File

@ -35,6 +35,8 @@ import (
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/metastore"
"github.com/milvus-io/milvus/internal/metastore/kv/querycoord"
"github.com/milvus-io/milvus/internal/querycoordv2/balance"
"github.com/milvus-io/milvus/internal/querycoordv2/checkers"
"github.com/milvus-io/milvus/internal/querycoordv2/dist"
@ -79,7 +81,7 @@ type Server struct {
rootCoord types.RootCoord
// Meta
store meta.Store
store metastore.QueryCoordCatalog
meta *meta.Meta
dist *meta.DistributionManager
targetMgr *meta.TargetManager
@ -297,7 +299,7 @@ func (s *Server) initMeta() error {
record := timerecord.NewTimeRecorder("querycoord")
log.Info("init meta")
s.store = meta.NewMetaStore(s.kv)
s.store = querycoord.NewCatalog(s.kv)
s.meta = meta.NewMeta(s.idAllocator, s.store, s.nodeMgr)
s.broker = meta.NewCoordinatorBroker(

View File

@ -31,6 +31,8 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/metastore"
"github.com/milvus-io/milvus/internal/metastore/kv/querycoord"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
@ -67,7 +69,7 @@ type ServiceSuite struct {
// Dependencies
kv kv.MetaKv
store meta.Store
store metastore.QueryCoordCatalog
dist *meta.DistributionManager
meta *meta.Meta
targetMgr *meta.TargetManager
@ -133,7 +135,7 @@ func (suite *ServiceSuite) SetupTest() {
suite.Require().NoError(err)
suite.kv = etcdkv.NewEtcdKV(cli, config.MetaRootPath.GetValue())
suite.store = meta.NewMetaStore(suite.kv)
suite.store = querycoord.NewCatalog(suite.kv)
suite.dist = meta.NewDistributionManager()
suite.nodeMgr = session.NewNodeManager()
suite.meta = meta.NewMeta(params.RandomIncrementIDAllocator(), suite.store, suite.nodeMgr)

View File

@ -31,6 +31,8 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/metastore"
"github.com/milvus-io/milvus/internal/metastore/kv/querycoord"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
@ -67,7 +69,7 @@ type TaskSuite struct {
// Dependencies
kv kv.MetaKv
store meta.Store
store metastore.QueryCoordCatalog
meta *meta.Meta
dist *meta.DistributionManager
target *meta.TargetManager
@ -134,7 +136,7 @@ func (suite *TaskSuite) SetupTest() {
suite.Require().NoError(err)
suite.kv = etcdkv.NewEtcdKV(cli, config.MetaRootPath.GetValue())
suite.store = meta.NewMetaStore(suite.kv)
suite.store = querycoord.NewCatalog(suite.kv)
suite.meta = meta.NewMeta(RandomIncrementIDAllocator(), suite.store, session.NewNodeManager())
suite.dist = meta.NewDistributionManager()
suite.broker = meta.NewMockBroker(suite.T())

View File

@ -24,6 +24,8 @@ import (
"github.com/stretchr/testify/mock"
etcdKV "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/metastore/kv/querycoord"
"github.com/milvus-io/milvus/internal/metastore/mocks"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
@ -45,7 +47,7 @@ func TestSpawnReplicasWithRG(t *testing.T) {
config.EtcdTLSMinVersion.GetValue())
kv := etcdKV.NewEtcdKV(cli, config.MetaRootPath.GetValue())
store := meta.NewMetaStore(kv)
store := querycoord.NewCatalog(kv)
nodeMgr := session.NewNodeManager()
m := meta.NewMeta(RandomIncrementIDAllocator(), store, nodeMgr)
m.ResourceManager.AddResourceGroup("rg1")
@ -118,7 +120,7 @@ func TestSpawnReplicasWithRG(t *testing.T) {
func TestAddNodesToCollectionsInRGFailed(t *testing.T) {
Params.Init()
store := meta.NewMockStore(t)
store := mocks.NewQueryCoordCatalog(t)
store.EXPECT().SaveCollection(mock.Anything).Return(nil)
store.EXPECT().SaveReplica(mock.Anything).Return(nil).Times(4)
store.EXPECT().SaveResourceGroup(mock.Anything).Return(nil)
@ -180,7 +182,7 @@ func TestAddNodesToCollectionsInRGFailed(t *testing.T) {
func TestAddNodesToCollectionsInRG(t *testing.T) {
Params.Init()
store := meta.NewMockStore(t)
store := mocks.NewQueryCoordCatalog(t)
store.EXPECT().SaveCollection(mock.Anything).Return(nil)
store.EXPECT().SaveReplica(mock.Anything).Return(nil)
store.EXPECT().SaveResourceGroup(mock.Anything).Return(nil)