mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-11-29 10:28:41 +08:00
fix: Use correct policy merging growing&l0 and add unit tests (#37950)
Related to #37574 --------- Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
8c9dab5cf7
commit
1e76d2b9f1
@ -86,14 +86,14 @@ func (sd *shardDelegator) forwardStreamingDeletion(ctx context.Context, deleteDa
|
||||
}
|
||||
|
||||
func (sd *shardDelegator) addL0ForGrowing(ctx context.Context, segment segments.Segment) error {
|
||||
switch policy := paramtable.Get().QueryNodeCfg.StreamingDeltaForwardPolicy.GetValue(); policy {
|
||||
case ForwardPolicyDefault, StreamingForwardPolicyBF:
|
||||
switch sd.l0ForwardPolicy {
|
||||
case ForwardPolicyDefault, L0ForwardPolicyBF:
|
||||
return sd.addL0GrowingBF(ctx, segment)
|
||||
case StreamingForwardPolicyDirect:
|
||||
case L0ForwardPolicyRemoteLoad:
|
||||
// forward streaming deletion without bf filtering
|
||||
return sd.addL0ForGrowingLoad(ctx, segment)
|
||||
default:
|
||||
log.Fatal("unsupported streaming forward policy", zap.String("policy", policy))
|
||||
log.Fatal("unsupported l0 forward policy", zap.String("policy", sd.l0ForwardPolicy))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -18,14 +18,17 @@ package delegator
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math/rand"
|
||||
"testing"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/samber/lo"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/suite"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||
"github.com/milvus-io/milvus/internal/proto/segcorepb"
|
||||
"github.com/milvus-io/milvus/internal/querynodev2/cluster"
|
||||
@ -265,3 +268,231 @@ func (s *StreamingForwardSuite) TestDirectStreamingForward() {
|
||||
func TestStreamingForward(t *testing.T) {
|
||||
suite.Run(t, new(StreamingForwardSuite))
|
||||
}
|
||||
|
||||
type GrowingMergeL0Suite struct {
|
||||
suite.Suite
|
||||
|
||||
collectionID int64
|
||||
partitionIDs []int64
|
||||
replicaID int64
|
||||
vchannelName string
|
||||
version int64
|
||||
schema *schemapb.CollectionSchema
|
||||
workerManager *cluster.MockManager
|
||||
manager *segments.Manager
|
||||
tsafeManager tsafe.Manager
|
||||
loader *segments.MockLoader
|
||||
mq *msgstream.MockMsgStream
|
||||
|
||||
delegator *shardDelegator
|
||||
chunkManager storage.ChunkManager
|
||||
rootPath string
|
||||
}
|
||||
|
||||
func (s *GrowingMergeL0Suite) SetupSuite() {
|
||||
paramtable.Init()
|
||||
paramtable.SetNodeID(1)
|
||||
}
|
||||
|
||||
func (s *GrowingMergeL0Suite) SetupTest() {
|
||||
s.collectionID = 1000
|
||||
s.partitionIDs = []int64{500, 501}
|
||||
s.replicaID = 65535
|
||||
s.vchannelName = "rootcoord-dml_1000v0"
|
||||
s.version = 2000
|
||||
s.workerManager = &cluster.MockManager{}
|
||||
s.manager = segments.NewManager()
|
||||
s.tsafeManager = tsafe.NewTSafeReplica()
|
||||
s.loader = &segments.MockLoader{}
|
||||
s.loader.EXPECT().
|
||||
Load(mock.Anything, s.collectionID, segments.SegmentTypeGrowing, int64(0), mock.Anything).
|
||||
Call.Return(func(ctx context.Context, collectionID int64, segmentType segments.SegmentType, version int64, infos ...*querypb.SegmentLoadInfo) []segments.Segment {
|
||||
return lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) segments.Segment {
|
||||
ms := &segments.MockSegment{}
|
||||
ms.EXPECT().ID().Return(info.GetSegmentID())
|
||||
ms.EXPECT().Type().Return(segments.SegmentTypeGrowing)
|
||||
ms.EXPECT().Partition().Return(info.GetPartitionID())
|
||||
ms.EXPECT().Collection().Return(info.GetCollectionID())
|
||||
ms.EXPECT().Indexes().Return(nil)
|
||||
ms.EXPECT().RowNum().Return(info.GetNumOfRows())
|
||||
ms.EXPECT().Delete(mock.Anything, mock.Anything, mock.Anything).Return(nil)
|
||||
return ms
|
||||
})
|
||||
}, nil)
|
||||
|
||||
// init schema
|
||||
s.schema = &schemapb.CollectionSchema{
|
||||
Name: "TestCollection",
|
||||
Fields: []*schemapb.FieldSchema{
|
||||
{
|
||||
Name: "id",
|
||||
FieldID: 100,
|
||||
IsPrimaryKey: true,
|
||||
DataType: schemapb.DataType_Int64,
|
||||
AutoID: true,
|
||||
},
|
||||
{
|
||||
Name: "vector",
|
||||
FieldID: 101,
|
||||
IsPrimaryKey: false,
|
||||
DataType: schemapb.DataType_BinaryVector,
|
||||
TypeParams: []*commonpb.KeyValuePair{
|
||||
{
|
||||
Key: common.DimKey,
|
||||
Value: "128",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
s.manager.Collection.PutOrRef(s.collectionID, s.schema, &segcorepb.CollectionIndexMeta{
|
||||
MaxIndexRowCount: 100,
|
||||
IndexMetas: []*segcorepb.FieldIndexMeta{
|
||||
{
|
||||
FieldID: 101,
|
||||
CollectionID: s.collectionID,
|
||||
IndexName: "binary_index",
|
||||
TypeParams: []*commonpb.KeyValuePair{
|
||||
{
|
||||
Key: common.DimKey,
|
||||
Value: "128",
|
||||
},
|
||||
},
|
||||
IndexParams: []*commonpb.KeyValuePair{
|
||||
{
|
||||
Key: common.IndexTypeKey,
|
||||
Value: "BIN_IVF_FLAT",
|
||||
},
|
||||
{
|
||||
Key: common.MetricTypeKey,
|
||||
Value: metric.JACCARD,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}, &querypb.LoadMetaInfo{
|
||||
PartitionIDs: s.partitionIDs,
|
||||
})
|
||||
|
||||
s.mq = &msgstream.MockMsgStream{}
|
||||
s.rootPath = "delegator_test"
|
||||
|
||||
// init chunkManager
|
||||
chunkManagerFactory := storage.NewTestChunkManagerFactory(paramtable.Get(), s.rootPath)
|
||||
s.chunkManager, _ = chunkManagerFactory.NewPersistentStorageChunkManager(context.Background())
|
||||
|
||||
delegator, err := NewShardDelegator(context.Background(), s.collectionID, s.replicaID, s.vchannelName, s.version, s.workerManager, s.manager, s.tsafeManager, s.loader, &msgstream.MockMqFactory{
|
||||
NewMsgStreamFunc: func(_ context.Context) (msgstream.MsgStream, error) {
|
||||
return s.mq, nil
|
||||
},
|
||||
}, 10000, nil, s.chunkManager)
|
||||
s.Require().NoError(err)
|
||||
|
||||
sd, ok := delegator.(*shardDelegator)
|
||||
s.Require().True(ok)
|
||||
s.delegator = sd
|
||||
}
|
||||
|
||||
func (s *GrowingMergeL0Suite) TestAddL0ForGrowingBF() {
|
||||
sd := s.delegator
|
||||
sd.l0ForwardPolicy = L0ForwardPolicyBF
|
||||
|
||||
seg := segments.NewMockSegment(s.T())
|
||||
coll := s.manager.Collection.Get(s.collectionID)
|
||||
l0Segment, err := segments.NewL0Segment(coll, segments.SegmentTypeSealed, s.version, &querypb.SegmentLoadInfo{
|
||||
SegmentID: 10001,
|
||||
CollectionID: s.collectionID,
|
||||
PartitionID: common.AllPartitionsID,
|
||||
InsertChannel: s.vchannelName,
|
||||
})
|
||||
s.Require().NoError(err)
|
||||
|
||||
n := 10
|
||||
deltaData := storage.NewDeltaData(int64(n))
|
||||
|
||||
for i := 0; i < n; i++ {
|
||||
deltaData.Append(storage.NewInt64PrimaryKey(rand.Int63()), 0)
|
||||
}
|
||||
err = l0Segment.LoadDeltaData(context.Background(), deltaData)
|
||||
s.Require().NoError(err)
|
||||
s.manager.Segment.Put(context.Background(), segments.SegmentTypeSealed, l0Segment)
|
||||
|
||||
seg.EXPECT().ID().Return(10000)
|
||||
seg.EXPECT().Partition().Return(100)
|
||||
seg.EXPECT().Delete(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, pk storage.PrimaryKeys, u []uint64) error {
|
||||
s.Equal(deltaData.DeletePks(), pk)
|
||||
s.Equal(deltaData.DeleteTimestamps(), u)
|
||||
return nil
|
||||
}).Once()
|
||||
|
||||
err = sd.addL0ForGrowing(context.Background(), seg)
|
||||
s.NoError(err)
|
||||
|
||||
seg.EXPECT().Delete(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, pk storage.PrimaryKeys, u []uint64) error {
|
||||
return errors.New("mocked")
|
||||
}).Once()
|
||||
err = sd.addL0ForGrowing(context.Background(), seg)
|
||||
s.Error(err)
|
||||
}
|
||||
|
||||
func (s *GrowingMergeL0Suite) TestAddL0ForGrowingLoad() {
|
||||
sd := s.delegator
|
||||
sd.l0ForwardPolicy = L0ForwardPolicyRemoteLoad
|
||||
|
||||
seg := segments.NewMockSegment(s.T())
|
||||
coll := s.manager.Collection.Get(s.collectionID)
|
||||
l0Segment, err := segments.NewL0Segment(coll, segments.SegmentTypeSealed, s.version, &querypb.SegmentLoadInfo{
|
||||
SegmentID: 10001,
|
||||
CollectionID: s.collectionID,
|
||||
PartitionID: common.AllPartitionsID,
|
||||
InsertChannel: s.vchannelName,
|
||||
Deltalogs: []*datapb.FieldBinlog{
|
||||
{Binlogs: []*datapb.Binlog{
|
||||
{LogPath: "mocked_log_path"},
|
||||
}},
|
||||
},
|
||||
})
|
||||
s.Require().NoError(err)
|
||||
|
||||
n := 10
|
||||
deltaData := storage.NewDeltaData(int64(n))
|
||||
|
||||
for i := 0; i < n; i++ {
|
||||
deltaData.Append(storage.NewInt64PrimaryKey(rand.Int63()), 0)
|
||||
}
|
||||
err = l0Segment.LoadDeltaData(context.Background(), deltaData)
|
||||
s.Require().NoError(err)
|
||||
s.manager.Segment.Put(context.Background(), segments.SegmentTypeSealed, l0Segment)
|
||||
|
||||
seg.EXPECT().ID().Return(10000)
|
||||
seg.EXPECT().Partition().Return(100)
|
||||
s.loader.EXPECT().LoadDeltaLogs(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, seg segments.Segment, fb []*datapb.FieldBinlog) error {
|
||||
s.ElementsMatch([]string{"mocked_log_path"}, lo.Flatten(lo.Map(fb, func(fbl *datapb.FieldBinlog, _ int) []string {
|
||||
return lo.Map(fbl.Binlogs, func(bl *datapb.Binlog, _ int) string { return bl.LogPath })
|
||||
})))
|
||||
return nil
|
||||
}).Once()
|
||||
|
||||
err = sd.addL0ForGrowing(context.Background(), seg)
|
||||
s.NoError(err)
|
||||
|
||||
s.loader.EXPECT().LoadDeltaLogs(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, seg segments.Segment, fb []*datapb.FieldBinlog) error {
|
||||
return errors.New("mocked")
|
||||
}).Once()
|
||||
err = sd.addL0ForGrowing(context.Background(), seg)
|
||||
s.Error(err)
|
||||
}
|
||||
|
||||
func (s *GrowingMergeL0Suite) TestAddL0ForGrowingInvalid() {
|
||||
sd := s.delegator
|
||||
sd.l0ForwardPolicy = "invalid_policy"
|
||||
|
||||
seg := segments.NewMockSegment(s.T())
|
||||
s.Panics(func() {
|
||||
sd.addL0ForGrowing(context.Background(), seg)
|
||||
})
|
||||
}
|
||||
|
||||
func TestGrowingMergeL0(t *testing.T) {
|
||||
suite.Run(t, new(GrowingMergeL0Suite))
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user