enhance: Decouple compaction from shard (#33138)

Decouple compaction from shard, remove dependencies on shards (e.g.
SyncSegments, injection).

issue: https://github.com/milvus-io/milvus/issues/32809

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
yihao.dai 2024-05-24 09:07:41 +08:00 committed by GitHub
parent 592d701617
commit 7730b910b9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
26 changed files with 842 additions and 875 deletions

2
go.sum
View File

@ -290,6 +290,7 @@ github.com/go-latex/latex v0.0.0-20210118124228-b3d85cf34e07/go.mod h1:CO1AlKB2C
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY=
github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
@ -516,6 +517,7 @@ github.com/kataras/iris/v12 v12.1.8/go.mod h1:LMYy4VlP67TQ3Zgriz8RE2h2kMZV2SgMYb
github.com/kataras/neffos v0.0.14/go.mod h1:8lqADm8PnbeFfL7CLXh1WHw53dG27MC3pgi2R1rmoTE=
github.com/kataras/pio v0.0.2/go.mod h1:hAoW0t9UmXi4R5Oyq5Z4irTbaTsOemSrDGUtaTl7Dro=
github.com/kataras/sitemap v0.0.5/go.mod h1:KY2eugMKiPwsJgx7+U103YZehfvNGOXURubcGyk0Bz8=
github.com/keybase/go-keychain v0.0.0-20190712205309-48d3d31d256d/go.mod h1:JJNrCn9otv/2QP4D7SMJBgaleKpOf66PnW6F5WGNRIc=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=

View File

@ -45,11 +45,12 @@ const (
tsTimeout = uint64(1)
)
//go:generate mockery --name=compactionPlanContext --structname=MockCompactionPlanContext --output=./ --filename=mock_compaction_plan_context.go --with-expecter --inpackage
type compactionPlanContext interface {
start()
stop()
// execCompactionPlan start to execute plan and return immediately
execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan) error
execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan)
// getCompaction return compaction task. If planId does not exist, return nil.
getCompaction(planID int64) *compactionTask
// updateCompaction set the compaction state to timeout or completed
@ -277,14 +278,8 @@ func (c *compactionPlanHandler) updateTask(planID int64, opts ...compactionTaskO
}
}
func (c *compactionPlanHandler) enqueuePlan(signal *compactionSignal, plan *datapb.CompactionPlan) error {
nodeID, err := c.chManager.FindWatcher(plan.GetChannel())
if err != nil {
log.Error("failed to find watcher", zap.Int64("planID", plan.GetPlanID()), zap.Error(err))
return err
}
log := log.With(zap.Int64("planID", plan.GetPlanID()), zap.Int64("nodeID", nodeID))
func (c *compactionPlanHandler) enqueuePlan(signal *compactionSignal, plan *datapb.CompactionPlan) {
log := log.With(zap.Int64("planID", plan.GetPlanID()))
c.setSegmentsCompacting(plan, true)
_, span := otel.Tracer(typeutil.DataCoordRole).Start(context.Background(), fmt.Sprintf("Compaction-%s", plan.GetType()))
@ -293,7 +288,6 @@ func (c *compactionPlanHandler) enqueuePlan(signal *compactionSignal, plan *data
triggerInfo: signal,
plan: plan,
state: pipelining,
dataNodeID: nodeID,
span: span,
}
c.mu.Lock()
@ -301,8 +295,7 @@ func (c *compactionPlanHandler) enqueuePlan(signal *compactionSignal, plan *data
c.mu.Unlock()
c.scheduler.Submit(task)
log.Info("Compaction plan submited")
return nil
log.Info("Compaction plan submitted")
}
func (c *compactionPlanHandler) RefreshPlan(task *compactionTask) error {
@ -337,10 +330,14 @@ func (c *compactionPlanHandler) RefreshPlan(task *compactionTask) error {
sealedSegBinlogs := lo.Map(sealedSegments, func(info *SegmentInfo, _ int) *datapb.CompactionSegmentBinlogs {
return &datapb.CompactionSegmentBinlogs{
SegmentID: info.GetID(),
Level: info.GetLevel(),
CollectionID: info.GetCollectionID(),
PartitionID: info.GetPartitionID(),
SegmentID: info.GetID(),
FieldBinlogs: nil,
Field2StatslogPaths: info.GetStatslogs(),
Deltalogs: nil,
InsertChannel: info.GetInsertChannel(),
Level: info.GetLevel(),
CollectionID: info.GetCollectionID(),
PartitionID: info.GetPartitionID(),
}
})
@ -407,8 +404,8 @@ func (c *compactionPlanHandler) notifyTasks(tasks []*compactionTask) {
}
// execCompactionPlan start to execute plan and return immediately
func (c *compactionPlanHandler) execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan) error {
return c.enqueuePlan(signal, plan)
func (c *compactionPlanHandler) execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan) {
c.enqueuePlan(signal, plan)
}
func (c *compactionPlanHandler) setSegmentsCompacting(plan *datapb.CompactionPlan, compacting bool) {
@ -483,25 +480,17 @@ func (c *compactionPlanHandler) handleMergeCompactionResult(plan *datapb.Compact
log.Info("meta has already been changed, skip meta change and retry sync segments")
} else {
// Also prepare metric updates.
newSegments, metricMutation, err := c.meta.CompleteCompactionMutation(plan, result)
_, metricMutation, err := c.meta.CompleteCompactionMutation(plan, result)
if err != nil {
return err
}
// Apply metrics after successful meta update.
metricMutation.commit()
newSegmentInfo = newSegments[0]
}
nodeID := c.plans[plan.GetPlanID()].dataNodeID
req := &datapb.SyncSegmentsRequest{
PlanID: plan.PlanID,
CompactedTo: newSegmentInfo.GetID(),
CompactedFrom: newSegmentInfo.GetCompactionFrom(),
NumOfRows: newSegmentInfo.GetNumOfRows(),
StatsLogs: newSegmentInfo.GetStatslogs(),
ChannelName: plan.GetChannel(),
PartitionId: newSegmentInfo.GetPartitionID(),
CollectionId: newSegmentInfo.GetCollectionID(),
PlanID: plan.PlanID,
}
log.Info("handleCompactionResult: syncing segments with node", zap.Int64("nodeID", nodeID))
@ -633,8 +622,7 @@ func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error {
// without changing the meta
log.Info("compaction syncing unknown plan with node")
if err := c.sessions.SyncSegments(nodeID, &datapb.SyncSegmentsRequest{
PlanID: planID,
ChannelName: plan.GetChannel(),
PlanID: planID,
}); err != nil {
log.Warn("compaction failed to sync segments with node", zap.Error(err))
return err

View File

@ -64,75 +64,64 @@ func (s *CompactionScheduler) Submit(tasks ...*compactionTask) {
// Schedule pick 1 or 0 tasks for 1 node
func (s *CompactionScheduler) Schedule() []*compactionTask {
s.taskGuard.Lock()
nodeTasks := lo.GroupBy(s.queuingTasks, func(t *compactionTask) int64 {
return t.dataNodeID
})
s.taskGuard.Unlock()
if len(nodeTasks) == 0 {
s.taskGuard.RLock()
if len(s.queuingTasks) == 0 {
s.taskGuard.RUnlock()
return nil // To mitigate the need for frequent slot querying
}
s.taskGuard.RUnlock()
nodeSlots := s.cluster.QuerySlots()
executable := make(map[int64]*compactionTask)
l0ChannelExcludes := typeutil.NewSet[string]()
mixChannelExcludes := typeutil.NewSet[string]()
pickPriorPolicy := func(tasks []*compactionTask, exclusiveChannels []string, executing []string) *compactionTask {
for _, task := range tasks {
// TODO: sheep, replace pickShardNode with pickAnyNode
if nodeID := s.pickShardNode(task.dataNodeID, nodeSlots); nodeID == NullNodeID {
log.Warn("cannot find datanode for compaction task", zap.Int64("planID", task.plan.PlanID), zap.String("vchannel", task.plan.Channel))
continue
for _, tasks := range s.parallelTasks {
for _, t := range tasks {
switch t.plan.GetType() {
case datapb.CompactionType_Level0DeleteCompaction:
l0ChannelExcludes.Insert(t.plan.GetChannel())
case datapb.CompactionType_MixCompaction:
mixChannelExcludes.Insert(t.plan.GetChannel())
}
if lo.Contains(exclusiveChannels, task.plan.GetChannel()) {
continue
}
if task.plan.GetType() == datapb.CompactionType_Level0DeleteCompaction {
// Channel of LevelZeroCompaction task with no executing compactions
if !lo.Contains(executing, task.plan.GetChannel()) {
return task
}
// Don't schedule any tasks for channel with LevelZeroCompaction task
// when there're executing compactions
exclusiveChannels = append(exclusiveChannels, task.plan.GetChannel())
continue
}
return task
}
return nil
}
s.taskGuard.Lock()
defer s.taskGuard.Unlock()
// pick 1 or 0 task for 1 node
for node, tasks := range nodeTasks {
parallel := s.parallelTasks[node]
var (
executing = typeutil.NewSet[string]()
channelsExecPrior = typeutil.NewSet[string]()
)
for _, t := range parallel {
executing.Insert(t.plan.GetChannel())
if t.plan.GetType() == datapb.CompactionType_Level0DeleteCompaction {
channelsExecPrior.Insert(t.plan.GetChannel())
}
picked := make([]*compactionTask, 0)
for _, t := range s.queuingTasks {
nodeID := s.pickAnyNode(nodeSlots)
if nodeID == NullNodeID {
log.Warn("cannot find datanode for compaction task",
zap.Int64("planID", t.plan.PlanID), zap.String("vchannel", t.plan.Channel))
continue
}
picked := pickPriorPolicy(tasks, channelsExecPrior.Collect(), executing.Collect())
if picked != nil {
executable[node] = picked
nodeSlots[node]--
switch t.plan.GetType() {
case datapb.CompactionType_Level0DeleteCompaction:
if l0ChannelExcludes.Contain(t.plan.GetChannel()) ||
mixChannelExcludes.Contain(t.plan.GetChannel()) {
continue
}
t.dataNodeID = nodeID
picked = append(picked, t)
l0ChannelExcludes.Insert(t.plan.GetChannel())
nodeSlots[nodeID]--
case datapb.CompactionType_MixCompaction:
if l0ChannelExcludes.Contain(t.plan.GetChannel()) {
continue
}
t.dataNodeID = nodeID
picked = append(picked, t)
mixChannelExcludes.Insert(t.plan.GetChannel())
nodeSlots[nodeID]--
}
}
var pickPlans []int64
for node, task := range executable {
for _, task := range picked {
node := task.dataNodeID
pickPlans = append(pickPlans, task.plan.PlanID)
if _, ok := s.parallelTasks[node]; !ok {
s.parallelTasks[node] = []*compactionTask{task}
@ -156,7 +145,7 @@ func (s *CompactionScheduler) Schedule() []*compactionTask {
}
}
return lo.Values(executable)
return picked
}
func (s *CompactionScheduler) Finish(nodeID UniqueID, plan *datapb.CompactionPlan) {

View File

@ -60,11 +60,11 @@ func (s *SchedulerSuite) TestScheduleParallelTaskFull() {
}{
{"with L0 tasks", []*compactionTask{
{dataNodeID: 100, plan: &datapb.CompactionPlan{PlanID: 10, Channel: "ch-10", Type: datapb.CompactionType_Level0DeleteCompaction}},
{dataNodeID: 100, plan: &datapb.CompactionPlan{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MinorCompaction}},
{dataNodeID: 100, plan: &datapb.CompactionPlan{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MixCompaction}},
}, []UniqueID{}},
{"without L0 tasks", []*compactionTask{
{dataNodeID: 100, plan: &datapb.CompactionPlan{PlanID: 10, Channel: "ch-10", Type: datapb.CompactionType_MinorCompaction}},
{dataNodeID: 100, plan: &datapb.CompactionPlan{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MinorCompaction}},
{dataNodeID: 100, plan: &datapb.CompactionPlan{PlanID: 10, Channel: "ch-10", Type: datapb.CompactionType_MixCompaction}},
{dataNodeID: 100, plan: &datapb.CompactionPlan{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MixCompaction}},
}, []UniqueID{}},
{"empty tasks", []*compactionTask{}, []UniqueID{}},
}
@ -101,16 +101,16 @@ func (s *SchedulerSuite) TestScheduleNodeWith1ParallelTask() {
}{
{"with L0 tasks diff channel", []*compactionTask{
{dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 10, Channel: "ch-10", Type: datapb.CompactionType_Level0DeleteCompaction}},
{dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MinorCompaction}},
}, []UniqueID{10}},
{dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MixCompaction}},
}, []UniqueID{10, 11}},
{"with L0 tasks same channel", []*compactionTask{
{dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 10, Channel: "ch-2", Type: datapb.CompactionType_Level0DeleteCompaction}},
{dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MinorCompaction}},
{dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MixCompaction}},
}, []UniqueID{11}},
{"without L0 tasks", []*compactionTask{
{dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 14, Channel: "ch-2", Type: datapb.CompactionType_MinorCompaction}},
{dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 13, Channel: "ch-11", Type: datapb.CompactionType_MinorCompaction}},
}, []UniqueID{14}},
{dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 14, Channel: "ch-2", Type: datapb.CompactionType_MixCompaction}},
{dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 13, Channel: "ch-11", Type: datapb.CompactionType_MixCompaction}},
}, []UniqueID{14, 13}},
{"empty tasks", []*compactionTask{}, []UniqueID{}},
}
@ -134,15 +134,6 @@ func (s *SchedulerSuite) TestScheduleNodeWith1ParallelTask() {
return t.plan.PlanID
}))
// the second schedule returns empty for no slot
if len(test.tasks) > 0 {
cluster := NewMockCluster(s.T())
cluster.EXPECT().QuerySlots().Return(map[int64]int64{101: 0})
s.scheduler.cluster = cluster
}
gotTasks = s.scheduler.Schedule()
s.Empty(gotTasks)
s.Equal(4+len(test.tasks), s.scheduler.GetTaskCount())
})
}
@ -158,16 +149,16 @@ func (s *SchedulerSuite) TestScheduleNodeWithL0Executing() {
}{
{"with L0 tasks diff channel", []*compactionTask{
{dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 10, Channel: "ch-10", Type: datapb.CompactionType_Level0DeleteCompaction}},
{dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MinorCompaction}},
}, []UniqueID{10}},
{dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MixCompaction}},
}, []UniqueID{10, 11}},
{"with L0 tasks same channel", []*compactionTask{
{dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 10, Channel: "ch-3", Type: datapb.CompactionType_Level0DeleteCompaction}},
{dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MinorCompaction}},
{dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 13, Channel: "ch-3", Type: datapb.CompactionType_MinorCompaction}},
{dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MixCompaction}},
{dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 13, Channel: "ch-3", Type: datapb.CompactionType_MixCompaction}},
}, []UniqueID{11}},
{"without L0 tasks", []*compactionTask{
{dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 14, Channel: "ch-3", Type: datapb.CompactionType_MinorCompaction}},
{dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 13, Channel: "ch-11", Type: datapb.CompactionType_MinorCompaction}},
{dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 14, Channel: "ch-3", Type: datapb.CompactionType_MixCompaction}},
{dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 13, Channel: "ch-11", Type: datapb.CompactionType_MixCompaction}},
}, []UniqueID{13}},
{"empty tasks", []*compactionTask{}, []UniqueID{}},
}
@ -192,17 +183,6 @@ func (s *SchedulerSuite) TestScheduleNodeWithL0Executing() {
return t.plan.PlanID
}))
// the second schedule returns empty for no slot
if len(test.tasks) > 0 {
cluster := NewMockCluster(s.T())
cluster.EXPECT().QuerySlots().Return(map[int64]int64{101: 0})
s.scheduler.cluster = cluster
}
if len(gotTasks) > 0 {
gotTasks = s.scheduler.Schedule()
s.Empty(gotTasks)
}
s.Equal(4+len(test.tasks), s.scheduler.GetTaskCount())
})
}

View File

@ -431,43 +431,22 @@ func (s *CompactionPlanHandlerSuite) TestRefreshPlanMixCompaction() {
}
func (s *CompactionPlanHandlerSuite) TestExecCompactionPlan() {
s.mockCm.EXPECT().FindWatcher(mock.Anything).RunAndReturn(func(channel string) (int64, error) {
if channel == "ch-1" {
return 0, errors.Errorf("mock error for ch-1")
}
return 1, nil
}).Twice()
s.mockSch.EXPECT().Submit(mock.Anything).Return().Once()
tests := []struct {
description string
channel string
hasError bool
}{
{"channel with error", "ch-1", true},
{"channel with no error", "ch-2", false},
}
handler := newCompactionPlanHandler(nil, s.mockSessMgr, s.mockCm, s.mockMeta, s.mockAlloc)
handler.scheduler = s.mockSch
for idx, test := range tests {
sig := &compactionSignal{id: int64(idx)}
plan := &datapb.CompactionPlan{
PlanID: int64(idx),
}
s.Run(test.description, func() {
plan.Channel = test.channel
err := handler.execCompactionPlan(sig, plan)
if test.hasError {
s.Error(err)
} else {
s.NoError(err)
}
})
sig := &compactionSignal{id: int64(1)}
plan := &datapb.CompactionPlan{
PlanID: int64(1),
}
plan.Channel = "ch-1"
handler.execCompactionPlan(sig, plan)
handler.mu.RLock()
defer handler.mu.RUnlock()
_, ok := handler.plans[int64(1)]
s.True(ok)
}
func (s *CompactionPlanHandlerSuite) TestHandleMergeCompactionResult() {

View File

@ -430,23 +430,14 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) error {
break
}
start := time.Now()
if err := fillOriginPlan(t.allocator, plan); err != nil {
if err := fillOriginPlan(coll.Schema, t.allocator, plan); err != nil {
log.Warn("failed to fill plan",
zap.Int64("collectionID", signal.collectionID),
zap.Int64s("segmentIDs", segIDs),
zap.Error(err))
continue
}
err := t.compactionHandler.execCompactionPlan(signal, plan)
if err != nil {
log.Warn("failed to execute compaction plan",
zap.Int64("collectionID", signal.collectionID),
zap.Int64("planID", plan.PlanID),
zap.Int64s("segmentIDs", segIDs),
zap.Error(err))
continue
}
t.compactionHandler.execCompactionPlan(signal, plan)
log.Info("time cost of generating global compaction",
zap.Int64("planID", plan.PlanID),
zap.Int64("time cost", time.Since(start).Milliseconds()),
@ -530,18 +521,11 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) {
break
}
start := time.Now()
if err := fillOriginPlan(t.allocator, plan); err != nil {
if err := fillOriginPlan(coll.Schema, t.allocator, plan); err != nil {
log.Warn("failed to fill plan", zap.Error(err))
continue
}
if err := t.compactionHandler.execCompactionPlan(signal, plan); err != nil {
log.Warn("failed to execute compaction plan",
zap.Int64("collection", signal.collectionID),
zap.Int64("planID", plan.PlanID),
zap.Int64s("segmentIDs", fetchSegIDs(plan.GetSegmentBinlogs())),
zap.Error(err))
continue
}
t.compactionHandler.execCompactionPlan(signal, plan)
log.Info("time cost of generating compaction",
zap.Int64("planID", plan.PlanID),
zap.Int64("time cost", time.Since(start).Milliseconds()),
@ -713,6 +697,7 @@ func segmentsToPlan(segments []*SegmentInfo, compactTime *compactTime) *datapb.C
}
log.Info("generate a plan for priority candidates", zap.Any("plan", plan),
zap.Int("len(segments)", len(plan.GetSegmentBinlogs())),
zap.Int64("target segment row", plan.TotalRows), zap.Int64("target segment size", size))
return plan
}

View File

@ -51,9 +51,8 @@ var _ compactionPlanContext = (*spyCompactionHandler)(nil)
func (h *spyCompactionHandler) removeTasksByChannel(channel string) {}
// execCompactionPlan start to execute plan and return immediately
func (h *spyCompactionHandler) execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan) error {
func (h *spyCompactionHandler) execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan) {
h.spyChan <- plan
return nil
}
// completeCompaction record the result of a compaction
@ -106,6 +105,22 @@ func Test_compactionTrigger_force(t *testing.T) {
vecFieldID := int64(201)
indexID := int64(1001)
schema := &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: vecFieldID,
DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: "128",
},
},
},
},
}
tests := []struct {
name string
fields fields
@ -292,21 +307,8 @@ func Test_compactionTrigger_force(t *testing.T) {
},
collections: map[int64]*collectionInfo{
2: {
ID: 2,
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: vecFieldID,
DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: "128",
},
},
},
},
},
ID: 2,
Schema: schema,
Properties: map[string]string{
common.CollectionTTLConfigKey: "0",
},
@ -469,6 +471,7 @@ func Test_compactionTrigger_force(t *testing.T) {
Type: datapb.CompactionType_MixCompaction,
Channel: "ch1",
TotalRows: 200,
Schema: schema,
},
},
},
@ -2386,7 +2389,7 @@ func (s *CompactionTriggerSuite) TestHandleSignal() {
},
},
}, nil)
s.compactionHandler.EXPECT().execCompactionPlan(mock.Anything, mock.Anything).Return(nil)
s.compactionHandler.EXPECT().execCompactionPlan(mock.Anything, mock.Anything).Return()
tr.handleSignal(&compactionSignal{
segmentID: 1,
collectionID: s.collectionID,
@ -2517,7 +2520,7 @@ func (s *CompactionTriggerSuite) TestHandleGlobalSignal() {
common.CollectionAutoCompactionKey: "false",
},
}, nil)
s.compactionHandler.EXPECT().execCompactionPlan(mock.Anything, mock.Anything).Return(nil)
s.compactionHandler.EXPECT().execCompactionPlan(mock.Anything, mock.Anything).Return()
tr.handleGlobalSignal(&compactionSignal{
segmentID: 1,
collectionID: s.collectionID,

View File

@ -2,10 +2,12 @@ package datacoord
import (
"context"
"time"
"github.com/samber/lo"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/log"
)
@ -33,16 +35,18 @@ type TriggerManager interface {
// 2. SystemIDLE & schedulerIDLE
// 3. Manual Compaction
type CompactionTriggerManager struct {
scheduler Scheduler
handler compactionPlanContext // TODO replace with scheduler
scheduler Scheduler
handler Handler
compactionHandler compactionPlanContext // TODO replace with scheduler
allocator allocator
}
func NewCompactionTriggerManager(alloc allocator, handler compactionPlanContext) *CompactionTriggerManager {
func NewCompactionTriggerManager(alloc allocator, handler Handler, compactionHandler compactionPlanContext) *CompactionTriggerManager {
m := &CompactionTriggerManager{
allocator: alloc,
handler: handler,
allocator: alloc,
handler: handler,
compactionHandler: compactionHandler,
}
return m
@ -51,7 +55,7 @@ func NewCompactionTriggerManager(alloc allocator, handler compactionPlanContext)
func (m *CompactionTriggerManager) Notify(taskID UniqueID, eventType CompactionTriggerType, views []CompactionView) {
log := log.With(zap.Int64("taskID", taskID))
for _, view := range views {
if m.handler.isFull() {
if m.compactionHandler.isFull() {
log.RatedInfo(1.0, "Skip trigger compaction for scheduler is full")
return
}
@ -103,7 +107,7 @@ func (m *CompactionTriggerManager) SubmitL0ViewToScheduler(taskID int64, outView
// TODO, remove handler, use scheduler
// m.scheduler.Submit(plan)
m.handler.execCompactionPlan(signal, plan)
m.compactionHandler.execCompactionPlan(signal, plan)
log.Info("Finish to submit a LevelZeroCompaction plan",
zap.Int64("taskID", taskID),
zap.Int64("planID", plan.GetPlanID()),
@ -130,7 +134,14 @@ func (m *CompactionTriggerManager) buildL0CompactionPlan(view CompactionView) *d
Channel: view.GetGroupLabel().Channel,
}
if err := fillOriginPlan(m.allocator, plan); err != nil {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
collection, err := m.handler.GetCollection(ctx, view.GetGroupLabel().CollectionID)
if err != nil {
return nil
}
if err := fillOriginPlan(collection.Schema, m.allocator, plan); err != nil {
return nil
}
@ -145,14 +156,16 @@ type chanPartSegments struct {
segments []*SegmentInfo
}
func fillOriginPlan(alloc allocator, plan *datapb.CompactionPlan) error {
// TODO context
id, err := alloc.allocID(context.TODO())
func fillOriginPlan(schema *schemapb.CollectionSchema, alloc allocator, plan *datapb.CompactionPlan) error {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
id, err := alloc.allocID(ctx)
if err != nil {
return err
}
plan.PlanID = id
plan.TimeoutInSeconds = Params.DataCoordCfg.CompactionTimeoutInSeconds.GetAsInt32()
plan.Schema = schema
return nil
}

View File

@ -20,6 +20,7 @@ type CompactionTriggerManagerSuite struct {
suite.Suite
mockAlloc *NMockAllocator
handler Handler
mockPlanContext *MockCompactionPlanContext
testLabel *CompactionGroupLabel
meta *meta
@ -29,6 +30,7 @@ type CompactionTriggerManagerSuite struct {
func (s *CompactionTriggerManagerSuite) SetupTest() {
s.mockAlloc = NewNMockAllocator(s.T())
s.handler = NewNMockHandler(s.T())
s.mockPlanContext = NewMockCompactionPlanContext(s.T())
s.testLabel = &CompactionGroupLabel{
@ -42,7 +44,7 @@ func (s *CompactionTriggerManagerSuite) SetupTest() {
s.meta.segments.SetSegment(id, segment)
}
s.m = NewCompactionTriggerManager(s.mockAlloc, s.mockPlanContext)
s.m = NewCompactionTriggerManager(s.mockAlloc, s.handler, s.mockPlanContext)
}
func (s *CompactionTriggerManagerSuite) TestNotifyToFullScheduler() {
@ -73,6 +75,10 @@ func (s *CompactionTriggerManagerSuite) TestNotifyToFullScheduler() {
}
func (s *CompactionTriggerManagerSuite) TestNotifyByViewIDLE() {
handler := NewNMockHandler(s.T())
handler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(&collectionInfo{}, nil)
s.m.handler = handler
viewManager := NewCompactionViewManager(s.meta, s.m, s.m.allocator)
collSegs := s.meta.GetCompactableSegmentGroupByCollection()
@ -120,12 +126,16 @@ func (s *CompactionTriggerManagerSuite) TestNotifyByViewIDLE() {
s.ElementsMatch(expectedSegs, gotSegs)
log.Info("generated plan", zap.Any("plan", plan))
}).Return(nil).Once()
}).Return().Once()
s.m.Notify(19530, TriggerTypeLevelZeroViewIDLE, levelZeroView)
}
func (s *CompactionTriggerManagerSuite) TestNotifyByViewChange() {
handler := NewNMockHandler(s.T())
handler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(&collectionInfo{}, nil)
s.m.handler = handler
viewManager := NewCompactionViewManager(s.meta, s.m, s.m.allocator)
collSegs := s.meta.GetCompactableSegmentGroupByCollection()
@ -168,7 +178,7 @@ func (s *CompactionTriggerManagerSuite) TestNotifyByViewChange() {
s.ElementsMatch(expectedSegs, gotSegs)
log.Info("generated plan", zap.Any("plan", plan))
}).Return(nil).Once()
}).Return().Once()
s.m.Notify(19530, TriggerTypeLevelZeroViewChange, levelZeroView)
}

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.32.4. DO NOT EDIT.
// Code generated by mockery v2.30.1. DO NOT EDIT.
package datacoord
@ -21,17 +21,8 @@ func (_m *MockCompactionPlanContext) EXPECT() *MockCompactionPlanContext_Expecte
}
// execCompactionPlan provides a mock function with given fields: signal, plan
func (_m *MockCompactionPlanContext) execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan) error {
ret := _m.Called(signal, plan)
var r0 error
if rf, ok := ret.Get(0).(func(*compactionSignal, *datapb.CompactionPlan) error); ok {
r0 = rf(signal, plan)
} else {
r0 = ret.Error(0)
}
return r0
func (_m *MockCompactionPlanContext) execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan) {
_m.Called(signal, plan)
}
// MockCompactionPlanContext_execCompactionPlan_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'execCompactionPlan'
@ -40,8 +31,8 @@ type MockCompactionPlanContext_execCompactionPlan_Call struct {
}
// execCompactionPlan is a helper method to define mock.On call
// - signal *compactionSignal
// - plan *datapb.CompactionPlan
// - signal *compactionSignal
// - plan *datapb.CompactionPlan
func (_e *MockCompactionPlanContext_Expecter) execCompactionPlan(signal interface{}, plan interface{}) *MockCompactionPlanContext_execCompactionPlan_Call {
return &MockCompactionPlanContext_execCompactionPlan_Call{Call: _e.mock.On("execCompactionPlan", signal, plan)}
}
@ -53,12 +44,12 @@ func (_c *MockCompactionPlanContext_execCompactionPlan_Call) Run(run func(signal
return _c
}
func (_c *MockCompactionPlanContext_execCompactionPlan_Call) Return(_a0 error) *MockCompactionPlanContext_execCompactionPlan_Call {
_c.Call.Return(_a0)
func (_c *MockCompactionPlanContext_execCompactionPlan_Call) Return() *MockCompactionPlanContext_execCompactionPlan_Call {
_c.Call.Return()
return _c
}
func (_c *MockCompactionPlanContext_execCompactionPlan_Call) RunAndReturn(run func(*compactionSignal, *datapb.CompactionPlan) error) *MockCompactionPlanContext_execCompactionPlan_Call {
func (_c *MockCompactionPlanContext_execCompactionPlan_Call) RunAndReturn(run func(*compactionSignal, *datapb.CompactionPlan)) *MockCompactionPlanContext_execCompactionPlan_Call {
_c.Call.Return(run)
return _c
}
@ -85,7 +76,7 @@ type MockCompactionPlanContext_getCompaction_Call struct {
}
// getCompaction is a helper method to define mock.On call
// - planID int64
// - planID int64
func (_e *MockCompactionPlanContext_Expecter) getCompaction(planID interface{}) *MockCompactionPlanContext_getCompaction_Call {
return &MockCompactionPlanContext_getCompaction_Call{Call: _e.mock.On("getCompaction", planID)}
}
@ -129,7 +120,7 @@ type MockCompactionPlanContext_getCompactionTasksBySignalID_Call struct {
}
// getCompactionTasksBySignalID is a helper method to define mock.On call
// - signalID int64
// - signalID int64
func (_e *MockCompactionPlanContext_Expecter) getCompactionTasksBySignalID(signalID interface{}) *MockCompactionPlanContext_getCompactionTasksBySignalID_Call {
return &MockCompactionPlanContext_getCompactionTasksBySignalID_Call{Call: _e.mock.On("getCompactionTasksBySignalID", signalID)}
}
@ -203,7 +194,7 @@ type MockCompactionPlanContext_removeTasksByChannel_Call struct {
}
// removeTasksByChannel is a helper method to define mock.On call
// - channel string
// - channel string
func (_e *MockCompactionPlanContext_Expecter) removeTasksByChannel(channel interface{}) *MockCompactionPlanContext_removeTasksByChannel_Call {
return &MockCompactionPlanContext_removeTasksByChannel_Call{Call: _e.mock.On("removeTasksByChannel", channel)}
}
@ -309,7 +300,7 @@ type MockCompactionPlanContext_updateCompaction_Call struct {
}
// updateCompaction is a helper method to define mock.On call
// - ts uint64
// - ts uint64
func (_e *MockCompactionPlanContext_Expecter) updateCompaction(ts interface{}) *MockCompactionPlanContext_updateCompaction_Call {
return &MockCompactionPlanContext_updateCompaction_Call{Call: _e.mock.On("updateCompaction", ts)}
}

View File

@ -524,7 +524,7 @@ func (s *Server) SetIndexNodeCreator(f func(context.Context, string, int64) (typ
func (s *Server) createCompactionHandler() {
s.compactionHandler = newCompactionPlanHandler(s.cluster, s.sessionManager, s.channelManager, s.meta, s.allocator)
triggerv2 := NewCompactionTriggerManager(s.allocator, s.compactionHandler)
triggerv2 := NewCompactionTriggerManager(s.allocator, s.handler, s.compactionHandler)
s.compactionViewManager = NewCompactionViewManager(s.meta, triggerv2, s.allocator)
}

View File

@ -21,10 +21,10 @@ import (
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
//go:generate mockery --name=Compactor --structname=MockCompactor --output=./ --filename=mock_compactor.go --with-expecter --inpackage
type Compactor interface {
Complete()
Compact() (*datapb.CompactionPlanResult, error)
InjectDone()
Stop()
GetPlanID() typeutil.UniqueID
GetCollection() typeutil.UniqueID

View File

@ -21,7 +21,6 @@ import (
"fmt"
sio "io"
"strconv"
"sync"
"time"
"github.com/cockroachdb/errors"
@ -33,15 +32,12 @@ import (
"github.com/milvus-io/milvus/internal/datanode/allocator"
"github.com/milvus-io/milvus/internal/datanode/io"
iter "github.com/milvus-io/milvus/internal/datanode/iterators"
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/datanode/syncmgr"
"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/timerecord"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
@ -51,9 +47,6 @@ import (
// for MixCompaction only
type mixCompactionTask struct {
binlogIO io.BinlogIO
Compactor
metaCache metacache.MetaCache
syncMgr syncmgr.SyncManager
allocator.Allocator
currentTs typeutil.Timestamp
@ -62,9 +55,8 @@ type mixCompactionTask struct {
ctx context.Context
cancel context.CancelFunc
injectDoneOnce sync.Once
done chan struct{}
tr *timerecord.TimeRecorder
done chan struct{}
tr *timerecord.TimeRecorder
}
// make sure compactionTask implements compactor interface
@ -73,8 +65,6 @@ var _ Compactor = (*mixCompactionTask)(nil)
func NewMixCompactionTask(
ctx context.Context,
binlogIO io.BinlogIO,
metaCache metacache.MetaCache,
syncMgr syncmgr.SyncManager,
alloc allocator.Allocator,
plan *datapb.CompactionPlan,
) *mixCompactionTask {
@ -83,8 +73,6 @@ func NewMixCompactionTask(
ctx: ctx1,
cancel: cancel,
binlogIO: binlogIO,
syncMgr: syncMgr,
metaCache: metaCache,
Allocator: alloc,
plan: plan,
tr: timerecord.NewTimeRecorder("mix compaction"),
@ -100,7 +88,6 @@ func (t *mixCompactionTask) Complete() {
func (t *mixCompactionTask) Stop() {
t.cancel()
<-t.done
t.InjectDone()
}
func (t *mixCompactionTask) GetPlanID() typeutil.UniqueID {
@ -112,18 +99,16 @@ func (t *mixCompactionTask) GetChannelName() string {
}
// return num rows of all segment compaction from
func (t *mixCompactionTask) getNumRows() (int64, error) {
func (t *mixCompactionTask) getNumRows() int64 {
numRows := int64(0)
for _, binlog := range t.plan.SegmentBinlogs {
seg, ok := t.metaCache.GetSegmentByID(binlog.GetSegmentID())
if !ok {
return 0, merr.WrapErrSegmentNotFound(binlog.GetSegmentID(), "get compaction segments num rows failed")
if len(binlog.GetFieldBinlogs()) > 0 {
for _, ct := range binlog.GetFieldBinlogs()[0].GetBinlogs() {
numRows += ct.GetEntriesNum()
}
}
numRows += seg.NumOfRows()
}
return numRows, nil
return numRows
}
func (t *mixCompactionTask) mergeDeltalogs(ctx context.Context, dpaths map[typeutil.UniqueID][]string) (map[interface{}]typeutil.Timestamp, error) {
@ -417,7 +402,19 @@ func (t *mixCompactionTask) Compact() (*datapb.CompactionPlanResult, error) {
ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(t.ctx, fmt.Sprintf("MixCompact-%d", t.GetPlanID()))
defer span.End()
log := log.Ctx(ctx).With(zap.Int64("planID", t.plan.GetPlanID()), zap.Int32("timeout in seconds", t.plan.GetTimeoutInSeconds()))
if len(t.plan.GetSegmentBinlogs()) < 1 {
log.Warn("compact wrong, there's no segments in segment binlogs", zap.Int64("planID", t.plan.GetPlanID()))
return nil, errors.New("compaction plan is illegal")
}
collectionID := t.plan.GetSegmentBinlogs()[0].GetCollectionID()
partitionID := t.plan.GetSegmentBinlogs()[0].GetPartitionID()
log := log.Ctx(ctx).With(zap.Int64("planID", t.plan.GetPlanID()),
zap.Int64("collectionID", collectionID),
zap.Int64("partitionID", partitionID),
zap.Int32("timeout in seconds", t.plan.GetTimeoutInSeconds()))
if ok := funcutil.CheckCtxValid(ctx); !ok {
log.Warn("compact wrong, task context done or timeout")
return nil, ctx.Err()
@ -427,10 +424,6 @@ func (t *mixCompactionTask) Compact() (*datapb.CompactionPlanResult, error) {
defer cancelAll()
log.Info("compact start")
if len(t.plan.GetSegmentBinlogs()) < 1 {
log.Warn("compact wrong, there's no segments in segment binlogs")
return nil, errors.New("compaction plan is illegal")
}
targetSegID, err := t.AllocOne()
if err != nil {
@ -438,15 +431,9 @@ func (t *mixCompactionTask) Compact() (*datapb.CompactionPlanResult, error) {
return nil, err
}
previousRowCount, err := t.getNumRows()
if err != nil {
log.Warn("compact wrong, unable to get previous numRows", zap.Error(err))
return nil, err
}
previousRowCount := t.getNumRows()
partID := t.plan.GetSegmentBinlogs()[0].GetPartitionID()
writer, err := NewSegmentWriter(t.metaCache.Schema(), previousRowCount, targetSegID, partID, t.metaCache.Collection())
writer, err := NewSegmentWriter(t.plan.GetSchema(), previousRowCount, targetSegID, partitionID, collectionID)
if err != nil {
log.Warn("compact wrong, unable to init segment writer", zap.Error(err))
return nil, err
@ -455,12 +442,6 @@ func (t *mixCompactionTask) Compact() (*datapb.CompactionPlanResult, error) {
segIDs := lo.Map(t.plan.GetSegmentBinlogs(), func(binlogs *datapb.CompactionSegmentBinlogs, _ int) int64 {
return binlogs.GetSegmentID()
})
// Inject to stop flush
// when compaction failed, these segments need to be Unblocked by injectDone in compaction_executor
// when compaction succeeded, these segments will be Unblocked by SyncSegments from DataCoord.
for _, segID := range segIDs {
t.syncMgr.Block(segID)
}
if err := binlog.DecompressCompactionBinlogs(t.plan.GetSegmentBinlogs()); err != nil {
log.Warn("compact wrong, fail to decompress compaction binlogs", zap.Error(err))
@ -541,16 +522,9 @@ func (t *mixCompactionTask) Compact() (*datapb.CompactionPlanResult, error) {
return planResult, nil
}
func (t *mixCompactionTask) InjectDone() {
t.injectDoneOnce.Do(func() {
for _, binlog := range t.plan.SegmentBinlogs {
t.syncMgr.Unblock(binlog.SegmentID)
}
})
}
func (t *mixCompactionTask) GetCollection() typeutil.UniqueID {
return t.metaCache.Collection()
// The length of SegmentBinlogs is checked before task enqueueing.
return t.plan.GetSegmentBinlogs()[0].GetCollectionID()
}
func (t *mixCompactionTask) isExpiredEntity(ts typeutil.Timestamp) bool {

View File

@ -32,12 +32,10 @@ import (
"github.com/milvus-io/milvus/internal/datanode/allocator"
"github.com/milvus-io/milvus/internal/datanode/io"
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/datanode/syncmgr"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
@ -54,8 +52,6 @@ type MixCompactionTaskSuite struct {
mockBinlogIO *io.MockBinlogIO
mockAlloc *allocator.MockAllocator
mockMeta *metacache.MockMetaCache
mockSyncMgr *syncmgr.MockSyncManager
meta *etcdpb.CollectionMeta
segWriter *SegmentWriter
@ -71,10 +67,8 @@ func (s *MixCompactionTaskSuite) SetupSuite() {
func (s *MixCompactionTaskSuite) SetupTest() {
s.mockBinlogIO = io.NewMockBinlogIO(s.T())
s.mockAlloc = allocator.NewMockAllocator(s.T())
s.mockMeta = metacache.NewMockMetaCache(s.T())
s.mockSyncMgr = syncmgr.NewMockSyncManager(s.T())
s.task = NewMixCompactionTask(context.Background(), s.mockBinlogIO, s.mockMeta, s.mockSyncMgr, s.mockAlloc, nil)
s.task = NewMixCompactionTask(context.Background(), s.mockBinlogIO, s.mockAlloc, nil)
s.meta = genTestCollectionMeta()
@ -90,6 +84,7 @@ func (s *MixCompactionTaskSuite) SetupTest() {
}},
TimeoutInSeconds: 10,
Type: datapb.CompactionType_MixCompaction,
Schema: s.meta.GetSchema(),
}
s.task.plan = s.plan
}
@ -106,26 +101,10 @@ func getMilvusBirthday() time.Time {
return time.Date(2019, time.Month(5), 30, 0, 0, 0, 0, time.UTC)
}
func (s *MixCompactionTaskSuite) TestInjectDone() {
segmentIDs := []int64{100, 200, 300}
s.task.plan.SegmentBinlogs = lo.Map(segmentIDs, func(id int64, _ int) *datapb.CompactionSegmentBinlogs {
return &datapb.CompactionSegmentBinlogs{SegmentID: id}
})
for _, segmentID := range segmentIDs {
s.mockSyncMgr.EXPECT().Unblock(segmentID).Return().Once()
}
s.task.InjectDone()
s.task.InjectDone()
}
func (s *MixCompactionTaskSuite) TestCompactDupPK() {
// Test merge compactions, two segments with the same pk, one deletion pk=1
// The merged segment 19530 should remain 3 pk without pk=100
s.mockAlloc.EXPECT().AllocOne().Return(int64(19530), nil).Twice()
s.mockMeta.EXPECT().Schema().Return(s.meta.GetSchema()).Once()
s.mockMeta.EXPECT().Collection().Return(CollectionID).Once()
segments := []int64{7, 8, 9}
dblobs, err := getInt64DeltaBlobs(
1,
@ -153,12 +132,12 @@ func (s *MixCompactionTaskSuite) TestCompactDupPK() {
s.segWriter.writer.Flush()
s.Require().NoError(err)
statistic := &storage.PkStatistics{
PkFilter: s.segWriter.pkstats.BF,
MinPK: s.segWriter.pkstats.MinPk,
MaxPK: s.segWriter.pkstats.MaxPk,
}
bfs := metacache.NewBloomFilterSet(statistic)
//statistic := &storage.PkStatistics{
// PkFilter: s.segWriter.pkstats.BF,
// MinPK: s.segWriter.pkstats.MinPk,
// MaxPK: s.segWriter.pkstats.MaxPk,
//}
//bfs := metacache.NewBloomFilterSet(statistic)
kvs, fBinlogs, err := s.task.serializeWrite(context.TODO(), s.segWriter)
s.Require().NoError(err)
@ -167,17 +146,12 @@ func (s *MixCompactionTaskSuite) TestCompactDupPK() {
return len(left) == 0 && len(right) == 0
})).Return(lo.Values(kvs), nil).Once()
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{
CollectionID: CollectionID,
PartitionID: PartitionID,
ID: segID,
NumOfRows: 1,
}, bfs)
s.mockMeta.EXPECT().GetSegmentByID(segID).RunAndReturn(func(id int64, filters ...metacache.SegmentFilter) (*metacache.SegmentInfo, bool) {
return seg, true
})
s.mockSyncMgr.EXPECT().Block(segID).Return().Once()
//seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{
// CollectionID: CollectionID,
// PartitionID: PartitionID,
// ID: segID,
// NumOfRows: 1,
//}, bfs)
s.plan.SegmentBinlogs = append(s.plan.SegmentBinlogs, &datapb.CompactionSegmentBinlogs{
SegmentID: segID,
@ -204,8 +178,6 @@ func (s *MixCompactionTaskSuite) TestCompactDupPK() {
func (s *MixCompactionTaskSuite) TestCompactTwoToOne() {
s.mockAlloc.EXPECT().AllocOne().Return(int64(19530), nil).Twice()
s.mockMeta.EXPECT().Schema().Return(s.meta.GetSchema()).Once()
s.mockMeta.EXPECT().Collection().Return(CollectionID).Once()
segments := []int64{5, 6, 7}
s.mockAlloc.EXPECT().Alloc(mock.Anything).Return(7777777, 8888888, nil)
@ -213,12 +185,12 @@ func (s *MixCompactionTaskSuite) TestCompactTwoToOne() {
s.task.plan.SegmentBinlogs = make([]*datapb.CompactionSegmentBinlogs, 0)
for _, segID := range segments {
s.initSegBuffer(segID)
statistic := &storage.PkStatistics{
PkFilter: s.segWriter.pkstats.BF,
MinPK: s.segWriter.pkstats.MinPk,
MaxPK: s.segWriter.pkstats.MaxPk,
}
bfs := metacache.NewBloomFilterSet(statistic)
//statistic := &storage.PkStatistics{
// PkFilter: s.segWriter.pkstats.BF,
// MinPK: s.segWriter.pkstats.MinPk,
// MaxPK: s.segWriter.pkstats.MaxPk,
//}
//bfs := metacache.NewBloomFilterSet(statistic)
kvs, fBinlogs, err := s.task.serializeWrite(context.TODO(), s.segWriter)
s.Require().NoError(err)
s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.MatchedBy(func(keys []string) bool {
@ -226,17 +198,12 @@ func (s *MixCompactionTaskSuite) TestCompactTwoToOne() {
return len(left) == 0 && len(right) == 0
})).Return(lo.Values(kvs), nil).Once()
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{
CollectionID: CollectionID,
PartitionID: PartitionID,
ID: segID,
NumOfRows: 1,
}, bfs)
s.mockMeta.EXPECT().GetSegmentByID(segID).RunAndReturn(func(id int64, filters ...metacache.SegmentFilter) (*metacache.SegmentInfo, bool) {
return seg, true
})
s.mockSyncMgr.EXPECT().Block(segID).Return().Once()
//seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{
// CollectionID: CollectionID,
// PartitionID: PartitionID,
// ID: segID,
// NumOfRows: 1,
//}, bfs)
s.plan.SegmentBinlogs = append(s.plan.SegmentBinlogs, &datapb.CompactionSegmentBinlogs{
SegmentID: segID,
@ -251,10 +218,7 @@ func (s *MixCompactionTaskSuite) TestCompactTwoToOne() {
ID: 99999,
NumOfRows: 0,
}, metacache.NewBloomFilterSet())
s.mockMeta.EXPECT().GetSegmentByID(seg.SegmentID()).RunAndReturn(func(id int64, filters ...metacache.SegmentFilter) (*metacache.SegmentInfo, bool) {
return seg, true
})
s.mockSyncMgr.EXPECT().Block(seg.SegmentID()).Return().Once()
s.plan.SegmentBinlogs = append(s.plan.SegmentBinlogs, &datapb.CompactionSegmentBinlogs{
SegmentID: seg.SegmentID(),
})
@ -531,15 +495,6 @@ func (s *MixCompactionTaskSuite) TestCompactFail() {
_, err := s.task.Compact()
s.Error(err)
})
s.Run("Test getNumRows error", func() {
s.mockAlloc.EXPECT().AllocOne().Return(19530, nil).Once()
s.mockMeta.EXPECT().GetSegmentByID(mock.Anything).Return(nil, false)
_, err := s.task.Compact()
s.Error(err)
s.ErrorIs(err, merr.ErrSegmentNotFound)
})
}
func (s *MixCompactionTaskSuite) TestIsExpiredEntity() {

View File

@ -70,12 +70,11 @@ func (c *compactionExecutor) toCompleteState(task compaction.Compactor) {
c.executing.GetAndRemove(task.GetPlanID())
}
func (c *compactionExecutor) injectDone(planID UniqueID) {
func (c *compactionExecutor) removeTask(planID UniqueID) {
c.completed.GetAndRemove(planID)
task, loaded := c.completedCompactor.GetAndRemove(planID)
if loaded {
log.Info("Compaction task inject done", zap.Int64("planID", planID), zap.String("channel", task.GetChannelName()))
task.InjectDone()
log.Info("Compaction task removed", zap.Int64("planID", planID), zap.String("channel", task.GetChannelName()))
}
}
@ -110,12 +109,11 @@ func (c *compactionExecutor) executeTask(task compaction.Compactor) {
result, err := task.Compact()
if err != nil {
task.InjectDone()
log.Warn("compaction task failed", zap.Error(err))
} else {
c.completed.Insert(result.GetPlanID(), result)
c.completedCompactor.Insert(result.GetPlanID(), task)
return
}
c.completed.Insert(result.GetPlanID(), result)
c.completedCompactor.Insert(result.GetPlanID(), task)
log.Info("end to execute compaction")
}
@ -152,7 +150,7 @@ func (c *compactionExecutor) discardPlan(channel string) {
// remove all completed plans of channel
c.completed.Range(func(planID int64, result *datapb.CompactionPlanResult) bool {
if result.GetChannel() == channel {
c.injectDone(planID)
c.removeTask(planID)
log.Info("remove compaction plan and results",
zap.String("channel", channel),
zap.Int64("planID", planID))

View File

@ -34,7 +34,6 @@ import (
"github.com/milvus-io/milvus/internal/datanode/io"
iter "github.com/milvus-io/milvus/internal/datanode/iterators"
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/datanode/syncmgr"
"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
@ -52,12 +51,8 @@ import (
)
type levelZeroCompactionTask struct {
compaction.Compactor
io.BinlogIO
allocator allocator.Allocator
metacache metacache.MetaCache
syncmgr syncmgr.SyncManager
cm storage.ChunkManager
plan *datapb.CompactionPlan
@ -76,8 +71,6 @@ func newLevelZeroCompactionTask(
ctx context.Context,
binlogIO io.BinlogIO,
alloc allocator.Allocator,
metaCache metacache.MetaCache,
syncmgr syncmgr.SyncManager,
cm storage.ChunkManager,
plan *datapb.CompactionPlan,
) *levelZeroCompactionTask {
@ -88,8 +81,6 @@ func newLevelZeroCompactionTask(
BinlogIO: binlogIO,
allocator: alloc,
metacache: metaCache,
syncmgr: syncmgr,
cm: cm,
plan: plan,
tr: timerecord.NewTimeRecorder("levelzero compaction"),
@ -115,12 +106,10 @@ func (t *levelZeroCompactionTask) GetChannelName() string {
}
func (t *levelZeroCompactionTask) GetCollection() int64 {
return t.metacache.Collection()
// The length of SegmentBinlogs is checked before task enqueueing.
return t.plan.GetSegmentBinlogs()[0].GetCollectionID()
}
// Do nothing for levelzero compaction
func (t *levelZeroCompactionTask) InjectDone() {}
func (t *levelZeroCompactionTask) Compact() (*datapb.CompactionPlanResult, error) {
ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(t.ctx, "L0Compact")
defer span.End()
@ -338,16 +327,20 @@ func (t *levelZeroCompactionTask) splitDelta(
}
func (t *levelZeroCompactionTask) composeDeltalog(segmentID int64, dData *storage.DeleteData) (map[string][]byte, *datapb.Binlog, error) {
segment, ok := lo.Find(t.plan.GetSegmentBinlogs(), func(segment *datapb.CompactionSegmentBinlogs) bool {
return segment.GetSegmentID() == segmentID
})
if !ok {
return nil, nil, merr.WrapErrSegmentNotFound(segmentID, "cannot find segment in compaction plan")
}
var (
collID = t.metacache.Collection()
uploadKv = make(map[string][]byte)
collectionID = segment.GetCollectionID()
partitionID = segment.GetPartitionID()
uploadKv = make(map[string][]byte)
)
seg, ok := t.metacache.GetSegmentByID(segmentID)
if !ok {
return nil, nil, merr.WrapErrSegmentLack(segmentID)
}
blob, err := storage.NewDeleteCodec().Serialize(collID, seg.PartitionID(), segmentID, dData)
blob, err := storage.NewDeleteCodec().Serialize(collectionID, partitionID, segmentID, dData)
if err != nil {
return nil, nil, err
}
@ -357,7 +350,7 @@ func (t *levelZeroCompactionTask) composeDeltalog(segmentID int64, dData *storag
return nil, nil, err
}
blobKey := metautil.JoinIDPath(collID, seg.PartitionID(), segmentID, logID)
blobKey := metautil.JoinIDPath(collectionID, partitionID, segmentID, logID)
blobPath := t.BinlogIO.JoinFullPath(common.SegmentDeltaLogPath, blobKey)
uploadKv[blobPath] = blob.GetValue()
@ -447,7 +440,7 @@ func (t *levelZeroCompactionTask) loadBF(targetSegments []*datapb.CompactionSegm
_ = binlog.DecompressBinLog(storage.StatsBinlog, segment.GetCollectionID(),
segment.GetPartitionID(), segment.GetSegmentID(), segment.GetField2StatslogPaths())
pks, err := loadStats(t.ctx, t.cm,
t.metacache.Schema(), segment.GetSegmentID(), segment.GetField2StatslogPaths())
t.plan.GetSchema(), segment.GetSegmentID(), segment.GetField2StatslogPaths())
if err != nil {
log.Warn("failed to load segment stats log", zap.Error(err))
return err, err

View File

@ -51,7 +51,6 @@ type LevelZeroCompactionTaskSuite struct {
mockBinlogIO *io.MockBinlogIO
mockAlloc *allocator.MockAllocator
mockMeta *metacache.MockMetaCache
task *levelZeroCompactionTask
dData *storage.DeleteData
@ -61,9 +60,8 @@ type LevelZeroCompactionTaskSuite struct {
func (s *LevelZeroCompactionTaskSuite) SetupTest() {
s.mockAlloc = allocator.NewMockAllocator(s.T())
s.mockBinlogIO = io.NewMockBinlogIO(s.T())
s.mockMeta = metacache.NewMockMetaCache(s.T())
// plan of the task is unset
s.task = newLevelZeroCompactionTask(context.Background(), s.mockBinlogIO, s.mockAlloc, s.mockMeta, nil, nil, nil)
s.task = newLevelZeroCompactionTask(context.Background(), s.mockBinlogIO, s.mockAlloc, nil, nil)
pk2ts := map[int64]uint64{
1: 20000,
@ -101,20 +99,19 @@ func (s *LevelZeroCompactionTaskSuite) TestLinearBatchLoadDeltaFail() {
},
{SegmentID: 200, Level: datapb.SegmentLevel_L1},
},
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
IsPrimaryKey: true,
},
},
},
}
s.task.plan = plan
s.task.tr = timerecord.NewTimeRecorder("test")
s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).Return(nil, errors.New("mock download fail")).Twice()
s.mockMeta.EXPECT().Schema().Return(&schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
IsPrimaryKey: true,
},
},
})
targetSegments := lo.Filter(plan.SegmentBinlogs, func(s *datapb.CompactionSegmentBinlogs, _ int) bool {
return s.Level == datapb.SegmentLevel_L1
})
@ -154,6 +151,13 @@ func (s *LevelZeroCompactionTaskSuite) TestLinearBatchUploadByCheckFail() {
},
}},
},
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
IsPrimaryKey: true,
},
},
},
}
s.task.plan = plan
@ -170,15 +174,9 @@ func (s *LevelZeroCompactionTaskSuite) TestLinearBatchUploadByCheckFail() {
s.task.cm = cm
s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).Return([][]byte{s.dBlob}, nil).Times(2)
s.mockMeta.EXPECT().Collection().Return(1)
s.mockMeta.EXPECT().GetSegmentByID(mock.Anything).Return(nil, false).Twice()
s.mockMeta.EXPECT().Schema().Return(&schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
IsPrimaryKey: true,
},
},
})
mockAlloc := allocator.NewMockAllocator(s.T())
mockAlloc.EXPECT().AllocOne().Return(0, errors.New("mock alloc err"))
s.task.allocator = mockAlloc
targetSegments := lo.Filter(plan.SegmentBinlogs, func(s *datapb.CompactionSegmentBinlogs, _ int) bool {
return s.Level == datapb.SegmentLevel_L1
@ -200,7 +198,8 @@ func (s *LevelZeroCompactionTaskSuite) TestCompactLinear() {
Type: datapb.CompactionType_Level0DeleteCompaction,
SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
{
SegmentID: 100, Level: datapb.SegmentLevel_L0, Deltalogs: []*datapb.FieldBinlog{
CollectionID: 1,
SegmentID: 100, Level: datapb.SegmentLevel_L0, Deltalogs: []*datapb.FieldBinlog{
{
Binlogs: []*datapb.Binlog{
{LogPath: "a/b/c1", LogSize: 100},
@ -212,7 +211,8 @@ func (s *LevelZeroCompactionTaskSuite) TestCompactLinear() {
},
},
{
SegmentID: 101, Level: datapb.SegmentLevel_L0, Deltalogs: []*datapb.FieldBinlog{
CollectionID: 1,
SegmentID: 101, Level: datapb.SegmentLevel_L0, Deltalogs: []*datapb.FieldBinlog{
{
Binlogs: []*datapb.Binlog{
{LogPath: "a/d/c1", LogSize: 100},
@ -223,20 +223,33 @@ func (s *LevelZeroCompactionTaskSuite) TestCompactLinear() {
},
},
},
{SegmentID: 200, Level: datapb.SegmentLevel_L1, Field2StatslogPaths: []*datapb.FieldBinlog{
{
Binlogs: []*datapb.Binlog{
{LogID: 9999, LogSize: 100},
{
CollectionID: 1,
SegmentID: 200, Level: datapb.SegmentLevel_L1, Field2StatslogPaths: []*datapb.FieldBinlog{
{
Binlogs: []*datapb.Binlog{
{LogID: 9999, LogSize: 100},
},
},
},
}},
{SegmentID: 201, Level: datapb.SegmentLevel_L1, Field2StatslogPaths: []*datapb.FieldBinlog{
{
Binlogs: []*datapb.Binlog{
{LogID: 9999, LogSize: 100},
},
{
CollectionID: 1,
SegmentID: 201, Level: datapb.SegmentLevel_L1, Field2StatslogPaths: []*datapb.FieldBinlog{
{
Binlogs: []*datapb.Binlog{
{LogID: 9999, LogSize: 100},
},
},
},
}},
},
},
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
IsPrimaryKey: true,
},
},
},
}
@ -254,18 +267,6 @@ func (s *LevelZeroCompactionTaskSuite) TestCompactLinear() {
s.task.cm = cm
s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).Return([][]byte{s.dBlob}, nil).Times(2)
s.mockMeta.EXPECT().Collection().Return(1)
s.mockMeta.EXPECT().GetSegmentByID(mock.Anything, mock.Anything).
RunAndReturn(func(id int64, filters ...metacache.SegmentFilter) (*metacache.SegmentInfo, bool) {
return metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: id, PartitionID: 10}, nil), true
})
s.mockMeta.EXPECT().Schema().Return(&schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
IsPrimaryKey: true,
},
},
})
s.mockAlloc.EXPECT().AllocOne().Return(19530, nil).Times(2)
s.mockBinlogIO.EXPECT().JoinFullPath(mock.Anything, mock.Anything).
@ -357,6 +358,13 @@ func (s *LevelZeroCompactionTaskSuite) TestCompactBatch() {
},
}},
},
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
IsPrimaryKey: true,
},
},
},
}
s.task.plan = plan
@ -373,18 +381,6 @@ func (s *LevelZeroCompactionTaskSuite) TestCompactBatch() {
s.task.cm = cm
s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).Return([][]byte{s.dBlob}, nil).Once()
s.mockMeta.EXPECT().Collection().Return(1)
s.mockMeta.EXPECT().GetSegmentByID(mock.Anything, mock.Anything).
RunAndReturn(func(id int64, filters ...metacache.SegmentFilter) (*metacache.SegmentInfo, bool) {
return metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: id, PartitionID: 10}, nil), true
})
s.mockMeta.EXPECT().Schema().Return(&schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
IsPrimaryKey: true,
},
},
})
s.mockAlloc.EXPECT().AllocOne().Return(19530, nil).Times(2)
s.mockBinlogIO.EXPECT().JoinFullPath(mock.Anything, mock.Anything).
@ -430,11 +426,21 @@ func (s *LevelZeroCompactionTaskSuite) TestCompactBatch() {
func (s *LevelZeroCompactionTaskSuite) TestUploadByCheck() {
ctx := context.Background()
plan := &datapb.CompactionPlan{
SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
{
SegmentID: 100,
},
},
}
s.Run("uploadByCheck directly composeDeltalog failed", func() {
s.SetupTest()
s.mockMeta.EXPECT().Collection().Return(1)
s.mockMeta.EXPECT().GetSegmentByID(mock.Anything).Return(nil, false).Once()
s.task.plan = plan
mockAlloc := allocator.NewMockAllocator(s.T())
mockAlloc.EXPECT().AllocOne().Return(0, errors.New("mock alloc err"))
s.task.allocator = mockAlloc
segments := map[int64]*storage.DeleteData{100: s.dData}
results := make(map[int64]*datapb.CompactionSegment)
err := s.task.uploadByCheck(ctx, false, segments, results)
@ -444,13 +450,8 @@ func (s *LevelZeroCompactionTaskSuite) TestUploadByCheck() {
s.Run("uploadByCheck directly Upload failed", func() {
s.SetupTest()
s.task.plan = plan
s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(errors.New("mock upload failed"))
s.mockMeta.EXPECT().Collection().Return(1)
s.mockMeta.EXPECT().GetSegmentByID(
mock.MatchedBy(func(ID int64) bool {
return ID == 100
}), mock.Anything).
Return(metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 100, PartitionID: 10}, nil), true)
s.mockAlloc.EXPECT().AllocOne().Return(19530, nil)
blobKey := metautil.JoinIDPath(1, 10, 100, 19530)
@ -466,13 +467,8 @@ func (s *LevelZeroCompactionTaskSuite) TestUploadByCheck() {
s.Run("upload directly", func() {
s.SetupTest()
s.task.plan = plan
s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil)
s.mockMeta.EXPECT().Collection().Return(1)
s.mockMeta.EXPECT().GetSegmentByID(
mock.MatchedBy(func(ID int64) bool {
return ID == 100
}), mock.Anything).
Return(metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 100, PartitionID: 10}, nil), true)
s.mockAlloc.EXPECT().AllocOne().Return(19530, nil)
blobKey := metautil.JoinIDPath(1, 10, 100, 19530)
@ -503,16 +499,11 @@ func (s *LevelZeroCompactionTaskSuite) TestUploadByCheck() {
})
s.Run("check with upload", func() {
s.task.plan = plan
blobKey := metautil.JoinIDPath(1, 10, 100, 19530)
blobPath := path.Join(common.SegmentDeltaLogPath, blobKey)
s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil)
s.mockMeta.EXPECT().Collection().Return(1)
s.mockMeta.EXPECT().GetSegmentByID(
mock.MatchedBy(func(ID int64) bool {
return ID == 100
}), mock.Anything).
Return(metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 100, PartitionID: 10}, nil), true)
s.mockAlloc.EXPECT().AllocOne().Return(19530, nil)
s.mockBinlogIO.EXPECT().JoinFullPath(mock.Anything, mock.Anything).Return(blobPath)
@ -539,20 +530,17 @@ func (s *LevelZeroCompactionTaskSuite) TestUploadByCheck() {
}
func (s *LevelZeroCompactionTaskSuite) TestComposeDeltalog() {
s.mockMeta.EXPECT().Collection().Return(1)
s.mockMeta.EXPECT().
GetSegmentByID(
mock.MatchedBy(func(ID int64) bool {
return ID == 100
}), mock.Anything).
Return(metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 100, PartitionID: 10}, nil), true)
s.mockMeta.EXPECT().
GetSegmentByID(
mock.MatchedBy(func(ID int64) bool {
return ID == 101
}), mock.Anything).
Return(nil, false)
plan := &datapb.CompactionPlan{
SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
{
SegmentID: 100,
},
{
SegmentID: 101,
},
},
}
s.task.plan = plan
s.mockAlloc.EXPECT().AllocOne().Return(19530, nil)
@ -568,8 +556,13 @@ func (s *LevelZeroCompactionTaskSuite) TestComposeDeltalog() {
s.NotNil(v)
s.Equal(blobPath, binlog.LogPath)
_, _, err = s.task.composeDeltalog(101, s.dData)
s.Error(err)
kvs, _, err = s.task.composeDeltalog(101, s.dData)
s.NoError(err)
s.Equal(1, len(kvs))
v, ok = kvs[blobPath]
s.True(ok)
s.NotNil(v)
s.Equal(blobPath, binlog.LogPath)
}
func (s *LevelZeroCompactionTaskSuite) TestSplitDelta() {
@ -684,6 +677,13 @@ func (s *LevelZeroCompactionTaskSuite) TestLoadBF() {
},
}},
},
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
IsPrimaryKey: true,
},
},
},
}
s.task.plan = plan
@ -698,14 +698,6 @@ func (s *LevelZeroCompactionTaskSuite) TestLoadBF() {
cm.EXPECT().MultiRead(mock.Anything, mock.Anything).Return([][]byte{sw.GetBuffer()}, nil)
s.task.cm = cm
s.mockMeta.EXPECT().Schema().Return(&schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
IsPrimaryKey: true,
},
},
})
bfs, err := s.task.loadBF(plan.SegmentBinlogs)
s.NoError(err)
@ -730,18 +722,17 @@ func (s *LevelZeroCompactionTaskSuite) TestFailed() {
},
}},
},
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
IsPrimaryKey: false,
},
},
},
}
s.task.plan = plan
s.mockMeta.EXPECT().Schema().Return(&schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
IsPrimaryKey: false,
},
},
})
_, err := s.task.loadBF(plan.SegmentBinlogs)
s.Error(err)
})

View File

@ -30,11 +30,8 @@ import (
"github.com/milvus-io/milvus/internal/datanode/compaction"
"github.com/milvus-io/milvus/internal/datanode/importv2"
"github.com/milvus-io/milvus/internal/datanode/io"
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
@ -205,29 +202,9 @@ func (node *DataNode) Compaction(ctx context.Context, req *datapb.CompactionPlan
return merr.Status(err), nil
}
ds, ok := node.flowgraphManager.GetFlowgraphService(req.GetChannel())
if !ok {
log.Warn("illegel compaction plan, channel not in this DataNode", zap.String("channelName", req.GetChannel()))
return merr.Status(merr.WrapErrChannelNotFound(req.GetChannel(), "illegel compaction plan")), nil
}
if !node.compactionExecutor.isValidChannel(req.GetChannel()) {
log.Warn("channel of compaction is marked invalid in compaction executor", zap.String("channelName", req.GetChannel()))
return merr.Status(merr.WrapErrChannelNotFound(req.GetChannel(), "channel is dropping")), nil
}
meta := ds.metacache
for _, segment := range req.GetSegmentBinlogs() {
if segment.GetLevel() == datapb.SegmentLevel_L0 {
continue
}
_, ok := meta.GetSegmentByID(segment.GetSegmentID(), metacache.WithSegmentState(commonpb.SegmentState_Flushed))
if !ok {
log.Warn("compaction plan contains segment which is not flushed",
zap.Int64("segmentID", segment.GetSegmentID()),
)
return merr.Status(merr.WrapErrSegmentNotFound(segment.GetSegmentID(), "segment with flushed state not found")), nil
}
if len(req.GetSegmentBinlogs()) == 0 {
log.Info("no segments to compact")
return merr.Success(), nil
}
/*
@ -244,8 +221,6 @@ func (node *DataNode) Compaction(ctx context.Context, req *datapb.CompactionPlan
taskCtx,
binlogIO,
node.allocator,
ds.metacache,
node.syncMgr,
node.chunkManager,
req,
)
@ -253,8 +228,6 @@ func (node *DataNode) Compaction(ctx context.Context, req *datapb.CompactionPlan
task = compaction.NewMixCompactionTask(
taskCtx,
binlogIO,
ds.metacache,
node.syncMgr,
node.allocator,
req,
)
@ -288,10 +261,6 @@ func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegments
log := log.Ctx(ctx).With(
zap.Int64("planID", req.GetPlanID()),
zap.Int64("nodeID", node.GetNodeID()),
zap.Int64("target segmentID", req.GetCompactedTo()),
zap.Int64s("compacted from", req.GetCompactedFrom()),
zap.Int64("numOfRows", req.GetNumOfRows()),
zap.String("channelName", req.GetChannelName()),
)
log.Info("DataNode receives SyncSegments")
@ -301,32 +270,8 @@ func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegments
return merr.Status(err), nil
}
if len(req.GetCompactedFrom()) <= 0 {
log.Info("SyncSegments with empty compactedFrom, clearing the plan")
node.compactionExecutor.injectDone(req.GetPlanID())
return merr.Success(), nil
}
ds, ok := node.flowgraphManager.GetFlowgraphService(req.GetChannelName())
if !ok {
node.compactionExecutor.discardPlan(req.GetChannelName())
err := merr.WrapErrChannelNotFound(req.GetChannelName())
log.Warn("failed to sync segments", zap.Error(err))
return merr.Status(err), nil
}
err := binlog.DecompressBinLog(storage.StatsBinlog, req.GetCollectionId(), req.GetPartitionId(), req.GetCompactedTo(), req.GetStatsLogs())
if err != nil {
log.Warn("failed to DecompressBinLog", zap.Error(err))
return merr.Status(err), nil
}
pks, err := loadStats(ctx, node.chunkManager, ds.metacache.Schema(), req.GetCompactedTo(), req.GetStatsLogs())
if err != nil {
log.Warn("failed to load segment statslog", zap.Error(err))
return merr.Status(err), nil
}
bfs := metacache.NewBloomFilterSet(pks...)
ds.metacache.CompactSegments(req.GetCompactedTo(), req.GetPartitionId(), req.GetNumOfRows(), bfs, req.GetCompactedFrom()...)
node.compactionExecutor.injectDone(req.GetPlanID())
// TODO: sheep, add a new DropCompaction interface, deprecate SyncSegments
node.compactionExecutor.removeTask(req.GetPlanID())
return merr.Success(), nil
}

View File

@ -210,50 +210,7 @@ func (s *DataNodeServicesSuite) TestGetCompactionState() {
func (s *DataNodeServicesSuite) TestCompaction() {
dmChannelName := "by-dev-rootcoord-dml_0_100v0"
schema := &schemapb.CollectionSchema{
Name: "test_collection",
Fields: []*schemapb.FieldSchema{
{FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64},
{FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64},
{FieldID: common.StartOfUserFieldID, DataType: schemapb.DataType_Int64, IsPrimaryKey: true, Name: "pk"},
{FieldID: common.StartOfUserFieldID + 1, DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "128"},
}},
},
}
flushedSegmentID := int64(100)
growingSegmentID := int64(101)
vchan := &datapb.VchannelInfo{
CollectionID: 1,
ChannelName: dmChannelName,
UnflushedSegmentIds: []int64{},
FlushedSegmentIds: []int64{},
}
err := s.node.flowgraphManager.AddandStartWithEtcdTickler(s.node, vchan, schema, genTestTickler())
s.Require().NoError(err)
fgservice, ok := s.node.flowgraphManager.GetFlowgraphService(dmChannelName)
s.Require().True(ok)
metaCache := metacache.NewMockMetaCache(s.T())
metaCache.EXPECT().Collection().Return(1).Maybe()
metaCache.EXPECT().Schema().Return(schema).Maybe()
s.node.writeBufferManager.Register(dmChannelName, metaCache, nil)
fgservice.metacache.AddSegment(&datapb.SegmentInfo{
ID: flushedSegmentID,
CollectionID: 1,
PartitionID: 2,
StartPosition: &msgpb.MsgPosition{},
}, func(_ *datapb.SegmentInfo) *metacache.BloomFilterSet { return metacache.NewBloomFilterSet() })
fgservice.metacache.AddSegment(&datapb.SegmentInfo{
ID: growingSegmentID,
CollectionID: 1,
PartitionID: 2,
StartPosition: &msgpb.MsgPosition{},
}, func(_ *datapb.SegmentInfo) *metacache.BloomFilterSet { return metacache.NewBloomFilterSet() })
s.Run("service_not_ready", func() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -269,40 +226,7 @@ func (s *DataNodeServicesSuite) TestCompaction() {
s.False(merr.Ok(resp))
})
s.Run("channel_not_match", func() {
node := s.node
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
req := &datapb.CompactionPlan{
PlanID: 1000,
Channel: dmChannelName + "other",
}
resp, err := node.Compaction(ctx, req)
s.NoError(err)
s.False(merr.Ok(resp))
})
s.Run("channel_dropped", func() {
node := s.node
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
node.compactionExecutor.dropped.Insert(dmChannelName)
defer node.compactionExecutor.dropped.Remove(dmChannelName)
req := &datapb.CompactionPlan{
PlanID: 1000,
Channel: dmChannelName,
}
resp, err := node.Compaction(ctx, req)
s.NoError(err)
s.False(merr.Ok(resp))
})
s.Run("compact_growing_segment", func() {
s.Run("unknown CompactionType", func() {
node := s.node
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -312,7 +236,7 @@ func (s *DataNodeServicesSuite) TestCompaction() {
Channel: dmChannelName,
SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
{SegmentID: 102, Level: datapb.SegmentLevel_L0},
{SegmentID: growingSegmentID, Level: datapb.SegmentLevel_L1},
{SegmentID: 103, Level: datapb.SegmentLevel_L1},
},
}
@ -506,126 +430,6 @@ func (s *DataNodeServicesSuite) TestGetMetrics() {
zap.String("response", resp.Response))
}
func (s *DataNodeServicesSuite) TestSyncSegments() {
chanName := "fake-by-dev-rootcoord-dml-test-syncsegments-1"
schema := &schemapb.CollectionSchema{
Name: "test_collection",
Fields: []*schemapb.FieldSchema{
{FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64},
{FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64},
{FieldID: common.StartOfUserFieldID, DataType: schemapb.DataType_Int64, IsPrimaryKey: true, Name: "pk"},
{FieldID: common.StartOfUserFieldID + 1, DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "128"},
}},
},
}
err := s.node.flowgraphManager.AddandStartWithEtcdTickler(s.node, &datapb.VchannelInfo{
CollectionID: 1,
ChannelName: chanName,
UnflushedSegmentIds: []int64{},
FlushedSegmentIds: []int64{100, 200, 300},
}, schema, genTestTickler())
s.Require().NoError(err)
fg, ok := s.node.flowgraphManager.GetFlowgraphService(chanName)
s.Assert().True(ok)
fg.metacache.AddSegment(&datapb.SegmentInfo{ID: 100, CollectionID: 1, State: commonpb.SegmentState_Flushed}, EmptyBfsFactory)
fg.metacache.AddSegment(&datapb.SegmentInfo{ID: 101, CollectionID: 1, State: commonpb.SegmentState_Flushed}, EmptyBfsFactory)
fg.metacache.AddSegment(&datapb.SegmentInfo{ID: 200, CollectionID: 1, State: commonpb.SegmentState_Flushed}, EmptyBfsFactory)
fg.metacache.AddSegment(&datapb.SegmentInfo{ID: 201, CollectionID: 1, State: commonpb.SegmentState_Flushed}, EmptyBfsFactory)
fg.metacache.AddSegment(&datapb.SegmentInfo{ID: 300, CollectionID: 1, State: commonpb.SegmentState_Flushed}, EmptyBfsFactory)
s.Run("empty compactedFrom", func() {
req := &datapb.SyncSegmentsRequest{
CompactedTo: 400,
NumOfRows: 100,
}
req.CompactedFrom = []UniqueID{}
status, err := s.node.SyncSegments(s.ctx, req)
s.Assert().NoError(err)
s.Assert().True(merr.Ok(status))
})
s.Run("invalid compacted from", func() {
req := &datapb.SyncSegmentsRequest{
CompactedTo: 400,
NumOfRows: 100,
CompactedFrom: []UniqueID{101, 201},
}
req.CompactedFrom = []UniqueID{101, 201}
status, err := s.node.SyncSegments(s.ctx, req)
s.Assert().NoError(err)
s.Assert().False(merr.Ok(status))
})
s.Run("valid request numRows>0", func() {
req := &datapb.SyncSegmentsRequest{
CompactedFrom: []UniqueID{100, 200, 101, 201},
CompactedTo: 102,
NumOfRows: 100,
ChannelName: chanName,
CollectionId: 1,
}
status, err := s.node.SyncSegments(s.ctx, req)
s.Assert().NoError(err)
s.Assert().True(merr.Ok(status))
_, result := fg.metacache.GetSegmentByID(req.GetCompactedTo(), metacache.WithSegmentState(commonpb.SegmentState_Flushed))
s.True(result)
for _, compactFrom := range req.GetCompactedFrom() {
seg, result := fg.metacache.GetSegmentByID(compactFrom, metacache.WithSegmentState(commonpb.SegmentState_Flushed))
s.True(result)
s.Equal(req.CompactedTo, seg.CompactTo())
}
status, err = s.node.SyncSegments(s.ctx, req)
s.Assert().NoError(err)
s.Assert().True(merr.Ok(status))
})
s.Run("without_channel_meta", func() {
fg.metacache.UpdateSegments(metacache.UpdateState(commonpb.SegmentState_Flushed),
metacache.WithSegmentIDs(100, 200, 300))
req := &datapb.SyncSegmentsRequest{
CompactedFrom: []int64{100, 200},
CompactedTo: 101,
NumOfRows: 0,
}
status, err := s.node.SyncSegments(s.ctx, req)
s.Assert().NoError(err)
s.Assert().False(merr.Ok(status))
})
s.Run("valid_request_with_meta_num=0", func() {
fg.metacache.UpdateSegments(metacache.UpdateState(commonpb.SegmentState_Flushed),
metacache.WithSegmentIDs(100, 200, 300))
req := &datapb.SyncSegmentsRequest{
CompactedFrom: []int64{100, 200},
CompactedTo: 301,
NumOfRows: 0,
ChannelName: chanName,
CollectionId: 1,
}
status, err := s.node.SyncSegments(s.ctx, req)
s.Assert().NoError(err)
s.Assert().True(merr.Ok(status))
seg, result := fg.metacache.GetSegmentByID(100, metacache.WithSegmentState(commonpb.SegmentState_Flushed))
s.True(result)
s.Equal(metacache.NullSegment, seg.CompactTo())
seg, result = fg.metacache.GetSegmentByID(200, metacache.WithSegmentState(commonpb.SegmentState_Flushed))
s.True(result)
s.Equal(metacache.NullSegment, seg.CompactTo())
_, result = fg.metacache.GetSegmentByID(301, metacache.WithSegmentState(commonpb.SegmentState_Flushed))
s.False(result)
})
}
func (s *DataNodeServicesSuite) TestResendSegmentStats() {
req := &datapb.ResendSegmentStatsRequest{
Base: &commonpb.MsgBase{},

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.32.4. DO NOT EDIT.
// Code generated by mockery v2.30.1. DO NOT EDIT.
package syncmgr
@ -25,39 +25,6 @@ func (_m *MockSyncManager) EXPECT() *MockSyncManager_Expecter {
return &MockSyncManager_Expecter{mock: &_m.Mock}
}
// Block provides a mock function with given fields: segmentID
func (_m *MockSyncManager) Block(segmentID int64) {
_m.Called(segmentID)
}
// MockSyncManager_Block_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Block'
type MockSyncManager_Block_Call struct {
*mock.Call
}
// Block is a helper method to define mock.On call
// - segmentID int64
func (_e *MockSyncManager_Expecter) Block(segmentID interface{}) *MockSyncManager_Block_Call {
return &MockSyncManager_Block_Call{Call: _e.mock.On("Block", segmentID)}
}
func (_c *MockSyncManager_Block_Call) Run(run func(segmentID int64)) *MockSyncManager_Block_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64))
})
return _c
}
func (_c *MockSyncManager_Block_Call) Return() *MockSyncManager_Block_Call {
_c.Call.Return()
return _c
}
func (_c *MockSyncManager_Block_Call) RunAndReturn(run func(int64)) *MockSyncManager_Block_Call {
_c.Call.Return(run)
return _c
}
// GetEarliestPosition provides a mock function with given fields: channel
func (_m *MockSyncManager) GetEarliestPosition(channel string) (int64, *msgpb.MsgPosition) {
ret := _m.Called(channel)
@ -90,7 +57,7 @@ type MockSyncManager_GetEarliestPosition_Call struct {
}
// GetEarliestPosition is a helper method to define mock.On call
// - channel string
// - channel string
func (_e *MockSyncManager_Expecter) GetEarliestPosition(channel interface{}) *MockSyncManager_GetEarliestPosition_Call {
return &MockSyncManager_GetEarliestPosition_Call{Call: _e.mock.On("GetEarliestPosition", channel)}
}
@ -134,8 +101,8 @@ type MockSyncManager_SyncData_Call struct {
}
// SyncData is a helper method to define mock.On call
// - ctx context.Context
// - task Task
// - ctx context.Context
// - task Task
func (_e *MockSyncManager_Expecter) SyncData(ctx interface{}, task interface{}) *MockSyncManager_SyncData_Call {
return &MockSyncManager_SyncData_Call{Call: _e.mock.On("SyncData", ctx, task)}
}
@ -157,39 +124,6 @@ func (_c *MockSyncManager_SyncData_Call) RunAndReturn(run func(context.Context,
return _c
}
// Unblock provides a mock function with given fields: segmentID
func (_m *MockSyncManager) Unblock(segmentID int64) {
_m.Called(segmentID)
}
// MockSyncManager_Unblock_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Unblock'
type MockSyncManager_Unblock_Call struct {
*mock.Call
}
// Unblock is a helper method to define mock.On call
// - segmentID int64
func (_e *MockSyncManager_Expecter) Unblock(segmentID interface{}) *MockSyncManager_Unblock_Call {
return &MockSyncManager_Unblock_Call{Call: _e.mock.On("Unblock", segmentID)}
}
func (_c *MockSyncManager_Unblock_Call) Run(run func(segmentID int64)) *MockSyncManager_Unblock_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64))
})
return _c
}
func (_c *MockSyncManager_Unblock_Call) Return() *MockSyncManager_Unblock_Call {
_c.Call.Return()
return _c
}
func (_c *MockSyncManager_Unblock_Call) RunAndReturn(run func(int64)) *MockSyncManager_Unblock_Call {
_c.Call.Return(run)
return _c
}
// NewMockSyncManager creates a new instance of MockSyncManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockSyncManager(t interface {

View File

@ -40,19 +40,15 @@ type SyncMeta struct {
metacache metacache.MetaCache
}
// SyncMangger is the interface for sync manager.
// SyncManager is the interface for sync manager.
// it processes the sync tasks inside and changes the meta.
//
//go:generate mockery --name=SyncManager --structname=MockSyncManager --output=./ --filename=mock_sync_manager.go --with-expecter --inpackage
type SyncManager interface {
// SyncData is the method to submit sync task.
SyncData(ctx context.Context, task Task) *conc.Future[struct{}]
// GetEarliestPosition returns the earliest position (normally start position) of the processing sync task of provided channel.
GetEarliestPosition(channel string) (int64, *msgpb.MsgPosition)
// Block allows caller to block tasks of provided segment id.
// normally used by compaction task.
// if levelzero delta policy is enabled, this shall be an empty operation.
Block(segmentID int64)
// Unblock is the reverse method for `Block`.
Unblock(segmentID int64)
}
type syncManager struct {
@ -184,11 +180,3 @@ func (mgr *syncManager) GetEarliestPosition(channel string) (int64, *msgpb.MsgPo
})
return segmentID, cp
}
func (mgr *syncManager) Block(segmentID int64) {
mgr.keyLock.Lock(segmentID)
}
func (mgr *syncManager) Unblock(segmentID int64) {
mgr.keyLock.Unlock(segmentID)
}

View File

@ -208,52 +208,6 @@ func (s *SyncManagerSuite) TestCompacted() {
s.EqualValues(1001, segmentID.Load())
}
func (s *SyncManagerSuite) TestBlock() {
sig := make(chan struct{})
counter := atomic.NewInt32(0)
s.broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(nil)
bfs := metacache.NewBloomFilterSet()
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{
ID: s.segmentID,
}, bfs)
metacache.UpdateNumOfRows(1000)(seg)
s.metacache.EXPECT().GetSegmentByID(s.segmentID).Return(seg, true)
s.metacache.EXPECT().GetSegmentsBy(mock.Anything).
RunAndReturn(func(...metacache.SegmentFilter) []*metacache.SegmentInfo {
return []*metacache.SegmentInfo{seg}
})
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Run(func(_ metacache.SegmentAction, filters ...metacache.SegmentFilter) {
if counter.Inc() == 2 {
close(sig)
}
})
manager, err := NewSyncManager(s.chunkManager, s.allocator)
s.NoError(err)
// block
manager.Block(s.segmentID)
task := s.getSuiteSyncTask()
task.WithMetaWriter(BrokerMetaWriter(s.broker, 1))
task.WithTimeRange(50, 100)
task.WithCheckpoint(&msgpb.MsgPosition{
ChannelName: s.channelName,
MsgID: []byte{1, 2, 3, 4},
Timestamp: 100,
})
go manager.SyncData(context.Background(), task)
select {
case <-sig:
s.FailNow("sync task done during block")
case <-time.After(time.Second):
}
manager.Unblock(s.segmentID)
<-sig
}
func (s *SyncManagerSuite) TestResizePool() {
manager, err := NewSyncManager(s.chunkManager, s.allocator)
s.NoError(err)

View File

@ -528,6 +528,7 @@ message CompactionPlan {
string channel = 7;
int64 collection_ttl = 8;
int64 total_rows = 9;
schema.CollectionSchema schema = 10;
}
message CompactionSegment {

View File

@ -0,0 +1,47 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package compaction
import (
"testing"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/tests/integration"
)
type CompactionSuite struct {
integration.MiniClusterSuite
}
func (s *CompactionSuite) SetupSuite() {
s.MiniClusterSuite.SetupSuite()
paramtable.Init()
paramtable.Get().Save(paramtable.Get().DataCoordCfg.GlobalCompactionInterval.Key, "1")
}
func (s *CompactionSuite) TearDownSuite() {
s.MiniClusterSuite.TearDownSuite()
paramtable.Get().Reset(paramtable.Get().DataCoordCfg.GlobalCompactionInterval.Key)
}
func TestCompaction(t *testing.T) {
suite.Run(t, new(CompactionSuite))
}

View File

@ -0,0 +1,238 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package compaction
import (
"context"
"fmt"
"time"
"github.com/golang/protobuf/proto"
"github.com/samber/lo"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metric"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/tests/integration"
)
func (s *CompactionSuite) TestL0Compaction() {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10)
defer cancel()
c := s.Cluster
const (
dim = 128
dbName = ""
rowNum = 100000
deleteCnt = 50000
indexType = integration.IndexFaissIvfFlat
metricType = metric.L2
vecType = schemapb.DataType_FloatVector
)
paramtable.Get().Save(paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerDeltalogMinNum.Key, "1")
defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerDeltalogMinNum.Key)
collectionName := "TestCompaction_" + funcutil.GenRandomStr()
schema := integration.ConstructSchemaOfVecDataType(collectionName, dim, false, vecType)
marshaledSchema, err := proto.Marshal(schema)
s.NoError(err)
// create collection
createCollectionStatus, err := c.Proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{
DbName: dbName,
CollectionName: collectionName,
Schema: marshaledSchema,
ShardsNum: common.DefaultShardsNum,
ConsistencyLevel: commonpb.ConsistencyLevel_Strong,
})
err = merr.CheckRPCCall(createCollectionStatus, err)
s.NoError(err)
log.Info("CreateCollection result", zap.Any("createCollectionStatus", createCollectionStatus))
// show collection
showCollectionsResp, err := c.Proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{})
err = merr.CheckRPCCall(showCollectionsResp, err)
s.NoError(err)
log.Info("ShowCollections result", zap.Any("showCollectionsResp", showCollectionsResp))
// insert
pkColumn := integration.NewInt64FieldData(integration.Int64Field, rowNum)
fVecColumn := integration.NewFloatVectorFieldData(integration.FloatVecField, rowNum, dim)
hashKeys := integration.GenerateHashKeys(rowNum)
insertResult, err := c.Proxy.Insert(ctx, &milvuspb.InsertRequest{
DbName: dbName,
CollectionName: collectionName,
FieldsData: []*schemapb.FieldData{pkColumn, fVecColumn},
HashKeys: hashKeys,
NumRows: uint32(rowNum),
})
err = merr.CheckRPCCall(insertResult, err)
s.NoError(err)
s.Equal(int64(rowNum), insertResult.GetInsertCnt())
// flush
flushResp, err := c.Proxy.Flush(ctx, &milvuspb.FlushRequest{
DbName: dbName,
CollectionNames: []string{collectionName},
})
err = merr.CheckRPCCall(flushResp, err)
s.NoError(err)
segmentIDs, has := flushResp.GetCollSegIDs()[collectionName]
ids := segmentIDs.GetData()
s.Require().NotEmpty(segmentIDs)
s.Require().True(has)
flushTs, has := flushResp.GetCollFlushTs()[collectionName]
s.True(has)
s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName)
// create index
createIndexStatus, err := c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
CollectionName: collectionName,
FieldName: integration.FloatVecField,
IndexName: "_default",
ExtraParams: integration.ConstructIndexParam(dim, indexType, metricType),
})
err = merr.CheckRPCCall(createIndexStatus, err)
s.NoError(err)
s.WaitForIndexBuilt(ctx, collectionName, integration.FloatVecField)
segments, err := c.MetaWatcher.ShowSegments()
s.NoError(err)
s.NotEmpty(segments)
s.Equal(1, len(segments))
s.Equal(int64(rowNum), segments[0].GetNumOfRows())
// load
loadStatus, err := c.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
DbName: dbName,
CollectionName: collectionName,
})
err = merr.CheckRPCCall(loadStatus, err)
s.NoError(err)
s.WaitForLoad(ctx, collectionName)
// delete
deleteResult, err := c.Proxy.Delete(ctx, &milvuspb.DeleteRequest{
DbName: dbName,
CollectionName: collectionName,
Expr: fmt.Sprintf("%s < %d", integration.Int64Field, deleteCnt),
})
err = merr.CheckRPCCall(deleteResult, err)
s.NoError(err)
// flush l0
flushResp, err = c.Proxy.Flush(ctx, &milvuspb.FlushRequest{
DbName: dbName,
CollectionNames: []string{collectionName},
})
err = merr.CheckRPCCall(flushResp, err)
s.NoError(err)
flushTs, has = flushResp.GetCollFlushTs()[collectionName]
s.True(has)
s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName)
// query
queryResult, err := c.Proxy.Query(ctx, &milvuspb.QueryRequest{
DbName: dbName,
CollectionName: collectionName,
Expr: "",
OutputFields: []string{"count(*)"},
})
err = merr.CheckRPCCall(queryResult, err)
s.NoError(err)
s.Equal(int64(rowNum-deleteCnt), queryResult.GetFieldsData()[0].GetScalars().GetLongData().GetData()[0])
// wait for l0 compaction completed
showSegments := func() bool {
segments, err = c.MetaWatcher.ShowSegments()
s.NoError(err)
s.NotEmpty(segments)
log.Info("ShowSegments result", zap.Any("segments", segments))
flushed := lo.Filter(segments, func(segment *datapb.SegmentInfo, _ int) bool {
return segment.GetState() == commonpb.SegmentState_Flushed
})
if len(flushed) == 1 &&
flushed[0].GetLevel() == datapb.SegmentLevel_L1 &&
flushed[0].GetNumOfRows() == rowNum {
log.Info("l0 compaction done, wait for single compaction")
}
return len(flushed) == 1 &&
flushed[0].GetLevel() == datapb.SegmentLevel_L1 &&
flushed[0].GetNumOfRows() == rowNum-deleteCnt
}
for !showSegments() {
select {
case <-ctx.Done():
s.Fail("waiting for compaction timeout")
return
case <-time.After(1 * time.Second):
}
}
// search
expr := fmt.Sprintf("%s > 0", integration.Int64Field)
nq := 10
topk := 10
roundDecimal := -1
params := integration.GetSearchParams(indexType, metricType)
searchReq := integration.ConstructSearchRequest("", collectionName, expr,
integration.FloatVecField, vecType, nil, metricType, params, nq, dim, topk, roundDecimal)
searchResult, err := c.Proxy.Search(ctx, searchReq)
err = merr.CheckRPCCall(searchResult, err)
s.NoError(err)
s.Equal(nq*topk, len(searchResult.GetResults().GetScores()))
// query
queryResult, err = c.Proxy.Query(ctx, &milvuspb.QueryRequest{
DbName: dbName,
CollectionName: collectionName,
Expr: "",
OutputFields: []string{"count(*)"},
})
err = merr.CheckRPCCall(queryResult, err)
s.NoError(err)
s.Equal(int64(rowNum-deleteCnt), queryResult.GetFieldsData()[0].GetScalars().GetLongData().GetData()[0])
// release collection
status, err := c.Proxy.ReleaseCollection(ctx, &milvuspb.ReleaseCollectionRequest{
CollectionName: collectionName,
})
err = merr.CheckRPCCall(status, err)
s.NoError(err)
// drop collection
status, err = c.Proxy.DropCollection(ctx, &milvuspb.DropCollectionRequest{
CollectionName: collectionName,
})
err = merr.CheckRPCCall(status, err)
s.NoError(err)
log.Info("Test compaction succeed")
}

View File

@ -0,0 +1,205 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package compaction
import (
"context"
"fmt"
"time"
"github.com/golang/protobuf/proto"
"github.com/samber/lo"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metric"
"github.com/milvus-io/milvus/tests/integration"
)
func (s *CompactionSuite) TestMixCompaction() {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10)
defer cancel()
c := s.Cluster
const (
dim = 128
dbName = ""
rowNum = 10000
batch = 1000
indexType = integration.IndexFaissIvfFlat
metricType = metric.L2
vecType = schemapb.DataType_FloatVector
)
collectionName := "TestCompaction_" + funcutil.GenRandomStr()
schema := integration.ConstructSchemaOfVecDataType(collectionName, dim, true, vecType)
marshaledSchema, err := proto.Marshal(schema)
s.NoError(err)
// create collection
createCollectionStatus, err := c.Proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{
DbName: dbName,
CollectionName: collectionName,
Schema: marshaledSchema,
ShardsNum: common.DefaultShardsNum,
ConsistencyLevel: commonpb.ConsistencyLevel_Strong,
})
err = merr.CheckRPCCall(createCollectionStatus, err)
s.NoError(err)
log.Info("CreateCollection result", zap.Any("createCollectionStatus", createCollectionStatus))
// show collection
showCollectionsResp, err := c.Proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{})
err = merr.CheckRPCCall(showCollectionsResp, err)
s.NoError(err)
log.Info("ShowCollections result", zap.Any("showCollectionsResp", showCollectionsResp))
for i := 0; i < rowNum/batch; i++ {
// insert
fVecColumn := integration.NewFloatVectorFieldData(integration.FloatVecField, batch, dim)
hashKeys := integration.GenerateHashKeys(batch)
insertResult, err := c.Proxy.Insert(ctx, &milvuspb.InsertRequest{
DbName: dbName,
CollectionName: collectionName,
FieldsData: []*schemapb.FieldData{fVecColumn},
HashKeys: hashKeys,
NumRows: uint32(batch),
})
err = merr.CheckRPCCall(insertResult, err)
s.NoError(err)
s.Equal(int64(batch), insertResult.GetInsertCnt())
// flush
flushResp, err := c.Proxy.Flush(ctx, &milvuspb.FlushRequest{
DbName: dbName,
CollectionNames: []string{collectionName},
})
err = merr.CheckRPCCall(flushResp, err)
s.NoError(err)
segmentIDs, has := flushResp.GetCollSegIDs()[collectionName]
ids := segmentIDs.GetData()
s.Require().NotEmpty(segmentIDs)
s.Require().True(has)
flushTs, has := flushResp.GetCollFlushTs()[collectionName]
s.True(has)
s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName)
log.Info("insert done", zap.Int("i", i))
}
// create index
createIndexStatus, err := c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
CollectionName: collectionName,
FieldName: integration.FloatVecField,
IndexName: "_default",
ExtraParams: integration.ConstructIndexParam(dim, indexType, metricType),
})
err = merr.CheckRPCCall(createIndexStatus, err)
s.NoError(err)
s.WaitForIndexBuilt(ctx, collectionName, integration.FloatVecField)
segments, err := c.MetaWatcher.ShowSegments()
s.NoError(err)
s.NotEmpty(segments)
s.Equal(rowNum/batch, len(segments))
for _, segment := range segments {
log.Info("show segment result", zap.String("segment", segment.String()))
}
// wait for compaction completed
showSegments := func() bool {
segments, err = c.MetaWatcher.ShowSegments()
s.NoError(err)
s.NotEmpty(segments)
compactFromSegments := lo.Filter(segments, func(segment *datapb.SegmentInfo, _ int) bool {
return segment.GetState() == commonpb.SegmentState_Dropped
})
compactToSegments := lo.Filter(segments, func(segment *datapb.SegmentInfo, _ int) bool {
return segment.GetState() == commonpb.SegmentState_Flushed
})
log.Info("ShowSegments result", zap.Int("len(compactFromSegments)", len(compactFromSegments)),
zap.Int("len(compactToSegments)", len(compactToSegments)))
return len(compactToSegments) == 1
}
for !showSegments() {
select {
case <-ctx.Done():
s.Fail("waiting for compaction timeout")
return
case <-time.After(1 * time.Second):
}
}
// load
loadStatus, err := c.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
DbName: dbName,
CollectionName: collectionName,
})
err = merr.CheckRPCCall(loadStatus, err)
s.NoError(err)
s.WaitForLoad(ctx, collectionName)
// search
expr := fmt.Sprintf("%s > 0", integration.Int64Field)
nq := 10
topk := 10
roundDecimal := -1
params := integration.GetSearchParams(indexType, metricType)
searchReq := integration.ConstructSearchRequest("", collectionName, expr,
integration.FloatVecField, vecType, nil, metricType, params, nq, dim, topk, roundDecimal)
searchResult, err := c.Proxy.Search(ctx, searchReq)
err = merr.CheckRPCCall(searchResult, err)
s.NoError(err)
s.Equal(nq*topk, len(searchResult.GetResults().GetScores()))
// query
queryResult, err := c.Proxy.Query(ctx, &milvuspb.QueryRequest{
DbName: dbName,
CollectionName: collectionName,
Expr: "",
OutputFields: []string{"count(*)"},
})
err = merr.CheckRPCCall(queryResult, err)
s.NoError(err)
s.Equal(int64(rowNum), queryResult.GetFieldsData()[0].GetScalars().GetLongData().GetData()[0])
// release collection
status, err := c.Proxy.ReleaseCollection(ctx, &milvuspb.ReleaseCollectionRequest{
CollectionName: collectionName,
})
err = merr.CheckRPCCall(status, err)
s.NoError(err)
// drop collection
status, err = c.Proxy.DropCollection(ctx, &milvuspb.DropCollectionRequest{
CollectionName: collectionName,
})
err = merr.CheckRPCCall(status, err)
s.NoError(err)
log.Info("Test compaction succeed")
}