mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-11-30 19:08:30 +08:00
Block Flowgraph before handle compacted segments (#19988)
Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com> Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>
This commit is contained in:
parent
454ff2f1df
commit
1b5e765307
@ -911,6 +911,12 @@ func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegments
|
||||
return status, nil
|
||||
}
|
||||
|
||||
ds, ok := node.flowgraphManager.getFlowgraphService(channel.getChannelName(oneSegment))
|
||||
if !ok {
|
||||
status.Reason = fmt.Sprintf("failed to find flow graph service, err=%s", err.Error())
|
||||
return status, nil
|
||||
}
|
||||
|
||||
// check if all compactedFrom segments are valid
|
||||
var invalidSegIDs []UniqueID
|
||||
for _, segID := range req.GetCompactedFrom() {
|
||||
@ -934,6 +940,9 @@ func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegments
|
||||
|
||||
channel.(*ChannelMeta).initPKBloomFilter(ctx, targetSeg, req.GetStatsLogs(), tsoutil.GetCurrentTime())
|
||||
|
||||
// block all flow graph so it's safe to remove segment
|
||||
ds.fg.Blockall()
|
||||
defer ds.fg.Unblock()
|
||||
if err := channel.mergeFlushedSegments(targetSeg, req.GetPlanID(), req.GetCompactedFrom()); err != nil {
|
||||
status.Reason = err.Error()
|
||||
return status, nil
|
||||
|
@ -80,6 +80,18 @@ func (fg *TimeTickedFlowGraph) Start() {
|
||||
})
|
||||
}
|
||||
|
||||
func (fg *TimeTickedFlowGraph) Blockall() {
|
||||
for _, v := range fg.nodeCtx {
|
||||
v.Block()
|
||||
}
|
||||
}
|
||||
|
||||
func (fg *TimeTickedFlowGraph) Unblock() {
|
||||
for _, v := range fg.nodeCtx {
|
||||
v.Unblock()
|
||||
}
|
||||
}
|
||||
|
||||
// Close closes all nodes in flowgraph
|
||||
func (fg *TimeTickedFlowGraph) Close() {
|
||||
fg.stopOnce.Do(func() {
|
||||
|
@ -58,6 +58,8 @@ type nodeCtx struct {
|
||||
|
||||
closeCh chan struct{} // notify work to exit
|
||||
closeWg *sync.WaitGroup
|
||||
|
||||
blockMutex sync.RWMutex
|
||||
}
|
||||
|
||||
// Start invoke Node `Start` method and start a worker goroutine
|
||||
@ -68,6 +70,19 @@ func (nodeCtx *nodeCtx) Start() {
|
||||
go nodeCtx.work()
|
||||
}
|
||||
|
||||
func (nodeCtx *nodeCtx) Block() {
|
||||
// input node operate function will be blocking
|
||||
if !nodeCtx.node.IsInputNode() {
|
||||
nodeCtx.blockMutex.Lock()
|
||||
}
|
||||
}
|
||||
|
||||
func (nodeCtx *nodeCtx) Unblock() {
|
||||
if !nodeCtx.node.IsInputNode() {
|
||||
nodeCtx.blockMutex.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
func isCloseMsg(msgs []Msg) bool {
|
||||
if len(msgs) == 1 {
|
||||
msg, ok := msgs[0].(*MsgStreamMsg)
|
||||
@ -102,14 +117,15 @@ func (nodeCtx *nodeCtx) work() {
|
||||
if !nodeCtx.node.IsInputNode() {
|
||||
input = <-nodeCtx.inputChannel
|
||||
}
|
||||
|
||||
// the input message decides whether the operate method is executed
|
||||
if isCloseMsg(input) {
|
||||
output = input
|
||||
}
|
||||
if len(output) == 0 {
|
||||
n := nodeCtx.node
|
||||
nodeCtx.blockMutex.RLock()
|
||||
output = n.Operate(input)
|
||||
nodeCtx.blockMutex.RUnlock()
|
||||
}
|
||||
// the output decide whether the node should be closed.
|
||||
if isCloseMsg(output) {
|
||||
|
Loading…
Reference in New Issue
Block a user