mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-02 11:59:00 +08:00
Optimize reSentDdMsg logic (#13378)
Signed-off-by: yudong.cai <yudong.cai@zilliz.com>
This commit is contained in:
parent
276b9a6cf5
commit
829077ad0c
@ -1053,7 +1053,7 @@ func (c *Core) reSendDdMsg(ctx context.Context, force bool) error {
|
||||
return err
|
||||
}
|
||||
|
||||
var invalidateCache bool
|
||||
invalidateCache := false
|
||||
var ts typeutil.Timestamp
|
||||
var collName string
|
||||
|
||||
@ -1065,14 +1065,14 @@ func (c *Core) reSendDdMsg(ctx context.Context, force bool) error {
|
||||
if err = proto.Unmarshal(ddOp.Body, &ddReq); err != nil {
|
||||
return err
|
||||
}
|
||||
collInfo, err := c.MetaTable.GetCollectionByName(ddReq.CollectionName, 0)
|
||||
if err != nil {
|
||||
return err
|
||||
if _, err := c.MetaTable.GetCollectionByName(ddReq.CollectionName, 0); err != nil {
|
||||
if _, err = c.SendDdCreateCollectionReq(ctx, &ddReq, ddReq.PhysicalChannelNames); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
log.Debug("collection has been created, skip re-send CreateCollection",
|
||||
zap.String("collection name", collName))
|
||||
}
|
||||
if _, err = c.SendDdCreateCollectionReq(ctx, &ddReq, collInfo.PhysicalChannelNames); err != nil {
|
||||
return err
|
||||
}
|
||||
invalidateCache = false
|
||||
case DropCollectionDDType:
|
||||
var ddReq = internalpb.DropCollectionRequest{}
|
||||
if err = proto.Unmarshal(ddOp.Body, &ddReq); err != nil {
|
||||
@ -1080,14 +1080,15 @@ func (c *Core) reSendDdMsg(ctx context.Context, force bool) error {
|
||||
}
|
||||
ts = ddReq.Base.Timestamp
|
||||
collName = ddReq.CollectionName
|
||||
collInfo, err := c.MetaTable.GetCollectionByName(ddReq.CollectionName, 0)
|
||||
if err != nil {
|
||||
return err
|
||||
if collInfo, err := c.MetaTable.GetCollectionByName(ddReq.CollectionName, 0); err == nil {
|
||||
if err = c.SendDdDropCollectionReq(ctx, &ddReq, collInfo.PhysicalChannelNames); err != nil {
|
||||
return err
|
||||
}
|
||||
invalidateCache = true
|
||||
} else {
|
||||
log.Debug("collection has been removed, skip re-send DropCollection",
|
||||
zap.String("collection name", collName))
|
||||
}
|
||||
if err = c.SendDdDropCollectionReq(ctx, &ddReq, collInfo.PhysicalChannelNames); err != nil {
|
||||
return err
|
||||
}
|
||||
invalidateCache = true
|
||||
case CreatePartitionDDType:
|
||||
var ddReq = internalpb.CreatePartitionRequest{}
|
||||
if err = proto.Unmarshal(ddOp.Body, &ddReq); err != nil {
|
||||
@ -1099,13 +1100,15 @@ func (c *Core) reSendDdMsg(ctx context.Context, force bool) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err = c.MetaTable.GetPartitionByName(collInfo.ID, ddReq.PartitionName, 0); err == nil {
|
||||
return fmt.Errorf("partition %s already created", ddReq.PartitionName)
|
||||
if _, err = c.MetaTable.GetPartitionByName(collInfo.ID, ddReq.PartitionName, 0); err != nil {
|
||||
if err = c.SendDdCreatePartitionReq(ctx, &ddReq, collInfo.PhysicalChannelNames); err != nil {
|
||||
return err
|
||||
}
|
||||
invalidateCache = true
|
||||
} else {
|
||||
log.Debug("partition has been created, skip re-send CreatePartition",
|
||||
zap.String("collection name", collName), zap.String("partition name", ddReq.PartitionName))
|
||||
}
|
||||
if err = c.SendDdCreatePartitionReq(ctx, &ddReq, collInfo.PhysicalChannelNames); err != nil {
|
||||
return err
|
||||
}
|
||||
invalidateCache = true
|
||||
case DropPartitionDDType:
|
||||
var ddReq = internalpb.DropPartitionRequest{}
|
||||
if err = proto.Unmarshal(ddOp.Body, &ddReq); err != nil {
|
||||
@ -1117,13 +1120,15 @@ func (c *Core) reSendDdMsg(ctx context.Context, force bool) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err = c.MetaTable.GetPartitionByName(collInfo.ID, ddReq.PartitionName, 0); err != nil {
|
||||
return err
|
||||
if _, err = c.MetaTable.GetPartitionByName(collInfo.ID, ddReq.PartitionName, 0); err == nil {
|
||||
if err = c.SendDdDropPartitionReq(ctx, &ddReq, collInfo.PhysicalChannelNames); err != nil {
|
||||
return err
|
||||
}
|
||||
invalidateCache = true
|
||||
} else {
|
||||
log.Debug("partition has been removed, skip re-send DropPartition",
|
||||
zap.String("collection name", collName), zap.String("partition name", ddReq.PartitionName))
|
||||
}
|
||||
if err = c.SendDdDropPartitionReq(ctx, &ddReq, collInfo.PhysicalChannelNames); err != nil {
|
||||
return err
|
||||
}
|
||||
invalidateCache = true
|
||||
default:
|
||||
return fmt.Errorf("invalid DdOperation %s", ddOp.Type)
|
||||
}
|
||||
|
@ -919,7 +919,7 @@ func TestRootCoord(t *testing.T) {
|
||||
assert.Equal(t, collMeta.PartitionIDs[1], ddReq.PartitionID)
|
||||
|
||||
err = core.reSendDdMsg(core.ctx, true)
|
||||
assert.NotNil(t, err)
|
||||
assert.Nil(t, err)
|
||||
})
|
||||
|
||||
t.Run("has partition", func(t *testing.T) {
|
||||
@ -1254,7 +1254,7 @@ func TestRootCoord(t *testing.T) {
|
||||
assert.Equal(t, dropPartID, ddReq.PartitionID)
|
||||
|
||||
err = core.reSendDdMsg(core.ctx, true)
|
||||
assert.NotNil(t, err)
|
||||
assert.Nil(t, err)
|
||||
})
|
||||
|
||||
t.Run("remove DQL msgstream", func(t *testing.T) {
|
||||
@ -1343,7 +1343,7 @@ func TestRootCoord(t *testing.T) {
|
||||
assert.Equal(t, collMeta.ID, ddReq.CollectionID)
|
||||
|
||||
err = core.reSendDdMsg(core.ctx, true)
|
||||
assert.NotNil(t, err)
|
||||
assert.Nil(t, err)
|
||||
})
|
||||
|
||||
t.Run("context_cancel", func(t *testing.T) {
|
||||
|
@ -220,27 +220,26 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error {
|
||||
Data: ids[pchan],
|
||||
})
|
||||
}
|
||||
err = t.core.MetaTable.AddCollection(&collInfo, ts, idxInfo, ddOpStr)
|
||||
if err != nil {
|
||||
|
||||
// update meta table after send dd operation
|
||||
if err = t.core.MetaTable.AddCollection(&collInfo, ts, idxInfo, ddOpStr); err != nil {
|
||||
t.core.chanTimeTick.removeDmlChannels(chanNames...)
|
||||
t.core.chanTimeTick.removeDeltaChannels(deltaChanNames...)
|
||||
// it's ok just to leave create collection message sent, datanode and querynode does't process CreateCollection logic
|
||||
return fmt.Errorf("meta table add collection failed,error = %w", err)
|
||||
}
|
||||
|
||||
// use addDdlTimeTick and removeDdlTimeTick to mark DDL operation in process
|
||||
t.core.chanTimeTick.removeDdlTimeTick(ts, reason)
|
||||
t.core.SendTimeTick(ts, reason)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
err = createCollectionFn()
|
||||
if err != nil {
|
||||
if err = createCollectionFn(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = t.core.CallWatchChannels(ctx, collID, vchanNames)
|
||||
if err != nil {
|
||||
if err = t.core.CallWatchChannels(ctx, collID, vchanNames); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -313,16 +312,16 @@ func (t *DropCollectionReqTask) Execute(ctx context.Context) error {
|
||||
// clear ddl timetick in all conditions
|
||||
defer t.core.chanTimeTick.removeDdlTimeTick(ts, reason)
|
||||
|
||||
err = t.core.MetaTable.DeleteCollection(collMeta.ID, ts, ddOpStr)
|
||||
if err != nil {
|
||||
if err = t.core.SendDdDropCollectionReq(ctx, &ddReq, collMeta.PhysicalChannelNames); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = t.core.SendDdDropCollectionReq(ctx, &ddReq, collMeta.PhysicalChannelNames)
|
||||
if err != nil {
|
||||
// update meta table after send dd operation
|
||||
if err = t.core.MetaTable.DeleteCollection(collMeta.ID, ts, ddOpStr); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// use addDdlTimeTick and removeDdlTimeTick to mark DDL operation in process
|
||||
t.core.chanTimeTick.removeDdlTimeTick(ts, reason)
|
||||
t.core.SendTimeTick(ts, reason)
|
||||
|
||||
@ -343,8 +342,7 @@ func (t *DropCollectionReqTask) Execute(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
err = dropCollectionFn()
|
||||
if err != nil {
|
||||
if err = dropCollectionFn(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -526,23 +524,22 @@ func (t *CreatePartitionReqTask) Execute(ctx context.Context) error {
|
||||
// clear ddl timetick in all conditions
|
||||
defer t.core.chanTimeTick.removeDdlTimeTick(ts, reason)
|
||||
|
||||
err = t.core.MetaTable.AddPartition(collMeta.ID, t.Req.PartitionName, partID, ts, ddOpStr)
|
||||
if err != nil {
|
||||
if err = t.core.SendDdCreatePartitionReq(ctx, &ddReq, collMeta.PhysicalChannelNames); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = t.core.SendDdCreatePartitionReq(ctx, &ddReq, collMeta.PhysicalChannelNames)
|
||||
if err != nil {
|
||||
// update meta table after send dd operation
|
||||
if err = t.core.MetaTable.AddPartition(collMeta.ID, t.Req.PartitionName, partID, ts, ddOpStr); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// use addDdlTimeTick and removeDdlTimeTick to mark DDL operation in process
|
||||
t.core.chanTimeTick.removeDdlTimeTick(ts, reason)
|
||||
t.core.SendTimeTick(ts, reason)
|
||||
return nil
|
||||
}
|
||||
|
||||
err = createPartitionFn()
|
||||
if err != nil {
|
||||
if err = createPartitionFn(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -611,23 +608,22 @@ func (t *DropPartitionReqTask) Execute(ctx context.Context) error {
|
||||
// clear ddl timetick in all conditions
|
||||
defer t.core.chanTimeTick.removeDdlTimeTick(ts, reason)
|
||||
|
||||
_, err = t.core.MetaTable.DeletePartition(collInfo.ID, t.Req.PartitionName, ts, ddOpStr)
|
||||
if err != nil {
|
||||
if err = t.core.SendDdDropPartitionReq(ctx, &ddReq, collInfo.PhysicalChannelNames); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = t.core.SendDdDropPartitionReq(ctx, &ddReq, collInfo.PhysicalChannelNames)
|
||||
if err != nil {
|
||||
// update meta table after send dd operation
|
||||
if _, err = t.core.MetaTable.DeletePartition(collInfo.ID, t.Req.PartitionName, ts, ddOpStr); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// use addDdlTimeTick and removeDdlTimeTick to mark DDL operation in process
|
||||
t.core.chanTimeTick.removeDdlTimeTick(ts, reason)
|
||||
t.core.SendTimeTick(ts, reason)
|
||||
return nil
|
||||
}
|
||||
|
||||
err = dropPartitionFn()
|
||||
if err != nil {
|
||||
if err = dropPartitionFn(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user