mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-04 04:49:08 +08:00
fix: [2.4] not found database name in the datacoord meta object (#33412)
- issue: #33410 - pr: #33411 Signed-off-by: SimFG <bang.fu@zilliz.com>
This commit is contained in:
parent
07b995fea4
commit
6585227c9c
@ -33,6 +33,7 @@ import (
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
||||
"github.com/milvus-io/milvus/internal/datacoord/broker"
|
||||
"github.com/milvus-io/milvus/internal/metastore"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/internal/storage"
|
||||
@ -40,6 +41,7 @@ import (
|
||||
"github.com/milvus-io/milvus/pkg/common"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/metrics"
|
||||
"github.com/milvus-io/milvus/pkg/util/funcutil"
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
"github.com/milvus-io/milvus/pkg/util/metautil"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
@ -162,6 +164,42 @@ func (m *meta) reloadFromKV() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *meta) reloadCollectionsFromRootcoord(ctx context.Context, broker broker.Broker) error {
|
||||
resp, err := broker.ListDatabases(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, dbName := range resp.GetDbNames() {
|
||||
resp, err := broker.ShowCollections(ctx, dbName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, collectionID := range resp.GetCollectionIds() {
|
||||
resp, err := broker.DescribeCollectionInternal(ctx, collectionID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
partitionIDs, err := broker.ShowPartitionsInternal(ctx, collectionID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
collection := &collectionInfo{
|
||||
ID: collectionID,
|
||||
Schema: resp.GetSchema(),
|
||||
Partitions: partitionIDs,
|
||||
StartPositions: resp.GetStartPositions(),
|
||||
Properties: funcutil.KeyValuePair2Map(resp.GetProperties()),
|
||||
CreatedAt: resp.GetCreatedTimestamp(),
|
||||
DatabaseName: resp.GetDbName(),
|
||||
DatabaseID: resp.GetDbId(),
|
||||
VChannelNames: resp.GetVirtualChannelNames(),
|
||||
}
|
||||
m.AddCollection(collection)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// AddCollection adds a collection into meta
|
||||
// Note that collection info is just for caching and will not be set into etcd from datacoord
|
||||
func (m *meta) AddCollection(collection *collectionInfo) {
|
||||
|
@ -27,7 +27,9 @@ import (
|
||||
"github.com/stretchr/testify/suite"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
|
||||
"github.com/milvus-io/milvus/internal/datacoord/broker"
|
||||
"github.com/milvus-io/milvus/internal/kv"
|
||||
mockkv "github.com/milvus-io/milvus/internal/kv/mocks"
|
||||
"github.com/milvus-io/milvus/internal/metastore/kv/datacoord"
|
||||
@ -1129,3 +1131,89 @@ func Test_meta_GcConfirm(t *testing.T) {
|
||||
|
||||
assert.False(t, m.GcConfirm(context.TODO(), 100, 10000))
|
||||
}
|
||||
|
||||
func Test_meta_ReloadCollectionsFromRootcoords(t *testing.T) {
|
||||
t.Run("fail to list database", func(t *testing.T) {
|
||||
m := &meta{
|
||||
collections: make(map[UniqueID]*collectionInfo),
|
||||
}
|
||||
mockBroker := broker.NewMockBroker(t)
|
||||
mockBroker.EXPECT().ListDatabases(mock.Anything).Return(nil, errors.New("list database failed, mocked"))
|
||||
err := m.reloadCollectionsFromRootcoord(context.TODO(), mockBroker)
|
||||
assert.Error(t, err)
|
||||
})
|
||||
|
||||
t.Run("fail to show collections", func(t *testing.T) {
|
||||
m := &meta{
|
||||
collections: make(map[UniqueID]*collectionInfo),
|
||||
}
|
||||
mockBroker := broker.NewMockBroker(t)
|
||||
|
||||
mockBroker.EXPECT().ListDatabases(mock.Anything).Return(&milvuspb.ListDatabasesResponse{
|
||||
DbNames: []string{"db1"},
|
||||
}, nil)
|
||||
mockBroker.EXPECT().ShowCollections(mock.Anything, mock.Anything).Return(nil, errors.New("show collections failed, mocked"))
|
||||
err := m.reloadCollectionsFromRootcoord(context.TODO(), mockBroker)
|
||||
assert.Error(t, err)
|
||||
})
|
||||
|
||||
t.Run("fail to describe collection", func(t *testing.T) {
|
||||
m := &meta{
|
||||
collections: make(map[UniqueID]*collectionInfo),
|
||||
}
|
||||
mockBroker := broker.NewMockBroker(t)
|
||||
|
||||
mockBroker.EXPECT().ListDatabases(mock.Anything).Return(&milvuspb.ListDatabasesResponse{
|
||||
DbNames: []string{"db1"},
|
||||
}, nil)
|
||||
mockBroker.EXPECT().ShowCollections(mock.Anything, mock.Anything).Return(&milvuspb.ShowCollectionsResponse{
|
||||
CollectionNames: []string{"coll1"},
|
||||
CollectionIds: []int64{1000},
|
||||
}, nil)
|
||||
mockBroker.EXPECT().DescribeCollectionInternal(mock.Anything, mock.Anything).Return(nil, errors.New("describe collection failed, mocked"))
|
||||
err := m.reloadCollectionsFromRootcoord(context.TODO(), mockBroker)
|
||||
assert.Error(t, err)
|
||||
})
|
||||
|
||||
t.Run("fail to show partitions", func(t *testing.T) {
|
||||
m := &meta{
|
||||
collections: make(map[UniqueID]*collectionInfo),
|
||||
}
|
||||
mockBroker := broker.NewMockBroker(t)
|
||||
|
||||
mockBroker.EXPECT().ListDatabases(mock.Anything).Return(&milvuspb.ListDatabasesResponse{
|
||||
DbNames: []string{"db1"},
|
||||
}, nil)
|
||||
mockBroker.EXPECT().ShowCollections(mock.Anything, mock.Anything).Return(&milvuspb.ShowCollectionsResponse{
|
||||
CollectionNames: []string{"coll1"},
|
||||
CollectionIds: []int64{1000},
|
||||
}, nil)
|
||||
mockBroker.EXPECT().DescribeCollectionInternal(mock.Anything, mock.Anything).Return(&milvuspb.DescribeCollectionResponse{}, nil)
|
||||
mockBroker.EXPECT().ShowPartitionsInternal(mock.Anything, mock.Anything).Return(nil, errors.New("show partitions failed, mocked"))
|
||||
err := m.reloadCollectionsFromRootcoord(context.TODO(), mockBroker)
|
||||
assert.Error(t, err)
|
||||
})
|
||||
|
||||
t.Run("success", func(t *testing.T) {
|
||||
m := &meta{
|
||||
collections: make(map[UniqueID]*collectionInfo),
|
||||
}
|
||||
mockBroker := broker.NewMockBroker(t)
|
||||
|
||||
mockBroker.EXPECT().ListDatabases(mock.Anything).Return(&milvuspb.ListDatabasesResponse{
|
||||
DbNames: []string{"db1"},
|
||||
}, nil)
|
||||
mockBroker.EXPECT().ShowCollections(mock.Anything, mock.Anything).Return(&milvuspb.ShowCollectionsResponse{
|
||||
CollectionNames: []string{"coll1"},
|
||||
CollectionIds: []int64{1000},
|
||||
}, nil)
|
||||
mockBroker.EXPECT().DescribeCollectionInternal(mock.Anything, mock.Anything).Return(&milvuspb.DescribeCollectionResponse{
|
||||
CollectionID: 1000,
|
||||
}, nil)
|
||||
mockBroker.EXPECT().ShowPartitionsInternal(mock.Anything, mock.Anything).Return([]int64{2000}, nil)
|
||||
err := m.reloadCollectionsFromRootcoord(context.TODO(), mockBroker)
|
||||
assert.NoError(t, err)
|
||||
c := m.GetCollection(UniqueID(1000))
|
||||
assert.NotNil(t, c)
|
||||
})
|
||||
}
|
||||
|
@ -457,7 +457,9 @@ func (m *mockRootCoordClient) DropDatabase(ctx context.Context, in *milvuspb.Dro
|
||||
}
|
||||
|
||||
func (m *mockRootCoordClient) ListDatabases(ctx context.Context, in *milvuspb.ListDatabasesRequest, opts ...grpc.CallOption) (*milvuspb.ListDatabasesResponse, error) {
|
||||
panic("not implemented") // TODO: Implement
|
||||
return &milvuspb.ListDatabasesResponse{
|
||||
Status: merr.Success(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (m *mockRootCoordClient) AlterDatabase(ctx context.Context, in *rootcoordpb.AlterDatabaseRequest, opts ...grpc.CallOption) (*commonpb.Status, error) {
|
||||
|
@ -674,6 +674,14 @@ func (s *Server) initMeta(chunkManager storage.ChunkManager) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Load collection information asynchronously
|
||||
// HINT: please make sure this is the last step in the `reloadEtcdFn` function !!!
|
||||
go func() {
|
||||
_ = retry.Do(s.ctx, func() error {
|
||||
return s.meta.reloadCollectionsFromRootcoord(s.ctx, s.broker)
|
||||
}, retry.Sleep(time.Second), retry.Attempts(connMetaMaxRetryTime))
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
return retry.Do(s.ctx, reloadEtcdFn, retry.Attempts(connMetaMaxRetryTime))
|
||||
|
@ -42,6 +42,7 @@ import (
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
||||
"github.com/milvus-io/milvus/internal/datacoord/broker"
|
||||
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
|
||||
"github.com/milvus-io/milvus/internal/metastore/model"
|
||||
"github.com/milvus-io/milvus/internal/mocks"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
@ -3017,6 +3018,12 @@ var globalTestTikv = tikv.SetupLocalTxn()
|
||||
func WithMeta(meta *meta) Option {
|
||||
return func(svr *Server) {
|
||||
svr.meta = meta
|
||||
|
||||
svr.watchClient = etcdkv.NewEtcdKV(svr.etcdCli, Params.EtcdCfg.MetaRootPath.GetValue(),
|
||||
etcdkv.WithRequestTimeout(paramtable.Get().ServiceParam.EtcdCfg.RequestTimeout.GetAsDuration(time.Millisecond)))
|
||||
metaRootPath := Params.EtcdCfg.MetaRootPath.GetValue()
|
||||
svr.kv = etcdkv.NewEtcdKV(svr.etcdCli, metaRootPath,
|
||||
etcdkv.WithRequestTimeout(paramtable.Get().ServiceParam.EtcdCfg.RequestTimeout.GetAsDuration(time.Millisecond)))
|
||||
}
|
||||
}
|
||||
|
||||
@ -3049,6 +3056,10 @@ func newTestServer(t *testing.T, opts ...Option) *Server {
|
||||
return newMockRootCoordClient(), nil
|
||||
}
|
||||
|
||||
for _, opt := range opts {
|
||||
opt(svr)
|
||||
}
|
||||
|
||||
err = svr.Init()
|
||||
assert.NoError(t, err)
|
||||
|
||||
@ -3071,10 +3082,6 @@ func newTestServer(t *testing.T, opts ...Option) *Server {
|
||||
close(signal)
|
||||
}
|
||||
|
||||
for _, opt := range opts {
|
||||
opt(svr)
|
||||
}
|
||||
|
||||
err = svr.Register()
|
||||
assert.NoError(t, err)
|
||||
<-signal
|
||||
|
@ -44,7 +44,9 @@ type ServerSuite struct {
|
||||
|
||||
func WithChannelManager(cm ChannelManager) Option {
|
||||
return func(svr *Server) {
|
||||
svr.sessionManager = NewSessionManagerImpl(withSessionCreator(svr.dataNodeCreator))
|
||||
svr.channelManager = cm
|
||||
svr.cluster = NewClusterImpl(svr.sessionManager, svr.channelManager)
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user