milvus/internal/datanode/iterators/deltalog_iterator.go
XuanYang-cn e6eb6f2c78
enhance: Speed up L0 compaction (#30410)
This PR changes the following to speed up L0 compaction and
prevent OOM:

1. Lower deltabuf limit to 16MB by default, so that each L0 segment
would be 4X smaller than before.
2. Add BatchProcess, use it if memory is sufficient
3. Iterator will deserialize lazily when HasNext is called, to avoid a massive memory
peak
4. Add tracing in splitDelta

See also: #30191

---------

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
2024-02-04 10:49:05 +08:00

89 lines
1.7 KiB
Go

package iterator
import (
"sync"
"go.uber.org/atomic"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/log"
)
// Compile-time assertion that *DeltalogIterator implements Iterator.
var _ Iterator = (*DeltalogIterator)(nil)
// DeltalogIterator iterates over the delete records (primary key and
// timestamp pairs) stored in a set of deltalog blobs. The blobs are only
// deserialized when the iterator is first asked whether a row is available.
type DeltalogIterator struct {
// disposeCh is closed by Dispose; WaitForDisposed blocks on it.
disposeCh chan struct{}
// disposedOnce ensures disposeCh is closed at most once.
disposedOnce sync.Once
// disposed is set to true by Dispose and read by isDisposed.
disposed atomic.Bool
// data holds the deserialized delete data; it stays nil until hasNext
// decodes blobs successfully.
data *storage.DeleteData
// blobs holds the raw deltalog blobs; it is set to nil once they have
// been decoded into data.
blobs []*storage.Blob
// label is attached to every row returned by Next.
label *Label
// pos is the index into data of the next row that Next will return.
pos int
}
// NewDeltalogIterator builds a DeltalogIterator over the given raw deltalog
// payloads. Each element of v is wrapped in a storage.Blob; nothing is
// deserialized here. Every row later produced by the iterator carries label.
func NewDeltalogIterator(v [][]byte, label *Label) *DeltalogIterator {
	// make (rather than a nil slice) keeps the result non-nil even for empty
	// input, which hasNext relies on to decide whether decoding is pending.
	wrapped := make([]*storage.Blob, 0, len(v))
	for _, raw := range v {
		wrapped = append(wrapped, &storage.Blob{Value: raw})
	}

	it := &DeltalogIterator{
		disposeCh: make(chan struct{}),
		blobs:     wrapped,
		label:     label,
	}
	return it
}
// HasNext reports whether Next can return another row. It is always false
// once the iterator has been disposed; otherwise it defers to hasNext, which
// may deserialize the underlying blobs on first use.
func (d *DeltalogIterator) HasNext() bool {
	if d.isDisposed() {
		return false
	}
	return d.hasNext()
}
// Next returns the row at the current position, labeled with the iterator's
// label, and advances the position by one.
//
// It returns ErrDisposed if the iterator has been disposed, and
// ErrNoMoreRecord if no row is available.
func (d *DeltalogIterator) Next() (*LabeledRowData, error) {
	switch {
	case d.isDisposed():
		return nil, ErrDisposed
	case !d.hasNext():
		return nil, ErrNoMoreRecord
	}

	// Read the record first, then move the cursor forward.
	idx := d.pos
	record := &DeltalogRow{
		Pk:        d.data.Pks[idx],
		Timestamp: d.data.Tss[idx],
	}
	d.pos = idx + 1

	return NewLabeledRowData(record, d.label), nil
}
// Dispose marks the iterator as disposed and closes the dispose channel,
// which unblocks WaitForDisposed. It may be called more than once: the
// channel close is guarded by a sync.Once.
func (d *DeltalogIterator) Dispose() {
	// Set the flag before closing the channel so that anything woken by the
	// close already observes the iterator as disposed.
	d.disposed.CompareAndSwap(false, true)

	closeCh := func() { close(d.disposeCh) }
	d.disposedOnce.Do(closeCh)
}
// hasNext reports whether a row exists at the current position, without
// checking the disposed flag.
//
// The blobs are deserialized lazily: on the first call that finds no decoded
// data but pending blobs, they are decoded and then released. If decoding
// fails, a warning is logged, false is returned, and the blobs are kept, so a
// later call will attempt decoding again.
func (d *DeltalogIterator) hasNext() bool {
	if d.data == nil && d.blobs != nil {
		reader := storage.NewDeleteCodec()
		_, _, dData, err := reader.Deserialize(d.blobs)
		if err != nil {
			log.Warn("Deltalog iterator failed to deserialize blobs", zap.Error(err))
			return false
		}
		d.data = dData
		// Drop the raw blobs once decoded so they can be garbage collected.
		d.blobs = nil
	}

	// There may be neither decoded data nor blobs to decode (for example a
	// zero-value iterator, or a nil result from Deserialize). Report "no more
	// rows" instead of dereferencing a nil pointer.
	if d.data == nil {
		return false
	}
	return int64(d.pos) < d.data.RowCount
}
// isDisposed reports whether Dispose has been called, by atomically loading
// the disposed flag.
func (d *DeltalogIterator) isDisposed() bool {
	disposed := d.disposed.Load()
	return disposed
}
// WaitForDisposed blocks the caller until the dispose channel is closed,
// which happens when Dispose is called.
func (d *DeltalogIterator) WaitForDisposed() {
// Receiving from disposeCh returns only once the channel has been closed.
<-d.disposeCh
}