From a63cf8625c59a8d62254665cf1cf0b608d740092 Mon Sep 17 00:00:00 2001 From: SimFG Date: Wed, 10 Apr 2024 06:45:18 +0800 Subject: [PATCH] enhance: support to invalid the database meta cache (#32078) issue: #32077 /kind improvement Signed-off-by: SimFG --- internal/proxy/impl.go | 27 +++++---- internal/proxy/proxy_test.go | 12 ++++ internal/rootcoord/drop_db_task.go | 28 ++++++++- internal/rootcoord/drop_db_task_test.go | 81 +++++++++++++++++++------ internal/rootcoord/step.go | 15 +++++ 5 files changed, 130 insertions(+), 33 deletions(-) diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index 88548ff652..abb3962608 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -115,10 +115,11 @@ func (node *Proxy) InvalidateCollectionMetaCache(ctx context.Context, request *p collectionName := request.CollectionName collectionID := request.CollectionID + msgType := request.GetBase().GetMsgType() var aliasName []string if globalMetaCache != nil { - switch request.GetBase().GetMsgType() { + switch msgType { case commonpb.MsgType_DropCollection, commonpb.MsgType_RenameCollection, commonpb.MsgType_DropAlias, commonpb.MsgType_AlterAlias: if collectionName != "" { globalMetaCache.RemoveCollection(ctx, request.GetDbName(), collectionName) // no need to return error, though collection may be not cached @@ -129,16 +130,16 @@ func (node *Proxy) InvalidateCollectionMetaCache(ctx context.Context, request *p } log.Info("complete to invalidate collection meta cache with collection name", zap.String("collectionName", collectionName)) case commonpb.MsgType_DropPartition: - if globalMetaCache != nil { - if collectionName != "" && request.GetPartitionName() != "" { - globalMetaCache.RemovePartition(ctx, request.GetDbName(), request.GetCollectionName(), request.GetPartitionName()) - } else { - log.Warn("invalidate collection meta cache failed. collectionName or partitionName is empty", - zap.String("collectionName", collectionName), - zap.String("partitionName", request.GetPartitionName())) - return merr.Status(merr.WrapErrPartitionNotFound(request.GetPartitionName(), "partition name not specified")), nil - } + if collectionName != "" && request.GetPartitionName() != "" { + globalMetaCache.RemovePartition(ctx, request.GetDbName(), request.GetCollectionName(), request.GetPartitionName()) + } else { + log.Warn("invalidate collection meta cache failed. collectionName or partitionName is empty", + zap.String("collectionName", collectionName), + zap.String("partitionName", request.GetPartitionName())) + return merr.Status(merr.WrapErrPartitionNotFound(request.GetPartitionName(), "partition name not specified")), nil } + case commonpb.MsgType_DropDatabase: + globalMetaCache.RemoveDatabase(ctx, request.GetDbName()) default: log.Warn("receive unexpected msgType of invalidate collection meta cache", zap.String("msgType", request.GetBase().GetMsgType().String())) @@ -151,7 +152,7 @@ func (node *Proxy) InvalidateCollectionMetaCache(ctx context.Context, request *p } } - if request.GetBase().GetMsgType() == commonpb.MsgType_DropCollection { + if msgType == commonpb.MsgType_DropCollection { // no need to handle error, since this Proxy may not create dml stream for the collection. node.chMgr.removeDMLStream(request.GetCollectionID()) // clean up collection level metrics @@ -159,6 +160,8 @@ func (node *Proxy) InvalidateCollectionMetaCache(ctx context.Context, request *p for _, alias := range aliasName { metrics.CleanupProxyCollectionMetrics(paramtable.GetNodeID(), alias) } + } else if msgType == commonpb.MsgType_DropDatabase { + metrics.CleanupProxyDBMetrics(paramtable.GetNodeID(), request.GetDbName()) } log.Info("complete to invalidate collection meta cache") @@ -298,8 +301,6 @@ func (node *Proxy) DropDatabase(ctx context.Context, request *milvuspb.DropDatab strconv.FormatInt(paramtable.GetNodeID(), 10), method, ).Observe(float64(tr.ElapseSpan().Milliseconds())) - - metrics.CleanupProxyDBMetrics(paramtable.GetNodeID(), request.GetDbName()) return dct.result, nil } diff --git a/internal/proxy/proxy_test.go b/internal/proxy/proxy_test.go index 15fbb726bb..4f3b6bfe20 100644 --- a/internal/proxy/proxy_test.go +++ b/internal/proxy/proxy_test.go @@ -2427,6 +2427,18 @@ func TestProxy(t *testing.T) { _, err = globalMetaCache.GetCollectionID(ctx, dbName, collectionName) assert.Error(t, err) + + resp, err = proxy.InvalidateCollectionMetaCache(ctx, &proxypb.InvalidateCollMetaCacheRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_DropDatabase, + }, + DbName: dbName, + }) + assert.NoError(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode) + + hasDatabase := globalMetaCache.HasDatabase(ctx, dbName) + assert.False(t, hasDatabase) }) wg.Add(1) diff --git a/internal/rootcoord/drop_db_task.go b/internal/rootcoord/drop_db_task.go index 65c3398fd4..15096c4e30 100644 --- a/internal/rootcoord/drop_db_task.go +++ b/internal/rootcoord/drop_db_task.go @@ -18,8 +18,12 @@ package rootcoord import ( "context" + "fmt" + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + "github.com/milvus-io/milvus/internal/util/proxyutil" + "github.com/milvus-io/milvus/pkg/util" ) type dropDatabaseTask struct { @@ -28,9 +32,31 @@ type dropDatabaseTask struct { } func (t *dropDatabaseTask) Prepare(ctx context.Context) error { + if t.Req.GetDbName() == util.DefaultDBName { + return fmt.Errorf("can not drop default database") + } return nil } func (t *dropDatabaseTask) Execute(ctx context.Context) error { - return t.core.meta.DropDatabase(ctx, t.Req.GetDbName(), t.GetTs()) + redoTask := newBaseRedoTask(t.core.stepExecutor) + dbName := t.Req.GetDbName() + ts := t.GetTs() + redoTask.AddSyncStep(&deleteDatabaseMetaStep{ + baseStep: baseStep{core: t.core}, + databaseName: dbName, + ts: ts, + }) + redoTask.AddAsyncStep(&expireCacheStep{ + baseStep: baseStep{core: t.core}, + dbName: dbName, + ts: ts, + // make sure to send the "expire cache" request + // because it won't send this request when the length of collection names array is zero + collectionNames: []string{""}, + opts: []proxyutil.ExpireCacheOpt{ + proxyutil.SetMsgType(commonpb.MsgType_DropDatabase), + }, + }) + return redoTask.Execute(ctx) } diff --git a/internal/rootcoord/drop_db_task_test.go b/internal/rootcoord/drop_db_task_test.go index b065afe2b0..e51c5c7342 100644 --- a/internal/rootcoord/drop_db_task_test.go +++ b/internal/rootcoord/drop_db_task_test.go @@ -20,36 +20,79 @@ import ( "context" "testing" + "github.com/cockroachdb/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" mockrootcoord "github.com/milvus-io/milvus/internal/rootcoord/mocks" + "github.com/milvus-io/milvus/pkg/util" ) func Test_DropDBTask(t *testing.T) { - meta := mockrootcoord.NewIMetaTable(t) - meta.On("DropDatabase", - mock.Anything, - mock.Anything, - mock.Anything). - Return(nil) + t.Run("normal", func(t *testing.T) { + meta := mockrootcoord.NewIMetaTable(t) + meta.On("DropDatabase", + mock.Anything, + mock.Anything, + mock.Anything). + Return(nil) - core := newTestCore(withMeta(meta)) - task := &dropDatabaseTask{ - baseTask: newBaseTask(context.TODO(), core), - Req: &milvuspb.DropDatabaseRequest{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_DropDatabase, + core := newTestCore(withMeta(meta), withValidProxyManager()) + task := &dropDatabaseTask{ + baseTask: newBaseTask(context.TODO(), core), + Req: &milvuspb.DropDatabaseRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_DropDatabase, + }, + DbName: "db", }, - DbName: "db", - }, - } + } - err := task.Prepare(context.Background()) - assert.NoError(t, err) + err := task.Prepare(context.Background()) + assert.NoError(t, err) - err = task.Execute(context.Background()) - assert.NoError(t, err) + err = task.Execute(context.Background()) + assert.NoError(t, err) + }) + + t.Run("default db", func(t *testing.T) { + meta := mockrootcoord.NewIMetaTable(t) + core := newTestCore(withMeta(meta)) + task := &dropDatabaseTask{ + baseTask: newBaseTask(context.TODO(), core), + Req: &milvuspb.DropDatabaseRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_DropDatabase, + }, + DbName: util.DefaultDBName, + }, + } + err := task.Prepare(context.Background()) + assert.Error(t, err) + }) + + t.Run("drop db fail", func(t *testing.T) { + meta := mockrootcoord.NewIMetaTable(t) + meta.EXPECT().DropDatabase( + mock.Anything, + mock.Anything, + mock.Anything). + Return(errors.New("mock drop db error")) + + core := newTestCore(withMeta(meta)) + task := &dropDatabaseTask{ + baseTask: newBaseTask(context.TODO(), core), + Req: &milvuspb.DropDatabaseRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_DropDatabase, + }, + DbName: "db", + }, + } + + err := task.Execute(context.Background()) + assert.Error(t, err) + }) } diff --git a/internal/rootcoord/step.go b/internal/rootcoord/step.go index ca5e10bc05..9f9da39da0 100644 --- a/internal/rootcoord/step.go +++ b/internal/rootcoord/step.go @@ -87,6 +87,21 @@ func (s *deleteCollectionMetaStep) Weight() stepPriority { return stepPriorityNormal } +type deleteDatabaseMetaStep struct { + baseStep + databaseName string + ts Timestamp +} + +func (s *deleteDatabaseMetaStep) Execute(ctx context.Context) ([]nestedStep, error) { + err := s.core.meta.DropDatabase(ctx, s.databaseName, s.ts) + return nil, err +} + +func (s *deleteDatabaseMetaStep) Desc() string { + return fmt.Sprintf("delete database from meta table, name: %s, ts: %d", s.databaseName, s.ts) +} + type removeDmlChannelsStep struct { baseStep pChannels []string