2021-07-07 14:02:01 +08:00
|
|
|
package datacoord
|
|
|
|
|
|
|
|
import (
|
|
|
|
"github.com/gogo/protobuf/proto"
|
|
|
|
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
|
|
|
"github.com/milvus-io/milvus/internal/proto/datapb"
|
|
|
|
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
|
|
|
)
|
|
|
|
|
|
|
|
type SegmentsInfo struct {
|
2021-07-12 17:24:25 +08:00
|
|
|
segments map[UniqueID]*SegmentInfo
|
|
|
|
}
|
|
|
|
|
|
|
|
type SegmentInfo struct {
|
|
|
|
*datapb.SegmentInfo
|
|
|
|
currRows int64
|
|
|
|
allocations []*Allocation
|
|
|
|
}
|
|
|
|
|
|
|
|
func NewSegmentInfo(info *datapb.SegmentInfo) *SegmentInfo {
|
|
|
|
return &SegmentInfo{
|
|
|
|
SegmentInfo: info,
|
|
|
|
currRows: 0,
|
|
|
|
allocations: make([]*Allocation, 0, 16),
|
|
|
|
}
|
2021-07-07 14:02:01 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
func NewSegmentsInfo() *SegmentsInfo {
|
2021-07-12 17:24:25 +08:00
|
|
|
return &SegmentsInfo{segments: make(map[UniqueID]*SegmentInfo)}
|
2021-07-07 14:02:01 +08:00
|
|
|
}
|
|
|
|
|
2021-07-12 17:24:25 +08:00
|
|
|
func (s *SegmentsInfo) GetSegment(segmentID UniqueID) *SegmentInfo {
|
2021-07-07 14:02:01 +08:00
|
|
|
segment, ok := s.segments[segmentID]
|
|
|
|
if !ok {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
return segment
|
|
|
|
}
|
|
|
|
|
2021-07-12 17:24:25 +08:00
|
|
|
func (s *SegmentsInfo) GetSegments() []*SegmentInfo {
|
|
|
|
segments := make([]*SegmentInfo, 0, len(s.segments))
|
2021-07-07 14:02:01 +08:00
|
|
|
for _, segment := range s.segments {
|
|
|
|
segments = append(segments, segment)
|
|
|
|
}
|
|
|
|
return segments
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *SegmentsInfo) DropSegment(segmentID UniqueID) {
|
|
|
|
delete(s.segments, segmentID)
|
|
|
|
}
|
|
|
|
|
2021-07-12 17:24:25 +08:00
|
|
|
func (s *SegmentsInfo) SetSegment(segmentID UniqueID, segment *SegmentInfo) {
|
2021-07-07 14:02:01 +08:00
|
|
|
s.segments[segmentID] = segment
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *SegmentsInfo) SetRowCount(segmentID UniqueID, rowCount int64) {
|
|
|
|
if segment, ok := s.segments[segmentID]; ok {
|
|
|
|
s.segments[segmentID] = s.ShadowClone(segment, SetRowCount(rowCount))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *SegmentsInfo) SetLasteExpiraTime(segmentID UniqueID, expireTs Timestamp) {
|
|
|
|
if segment, ok := s.segments[segmentID]; ok {
|
|
|
|
s.segments[segmentID] = s.ShadowClone(segment, SetExpireTime(expireTs))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *SegmentsInfo) SetState(segmentID UniqueID, state commonpb.SegmentState) {
|
|
|
|
if segment, ok := s.segments[segmentID]; ok {
|
|
|
|
s.segments[segmentID] = s.ShadowClone(segment, SetState(state))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *SegmentsInfo) SetDmlPositino(segmentID UniqueID, pos *internalpb.MsgPosition) {
|
|
|
|
if segment, ok := s.segments[segmentID]; ok {
|
|
|
|
s.segments[segmentID] = s.Clone(segment, SetDmlPositino(pos))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *SegmentsInfo) SetStartPosition(segmentID UniqueID, pos *internalpb.MsgPosition) {
|
|
|
|
if segment, ok := s.segments[segmentID]; ok {
|
|
|
|
s.segments[segmentID] = s.Clone(segment, SetStartPosition(pos))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-07-12 17:24:25 +08:00
|
|
|
func (s *SegmentsInfo) SetAllocations(segmentID UniqueID, allocations []*Allocation) {
|
|
|
|
if segment, ok := s.segments[segmentID]; ok {
|
|
|
|
s.segments[segmentID] = s.ShadowClone(segment, SetAllocations(allocations))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *SegmentsInfo) AddAllocation(segmentID UniqueID, allocation *Allocation) {
|
|
|
|
if segment, ok := s.segments[segmentID]; ok {
|
|
|
|
s.segments[segmentID] = s.Clone(segment, AddAllocation(allocation))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *SegmentsInfo) SetCurrentRows(segmentID UniqueID, rows int64) {
|
|
|
|
if segment, ok := s.segments[segmentID]; ok {
|
|
|
|
s.segments[segmentID] = s.ShadowClone(segment, SetCurrentRows(rows))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *SegmentsInfo) Clone(segment *SegmentInfo, opts ...SegmentInfoOption) *SegmentInfo {
|
|
|
|
info := proto.Clone(segment.SegmentInfo).(*datapb.SegmentInfo)
|
|
|
|
cloned := &SegmentInfo{
|
|
|
|
SegmentInfo: info,
|
|
|
|
currRows: segment.currRows,
|
|
|
|
allocations: segment.allocations,
|
2021-07-07 14:02:01 +08:00
|
|
|
}
|
|
|
|
for _, opt := range opts {
|
|
|
|
opt(cloned)
|
|
|
|
}
|
|
|
|
return cloned
|
|
|
|
}
|
|
|
|
|
2021-07-12 17:24:25 +08:00
|
|
|
func (s *SegmentsInfo) ShadowClone(segment *SegmentInfo, opts ...SegmentInfoOption) *SegmentInfo {
|
|
|
|
cloned := &SegmentInfo{
|
|
|
|
SegmentInfo: segment.SegmentInfo,
|
|
|
|
currRows: segment.currRows,
|
|
|
|
allocations: segment.allocations,
|
2021-07-07 14:02:01 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
for _, opt := range opts {
|
|
|
|
opt(cloned)
|
|
|
|
}
|
|
|
|
return cloned
|
|
|
|
}
|
|
|
|
|
2021-07-12 17:24:25 +08:00
|
|
|
// SegmentInfoOption is a function that modifies the SegmentInfo passed to it.
// Clone and ShadowClone apply such options to the copy they create.
type SegmentInfoOption func(segment *SegmentInfo)
|
2021-07-07 14:02:01 +08:00
|
|
|
|
|
|
|
func SetRowCount(rowCount int64) SegmentInfoOption {
|
2021-07-12 17:24:25 +08:00
|
|
|
return func(segment *SegmentInfo) {
|
2021-07-07 14:02:01 +08:00
|
|
|
segment.NumOfRows = rowCount
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func SetExpireTime(expireTs Timestamp) SegmentInfoOption {
|
2021-07-12 17:24:25 +08:00
|
|
|
return func(segment *SegmentInfo) {
|
2021-07-07 14:02:01 +08:00
|
|
|
segment.LastExpireTime = expireTs
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func SetState(state commonpb.SegmentState) SegmentInfoOption {
|
2021-07-12 17:24:25 +08:00
|
|
|
return func(segment *SegmentInfo) {
|
2021-07-07 14:02:01 +08:00
|
|
|
segment.State = state
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func SetDmlPositino(pos *internalpb.MsgPosition) SegmentInfoOption {
|
2021-07-12 17:24:25 +08:00
|
|
|
return func(segment *SegmentInfo) {
|
2021-07-07 14:02:01 +08:00
|
|
|
segment.DmlPosition = pos
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func SetStartPosition(pos *internalpb.MsgPosition) SegmentInfoOption {
|
2021-07-12 17:24:25 +08:00
|
|
|
return func(segment *SegmentInfo) {
|
2021-07-07 14:02:01 +08:00
|
|
|
segment.StartPosition = pos
|
|
|
|
}
|
|
|
|
}
|
2021-07-12 17:24:25 +08:00
|
|
|
|
|
|
|
func SetAllocations(allocations []*Allocation) SegmentInfoOption {
|
|
|
|
return func(segment *SegmentInfo) {
|
|
|
|
segment.allocations = allocations
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func AddAllocation(allocation *Allocation) SegmentInfoOption {
|
|
|
|
return func(segment *SegmentInfo) {
|
|
|
|
segment.allocations = append(segment.allocations, allocation)
|
|
|
|
segment.LastExpireTime = allocation.expireTime
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func SetCurrentRows(rows int64) SegmentInfoOption {
|
|
|
|
return func(segment *SegmentInfo) {
|
|
|
|
segment.currRows = rows
|
|
|
|
}
|
|
|
|
}
|