// milvus/internal/querynode/task.go

package querynode
import (
"context"
"errors"
"fmt"
"math/rand"
"go.uber.org/zap"
"github.com/zilliztech/milvus-distributed/internal/log"
queryPb "github.com/zilliztech/milvus-distributed/internal/proto/querypb"
)
// task is the interface satisfied (through pointer receivers) by the task
// types in this file: loadSegmentsTask, releaseCollectionTask and
// releasePartitionsTask.
// NOTE(review): the code that drives these methods is not in this file; the
// intended call order is inferred from the method names — confirm against the
// caller.
type task interface {
ID() UniqueID // return ReqID
SetID(uid UniqueID) // set ReqID
// Timestamp reports the timestamp associated with the task's request.
Timestamp() Timestamp
// PreExecute, Execute and PostExecute are the execution hooks; each
// reports failure through its error result.
PreExecute(ctx context.Context) error
Execute(ctx context.Context) error
PostExecute(ctx context.Context) error
// WaitToFinish blocks for the task's result; Notify delivers that result.
WaitToFinish() error
Notify(err error)
// OnEnqueue is where the tasks in this file assign their ID.
OnEnqueue() error
}
// baseTask holds the state shared by the task types in this file, which embed
// it to obtain ID, SetID, WaitToFinish and Notify.
type baseTask struct {
// done carries the task's result: Notify sends on it, WaitToFinish receives.
done chan error
// ctx bounds the wait: WaitToFinish gives up once ctx is done.
ctx context.Context
// id is the value returned by ID and set by SetID.
id UniqueID
}
// loadSegmentsTask pairs a LoadSegmentsRequest with the QueryNode it is
// executed against.
type loadSegmentsTask struct {
baseTask
// req is the request being served; OnEnqueue tolerates a nil req or nil req.Base.
req *queryPb.LoadSegmentsRequest
// node is the query node whose replica and services Execute operates on.
node *QueryNode
}
// releaseCollectionTask pairs a ReleaseCollectionRequest with the QueryNode
// it is executed against.
type releaseCollectionTask struct {
baseTask
// req is the request being served; OnEnqueue tolerates a nil req or nil req.Base.
req *queryPb.ReleaseCollectionRequest
// node is the query node whose replica and services Execute operates on.
node *QueryNode
}
// releasePartitionsTask pairs a ReleasePartitionsRequest with the QueryNode
// it is executed against.
type releasePartitionsTask struct {
baseTask
// req is the request being served; OnEnqueue tolerates a nil req or nil req.Base.
req *queryPb.ReleasePartitionsRequest
// node is the query node whose load service replica Execute operates on.
node *QueryNode
}
// ID returns the task ID most recently stored with SetID.
func (b *baseTask) ID() UniqueID {
return b.id
}
// SetID stores uid as the task ID, replacing any previous value.
func (b *baseTask) SetID(uid UniqueID) {
b.id = uid
}
// WaitToFinish blocks until either a result arrives on the done channel or the
// task context is done. A delivered result (nil included) is returned
// unchanged; a finished context yields a "task timeout" error.
func (b *baseTask) WaitToFinish() error {
	select {
	case result := <-b.done:
		return result
	case <-b.ctx.Done():
		return errors.New("task timeout")
	}
}
// Notify delivers err (nil on success) as the task's result to the goroutine
// waiting in WaitToFinish.
//
// WaitToFinish stops receiving once the task context is done, so a plain send
// could block the notifier forever when the done channel has no free buffer
// slot. Notify therefore gives up as soon as the context is done; in that case
// the result is dropped, which matches the "task timeout" the waiter reports.
func (b *baseTask) Notify(err error) {
	if b.ctx == nil {
		// No context to watch: keep the plain blocking send.
		b.done <- err
		return
	}
	select {
	case b.done <- err:
	case <-b.ctx.Done():
		// Nobody is (or will keep) waiting for the result; do not block.
	}
}
// loadSegmentsTask
// Timestamp returns the timestamp carried by the request's base message.
//
// It returns 0 when the request or its base is nil, the same condition
// OnEnqueue tolerates, rather than panicking on a nil dereference.
func (l *loadSegmentsTask) Timestamp() Timestamp {
	if l.req == nil || l.req.Base == nil {
		return 0
	}
	return l.req.Base.Timestamp
}
// OnEnqueue assigns the task ID: the request's MsgID when the request and its
// base message are present, otherwise a random value in [0, 100000000000).
// It always returns nil.
func (l *loadSegmentsTask) OnEnqueue() error {
	if l.req != nil && l.req.Base != nil {
		l.SetID(l.req.Base.MsgID)
		return nil
	}
	// No message ID available; fall back to a random ID.
	l.SetID(rand.Int63n(100000000000))
	return nil
}
// PreExecute is a no-op for loadSegmentsTask; it ignores ctx and always
// returns nil.
func (l *loadSegmentsTask) PreExecute(ctx context.Context) error {
return nil
}
// Execute serves the load-segments request against the node.
//
// Steps, in order:
//  1. If the replica does not yet hold the collection, add it, initialise its
//     excluded segments, register and start a data sync service, and start the
//     search collection.
//  2. If the replica does not yet hold the partition, add it.
//  3. Enable the partition.
//  4. If the request names any segments, load them through the load service.
//
// The first failing step aborts the call and its error is returned unchanged;
// nothing done by earlier steps is undone. ctx is not used.
func (l *loadSegmentsTask) Execute(ctx context.Context) error {
	// TODO: support db
	collID := l.req.CollectionID
	partID := l.req.PartitionID
	segIDs := l.req.SegmentIDs
	fields := l.req.FieldIDs
	collSchema := l.req.Schema
	log.Debug("query node load segment", zap.String("loadSegmentRequest", fmt.Sprintln(l.req)))

	node := l.node
	// Both lookups happen before anything is added to the replica.
	collectionKnown := node.replica.hasCollection(collID)
	partitionKnown := node.replica.hasPartition(partID)

	if !collectionKnown {
		// loading init
		if err := node.replica.addCollection(collID, collSchema); err != nil {
			return err
		}
		node.replica.initExcludedSegments(collID)
		syncService := newDataSyncService(node.queryNodeLoopCtx, node.replica, node.msFactory, collID)
		// ignore duplicated dataSyncService error
		_ = node.addDataSyncService(collID, syncService)
		// Start whichever service is registered for the collection, which may
		// be a previously registered one rather than syncService.
		registered, err := node.getDataSyncService(collID)
		if err != nil {
			return err
		}
		go registered.start()
		node.searchService.startSearchCollection(collID)
	}

	if !partitionKnown {
		if err := node.replica.addPartition(collID, partID); err != nil {
			return err
		}
	}

	if err := node.replica.enablePartition(partID); err != nil {
		return err
	}

	// Nothing to load: the request only set up the collection/partition.
	if len(segIDs) == 0 {
		return nil
	}

	if err := node.loadService.loadSegmentPassively(collID, partID, segIDs, fields); err != nil {
		return err
	}

	log.Debug("LoadSegments done", zap.String("segmentIDs", fmt.Sprintln(l.req.SegmentIDs)))
	return nil
}
// PostExecute is a no-op for loadSegmentsTask; it ignores ctx and always
// returns nil.
func (l *loadSegmentsTask) PostExecute(ctx context.Context) error {
return nil
}
// releaseCollectionTask
// Timestamp returns the timestamp carried by the request's base message.
//
// It returns 0 when the request or its base is nil, the same condition
// OnEnqueue tolerates, rather than panicking on a nil dereference.
func (r *releaseCollectionTask) Timestamp() Timestamp {
	if r.req == nil || r.req.Base == nil {
		return 0
	}
	return r.req.Base.Timestamp
}
// OnEnqueue assigns the task ID: the request's MsgID when the request and its
// base message are present, otherwise a random value in [0, 100000000000).
// It always returns nil.
func (r *releaseCollectionTask) OnEnqueue() error {
	if r.req != nil && r.req.Base != nil {
		r.SetID(r.req.Base.MsgID)
		return nil
	}
	// No message ID available; fall back to a random ID.
	r.SetID(rand.Int63n(100000000000))
	return nil
}
// PreExecute is a no-op for releaseCollectionTask; it ignores ctx and always
// returns nil.
func (r *releaseCollectionTask) PreExecute(ctx context.Context) error {
return nil
}
// Execute releases the collection named by the request from the node.
//
// If a data sync service is registered for the collection it is closed and
// unregistered, and the collection's tSafe and excluded segments are removed
// from the replica; a failed lookup of the service is not treated as an
// error. A running search collection is stopped. Finally the collection is
// removed from the replica, and only that step's error is returned. ctx is
// not used.
func (r *releaseCollectionTask) Execute(ctx context.Context) error {
	collID := r.req.CollectionID
	node := r.node

	if ds, err := node.getDataSyncService(collID); err == nil && ds != nil {
		ds.close()
		node.removeDataSyncService(collID)
		node.replica.removeTSafe(collID)
		node.replica.removeExcludedSegments(collID)
	}

	if node.searchService.hasSearchCollection(collID) {
		node.searchService.stopSearchCollection(collID)
	}

	if err := node.replica.removeCollection(collID); err != nil {
		return err
	}

	log.Debug("ReleaseCollection done", zap.Int64("collectionID", collID))
	return nil
}
// PostExecute is a no-op for releaseCollectionTask; it ignores ctx and always
// returns nil.
func (r *releaseCollectionTask) PostExecute(ctx context.Context) error {
return nil
}
// releasePartitionsTask
// Timestamp returns the timestamp carried by the request's base message.
//
// It returns 0 when the request or its base is nil, the same condition
// OnEnqueue tolerates, rather than panicking on a nil dereference.
func (r *releasePartitionsTask) Timestamp() Timestamp {
	if r.req == nil || r.req.Base == nil {
		return 0
	}
	return r.req.Base.Timestamp
}
// OnEnqueue assigns the task ID: the request's MsgID when the request and its
// base message are present, otherwise a random value in [0, 100000000000).
// It always returns nil.
func (r *releasePartitionsTask) OnEnqueue() error {
	if r.req != nil && r.req.Base != nil {
		r.SetID(r.req.Base.MsgID)
		return nil
	}
	// No message ID available; fall back to a random ID.
	r.SetID(rand.Int63n(100000000000))
	return nil
}
// PreExecute is a no-op for releasePartitionsTask; it ignores ctx and always
// returns nil.
func (r *releasePartitionsTask) PreExecute(ctx context.Context) error {
return nil
}
// Execute removes every partition listed in the request from the load
// service's replica. Removal is best-effort: a failure for one partition is
// logged and the remaining partitions are still attempted, so the method
// always returns nil. ctx is not used.
func (r *releasePartitionsTask) Execute(ctx context.Context) error {
	partitionIDs := r.req.PartitionIDs
	for i := range partitionIDs {
		if err := r.node.loadService.segLoader.replica.removePartition(partitionIDs[i]); err != nil {
			// not return, try to release all partitions
			log.Error(err.Error())
		}
	}
	return nil
}
// PostExecute is a no-op for releasePartitionsTask; it ignores ctx and always
// returns nil.
func (r *releasePartitionsTask) PostExecute(ctx context.Context) error {
return nil
}