diff --git a/internal/datacoord/channel_manager.go b/internal/datacoord/channel_manager.go index 92755c639c..b8a0076fd9 100644 --- a/internal/datacoord/channel_manager.go +++ b/internal/datacoord/channel_manager.go @@ -333,7 +333,16 @@ func (c *ChannelManager) AddNode(nodeID int64) error { zap.Int64("registered node", nodeID), zap.Array("updates", updates)) - return c.updateWithTimer(updates, datapb.ChannelWatchState_ToRelease) + state := datapb.ChannelWatchState_ToRelease + + for _, u := range updates { + if u.Type == Delete && u.NodeID == bufferID { + state = datapb.ChannelWatchState_ToWatch + break + } + } + + return c.updateWithTimer(updates, state) } // DeleteNode deletes the node from the cluster. diff --git a/internal/datacoord/channel_manager_test.go b/internal/datacoord/channel_manager_test.go index cdc1e10f23..6c16e08436 100644 --- a/internal/datacoord/channel_manager_test.go +++ b/internal/datacoord/channel_manager_test.go @@ -283,7 +283,27 @@ func TestChannelManager(t *testing.T) { }() prefix := Params.DataCoordCfg.ChannelWatchSubPath - t.Run("test AddNode", func(t *testing.T) { + waitAndStore := func(waitState, storeState datapb.ChannelWatchState, nodeID UniqueID, channelName string) { + for { + key := path.Join(prefix, strconv.FormatInt(nodeID, 10), channelName) + v, err := metakv.Load(key) + if err == nil && len(v) > 0 { + watchInfo, err := parseWatchInfo(key, []byte(v)) + require.NoError(t, err) + require.Equal(t, waitState, watchInfo.GetState()) + + watchInfo.State = storeState + data, err := proto.Marshal(watchInfo) + require.NoError(t, err) + + metakv.Save(key, string(data)) + break + } + time.Sleep(100 * time.Millisecond) + } + } + + t.Run("test AddNode with avalible node", func(t *testing.T) { // Note: this test is based on the default registerPolicy defer metakv.RemoveWithPrefix("") var ( @@ -322,6 +342,45 @@ func TestChannelManager(t *testing.T) { checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, nodeToAdd, "channel-3", collectionID) }) + t.Run("test AddNode with no available node", func(t *testing.T) { + // Note: this test is based on the default registerPolicy + defer metakv.RemoveWithPrefix("") + var ( + collectionID = UniqueID(8) + nodeID = UniqueID(119) + channel1, channel2 = "channel1", "channel2" + ) + + chManager, err := NewChannelManager(metakv, newMockHandler()) + require.NoError(t, err) + chManager.store = &ChannelStore{ + store: metakv, + channelsInfo: map[int64]*NodeChannelInfo{ + bufferID: {bufferID, []*channel{ + {channel1, collectionID}, + {channel2, collectionID}, + }}, + }, + } + + err = chManager.AddNode(nodeID) + assert.NoError(t, err) + + waitAndStore(datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess, nodeID, channel1) + waitAndStore(datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess, nodeID, channel2) + + chInfo := chManager.store.GetNode(nodeID) + assert.Equal(t, 2, len(chInfo.Channels)) + chManager.Match(nodeID, channel1) + chManager.Match(nodeID, channel2) + + err = chManager.Watch(&channel{"channel-3", collectionID}) + assert.NoError(t, err) + checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, nodeID, "channel-3", collectionID) + chManager.stateTimer.stopIfExsit(&ackEvent{watchSuccessAck, "channel-3", nodeID}) + + }) + t.Run("test Watch", func(t *testing.T) { defer metakv.RemoveWithPrefix("") var ( @@ -786,7 +845,7 @@ func TestChannelManager_BalanceBehaviour(t *testing.T) { prefix := Params.DataCoordCfg.ChannelWatchSubPath - t.Run("one node with two channels add a new node", func(t *testing.T) { + t.Run("one node with three channels add a new node", func(t *testing.T) { defer metakv.RemoveWithPrefix("") var ( diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go index 444c214de6..3956f3f6f7 100644 --- a/internal/datacoord/compaction_trigger.go +++ b/internal/datacoord/compaction_trigger.go @@ -125,7 +125,7 @@ func (t *compactionTrigger) startGlobalCompactionLoop() { defer t.wg.Done() // If AutoCompaction disabled, global loop will not start - if !Params.DataCoordCfg.EnableAutoCompaction { + if !Params.DataCoordCfg.GetEnableAutoCompaction() { return } @@ -176,7 +176,7 @@ func (t *compactionTrigger) triggerCompaction(timetravel *timetravel) error { // triggerSingleCompaction triger a compaction bundled with collection-partiiton-channel-segment func (t *compactionTrigger) triggerSingleCompaction(collectionID, partitionID, segmentID int64, channel string, timetravel *timetravel) error { // If AutoCompaction diabled, flush request will not trigger compaction - if !Params.DataCoordCfg.EnableAutoCompaction { + if !Params.DataCoordCfg.GetEnableAutoCompaction() { return nil } diff --git a/internal/datacoord/compaction_trigger_test.go b/internal/datacoord/compaction_trigger_test.go index 3f27115794..e71a608e60 100644 --- a/internal/datacoord/compaction_trigger_test.go +++ b/internal/datacoord/compaction_trigger_test.go @@ -664,7 +664,7 @@ func Test_compactionTrigger_triggerCompaction(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - Params.DataCoordCfg.EnableAutoCompaction = tt.fields.autoCompactionEnabled + Params.DataCoordCfg.SetEnableAutoCompaction(tt.fields.autoCompactionEnabled) tr := &compactionTrigger{ meta: tt.fields.meta, allocator: tt.fields.allocator, @@ -1083,7 +1083,7 @@ func Test_compactionTrigger_singleTriggerCompaction(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - Params.DataCoordCfg.EnableAutoCompaction = tt.fields.enableAutoCompaction + Params.DataCoordCfg.SetEnableAutoCompaction(tt.fields.enableAutoCompaction) tr := &compactionTrigger{ meta: tt.fields.meta, allocator: tt.fields.allocator, diff --git a/internal/util/paramtable/component_param.go b/internal/util/paramtable/component_param.go index c6382be6e0..f7904d6db8 100644 --- a/internal/util/paramtable/component_param.go +++ b/internal/util/paramtable/component_param.go @@ -884,7 +884,7 @@ type dataCoordConfig struct { UpdatedTime time.Time EnableCompaction bool - EnableAutoCompaction bool + EnableAutoCompaction atomic.Value EnableGarbageCollection bool // Garbage Collection @@ -955,7 +955,19 @@ func (p *dataCoordConfig) initGCDropTolerance() { } func (p *dataCoordConfig) initEnableAutoCompaction() { - p.EnableAutoCompaction = p.Base.ParseBool("dataCoord.compaction.enableAutoCompaction", false) + p.EnableAutoCompaction.Store(p.Base.ParseBool("dataCoord.compaction.enableAutoCompaction", false)) +} + +func (p *dataCoordConfig) SetEnableAutoCompaction(enable bool) { + p.EnableAutoCompaction.Store(enable) +} + +func (p *dataCoordConfig) GetEnableAutoCompaction() bool { + enable := p.EnableAutoCompaction.Load() + if enable != nil { + return enable.(bool) + } + return false } func (p *dataCoordConfig) SetNodeID(id UniqueID) {