2023-03-27 00:42:00 +08:00
|
|
|
// Licensed to the LF AI & Data foundation under one
|
|
|
|
// or more contributor license agreements. See the NOTICE file
|
|
|
|
// distributed with this work for additional information
|
|
|
|
// regarding copyright ownership. The ASF licenses this file
|
|
|
|
// to you under the Apache License, Version 2.0 (the
|
|
|
|
// "License"); you may not use this file except in compliance
|
|
|
|
// with the License. You may obtain a copy of the License at
|
|
|
|
//
|
|
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
//
|
|
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
// See the License for the specific language governing permissions and
|
|
|
|
// limitations under the License.
|
|
|
|
|
|
|
|
package delegator
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"fmt"
|
|
|
|
"math/rand"
|
|
|
|
|
|
|
|
"github.com/cockroachdb/errors"
|
2023-04-06 19:14:32 +08:00
|
|
|
"github.com/samber/lo"
|
|
|
|
"go.uber.org/zap"
|
|
|
|
"golang.org/x/sync/errgroup"
|
|
|
|
|
2023-06-09 01:28:37 +08:00
|
|
|
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
|
|
|
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
|
2023-03-27 00:42:00 +08:00
|
|
|
"github.com/milvus-io/milvus/internal/proto/querypb"
|
|
|
|
"github.com/milvus-io/milvus/internal/proto/segcorepb"
|
|
|
|
"github.com/milvus-io/milvus/internal/querynodev2/cluster"
|
|
|
|
"github.com/milvus-io/milvus/internal/querynodev2/delegator/deletebuffer"
|
|
|
|
"github.com/milvus-io/milvus/internal/querynodev2/pkoracle"
|
|
|
|
"github.com/milvus-io/milvus/internal/querynodev2/segments"
|
|
|
|
"github.com/milvus-io/milvus/internal/storage"
|
2023-04-06 19:14:32 +08:00
|
|
|
"github.com/milvus-io/milvus/pkg/common"
|
2023-07-24 14:09:00 +08:00
|
|
|
"github.com/milvus-io/milvus/pkg/log"
|
2023-07-21 15:30:59 +08:00
|
|
|
"github.com/milvus-io/milvus/pkg/metrics"
|
2023-04-06 19:14:32 +08:00
|
|
|
"github.com/milvus-io/milvus/pkg/mq/msgstream"
|
|
|
|
"github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper"
|
|
|
|
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
|
|
|
|
"github.com/milvus-io/milvus/pkg/util/funcutil"
|
|
|
|
"github.com/milvus-io/milvus/pkg/util/merr"
|
|
|
|
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
|
|
|
"github.com/milvus-io/milvus/pkg/util/retry"
|
2023-07-21 15:30:59 +08:00
|
|
|
"github.com/milvus-io/milvus/pkg/util/timerecord"
|
2023-04-06 19:14:32 +08:00
|
|
|
"github.com/milvus-io/milvus/pkg/util/tsoutil"
|
|
|
|
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
2023-03-27 00:42:00 +08:00
|
|
|
)
|
|
|
|
|
|
|
|
// delegator data related part
|
|
|
|
|
|
|
|
// InsertData
|
|
|
|
type InsertData struct {
|
|
|
|
RowIDs []int64
|
|
|
|
PrimaryKeys []storage.PrimaryKey
|
|
|
|
Timestamps []uint64
|
|
|
|
InsertRecord *segcorepb.InsertRecord
|
|
|
|
StartPosition *msgpb.MsgPosition
|
|
|
|
PartitionID int64
|
|
|
|
}
|
|
|
|
|
|
|
|
type DeleteData struct {
|
|
|
|
PartitionID int64
|
|
|
|
PrimaryKeys []storage.PrimaryKey
|
|
|
|
Timestamps []uint64
|
|
|
|
RowCount int64
|
|
|
|
}
|
|
|
|
|
|
|
|
// Append appends another delete data into this one.
|
|
|
|
func (d *DeleteData) Append(ad DeleteData) {
|
|
|
|
d.PrimaryKeys = append(d.PrimaryKeys, ad.PrimaryKeys...)
|
|
|
|
d.Timestamps = append(d.Timestamps, ad.Timestamps...)
|
|
|
|
d.RowCount += ad.RowCount
|
|
|
|
}
|
|
|
|
|
|
|
|
// ProcessInsert handles insert data in delegator.
|
|
|
|
func (sd *shardDelegator) ProcessInsert(insertRecords map[int64]*InsertData) {
|
2023-07-21 15:30:59 +08:00
|
|
|
method := "ProcessInsert"
|
|
|
|
tr := timerecord.NewTimeRecorder(method)
|
2023-03-27 00:42:00 +08:00
|
|
|
log := sd.getLogger(context.Background())
|
|
|
|
for segmentID, insertData := range insertRecords {
|
|
|
|
growing := sd.segmentManager.GetGrowing(segmentID)
|
|
|
|
if growing == nil {
|
2023-10-26 10:10:10 +08:00
|
|
|
var err error
|
|
|
|
growing, err = segments.NewSegment(sd.collection, segmentID, insertData.PartitionID, sd.collectionID, sd.vchannelName,
|
|
|
|
segments.SegmentTypeGrowing, 0, insertData.StartPosition, insertData.StartPosition)
|
|
|
|
if err != nil {
|
|
|
|
log.Error("failed to create new segment",
|
|
|
|
zap.Int64("segmentID", segmentID),
|
|
|
|
zap.Error(err))
|
|
|
|
panic(err)
|
|
|
|
}
|
2023-03-27 00:42:00 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
err := growing.Insert(insertData.RowIDs, insertData.Timestamps, insertData.InsertRecord)
|
|
|
|
if err != nil {
|
|
|
|
log.Error("failed to insert data into growing segment",
|
|
|
|
zap.Int64("segmentID", segmentID),
|
|
|
|
zap.Error(err),
|
|
|
|
)
|
2023-07-12 19:48:29 +08:00
|
|
|
if errors.IsAny(err, merr.ErrSegmentNotLoaded, merr.ErrSegmentNotFound) {
|
|
|
|
log.Warn("try to insert data into released segment, skip it", zap.Error(err))
|
|
|
|
continue
|
|
|
|
}
|
2023-03-27 00:42:00 +08:00
|
|
|
// panic here, insert failure
|
|
|
|
panic(err)
|
|
|
|
}
|
|
|
|
growing.UpdateBloomFilter(insertData.PrimaryKeys)
|
|
|
|
|
2023-10-26 10:10:10 +08:00
|
|
|
if !sd.pkOracle.Exists(growing, paramtable.GetNodeID()) {
|
|
|
|
// register created growing segment after insert, avoid to add empty growing to delegator
|
|
|
|
sd.pkOracle.Register(growing, paramtable.GetNodeID())
|
|
|
|
sd.segmentManager.Put(segments.SegmentTypeGrowing, growing)
|
|
|
|
sd.addGrowing(SegmentEntry{
|
|
|
|
NodeID: paramtable.GetNodeID(),
|
|
|
|
SegmentID: segmentID,
|
|
|
|
PartitionID: insertData.PartitionID,
|
|
|
|
Version: 0,
|
|
|
|
TargetVersion: initialTargetVersion,
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
2023-03-27 00:42:00 +08:00
|
|
|
log.Debug("insert into growing segment",
|
|
|
|
zap.Int64("collectionID", growing.Collection()),
|
|
|
|
zap.Int64("segmentID", segmentID),
|
|
|
|
zap.Int("rowCount", len(insertData.RowIDs)),
|
|
|
|
zap.Uint64("maxTimestamp", insertData.Timestamps[len(insertData.Timestamps)-1]),
|
|
|
|
)
|
|
|
|
}
|
2023-07-21 15:30:59 +08:00
|
|
|
metrics.QueryNodeProcessCost.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.InsertLabel).
|
2023-07-25 15:07:01 +08:00
|
|
|
Observe(float64(tr.ElapseSpan().Milliseconds()))
|
2023-03-27 00:42:00 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
// ProcessDelete handles delete data in delegator.
|
|
|
|
// delegator puts deleteData into buffer first,
|
|
|
|
// then dispatch data to segments acoording to the result of pkOracle.
|
|
|
|
func (sd *shardDelegator) ProcessDelete(deleteData []*DeleteData, ts uint64) {
|
2023-07-21 15:30:59 +08:00
|
|
|
method := "ProcessDelete"
|
|
|
|
tr := timerecord.NewTimeRecorder(method)
|
2023-03-27 00:42:00 +08:00
|
|
|
// block load segment handle delete buffer
|
|
|
|
sd.deleteMut.Lock()
|
|
|
|
defer sd.deleteMut.Unlock()
|
|
|
|
|
|
|
|
log := sd.getLogger(context.Background())
|
|
|
|
|
|
|
|
log.Debug("start to process delete", zap.Uint64("ts", ts))
|
|
|
|
// add deleteData into buffer.
|
|
|
|
cacheItems := make([]deletebuffer.BufferItem, 0, len(deleteData))
|
|
|
|
for _, entry := range deleteData {
|
|
|
|
cacheItems = append(cacheItems, deletebuffer.BufferItem{
|
|
|
|
PartitionID: entry.PartitionID,
|
|
|
|
DeleteData: storage.DeleteData{
|
|
|
|
Pks: entry.PrimaryKeys,
|
|
|
|
Tss: entry.Timestamps,
|
|
|
|
RowCount: entry.RowCount,
|
|
|
|
},
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
sd.deleteBuffer.Put(&deletebuffer.Item{
|
|
|
|
Ts: ts,
|
|
|
|
Data: cacheItems,
|
|
|
|
})
|
|
|
|
|
|
|
|
// segment => delete data
|
|
|
|
delRecords := make(map[int64]DeleteData)
|
|
|
|
for _, data := range deleteData {
|
|
|
|
for i, pk := range data.PrimaryKeys {
|
|
|
|
segmentIDs, err := sd.pkOracle.Get(pk, pkoracle.WithPartitionID(data.PartitionID))
|
|
|
|
if err != nil {
|
|
|
|
log.Warn("failed to get delete candidates for pk", zap.Any("pk", pk.GetValue()))
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
for _, segmentID := range segmentIDs {
|
|
|
|
delRecord := delRecords[segmentID]
|
|
|
|
delRecord.PrimaryKeys = append(delRecord.PrimaryKeys, pk)
|
|
|
|
delRecord.Timestamps = append(delRecord.Timestamps, data.Timestamps[i])
|
|
|
|
delRecord.RowCount++
|
|
|
|
delRecords[segmentID] = delRecord
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
offlineSegments := typeutil.NewConcurrentSet[int64]()
|
|
|
|
|
2023-11-01 20:04:24 +08:00
|
|
|
sealed, growing, version := sd.distribution.PinOnlineSegments()
|
2023-03-27 00:42:00 +08:00
|
|
|
|
|
|
|
eg, ctx := errgroup.WithContext(context.Background())
|
|
|
|
for _, entry := range sealed {
|
|
|
|
entry := entry
|
|
|
|
eg.Go(func() error {
|
2023-09-05 10:05:48 +08:00
|
|
|
worker, err := sd.workerManager.GetWorker(ctx, entry.NodeID)
|
2023-03-27 00:42:00 +08:00
|
|
|
if err != nil {
|
|
|
|
log.Warn("failed to get worker",
|
|
|
|
zap.Int64("nodeID", paramtable.GetNodeID()),
|
|
|
|
zap.Error(err),
|
|
|
|
)
|
|
|
|
// skip if node down
|
|
|
|
// delete will be processed after loaded again
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
offlineSegments.Upsert(sd.applyDelete(ctx, entry.NodeID, worker, delRecords, entry.Segments)...)
|
|
|
|
return nil
|
|
|
|
})
|
|
|
|
}
|
|
|
|
if len(growing) > 0 {
|
|
|
|
eg.Go(func() error {
|
2023-09-05 10:05:48 +08:00
|
|
|
worker, err := sd.workerManager.GetWorker(ctx, paramtable.GetNodeID())
|
2023-03-27 00:42:00 +08:00
|
|
|
if err != nil {
|
|
|
|
log.Error("failed to get worker(local)",
|
|
|
|
zap.Int64("nodeID", paramtable.GetNodeID()),
|
|
|
|
zap.Error(err),
|
|
|
|
)
|
|
|
|
// panic here, local worker shall not have error
|
|
|
|
panic(err)
|
|
|
|
}
|
|
|
|
offlineSegments.Upsert(sd.applyDelete(ctx, paramtable.GetNodeID(), worker, delRecords, growing)...)
|
|
|
|
return nil
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
// not error return in apply delete
|
|
|
|
_ = eg.Wait()
|
|
|
|
|
2023-11-01 20:04:24 +08:00
|
|
|
sd.distribution.Unpin(version)
|
2023-03-27 00:42:00 +08:00
|
|
|
offlineSegIDs := offlineSegments.Collect()
|
|
|
|
if len(offlineSegIDs) > 0 {
|
|
|
|
log.Warn("failed to apply delete, mark segment offline", zap.Int64s("offlineSegments", offlineSegIDs))
|
|
|
|
sd.markSegmentOffline(offlineSegIDs...)
|
|
|
|
}
|
2023-07-21 15:30:59 +08:00
|
|
|
|
|
|
|
metrics.QueryNodeProcessCost.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.DeleteLabel).
|
2023-07-25 15:07:01 +08:00
|
|
|
Observe(float64(tr.ElapseSpan().Milliseconds()))
|
2023-03-27 00:42:00 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
// applyDelete handles delete record and apply them to corresponding workers.
|
|
|
|
func (sd *shardDelegator) applyDelete(ctx context.Context, nodeID int64, worker cluster.Worker, delRecords map[int64]DeleteData, entries []SegmentEntry) []int64 {
|
|
|
|
var offlineSegments []int64
|
|
|
|
log := sd.getLogger(ctx)
|
|
|
|
for _, segmentEntry := range entries {
|
|
|
|
log := log.With(
|
|
|
|
zap.Int64("segmentID", segmentEntry.SegmentID),
|
|
|
|
zap.Int64("workerID", nodeID),
|
|
|
|
)
|
|
|
|
delRecord, ok := delRecords[segmentEntry.SegmentID]
|
|
|
|
if ok {
|
|
|
|
log.Debug("delegator plan to applyDelete via worker")
|
|
|
|
err := retry.Do(ctx, func() error {
|
2023-08-23 10:10:22 +08:00
|
|
|
if sd.Stopped() {
|
2023-09-04 09:57:09 +08:00
|
|
|
return retry.Unrecoverable(merr.WrapErrChannelNotAvailable(sd.vchannelName, "channel is unsubscribing"))
|
2023-08-23 10:10:22 +08:00
|
|
|
}
|
|
|
|
|
2023-03-27 00:42:00 +08:00
|
|
|
err := worker.Delete(ctx, &querypb.DeleteRequest{
|
|
|
|
Base: commonpbutil.NewMsgBase(commonpbutil.WithTargetID(nodeID)),
|
|
|
|
CollectionId: sd.collectionID,
|
|
|
|
PartitionId: segmentEntry.PartitionID,
|
|
|
|
VchannelName: sd.vchannelName,
|
|
|
|
SegmentId: segmentEntry.SegmentID,
|
|
|
|
PrimaryKeys: storage.ParsePrimaryKeys2IDs(delRecord.PrimaryKeys),
|
|
|
|
Timestamps: delRecord.Timestamps,
|
|
|
|
})
|
2023-11-03 10:14:16 +08:00
|
|
|
if errors.Is(err, merr.ErrNodeNotFound) {
|
|
|
|
log.Warn("try to delete data on non-exist node")
|
|
|
|
return retry.Unrecoverable(err)
|
|
|
|
} else if errors.Is(err, merr.ErrSegmentNotFound) {
|
2023-03-27 00:42:00 +08:00
|
|
|
log.Warn("try to delete data of released segment")
|
|
|
|
return nil
|
|
|
|
} else if err != nil {
|
|
|
|
log.Warn("worker failed to delete on segment",
|
|
|
|
zap.Error(err),
|
|
|
|
)
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}, retry.Attempts(10))
|
|
|
|
if err != nil {
|
|
|
|
log.Warn("apply delete for segment failed, marking it offline")
|
|
|
|
offlineSegments = append(offlineSegments, segmentEntry.SegmentID)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return offlineSegments
|
|
|
|
}
|
|
|
|
|
|
|
|
// markSegmentOffline makes segment go offline and waits for QueryCoord to fix.
|
|
|
|
func (sd *shardDelegator) markSegmentOffline(segmentIDs ...int64) {
|
|
|
|
sd.distribution.AddOfflines(segmentIDs...)
|
|
|
|
}
|
|
|
|
|
|
|
|
// addGrowing add growing segment record for delegator.
|
|
|
|
func (sd *shardDelegator) addGrowing(entries ...SegmentEntry) {
|
|
|
|
log := sd.getLogger(context.Background())
|
|
|
|
log.Info("add growing segments to delegator", zap.Int64s("segmentIDs", lo.Map(entries, func(entry SegmentEntry, _ int) int64 {
|
|
|
|
return entry.SegmentID
|
|
|
|
})))
|
|
|
|
sd.distribution.AddGrowing(entries...)
|
|
|
|
}
|
|
|
|
|
|
|
|
// LoadGrowing load growing segments locally.
|
|
|
|
func (sd *shardDelegator) LoadGrowing(ctx context.Context, infos []*querypb.SegmentLoadInfo, version int64) error {
|
|
|
|
log := sd.getLogger(ctx)
|
|
|
|
|
2023-05-11 15:33:24 +08:00
|
|
|
segmentIDs := lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) int64 { return info.GetSegmentID() })
|
|
|
|
log.Info("loading growing segments...", zap.Int64s("segmentIDs", segmentIDs))
|
2023-03-27 00:42:00 +08:00
|
|
|
loaded, err := sd.loader.Load(ctx, sd.collectionID, segments.SegmentTypeGrowing, version, infos...)
|
|
|
|
if err != nil {
|
|
|
|
log.Warn("failed to load growing segment", zap.Error(err))
|
|
|
|
return err
|
|
|
|
}
|
2023-05-11 15:33:24 +08:00
|
|
|
|
|
|
|
segmentIDs = lo.Map(loaded, func(segment segments.Segment, _ int) int64 { return segment.ID() })
|
|
|
|
log.Info("load growing segments done", zap.Int64s("segmentIDs", segmentIDs))
|
|
|
|
|
2023-03-27 00:42:00 +08:00
|
|
|
for _, candidate := range loaded {
|
|
|
|
sd.pkOracle.Register(candidate, paramtable.GetNodeID())
|
|
|
|
}
|
|
|
|
sd.addGrowing(lo.Map(loaded, func(segment segments.Segment, _ int) SegmentEntry {
|
|
|
|
return SegmentEntry{
|
2023-06-27 11:48:45 +08:00
|
|
|
NodeID: paramtable.GetNodeID(),
|
|
|
|
SegmentID: segment.ID(),
|
|
|
|
PartitionID: segment.Partition(),
|
|
|
|
Version: version,
|
|
|
|
TargetVersion: sd.distribution.getTargetVersion(),
|
2023-03-27 00:42:00 +08:00
|
|
|
}
|
|
|
|
})...)
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// LoadSegments load segments local or remotely depends on the target node.
|
|
|
|
func (sd *shardDelegator) LoadSegments(ctx context.Context, req *querypb.LoadSegmentsRequest) error {
|
|
|
|
log := sd.getLogger(ctx)
|
|
|
|
|
|
|
|
targetNodeID := req.GetDstNodeID()
|
|
|
|
// add common log fields
|
|
|
|
log = log.With(
|
|
|
|
zap.Int64("workID", req.GetDstNodeID()),
|
|
|
|
zap.Int64s("segments", lo.Map(req.GetInfos(), func(info *querypb.SegmentLoadInfo, _ int) int64 { return info.GetSegmentID() })),
|
|
|
|
)
|
|
|
|
|
2023-09-05 10:05:48 +08:00
|
|
|
worker, err := sd.workerManager.GetWorker(ctx, targetNodeID)
|
2023-03-27 00:42:00 +08:00
|
|
|
if err != nil {
|
|
|
|
log.Warn("delegator failed to find worker", zap.Error(err))
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// load bloom filter only when candidate not exists
|
|
|
|
infos := lo.Filter(req.GetInfos(), func(info *querypb.SegmentLoadInfo, _ int) bool {
|
|
|
|
return !sd.pkOracle.Exists(pkoracle.NewCandidateKey(info.GetSegmentID(), info.GetPartitionID(), commonpb.SegmentState_Sealed), targetNodeID)
|
|
|
|
})
|
|
|
|
candidates, err := sd.loader.LoadBloomFilterSet(ctx, req.GetCollectionID(), req.GetVersion(), infos...)
|
|
|
|
if err != nil {
|
|
|
|
log.Warn("failed to load bloom filter set for segment", zap.Error(err))
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
req.Base.TargetID = req.GetDstNodeID()
|
2023-09-18 10:57:20 +08:00
|
|
|
log.Debug("worker loads segments...")
|
2023-09-10 07:41:18 +08:00
|
|
|
|
|
|
|
sLoad := func(ctx context.Context, req *querypb.LoadSegmentsRequest) error {
|
|
|
|
segmentID := req.GetInfos()[0].GetSegmentID()
|
|
|
|
nodeID := req.GetDstNodeID()
|
|
|
|
_, err, _ := sd.sf.Do(fmt.Sprintf("%d-%d", nodeID, segmentID), func() (struct{}, error) {
|
|
|
|
err := worker.LoadSegments(ctx, req)
|
|
|
|
return struct{}{}, err
|
|
|
|
})
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// separate infos into different load task
|
|
|
|
if len(req.GetInfos()) > 1 {
|
|
|
|
var reqs []*querypb.LoadSegmentsRequest
|
|
|
|
for _, info := range req.GetInfos() {
|
|
|
|
newReq := typeutil.Clone(req)
|
|
|
|
newReq.Infos = []*querypb.SegmentLoadInfo{info}
|
|
|
|
reqs = append(reqs, newReq)
|
|
|
|
}
|
|
|
|
|
|
|
|
group, ctx := errgroup.WithContext(ctx)
|
|
|
|
for _, req := range reqs {
|
|
|
|
req := req
|
|
|
|
group.Go(func() error {
|
|
|
|
return sLoad(ctx, req)
|
|
|
|
})
|
|
|
|
}
|
|
|
|
err = group.Wait()
|
|
|
|
} else {
|
|
|
|
err = sLoad(ctx, req)
|
|
|
|
}
|
|
|
|
|
2023-03-27 00:42:00 +08:00
|
|
|
if err != nil {
|
|
|
|
log.Warn("worker failed to load segments", zap.Error(err))
|
|
|
|
return err
|
|
|
|
}
|
2023-09-18 10:57:20 +08:00
|
|
|
log.Debug("work loads segments done")
|
2023-03-27 00:42:00 +08:00
|
|
|
|
2023-07-18 10:51:19 +08:00
|
|
|
// load index need no stream delete and distribution change
|
|
|
|
if req.GetLoadScope() == querypb.LoadScope_Index {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2023-03-27 00:42:00 +08:00
|
|
|
entries := lo.Map(req.GetInfos(), func(info *querypb.SegmentLoadInfo, _ int) SegmentEntry {
|
|
|
|
return SegmentEntry{
|
2023-08-08 11:17:08 +08:00
|
|
|
SegmentID: info.GetSegmentID(),
|
|
|
|
PartitionID: info.GetPartitionID(),
|
|
|
|
NodeID: req.GetDstNodeID(),
|
|
|
|
Version: req.GetVersion(),
|
2023-03-27 00:42:00 +08:00
|
|
|
}
|
|
|
|
})
|
2023-09-18 10:57:20 +08:00
|
|
|
log.Debug("load delete...")
|
2023-08-09 13:05:15 +08:00
|
|
|
err = sd.loadStreamDelete(ctx, candidates, infos, req.GetDeltaPositions(), targetNodeID, worker, entries)
|
|
|
|
if err != nil {
|
|
|
|
log.Warn("load stream delete failed", zap.Error(err))
|
|
|
|
return err
|
|
|
|
}
|
2023-03-27 00:42:00 +08:00
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2023-05-09 19:10:41 +08:00
|
|
|
func (sd *shardDelegator) loadStreamDelete(ctx context.Context,
|
|
|
|
candidates []*pkoracle.BloomFilterSet,
|
|
|
|
infos []*querypb.SegmentLoadInfo,
|
|
|
|
deltaPositions []*msgpb.MsgPosition,
|
2023-08-09 13:05:15 +08:00
|
|
|
targetNodeID int64,
|
|
|
|
worker cluster.Worker,
|
|
|
|
entries []SegmentEntry,
|
|
|
|
) error {
|
2023-03-27 00:42:00 +08:00
|
|
|
log := sd.getLogger(ctx)
|
|
|
|
|
|
|
|
idCandidates := lo.SliceToMap(candidates, func(candidate *pkoracle.BloomFilterSet) (int64, *pkoracle.BloomFilterSet) {
|
|
|
|
return candidate.ID(), candidate
|
|
|
|
})
|
|
|
|
|
2023-07-17 09:30:34 +08:00
|
|
|
sd.deleteMut.Lock()
|
|
|
|
defer sd.deleteMut.Unlock()
|
2023-03-27 00:42:00 +08:00
|
|
|
// apply buffered delete for new segments
|
|
|
|
// no goroutines here since qnv2 has no load merging logic
|
2023-10-13 16:31:35 +08:00
|
|
|
for _, info := range infos {
|
2023-03-27 00:42:00 +08:00
|
|
|
candidate := idCandidates[info.GetSegmentID()]
|
2023-05-09 19:10:41 +08:00
|
|
|
position := info.GetDeltaPosition()
|
|
|
|
if position == nil { // for compatibility of rolling upgrade from 2.2.x to 2.3
|
2023-10-13 16:31:35 +08:00
|
|
|
// During rolling upgrade, Querynode(2.3) may receive merged LoadSegmentRequest
|
|
|
|
// from QueryCoord(2.2); In version 2.2.x, only segments with the same dmlChannel
|
|
|
|
// can be merged, and deltaPositions will be merged into a single deltaPosition,
|
|
|
|
// so we should use `deltaPositions[0]` as the seek position for all the segments
|
|
|
|
// within the same LoadSegmentRequest.
|
|
|
|
position = deltaPositions[0]
|
2023-05-09 19:10:41 +08:00
|
|
|
}
|
2023-03-27 00:42:00 +08:00
|
|
|
|
|
|
|
deleteData := &storage.DeleteData{}
|
|
|
|
// start position is dml position for segment
|
|
|
|
// if this position is before deleteBuffer's safe ts, it means some delete shall be read from msgstream
|
2023-05-09 19:10:41 +08:00
|
|
|
if position.GetTimestamp() < sd.deleteBuffer.SafeTs() {
|
2023-03-27 00:42:00 +08:00
|
|
|
log.Info("load delete from stream...")
|
|
|
|
var err error
|
2023-05-09 19:10:41 +08:00
|
|
|
deleteData, err = sd.readDeleteFromMsgstream(ctx, position, sd.deleteBuffer.SafeTs(), candidate)
|
2023-03-27 00:42:00 +08:00
|
|
|
if err != nil {
|
|
|
|
log.Warn("failed to read delete data from msgstream", zap.Error(err))
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
log.Info("load delete from stream done")
|
|
|
|
}
|
|
|
|
|
|
|
|
// list buffered delete
|
2023-05-09 19:10:41 +08:00
|
|
|
deleteRecords := sd.deleteBuffer.ListAfter(position.GetTimestamp())
|
2023-03-27 00:42:00 +08:00
|
|
|
for _, entry := range deleteRecords {
|
|
|
|
for _, record := range entry.Data {
|
|
|
|
if record.PartitionID != common.InvalidPartitionID && candidate.Partition() != record.PartitionID {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
for i, pk := range record.DeleteData.Pks {
|
|
|
|
if candidate.MayPkExist(pk) {
|
|
|
|
deleteData.Pks = append(deleteData.Pks, pk)
|
|
|
|
deleteData.Tss = append(deleteData.Tss, record.DeleteData.Tss[i])
|
|
|
|
deleteData.RowCount++
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
// if delete count not empty, apply
|
|
|
|
if deleteData.RowCount > 0 {
|
|
|
|
log.Info("forward delete to worker...", zap.Int64("deleteRowNum", deleteData.RowCount))
|
|
|
|
err := worker.Delete(ctx, &querypb.DeleteRequest{
|
|
|
|
Base: commonpbutil.NewMsgBase(commonpbutil.WithTargetID(targetNodeID)),
|
|
|
|
CollectionId: info.GetCollectionID(),
|
|
|
|
PartitionId: info.GetPartitionID(),
|
|
|
|
SegmentId: info.GetSegmentID(),
|
|
|
|
PrimaryKeys: storage.ParsePrimaryKeys2IDs(deleteData.Pks),
|
|
|
|
Timestamps: deleteData.Tss,
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
log.Warn("failed to apply delete when LoadSegment", zap.Error(err))
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// add candidate after load success
|
|
|
|
for _, candidate := range candidates {
|
|
|
|
log.Info("register sealed segment bfs into pko candidates",
|
|
|
|
zap.Int64("segmentID", candidate.ID()),
|
|
|
|
)
|
|
|
|
sd.pkOracle.Register(candidate, targetNodeID)
|
|
|
|
}
|
2023-08-09 13:05:15 +08:00
|
|
|
log.Info("load delete done")
|
|
|
|
// alter distribution
|
|
|
|
sd.distribution.AddDistributions(entries...)
|
2023-03-27 00:42:00 +08:00
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (sd *shardDelegator) readDeleteFromMsgstream(ctx context.Context, position *msgpb.MsgPosition, safeTs uint64, candidate *pkoracle.BloomFilterSet) (*storage.DeleteData, error) {
|
|
|
|
log := sd.getLogger(ctx).With(
|
|
|
|
zap.String("channel", position.ChannelName),
|
|
|
|
zap.Int64("segmentID", candidate.ID()),
|
|
|
|
)
|
|
|
|
stream, err := sd.factory.NewTtMsgStream(ctx)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2023-07-12 17:26:30 +08:00
|
|
|
defer stream.Close()
|
2023-03-27 00:42:00 +08:00
|
|
|
vchannelName := position.ChannelName
|
|
|
|
pChannelName := funcutil.ToPhysicalChannel(vchannelName)
|
|
|
|
position.ChannelName = pChannelName
|
|
|
|
|
|
|
|
ts, _ := tsoutil.ParseTS(position.Timestamp)
|
|
|
|
|
|
|
|
// Random the subname in case we trying to load same delta at the same time
|
|
|
|
subName := fmt.Sprintf("querynode-delta-loader-%d-%d-%d", paramtable.GetNodeID(), sd.collectionID, rand.Int())
|
2023-07-25 10:43:04 +08:00
|
|
|
log.Info("from dml check point load delete", zap.Any("position", position), zap.String("vChannel", vchannelName), zap.String("subName", subName), zap.Time("positionTs", ts))
|
2023-09-08 09:51:17 +08:00
|
|
|
err = stream.AsConsumer(context.TODO(), []string{pChannelName}, subName, mqwrapper.SubscriptionPositionUnknown)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2023-03-27 00:42:00 +08:00
|
|
|
|
2023-09-08 09:51:17 +08:00
|
|
|
err = stream.Seek(context.TODO(), []*msgpb.MsgPosition{position})
|
2023-03-27 00:42:00 +08:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
result := &storage.DeleteData{}
|
|
|
|
hasMore := true
|
|
|
|
for hasMore {
|
|
|
|
select {
|
|
|
|
case <-ctx.Done():
|
|
|
|
log.Debug("read delta msg from seek position done", zap.Error(ctx.Err()))
|
|
|
|
return nil, ctx.Err()
|
|
|
|
case msgPack, ok := <-stream.Chan():
|
|
|
|
if !ok {
|
|
|
|
err = fmt.Errorf("stream channel closed, pChannelName=%v, msgID=%v", pChannelName, position.GetMsgID())
|
|
|
|
log.Warn("fail to read delta msg",
|
|
|
|
zap.String("pChannelName", pChannelName),
|
|
|
|
zap.Binary("msgID", position.GetMsgID()),
|
|
|
|
zap.Error(err),
|
|
|
|
)
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
if msgPack == nil {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
for _, tsMsg := range msgPack.Msgs {
|
|
|
|
if tsMsg.Type() == commonpb.MsgType_Delete {
|
|
|
|
dmsg := tsMsg.(*msgstream.DeleteMsg)
|
|
|
|
if dmsg.CollectionID != sd.collectionID || dmsg.GetPartitionID() != candidate.Partition() {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
for idx, pk := range storage.ParseIDs2PrimaryKeys(dmsg.GetPrimaryKeys()) {
|
|
|
|
if candidate.MayPkExist(pk) {
|
|
|
|
result.Pks = append(result.Pks, pk)
|
|
|
|
result.Tss = append(result.Tss, dmsg.Timestamps[idx])
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// reach safe ts
|
|
|
|
if safeTs <= msgPack.EndPositions[0].GetTimestamp() {
|
|
|
|
hasMore = false
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return result, nil
|
|
|
|
}
|
|
|
|
|
2023-04-07 19:32:29 +08:00
|
|
|
// ReleaseSegments releases segments local or remotely depending on the target node.
|
2023-03-27 00:42:00 +08:00
|
|
|
func (sd *shardDelegator) ReleaseSegments(ctx context.Context, req *querypb.ReleaseSegmentsRequest, force bool) error {
|
|
|
|
log := sd.getLogger(ctx)
|
|
|
|
|
|
|
|
targetNodeID := req.GetNodeID()
|
|
|
|
// add common log fields
|
|
|
|
log = log.With(
|
|
|
|
zap.Int64s("segmentIDs", req.GetSegmentIDs()),
|
|
|
|
zap.Int64("nodeID", req.GetNodeID()),
|
|
|
|
zap.String("scope", req.GetScope().String()),
|
|
|
|
zap.Bool("force", force))
|
|
|
|
|
|
|
|
log.Info("delegator start to release segments")
|
|
|
|
// alter distribution first
|
|
|
|
var sealed, growing []SegmentEntry
|
|
|
|
convertSealed := func(segmentID int64, _ int) SegmentEntry {
|
|
|
|
return SegmentEntry{
|
|
|
|
SegmentID: segmentID,
|
|
|
|
NodeID: targetNodeID,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
convertGrowing := func(segmentID int64, _ int) SegmentEntry {
|
|
|
|
return SegmentEntry{
|
|
|
|
SegmentID: segmentID,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
switch req.GetScope() {
|
|
|
|
case querypb.DataScope_All:
|
|
|
|
sealed = lo.Map(req.GetSegmentIDs(), convertSealed)
|
|
|
|
growing = lo.Map(req.GetSegmentIDs(), convertGrowing)
|
|
|
|
case querypb.DataScope_Streaming:
|
|
|
|
growing = lo.Map(req.GetSegmentIDs(), convertGrowing)
|
|
|
|
case querypb.DataScope_Historical:
|
|
|
|
sealed = lo.Map(req.GetSegmentIDs(), convertSealed)
|
|
|
|
}
|
|
|
|
|
|
|
|
signal := sd.distribution.RemoveDistributions(sealed, growing)
|
|
|
|
// wait cleared signal
|
|
|
|
<-signal
|
|
|
|
if len(sealed) > 0 {
|
|
|
|
sd.pkOracle.Remove(
|
|
|
|
pkoracle.WithSegmentIDs(lo.Map(sealed, func(entry SegmentEntry, _ int) int64 { return entry.SegmentID })...),
|
|
|
|
pkoracle.WithSegmentType(commonpb.SegmentState_Sealed),
|
|
|
|
pkoracle.WithWorkerID(targetNodeID),
|
|
|
|
)
|
|
|
|
}
|
|
|
|
if len(growing) > 0 {
|
|
|
|
sd.pkOracle.Remove(
|
|
|
|
pkoracle.WithSegmentIDs(lo.Map(growing, func(entry SegmentEntry, _ int) int64 { return entry.SegmentID })...),
|
|
|
|
pkoracle.WithSegmentType(commonpb.SegmentState_Growing),
|
|
|
|
)
|
|
|
|
}
|
|
|
|
|
|
|
|
if !force {
|
2023-09-05 10:05:48 +08:00
|
|
|
worker, err := sd.workerManager.GetWorker(ctx, targetNodeID)
|
2023-03-27 00:42:00 +08:00
|
|
|
if err != nil {
|
|
|
|
log.Warn("delegator failed to find worker",
|
|
|
|
zap.Error(err),
|
|
|
|
)
|
|
|
|
return err
|
|
|
|
}
|
2023-04-17 18:46:30 +08:00
|
|
|
req.Base.TargetID = targetNodeID
|
2023-03-27 00:42:00 +08:00
|
|
|
err = worker.ReleaseSegments(ctx, req)
|
|
|
|
if err != nil {
|
|
|
|
log.Warn("worker failed to release segments",
|
|
|
|
zap.Error(err),
|
|
|
|
)
|
|
|
|
}
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
2023-06-27 11:48:45 +08:00
|
|
|
|
2023-07-24 14:09:00 +08:00
|
|
|
func (sd *shardDelegator) SyncTargetVersion(newVersion int64, growingInTarget []int64,
|
2023-09-21 09:45:27 +08:00
|
|
|
sealedInTarget []int64, droppedInTarget []int64,
|
|
|
|
) {
|
2023-08-29 23:12:27 +08:00
|
|
|
growings := sd.segmentManager.GetBy(
|
|
|
|
segments.WithType(segments.SegmentTypeGrowing),
|
|
|
|
segments.WithChannel(sd.vchannelName),
|
|
|
|
)
|
2023-07-24 14:09:00 +08:00
|
|
|
|
|
|
|
sealedSet := typeutil.NewUniqueSet(sealedInTarget...)
|
|
|
|
growingSet := typeutil.NewUniqueSet(growingInTarget...)
|
|
|
|
droppedSet := typeutil.NewUniqueSet(droppedInTarget...)
|
2023-08-29 23:12:27 +08:00
|
|
|
redundantGrowing := typeutil.NewUniqueSet()
|
2023-07-24 14:09:00 +08:00
|
|
|
for _, s := range growings {
|
|
|
|
if growingSet.Contain(s.ID()) {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
// sealed segment already exists, make growing segment redundant
|
|
|
|
if sealedSet.Contain(s.ID()) {
|
2023-08-29 23:12:27 +08:00
|
|
|
redundantGrowing.Insert(s.ID())
|
2023-07-24 14:09:00 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
// sealed segment already dropped, make growing segment redundant
|
|
|
|
if droppedSet.Contain(s.ID()) {
|
2023-08-29 23:12:27 +08:00
|
|
|
redundantGrowing.Insert(s.ID())
|
2023-07-24 14:09:00 +08:00
|
|
|
}
|
|
|
|
}
|
2023-08-29 23:12:27 +08:00
|
|
|
redundantGrowingIDs := redundantGrowing.Collect()
|
2023-07-24 14:09:00 +08:00
|
|
|
if len(redundantGrowing) > 0 {
|
|
|
|
log.Warn("found redundant growing segments",
|
2023-08-29 23:12:27 +08:00
|
|
|
zap.Int64s("growingSegments", redundantGrowingIDs))
|
2023-07-24 14:09:00 +08:00
|
|
|
}
|
2023-08-29 23:12:27 +08:00
|
|
|
sd.distribution.SyncTargetVersion(newVersion, growingInTarget, sealedInTarget, redundantGrowingIDs)
|
2023-06-27 11:48:45 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
func (sd *shardDelegator) GetTargetVersion() int64 {
|
|
|
|
return sd.distribution.getTargetVersion()
|
|
|
|
}
|