mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-05 05:18:52 +08:00
e6eb6f2c78
This PR changes the following to speed up L0 compaction and prevent OOM: 1. Lower the deltabuf limit to 16MB by default, so that each L0 segment is 4X smaller than before. 2. Add BatchProcess and use it if memory is sufficient. 3. The iterator now deserializes when HasNext is called, to avoid a massive memory peak. 4. Add tracing in splitDelta. See also: #30191 --------- Signed-off-by: yangxuan <xuan.yang@zilliz.com>
69 lines
1.4 KiB
Go
69 lines
1.4 KiB
Go
package iterator
|
|
|
|
import (
|
|
"testing"
|
|
|
|
"github.com/stretchr/testify/suite"
|
|
|
|
"github.com/milvus-io/milvus/internal/storage"
|
|
)
|
|
|
|
func TestDeltalogIteratorSuite(t *testing.T) {
|
|
suite.Run(t, new(DeltalogIteratorSuite))
|
|
}
|
|
|
|
type DeltalogIteratorSuite struct {
|
|
suite.Suite
|
|
}
|
|
|
|
func (s *DeltalogIteratorSuite) TestDeltalogIteratorIntPK() {
|
|
s.Run("invalid blobs", func() {
|
|
iter := NewDeltalogIterator([][]byte{}, nil)
|
|
|
|
s.NotNil(iter)
|
|
s.False(iter.HasNext())
|
|
})
|
|
|
|
testpks := []int64{1, 2, 3, 4}
|
|
testtss := []uint64{43757345, 43757346, 43757347, 43757348}
|
|
|
|
dData := &storage.DeleteData{}
|
|
for i := range testpks {
|
|
dData.Append(storage.NewInt64PrimaryKey(testpks[i]), testtss[i])
|
|
}
|
|
|
|
dCodec := storage.NewDeleteCodec()
|
|
blob, err := dCodec.Serialize(CollectionID, 1, 1, dData)
|
|
s.Require().NoError(err)
|
|
value := [][]byte{blob.Value[:]}
|
|
|
|
iter := NewDeltalogIterator(value, &Label{segmentID: 100})
|
|
s.NotNil(iter)
|
|
|
|
var (
|
|
gotpks = []int64{}
|
|
gottss = []uint64{}
|
|
)
|
|
|
|
for iter.HasNext() {
|
|
labeled, err := iter.Next()
|
|
s.NoError(err)
|
|
|
|
s.Equal(labeled.GetSegmentID(), int64(100))
|
|
gotpks = append(gotpks, labeled.GetPk().GetValue().(int64))
|
|
gottss = append(gottss, labeled.GetTimestamp())
|
|
}
|
|
|
|
s.ElementsMatch(gotpks, testpks)
|
|
s.ElementsMatch(gottss, testtss)
|
|
|
|
_, err = iter.Next()
|
|
s.ErrorIs(err, ErrNoMoreRecord)
|
|
|
|
iter.Dispose()
|
|
iter.WaitForDisposed()
|
|
|
|
_, err = iter.Next()
|
|
s.ErrorIs(err, ErrDisposed)
|
|
}
|