fix: add the db information in the dml message (#37969)

- issue: #37966

Signed-off-by: SimFG <bang.fu@zilliz.com>
This commit is contained in:
SimFG 2024-11-27 10:02:35 +08:00 committed by GitHub
parent 24a0b05745
commit 971b4f17ae
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 21 additions and 22 deletions

View File

@ -42,6 +42,7 @@ import (
"github.com/milvus-io/milvus/pkg/util" "github.com/milvus-io/milvus/pkg/util"
"github.com/milvus-io/milvus/pkg/util/commonpbutil" "github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/conc" "github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/expr"
"github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/paramtable"
@ -350,6 +351,7 @@ func InitMetaCache(ctx context.Context, rootCoord types.RootCoordClient, queryCo
if err != nil { if err != nil {
return err return err
} }
expr.Register("cache", globalMetaCache)
// The privilege info is a little more. And to get this info, the query operation of involving multiple table queries is required. // The privilege info is a little more. And to get this info, the query operation of involving multiple table queries is required.
resp, err := rootCoord.ListPolicy(ctx, &internalpb.ListPolicyRequest{}) resp, err := rootCoord.ListPolicy(ctx, &internalpb.ListPolicyRequest{})

View File

@ -58,6 +58,7 @@ func genInsertMsgsByPartition(ctx context.Context,
), ),
CollectionID: insertMsg.CollectionID, CollectionID: insertMsg.CollectionID,
PartitionID: partitionID, PartitionID: partitionID,
DbName: insertMsg.DbName,
CollectionName: insertMsg.CollectionName, CollectionName: insertMsg.CollectionName,
PartitionName: partitionName, PartitionName: partitionName,
SegmentID: segmentID, SegmentID: segmentID,

View File

@ -56,6 +56,7 @@ type deleteTask struct {
primaryKeys *schemapb.IDs primaryKeys *schemapb.IDs
collectionID UniqueID collectionID UniqueID
partitionID UniqueID partitionID UniqueID
dbID UniqueID
partitionKeyMode bool partitionKeyMode bool
// set by scheduler // set by scheduler
@ -148,14 +149,11 @@ func (dt *deleteTask) Execute(ctx context.Context) (err error) {
result, numRows, err := repackDeleteMsgByHash( result, numRows, err := repackDeleteMsgByHash(
ctx, ctx,
dt.primaryKeys, dt.primaryKeys, dt.vChannels,
dt.vChannels, dt.idAllocator, dt.ts,
dt.idAllocator, dt.collectionID, dt.req.GetCollectionName(),
dt.ts, dt.partitionID, dt.req.GetPartitionName(),
dt.collectionID, dt.req.GetDbName(),
dt.req.GetCollectionName(),
dt.partitionID,
dt.req.GetPartitionName(),
) )
if err != nil { if err != nil {
return err return err
@ -204,6 +202,7 @@ func repackDeleteMsgByHash(
collectionName string, collectionName string,
partitionID int64, partitionID int64,
partitionName string, partitionName string,
dbName string,
) (map[uint32][]*msgstream.DeleteMsg, int64, error) { ) (map[uint32][]*msgstream.DeleteMsg, int64, error) {
maxSize := Params.PulsarCfg.MaxMessageSize.GetAsInt() maxSize := Params.PulsarCfg.MaxMessageSize.GetAsInt()
hashValues := typeutil.HashPK2Channels(primaryKeys, vChannels) hashValues := typeutil.HashPK2Channels(primaryKeys, vChannels)
@ -233,6 +232,7 @@ func repackDeleteMsgByHash(
PartitionID: partitionID, PartitionID: partitionID,
CollectionName: collectionName, CollectionName: collectionName,
PartitionName: partitionName, PartitionName: partitionName,
DbName: dbName,
PrimaryKeys: &schemapb.IDs{}, PrimaryKeys: &schemapb.IDs{},
ShardName: vchannel, ShardName: vchannel,
}, },
@ -413,6 +413,7 @@ func (dr *deleteRunner) produce(ctx context.Context, primaryKeys *schemapb.IDs)
partitionKeyMode: dr.partitionKeyMode, partitionKeyMode: dr.partitionKeyMode,
vChannels: dr.vChannels, vChannels: dr.vChannels,
primaryKeys: primaryKeys, primaryKeys: primaryKeys,
dbID: dr.dbID,
} }
var enqueuedTask task = dt var enqueuedTask task = dt
if streamingutil.IsStreamingServiceEnabled() { if streamingutil.IsStreamingServiceEnabled() {

View File

@ -31,15 +31,12 @@ func (dt *deleteTaskByStreamingService) Execute(ctx context.Context) (err error)
dt.tr = timerecord.NewTimeRecorder(fmt.Sprintf("proxy execute delete %d", dt.ID())) dt.tr = timerecord.NewTimeRecorder(fmt.Sprintf("proxy execute delete %d", dt.ID()))
result, numRows, err := repackDeleteMsgByHash( result, numRows, err := repackDeleteMsgByHash(
ctx, ctx, dt.primaryKeys,
dt.primaryKeys, dt.vChannels, dt.idAllocator,
dt.vChannels, dt.ts, dt.collectionID,
dt.idAllocator,
dt.ts,
dt.collectionID,
dt.req.GetCollectionName(), dt.req.GetCollectionName(),
dt.partitionID, dt.partitionID, dt.req.GetPartitionName(),
dt.req.GetPartitionName(), dt.req.GetDbName(),
) )
if err != nil { if err != nil {
return err return err

View File

@ -109,13 +109,11 @@ func (it *upsertTaskByStreamingService) packDeleteMessage(ctx context.Context) (
result, numRows, err := repackDeleteMsgByHash( result, numRows, err := repackDeleteMsgByHash(
ctx, ctx,
it.upsertMsg.DeleteMsg.PrimaryKeys, it.upsertMsg.DeleteMsg.PrimaryKeys,
vChannels, vChannels, it.idAllocator,
it.idAllocator,
it.BeginTs(), it.BeginTs(),
it.upsertMsg.DeleteMsg.CollectionID, it.upsertMsg.DeleteMsg.CollectionID, it.upsertMsg.DeleteMsg.CollectionName,
it.upsertMsg.DeleteMsg.CollectionName, it.upsertMsg.DeleteMsg.PartitionID, it.upsertMsg.DeleteMsg.PartitionName,
it.upsertMsg.DeleteMsg.PartitionID, it.req.GetDbName(),
it.upsertMsg.DeleteMsg.PartitionName,
) )
if err != nil { if err != nil {
return nil, err return nil, err