mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-02 03:48:37 +08:00
enhance: support to invalid the database meta cache (#32078)
issue: #32077 /kind improvement Signed-off-by: SimFG <bang.fu@zilliz.com>
This commit is contained in:
parent
c5a9cae44e
commit
a63cf8625c
@ -115,10 +115,11 @@ func (node *Proxy) InvalidateCollectionMetaCache(ctx context.Context, request *p
|
||||
|
||||
collectionName := request.CollectionName
|
||||
collectionID := request.CollectionID
|
||||
msgType := request.GetBase().GetMsgType()
|
||||
var aliasName []string
|
||||
|
||||
if globalMetaCache != nil {
|
||||
switch request.GetBase().GetMsgType() {
|
||||
switch msgType {
|
||||
case commonpb.MsgType_DropCollection, commonpb.MsgType_RenameCollection, commonpb.MsgType_DropAlias, commonpb.MsgType_AlterAlias:
|
||||
if collectionName != "" {
|
||||
globalMetaCache.RemoveCollection(ctx, request.GetDbName(), collectionName) // no need to return error, though collection may be not cached
|
||||
@ -129,16 +130,16 @@ func (node *Proxy) InvalidateCollectionMetaCache(ctx context.Context, request *p
|
||||
}
|
||||
log.Info("complete to invalidate collection meta cache with collection name", zap.String("collectionName", collectionName))
|
||||
case commonpb.MsgType_DropPartition:
|
||||
if globalMetaCache != nil {
|
||||
if collectionName != "" && request.GetPartitionName() != "" {
|
||||
globalMetaCache.RemovePartition(ctx, request.GetDbName(), request.GetCollectionName(), request.GetPartitionName())
|
||||
} else {
|
||||
log.Warn("invalidate collection meta cache failed. collectionName or partitionName is empty",
|
||||
zap.String("collectionName", collectionName),
|
||||
zap.String("partitionName", request.GetPartitionName()))
|
||||
return merr.Status(merr.WrapErrPartitionNotFound(request.GetPartitionName(), "partition name not specified")), nil
|
||||
}
|
||||
if collectionName != "" && request.GetPartitionName() != "" {
|
||||
globalMetaCache.RemovePartition(ctx, request.GetDbName(), request.GetCollectionName(), request.GetPartitionName())
|
||||
} else {
|
||||
log.Warn("invalidate collection meta cache failed. collectionName or partitionName is empty",
|
||||
zap.String("collectionName", collectionName),
|
||||
zap.String("partitionName", request.GetPartitionName()))
|
||||
return merr.Status(merr.WrapErrPartitionNotFound(request.GetPartitionName(), "partition name not specified")), nil
|
||||
}
|
||||
case commonpb.MsgType_DropDatabase:
|
||||
globalMetaCache.RemoveDatabase(ctx, request.GetDbName())
|
||||
default:
|
||||
log.Warn("receive unexpected msgType of invalidate collection meta cache", zap.String("msgType", request.GetBase().GetMsgType().String()))
|
||||
|
||||
@ -151,7 +152,7 @@ func (node *Proxy) InvalidateCollectionMetaCache(ctx context.Context, request *p
|
||||
}
|
||||
}
|
||||
|
||||
if request.GetBase().GetMsgType() == commonpb.MsgType_DropCollection {
|
||||
if msgType == commonpb.MsgType_DropCollection {
|
||||
// no need to handle error, since this Proxy may not create dml stream for the collection.
|
||||
node.chMgr.removeDMLStream(request.GetCollectionID())
|
||||
// clean up collection level metrics
|
||||
@ -159,6 +160,8 @@ func (node *Proxy) InvalidateCollectionMetaCache(ctx context.Context, request *p
|
||||
for _, alias := range aliasName {
|
||||
metrics.CleanupProxyCollectionMetrics(paramtable.GetNodeID(), alias)
|
||||
}
|
||||
} else if msgType == commonpb.MsgType_DropDatabase {
|
||||
metrics.CleanupProxyDBMetrics(paramtable.GetNodeID(), request.GetDbName())
|
||||
}
|
||||
log.Info("complete to invalidate collection meta cache")
|
||||
|
||||
@ -298,8 +301,6 @@ func (node *Proxy) DropDatabase(ctx context.Context, request *milvuspb.DropDatab
|
||||
strconv.FormatInt(paramtable.GetNodeID(), 10),
|
||||
method,
|
||||
).Observe(float64(tr.ElapseSpan().Milliseconds()))
|
||||
|
||||
metrics.CleanupProxyDBMetrics(paramtable.GetNodeID(), request.GetDbName())
|
||||
return dct.result, nil
|
||||
}
|
||||
|
||||
|
@ -2427,6 +2427,18 @@ func TestProxy(t *testing.T) {
|
||||
|
||||
_, err = globalMetaCache.GetCollectionID(ctx, dbName, collectionName)
|
||||
assert.Error(t, err)
|
||||
|
||||
resp, err = proxy.InvalidateCollectionMetaCache(ctx, &proxypb.InvalidateCollMetaCacheRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_DropDatabase,
|
||||
},
|
||||
DbName: dbName,
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode)
|
||||
|
||||
hasDatabase := globalMetaCache.HasDatabase(ctx, dbName)
|
||||
assert.False(t, hasDatabase)
|
||||
})
|
||||
|
||||
wg.Add(1)
|
||||
|
@ -18,8 +18,12 @@ package rootcoord
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
|
||||
"github.com/milvus-io/milvus/internal/util/proxyutil"
|
||||
"github.com/milvus-io/milvus/pkg/util"
|
||||
)
|
||||
|
||||
type dropDatabaseTask struct {
|
||||
@ -28,9 +32,31 @@ type dropDatabaseTask struct {
|
||||
}
|
||||
|
||||
func (t *dropDatabaseTask) Prepare(ctx context.Context) error {
|
||||
if t.Req.GetDbName() == util.DefaultDBName {
|
||||
return fmt.Errorf("can not drop default database")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *dropDatabaseTask) Execute(ctx context.Context) error {
|
||||
return t.core.meta.DropDatabase(ctx, t.Req.GetDbName(), t.GetTs())
|
||||
redoTask := newBaseRedoTask(t.core.stepExecutor)
|
||||
dbName := t.Req.GetDbName()
|
||||
ts := t.GetTs()
|
||||
redoTask.AddSyncStep(&deleteDatabaseMetaStep{
|
||||
baseStep: baseStep{core: t.core},
|
||||
databaseName: dbName,
|
||||
ts: ts,
|
||||
})
|
||||
redoTask.AddAsyncStep(&expireCacheStep{
|
||||
baseStep: baseStep{core: t.core},
|
||||
dbName: dbName,
|
||||
ts: ts,
|
||||
// make sure to send the "expire cache" request
|
||||
// because it won't send this request when the length of collection names array is zero
|
||||
collectionNames: []string{""},
|
||||
opts: []proxyutil.ExpireCacheOpt{
|
||||
proxyutil.SetMsgType(commonpb.MsgType_DropDatabase),
|
||||
},
|
||||
})
|
||||
return redoTask.Execute(ctx)
|
||||
}
|
||||
|
@ -20,36 +20,79 @@ import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/mock"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
|
||||
mockrootcoord "github.com/milvus-io/milvus/internal/rootcoord/mocks"
|
||||
"github.com/milvus-io/milvus/pkg/util"
|
||||
)
|
||||
|
||||
func Test_DropDBTask(t *testing.T) {
|
||||
meta := mockrootcoord.NewIMetaTable(t)
|
||||
meta.On("DropDatabase",
|
||||
mock.Anything,
|
||||
mock.Anything,
|
||||
mock.Anything).
|
||||
Return(nil)
|
||||
t.Run("normal", func(t *testing.T) {
|
||||
meta := mockrootcoord.NewIMetaTable(t)
|
||||
meta.On("DropDatabase",
|
||||
mock.Anything,
|
||||
mock.Anything,
|
||||
mock.Anything).
|
||||
Return(nil)
|
||||
|
||||
core := newTestCore(withMeta(meta))
|
||||
task := &dropDatabaseTask{
|
||||
baseTask: newBaseTask(context.TODO(), core),
|
||||
Req: &milvuspb.DropDatabaseRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_DropDatabase,
|
||||
core := newTestCore(withMeta(meta), withValidProxyManager())
|
||||
task := &dropDatabaseTask{
|
||||
baseTask: newBaseTask(context.TODO(), core),
|
||||
Req: &milvuspb.DropDatabaseRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_DropDatabase,
|
||||
},
|
||||
DbName: "db",
|
||||
},
|
||||
DbName: "db",
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
err := task.Prepare(context.Background())
|
||||
assert.NoError(t, err)
|
||||
err := task.Prepare(context.Background())
|
||||
assert.NoError(t, err)
|
||||
|
||||
err = task.Execute(context.Background())
|
||||
assert.NoError(t, err)
|
||||
err = task.Execute(context.Background())
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("default db", func(t *testing.T) {
|
||||
meta := mockrootcoord.NewIMetaTable(t)
|
||||
core := newTestCore(withMeta(meta))
|
||||
task := &dropDatabaseTask{
|
||||
baseTask: newBaseTask(context.TODO(), core),
|
||||
Req: &milvuspb.DropDatabaseRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_DropDatabase,
|
||||
},
|
||||
DbName: util.DefaultDBName,
|
||||
},
|
||||
}
|
||||
err := task.Prepare(context.Background())
|
||||
assert.Error(t, err)
|
||||
})
|
||||
|
||||
t.Run("drop db fail", func(t *testing.T) {
|
||||
meta := mockrootcoord.NewIMetaTable(t)
|
||||
meta.EXPECT().DropDatabase(
|
||||
mock.Anything,
|
||||
mock.Anything,
|
||||
mock.Anything).
|
||||
Return(errors.New("mock drop db error"))
|
||||
|
||||
core := newTestCore(withMeta(meta))
|
||||
task := &dropDatabaseTask{
|
||||
baseTask: newBaseTask(context.TODO(), core),
|
||||
Req: &milvuspb.DropDatabaseRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_DropDatabase,
|
||||
},
|
||||
DbName: "db",
|
||||
},
|
||||
}
|
||||
|
||||
err := task.Execute(context.Background())
|
||||
assert.Error(t, err)
|
||||
})
|
||||
}
|
||||
|
@ -87,6 +87,21 @@ func (s *deleteCollectionMetaStep) Weight() stepPriority {
|
||||
return stepPriorityNormal
|
||||
}
|
||||
|
||||
type deleteDatabaseMetaStep struct {
|
||||
baseStep
|
||||
databaseName string
|
||||
ts Timestamp
|
||||
}
|
||||
|
||||
func (s *deleteDatabaseMetaStep) Execute(ctx context.Context) ([]nestedStep, error) {
|
||||
err := s.core.meta.DropDatabase(ctx, s.databaseName, s.ts)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
func (s *deleteDatabaseMetaStep) Desc() string {
|
||||
return fmt.Sprintf("delete database from meta table, name: %s, ts: %d", s.databaseName, s.ts)
|
||||
}
|
||||
|
||||
type removeDmlChannelsStep struct {
|
||||
baseStep
|
||||
pChannels []string
|
||||
|
Loading…
Reference in New Issue
Block a user