From 14391fe9a2909e1c3b41134abee94fec82f6dae3 Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Fri, 27 Aug 2021 17:03:56 +0800 Subject: [PATCH] Release flowgraphs properly (#7266) Signed-off-by: bigsheeper --- internal/querynode/task.go | 194 ++++++++++++++++++++----------------- 1 file changed, 104 insertions(+), 90 deletions(-) diff --git a/internal/querynode/task.go b/internal/querynode/task.go index 2fc868f2c2..e7588a12fb 100644 --- a/internal/querynode/task.go +++ b/internal/querynode/task.go @@ -437,58 +437,73 @@ func (r *releaseCollectionTask) PreExecute(ctx context.Context) error { func (r *releaseCollectionTask) Execute(ctx context.Context) error { log.Debug("receive release collection task", zap.Any("collectionID", r.req.CollectionID)) + errMsg := "release collection failed, collectionID = " + strconv.FormatInt(r.req.CollectionID, 10) + ", err = " collection, err := r.node.streaming.replica.getCollectionByID(r.req.CollectionID) if err != nil { - log.Warn(err.Error()) + err = errors.New(errMsg + err.Error()) return err } + + // set release time collection.setReleaseTime(r.req.Base.Timestamp) + // sleep to wait for query tasks done const gracefulReleaseTime = 1 - func() { // release synchronously - errMsg := "release collection failed, collectionID = " + strconv.FormatInt(r.req.CollectionID, 10) + ", err = " - time.Sleep(gracefulReleaseTime * time.Second) + time.Sleep(gracefulReleaseTime * time.Second) + log.Debug("starting release collection...", + zap.Any("collectionID", r.req.CollectionID), + ) - log.Debug("starting release collection...", + // remove collection flow graph + r.node.streaming.dataSyncService.removeCollectionFlowGraph(r.req.CollectionID) + + // remove partition flow graphs which partitions belong to the target collection + partitionIDs, err := r.node.streaming.replica.getPartitionIDs(r.req.CollectionID) + if err != nil { + err = errors.New(errMsg + err.Error()) + return err + } + for _, partitionID := range partitionIDs { + r.node.streaming.dataSyncService.removePartitionFlowGraph(partitionID) + } + + // remove all tSafes of the target collection + for _, channel := range collection.getVChannels() { + log.Debug("releasing tSafe in releaseCollectionTask...", zap.Any("collectionID", r.req.CollectionID), + zap.Any("vChannel", channel), ) - r.node.streaming.dataSyncService.removeCollectionFlowGraph(r.req.CollectionID) - // remove all tSafes of the target collection - for _, channel := range collection.getVChannels() { - log.Debug("releasing tSafe in releaseCollectionTask...", - zap.Any("collectionID", r.req.CollectionID), - zap.Any("vChannel", channel), - ) - r.node.streaming.tSafeReplica.removeTSafe(channel) + r.node.streaming.tSafeReplica.removeTSafe(channel) + } + + // remove excludedSegments record + r.node.streaming.replica.removeExcludedSegments(r.req.CollectionID) + + // remove query collection + r.node.queryService.stopQueryCollection(r.req.CollectionID) + + // remove collection metas in streaming and historical + hasCollectionInHistorical := r.node.historical.replica.hasCollection(r.req.CollectionID) + if hasCollectionInHistorical { + err = r.node.historical.replica.removeCollection(r.req.CollectionID) + if err != nil { + err = errors.New(errMsg + err.Error()) + return err } - - r.node.streaming.replica.removeExcludedSegments(r.req.CollectionID) - r.node.queryService.stopQueryCollection(r.req.CollectionID) - - hasCollectionInHistorical := r.node.historical.replica.hasCollection(r.req.CollectionID) - if hasCollectionInHistorical { - err := r.node.historical.replica.removeCollection(r.req.CollectionID) - if err != nil { - log.Warn(errMsg + err.Error()) - return - } + } + hasCollectionInStreaming := r.node.streaming.replica.hasCollection(r.req.CollectionID) + if hasCollectionInStreaming { + err = r.node.streaming.replica.removeCollection(r.req.CollectionID) + if err != nil { + err = errors.New(errMsg + err.Error()) + return err } + } - hasCollectionInStreaming := r.node.streaming.replica.hasCollection(r.req.CollectionID) - if hasCollectionInStreaming { - err := r.node.streaming.replica.removeCollection(r.req.CollectionID) - if err != nil { - log.Warn(errMsg + err.Error()) - return - } - } - - // release global segment info - r.node.historical.removeGlobalSegmentIDsByCollectionID(r.req.CollectionID) - - log.Debug("ReleaseCollection done", zap.Int64("collectionID", r.req.CollectionID)) - }() + // release global segment info + r.node.historical.removeGlobalSegmentIDsByCollectionID(r.req.CollectionID) + log.Debug("ReleaseCollection done", zap.Int64("collectionID", r.req.CollectionID)) return nil } @@ -522,66 +537,65 @@ func (r *releasePartitionsTask) Execute(ctx context.Context) error { log.Debug("receive release partition task", zap.Any("collectionID", r.req.CollectionID), zap.Any("partitionIDs", r.req.PartitionIDs)) + errMsg := "release partitions failed, collectionID = " + strconv.FormatInt(r.req.CollectionID, 10) + ", err = " + // sleep to wait for query tasks done const gracefulReleaseTime = 1 - func() { // release synchronously - errMsg := "release partitions failed, collectionID = " + strconv.FormatInt(r.req.CollectionID, 10) + ", err = " - time.Sleep(gracefulReleaseTime * time.Second) + time.Sleep(gracefulReleaseTime * time.Second) - hCol, err := r.node.historical.replica.getCollectionByID(r.req.CollectionID) - if err != nil { - log.Warn(errMsg + err.Error()) - return + // get collection from streaming and historical + hCol, err := r.node.historical.replica.getCollectionByID(r.req.CollectionID) + if err != nil { + err = errors.New(errMsg + err.Error()) + return err + } + sCol, err := r.node.streaming.replica.getCollectionByID(r.req.CollectionID) + if err != nil { + err = errors.New(errMsg + err.Error()) + return err + } + + // release partitions + vChannels := sCol.getVChannels() + for _, id := range r.req.PartitionIDs { + if _, err = r.node.streaming.dataSyncService.getPartitionFlowGraphs(id, vChannels); err == nil { + r.node.streaming.dataSyncService.removePartitionFlowGraph(id) + // remove all tSafes of the target partition + for _, channel := range vChannels { + log.Debug("releasing tSafe in releasePartitionTask...", + zap.Any("collectionID", r.req.CollectionID), + zap.Any("partitionID", id), + zap.Any("vChannel", channel), + ) + r.node.streaming.tSafeReplica.removeTSafe(channel) + } } - sCol, err := r.node.streaming.replica.getCollectionByID(r.req.CollectionID) - if err != nil { - log.Warn(errMsg + err.Error()) - return + // remove partition from streaming and historical + hasPartitionInHistorical := r.node.historical.replica.hasPartition(id) + if hasPartitionInHistorical { + err = r.node.historical.replica.removePartition(id) + if err != nil { + // not return, try to release all partitions + log.Warn(errMsg + err.Error()) + } + } + hasPartitionInStreaming := r.node.streaming.replica.hasPartition(id) + if hasPartitionInStreaming { + err = r.node.streaming.replica.removePartition(id) + if err != nil { + // not return, try to release all partitions + log.Warn(errMsg + err.Error()) + } } - vChannels := sCol.getVChannels() - for _, id := range r.req.PartitionIDs { - if _, err = r.node.streaming.dataSyncService.getPartitionFlowGraphs(id, vChannels); err == nil { - r.node.streaming.dataSyncService.removePartitionFlowGraph(id) - // remove all tSafes of the target partition - for _, channel := range vChannels { - log.Debug("releasing tSafe in releasePartitionTask...", - zap.Any("collectionID", r.req.CollectionID), - zap.Any("partitionID", id), - zap.Any("vChannel", channel), - ) - r.node.streaming.tSafeReplica.removeTSafe(channel) - } - } + // add released partition record + hCol.addReleasedPartition(id) + sCol.addReleasedPartition(id) + } - hasPartitionInHistorical := r.node.historical.replica.hasPartition(id) - if hasPartitionInHistorical { - err = r.node.historical.replica.removePartition(id) - if err != nil { - // not return, try to release all partitions - log.Warn(errMsg + err.Error()) - } - } - hCol.addReleasedPartition(id) - - hasPartitionInStreaming := r.node.streaming.replica.hasPartition(id) - if hasPartitionInStreaming { - err = r.node.streaming.replica.removePartition(id) - if err != nil { - log.Warn(errMsg + err.Error()) - } - } - sCol.addReleasedPartition(id) - } - - // release global segment info - r.node.historical.removeGlobalSegmentIDsByPartitionIds(r.req.PartitionIDs) - - log.Debug("release partition task done", - zap.Any("collectionID", r.req.CollectionID), - zap.Any("partitionIDs", r.req.PartitionIDs)) - }() + // release global segment info + r.node.historical.removeGlobalSegmentIDsByPartitionIds(r.req.PartitionIDs) log.Debug("release partition task done", zap.Any("collectionID", r.req.CollectionID),