From 51a9e54b216c692bcc3da70b6df1bad3f0802161 Mon Sep 17 00:00:00 2001 From: Ten Thousand Leaves <69466447+soothing-rain@users.noreply.github.com> Date: Wed, 15 Jun 2022 20:50:10 +0800 Subject: [PATCH] Fix a bug that causes infinite loop (#17550) This will cause AddNode() to keep its lock forever so DataCoord would just hang issue: #17366 Signed-off-by: Yuchen Gao --- internal/datacoord/channel_manager.go | 30 +++++++++++++---------- internal/datacoord/policy.go | 34 ++++++++++++++++----------- 2 files changed, 38 insertions(+), 26 deletions(-) diff --git a/internal/datacoord/channel_manager.go b/internal/datacoord/channel_manager.go index ba2561742b..4a817fecb5 100644 --- a/internal/datacoord/channel_manager.go +++ b/internal/datacoord/channel_manager.go @@ -704,7 +704,7 @@ func (c *ChannelManager) watchChannelStatesLoop(ctx context.Context) { } } -// Release writes ToRlease channel watch states for a channel +// Release writes ToRelease channel watch states for a channel func (c *ChannelManager) Release(nodeID UniqueID, channelName string) error { c.mu.Lock() defer c.mu.Unlock() @@ -723,7 +723,7 @@ func (c *ChannelManager) Release(nodeID UniqueID, channelName string) error { return err } -// Reassign removes channel assignment from a datanode +// Reassign reassigns a channel to another DataNode. func (c *ChannelManager) Reassign(nodeID UniqueID, channelName string) error { c.mu.Lock() defer c.mu.Unlock() @@ -747,10 +747,11 @@ func (c *ChannelManager) Reassign(nodeID UniqueID, channelName string) error { return nil } - // reassign policy won't choose the same Node for a ressignment of a channel + // Reassign policy won't choose the original node when a reassigning a channel. updates := c.reassignPolicy(c.store, []*NodeChannelInfo{reallocates}) - if len(updates) <= 0 { // skip the remove if reassign to the original node - log.Warn("fail to reassign channel to other nodes, assign to the original Node", + if len(updates) <= 0 { + // Skip the remove if reassign to the original node. + log.Warn("failed to reassign channel to other nodes, assigning to the original DataNode", zap.Int64("nodeID", nodeID), zap.String("channel name", channelName)) updates.Add(nodeID, []*channel{ch}) @@ -760,11 +761,13 @@ func (c *ChannelManager) Reassign(nodeID UniqueID, channelName string) error { } } - log.Info("channel manager reassign channels", zap.Int64("old node ID", nodeID), zap.Array("updates", updates)) + log.Info("channel manager reassigning channels", + zap.Int64("old node ID", nodeID), + zap.Array("updates", updates)) return c.updateWithTimer(updates, datapb.ChannelWatchState_ToWatch) } -// CleanupAndReassign tries to clean up datanode's subscription, and then delete channel watch info. +// CleanupAndReassign tries to clean up datanode's subscription, and then reassigns the channel to another DataNode. func (c *ChannelManager) CleanupAndReassign(nodeID UniqueID, channelName string) error { c.mu.Lock() defer c.mu.Unlock() @@ -796,11 +799,12 @@ func (c *ChannelManager) CleanupAndReassign(nodeID UniqueID, channelName string) return nil } - // reassign policy won't choose the same Node for a ressignment of a channel + // Reassign policy won't choose the original node when a reassigning a channel. updates := c.reassignPolicy(c.store, []*NodeChannelInfo{reallocates}) - if len(updates) <= 0 { // skip the remove if reassign to the original node - log.Warn("fail to reassign channel to other nodes, add channel to the original node", - zap.Int64("nodeID", nodeID), + if len(updates) <= 0 { + // Skip the remove if reassign to the original node. + log.Warn("failed to reassign channel to other nodes, add channel to the original node", + zap.Int64("node ID", nodeID), zap.String("channel name", channelName)) updates.Add(nodeID, []*channel{chToCleanUp}) } else { @@ -809,7 +813,9 @@ func (c *ChannelManager) CleanupAndReassign(nodeID UniqueID, channelName string) } } - log.Info("channel manager reassign channels", zap.Int64("old nodeID", nodeID), zap.Array("updates", updates)) + log.Info("channel manager reassigning channels", + zap.Int64("old nodeID", nodeID), + zap.Array("updates", updates)) return c.updateWithTimer(updates, datapb.ChannelWatchState_ToWatch) } diff --git a/internal/datacoord/policy.go b/internal/datacoord/policy.go index 1e7391bbd3..b22fee67e6 100644 --- a/internal/datacoord/policy.go +++ b/internal/datacoord/policy.go @@ -51,48 +51,53 @@ func BufferChannelAssignPolicy(store ROChannelStore, nodeID int64) ChannelOpSet } // AvgAssignRegisterPolicy assigns channels with average to new registered node -// Register will not directly delete the node-channel pair, channel manager handles the release itself +// Register will not directly delete the node-channel pair. Channel manager will handle channel release. func AvgAssignRegisterPolicy(store ROChannelStore, nodeID int64) ChannelOpSet { opSet := BufferChannelAssignPolicy(store, nodeID) if len(opSet) != 0 { return opSet } - infos := store.GetNodesChannels() - infos = filterNode(infos, nodeID) + // Get a list of available node-channel info. + avaNodeChannel := filterNode(store.GetNodesChannels(), nodeID) channelNum := 0 - for _, info := range infos { + for _, info := range avaNodeChannel { channelNum += len(info.Channels) } - avg := channelNum / (len(store.GetNodes()) + 1) - if avg == 0 { + chPerNode := channelNum / (len(store.GetNodes()) + 1) + if chPerNode == 0 { return nil } // sort in descending order and reallocate - sort.Slice(infos, func(i, j int) bool { - return len(infos[i].Channels) > len(infos[j].Channels) + sort.Slice(avaNodeChannel, func(i, j int) bool { + return len(avaNodeChannel[i].Channels) > len(avaNodeChannel[j].Channels) }) releases := make(map[int64][]*channel) - for i := 0; i < avg; { - t := infos[i%len(infos)] - idx := i / len(infos) - if idx >= len(t.Channels) { + for i := 0; i < chPerNode; i++ { + // Pick a node with its channel to release. + toRelease := avaNodeChannel[i%len(avaNodeChannel)] + // Pick a channel that will be reassigned to the new node later. + chIdx := i / len(avaNodeChannel) + if chIdx >= len(toRelease.Channels) { + // Node has too few channels, simply skip. No re-picking. + // TODO: Consider re-picking in case assignment is extremely uneven? continue } - releases[t.NodeID] = append(releases[t.NodeID], t.Channels[idx]) - i++ + releases[toRelease.NodeID] = append(releases[toRelease.NodeID], toRelease.Channels[chIdx]) } opSet = ChannelOpSet{} + // Channels in `releases` are reassigned eventually by channel manager. for k, v := range releases { opSet.Add(k, v) } return opSet } +// filterNode filters out node-channel info where node ID == `nodeID`. func filterNode(infos []*NodeChannelInfo, nodeID int64) []*NodeChannelInfo { filtered := make([]*NodeChannelInfo, 0) for _, info := range infos { @@ -141,6 +146,7 @@ func ConsistentHashRegisterPolicy(hashRing *consistent.Consistent) RegisterPolic } } + // Channels in `releases` are reassigned eventually by channel manager. for id, channels := range releases { opSet.Add(id, channels) }