2023-11-04 12:10:17 +08:00
|
|
|
package writebuffer
|
|
|
|
|
|
|
|
import (
|
2023-12-20 15:00:52 +08:00
|
|
|
"math/rand"
|
2023-11-04 12:10:17 +08:00
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/samber/lo"
|
2023-11-15 15:24:18 +08:00
|
|
|
"go.uber.org/atomic"
|
2023-11-04 12:10:17 +08:00
|
|
|
|
|
|
|
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
|
|
|
"github.com/milvus-io/milvus/internal/datanode/metacache"
|
2023-11-17 21:46:20 +08:00
|
|
|
"github.com/milvus-io/milvus/internal/proto/datapb"
|
2023-11-04 12:10:17 +08:00
|
|
|
"github.com/milvus-io/milvus/pkg/util/tsoutil"
|
|
|
|
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
|
|
|
)
|
|
|
|
|
2023-11-27 19:48:26 +08:00
|
|
|
type SyncPolicy interface {
|
|
|
|
SelectSegments(buffers []*segmentBuffer, ts typeutil.Timestamp) []int64
|
|
|
|
Reason() string
|
|
|
|
}
|
|
|
|
|
|
|
|
type SelectSegmentFunc func(buffer []*segmentBuffer, ts typeutil.Timestamp) []int64
|
|
|
|
|
|
|
|
type SelectSegmentFnPolicy struct {
|
|
|
|
fn SelectSegmentFunc
|
|
|
|
reason string
|
|
|
|
}
|
|
|
|
|
|
|
|
func (f SelectSegmentFnPolicy) SelectSegments(buffers []*segmentBuffer, ts typeutil.Timestamp) []int64 {
|
|
|
|
return f.fn(buffers, ts)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (f SelectSegmentFnPolicy) Reason() string { return f.reason }
|
|
|
|
|
|
|
|
func wrapSelectSegmentFuncPolicy(fn SelectSegmentFunc, reason string) SelectSegmentFnPolicy {
|
|
|
|
return SelectSegmentFnPolicy{
|
|
|
|
fn: fn,
|
|
|
|
reason: reason,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func GetFullBufferPolicy() SyncPolicy {
|
|
|
|
return wrapSelectSegmentFuncPolicy(
|
|
|
|
func(buffers []*segmentBuffer, _ typeutil.Timestamp) []int64 {
|
|
|
|
return lo.FilterMap(buffers, func(buf *segmentBuffer, _ int) (int64, bool) {
|
|
|
|
return buf.segmentID, buf.IsFull()
|
|
|
|
})
|
|
|
|
}, "buffer full")
|
|
|
|
}
|
2023-11-04 12:10:17 +08:00
|
|
|
|
2023-11-27 19:48:26 +08:00
|
|
|
func GetCompactedSegmentsPolicy(meta metacache.MetaCache) SyncPolicy {
|
|
|
|
return wrapSelectSegmentFuncPolicy(func(buffers []*segmentBuffer, _ typeutil.Timestamp) []int64 {
|
|
|
|
segmentIDs := lo.Map(buffers, func(buffer *segmentBuffer, _ int) int64 { return buffer.segmentID })
|
|
|
|
return meta.GetSegmentIDsBy(metacache.WithSegmentIDs(segmentIDs...), metacache.WithCompacted())
|
|
|
|
}, "segment compacted")
|
2023-11-04 12:10:17 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
func GetSyncStaleBufferPolicy(staleDuration time.Duration) SyncPolicy {
|
2023-11-27 19:48:26 +08:00
|
|
|
return wrapSelectSegmentFuncPolicy(func(buffers []*segmentBuffer, ts typeutil.Timestamp) []int64 {
|
2023-11-04 12:10:17 +08:00
|
|
|
current := tsoutil.PhysicalTime(ts)
|
|
|
|
return lo.FilterMap(buffers, func(buf *segmentBuffer, _ int) (int64, bool) {
|
|
|
|
minTs := buf.MinTimestamp()
|
|
|
|
start := tsoutil.PhysicalTime(minTs)
|
2023-12-20 15:00:52 +08:00
|
|
|
jitter := time.Duration(rand.Float64() * 0.1 * float64(staleDuration))
|
|
|
|
return buf.segmentID, current.Sub(start) > staleDuration+jitter
|
2023-11-04 12:10:17 +08:00
|
|
|
})
|
2023-11-27 19:48:26 +08:00
|
|
|
}, "buffer stale")
|
2023-11-04 12:10:17 +08:00
|
|
|
}
|
|
|
|
|
2024-01-24 14:19:00 +08:00
|
|
|
func GetSealedSegmentsPolicy(meta metacache.MetaCache) SyncPolicy {
|
2023-11-27 19:48:26 +08:00
|
|
|
return wrapSelectSegmentFuncPolicy(func(_ []*segmentBuffer, _ typeutil.Timestamp) []int64 {
|
2024-01-24 14:19:00 +08:00
|
|
|
ids := meta.GetSegmentIDsBy(metacache.WithSegmentState(commonpb.SegmentState_Sealed))
|
|
|
|
meta.UpdateSegments(metacache.UpdateState(commonpb.SegmentState_Flushing),
|
|
|
|
metacache.WithSegmentIDs(ids...), metacache.WithSegmentState(commonpb.SegmentState_Sealed))
|
|
|
|
return ids
|
2023-11-27 19:48:26 +08:00
|
|
|
}, "segment flushing")
|
2023-11-04 12:10:17 +08:00
|
|
|
}
|
2023-11-15 15:24:18 +08:00
|
|
|
|
|
|
|
func GetFlushTsPolicy(flushTimestamp *atomic.Uint64, meta metacache.MetaCache) SyncPolicy {
|
2023-11-27 19:48:26 +08:00
|
|
|
return wrapSelectSegmentFuncPolicy(func(buffers []*segmentBuffer, ts typeutil.Timestamp) []int64 {
|
2023-11-15 15:24:18 +08:00
|
|
|
flushTs := flushTimestamp.Load()
|
|
|
|
if flushTs != nonFlushTS && ts >= flushTs {
|
|
|
|
// flush segment start pos < flushTs && checkpoint > flushTs
|
|
|
|
ids := lo.FilterMap(buffers, func(buf *segmentBuffer, _ int) (int64, bool) {
|
|
|
|
seg, ok := meta.GetSegmentByID(buf.segmentID)
|
|
|
|
if !ok {
|
|
|
|
return buf.segmentID, false
|
|
|
|
}
|
2023-11-17 21:46:20 +08:00
|
|
|
inRange := seg.State() == commonpb.SegmentState_Flushed ||
|
|
|
|
seg.Level() == datapb.SegmentLevel_L0
|
|
|
|
return buf.segmentID, inRange && buf.MinTimestamp() < flushTs
|
2023-11-15 15:24:18 +08:00
|
|
|
})
|
|
|
|
|
|
|
|
// flush all buffer
|
|
|
|
return ids
|
|
|
|
}
|
|
|
|
return nil
|
2023-11-27 19:48:26 +08:00
|
|
|
}, "flush ts")
|
2023-11-15 15:24:18 +08:00
|
|
|
}
|