mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-05 05:18:52 +08:00
a0cec4047a
fix #29642 Signed-off-by: yah01 <yang.cen@zilliz.com>
973 lines
31 KiB
Go
973 lines
31 KiB
Go
// Licensed to the LF AI & Data foundation under one
|
|
// or more contributor license agreements. See the NOTICE file
|
|
// distributed with this work for additional information
|
|
// regarding copyright ownership. The ASF licenses this file
|
|
// to you under the Apache License, Version 2.0 (the
|
|
// "License"); you may not use this file except in compliance
|
|
// with the License. You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package delegator
|
|
|
|
import (
|
|
"context"
|
|
"testing"
|
|
|
|
bloom "github.com/bits-and-blooms/bloom/v3"
|
|
"github.com/cockroachdb/errors"
|
|
"github.com/samber/lo"
|
|
"github.com/stretchr/testify/mock"
|
|
"github.com/stretchr/testify/suite"
|
|
|
|
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
|
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
|
|
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
|
"github.com/milvus-io/milvus/internal/proto/datapb"
|
|
"github.com/milvus-io/milvus/internal/proto/querypb"
|
|
"github.com/milvus-io/milvus/internal/proto/segcorepb"
|
|
"github.com/milvus-io/milvus/internal/querynodev2/cluster"
|
|
"github.com/milvus-io/milvus/internal/querynodev2/pkoracle"
|
|
"github.com/milvus-io/milvus/internal/querynodev2/segments"
|
|
"github.com/milvus-io/milvus/internal/querynodev2/tsafe"
|
|
"github.com/milvus-io/milvus/internal/storage"
|
|
"github.com/milvus-io/milvus/pkg/common"
|
|
"github.com/milvus-io/milvus/pkg/mq/msgstream"
|
|
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
|
|
"github.com/milvus-io/milvus/pkg/util/merr"
|
|
"github.com/milvus-io/milvus/pkg/util/metric"
|
|
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
|
)
|
|
|
|
type DelegatorDataSuite struct {
|
|
suite.Suite
|
|
|
|
collectionID int64
|
|
replicaID int64
|
|
vchannelName string
|
|
version int64
|
|
workerManager *cluster.MockManager
|
|
manager *segments.Manager
|
|
tsafeManager tsafe.Manager
|
|
loader *segments.MockLoader
|
|
mq *msgstream.MockMsgStream
|
|
|
|
delegator *shardDelegator
|
|
}
|
|
|
|
func (s *DelegatorDataSuite) SetupSuite() {
|
|
paramtable.Init()
|
|
paramtable.SetNodeID(1)
|
|
}
|
|
|
|
func (s *DelegatorDataSuite) SetupTest() {
|
|
s.collectionID = 1000
|
|
s.replicaID = 65535
|
|
s.vchannelName = "rootcoord-dml_1000_v0"
|
|
s.version = 2000
|
|
s.workerManager = &cluster.MockManager{}
|
|
s.manager = segments.NewManager()
|
|
s.tsafeManager = tsafe.NewTSafeReplica()
|
|
s.loader = &segments.MockLoader{}
|
|
|
|
// init schema
|
|
s.manager.Collection.PutOrRef(s.collectionID, &schemapb.CollectionSchema{
|
|
Name: "TestCollection",
|
|
Fields: []*schemapb.FieldSchema{
|
|
{
|
|
Name: "id",
|
|
FieldID: 100,
|
|
IsPrimaryKey: true,
|
|
DataType: schemapb.DataType_Int64,
|
|
AutoID: true,
|
|
},
|
|
{
|
|
Name: "vector",
|
|
FieldID: 101,
|
|
IsPrimaryKey: false,
|
|
DataType: schemapb.DataType_BinaryVector,
|
|
TypeParams: []*commonpb.KeyValuePair{
|
|
{
|
|
Key: common.DimKey,
|
|
Value: "128",
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}, &segcorepb.CollectionIndexMeta{
|
|
MaxIndexRowCount: 100,
|
|
IndexMetas: []*segcorepb.FieldIndexMeta{
|
|
{
|
|
FieldID: 101,
|
|
CollectionID: s.collectionID,
|
|
IndexName: "binary_index",
|
|
TypeParams: []*commonpb.KeyValuePair{
|
|
{
|
|
Key: common.DimKey,
|
|
Value: "128",
|
|
},
|
|
},
|
|
IndexParams: []*commonpb.KeyValuePair{
|
|
{
|
|
Key: common.IndexTypeKey,
|
|
Value: "BIN_IVF_FLAT",
|
|
},
|
|
{
|
|
Key: common.MetricTypeKey,
|
|
Value: metric.JACCARD,
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}, &querypb.LoadMetaInfo{
|
|
LoadType: querypb.LoadType_LoadCollection,
|
|
})
|
|
|
|
s.mq = &msgstream.MockMsgStream{}
|
|
|
|
delegator, err := NewShardDelegator(context.Background(), s.collectionID, s.replicaID, s.vchannelName, s.version, s.workerManager, s.manager, s.tsafeManager, s.loader, &msgstream.MockMqFactory{
|
|
NewMsgStreamFunc: func(_ context.Context) (msgstream.MsgStream, error) {
|
|
return s.mq, nil
|
|
},
|
|
}, 10000, nil)
|
|
s.Require().NoError(err)
|
|
sd, ok := delegator.(*shardDelegator)
|
|
s.Require().True(ok)
|
|
s.delegator = sd
|
|
}
|
|
|
|
func (s *DelegatorDataSuite) TestProcessInsert() {
|
|
s.Run("normal_insert", func() {
|
|
s.delegator.ProcessInsert(map[int64]*InsertData{
|
|
100: {
|
|
RowIDs: []int64{0, 1},
|
|
PrimaryKeys: []storage.PrimaryKey{storage.NewInt64PrimaryKey(1), storage.NewInt64PrimaryKey(2)},
|
|
Timestamps: []uint64{10, 10},
|
|
PartitionID: 500,
|
|
StartPosition: &msgpb.MsgPosition{},
|
|
InsertRecord: &segcorepb.InsertRecord{
|
|
FieldsData: []*schemapb.FieldData{
|
|
{
|
|
Type: schemapb.DataType_Int64,
|
|
FieldName: "id",
|
|
Field: &schemapb.FieldData_Scalars{
|
|
Scalars: &schemapb.ScalarField{
|
|
Data: &schemapb.ScalarField_LongData{
|
|
LongData: &schemapb.LongArray{
|
|
Data: []int64{1, 2},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
FieldId: 100,
|
|
},
|
|
{
|
|
Type: schemapb.DataType_FloatVector,
|
|
FieldName: "vector",
|
|
Field: &schemapb.FieldData_Vectors{
|
|
Vectors: &schemapb.VectorField{
|
|
Dim: 128,
|
|
Data: &schemapb.VectorField_FloatVector{
|
|
FloatVector: &schemapb.FloatArray{Data: make([]float32, 128*2)},
|
|
},
|
|
},
|
|
},
|
|
FieldId: 101,
|
|
},
|
|
},
|
|
NumRows: 2,
|
|
},
|
|
},
|
|
})
|
|
|
|
s.NotNil(s.manager.Segment.GetGrowing(100))
|
|
})
|
|
|
|
s.Run("insert_bad_data", func() {
|
|
s.Panics(func() {
|
|
s.delegator.ProcessInsert(map[int64]*InsertData{
|
|
100: {
|
|
RowIDs: []int64{0, 1},
|
|
PrimaryKeys: []storage.PrimaryKey{storage.NewInt64PrimaryKey(1), storage.NewInt64PrimaryKey(2)},
|
|
Timestamps: []uint64{10, 10},
|
|
PartitionID: 500,
|
|
StartPosition: &msgpb.MsgPosition{},
|
|
InsertRecord: &segcorepb.InsertRecord{
|
|
FieldsData: []*schemapb.FieldData{
|
|
{
|
|
Type: schemapb.DataType_Int64,
|
|
FieldName: "id",
|
|
Field: &schemapb.FieldData_Scalars{
|
|
Scalars: &schemapb.ScalarField{
|
|
Data: &schemapb.ScalarField_LongData{
|
|
LongData: &schemapb.LongArray{
|
|
Data: []int64{1, 2},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
FieldId: 100,
|
|
},
|
|
},
|
|
NumRows: 2,
|
|
},
|
|
},
|
|
})
|
|
})
|
|
})
|
|
}
|
|
|
|
func (s *DelegatorDataSuite) TestProcessDelete() {
|
|
s.loader.EXPECT().
|
|
Load(mock.Anything, s.collectionID, segments.SegmentTypeGrowing, int64(0), mock.Anything).
|
|
Call.Return(func(ctx context.Context, collectionID int64, segmentType segments.SegmentType, version int64, infos ...*querypb.SegmentLoadInfo) []segments.Segment {
|
|
return lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) segments.Segment {
|
|
ms := &segments.MockSegment{}
|
|
ms.EXPECT().ID().Return(info.GetSegmentID())
|
|
ms.EXPECT().Type().Return(segments.SegmentTypeGrowing)
|
|
ms.EXPECT().Collection().Return(info.GetCollectionID())
|
|
ms.EXPECT().Partition().Return(info.GetPartitionID())
|
|
ms.EXPECT().Indexes().Return(nil)
|
|
ms.EXPECT().RowNum().Return(info.GetNumOfRows())
|
|
ms.EXPECT().Delete(mock.Anything, mock.Anything, mock.Anything).Return(nil)
|
|
ms.EXPECT().MayPkExist(mock.Anything).Call.Return(func(pk storage.PrimaryKey) bool {
|
|
return pk.EQ(storage.NewInt64PrimaryKey(10))
|
|
})
|
|
return ms
|
|
})
|
|
}, nil)
|
|
s.loader.EXPECT().LoadBloomFilterSet(mock.Anything, s.collectionID, mock.AnythingOfType("int64"), mock.Anything).
|
|
Call.Return(func(ctx context.Context, collectionID int64, version int64, infos ...*querypb.SegmentLoadInfo) []*pkoracle.BloomFilterSet {
|
|
return lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) *pkoracle.BloomFilterSet {
|
|
bfs := pkoracle.NewBloomFilterSet(info.GetSegmentID(), info.GetPartitionID(), commonpb.SegmentState_Sealed)
|
|
bf := bloom.NewWithEstimates(paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(),
|
|
paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat())
|
|
pks := &storage.PkStatistics{
|
|
PkFilter: bf,
|
|
}
|
|
pks.UpdatePKRange(&storage.Int64FieldData{
|
|
Data: []int64{10, 20, 30},
|
|
})
|
|
bfs.AddHistoricalStats(pks)
|
|
return bfs
|
|
})
|
|
}, func(ctx context.Context, collectionID int64, version int64, infos ...*querypb.SegmentLoadInfo) error {
|
|
return nil
|
|
})
|
|
|
|
workers := make(map[int64]*cluster.MockWorker)
|
|
worker1 := &cluster.MockWorker{}
|
|
workers[1] = worker1
|
|
|
|
worker1.EXPECT().LoadSegments(mock.Anything, mock.AnythingOfType("*querypb.LoadSegmentsRequest")).
|
|
Return(nil)
|
|
worker1.EXPECT().Delete(mock.Anything, mock.AnythingOfType("*querypb.DeleteRequest")).Return(nil)
|
|
s.workerManager.EXPECT().GetWorker(mock.Anything, mock.AnythingOfType("int64")).Call.Return(func(_ context.Context, nodeID int64) cluster.Worker {
|
|
return workers[nodeID]
|
|
}, nil)
|
|
// load growing
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
err := s.delegator.LoadGrowing(ctx, []*querypb.SegmentLoadInfo{
|
|
{
|
|
SegmentID: 1001,
|
|
CollectionID: s.collectionID,
|
|
PartitionID: 500,
|
|
},
|
|
}, 0)
|
|
s.Require().NoError(err)
|
|
// load sealed
|
|
s.delegator.LoadSegments(ctx, &querypb.LoadSegmentsRequest{
|
|
Base: commonpbutil.NewMsgBase(),
|
|
DstNodeID: 1,
|
|
CollectionID: s.collectionID,
|
|
Infos: []*querypb.SegmentLoadInfo{
|
|
{
|
|
SegmentID: 1000,
|
|
CollectionID: s.collectionID,
|
|
PartitionID: 500,
|
|
StartPosition: &msgpb.MsgPosition{Timestamp: 20000},
|
|
DeltaPosition: &msgpb.MsgPosition{Timestamp: 20000},
|
|
},
|
|
},
|
|
})
|
|
s.Require().NoError(err)
|
|
|
|
s.delegator.ProcessDelete([]*DeleteData{
|
|
{
|
|
PartitionID: 500,
|
|
PrimaryKeys: []storage.PrimaryKey{storage.NewInt64PrimaryKey(10)},
|
|
Timestamps: []uint64{10},
|
|
RowCount: 1,
|
|
},
|
|
}, 10)
|
|
|
|
// load sealed
|
|
s.delegator.LoadSegments(ctx, &querypb.LoadSegmentsRequest{
|
|
Base: commonpbutil.NewMsgBase(),
|
|
DstNodeID: 1,
|
|
CollectionID: s.collectionID,
|
|
Infos: []*querypb.SegmentLoadInfo{
|
|
{
|
|
SegmentID: 1000,
|
|
CollectionID: s.collectionID,
|
|
PartitionID: 500,
|
|
StartPosition: &msgpb.MsgPosition{Timestamp: 5000},
|
|
DeltaPosition: &msgpb.MsgPosition{Timestamp: 5000},
|
|
},
|
|
},
|
|
})
|
|
s.Require().NoError(err)
|
|
|
|
s.delegator.ProcessDelete([]*DeleteData{
|
|
{
|
|
PartitionID: 500,
|
|
PrimaryKeys: []storage.PrimaryKey{storage.NewInt64PrimaryKey(10)},
|
|
Timestamps: []uint64{10},
|
|
RowCount: 1,
|
|
},
|
|
}, 10)
|
|
s.True(s.delegator.distribution.Serviceable())
|
|
|
|
// test worker return segment not loaded
|
|
worker1.ExpectedCalls = nil
|
|
worker1.EXPECT().Delete(mock.Anything, mock.Anything).Return(merr.ErrSegmentNotLoaded)
|
|
s.delegator.ProcessDelete([]*DeleteData{
|
|
{
|
|
PartitionID: 500,
|
|
PrimaryKeys: []storage.PrimaryKey{storage.NewInt64PrimaryKey(10)},
|
|
Timestamps: []uint64{10},
|
|
RowCount: 1,
|
|
},
|
|
}, 10)
|
|
s.True(s.delegator.distribution.Serviceable(), "segment not loaded shall not trigger offline")
|
|
|
|
// test worker offline
|
|
worker1.ExpectedCalls = nil
|
|
worker1.EXPECT().Delete(mock.Anything, mock.Anything).Return(merr.ErrNodeNotFound)
|
|
s.delegator.ProcessDelete([]*DeleteData{
|
|
{
|
|
PartitionID: 500,
|
|
PrimaryKeys: []storage.PrimaryKey{storage.NewInt64PrimaryKey(10)},
|
|
Timestamps: []uint64{10},
|
|
RowCount: 1,
|
|
},
|
|
}, 10)
|
|
|
|
s.False(s.delegator.distribution.Serviceable())
|
|
}
|
|
|
|
func (s *DelegatorDataSuite) TestLoadSegments() {
|
|
s.Run("normal_run", func() {
|
|
defer func() {
|
|
s.workerManager.ExpectedCalls = nil
|
|
s.loader.ExpectedCalls = nil
|
|
}()
|
|
|
|
s.loader.EXPECT().LoadBloomFilterSet(mock.Anything, s.collectionID, mock.AnythingOfType("int64"), mock.Anything).
|
|
Call.Return(func(ctx context.Context, collectionID int64, version int64, infos ...*querypb.SegmentLoadInfo) []*pkoracle.BloomFilterSet {
|
|
return lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) *pkoracle.BloomFilterSet {
|
|
return pkoracle.NewBloomFilterSet(info.GetSegmentID(), info.GetPartitionID(), commonpb.SegmentState_Sealed)
|
|
})
|
|
}, func(ctx context.Context, collectionID int64, version int64, infos ...*querypb.SegmentLoadInfo) error {
|
|
return nil
|
|
})
|
|
|
|
workers := make(map[int64]*cluster.MockWorker)
|
|
worker1 := &cluster.MockWorker{}
|
|
workers[1] = worker1
|
|
|
|
worker1.EXPECT().LoadSegments(mock.Anything, mock.AnythingOfType("*querypb.LoadSegmentsRequest")).
|
|
Return(nil)
|
|
s.workerManager.EXPECT().GetWorker(mock.Anything, mock.AnythingOfType("int64")).Call.Return(func(_ context.Context, nodeID int64) cluster.Worker {
|
|
return workers[nodeID]
|
|
}, nil)
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
err := s.delegator.LoadSegments(ctx, &querypb.LoadSegmentsRequest{
|
|
Base: commonpbutil.NewMsgBase(),
|
|
DstNodeID: 1,
|
|
CollectionID: s.collectionID,
|
|
Infos: []*querypb.SegmentLoadInfo{
|
|
{
|
|
SegmentID: 100,
|
|
PartitionID: 500,
|
|
StartPosition: &msgpb.MsgPosition{Timestamp: 20000},
|
|
DeltaPosition: &msgpb.MsgPosition{Timestamp: 20000},
|
|
},
|
|
},
|
|
})
|
|
|
|
s.NoError(err)
|
|
sealed, _ := s.delegator.GetSegmentInfo(false)
|
|
s.Require().Equal(1, len(sealed))
|
|
s.Equal(int64(1), sealed[0].NodeID)
|
|
s.ElementsMatch([]SegmentEntry{
|
|
{
|
|
SegmentID: 100,
|
|
NodeID: 1,
|
|
PartitionID: 500,
|
|
TargetVersion: unreadableTargetVersion,
|
|
},
|
|
}, sealed[0].Segments)
|
|
})
|
|
|
|
s.Run("load_segments_with_delete", func() {
|
|
defer func() {
|
|
s.workerManager.ExpectedCalls = nil
|
|
s.loader.ExpectedCalls = nil
|
|
}()
|
|
|
|
s.loader.EXPECT().LoadBloomFilterSet(mock.Anything, s.collectionID, mock.AnythingOfType("int64"), mock.Anything).
|
|
Call.Return(func(ctx context.Context, collectionID int64, version int64, infos ...*querypb.SegmentLoadInfo) []*pkoracle.BloomFilterSet {
|
|
return lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) *pkoracle.BloomFilterSet {
|
|
bfs := pkoracle.NewBloomFilterSet(info.GetSegmentID(), info.GetPartitionID(), commonpb.SegmentState_Sealed)
|
|
bf := bloom.NewWithEstimates(paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(),
|
|
paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat())
|
|
pks := &storage.PkStatistics{
|
|
PkFilter: bf,
|
|
}
|
|
pks.UpdatePKRange(&storage.Int64FieldData{
|
|
Data: []int64{10, 20, 30},
|
|
})
|
|
bfs.AddHistoricalStats(pks)
|
|
return bfs
|
|
})
|
|
}, func(ctx context.Context, collectionID int64, version int64, infos ...*querypb.SegmentLoadInfo) error {
|
|
return nil
|
|
})
|
|
|
|
workers := make(map[int64]*cluster.MockWorker)
|
|
worker1 := &cluster.MockWorker{}
|
|
workers[1] = worker1
|
|
|
|
worker1.EXPECT().LoadSegments(mock.Anything, mock.AnythingOfType("*querypb.LoadSegmentsRequest")).
|
|
Return(nil)
|
|
worker1.EXPECT().Delete(mock.Anything, mock.AnythingOfType("*querypb.DeleteRequest")).Return(nil)
|
|
s.workerManager.EXPECT().GetWorker(mock.Anything, mock.AnythingOfType("int64")).Call.Return(func(_ context.Context, nodeID int64) cluster.Worker {
|
|
return workers[nodeID]
|
|
}, nil)
|
|
|
|
s.delegator.ProcessDelete([]*DeleteData{
|
|
{
|
|
PartitionID: 500,
|
|
PrimaryKeys: []storage.PrimaryKey{
|
|
storage.NewInt64PrimaryKey(1),
|
|
storage.NewInt64PrimaryKey(10),
|
|
},
|
|
Timestamps: []uint64{10, 10},
|
|
RowCount: 2,
|
|
},
|
|
}, 10)
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
err := s.delegator.LoadSegments(ctx, &querypb.LoadSegmentsRequest{
|
|
Base: commonpbutil.NewMsgBase(),
|
|
DstNodeID: 1,
|
|
CollectionID: s.collectionID,
|
|
Infos: []*querypb.SegmentLoadInfo{
|
|
{
|
|
SegmentID: 200,
|
|
PartitionID: 500,
|
|
StartPosition: &msgpb.MsgPosition{Timestamp: 20000},
|
|
DeltaPosition: &msgpb.MsgPosition{Timestamp: 20000},
|
|
Deltalogs: []*datapb.FieldBinlog{},
|
|
Level: datapb.SegmentLevel_L0,
|
|
},
|
|
},
|
|
})
|
|
s.NoError(err)
|
|
|
|
err = s.delegator.LoadSegments(ctx, &querypb.LoadSegmentsRequest{
|
|
Base: commonpbutil.NewMsgBase(),
|
|
DstNodeID: 1,
|
|
CollectionID: s.collectionID,
|
|
Infos: []*querypb.SegmentLoadInfo{
|
|
{
|
|
SegmentID: 200,
|
|
PartitionID: 500,
|
|
StartPosition: &msgpb.MsgPosition{Timestamp: 20000},
|
|
DeltaPosition: &msgpb.MsgPosition{Timestamp: 20000},
|
|
},
|
|
},
|
|
})
|
|
|
|
s.NoError(err)
|
|
sealed, _ := s.delegator.GetSegmentInfo(false)
|
|
s.Require().Equal(1, len(sealed))
|
|
s.Equal(int64(1), sealed[0].NodeID)
|
|
s.ElementsMatch([]SegmentEntry{
|
|
{
|
|
SegmentID: 100,
|
|
NodeID: 1,
|
|
PartitionID: 500,
|
|
TargetVersion: unreadableTargetVersion,
|
|
},
|
|
{
|
|
SegmentID: 200,
|
|
NodeID: 1,
|
|
PartitionID: 500,
|
|
TargetVersion: unreadableTargetVersion,
|
|
},
|
|
}, sealed[0].Segments)
|
|
})
|
|
|
|
s.Run("load_segments_with_l0_delete_failed", func() {
|
|
s.T().Skip("skip this test for now")
|
|
defer func() {
|
|
s.workerManager.ExpectedCalls = nil
|
|
s.loader.ExpectedCalls = nil
|
|
}()
|
|
|
|
delegator, err := NewShardDelegator(
|
|
context.Background(),
|
|
s.collectionID,
|
|
s.replicaID,
|
|
s.vchannelName,
|
|
s.version,
|
|
s.workerManager,
|
|
s.manager,
|
|
s.tsafeManager,
|
|
s.loader,
|
|
&msgstream.MockMqFactory{
|
|
NewMsgStreamFunc: func(_ context.Context) (msgstream.MsgStream, error) {
|
|
return s.mq, nil
|
|
},
|
|
}, 10000, nil)
|
|
s.NoError(err)
|
|
|
|
growing0 := segments.NewMockSegment(s.T())
|
|
growing0.EXPECT().ID().Return(1)
|
|
growing0.EXPECT().Partition().Return(10)
|
|
growing0.EXPECT().Type().Return(segments.SegmentTypeGrowing)
|
|
growing0.EXPECT().Release()
|
|
|
|
growing1 := segments.NewMockSegment(s.T())
|
|
growing1.EXPECT().ID().Return(2)
|
|
growing1.EXPECT().Partition().Return(10)
|
|
growing1.EXPECT().Type().Return(segments.SegmentTypeGrowing)
|
|
growing1.EXPECT().Release()
|
|
|
|
mockErr := merr.WrapErrServiceInternal("mock")
|
|
|
|
growing0.EXPECT().Delete(mock.Anything, mock.Anything, mock.Anything).Return(nil)
|
|
growing1.EXPECT().Delete(mock.Anything, mock.Anything, mock.Anything).Return(mockErr)
|
|
|
|
s.loader.EXPECT().Load(
|
|
mock.Anything,
|
|
mock.Anything,
|
|
segments.SegmentTypeGrowing,
|
|
mock.Anything,
|
|
mock.Anything,
|
|
mock.Anything,
|
|
).Return([]segments.Segment{growing0, growing1}, nil)
|
|
|
|
err = delegator.LoadGrowing(context.Background(), []*querypb.SegmentLoadInfo{{}, {}}, 100)
|
|
s.ErrorIs(err, mockErr)
|
|
})
|
|
|
|
s.Run("load_segments_with_streaming_delete_failed", func() {
|
|
defer func() {
|
|
s.workerManager.ExpectedCalls = nil
|
|
s.loader.ExpectedCalls = nil
|
|
}()
|
|
|
|
s.loader.EXPECT().LoadBloomFilterSet(mock.Anything, s.collectionID, mock.AnythingOfType("int64"), mock.Anything).
|
|
Call.Return(func(ctx context.Context, collectionID int64, version int64, infos ...*querypb.SegmentLoadInfo) []*pkoracle.BloomFilterSet {
|
|
return lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) *pkoracle.BloomFilterSet {
|
|
bfs := pkoracle.NewBloomFilterSet(info.GetSegmentID(), info.GetPartitionID(), commonpb.SegmentState_Sealed)
|
|
bf := bloom.NewWithEstimates(paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(),
|
|
paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat())
|
|
pks := &storage.PkStatistics{
|
|
PkFilter: bf,
|
|
}
|
|
pks.UpdatePKRange(&storage.Int64FieldData{
|
|
Data: []int64{10, 20, 30},
|
|
})
|
|
bfs.AddHistoricalStats(pks)
|
|
return bfs
|
|
})
|
|
}, func(ctx context.Context, collectionID int64, version int64, infos ...*querypb.SegmentLoadInfo) error {
|
|
return nil
|
|
})
|
|
|
|
workers := make(map[int64]*cluster.MockWorker)
|
|
worker1 := &cluster.MockWorker{}
|
|
workers[1] = worker1
|
|
|
|
worker1.EXPECT().LoadSegments(mock.Anything, mock.AnythingOfType("*querypb.LoadSegmentsRequest")).
|
|
Return(nil)
|
|
worker1.EXPECT().Delete(mock.Anything, mock.AnythingOfType("*querypb.DeleteRequest")).Return(nil)
|
|
s.workerManager.EXPECT().GetWorker(mock.Anything, mock.AnythingOfType("int64")).Call.Return(func(_ context.Context, nodeID int64) cluster.Worker {
|
|
return workers[nodeID]
|
|
}, nil)
|
|
|
|
s.delegator.ProcessDelete([]*DeleteData{
|
|
{
|
|
PartitionID: 500,
|
|
PrimaryKeys: []storage.PrimaryKey{
|
|
storage.NewInt64PrimaryKey(1),
|
|
storage.NewInt64PrimaryKey(10),
|
|
},
|
|
Timestamps: []uint64{10, 10},
|
|
RowCount: 2,
|
|
},
|
|
}, 10)
|
|
|
|
s.mq.EXPECT().AsConsumer(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
|
|
s.mq.EXPECT().Seek(mock.Anything, mock.Anything).Return(nil)
|
|
s.mq.EXPECT().Close()
|
|
ch := make(chan *msgstream.MsgPack, 10)
|
|
close(ch)
|
|
|
|
s.mq.EXPECT().Chan().Return(ch)
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
err := s.delegator.LoadSegments(ctx, &querypb.LoadSegmentsRequest{
|
|
Base: commonpbutil.NewMsgBase(),
|
|
DstNodeID: 1,
|
|
CollectionID: s.collectionID,
|
|
Infos: []*querypb.SegmentLoadInfo{
|
|
{
|
|
SegmentID: 300,
|
|
PartitionID: 500,
|
|
StartPosition: &msgpb.MsgPosition{Timestamp: 2},
|
|
DeltaPosition: &msgpb.MsgPosition{Timestamp: 2},
|
|
},
|
|
},
|
|
})
|
|
s.Error(err)
|
|
})
|
|
|
|
s.Run("get_worker_fail", func() {
|
|
defer func() {
|
|
s.workerManager.ExpectedCalls = nil
|
|
s.loader.ExpectedCalls = nil
|
|
}()
|
|
|
|
s.workerManager.EXPECT().GetWorker(mock.Anything, mock.AnythingOfType("int64")).Return(nil, errors.New("mock error"))
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
err := s.delegator.LoadSegments(ctx, &querypb.LoadSegmentsRequest{
|
|
Base: commonpbutil.NewMsgBase(),
|
|
DstNodeID: 1,
|
|
CollectionID: s.collectionID,
|
|
Infos: []*querypb.SegmentLoadInfo{
|
|
{
|
|
SegmentID: 100,
|
|
PartitionID: 500,
|
|
StartPosition: &msgpb.MsgPosition{Timestamp: 20000},
|
|
DeltaPosition: &msgpb.MsgPosition{Timestamp: 20000},
|
|
},
|
|
},
|
|
})
|
|
|
|
s.Error(err)
|
|
})
|
|
|
|
s.Run("loader_bfs_fail", func() {
|
|
defer func() {
|
|
s.workerManager.ExpectedCalls = nil
|
|
s.loader.ExpectedCalls = nil
|
|
}()
|
|
|
|
s.loader.EXPECT().LoadBloomFilterSet(mock.Anything, s.collectionID, mock.AnythingOfType("int64"), mock.Anything).
|
|
Return(nil, errors.New("mocked error"))
|
|
|
|
workers := make(map[int64]*cluster.MockWorker)
|
|
worker1 := &cluster.MockWorker{}
|
|
workers[1] = worker1
|
|
|
|
worker1.EXPECT().LoadSegments(mock.Anything, mock.AnythingOfType("*querypb.LoadSegmentsRequest")).
|
|
Return(nil)
|
|
s.workerManager.EXPECT().GetWorker(mock.Anything, mock.AnythingOfType("int64")).Call.Return(func(_ context.Context, nodeID int64) cluster.Worker {
|
|
return workers[nodeID]
|
|
}, nil)
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
err := s.delegator.LoadSegments(ctx, &querypb.LoadSegmentsRequest{
|
|
Base: commonpbutil.NewMsgBase(),
|
|
DstNodeID: 1,
|
|
CollectionID: s.collectionID,
|
|
Infos: []*querypb.SegmentLoadInfo{
|
|
{
|
|
SegmentID: 100,
|
|
PartitionID: 500,
|
|
StartPosition: &msgpb.MsgPosition{Timestamp: 20000},
|
|
DeltaPosition: &msgpb.MsgPosition{Timestamp: 20000},
|
|
},
|
|
},
|
|
})
|
|
|
|
s.Error(err)
|
|
})
|
|
|
|
s.Run("worker_load_fail", func() {
|
|
defer func() {
|
|
s.workerManager.ExpectedCalls = nil
|
|
s.loader.ExpectedCalls = nil
|
|
}()
|
|
|
|
s.loader.EXPECT().LoadBloomFilterSet(mock.Anything, s.collectionID, mock.AnythingOfType("int64"), mock.Anything).
|
|
Call.Return(func(ctx context.Context, collectionID int64, version int64, infos ...*querypb.SegmentLoadInfo) []*pkoracle.BloomFilterSet {
|
|
return lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) *pkoracle.BloomFilterSet {
|
|
return pkoracle.NewBloomFilterSet(info.GetSegmentID(), info.GetPartitionID(), commonpb.SegmentState_Sealed)
|
|
})
|
|
}, func(ctx context.Context, collectionID int64, version int64, infos ...*querypb.SegmentLoadInfo) error {
|
|
return nil
|
|
})
|
|
|
|
workers := make(map[int64]*cluster.MockWorker)
|
|
worker1 := &cluster.MockWorker{}
|
|
workers[1] = worker1
|
|
|
|
worker1.EXPECT().LoadSegments(mock.Anything, mock.AnythingOfType("*querypb.LoadSegmentsRequest")).
|
|
Return(errors.New("mocked error"))
|
|
s.workerManager.EXPECT().GetWorker(mock.Anything, mock.AnythingOfType("int64")).Call.Return(func(_ context.Context, nodeID int64) cluster.Worker {
|
|
return workers[nodeID]
|
|
}, nil)
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
err := s.delegator.LoadSegments(ctx, &querypb.LoadSegmentsRequest{
|
|
Base: commonpbutil.NewMsgBase(),
|
|
DstNodeID: 1,
|
|
CollectionID: s.collectionID,
|
|
Infos: []*querypb.SegmentLoadInfo{
|
|
{
|
|
SegmentID: 100,
|
|
PartitionID: 500,
|
|
StartPosition: &msgpb.MsgPosition{Timestamp: 20000},
|
|
DeltaPosition: &msgpb.MsgPosition{Timestamp: 20000},
|
|
},
|
|
},
|
|
})
|
|
|
|
s.Error(err)
|
|
})
|
|
}
|
|
|
|
func (s *DelegatorDataSuite) TestReleaseSegment() {
|
|
s.loader.EXPECT().
|
|
Load(mock.Anything, s.collectionID, segments.SegmentTypeGrowing, int64(0), mock.Anything).
|
|
Call.Return(func(ctx context.Context, collectionID int64, segmentType segments.SegmentType, version int64, infos ...*querypb.SegmentLoadInfo) []segments.Segment {
|
|
return lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) segments.Segment {
|
|
ms := &segments.MockSegment{}
|
|
ms.EXPECT().ID().Return(info.GetSegmentID())
|
|
ms.EXPECT().Type().Return(segments.SegmentTypeGrowing)
|
|
ms.EXPECT().Partition().Return(info.GetPartitionID())
|
|
ms.EXPECT().Collection().Return(info.GetCollectionID())
|
|
ms.EXPECT().Indexes().Return(nil)
|
|
ms.EXPECT().RowNum().Return(info.GetNumOfRows())
|
|
ms.EXPECT().Delete(mock.Anything, mock.Anything, mock.Anything).Return(nil)
|
|
ms.EXPECT().MayPkExist(mock.Anything).Call.Return(func(pk storage.PrimaryKey) bool {
|
|
return pk.EQ(storage.NewInt64PrimaryKey(10))
|
|
})
|
|
return ms
|
|
})
|
|
}, nil)
|
|
s.loader.EXPECT().LoadBloomFilterSet(mock.Anything, s.collectionID, mock.AnythingOfType("int64"), mock.Anything).
|
|
Call.Return(func(ctx context.Context, collectionID int64, version int64, infos ...*querypb.SegmentLoadInfo) []*pkoracle.BloomFilterSet {
|
|
return lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) *pkoracle.BloomFilterSet {
|
|
bfs := pkoracle.NewBloomFilterSet(info.GetSegmentID(), info.GetPartitionID(), commonpb.SegmentState_Sealed)
|
|
bf := bloom.NewWithEstimates(paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(),
|
|
paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat())
|
|
pks := &storage.PkStatistics{
|
|
PkFilter: bf,
|
|
}
|
|
pks.UpdatePKRange(&storage.Int64FieldData{
|
|
Data: []int64{10, 20, 30},
|
|
})
|
|
bfs.AddHistoricalStats(pks)
|
|
return bfs
|
|
})
|
|
}, func(ctx context.Context, collectionID int64, version int64, infos ...*querypb.SegmentLoadInfo) error {
|
|
return nil
|
|
})
|
|
|
|
workers := make(map[int64]*cluster.MockWorker)
|
|
worker1 := &cluster.MockWorker{}
|
|
workers[1] = worker1
|
|
worker2 := &cluster.MockWorker{}
|
|
workers[2] = worker2
|
|
|
|
worker1.EXPECT().LoadSegments(mock.Anything, mock.AnythingOfType("*querypb.LoadSegmentsRequest")).
|
|
Return(nil)
|
|
worker1.EXPECT().ReleaseSegments(mock.Anything, mock.AnythingOfType("*querypb.ReleaseSegmentsRequest")).Return(nil)
|
|
worker2.EXPECT().ReleaseSegments(mock.Anything, mock.AnythingOfType("*querypb.ReleaseSegmentsRequest")).Return(nil)
|
|
s.workerManager.EXPECT().GetWorker(mock.Anything, mock.AnythingOfType("int64")).Call.Return(func(_ context.Context, nodeID int64) cluster.Worker {
|
|
return workers[nodeID]
|
|
}, nil)
|
|
// load growing
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
err := s.delegator.LoadGrowing(ctx, []*querypb.SegmentLoadInfo{
|
|
{
|
|
SegmentID: 1001,
|
|
CollectionID: s.collectionID,
|
|
PartitionID: 500,
|
|
},
|
|
}, 0)
|
|
s.Require().NoError(err)
|
|
// load sealed
|
|
s.delegator.LoadSegments(ctx, &querypb.LoadSegmentsRequest{
|
|
Base: commonpbutil.NewMsgBase(),
|
|
DstNodeID: 1,
|
|
CollectionID: s.collectionID,
|
|
Infos: []*querypb.SegmentLoadInfo{
|
|
{
|
|
SegmentID: 1000,
|
|
CollectionID: s.collectionID,
|
|
PartitionID: 500,
|
|
StartPosition: &msgpb.MsgPosition{Timestamp: 20000},
|
|
DeltaPosition: &msgpb.MsgPosition{Timestamp: 20000},
|
|
},
|
|
},
|
|
})
|
|
s.Require().NoError(err)
|
|
|
|
sealed, growing := s.delegator.GetSegmentInfo(false)
|
|
s.Require().Equal(1, len(sealed))
|
|
s.Equal(int64(1), sealed[0].NodeID)
|
|
s.ElementsMatch([]SegmentEntry{
|
|
{
|
|
SegmentID: 1000,
|
|
NodeID: 1,
|
|
PartitionID: 500,
|
|
TargetVersion: unreadableTargetVersion,
|
|
},
|
|
}, sealed[0].Segments)
|
|
|
|
s.ElementsMatch([]SegmentEntry{
|
|
{
|
|
SegmentID: 1001,
|
|
NodeID: 1,
|
|
PartitionID: 500,
|
|
},
|
|
}, growing)
|
|
|
|
err = s.delegator.ReleaseSegments(ctx, &querypb.ReleaseSegmentsRequest{
|
|
Base: commonpbutil.NewMsgBase(),
|
|
NodeID: 1,
|
|
SegmentIDs: []int64{1000},
|
|
Scope: querypb.DataScope_Historical,
|
|
}, false)
|
|
|
|
s.NoError(err)
|
|
sealed, _ = s.delegator.GetSegmentInfo(false)
|
|
s.Equal(0, len(sealed))
|
|
|
|
err = s.delegator.ReleaseSegments(ctx, &querypb.ReleaseSegmentsRequest{
|
|
Base: commonpbutil.NewMsgBase(),
|
|
NodeID: 1,
|
|
SegmentIDs: []int64{1001},
|
|
Scope: querypb.DataScope_Streaming,
|
|
}, false)
|
|
|
|
s.NoError(err)
|
|
_, growing = s.delegator.GetSegmentInfo(false)
|
|
s.Equal(0, len(growing))
|
|
|
|
err = s.delegator.ReleaseSegments(ctx, &querypb.ReleaseSegmentsRequest{
|
|
Base: commonpbutil.NewMsgBase(),
|
|
NodeID: 1,
|
|
SegmentIDs: []int64{1000},
|
|
Scope: querypb.DataScope_All,
|
|
}, true)
|
|
s.NoError(err)
|
|
|
|
// test transfer
|
|
req := &querypb.ReleaseSegmentsRequest{
|
|
Base: commonpbutil.NewMsgBase(),
|
|
NodeID: 2,
|
|
SegmentIDs: []int64{1000},
|
|
Scope: querypb.DataScope_All,
|
|
NeedTransfer: true,
|
|
}
|
|
req.Base.TargetID = 1
|
|
err = s.delegator.ReleaseSegments(ctx, req, false)
|
|
s.NoError(err)
|
|
}
|
|
|
|
func (s *DelegatorDataSuite) TestSyncTargetVersion() {
|
|
for i := int64(0); i < 5; i++ {
|
|
ms := &segments.MockSegment{}
|
|
ms.EXPECT().ID().Return(i)
|
|
ms.EXPECT().StartPosition().Return(&msgpb.MsgPosition{
|
|
Timestamp: uint64(i),
|
|
})
|
|
ms.EXPECT().Type().Return(segments.SegmentTypeGrowing)
|
|
ms.EXPECT().Collection().Return(1)
|
|
ms.EXPECT().Partition().Return(1)
|
|
ms.EXPECT().InsertCount().Return(0)
|
|
ms.EXPECT().Indexes().Return(nil)
|
|
ms.EXPECT().Shard().Return(s.vchannelName)
|
|
ms.EXPECT().Level().Return(datapb.SegmentLevel_L1)
|
|
s.manager.Segment.Put(segments.SegmentTypeGrowing, ms)
|
|
}
|
|
|
|
s.delegator.SyncTargetVersion(int64(5), []int64{1}, []int64{2}, []int64{3, 4}, &msgpb.MsgPosition{})
|
|
s.Equal(int64(5), s.delegator.GetTargetVersion())
|
|
}
|
|
|
|
func (s *DelegatorDataSuite) TestLevel0Deletions() {
|
|
delegator := s.delegator
|
|
partitionID := int64(10)
|
|
partitionDeleteData := storage.NewDeleteData([]storage.PrimaryKey{storage.NewInt64PrimaryKey(1)}, []storage.Timestamp{100})
|
|
allPartitionDeleteData := storage.NewDeleteData([]storage.PrimaryKey{storage.NewInt64PrimaryKey(2)}, []storage.Timestamp{101})
|
|
delegator.level0Deletions[partitionID] = partitionDeleteData
|
|
|
|
pks, _ := delegator.GetLevel0Deletions(partitionID)
|
|
s.True(pks[0].EQ(partitionDeleteData.Pks[0]))
|
|
|
|
pks, _ = delegator.GetLevel0Deletions(partitionID + 1)
|
|
s.Empty(pks)
|
|
|
|
delegator.level0Deletions[common.InvalidPartitionID] = allPartitionDeleteData
|
|
pks, _ = delegator.GetLevel0Deletions(partitionID)
|
|
s.Len(pks, 2)
|
|
s.True(pks[0].EQ(partitionDeleteData.Pks[0]))
|
|
s.True(pks[1].EQ(allPartitionDeleteData.Pks[0]))
|
|
|
|
delete(delegator.level0Deletions, partitionID)
|
|
pks, _ = delegator.GetLevel0Deletions(partitionID)
|
|
s.True(pks[0].EQ(allPartitionDeleteData.Pks[0]))
|
|
|
|
// exchange the order
|
|
delegator.level0Deletions = make(map[int64]*storage.DeleteData)
|
|
partitionDeleteData, allPartitionDeleteData = allPartitionDeleteData, partitionDeleteData
|
|
delegator.level0Deletions[partitionID] = partitionDeleteData
|
|
|
|
pks, _ = delegator.GetLevel0Deletions(partitionID)
|
|
s.True(pks[0].EQ(partitionDeleteData.Pks[0]))
|
|
|
|
pks, _ = delegator.GetLevel0Deletions(partitionID + 1)
|
|
s.Empty(pks)
|
|
|
|
delegator.level0Deletions[common.InvalidPartitionID] = allPartitionDeleteData
|
|
pks, _ = delegator.GetLevel0Deletions(partitionID)
|
|
s.Len(pks, 2)
|
|
s.True(pks[0].EQ(allPartitionDeleteData.Pks[0]))
|
|
s.True(pks[1].EQ(partitionDeleteData.Pks[0]))
|
|
|
|
delete(delegator.level0Deletions, partitionID)
|
|
pks, _ = delegator.GetLevel0Deletions(partitionID)
|
|
s.True(pks[0].EQ(allPartitionDeleteData.Pks[0]))
|
|
}
|
|
|
|
func TestDelegatorDataSuite(t *testing.T) {
|
|
suite.Run(t, new(DelegatorDataSuite))
|
|
}
|