mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-01 03:18:29 +08:00
Fix channelUnsubscribe data race and logic (#16946)
- Add an RWMutex to guard container/list, which is not goroutine-safe - Fix the bug where the element in the list is never removed Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
2d3981c98c
commit
ae717bf991
@ -45,6 +45,7 @@ type channelUnsubscribeHandler struct {
|
||||
kvClient *etcdkv.EtcdKV
|
||||
factory msgstream.Factory
|
||||
|
||||
mut sync.RWMutex // mutex for channelInfos, since container/list is not goroutine-safe
|
||||
channelInfos *list.List
|
||||
downNodeChan chan int64
|
||||
|
||||
@ -73,6 +74,13 @@ func newChannelUnsubscribeHandler(ctx context.Context, kv *etcdkv.EtcdKV, factor
|
||||
return handler, nil
|
||||
}
|
||||
|
||||
// appendUnsubInfo pushes unsub info safely
|
||||
func (csh *channelUnsubscribeHandler) appendUnsubInfo(info *querypb.UnsubscribeChannelInfo) {
|
||||
csh.mut.Lock()
|
||||
defer csh.mut.Unlock()
|
||||
csh.channelInfos.PushBack(info)
|
||||
}
|
||||
|
||||
// reloadFromKV reloads unsolved channels to unsubscribe
|
||||
func (csh *channelUnsubscribeHandler) reloadFromKV() error {
|
||||
log.Info("start reload unsubscribe channelInfo from kv")
|
||||
@ -86,7 +94,7 @@ func (csh *channelUnsubscribeHandler) reloadFromKV() error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
csh.channelInfos.PushBack(channelInfo)
|
||||
csh.appendUnsubInfo(channelInfo)
|
||||
csh.downNodeChan <- channelInfo.NodeID
|
||||
}
|
||||
|
||||
@ -102,11 +110,14 @@ func (csh *channelUnsubscribeHandler) addUnsubscribeChannelInfo(info *querypb.Un
|
||||
}
|
||||
// when queryCoord is restarted multiple times, the nodeID of added channelInfo may be the same
|
||||
hasEnqueue := false
|
||||
// reduce the lock range to iteration here, since `addUnsubscribeChannelInfo` is called one by one
|
||||
csh.mut.RLock()
|
||||
for e := csh.channelInfos.Back(); e != nil; e = e.Prev() {
|
||||
if e.Value.(*querypb.UnsubscribeChannelInfo).NodeID == nodeID {
|
||||
hasEnqueue = true
|
||||
}
|
||||
}
|
||||
csh.mut.RUnlock()
|
||||
|
||||
if !hasEnqueue {
|
||||
channelInfoKey := fmt.Sprintf("%s/%d", unsubscribeChannelInfoPrefix, nodeID)
|
||||
@ -114,7 +125,7 @@ func (csh *channelUnsubscribeHandler) addUnsubscribeChannelInfo(info *querypb.Un
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
csh.channelInfos.PushBack(info)
|
||||
csh.appendUnsubInfo(info)
|
||||
csh.downNodeChan <- info.NodeID
|
||||
log.Info("add unsubscribeChannelInfo to handler", zap.Int64("nodeID", info.NodeID))
|
||||
}
|
||||
@ -129,7 +140,10 @@ func (csh *channelUnsubscribeHandler) handleChannelUnsubscribeLoop() {
|
||||
log.Info("channelUnsubscribeHandler ctx done, handleChannelUnsubscribeLoop end")
|
||||
return
|
||||
case <-csh.downNodeChan:
|
||||
csh.mut.RLock()
|
||||
e := csh.channelInfos.Front()
|
||||
channelInfo := csh.channelInfos.Front().Value.(*querypb.UnsubscribeChannelInfo)
|
||||
csh.mut.RUnlock()
|
||||
nodeID := channelInfo.NodeID
|
||||
for _, collectionChannels := range channelInfo.CollectionChannels {
|
||||
collectionID := collectionChannels.CollectionID
|
||||
@ -142,6 +156,10 @@ func (csh *channelUnsubscribeHandler) handleChannelUnsubscribeLoop() {
|
||||
log.Error("remove unsubscribe channelInfo from etcd failed", zap.Int64("nodeID", nodeID))
|
||||
panic(err)
|
||||
}
|
||||
|
||||
csh.mut.Lock()
|
||||
csh.channelInfos.Remove(e)
|
||||
csh.mut.Unlock()
|
||||
log.Info("unsubscribe channels success", zap.Int64("nodeID", nodeID))
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user