From 829077ad0c85c380f75289bab3fa7576e8ad0674 Mon Sep 17 00:00:00 2001 From: Cai Yudong Date: Wed, 15 Dec 2021 08:59:08 +0800 Subject: [PATCH] Optimize reSentDdMsg logic (#13378) Signed-off-by: yudong.cai --- internal/rootcoord/root_coord.go | 59 +++++++++++++++------------ internal/rootcoord/root_coord_test.go | 6 +-- internal/rootcoord/task.go | 46 ++++++++++----------- 3 files changed, 56 insertions(+), 55 deletions(-) diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 66553c112f..ca8543edf4 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -1053,7 +1053,7 @@ func (c *Core) reSendDdMsg(ctx context.Context, force bool) error { return err } - var invalidateCache bool + invalidateCache := false var ts typeutil.Timestamp var collName string @@ -1065,14 +1065,14 @@ func (c *Core) reSendDdMsg(ctx context.Context, force bool) error { if err = proto.Unmarshal(ddOp.Body, &ddReq); err != nil { return err } - collInfo, err := c.MetaTable.GetCollectionByName(ddReq.CollectionName, 0) - if err != nil { - return err + if _, err := c.MetaTable.GetCollectionByName(ddReq.CollectionName, 0); err != nil { + if _, err = c.SendDdCreateCollectionReq(ctx, &ddReq, ddReq.PhysicalChannelNames); err != nil { + return err + } + } else { + log.Debug("collection has been created, skip re-send CreateCollection", + zap.String("collection name", collName)) } - if _, err = c.SendDdCreateCollectionReq(ctx, &ddReq, collInfo.PhysicalChannelNames); err != nil { - return err - } - invalidateCache = false case DropCollectionDDType: var ddReq = internalpb.DropCollectionRequest{} if err = proto.Unmarshal(ddOp.Body, &ddReq); err != nil { @@ -1080,14 +1080,15 @@ func (c *Core) reSendDdMsg(ctx context.Context, force bool) error { } ts = ddReq.Base.Timestamp collName = ddReq.CollectionName - collInfo, err := c.MetaTable.GetCollectionByName(ddReq.CollectionName, 0) - if err != nil { - return err + if collInfo, err := c.MetaTable.GetCollectionByName(ddReq.CollectionName, 0); err == nil { + if err = c.SendDdDropCollectionReq(ctx, &ddReq, collInfo.PhysicalChannelNames); err != nil { + return err + } + invalidateCache = true + } else { + log.Debug("collection has been removed, skip re-send DropCollection", + zap.String("collection name", collName)) } - if err = c.SendDdDropCollectionReq(ctx, &ddReq, collInfo.PhysicalChannelNames); err != nil { - return err - } - invalidateCache = true case CreatePartitionDDType: var ddReq = internalpb.CreatePartitionRequest{} if err = proto.Unmarshal(ddOp.Body, &ddReq); err != nil { @@ -1099,13 +1100,15 @@ func (c *Core) reSendDdMsg(ctx context.Context, force bool) error { if err != nil { return err } - if _, err = c.MetaTable.GetPartitionByName(collInfo.ID, ddReq.PartitionName, 0); err == nil { - return fmt.Errorf("partition %s already created", ddReq.PartitionName) + if _, err = c.MetaTable.GetPartitionByName(collInfo.ID, ddReq.PartitionName, 0); err != nil { + if err = c.SendDdCreatePartitionReq(ctx, &ddReq, collInfo.PhysicalChannelNames); err != nil { + return err + } + invalidateCache = true + } else { + log.Debug("partition has been created, skip re-send CreatePartition", + zap.String("collection name", collName), zap.String("partition name", ddReq.PartitionName)) } - if err = c.SendDdCreatePartitionReq(ctx, &ddReq, collInfo.PhysicalChannelNames); err != nil { - return err - } - invalidateCache = true case DropPartitionDDType: var ddReq = internalpb.DropPartitionRequest{} if err = proto.Unmarshal(ddOp.Body, &ddReq); err != nil { @@ -1117,13 +1120,15 @@ func (c *Core) reSendDdMsg(ctx context.Context, force bool) error { if err != nil { return err } - if _, err = c.MetaTable.GetPartitionByName(collInfo.ID, ddReq.PartitionName, 0); err != nil { - return err + if _, err = c.MetaTable.GetPartitionByName(collInfo.ID, ddReq.PartitionName, 0); err == nil { + if err = c.SendDdDropPartitionReq(ctx, &ddReq, collInfo.PhysicalChannelNames); err != nil { + return err + } + invalidateCache = true + } else { + log.Debug("partition has been removed, skip re-send DropPartition", + zap.String("collection name", collName), zap.String("partition name", ddReq.PartitionName)) } - if err = c.SendDdDropPartitionReq(ctx, &ddReq, collInfo.PhysicalChannelNames); err != nil { - return err - } - invalidateCache = true default: return fmt.Errorf("invalid DdOperation %s", ddOp.Type) } diff --git a/internal/rootcoord/root_coord_test.go b/internal/rootcoord/root_coord_test.go index 2224b300bb..7d487ed3b6 100644 --- a/internal/rootcoord/root_coord_test.go +++ b/internal/rootcoord/root_coord_test.go @@ -919,7 +919,7 @@ func TestRootCoord(t *testing.T) { assert.Equal(t, collMeta.PartitionIDs[1], ddReq.PartitionID) err = core.reSendDdMsg(core.ctx, true) - assert.NotNil(t, err) + assert.Nil(t, err) }) t.Run("has partition", func(t *testing.T) { @@ -1254,7 +1254,7 @@ func TestRootCoord(t *testing.T) { assert.Equal(t, dropPartID, ddReq.PartitionID) err = core.reSendDdMsg(core.ctx, true) - assert.NotNil(t, err) + assert.Nil(t, err) }) t.Run("remove DQL msgstream", func(t *testing.T) { @@ -1343,7 +1343,7 @@ func TestRootCoord(t *testing.T) { assert.Equal(t, collMeta.ID, ddReq.CollectionID) err = core.reSendDdMsg(core.ctx, true) - assert.NotNil(t, err) + assert.Nil(t, err) }) t.Run("context_cancel", func(t *testing.T) { diff --git a/internal/rootcoord/task.go b/internal/rootcoord/task.go index cd947c43ba..eef5599699 100644 --- a/internal/rootcoord/task.go +++ b/internal/rootcoord/task.go @@ -220,27 +220,26 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error { Data: ids[pchan], }) } - err = t.core.MetaTable.AddCollection(&collInfo, ts, idxInfo, ddOpStr) - if err != nil { + + // update meta table after send dd operation + if err = t.core.MetaTable.AddCollection(&collInfo, ts, idxInfo, ddOpStr); err != nil { t.core.chanTimeTick.removeDmlChannels(chanNames...) t.core.chanTimeTick.removeDeltaChannels(deltaChanNames...) // it's ok just to leave create collection message sent, datanode and querynode does't process CreateCollection logic return fmt.Errorf("meta table add collection failed,error = %w", err) } + // use addDdlTimeTick and removeDdlTimeTick to mark DDL operation in process t.core.chanTimeTick.removeDdlTimeTick(ts, reason) t.core.SendTimeTick(ts, reason) - return nil } - err = createCollectionFn() - if err != nil { + if err = createCollectionFn(); err != nil { return err } - err = t.core.CallWatchChannels(ctx, collID, vchanNames) - if err != nil { + if err = t.core.CallWatchChannels(ctx, collID, vchanNames); err != nil { return err } @@ -313,16 +312,16 @@ func (t *DropCollectionReqTask) Execute(ctx context.Context) error { // clear ddl timetick in all conditions defer t.core.chanTimeTick.removeDdlTimeTick(ts, reason) - err = t.core.MetaTable.DeleteCollection(collMeta.ID, ts, ddOpStr) - if err != nil { + if err = t.core.SendDdDropCollectionReq(ctx, &ddReq, collMeta.PhysicalChannelNames); err != nil { return err } - err = t.core.SendDdDropCollectionReq(ctx, &ddReq, collMeta.PhysicalChannelNames) - if err != nil { + // update meta table after send dd operation + if err = t.core.MetaTable.DeleteCollection(collMeta.ID, ts, ddOpStr); err != nil { return err } + // use addDdlTimeTick and removeDdlTimeTick to mark DDL operation in process t.core.chanTimeTick.removeDdlTimeTick(ts, reason) t.core.SendTimeTick(ts, reason) @@ -343,8 +342,7 @@ func (t *DropCollectionReqTask) Execute(ctx context.Context) error { return nil } - err = dropCollectionFn() - if err != nil { + if err = dropCollectionFn(); err != nil { return err } @@ -526,23 +524,22 @@ func (t *CreatePartitionReqTask) Execute(ctx context.Context) error { // clear ddl timetick in all conditions defer t.core.chanTimeTick.removeDdlTimeTick(ts, reason) - err = t.core.MetaTable.AddPartition(collMeta.ID, t.Req.PartitionName, partID, ts, ddOpStr) - if err != nil { + if err = t.core.SendDdCreatePartitionReq(ctx, &ddReq, collMeta.PhysicalChannelNames); err != nil { return err } - err = t.core.SendDdCreatePartitionReq(ctx, &ddReq, collMeta.PhysicalChannelNames) - if err != nil { + // update meta table after send dd operation + if err = t.core.MetaTable.AddPartition(collMeta.ID, t.Req.PartitionName, partID, ts, ddOpStr); err != nil { return err } + // use addDdlTimeTick and removeDdlTimeTick to mark DDL operation in process t.core.chanTimeTick.removeDdlTimeTick(ts, reason) t.core.SendTimeTick(ts, reason) return nil } - err = createPartitionFn() - if err != nil { + if err = createPartitionFn(); err != nil { return err } @@ -611,23 +608,22 @@ func (t *DropPartitionReqTask) Execute(ctx context.Context) error { // clear ddl timetick in all conditions defer t.core.chanTimeTick.removeDdlTimeTick(ts, reason) - _, err = t.core.MetaTable.DeletePartition(collInfo.ID, t.Req.PartitionName, ts, ddOpStr) - if err != nil { + if err = t.core.SendDdDropPartitionReq(ctx, &ddReq, collInfo.PhysicalChannelNames); err != nil { return err } - err = t.core.SendDdDropPartitionReq(ctx, &ddReq, collInfo.PhysicalChannelNames) - if err != nil { + // update meta table after send dd operation + if _, err = t.core.MetaTable.DeletePartition(collInfo.ID, t.Req.PartitionName, ts, ddOpStr); err != nil { return err } + // use addDdlTimeTick and removeDdlTimeTick to mark DDL operation in process t.core.chanTimeTick.removeDdlTimeTick(ts, reason) t.core.SendTimeTick(ts, reason) return nil } - err = dropPartitionFn() - if err != nil { + if err = dropPartitionFn(); err != nil { return err }