milvus/internal/querynode/load_segment_task.go
xige-16 ec617d4952
Avoid delete segment which has been loaded (#20730)
Signed-off-by: xige-16 <xi.ge@zilliz.com>

Signed-off-by: xige-16 <xi.ge@zilliz.com>
2022-11-21 16:33:12 +08:00

236 lines
8.3 KiB
Go

// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package querynode
import (
"context"
"fmt"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"github.com/milvus-io/milvus/internal/log"
queryPb "github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/samber/lo"
)
// loadSegmentsTask is the query node task that loads the sealed segments
// described by a LoadSegmentsRequest.
type loadSegmentsTask struct {
	baseTask
	// req is the load request; PreExecute replaces req.Infos with only the
	// segments that are not already loaded on this node.
	req *queryPb.LoadSegmentsRequest
	// node is the query node whose meta replica, segment loader and
	// data-sync / query-shard services the task operates on.
	node *QueryNode
}
// PreExecute prepares metadata for the load and filters the request.
//
// It registers the request's collection (with its schema) and every partition
// listed in the load meta in the meta replica. It then rewrites l.req.Infos to
// drop every segment the meta replica already reports as a sealed segment, so
// Execute only loads segments that are not yet present on this query node.
//
// All log lines use the ctx-scoped logger so they carry the request's trace
// context (previously only the first line did).
func (l *loadSegmentsTask) PreExecute(ctx context.Context) error {
	logger := log.Ctx(ctx)
	logger.Info("LoadSegmentTask PreExecute start")

	// init meta
	collectionID := l.req.GetCollectionID()
	l.node.metaReplica.addCollection(collectionID, l.req.GetSchema())
	for _, partitionID := range l.req.GetLoadMeta().GetPartitionIDs() {
		if err := l.node.metaReplica.addPartition(collectionID, partitionID); err != nil {
			return err
		}
	}

	// filter segments that are already loaded in this querynode
	var filteredInfos []*queryPb.SegmentLoadInfo
	for _, info := range l.req.Infos {
		has, err := l.node.metaReplica.hasSegment(info.SegmentID, segmentTypeSealed)
		if err != nil {
			return err
		}
		if has {
			logger.Info("ignore segment that is already loaded", zap.Int64("collectionID", info.CollectionID), zap.Int64("segmentID", info.SegmentID))
			continue
		}
		filteredInfos = append(filteredInfos, info)
	}
	l.req.Infos = filteredInfos

	logger.Info("LoadSegmentTask PreExecute done")
	return nil
}
// Execute loads the sealed segments left in l.req.Infos after PreExecute.
//
// The segments are put on the meta replica's loading list for the duration of
// the call. If the loader reports any successfully loaded segments, Execute
// then:
//   - subscribes to the delta channels for the request's delta positions, and
//   - replays delete records from each position's DML checkpoint into those
//     loaded segments whose insert channel matches, one goroutine per position.
//
// If either step fails, only the segments this call actually loaded
// (loadDoneSegmentIDs) are removed. Segments in the request that the loader did
// not load are left untouched, so a copy registered by someone else is never
// deleted by this rollback.
//
// A load error from the loader is returned after the successfully loaded
// segments have been wired up, so partial progress is kept.
func (l *loadSegmentsTask) Execute(ctx context.Context) error {
	logger := log.Ctx(ctx)
	logger.Info("LoadSegmentTask Execute start")
	if len(l.req.Infos) == 0 {
		logger.Info("all segments loaded")
		return nil
	}

	segmentIDs := lo.Map(l.req.Infos, func(info *queryPb.SegmentLoadInfo, idx int) UniqueID { return info.SegmentID })
	l.node.metaReplica.addSegmentsLoadingList(segmentIDs)
	defer l.node.metaReplica.removeSegmentsLoadingList(segmentIDs)

	loadDoneSegmentIDs, loadErr := l.node.loader.LoadSegment(l.ctx, l.req, segmentTypeSealed)

	// rollbackLoaded removes exactly the segments loaded by this call; anything
	// else in l.req.Infos was not added here and must not be removed here.
	rollbackLoaded := func() {
		for _, segmentID := range loadDoneSegmentIDs {
			l.node.metaReplica.removeSegment(segmentID, segmentTypeSealed)
		}
	}

	if len(loadDoneSegmentIDs) > 0 {
		vchanName := make([]string, 0, len(l.req.DeltaPositions))
		for _, deltaPosition := range l.req.DeltaPositions {
			vchanName = append(vchanName, deltaPosition.ChannelName)
		}

		// TODO delta channel need to released 1. if other watchDeltaChannel fail 2. when segment release
		if err := l.watchDeltaChannel(vchanName); err != nil {
			rollbackLoaded()
			logger.Warn("failed to watch Delta channel while load segment", zap.Int64("collectionID", l.req.CollectionID),
				zap.Int64("replicaID", l.req.ReplicaID), zap.Error(err))
			return err
		}

		runningGroup, groupCtx := errgroup.WithContext(l.ctx)
		for _, deltaPosition := range l.req.DeltaPositions {
			pos := deltaPosition
			runningGroup.Go(func() error {
				// reload data from dml channel, only for segments loaded by this
				// call whose insert channel is this position's channel
				return l.node.loader.FromDmlCPLoadDelete(groupCtx, l.req.CollectionID, pos,
					lo.FilterMap(l.req.Infos, func(info *queryPb.SegmentLoadInfo, _ int) (int64, bool) {
						return info.GetSegmentID(), funcutil.SliceContain(loadDoneSegmentIDs, info.SegmentID) && info.GetInsertChannel() == pos.GetChannelName()
					}))
			})
		}
		if err := runningGroup.Wait(); err != nil {
			rollbackLoaded()
			for _, vchannel := range vchanName {
				l.node.dataSyncService.removeEmptyFlowGraphByChannel(l.req.CollectionID, vchannel)
			}
			logger.Warn("failed to load delete data while load segment", zap.Int64("collectionID", l.req.CollectionID),
				zap.Int64("replicaID", l.req.ReplicaID), zap.Error(err))
			return err
		}
	}

	if loadErr != nil {
		logger.Warn("failed to load segment", zap.Int64("collectionID", l.req.CollectionID),
			zap.Int64("replicaID", l.req.ReplicaID), zap.Error(loadErr))
		return loadErr
	}

	logger.Info("LoadSegmentTask Execute done", zap.Int64("collectionID", l.req.CollectionID),
		zap.Int64("replicaID", l.req.ReplicaID))
	return nil
}
// watchDeltaChannel subscribes this query node to the delta channels that
// correspond to the given DML virtual channel names.
//
// Steps:
//  1. Convert each DML channel name to its delta channel name and record the
//     delta channel's physical channel.
//  2. Register the delta channels on the collection; channels it already
//     holds are filtered out.
//  3. For each remaining channel, create a flow graph and make it consume the
//     physical channel from the latest position.
//  4. Add a tSafe per channel, add a query shard for the matching DML channel,
//     then start the flow graphs.
//
// If every delta channel was already registered, it returns nil without
// creating anything.
//
// On an error return, the delta channels newly registered on the collection by
// this call are removed again. If consuming failed, the created flow graphs are
// also closed and removed.
//
// It panics if converting a delta channel back to a DML channel name, or adding
// the query shard, fails.
//
// err is a named result so the deferred rollback observes exactly the error
// being returned, independent of any local shadowing.
func (l *loadSegmentsTask) watchDeltaChannel(vchanName []string) (err error) {
	collectionID := l.req.CollectionID
	var vDeltaChannels []string
	VPDeltaChannels := make(map[string]string)
	for _, v := range vchanName {
		dc, err := funcutil.ConvertChannelName(v, Params.CommonCfg.RootCoordDml, Params.CommonCfg.RootCoordDelta)
		if err != nil {
			log.Warn("watchDeltaChannels, failed to convert deltaChannel from dmlChannel", zap.String("DmlChannel", v), zap.Error(err))
			return err
		}
		p := funcutil.ToPhysicalChannel(dc)
		vDeltaChannels = append(vDeltaChannels, dc)
		VPDeltaChannels[dc] = p
	}
	log.Info("Starting WatchDeltaChannels ...",
		zap.Int64("collectionID", collectionID),
		zap.Any("channels", VPDeltaChannels),
	)

	coll, err := l.node.metaReplica.getCollectionByID(collectionID)
	if err != nil {
		return err
	}

	// filter out duplicated channels
	vDeltaChannels = coll.AddVDeltaChannels(vDeltaChannels, VPDeltaChannels)
	defer func() {
		// undo the registration of the channels added above if we fail
		if err != nil {
			for _, vDeltaChannel := range vDeltaChannels {
				coll.removeVDeltaChannel(vDeltaChannel)
			}
		}
	}()

	if len(vDeltaChannels) == 0 {
		log.Warn("all delta channels has be added before, ignore watch delta requests")
		return nil
	}

	channel2FlowGraph, err := l.node.dataSyncService.addFlowGraphsForDeltaChannels(collectionID, vDeltaChannels)
	if err != nil {
		log.Warn("watchDeltaChannel, add flowGraph for deltaChannel failed", zap.Int64("collectionID", collectionID), zap.Strings("vDeltaChannels", vDeltaChannels), zap.Error(err))
		return err
	}
	consumeSubName := funcutil.GenChannelSubName(Params.CommonCfg.QueryNodeSubName, collectionID, paramtable.GetNodeID())

	// channels as consumer
	for channel, fg := range channel2FlowGraph {
		pchannel := VPDeltaChannels[channel]
		// use pChannel to consume
		err = fg.consumeFlowGraphFromLatest(pchannel, consumeSubName)
		if err != nil {
			log.Error("msgStream as consumer failed for deltaChannels", zap.Int64("collectionID", collectionID), zap.Strings("vDeltaChannels", vDeltaChannels), zap.Error(err))
			break
		}
	}

	if err != nil {
		log.Warn("watchDeltaChannel, add flowGraph for deltaChannel failed", zap.Int64("collectionID", collectionID), zap.Strings("vDeltaChannels", vDeltaChannels), zap.Error(err))
		for _, fg := range channel2FlowGraph {
			fg.flowGraph.Close()
		}
		gcChannels := make([]Channel, 0)
		for channel := range channel2FlowGraph {
			gcChannels = append(gcChannels, channel)
		}
		l.node.dataSyncService.removeFlowGraphsByDeltaChannels(gcChannels)
		return err
	}

	log.Info("watchDeltaChannel, add flowGraph for deltaChannel success", zap.Int64("collectionID", collectionID), zap.Strings("vDeltaChannels", vDeltaChannels))

	// create tSafe
	for _, channel := range vDeltaChannels {
		l.node.tSafeReplica.addTSafe(channel)
	}

	// add tsafe watch in query shard if exists, we find no way to handle it if query shard not exist
	for _, channel := range vDeltaChannels {
		dmlChannel, err := funcutil.ConvertChannelName(channel, Params.CommonCfg.RootCoordDelta, Params.CommonCfg.RootCoordDml)
		if err != nil {
			log.Error("failed to convert delta channel to dml", zap.String("channel", channel), zap.Error(err))
			panic(err)
		}
		err = l.node.queryShardService.addQueryShard(collectionID, dmlChannel, l.req.GetReplicaID())
		if err != nil {
			log.Error("failed to add shard Service to query shard", zap.String("channel", channel), zap.Error(err))
			panic(err)
		}
	}

	// start flow graphs
	for _, fg := range channel2FlowGraph {
		fg.flowGraph.Start()
	}

	log.Info("WatchDeltaChannels done", zap.Int64("collectionID", collectionID), zap.String("ChannelIDs", fmt.Sprintln(vDeltaChannels)))
	return nil
}