2021-04-19 13:47:10 +08:00
|
|
|
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
|
|
|
|
//
|
|
|
|
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
|
|
|
|
// with the License. You may obtain a copy of the License at
|
|
|
|
//
|
|
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
//
|
|
|
|
// Unless required by applicable law or agreed to in writing, software distributed under the License
|
|
|
|
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
|
|
|
// or implied. See the License for the specific language governing permissions and limitations under the License.
|
|
|
|
|
2021-04-12 09:18:43 +08:00
|
|
|
package querynode
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"errors"
|
|
|
|
"fmt"
|
|
|
|
"math/rand"
|
2021-04-16 14:40:33 +08:00
|
|
|
"strconv"
|
2021-06-09 11:37:55 +08:00
|
|
|
"time"
|
2021-04-12 09:18:43 +08:00
|
|
|
|
|
|
|
"go.uber.org/zap"
|
|
|
|
|
2021-04-22 14:45:57 +08:00
|
|
|
"github.com/milvus-io/milvus/internal/log"
|
2021-06-15 20:06:10 +08:00
|
|
|
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
2021-06-16 11:09:56 +08:00
|
|
|
"github.com/milvus-io/milvus/internal/proto/datapb"
|
2021-04-22 14:45:57 +08:00
|
|
|
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
2021-06-15 20:06:10 +08:00
|
|
|
"github.com/milvus-io/milvus/internal/proto/milvuspb"
|
2021-04-22 14:45:57 +08:00
|
|
|
queryPb "github.com/milvus-io/milvus/internal/proto/querypb"
|
2021-04-12 09:18:43 +08:00
|
|
|
)
|
|
|
|
|
|
|
|
type task interface {
|
|
|
|
ID() UniqueID // return ReqID
|
|
|
|
SetID(uid UniqueID) // set ReqID
|
|
|
|
Timestamp() Timestamp
|
|
|
|
PreExecute(ctx context.Context) error
|
|
|
|
Execute(ctx context.Context) error
|
|
|
|
PostExecute(ctx context.Context) error
|
|
|
|
WaitToFinish() error
|
|
|
|
Notify(err error)
|
|
|
|
OnEnqueue() error
|
|
|
|
}
|
|
|
|
|
|
|
|
type baseTask struct {
|
|
|
|
done chan error
|
|
|
|
ctx context.Context
|
|
|
|
id UniqueID
|
|
|
|
}
|
|
|
|
|
2021-04-16 14:40:33 +08:00
|
|
|
type watchDmChannelsTask struct {
|
|
|
|
baseTask
|
|
|
|
req *queryPb.WatchDmChannelsRequest
|
|
|
|
node *QueryNode
|
|
|
|
}
|
|
|
|
|
2021-04-12 09:18:43 +08:00
|
|
|
type loadSegmentsTask struct {
|
|
|
|
baseTask
|
|
|
|
req *queryPb.LoadSegmentsRequest
|
|
|
|
node *QueryNode
|
|
|
|
}
|
|
|
|
|
|
|
|
type releaseCollectionTask struct {
|
|
|
|
baseTask
|
|
|
|
req *queryPb.ReleaseCollectionRequest
|
|
|
|
node *QueryNode
|
|
|
|
}
|
|
|
|
|
|
|
|
type releasePartitionsTask struct {
|
|
|
|
baseTask
|
|
|
|
req *queryPb.ReleasePartitionsRequest
|
|
|
|
node *QueryNode
|
|
|
|
}
|
|
|
|
|
|
|
|
func (b *baseTask) ID() UniqueID {
|
|
|
|
return b.id
|
|
|
|
}
|
|
|
|
|
|
|
|
func (b *baseTask) SetID(uid UniqueID) {
|
|
|
|
b.id = uid
|
|
|
|
}
|
|
|
|
|
|
|
|
func (b *baseTask) WaitToFinish() error {
|
2021-04-16 14:40:33 +08:00
|
|
|
err := <-b.done
|
|
|
|
return err
|
2021-04-12 09:18:43 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
func (b *baseTask) Notify(err error) {
|
|
|
|
b.done <- err
|
|
|
|
}
|
|
|
|
|
2021-04-16 14:40:33 +08:00
|
|
|
// watchDmChannelsTask
|
|
|
|
func (w *watchDmChannelsTask) Timestamp() Timestamp {
|
|
|
|
if w.req.Base == nil {
|
|
|
|
log.Error("nil base req in watchDmChannelsTask", zap.Any("collectionID", w.req.CollectionID))
|
|
|
|
return 0
|
|
|
|
}
|
|
|
|
return w.req.Base.Timestamp
|
|
|
|
}
|
|
|
|
|
|
|
|
func (w *watchDmChannelsTask) OnEnqueue() error {
|
|
|
|
if w.req == nil || w.req.Base == nil {
|
|
|
|
w.SetID(rand.Int63n(100000000000))
|
|
|
|
} else {
|
|
|
|
w.SetID(w.req.Base.MsgID)
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (w *watchDmChannelsTask) PreExecute(ctx context.Context) error {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
|
|
|
|
collectionID := w.req.CollectionID
|
2021-06-15 12:41:40 +08:00
|
|
|
partitionID := w.req.PartitionID
|
|
|
|
loadPartition := partitionID != 0
|
2021-06-09 11:37:55 +08:00
|
|
|
|
2021-06-15 20:06:10 +08:00
|
|
|
// get all vChannels
|
|
|
|
vChannels := make([]Channel, 0)
|
|
|
|
pChannels := make([]Channel, 0)
|
2021-06-15 12:41:40 +08:00
|
|
|
for _, info := range w.req.Infos {
|
|
|
|
vChannels = append(vChannels, info.ChannelName)
|
2021-04-16 14:40:33 +08:00
|
|
|
}
|
2021-06-15 12:41:40 +08:00
|
|
|
log.Debug("starting WatchDmChannels ...",
|
|
|
|
zap.Any("collectionName", w.req.Schema.Name),
|
|
|
|
zap.Any("collectionID", collectionID),
|
2021-06-16 12:05:56 +08:00
|
|
|
zap.String("vChannels", fmt.Sprintln(vChannels)))
|
2021-06-15 12:41:40 +08:00
|
|
|
|
2021-06-15 20:06:10 +08:00
|
|
|
// get physical channels
|
|
|
|
desColReq := &milvuspb.DescribeCollectionRequest{
|
|
|
|
Base: &commonpb.MsgBase{
|
|
|
|
MsgType: commonpb.MsgType_DescribeCollection,
|
|
|
|
},
|
|
|
|
CollectionID: collectionID,
|
|
|
|
}
|
|
|
|
desColRsp, err := w.node.masterService.DescribeCollection(ctx, desColReq)
|
|
|
|
if err != nil {
|
2021-06-16 12:05:56 +08:00
|
|
|
log.Error("get channels failed, err = " + err.Error())
|
2021-06-15 20:06:10 +08:00
|
|
|
return err
|
|
|
|
}
|
2021-06-16 12:05:56 +08:00
|
|
|
log.Debug("get channels from master",
|
2021-06-15 20:06:10 +08:00
|
|
|
zap.Any("collectionID", collectionID),
|
|
|
|
zap.Any("vChannels", desColRsp.VirtualChannelNames),
|
|
|
|
zap.Any("pChannels", desColRsp.PhysicalChannelNames),
|
|
|
|
)
|
|
|
|
VPChannels := make(map[string]string) // map[vChannel]pChannel
|
|
|
|
for _, ch := range vChannels {
|
|
|
|
for i := range desColRsp.VirtualChannelNames {
|
|
|
|
if desColRsp.VirtualChannelNames[i] == ch {
|
|
|
|
VPChannels[ch] = desColRsp.PhysicalChannelNames[i]
|
|
|
|
pChannels = append(pChannels, desColRsp.PhysicalChannelNames[i])
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if len(VPChannels) != len(vChannels) {
|
|
|
|
return errors.New("get physical channels failed, illegal channel length, collectionID = " + fmt.Sprintln(collectionID))
|
|
|
|
}
|
|
|
|
log.Debug("get physical channels done", zap.Any("collectionID", collectionID))
|
2021-06-15 12:41:40 +08:00
|
|
|
|
|
|
|
// init replica
|
|
|
|
if hasCollectionInStreaming := w.node.streaming.replica.hasCollection(collectionID); !hasCollectionInStreaming {
|
|
|
|
err := w.node.streaming.replica.addCollection(collectionID, w.req.Schema)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
w.node.streaming.replica.initExcludedSegments(collectionID)
|
|
|
|
}
|
2021-06-16 12:05:56 +08:00
|
|
|
collection, err := w.node.streaming.replica.getCollectionByID(collectionID)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
collection.addVChannels(vChannels)
|
|
|
|
collection.addPChannels(pChannels)
|
2021-06-15 12:41:40 +08:00
|
|
|
if hasCollectionInHistorical := w.node.historical.replica.hasCollection(collectionID); !hasCollectionInHistorical {
|
|
|
|
err := w.node.historical.replica.addCollection(collectionID, w.req.Schema)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if loadPartition {
|
|
|
|
if hasPartitionInStreaming := w.node.streaming.replica.hasPartition(partitionID); !hasPartitionInStreaming {
|
|
|
|
err := w.node.streaming.replica.addPartition(collectionID, partitionID)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if hasPartitionInHistorical := w.node.historical.replica.hasPartition(partitionID); !hasPartitionInHistorical {
|
|
|
|
err := w.node.historical.replica.addPartition(collectionID, partitionID)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
2021-04-16 14:40:33 +08:00
|
|
|
}
|
2021-06-15 12:41:40 +08:00
|
|
|
log.Debug("watchDMChannel, init replica done", zap.Any("collectionID", collectionID))
|
2021-04-16 14:40:33 +08:00
|
|
|
|
2021-06-15 12:41:40 +08:00
|
|
|
// get subscription name
|
2021-04-16 14:40:33 +08:00
|
|
|
getUniqueSubName := func() string {
|
|
|
|
prefixName := Params.MsgChannelSubName
|
2021-06-15 12:41:40 +08:00
|
|
|
return prefixName + "-" + strconv.FormatInt(collectionID, 10) + "-" + strconv.Itoa(rand.Int())
|
2021-04-16 14:40:33 +08:00
|
|
|
}
|
2021-06-09 11:37:55 +08:00
|
|
|
consumeSubName := getUniqueSubName()
|
2021-04-16 14:40:33 +08:00
|
|
|
|
2021-06-15 12:41:40 +08:00
|
|
|
// group channels by to seeking or consuming
|
|
|
|
toSeekChannels := make([]*internalpb.MsgPosition, 0)
|
2021-06-15 20:06:10 +08:00
|
|
|
toSubChannels := make([]Channel, 0)
|
2021-04-16 14:40:33 +08:00
|
|
|
for _, info := range w.req.Infos {
|
2021-06-15 12:41:40 +08:00
|
|
|
if info.SeekPosition == nil || len(info.SeekPosition.MsgID) == 0 {
|
|
|
|
toSubChannels = append(toSubChannels, info.ChannelName)
|
2021-04-16 14:40:33 +08:00
|
|
|
continue
|
|
|
|
}
|
2021-06-15 12:41:40 +08:00
|
|
|
info.SeekPosition.MsgGroup = consumeSubName
|
|
|
|
toSeekChannels = append(toSeekChannels, info.SeekPosition)
|
2021-04-16 14:40:33 +08:00
|
|
|
}
|
2021-06-15 12:41:40 +08:00
|
|
|
log.Debug("watchDMChannel, group channels done", zap.Any("collectionID", collectionID))
|
2021-04-16 14:40:33 +08:00
|
|
|
|
2021-06-15 12:41:40 +08:00
|
|
|
// add check points info
|
2021-06-16 11:09:56 +08:00
|
|
|
checkPointInfos := make([]*datapb.SegmentInfo, 0)
|
2021-06-15 12:41:40 +08:00
|
|
|
for _, info := range w.req.Infos {
|
2021-06-16 11:09:56 +08:00
|
|
|
checkPointInfos = append(checkPointInfos, info.UnflushedSegments...)
|
2021-06-15 12:41:40 +08:00
|
|
|
}
|
2021-06-15 20:06:10 +08:00
|
|
|
err = w.node.streaming.replica.addExcludedSegments(collectionID, checkPointInfos)
|
2021-06-09 11:37:55 +08:00
|
|
|
if err != nil {
|
2021-06-15 12:41:40 +08:00
|
|
|
log.Error(err.Error())
|
2021-06-09 11:37:55 +08:00
|
|
|
return err
|
|
|
|
}
|
2021-06-15 12:41:40 +08:00
|
|
|
log.Debug("watchDMChannel, add check points info done", zap.Any("collectionID", collectionID))
|
2021-06-09 11:37:55 +08:00
|
|
|
|
2021-06-15 12:41:40 +08:00
|
|
|
// create tSafe
|
2021-06-16 12:05:56 +08:00
|
|
|
for _, channel := range vChannels {
|
2021-06-15 12:41:40 +08:00
|
|
|
w.node.streaming.tSafeReplica.addTSafe(channel)
|
|
|
|
}
|
|
|
|
|
|
|
|
// add flow graph
|
|
|
|
if loadPartition {
|
|
|
|
err = w.node.streaming.dataSyncService.addPartitionFlowGraph(collectionID, partitionID, vChannels)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
log.Debug("query node add partition flow graphs", zap.Any("channels", vChannels))
|
|
|
|
} else {
|
|
|
|
err = w.node.streaming.dataSyncService.addCollectionFlowGraph(collectionID, vChannels)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
log.Debug("query node add collection flow graphs", zap.Any("channels", vChannels))
|
|
|
|
}
|
|
|
|
|
|
|
|
// channels as consumer
|
2021-06-15 20:06:10 +08:00
|
|
|
var nodeFGs map[Channel]*queryNodeFlowGraph
|
2021-06-15 12:41:40 +08:00
|
|
|
if loadPartition {
|
|
|
|
nodeFGs, err = w.node.streaming.dataSyncService.getPartitionFlowGraphs(partitionID, vChannels)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
nodeFGs, err = w.node.streaming.dataSyncService.getCollectionFlowGraphs(collectionID, vChannels)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2021-06-09 11:37:55 +08:00
|
|
|
}
|
2021-06-15 12:41:40 +08:00
|
|
|
for _, channel := range toSubChannels {
|
2021-06-09 11:37:55 +08:00
|
|
|
for _, fg := range nodeFGs {
|
|
|
|
if fg.channel == channel {
|
2021-06-15 20:06:10 +08:00
|
|
|
// use pChannel to consume
|
|
|
|
err := fg.consumerFlowGraph(VPChannels[channel], consumeSubName)
|
2021-06-09 11:37:55 +08:00
|
|
|
if err != nil {
|
|
|
|
errMsg := "msgStream consume error :" + err.Error()
|
|
|
|
log.Error(errMsg)
|
|
|
|
return errors.New(errMsg)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2021-06-15 12:41:40 +08:00
|
|
|
log.Debug("as consumer channels",
|
|
|
|
zap.Any("collectionID", collectionID),
|
|
|
|
zap.Any("toSubChannels", toSubChannels))
|
2021-06-09 11:37:55 +08:00
|
|
|
|
2021-06-15 12:41:40 +08:00
|
|
|
// seek channel
|
|
|
|
for _, pos := range toSeekChannels {
|
2021-06-09 11:37:55 +08:00
|
|
|
for _, fg := range nodeFGs {
|
|
|
|
if fg.channel == pos.ChannelName {
|
2021-06-15 12:41:40 +08:00
|
|
|
pos.MsgGroup = consumeSubName
|
2021-06-15 20:06:10 +08:00
|
|
|
// use pChannel to seek
|
|
|
|
pos.ChannelName = VPChannels[fg.channel]
|
2021-06-09 11:37:55 +08:00
|
|
|
err := fg.seekQueryNodeFlowGraph(pos)
|
|
|
|
if err != nil {
|
|
|
|
errMsg := "msgStream seek error :" + err.Error()
|
|
|
|
log.Error(errMsg)
|
|
|
|
return errors.New(errMsg)
|
|
|
|
}
|
|
|
|
}
|
2021-04-16 14:40:33 +08:00
|
|
|
}
|
|
|
|
}
|
2021-06-15 12:41:40 +08:00
|
|
|
log.Debug("seek all channel done",
|
|
|
|
zap.Any("collectionID", collectionID),
|
|
|
|
zap.Any("toSeekChannels", toSeekChannels))
|
2021-05-28 15:40:32 +08:00
|
|
|
|
2021-06-15 12:41:40 +08:00
|
|
|
// start flow graphs
|
|
|
|
if loadPartition {
|
|
|
|
err = w.node.streaming.dataSyncService.startPartitionFlowGraph(partitionID, vChannels)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
err = w.node.streaming.dataSyncService.startCollectionFlowGraph(collectionID, vChannels)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2021-05-28 15:40:32 +08:00
|
|
|
}
|
2021-06-09 11:37:55 +08:00
|
|
|
|
2021-06-15 12:41:40 +08:00
|
|
|
log.Debug("WatchDmChannels done", zap.String("ChannelIDs", fmt.Sprintln(vChannels)))
|
2021-04-16 14:40:33 +08:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (w *watchDmChannelsTask) PostExecute(ctx context.Context) error {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-04-12 09:18:43 +08:00
|
|
|
// loadSegmentsTask
|
|
|
|
func (l *loadSegmentsTask) Timestamp() Timestamp {
|
2021-04-16 14:40:33 +08:00
|
|
|
if l.req.Base == nil {
|
2021-06-15 12:41:40 +08:00
|
|
|
log.Error("nil base req in loadSegmentsTask")
|
2021-04-16 14:40:33 +08:00
|
|
|
return 0
|
|
|
|
}
|
2021-04-12 09:18:43 +08:00
|
|
|
return l.req.Base.Timestamp
|
|
|
|
}
|
|
|
|
|
|
|
|
func (l *loadSegmentsTask) OnEnqueue() error {
|
|
|
|
if l.req == nil || l.req.Base == nil {
|
|
|
|
l.SetID(rand.Int63n(100000000000))
|
|
|
|
} else {
|
|
|
|
l.SetID(l.req.Base.MsgID)
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (l *loadSegmentsTask) PreExecute(ctx context.Context) error {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (l *loadSegmentsTask) Execute(ctx context.Context) error {
|
|
|
|
// TODO: support db
|
|
|
|
log.Debug("query node load segment", zap.String("loadSegmentRequest", fmt.Sprintln(l.req)))
|
2021-06-15 12:41:40 +08:00
|
|
|
var err error
|
|
|
|
|
|
|
|
switch l.req.LoadCondition {
|
|
|
|
case queryPb.TriggerCondition_handoff:
|
|
|
|
err = l.node.historical.loader.loadSegmentOfConditionHandOff(l.req)
|
|
|
|
case queryPb.TriggerCondition_loadBalance:
|
|
|
|
err = l.node.historical.loader.loadSegmentOfConditionLoadBalance(l.req)
|
|
|
|
case queryPb.TriggerCondition_grpcRequest:
|
|
|
|
err = l.node.historical.loader.loadSegmentOfConditionGRPC(l.req)
|
|
|
|
case queryPb.TriggerCondition_nodeDown:
|
|
|
|
err = l.node.historical.loader.loadSegmentOfConditionNodeDown(l.req)
|
2021-04-12 09:18:43 +08:00
|
|
|
}
|
2021-05-28 10:26:30 +08:00
|
|
|
|
2021-04-12 09:18:43 +08:00
|
|
|
if err != nil {
|
2021-06-15 12:41:40 +08:00
|
|
|
log.Error(err.Error())
|
2021-04-12 09:18:43 +08:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2021-06-15 12:41:40 +08:00
|
|
|
log.Debug("LoadSegments done", zap.String("SegmentLoadInfos", fmt.Sprintln(l.req.Infos)))
|
2021-04-12 09:18:43 +08:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (l *loadSegmentsTask) PostExecute(ctx context.Context) error {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// releaseCollectionTask
|
|
|
|
func (r *releaseCollectionTask) Timestamp() Timestamp {
|
2021-04-16 14:40:33 +08:00
|
|
|
if r.req.Base == nil {
|
|
|
|
log.Error("nil base req in releaseCollectionTask", zap.Any("collectionID", r.req.CollectionID))
|
|
|
|
return 0
|
|
|
|
}
|
2021-04-12 09:18:43 +08:00
|
|
|
return r.req.Base.Timestamp
|
|
|
|
}
|
|
|
|
|
|
|
|
func (r *releaseCollectionTask) OnEnqueue() error {
|
|
|
|
if r.req == nil || r.req.Base == nil {
|
|
|
|
r.SetID(rand.Int63n(100000000000))
|
|
|
|
} else {
|
|
|
|
r.SetID(r.req.Base.MsgID)
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (r *releaseCollectionTask) PreExecute(ctx context.Context) error {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (r *releaseCollectionTask) Execute(ctx context.Context) error {
|
2021-06-09 11:37:55 +08:00
|
|
|
log.Debug("receive release collection task", zap.Any("collectionID", r.req.CollectionID))
|
|
|
|
collection, err := r.node.historical.replica.getCollectionByID(r.req.CollectionID)
|
|
|
|
if err != nil {
|
|
|
|
log.Error(err.Error())
|
2021-06-15 12:41:40 +08:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
collection.setReleaseTime(r.req.Base.Timestamp)
|
|
|
|
|
|
|
|
const gracefulReleaseTime = 3
|
2021-06-16 15:31:57 +08:00
|
|
|
func() { // release synchronously
|
2021-06-15 12:41:40 +08:00
|
|
|
errMsg := "release collection failed, collectionID = " + strconv.FormatInt(r.req.CollectionID, 10) + ", err = "
|
|
|
|
time.Sleep(gracefulReleaseTime * time.Second)
|
|
|
|
|
|
|
|
r.node.streaming.dataSyncService.removeCollectionFlowGraph(r.req.CollectionID)
|
2021-06-09 11:37:55 +08:00
|
|
|
// remove all tSafes of the target collection
|
2021-06-15 20:06:10 +08:00
|
|
|
for _, channel := range collection.getVChannels() {
|
2021-06-09 11:37:55 +08:00
|
|
|
r.node.streaming.tSafeReplica.removeTSafe(channel)
|
|
|
|
}
|
|
|
|
|
2021-06-15 12:41:40 +08:00
|
|
|
r.node.streaming.replica.removeExcludedSegments(r.req.CollectionID)
|
2021-04-12 09:18:43 +08:00
|
|
|
r.node.searchService.stopSearchCollection(r.req.CollectionID)
|
|
|
|
|
2021-06-15 12:41:40 +08:00
|
|
|
hasCollectionInHistorical := r.node.historical.replica.hasCollection(r.req.CollectionID)
|
|
|
|
if hasCollectionInHistorical {
|
|
|
|
err := r.node.historical.replica.removeCollection(r.req.CollectionID)
|
|
|
|
if err != nil {
|
|
|
|
log.Error(errMsg + err.Error())
|
|
|
|
return
|
|
|
|
}
|
2021-05-28 10:26:30 +08:00
|
|
|
}
|
|
|
|
|
2021-06-15 12:41:40 +08:00
|
|
|
hasCollectionInStreaming := r.node.streaming.replica.hasCollection(r.req.CollectionID)
|
|
|
|
if hasCollectionInStreaming {
|
|
|
|
err := r.node.streaming.replica.removeCollection(r.req.CollectionID)
|
|
|
|
if err != nil {
|
|
|
|
log.Error(errMsg + err.Error())
|
|
|
|
return
|
|
|
|
}
|
2021-05-28 10:26:30 +08:00
|
|
|
}
|
2021-04-12 09:18:43 +08:00
|
|
|
|
2021-06-15 12:41:40 +08:00
|
|
|
log.Debug("ReleaseCollection done", zap.Int64("collectionID", r.req.CollectionID))
|
|
|
|
}()
|
2021-06-09 11:37:55 +08:00
|
|
|
|
2021-04-12 09:18:43 +08:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (r *releaseCollectionTask) PostExecute(ctx context.Context) error {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// releasePartitionsTask
|
|
|
|
func (r *releasePartitionsTask) Timestamp() Timestamp {
|
2021-04-16 14:40:33 +08:00
|
|
|
if r.req.Base == nil {
|
|
|
|
log.Error("nil base req in releasePartitionsTask", zap.Any("collectionID", r.req.CollectionID))
|
|
|
|
return 0
|
|
|
|
}
|
2021-04-12 09:18:43 +08:00
|
|
|
return r.req.Base.Timestamp
|
|
|
|
}
|
|
|
|
|
|
|
|
func (r *releasePartitionsTask) OnEnqueue() error {
|
|
|
|
if r.req == nil || r.req.Base == nil {
|
|
|
|
r.SetID(rand.Int63n(100000000000))
|
|
|
|
} else {
|
|
|
|
r.SetID(r.req.Base.MsgID)
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (r *releasePartitionsTask) PreExecute(ctx context.Context) error {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (r *releasePartitionsTask) Execute(ctx context.Context) error {
|
2021-06-15 12:41:40 +08:00
|
|
|
log.Debug("receive release partition task",
|
|
|
|
zap.Any("collectionID", r.req.CollectionID),
|
|
|
|
zap.Any("partitionIDs", r.req.PartitionIDs))
|
|
|
|
|
2021-04-12 09:18:43 +08:00
|
|
|
for _, id := range r.req.PartitionIDs {
|
2021-06-15 12:41:40 +08:00
|
|
|
r.node.streaming.dataSyncService.removePartitionFlowGraph(id)
|
2021-05-28 10:26:30 +08:00
|
|
|
hasPartitionInHistorical := r.node.historical.replica.hasPartition(id)
|
|
|
|
if hasPartitionInHistorical {
|
|
|
|
err := r.node.historical.replica.removePartition(id)
|
|
|
|
if err != nil {
|
|
|
|
// not return, try to release all partitions
|
|
|
|
log.Error(err.Error())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
hasPartitionInStreaming := r.node.streaming.replica.hasPartition(id)
|
|
|
|
if hasPartitionInStreaming {
|
|
|
|
err := r.node.streaming.replica.removePartition(id)
|
|
|
|
if err != nil {
|
|
|
|
log.Error(err.Error())
|
|
|
|
}
|
2021-04-12 09:18:43 +08:00
|
|
|
}
|
|
|
|
}
|
2021-06-15 12:41:40 +08:00
|
|
|
|
|
|
|
log.Debug("release partition task done",
|
|
|
|
zap.Any("collectionID", r.req.CollectionID),
|
|
|
|
zap.Any("partitionIDs", r.req.PartitionIDs))
|
2021-04-12 09:18:43 +08:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (r *releasePartitionsTask) PostExecute(ctx context.Context) error {
|
|
|
|
return nil
|
|
|
|
}
|