Release flowgraphs properly (#7266)

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
bigsheeper 2021-08-27 17:03:56 +08:00 committed by GitHub
parent 1e60cccaf9
commit 14391fe9a2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -437,58 +437,73 @@ func (r *releaseCollectionTask) PreExecute(ctx context.Context) error {
func (r *releaseCollectionTask) Execute(ctx context.Context) error {
log.Debug("receive release collection task", zap.Any("collectionID", r.req.CollectionID))
errMsg := "release collection failed, collectionID = " + strconv.FormatInt(r.req.CollectionID, 10) + ", err = "
collection, err := r.node.streaming.replica.getCollectionByID(r.req.CollectionID)
if err != nil {
log.Warn(err.Error())
err = errors.New(errMsg + err.Error())
return err
}
// set release time
collection.setReleaseTime(r.req.Base.Timestamp)
// sleep to wait for query tasks done
const gracefulReleaseTime = 1
func() { // release synchronously
errMsg := "release collection failed, collectionID = " + strconv.FormatInt(r.req.CollectionID, 10) + ", err = "
time.Sleep(gracefulReleaseTime * time.Second)
time.Sleep(gracefulReleaseTime * time.Second)
log.Debug("starting release collection...",
zap.Any("collectionID", r.req.CollectionID),
)
log.Debug("starting release collection...",
// remove collection flow graph
r.node.streaming.dataSyncService.removeCollectionFlowGraph(r.req.CollectionID)
// remove partition flow graphs which partitions belong to the target collection
partitionIDs, err := r.node.streaming.replica.getPartitionIDs(r.req.CollectionID)
if err != nil {
err = errors.New(errMsg + err.Error())
return err
}
for _, partitionID := range partitionIDs {
r.node.streaming.dataSyncService.removePartitionFlowGraph(partitionID)
}
// remove all tSafes of the target collection
for _, channel := range collection.getVChannels() {
log.Debug("releasing tSafe in releaseCollectionTask...",
zap.Any("collectionID", r.req.CollectionID),
zap.Any("vChannel", channel),
)
r.node.streaming.dataSyncService.removeCollectionFlowGraph(r.req.CollectionID)
// remove all tSafes of the target collection
for _, channel := range collection.getVChannels() {
log.Debug("releasing tSafe in releaseCollectionTask...",
zap.Any("collectionID", r.req.CollectionID),
zap.Any("vChannel", channel),
)
r.node.streaming.tSafeReplica.removeTSafe(channel)
r.node.streaming.tSafeReplica.removeTSafe(channel)
}
// remove excludedSegments record
r.node.streaming.replica.removeExcludedSegments(r.req.CollectionID)
// remove query collection
r.node.queryService.stopQueryCollection(r.req.CollectionID)
// remove collection metas in streaming and historical
hasCollectionInHistorical := r.node.historical.replica.hasCollection(r.req.CollectionID)
if hasCollectionInHistorical {
err = r.node.historical.replica.removeCollection(r.req.CollectionID)
if err != nil {
err = errors.New(errMsg + err.Error())
return err
}
r.node.streaming.replica.removeExcludedSegments(r.req.CollectionID)
r.node.queryService.stopQueryCollection(r.req.CollectionID)
hasCollectionInHistorical := r.node.historical.replica.hasCollection(r.req.CollectionID)
if hasCollectionInHistorical {
err := r.node.historical.replica.removeCollection(r.req.CollectionID)
if err != nil {
log.Warn(errMsg + err.Error())
return
}
}
hasCollectionInStreaming := r.node.streaming.replica.hasCollection(r.req.CollectionID)
if hasCollectionInStreaming {
err = r.node.streaming.replica.removeCollection(r.req.CollectionID)
if err != nil {
err = errors.New(errMsg + err.Error())
return err
}
}
hasCollectionInStreaming := r.node.streaming.replica.hasCollection(r.req.CollectionID)
if hasCollectionInStreaming {
err := r.node.streaming.replica.removeCollection(r.req.CollectionID)
if err != nil {
log.Warn(errMsg + err.Error())
return
}
}
// release global segment info
r.node.historical.removeGlobalSegmentIDsByCollectionID(r.req.CollectionID)
log.Debug("ReleaseCollection done", zap.Int64("collectionID", r.req.CollectionID))
}()
// release global segment info
r.node.historical.removeGlobalSegmentIDsByCollectionID(r.req.CollectionID)
log.Debug("ReleaseCollection done", zap.Int64("collectionID", r.req.CollectionID))
return nil
}
@ -522,66 +537,65 @@ func (r *releasePartitionsTask) Execute(ctx context.Context) error {
log.Debug("receive release partition task",
zap.Any("collectionID", r.req.CollectionID),
zap.Any("partitionIDs", r.req.PartitionIDs))
errMsg := "release partitions failed, collectionID = " + strconv.FormatInt(r.req.CollectionID, 10) + ", err = "
// sleep to wait for query tasks done
const gracefulReleaseTime = 1
func() { // release synchronously
errMsg := "release partitions failed, collectionID = " + strconv.FormatInt(r.req.CollectionID, 10) + ", err = "
time.Sleep(gracefulReleaseTime * time.Second)
time.Sleep(gracefulReleaseTime * time.Second)
hCol, err := r.node.historical.replica.getCollectionByID(r.req.CollectionID)
if err != nil {
log.Warn(errMsg + err.Error())
return
// get collection from streaming and historical
hCol, err := r.node.historical.replica.getCollectionByID(r.req.CollectionID)
if err != nil {
err = errors.New(errMsg + err.Error())
return err
}
sCol, err := r.node.streaming.replica.getCollectionByID(r.req.CollectionID)
if err != nil {
err = errors.New(errMsg + err.Error())
return err
}
// release partitions
vChannels := sCol.getVChannels()
for _, id := range r.req.PartitionIDs {
if _, err = r.node.streaming.dataSyncService.getPartitionFlowGraphs(id, vChannels); err == nil {
r.node.streaming.dataSyncService.removePartitionFlowGraph(id)
// remove all tSafes of the target partition
for _, channel := range vChannels {
log.Debug("releasing tSafe in releasePartitionTask...",
zap.Any("collectionID", r.req.CollectionID),
zap.Any("partitionID", id),
zap.Any("vChannel", channel),
)
r.node.streaming.tSafeReplica.removeTSafe(channel)
}
}
sCol, err := r.node.streaming.replica.getCollectionByID(r.req.CollectionID)
if err != nil {
log.Warn(errMsg + err.Error())
return
// remove partition from streaming and historical
hasPartitionInHistorical := r.node.historical.replica.hasPartition(id)
if hasPartitionInHistorical {
err = r.node.historical.replica.removePartition(id)
if err != nil {
// not return, try to release all partitions
log.Warn(errMsg + err.Error())
}
}
hasPartitionInStreaming := r.node.streaming.replica.hasPartition(id)
if hasPartitionInStreaming {
err = r.node.streaming.replica.removePartition(id)
if err != nil {
// not return, try to release all partitions
log.Warn(errMsg + err.Error())
}
}
vChannels := sCol.getVChannels()
for _, id := range r.req.PartitionIDs {
if _, err = r.node.streaming.dataSyncService.getPartitionFlowGraphs(id, vChannels); err == nil {
r.node.streaming.dataSyncService.removePartitionFlowGraph(id)
// remove all tSafes of the target partition
for _, channel := range vChannels {
log.Debug("releasing tSafe in releasePartitionTask...",
zap.Any("collectionID", r.req.CollectionID),
zap.Any("partitionID", id),
zap.Any("vChannel", channel),
)
r.node.streaming.tSafeReplica.removeTSafe(channel)
}
}
// add released partition record
hCol.addReleasedPartition(id)
sCol.addReleasedPartition(id)
}
hasPartitionInHistorical := r.node.historical.replica.hasPartition(id)
if hasPartitionInHistorical {
err = r.node.historical.replica.removePartition(id)
if err != nil {
// not return, try to release all partitions
log.Warn(errMsg + err.Error())
}
}
hCol.addReleasedPartition(id)
hasPartitionInStreaming := r.node.streaming.replica.hasPartition(id)
if hasPartitionInStreaming {
err = r.node.streaming.replica.removePartition(id)
if err != nil {
log.Warn(errMsg + err.Error())
}
}
sCol.addReleasedPartition(id)
}
// release global segment info
r.node.historical.removeGlobalSegmentIDsByPartitionIds(r.req.PartitionIDs)
log.Debug("release partition task done",
zap.Any("collectionID", r.req.CollectionID),
zap.Any("partitionIDs", r.req.PartitionIDs))
}()
// release global segment info
r.node.historical.removeGlobalSegmentIDsByPartitionIds(r.req.PartitionIDs)
log.Debug("release partition task done",
zap.Any("collectionID", r.req.CollectionID),