Fix a bug that causes infinite loop (#17550)

This will cause AddNode() to keep its lock forever so DataCoord would just hang

issue: #17366
Signed-off-by: Yuchen Gao <yuchen.gao@zilliz.com>
This commit is contained in:
Ten Thousand Leaves 2022-06-15 20:50:10 +08:00 committed by GitHub
parent 98b32f3eba
commit 51a9e54b21
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 38 additions and 26 deletions

View File

@ -704,7 +704,7 @@ func (c *ChannelManager) watchChannelStatesLoop(ctx context.Context) {
}
}
// Release writes ToRlease channel watch states for a channel
// Release writes ToRelease channel watch states for a channel
func (c *ChannelManager) Release(nodeID UniqueID, channelName string) error {
c.mu.Lock()
defer c.mu.Unlock()
@ -723,7 +723,7 @@ func (c *ChannelManager) Release(nodeID UniqueID, channelName string) error {
return err
}
// Reassign removes channel assignment from a datanode
// Reassign reassigns a channel to another DataNode.
func (c *ChannelManager) Reassign(nodeID UniqueID, channelName string) error {
c.mu.Lock()
defer c.mu.Unlock()
@ -747,10 +747,11 @@ func (c *ChannelManager) Reassign(nodeID UniqueID, channelName string) error {
return nil
}
// reassign policy won't choose the same Node for a ressignment of a channel
// Reassign policy won't choose the original node when a reassigning a channel.
updates := c.reassignPolicy(c.store, []*NodeChannelInfo{reallocates})
if len(updates) <= 0 { // skip the remove if reassign to the original node
log.Warn("fail to reassign channel to other nodes, assign to the original Node",
if len(updates) <= 0 {
// Skip the remove if reassign to the original node.
log.Warn("failed to reassign channel to other nodes, assigning to the original DataNode",
zap.Int64("nodeID", nodeID),
zap.String("channel name", channelName))
updates.Add(nodeID, []*channel{ch})
@ -760,11 +761,13 @@ func (c *ChannelManager) Reassign(nodeID UniqueID, channelName string) error {
}
}
log.Info("channel manager reassign channels", zap.Int64("old node ID", nodeID), zap.Array("updates", updates))
log.Info("channel manager reassigning channels",
zap.Int64("old node ID", nodeID),
zap.Array("updates", updates))
return c.updateWithTimer(updates, datapb.ChannelWatchState_ToWatch)
}
// CleanupAndReassign tries to clean up datanode's subscription, and then delete channel watch info.
// CleanupAndReassign tries to clean up datanode's subscription, and then reassigns the channel to another DataNode.
func (c *ChannelManager) CleanupAndReassign(nodeID UniqueID, channelName string) error {
c.mu.Lock()
defer c.mu.Unlock()
@ -796,11 +799,12 @@ func (c *ChannelManager) CleanupAndReassign(nodeID UniqueID, channelName string)
return nil
}
// reassign policy won't choose the same Node for a ressignment of a channel
// Reassign policy won't choose the original node when a reassigning a channel.
updates := c.reassignPolicy(c.store, []*NodeChannelInfo{reallocates})
if len(updates) <= 0 { // skip the remove if reassign to the original node
log.Warn("fail to reassign channel to other nodes, add channel to the original node",
zap.Int64("nodeID", nodeID),
if len(updates) <= 0 {
// Skip the remove if reassign to the original node.
log.Warn("failed to reassign channel to other nodes, add channel to the original node",
zap.Int64("node ID", nodeID),
zap.String("channel name", channelName))
updates.Add(nodeID, []*channel{chToCleanUp})
} else {
@ -809,7 +813,9 @@ func (c *ChannelManager) CleanupAndReassign(nodeID UniqueID, channelName string)
}
}
log.Info("channel manager reassign channels", zap.Int64("old nodeID", nodeID), zap.Array("updates", updates))
log.Info("channel manager reassigning channels",
zap.Int64("old nodeID", nodeID),
zap.Array("updates", updates))
return c.updateWithTimer(updates, datapb.ChannelWatchState_ToWatch)
}

View File

@ -51,48 +51,53 @@ func BufferChannelAssignPolicy(store ROChannelStore, nodeID int64) ChannelOpSet
}
// AvgAssignRegisterPolicy assigns channels with average to new registered node
// Register will not directly delete the node-channel pair, channel manager handles the release itself
// Register will not directly delete the node-channel pair. Channel manager will handle channel release.
func AvgAssignRegisterPolicy(store ROChannelStore, nodeID int64) ChannelOpSet {
opSet := BufferChannelAssignPolicy(store, nodeID)
if len(opSet) != 0 {
return opSet
}
infos := store.GetNodesChannels()
infos = filterNode(infos, nodeID)
// Get a list of available node-channel info.
avaNodeChannel := filterNode(store.GetNodesChannels(), nodeID)
channelNum := 0
for _, info := range infos {
for _, info := range avaNodeChannel {
channelNum += len(info.Channels)
}
avg := channelNum / (len(store.GetNodes()) + 1)
if avg == 0 {
chPerNode := channelNum / (len(store.GetNodes()) + 1)
if chPerNode == 0 {
return nil
}
// sort in descending order and reallocate
sort.Slice(infos, func(i, j int) bool {
return len(infos[i].Channels) > len(infos[j].Channels)
sort.Slice(avaNodeChannel, func(i, j int) bool {
return len(avaNodeChannel[i].Channels) > len(avaNodeChannel[j].Channels)
})
releases := make(map[int64][]*channel)
for i := 0; i < avg; {
t := infos[i%len(infos)]
idx := i / len(infos)
if idx >= len(t.Channels) {
for i := 0; i < chPerNode; i++ {
// Pick a node with its channel to release.
toRelease := avaNodeChannel[i%len(avaNodeChannel)]
// Pick a channel that will be reassigned to the new node later.
chIdx := i / len(avaNodeChannel)
if chIdx >= len(toRelease.Channels) {
// Node has too few channels, simply skip. No re-picking.
// TODO: Consider re-picking in case assignment is extremely uneven?
continue
}
releases[t.NodeID] = append(releases[t.NodeID], t.Channels[idx])
i++
releases[toRelease.NodeID] = append(releases[toRelease.NodeID], toRelease.Channels[chIdx])
}
opSet = ChannelOpSet{}
// Channels in `releases` are reassigned eventually by channel manager.
for k, v := range releases {
opSet.Add(k, v)
}
return opSet
}
// filterNode filters out node-channel info where node ID == `nodeID`.
func filterNode(infos []*NodeChannelInfo, nodeID int64) []*NodeChannelInfo {
filtered := make([]*NodeChannelInfo, 0)
for _, info := range infos {
@ -141,6 +146,7 @@ func ConsistentHashRegisterPolicy(hashRing *consistent.Consistent) RegisterPolic
}
}
// Channels in `releases` are reassigned eventually by channel manager.
for id, channels := range releases {
opSet.Add(id, channels)
}