From 6585227c9cb79a380d1dc63ef82c989c4e0dcd8f Mon Sep 17 00:00:00 2001 From: SimFG Date: Tue, 28 May 2024 10:07:43 +0800 Subject: [PATCH] fix: [2.4] not found database name in the datacoord meta object (#33412) - issue: #33410 - pr: #33411 Signed-off-by: SimFG --- internal/datacoord/meta.go | 38 +++++++++++++ internal/datacoord/meta_test.go | 88 +++++++++++++++++++++++++++++ internal/datacoord/mock_test.go | 4 +- internal/datacoord/server.go | 8 +++ internal/datacoord/server_test.go | 15 +++-- internal/datacoord/services_test.go | 2 + 6 files changed, 150 insertions(+), 5 deletions(-) diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index c540742f93..b5f2c19490 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -33,6 +33,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/datacoord/broker" "github.com/milvus-io/milvus/internal/metastore" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/storage" @@ -40,6 +41,7 @@ import ( "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" + "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metautil" "github.com/milvus-io/milvus/pkg/util/paramtable" @@ -162,6 +164,42 @@ func (m *meta) reloadFromKV() error { return nil } +func (m *meta) reloadCollectionsFromRootcoord(ctx context.Context, broker broker.Broker) error { + resp, err := broker.ListDatabases(ctx) + if err != nil { + return err + } + for _, dbName := range resp.GetDbNames() { + resp, err := broker.ShowCollections(ctx, dbName) + if err != nil { + return err + } + for _, collectionID := range resp.GetCollectionIds() { + resp, err := broker.DescribeCollectionInternal(ctx, collectionID) + if err != nil { + return err + } + partitionIDs, err := broker.ShowPartitionsInternal(ctx, collectionID) + if err != nil { + return err + } + collection := &collectionInfo{ + ID: collectionID, + Schema: resp.GetSchema(), + Partitions: partitionIDs, + StartPositions: resp.GetStartPositions(), + Properties: funcutil.KeyValuePair2Map(resp.GetProperties()), + CreatedAt: resp.GetCreatedTimestamp(), + DatabaseName: resp.GetDbName(), + DatabaseID: resp.GetDbId(), + VChannelNames: resp.GetVirtualChannelNames(), + } + m.AddCollection(collection) + } + } + return nil +} + // AddCollection adds a collection into meta // Note that collection info is just for caching and will not be set into etcd from datacoord func (m *meta) AddCollection(collection *collectionInfo) { diff --git a/internal/datacoord/meta_test.go b/internal/datacoord/meta_test.go index 5d3bab7aef..dd0471eebc 100644 --- a/internal/datacoord/meta_test.go +++ b/internal/datacoord/meta_test.go @@ -27,7 +27,9 @@ import ( "github.com/stretchr/testify/suite" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" + "github.com/milvus-io/milvus/internal/datacoord/broker" "github.com/milvus-io/milvus/internal/kv" mockkv "github.com/milvus-io/milvus/internal/kv/mocks" "github.com/milvus-io/milvus/internal/metastore/kv/datacoord" @@ -1129,3 +1131,89 @@ func Test_meta_GcConfirm(t *testing.T) { assert.False(t, m.GcConfirm(context.TODO(), 100, 10000)) } + +func Test_meta_ReloadCollectionsFromRootcoords(t *testing.T) { + t.Run("fail to list database", func(t *testing.T) { + m := &meta{ + collections: make(map[UniqueID]*collectionInfo), + } + mockBroker := broker.NewMockBroker(t) + mockBroker.EXPECT().ListDatabases(mock.Anything).Return(nil, errors.New("list database failed, mocked")) + err := m.reloadCollectionsFromRootcoord(context.TODO(), mockBroker) + assert.Error(t, err) + }) + + t.Run("fail to show collections", func(t *testing.T) { + m := &meta{ + collections: make(map[UniqueID]*collectionInfo), + } + mockBroker := broker.NewMockBroker(t) + + mockBroker.EXPECT().ListDatabases(mock.Anything).Return(&milvuspb.ListDatabasesResponse{ + DbNames: []string{"db1"}, + }, nil) + mockBroker.EXPECT().ShowCollections(mock.Anything, mock.Anything).Return(nil, errors.New("show collections failed, mocked")) + err := m.reloadCollectionsFromRootcoord(context.TODO(), mockBroker) + assert.Error(t, err) + }) + + t.Run("fail to describe collection", func(t *testing.T) { + m := &meta{ + collections: make(map[UniqueID]*collectionInfo), + } + mockBroker := broker.NewMockBroker(t) + + mockBroker.EXPECT().ListDatabases(mock.Anything).Return(&milvuspb.ListDatabasesResponse{ + DbNames: []string{"db1"}, + }, nil) + mockBroker.EXPECT().ShowCollections(mock.Anything, mock.Anything).Return(&milvuspb.ShowCollectionsResponse{ + CollectionNames: []string{"coll1"}, + CollectionIds: []int64{1000}, + }, nil) + mockBroker.EXPECT().DescribeCollectionInternal(mock.Anything, mock.Anything).Return(nil, errors.New("describe collection failed, mocked")) + err := m.reloadCollectionsFromRootcoord(context.TODO(), mockBroker) + assert.Error(t, err) + }) + + t.Run("fail to show partitions", func(t *testing.T) { + m := &meta{ + collections: make(map[UniqueID]*collectionInfo), + } + mockBroker := broker.NewMockBroker(t) + + mockBroker.EXPECT().ListDatabases(mock.Anything).Return(&milvuspb.ListDatabasesResponse{ + DbNames: []string{"db1"}, + }, nil) + mockBroker.EXPECT().ShowCollections(mock.Anything, mock.Anything).Return(&milvuspb.ShowCollectionsResponse{ + CollectionNames: []string{"coll1"}, + CollectionIds: []int64{1000}, + }, nil) + mockBroker.EXPECT().DescribeCollectionInternal(mock.Anything, mock.Anything).Return(&milvuspb.DescribeCollectionResponse{}, nil) + mockBroker.EXPECT().ShowPartitionsInternal(mock.Anything, mock.Anything).Return(nil, errors.New("show partitions failed, mocked")) + err := m.reloadCollectionsFromRootcoord(context.TODO(), mockBroker) + assert.Error(t, err) + }) + + t.Run("success", func(t *testing.T) { + m := &meta{ + collections: make(map[UniqueID]*collectionInfo), + } + mockBroker := broker.NewMockBroker(t) + + mockBroker.EXPECT().ListDatabases(mock.Anything).Return(&milvuspb.ListDatabasesResponse{ + DbNames: []string{"db1"}, + }, nil) + mockBroker.EXPECT().ShowCollections(mock.Anything, mock.Anything).Return(&milvuspb.ShowCollectionsResponse{ + CollectionNames: []string{"coll1"}, + CollectionIds: []int64{1000}, + }, nil) + mockBroker.EXPECT().DescribeCollectionInternal(mock.Anything, mock.Anything).Return(&milvuspb.DescribeCollectionResponse{ + CollectionID: 1000, + }, nil) + mockBroker.EXPECT().ShowPartitionsInternal(mock.Anything, mock.Anything).Return([]int64{2000}, nil) + err := m.reloadCollectionsFromRootcoord(context.TODO(), mockBroker) + assert.NoError(t, err) + c := m.GetCollection(UniqueID(1000)) + assert.NotNil(t, c) + }) +} diff --git a/internal/datacoord/mock_test.go b/internal/datacoord/mock_test.go index 70c0c9482c..42cbf851f2 100644 --- a/internal/datacoord/mock_test.go +++ b/internal/datacoord/mock_test.go @@ -457,7 +457,9 @@ func (m *mockRootCoordClient) DropDatabase(ctx context.Context, in *milvuspb.Dro } func (m *mockRootCoordClient) ListDatabases(ctx context.Context, in *milvuspb.ListDatabasesRequest, opts ...grpc.CallOption) (*milvuspb.ListDatabasesResponse, error) { - panic("not implemented") // TODO: Implement + return &milvuspb.ListDatabasesResponse{ + Status: merr.Success(), + }, nil } func (m *mockRootCoordClient) AlterDatabase(ctx context.Context, in *rootcoordpb.AlterDatabaseRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index b595ea1ecd..2125686ff4 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -674,6 +674,14 @@ func (s *Server) initMeta(chunkManager storage.ChunkManager) error { if err != nil { return err } + + // Load collection information asynchronously + // HINT: please make sure this is the last step in the `reloadEtcdFn` function !!! + go func() { + _ = retry.Do(s.ctx, func() error { + return s.meta.reloadCollectionsFromRootcoord(s.ctx, s.broker) + }, retry.Sleep(time.Second), retry.Attempts(connMetaMaxRetryTime)) + }() return nil } return retry.Do(s.ctx, reloadEtcdFn, retry.Attempts(connMetaMaxRetryTime)) diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index 349c498617..5ef708b072 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -42,6 +42,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/datacoord/broker" + etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/metastore/model" "github.com/milvus-io/milvus/internal/mocks" "github.com/milvus-io/milvus/internal/proto/datapb" @@ -3017,6 +3018,12 @@ var globalTestTikv = tikv.SetupLocalTxn() func WithMeta(meta *meta) Option { return func(svr *Server) { svr.meta = meta + + svr.watchClient = etcdkv.NewEtcdKV(svr.etcdCli, Params.EtcdCfg.MetaRootPath.GetValue(), + etcdkv.WithRequestTimeout(paramtable.Get().ServiceParam.EtcdCfg.RequestTimeout.GetAsDuration(time.Millisecond))) + metaRootPath := Params.EtcdCfg.MetaRootPath.GetValue() + svr.kv = etcdkv.NewEtcdKV(svr.etcdCli, metaRootPath, + etcdkv.WithRequestTimeout(paramtable.Get().ServiceParam.EtcdCfg.RequestTimeout.GetAsDuration(time.Millisecond))) } } @@ -3049,6 +3056,10 @@ func newTestServer(t *testing.T, opts ...Option) *Server { return newMockRootCoordClient(), nil } + for _, opt := range opts { + opt(svr) + } + err = svr.Init() assert.NoError(t, err) @@ -3071,10 +3082,6 @@ func newTestServer(t *testing.T, opts ...Option) *Server { close(signal) } - for _, opt := range opts { - opt(svr) - } - err = svr.Register() assert.NoError(t, err) <-signal diff --git a/internal/datacoord/services_test.go b/internal/datacoord/services_test.go index 4db96c9a05..ddb813acfa 100644 --- a/internal/datacoord/services_test.go +++ b/internal/datacoord/services_test.go @@ -44,7 +44,9 @@ type ServerSuite struct { func WithChannelManager(cm ChannelManager) Option { return func(svr *Server) { + svr.sessionManager = NewSessionManagerImpl(withSessionCreator(svr.dataNodeCreator)) svr.channelManager = cm + svr.cluster = NewClusterImpl(svr.sessionManager, svr.channelManager) } }