feat: Add levelzero compaction in DN (#28470)

See also: #27606

---------

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
XuanYang-cn 2023-11-30 14:30:28 +08:00 committed by GitHub
parent d69440524b
commit aae7e62729
14 changed files with 1215 additions and 37 deletions

View File

@@ -437,6 +437,7 @@ generate-mockery-datanode: getdeps
$(INSTALL_PATH)/mockery --name=SyncManager --dir=$(PWD)/internal/datanode/syncmgr --output=$(PWD)/internal/datanode/syncmgr --filename=mock_sync_manager.go --with-expecter --structname=MockSyncManager --outpkg=syncmgr --inpackage
$(INSTALL_PATH)/mockery --name=WriteBuffer --dir=$(PWD)/internal/datanode/writebuffer --output=$(PWD)/internal/datanode/writebuffer --filename=mock_write_buffer.go --with-expecter --structname=MockWriteBuffer --outpkg=writebuffer --inpackage
$(INSTALL_PATH)/mockery --name=BufferManager --dir=$(PWD)/internal/datanode/writebuffer --output=$(PWD)/internal/datanode/writebuffer --filename=mock_mananger.go --with-expecter --structname=MockBufferManager --outpkg=writebuffer --inpackage
+ $(INSTALL_PATH)/mockery --name=BinlogIO --dir=$(PWD)/internal/datanode/io --output=$(PWD)/internal/datanode/io --filename=mock_binlogio.go --with-expecter --structname=MockBinlogIO --outpkg=io --inpackage

generate-mockery-metastore: getdeps
$(INSTALL_PATH)/mockery --name=RootCoordCatalog --dir=$(PWD)/internal/metastore --output=$(PWD)/internal/metastore/mocks --filename=mock_rootcoord_catalog.go --with-expecter --structname=RootCoordCatalog --outpkg=mocks

View File

@@ -54,7 +54,6 @@ type iterator = storage.Iterator

type compactor interface {
complete()
- // compact() (*datapb.CompactionResult, error)
compact() (*datapb.CompactionPlanResult, error)
injectDone()
stop()

@@ -81,7 +80,6 @@ type compactionTask struct {
done chan struct{}
tr *timerecord.TimeRecorder
- chunkManager storage.ChunkManager
}

func newCompactionTask(

@@ -92,7 +90,6 @@ func newCompactionTask(
syncMgr syncmgr.SyncManager,
alloc allocator.Allocator,
plan *datapb.CompactionPlan,
- chunkManager storage.ChunkManager,
) *compactionTask {
ctx1, cancel := context.WithCancel(ctx)
return &compactionTask{

@@ -105,8 +102,7 @@ func newCompactionTask(
metaCache: metaCache,
Allocator: alloc,
plan: plan,
- tr: timerecord.NewTimeRecorder("compactionTask"),
- chunkManager: chunkManager,
+ tr: timerecord.NewTimeRecorder("levelone compaction"),
done: make(chan struct{}, 1),
}
}

View File

@@ -965,7 +965,7 @@ func TestCompactorInterfaceMethods(t *testing.T) {
Channel: "channelname",
}

- task := newCompactionTask(context.TODO(), mockbIO, mockbIO, metaCache, syncMgr, alloc, plan, nil)
+ task := newCompactionTask(context.TODO(), mockbIO, mockbIO, metaCache, syncMgr, alloc, plan)
result, err := task.compact()
assert.NoError(t, err)
assert.NotNil(t, result)

@@ -1103,7 +1103,7 @@ func TestCompactorInterfaceMethods(t *testing.T) {
Channel: "channelname",
}

- task := newCompactionTask(context.TODO(), mockbIO, mockbIO, metaCache, syncMgr, alloc, plan, nil)
+ task := newCompactionTask(context.TODO(), mockbIO, mockbIO, metaCache, syncMgr, alloc, plan)
result, err := task.compact()
assert.NoError(t, err)
assert.NotNil(t, result)

View File

@@ -0,0 +1,80 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package io
import (
"context"
"path"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/retry"
)
type BinlogIO interface {
Download(ctx context.Context, paths []string) ([][]byte, error)
Upload(ctx context.Context, kvs map[string][]byte) error
// JoinFullPath returns the full path by joining the given paths with the chunk manager's root path
JoinFullPath(paths ...string) string
}
type BinlogIoImpl struct {
storage.ChunkManager
pool *conc.Pool[any]
}
func NewBinlogIO(cm storage.ChunkManager, ioPool *conc.Pool[any]) BinlogIO {
return &BinlogIoImpl{cm, ioPool}
}
func (b *BinlogIoImpl) Download(ctx context.Context, paths []string) ([][]byte, error) {
future := b.pool.Submit(func() (any, error) {
var vs [][]byte
var err error
err = retry.Do(ctx, func() error {
vs, err = b.MultiRead(ctx, paths)
return err
})
return vs, err
})
vs, err := future.Await()
if err != nil {
return nil, err
}
return vs.([][]byte), nil
}
func (b *BinlogIoImpl) Upload(ctx context.Context, kvs map[string][]byte) error {
future := b.pool.Submit(func() (any, error) {
err := retry.Do(ctx, func() error {
return b.MultiWrite(ctx, kvs)
})
return nil, err
})
_, err := future.Await()
return err
}
func (b *BinlogIoImpl) JoinFullPath(paths ...string) string {
return path.Join(b.ChunkManager.RootPath(), path.Join(paths...))
}
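
Aside: a minimal usage sketch of this interface, mirroring the production pattern of using JoinFullPath output as the upload key (the root path, keys, and byte values here are illustrative, not from the commit):

package main

import (
	"context"

	"github.com/milvus-io/milvus/internal/datanode/io"
	"github.com/milvus-io/milvus/internal/storage"
	"github.com/milvus-io/milvus/pkg/util/conc"
)

func main() {
	// Back the BinlogIO with a local chunk manager and a shared worker pool.
	cm := storage.NewLocalChunkManager(storage.RootPath("/tmp/binlog_io_demo"))
	b := io.NewBinlogIO(cm, conc.NewDefaultPool[any]())

	ctx := context.Background()
	// JoinFullPath prefixes the chunk manager's root path:
	// "/tmp/binlog_io_demo/delta/100"
	key := b.JoinFullPath("delta", "100")

	// Upload and Download are submitted to the pool and retried internally.
	if err := b.Upload(ctx, map[string][]byte{key: {1, 2, 3}}); err != nil {
		panic(err)
	}
	blobs, err := b.Download(ctx, []string{key})
	if err != nil {
		panic(err)
	}
	_ = blobs // [][]byte{{1, 2, 3}}
}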

View File

@@ -0,0 +1,73 @@
package io
import (
"path"
"testing"
"github.com/samber/lo"
"github.com/stretchr/testify/suite"
"golang.org/x/net/context"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/util/conc"
)
const binlogIOTestDir = "/tmp/milvus_test/binlog_io"
func TestBinlogIO(t *testing.T) {
suite.Run(t, new(BinlogIOSuite))
}
type BinlogIOSuite struct {
suite.Suite
cm storage.ChunkManager
b BinlogIO
}
func (s *BinlogIOSuite) SetupTest() {
pool := conc.NewDefaultPool[any]()
s.cm = storage.NewLocalChunkManager(storage.RootPath(binlogIOTestDir))
s.b = NewBinlogIO(s.cm, pool)
}
func (s *BinlogIOSuite) TearDownTest() {
ctx := context.Background()
s.cm.RemoveWithPrefix(ctx, s.cm.RootPath())
}
func (s *BinlogIOSuite) TestUploadDownload() {
kvs := map[string][]byte{
path.Join(binlogIOTestDir, "a/b/c"): {1, 255, 255},
path.Join(binlogIOTestDir, "a/b/d"): {1, 255, 255},
}
ctx := context.Background()
err := s.b.Upload(ctx, kvs)
s.NoError(err)
vs, err := s.b.Download(ctx, lo.Keys(kvs))
s.NoError(err)
s.ElementsMatch(lo.Values(kvs), vs)
}
func (s *BinlogIOSuite) TestJoinFullPath() {
tests := []struct {
description string
inPaths []string
outPath string
}{
{"no input", nil, path.Join(binlogIOTestDir)},
{"input one", []string{"a"}, path.Join(binlogIOTestDir, "a")},
{"input two", []string{"a", "b"}, path.Join(binlogIOTestDir, "a/b")},
}
for _, test := range tests {
s.Run(test.description, func() {
out := s.b.JoinFullPath(test.inPaths...)
s.Equal(test.outPath, out)
})
}
}

View File

@@ -0,0 +1,189 @@
// Code generated by mockery v2.32.4. DO NOT EDIT.
package io
import (
context "context"
mock "github.com/stretchr/testify/mock"
)
// MockBinlogIO is an autogenerated mock type for the BinlogIO type
type MockBinlogIO struct {
mock.Mock
}
type MockBinlogIO_Expecter struct {
mock *mock.Mock
}
func (_m *MockBinlogIO) EXPECT() *MockBinlogIO_Expecter {
return &MockBinlogIO_Expecter{mock: &_m.Mock}
}
// Download provides a mock function with given fields: ctx, paths
func (_m *MockBinlogIO) Download(ctx context.Context, paths []string) ([][]byte, error) {
ret := _m.Called(ctx, paths)
var r0 [][]byte
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, []string) ([][]byte, error)); ok {
return rf(ctx, paths)
}
if rf, ok := ret.Get(0).(func(context.Context, []string) [][]byte); ok {
r0 = rf(ctx, paths)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([][]byte)
}
}
if rf, ok := ret.Get(1).(func(context.Context, []string) error); ok {
r1 = rf(ctx, paths)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockBinlogIO_Download_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Download'
type MockBinlogIO_Download_Call struct {
*mock.Call
}
// Download is a helper method to define mock.On call
// - ctx context.Context
// - paths []string
func (_e *MockBinlogIO_Expecter) Download(ctx interface{}, paths interface{}) *MockBinlogIO_Download_Call {
return &MockBinlogIO_Download_Call{Call: _e.mock.On("Download", ctx, paths)}
}
func (_c *MockBinlogIO_Download_Call) Run(run func(ctx context.Context, paths []string)) *MockBinlogIO_Download_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].([]string))
})
return _c
}
func (_c *MockBinlogIO_Download_Call) Return(_a0 [][]byte, _a1 error) *MockBinlogIO_Download_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockBinlogIO_Download_Call) RunAndReturn(run func(context.Context, []string) ([][]byte, error)) *MockBinlogIO_Download_Call {
_c.Call.Return(run)
return _c
}
// JoinFullPath provides a mock function with given fields: paths
func (_m *MockBinlogIO) JoinFullPath(paths ...string) string {
_va := make([]interface{}, len(paths))
for _i := range paths {
_va[_i] = paths[_i]
}
var _ca []interface{}
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
var r0 string
if rf, ok := ret.Get(0).(func(...string) string); ok {
r0 = rf(paths...)
} else {
r0 = ret.Get(0).(string)
}
return r0
}
// MockBinlogIO_JoinFullPath_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'JoinFullPath'
type MockBinlogIO_JoinFullPath_Call struct {
*mock.Call
}
// JoinFullPath is a helper method to define mock.On call
// - paths ...string
func (_e *MockBinlogIO_Expecter) JoinFullPath(paths ...interface{}) *MockBinlogIO_JoinFullPath_Call {
return &MockBinlogIO_JoinFullPath_Call{Call: _e.mock.On("JoinFullPath",
append([]interface{}{}, paths...)...)}
}
func (_c *MockBinlogIO_JoinFullPath_Call) Run(run func(paths ...string)) *MockBinlogIO_JoinFullPath_Call {
_c.Call.Run(func(args mock.Arguments) {
variadicArgs := make([]string, len(args)-0)
for i, a := range args[0:] {
if a != nil {
variadicArgs[i] = a.(string)
}
}
run(variadicArgs...)
})
return _c
}
func (_c *MockBinlogIO_JoinFullPath_Call) Return(_a0 string) *MockBinlogIO_JoinFullPath_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockBinlogIO_JoinFullPath_Call) RunAndReturn(run func(...string) string) *MockBinlogIO_JoinFullPath_Call {
_c.Call.Return(run)
return _c
}
// Upload provides a mock function with given fields: ctx, kvs
func (_m *MockBinlogIO) Upload(ctx context.Context, kvs map[string][]byte) error {
ret := _m.Called(ctx, kvs)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, map[string][]byte) error); ok {
r0 = rf(ctx, kvs)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockBinlogIO_Upload_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Upload'
type MockBinlogIO_Upload_Call struct {
*mock.Call
}
// Upload is a helper method to define mock.On call
// - ctx context.Context
// - kvs map[string][]byte
func (_e *MockBinlogIO_Expecter) Upload(ctx interface{}, kvs interface{}) *MockBinlogIO_Upload_Call {
return &MockBinlogIO_Upload_Call{Call: _e.mock.On("Upload", ctx, kvs)}
}
func (_c *MockBinlogIO_Upload_Call) Run(run func(ctx context.Context, kvs map[string][]byte)) *MockBinlogIO_Upload_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(map[string][]byte))
})
return _c
}
func (_c *MockBinlogIO_Upload_Call) Return(_a0 error) *MockBinlogIO_Upload_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockBinlogIO_Upload_Call) RunAndReturn(run func(context.Context, map[string][]byte) error) *MockBinlogIO_Upload_Call {
_c.Call.Return(run)
return _c
}
// NewMockBinlogIO creates a new instance of MockBinlogIO. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockBinlogIO(t interface {
mock.TestingT
Cleanup(func())
}) *MockBinlogIO {
mock := &MockBinlogIO{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@@ -74,7 +74,7 @@ func (i *BinlogIterator) Next() (*LabeledRowData, error) {
row := &InsertRow{
ID: i.data.Data[common.RowIDField].GetRow(i.pos).(int64),
Timestamp: uint64(i.data.Data[common.TimeStampField].GetRow(i.pos).(int64)),
- PK: pk,
+ Pk: pk,
Value: fields,
}
i.pos++

View File

@@ -79,10 +79,16 @@ func (s *InsertBinlogIteratorSuite) TestBinlogIterator() {
rows = append(rows, labeled.data)

+ label := labeled.GetLabel()
+ s.NotNil(label)
+ s.EqualValues(19530, label.segmentID)
+ s.EqualValues(19530, labeled.GetSegmentID())

insertRow, ok := labeled.data.(*InsertRow)
s.True(ok)

- s.Equal(insertData.Data[Int64Field].GetRow(idx).(int64), insertRow.PK.GetValue().(int64))
+ s.EqualValues(insertData.Data[TimestampField].GetRow(idx).(int64), labeled.GetTimestamp())
+ s.Equal(insertData.Data[Int64Field].GetRow(idx).(int64), labeled.GetPk().GetValue().(int64))
s.Equal(insertData.Data[RowIDField].GetRow(idx).(int64), insertRow.ID)
s.Equal(insertData.Data[BoolField].GetRow(idx).(bool), insertRow.Value[BoolField].(bool))
s.Equal(insertData.Data[Int8Field].GetRow(idx).(int8), insertRow.Value[Int8Field].(int8))

View File

@@ -17,6 +17,12 @@ type DeltalogIteratorSuite struct {
}

func (s *DeltalogIteratorSuite) TestDeltalogIteratorIntPK() {
+ s.Run("invalid blobs", func() {
+ iter, err := NewDeltalogIterator([][]byte{}, nil)
+ s.Error(err)
+ s.Nil(iter)
+ })

testpks := []int64{1, 2, 3, 4}
testtss := []uint64{43757345, 43757346, 43757347, 43757348}

@@ -43,8 +49,8 @@
s.NoError(err)
s.Equal(labeled.GetSegmentID(), int64(100))

- gotpks = append(gotpks, labeled.data.(*DeltalogRow).Pk.GetValue().(int64))
- gottss = append(gottss, labeled.data.(*DeltalogRow).Timestamp)
+ gotpks = append(gotpks, labeled.GetPk().GetValue().(int64))
+ gottss = append(gottss, labeled.GetTimestamp())
}

s.ElementsMatch(gotpks, testpks)

View File

@@ -16,20 +16,39 @@ var (

const InvalidID int64 = -1

- type Row interface{}
+ type Row interface {
+ GetPk() storage.PrimaryKey
+ GetTimestamp() uint64
+ }

type InsertRow struct {
ID int64
- PK storage.PrimaryKey
+ Pk storage.PrimaryKey
Timestamp typeutil.Timestamp
Value map[storage.FieldID]interface{}
}

+ func (r *InsertRow) GetPk() storage.PrimaryKey {
+ return r.Pk
+ }
+
+ func (r *InsertRow) GetTimestamp() uint64 {
+ return r.Timestamp
+ }

type DeltalogRow struct {
Pk storage.PrimaryKey
Timestamp typeutil.Timestamp
}

+ func (r *DeltalogRow) GetPk() storage.PrimaryKey {
+ return r.Pk
+ }
+
+ func (r *DeltalogRow) GetTimestamp() uint64 {
+ return r.Timestamp
+ }

type Label struct {
segmentID typeutil.UniqueID
}

@@ -39,11 +58,19 @@ type LabeledRowData struct {
data Row
}

- func (l *LabeledRowData) GetSegmentID() typeutil.UniqueID {
- if l.label == nil {
- return InvalidID
- }
+ func (l *LabeledRowData) GetLabel() *Label {
+ return l.label
}

+ func (l *LabeledRowData) GetPk() storage.PrimaryKey {
+ return l.data.GetPk()
+ }
+
+ func (l *LabeledRowData) GetTimestamp() uint64 {
+ return l.data.GetTimestamp()
+ }
+
+ func (l *LabeledRowData) GetSegmentID() typeutil.UniqueID {
return l.label.segmentID
}
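
The point of widening Row beyond an empty interface is that insert rows and delete rows can now be drained through one code path. A minimal sketch of that pattern, assumed to live in this iterators package (rowIterator is a local helper interface for illustration; it matches the HasNext/Next surface of the concrete iterators above):

// rowIterator captures the common surface of BinlogIterator and DeltalogIterator.
type rowIterator interface {
	HasNext() bool
	Next() (*LabeledRowData, error)
}

// drainPks collects (pk, timestamp) pairs from any row iterator, whether the
// underlying rows are inserts or deletes.
func drainPks(it rowIterator) ([]storage.PrimaryKey, []uint64, error) {
	var pks []storage.PrimaryKey
	var tss []uint64
	for it.HasNext() {
		labeled, err := it.Next()
		if err != nil {
			return nil, nil, err
		}
		pks = append(pks, labeled.GetPk())
		tss = append(tss, labeled.GetTimestamp())
	}
	return pks, tss, nil
}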

View File

@@ -0,0 +1,346 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package datanode
import (
"context"
"time"
"github.com/samber/lo"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/datanode/allocator"
"github.com/milvus-io/milvus/internal/datanode/io"
iter "github.com/milvus-io/milvus/internal/datanode/iterators"
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/datanode/syncmgr"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metautil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/timerecord"
)
type levelZeroCompactionTask struct {
compactor
io.BinlogIO
allocator allocator.Allocator
metacache metacache.MetaCache
syncmgr syncmgr.SyncManager
plan *datapb.CompactionPlan
ctx context.Context
cancel context.CancelFunc
done chan struct{}
tr *timerecord.TimeRecorder
}
func newLevelZeroCompactionTask(
ctx context.Context,
binlogIO io.BinlogIO,
alloc allocator.Allocator,
metaCache metacache.MetaCache,
syncmgr syncmgr.SyncManager,
plan *datapb.CompactionPlan,
) *levelZeroCompactionTask {
ctx, cancel := context.WithCancel(ctx)
return &levelZeroCompactionTask{
ctx: ctx,
cancel: cancel,
BinlogIO: binlogIO,
allocator: alloc,
metacache: metaCache,
syncmgr: syncmgr,
plan: plan,
tr: timerecord.NewTimeRecorder("levelzero compaction"),
done: make(chan struct{}, 1),
}
}
func (t *levelZeroCompactionTask) complete() {
t.done <- struct{}{}
}
func (t *levelZeroCompactionTask) stop() {
t.cancel()
<-t.done
}
func (t *levelZeroCompactionTask) getPlanID() UniqueID {
return t.plan.GetPlanID()
}
func (t *levelZeroCompactionTask) getChannelName() string {
return t.plan.GetChannel()
}
func (t *levelZeroCompactionTask) getCollection() int64 {
return t.metacache.Collection()
}
// Do nothing for levelzero compaction
func (t *levelZeroCompactionTask) injectDone() {}
func (t *levelZeroCompactionTask) compact() (*datapb.CompactionPlanResult, error) {
log := log.With(zap.Int64("planID", t.plan.GetPlanID()), zap.String("type", t.plan.GetType().String()))
log.Info("L0 compaction", zap.Duration("wait in queue elapse", t.tr.RecordSpan()))
if !funcutil.CheckCtxValid(t.ctx) {
log.Warn("compact wrong, task context done or timeout")
return nil, errContext
}
ctxTimeout, cancelAll := context.WithTimeout(t.ctx, time.Duration(t.plan.GetTimeoutInSeconds())*time.Second)
defer cancelAll()
l0Segments := lo.Filter(t.plan.GetSegmentBinlogs(), func(s *datapb.CompactionSegmentBinlogs, _ int) bool {
return s.Level == datapb.SegmentLevel_L0
})
targetSegIDs := lo.FilterMap(t.plan.GetSegmentBinlogs(), func(s *datapb.CompactionSegmentBinlogs, _ int) (int64, bool) {
if s.Level == datapb.SegmentLevel_L1 {
return s.GetSegmentID(), true
}
return 0, false
})
if len(targetSegIDs) == 0 {
log.Warn("compact wrong, not target sealed segments")
return nil, errIllegalCompactionPlan
}
var (
totalSize int64
totalDeltalogs = make(map[UniqueID][]string)
)
for _, s := range l0Segments {
paths := []string{}
for _, d := range s.GetDeltalogs() {
for _, l := range d.GetBinlogs() {
paths = append(paths, l.GetLogPath())
totalSize += l.GetLogSize()
}
}
if len(paths) > 0 {
totalDeltalogs[s.GetSegmentID()] = paths
}
}
// TODO
// batchProcess := func() ([]*datapb.CompactionSegment, error) {
// resultSegments := make(map[int64]*datapb.CompactionSegment)
//
// iters, err := t.loadDelta(ctxTimeout, lo.Values(totalDeltalogs)...)
// if err != nil {
// return nil, err
// }
// log.Info("Batch L0 compaction load delta into memeory", zap.Duration("elapse", t.tr.RecordSpan()))
//
// alteredSegments := make(map[int64]*storage.DeleteData)
// err = t.splitDelta(iters, alteredSegments, targetSegIDs)
// if err != nil {
// return nil, err
// }
// log.Info("Batch L0 compaction split delta into segments", zap.Duration("elapse", t.tr.RecordSpan()))
//
// err = t.uploadByCheck(ctxTimeout, false, alteredSegments, resultSegments)
// log.Info("Batch L0 compaction upload all", zap.Duration("elapse", t.tr.RecordSpan()))
//
// return lo.Values(resultSegments), nil
// }
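// linearProcess handles one L0 segment at a time: load its deltalogs, split
// the deletes across the target L1 segments, then flush any per-segment
// buffer that has grown past the delete-buffer threshold before moving on.
// This bounds peak memory at the cost of more upload round-trips.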
linearProcess := func() ([]*datapb.CompactionSegment, error) {
var (
resultSegments = make(map[int64]*datapb.CompactionSegment)
alteredSegments = make(map[int64]*storage.DeleteData)
)
for segID, deltaLogs := range totalDeltalogs {
log := log.With(zap.Int64("levelzero segment", segID))
log.Info("Linear L0 compaction processing segment", zap.Int64s("target segmentIDs", targetSegIDs))
allIters, err := t.loadDelta(ctxTimeout, deltaLogs)
if err != nil {
log.Warn("Linear L0 compaction loadDelta fail", zap.Error(err))
return nil, err
}
err = t.splitDelta(allIters, alteredSegments, targetSegIDs)
if err != nil {
log.Warn("Linear L0 compaction splitDelta fail", zap.Error(err))
return nil, err
}
err = t.uploadByCheck(ctxTimeout, true, alteredSegments, resultSegments)
if err != nil {
log.Warn("Linear L0 compaction upload buffer fail", zap.Error(err))
return nil, err
}
}
err := t.uploadByCheck(ctxTimeout, false, alteredSegments, resultSegments)
if err != nil {
log.Warn("Linear L0 compaction upload all buffer fail", zap.Error(err))
return nil, err
}
log.Warn("Linear L0 compaction finished", zap.Duration("elapse", t.tr.RecordSpan()))
return lo.Values(resultSegments), nil
}
var (
resultSegments []*datapb.CompactionSegment
err error
)
// if totalSize*3 < int64(hardware.GetFreeMemoryCount()) {
// resultSegments, err = batchProcess()
// }
resultSegments, err = linearProcess()
if err != nil {
return nil, err
}
result := &datapb.CompactionPlanResult{
PlanID: t.plan.GetPlanID(),
State: commonpb.CompactionState_Completed,
Segments: resultSegments,
Channel: t.plan.GetChannel(),
}
log.Info("L0 compaction finished", zap.Duration("elapse", t.tr.ElapseSpan()))
return result, nil
}
func (t *levelZeroCompactionTask) loadDelta(ctx context.Context, deltaLogs ...[]string) ([]*iter.DeltalogIterator, error) {
allIters := make([]*iter.DeltalogIterator, 0)
for _, paths := range deltaLogs {
blobs, err := t.Download(ctx, paths)
if err != nil {
return nil, err
}
deltaIter, err := iter.NewDeltalogIterator(blobs, nil)
if err != nil {
return nil, err
}
allIters = append(allIters, deltaIter)
}
return allIters, nil
}
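// splitDelta fans each L0 delete record out to the target sealed segments
// that may contain its primary key, as predicted by the metacache, and
// accumulates the records into per-segment DeleteData buffers.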
func (t *levelZeroCompactionTask) splitDelta(
allIters []*iter.DeltalogIterator,
targetSegBuffer map[int64]*storage.DeleteData,
targetSegIDs []int64,
) error {
// split all delete data into segments
for _, deltaIter := range allIters {
for deltaIter.HasNext() {
labeled, err := deltaIter.Next()
if err != nil {
return err
}
predicted, found := t.metacache.PredictSegments(labeled.GetPk(), metacache.WithSegmentIDs(targetSegIDs...))
if !found {
continue
}
for _, gotSeg := range predicted {
delBuffer, ok := targetSegBuffer[gotSeg]
if !ok {
delBuffer = &storage.DeleteData{}
targetSegBuffer[gotSeg] = delBuffer
}
delBuffer.Append(labeled.GetPk(), labeled.GetTimestamp())
}
}
}
return nil
}
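// composeDeltalog serializes one segment's accumulated DeleteData into a
// deltalog blob under a freshly allocated log ID, returning the blob keyed
// by its full path together with the matching binlog metadata.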
func (t *levelZeroCompactionTask) composeDeltalog(segmentID int64, dData *storage.DeleteData) (map[string][]byte, *datapb.Binlog, error) {
var (
collID = t.metacache.Collection()
uploadKv = make(map[string][]byte)
)
seg, ok := t.metacache.GetSegmentByID(segmentID)
if !ok {
return nil, nil, merr.WrapErrSegmentLack(segmentID)
}
blob, err := storage.NewDeleteCodec().Serialize(collID, seg.PartitionID(), segmentID, dData)
if err != nil {
return nil, nil, err
}
logID, err := t.allocator.AllocOne()
if err != nil {
return nil, nil, err
}
blobKey := metautil.JoinIDPath(collID, seg.PartitionID(), segmentID, logID)
blobPath := t.BinlogIO.JoinFullPath(common.SegmentDeltaLogPath, blobKey)
uploadKv[blobPath] = blob.GetValue()
// TODO Timestamp?
deltalog := &datapb.Binlog{
LogSize: int64(len(blob.GetValue())),
LogPath: blobPath,
LogID: logID,
}
return uploadKv, deltalog, nil
}
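// uploadByCheck flushes accumulated delete buffers and records the resulting
// deltalogs into resultSegments. When requireCheck is true, only buffers that
// have reached DataNodeCfg.FlushDeleteBufferBytes are uploaded; when false,
// every remaining buffer is uploaded. Flushed entries are deleted from
// alteredSegments so they are not uploaded twice.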
func (t *levelZeroCompactionTask) uploadByCheck(ctx context.Context, requireCheck bool, alteredSegments map[int64]*storage.DeleteData, resultSegments map[int64]*datapb.CompactionSegment) error {
for segID, dData := range alteredSegments {
if !requireCheck || (dData.Size() >= paramtable.Get().DataNodeCfg.FlushDeleteBufferBytes.GetAsInt64()) {
blobs, binlog, err := t.composeDeltalog(segID, dData)
if err != nil {
return err
}
err = t.Upload(ctx, blobs)
if err != nil {
return err
}
if _, ok := resultSegments[segID]; !ok {
resultSegments[segID] = &datapb.CompactionSegment{
SegmentID: segID,
Deltalogs: []*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{binlog}}},
Channel: t.plan.GetChannel(),
}
} else {
resultSegments[segID].Deltalogs[0].Binlogs = append(resultSegments[segID].Deltalogs[0].Binlogs, binlog)
}
delete(alteredSegments, segID)
}
}
return nil
}

View File

@@ -0,0 +1,428 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package datanode
import (
"context"
"path"
"testing"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/datanode/allocator"
"github.com/milvus-io/milvus/internal/datanode/io"
iter "github.com/milvus-io/milvus/internal/datanode/iterators"
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/metautil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/timerecord"
)
func TestLevelZeroCompactionTaskSuite(t *testing.T) {
suite.Run(t, new(LevelZeroCompactionTaskSuite))
}
type LevelZeroCompactionTaskSuite struct {
suite.Suite
mockBinlogIO *io.MockBinlogIO
mockAlloc *allocator.MockAllocator
mockMeta *metacache.MockMetaCache
task *levelZeroCompactionTask
dData *storage.DeleteData
dBlob []byte
}
func (s *LevelZeroCompactionTaskSuite) SetupTest() {
s.mockAlloc = allocator.NewMockAllocator(s.T())
s.mockBinlogIO = io.NewMockBinlogIO(s.T())
s.mockMeta = metacache.NewMockMetaCache(s.T())
// plan of the task is unset
s.task = newLevelZeroCompactionTask(context.Background(), s.mockBinlogIO, s.mockAlloc, s.mockMeta, nil, nil)
pk2ts := map[int64]uint64{
1: 20000,
2: 20001,
3: 20002,
}
s.dData = storage.NewDeleteData([]storage.PrimaryKey{}, []Timestamp{})
for pk, ts := range pk2ts {
s.dData.Append(storage.NewInt64PrimaryKey(pk), ts)
}
dataCodec := storage.NewDeleteCodec()
blob, err := dataCodec.Serialize(0, 0, 0, s.dData)
s.Require().NoError(err)
s.dBlob = blob.GetValue()
}
func (s *LevelZeroCompactionTaskSuite) TestCompactLinear() {
plan := &datapb.CompactionPlan{
PlanID: 19530,
Type: datapb.CompactionType_Level0DeleteCompaction,
SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
{
SegmentID: 100, Level: datapb.SegmentLevel_L0, Deltalogs: []*datapb.FieldBinlog{
{
Binlogs: []*datapb.Binlog{
{LogPath: "a/b/c1", LogSize: 100},
{LogPath: "a/b/c2", LogSize: 100},
{LogPath: "a/b/c3", LogSize: 100},
{LogPath: "a/b/c4", LogSize: 100},
},
},
},
},
{
SegmentID: 101, Level: datapb.SegmentLevel_L0, Deltalogs: []*datapb.FieldBinlog{
{
Binlogs: []*datapb.Binlog{
{LogPath: "a/d/c1", LogSize: 100},
{LogPath: "a/d/c2", LogSize: 100},
{LogPath: "a/d/c3", LogSize: 100},
{LogPath: "a/d/c4", LogSize: 100},
},
},
},
},
{SegmentID: 200, Level: datapb.SegmentLevel_L1},
{SegmentID: 201, Level: datapb.SegmentLevel_L1},
},
}
s.task.plan = plan
s.task.tr = timerecord.NewTimeRecorder("test")
s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).Return([][]byte{s.dBlob}, nil).Times(2)
s.mockMeta.EXPECT().PredictSegments(mock.Anything, mock.Anything).Return([]int64{200, 201}, true)
s.mockMeta.EXPECT().Collection().Return(1)
s.mockMeta.EXPECT().GetSegmentByID(mock.Anything, mock.Anything).
RunAndReturn(func(id int64, filters ...metacache.SegmentFilter) (*metacache.SegmentInfo, bool) {
return metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: id, PartitionID: 10}, nil), true
})
s.mockAlloc.EXPECT().AllocOne().Return(19530, nil).Times(2)
s.mockBinlogIO.EXPECT().JoinFullPath(mock.Anything, mock.Anything).
RunAndReturn(func(paths ...string) string {
return path.Join(paths...)
}).Times(2)
s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil).Times(2)
s.Require().Equal(plan.GetPlanID(), s.task.getPlanID())
s.Require().Equal(plan.GetChannel(), s.task.getChannelName())
s.Require().EqualValues(1, s.task.getCollection())
result, err := s.task.compact()
s.NoError(err)
s.NotNil(result)
s.Equal(commonpb.CompactionState_Completed, result.GetState())
s.Equal(plan.GetChannel(), result.GetChannel())
s.Equal(2, len(result.GetSegments()))
s.ElementsMatch([]int64{200, 201},
lo.Map(result.GetSegments(), func(seg *datapb.CompactionSegment, _ int) int64 {
return seg.GetSegmentID()
}))
s.EqualValues(plan.GetPlanID(), result.GetPlanID())
log.Info("test segment results", zap.Any("result", result))
s.task.complete()
s.task.stop()
}
func (s *LevelZeroCompactionTaskSuite) TestCompactBatch() {
s.T().Skip()
plan := &datapb.CompactionPlan{
PlanID: 19530,
Type: datapb.CompactionType_Level0DeleteCompaction,
SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
{
SegmentID: 100, Level: datapb.SegmentLevel_L0, Deltalogs: []*datapb.FieldBinlog{
{
Binlogs: []*datapb.Binlog{
{LogPath: "a/b/c1", LogSize: 100},
{LogPath: "a/b/c2", LogSize: 100},
{LogPath: "a/b/c3", LogSize: 100},
{LogPath: "a/b/c4", LogSize: 100},
},
},
},
},
{
SegmentID: 101, Level: datapb.SegmentLevel_L0, Deltalogs: []*datapb.FieldBinlog{
{
Binlogs: []*datapb.Binlog{
{LogPath: "a/d/c1", LogSize: 100},
{LogPath: "a/d/c2", LogSize: 100},
{LogPath: "a/d/c3", LogSize: 100},
{LogPath: "a/d/c4", LogSize: 100},
},
},
},
},
{SegmentID: 200, Level: datapb.SegmentLevel_L1},
{SegmentID: 201, Level: datapb.SegmentLevel_L1},
},
}
s.task.plan = plan
s.task.tr = timerecord.NewTimeRecorder("test")
s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).Return([][]byte{s.dBlob}, nil).Times(2)
s.mockMeta.EXPECT().PredictSegments(mock.Anything, mock.Anything).Return([]int64{200, 201}, true)
s.mockMeta.EXPECT().Collection().Return(1)
s.mockMeta.EXPECT().GetSegmentByID(mock.Anything, mock.Anything).
RunAndReturn(func(id int64, filters ...metacache.SegmentFilter) (*metacache.SegmentInfo, bool) {
return metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: id, PartitionID: 10}, nil), true
})
s.mockAlloc.EXPECT().AllocOne().Return(19530, nil).Times(2)
s.mockBinlogIO.EXPECT().JoinFullPath(mock.Anything, mock.Anything).
RunAndReturn(func(paths ...string) string {
return path.Join(paths...)
}).Times(2)
s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil).Times(2)
result, err := s.task.compact()
s.NoError(err)
s.NotNil(result)
s.Equal(commonpb.CompactionState_Completed, result.GetState())
s.Equal(plan.GetChannel(), result.GetChannel())
s.Equal(2, len(result.GetSegments()))
s.ElementsMatch([]int64{200, 201},
lo.Map(result.GetSegments(), func(seg *datapb.CompactionSegment, _ int) int64 {
return seg.GetSegmentID()
}))
s.EqualValues(plan.GetPlanID(), result.GetPlanID())
log.Info("test segment results", zap.Any("result", result))
}
func (s *LevelZeroCompactionTaskSuite) TestUploadByCheck() {
ctx := context.Background()
s.Run("upload directly", func() {
s.SetupTest()
s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil)
s.mockMeta.EXPECT().Collection().Return(1)
s.mockMeta.EXPECT().GetSegmentByID(
mock.MatchedBy(func(ID int64) bool {
return ID == 100
}), mock.Anything).
Return(metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 100, PartitionID: 10}, nil), true)
s.mockAlloc.EXPECT().AllocOne().Return(19530, nil)
blobKey := metautil.JoinIDPath(1, 10, 100, 19530)
blobPath := path.Join(common.SegmentDeltaLogPath, blobKey)
s.mockBinlogIO.EXPECT().JoinFullPath(mock.Anything, mock.Anything).Return(blobPath)
segments := map[int64]*storage.DeleteData{100: s.dData}
results := make(map[int64]*datapb.CompactionSegment)
err := s.task.uploadByCheck(ctx, false, segments, results)
s.NoError(err)
s.Equal(1, len(results))
seg1, ok := results[100]
s.True(ok)
s.EqualValues(100, seg1.GetSegmentID())
s.Equal(1, len(seg1.GetDeltalogs()))
s.Equal(1, len(seg1.GetDeltalogs()[0].GetBinlogs()))
})
s.Run("check without upload", func() {
s.SetupTest()
segments := map[int64]*storage.DeleteData{100: s.dData}
results := make(map[int64]*datapb.CompactionSegment)
s.Require().Empty(results)
err := s.task.uploadByCheck(ctx, true, segments, results)
s.NoError(err)
s.Empty(results)
})
s.Run("check with upload", func() {
blobKey := metautil.JoinIDPath(1, 10, 100, 19530)
blobPath := path.Join(common.SegmentDeltaLogPath, blobKey)
s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil)
s.mockMeta.EXPECT().Collection().Return(1)
s.mockMeta.EXPECT().GetSegmentByID(
mock.MatchedBy(func(ID int64) bool {
return ID == 100
}), mock.Anything).
Return(metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 100, PartitionID: 10}, nil), true)
s.mockAlloc.EXPECT().AllocOne().Return(19530, nil)
s.mockBinlogIO.EXPECT().JoinFullPath(mock.Anything, mock.Anything).Return(blobPath)
segments := map[int64]*storage.DeleteData{100: s.dData}
results := map[int64]*datapb.CompactionSegment{
100: {SegmentID: 100, Deltalogs: []*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{LogID: 1}}}}},
}
s.Require().Equal(1, len(results))
paramtable.Get().Save(paramtable.Get().DataNodeCfg.FlushDeleteBufferBytes.Key, "1")
defer paramtable.Get().Reset(paramtable.Get().DataNodeCfg.FlushDeleteBufferBytes.Key)
err := s.task.uploadByCheck(ctx, true, segments, results)
s.NoError(err)
s.NotEmpty(results)
s.Equal(1, len(results))
seg1, ok := results[100]
s.True(ok)
s.EqualValues(100, seg1.GetSegmentID())
s.Equal(1, len(seg1.GetDeltalogs()))
s.Equal(2, len(seg1.GetDeltalogs()[0].GetBinlogs()))
})
}
func (s *LevelZeroCompactionTaskSuite) TestComposeDeltalog() {
s.mockMeta.EXPECT().Collection().Return(1)
s.mockMeta.EXPECT().
GetSegmentByID(
mock.MatchedBy(func(ID int64) bool {
return ID == 100
}), mock.Anything).
Return(metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 100, PartitionID: 10}, nil), true)
s.mockMeta.EXPECT().
GetSegmentByID(
mock.MatchedBy(func(ID int64) bool {
return ID == 101
}), mock.Anything).
Return(nil, false)
s.mockAlloc.EXPECT().AllocOne().Return(19530, nil)
blobKey := metautil.JoinIDPath(1, 10, 100, 19530)
blobPath := path.Join(common.SegmentDeltaLogPath, blobKey)
s.mockBinlogIO.EXPECT().JoinFullPath(mock.Anything, mock.Anything).Return(blobPath)
kvs, binlog, err := s.task.composeDeltalog(100, s.dData)
s.NoError(err)
s.Equal(1, len(kvs))
v, ok := kvs[blobPath]
s.True(ok)
s.NotNil(v)
s.Equal(blobPath, binlog.LogPath)
_, _, err = s.task.composeDeltalog(101, s.dData)
s.Error(err)
}
func (s *LevelZeroCompactionTaskSuite) TestSplitDelta() {
predicted := []int64{100, 101, 102}
s.mockMeta.EXPECT().PredictSegments(mock.MatchedBy(func(pk storage.PrimaryKey) bool {
return pk.GetValue().(int64) == 1
}), mock.Anything).Return([]int64{100}, true)
s.mockMeta.EXPECT().PredictSegments(mock.MatchedBy(func(pk storage.PrimaryKey) bool {
return pk.GetValue().(int64) == 2
}), mock.Anything).Return(nil, false)
s.mockMeta.EXPECT().PredictSegments(mock.MatchedBy(func(pk storage.PrimaryKey) bool {
return pk.GetValue().(int64) == 3
}), mock.Anything).Return([]int64{100, 101, 102}, true)
diter, err := iter.NewDeltalogIterator([][]byte{s.dBlob}, nil)
s.Require().NoError(err)
s.Require().NotNil(diter)
targetSegBuffer := make(map[int64]*storage.DeleteData)
targetSegIDs := predicted
err = s.task.splitDelta([]*iter.DeltalogIterator{diter}, targetSegBuffer, targetSegIDs)
s.NoError(err)
s.NotEmpty(targetSegBuffer)
s.ElementsMatch(predicted, lo.Keys(targetSegBuffer))
s.EqualValues(2, targetSegBuffer[100].RowCount)
s.EqualValues(1, targetSegBuffer[101].RowCount)
s.EqualValues(1, targetSegBuffer[102].RowCount)
s.ElementsMatch([]storage.PrimaryKey{storage.NewInt64PrimaryKey(1), storage.NewInt64PrimaryKey(3)}, targetSegBuffer[100].Pks)
s.Equal(storage.NewInt64PrimaryKey(3), targetSegBuffer[101].Pks[0])
s.Equal(storage.NewInt64PrimaryKey(3), targetSegBuffer[102].Pks[0])
}
func (s *LevelZeroCompactionTaskSuite) TestLoadDelta() {
ctx := context.TODO()
s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.MatchedBy(
func(paths []string) bool {
return len(paths) > 0 && paths[0] == "correct"
})).Return([][]byte{s.dBlob}, nil).Once()
s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.MatchedBy(
func(paths []string) bool {
return len(paths) > 0 && paths[0] == "error"
})).Return(nil, errors.New("mock err")).Once()
s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.MatchedBy(
func(paths []string) bool {
return len(paths) > 0 && paths[0] == "invalid-blobs"
})).Return([][]byte{{1}}, nil).Once()
tests := []struct {
description string
paths []string
expectNilIter bool
expectError bool
}{
{"no error", []string{"correct"}, false, false},
{"download error", []string{"error"}, true, true},
{"new iter error", []string{"invalid-blobs"}, true, true},
}
for _, test := range tests {
iters, err := s.task.loadDelta(ctx, test.paths)
if test.expectNilIter {
s.Nil(iters)
} else {
s.NotNil(iters)
s.Equal(1, len(iters))
s.True(iters[0].HasNext())
iter := iters[0]
var pks []storage.PrimaryKey
var tss []storage.Timestamp
for iter.HasNext() {
labeled, err := iter.Next()
s.NoError(err)
pks = append(pks, labeled.GetPk())
tss = append(tss, labeled.GetTimestamp())
}
s.ElementsMatch(pks, s.dData.Pks)
s.ElementsMatch(tss, s.dData.Tss)
}
if test.expectError {
s.Error(err)
} else {
s.NoError(err)
}
}
}

View File

@@ -34,6 +34,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
+ "github.com/milvus-io/milvus/internal/datanode/io"
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"

@@ -250,19 +251,35 @@ func (node *DataNode) Compaction(ctx context.Context, req *datapb.CompactionPlan
return merr.Status(merr.WrapErrChannelNotFound(req.GetChannel(), "channel is dropping")), nil
}

+ var task compactor
+ switch req.GetType() {
+ case datapb.CompactionType_Level0DeleteCompaction:
+ binlogIO := io.NewBinlogIO(node.chunkManager, getOrCreateIOPool())
+ task = newLevelZeroCompactionTask(
+ node.ctx,
+ binlogIO,
+ node.allocator,
+ ds.metacache,
+ node.syncMgr,
+ req,
+ )
+ case datapb.CompactionType_MixCompaction, datapb.CompactionType_MinorCompaction:
+ // TODO, replace this binlogIO with io.BinlogIO
binlogIO := &binlogIO{node.chunkManager, ds.idAllocator}
- task := newCompactionTask(
+ task = newCompactionTask(
node.ctx,
binlogIO, binlogIO,
ds.metacache,
- ds.syncMgr,
- ds.idAllocator,
+ node.syncMgr,
+ node.allocator,
req,
- node.chunkManager,
)
+ default:
+ log.Warn("Unknown compaction type", zap.String("type", req.GetType().String()))
+ return merr.Status(merr.WrapErrParameterInvalidMsg("Unknown compaction type: %v", req.GetType().String())), nil
+ }

node.compactionExecutor.execute(task)

return merr.Success(), nil
}

View File

@@ -874,6 +874,15 @@ func (data *DeleteData) Merge(other *DeleteData) {
other.RowCount = 0
}
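+ // Size returns the byte size of the buffered primary keys only; timestamps
+ // are not counted. The DataNode's L0 compaction uses it to decide when a
+ // per-segment delete buffer is large enough to flush.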
+ func (data *DeleteData) Size() int64 {
+ var size int64
+ for _, pk := range data.Pks {
+ size += pk.Size()
+ }
+ return size
+ }

// DeleteCodec serializes and deserializes the delete data
type DeleteCodec struct{}