2021-10-25 19:44:37 +08:00
|
|
|
// Licensed to the LF AI & Data foundation under one
|
|
|
|
// or more contributor license agreements. See the NOTICE file
|
|
|
|
// distributed with this work for additional information
|
|
|
|
// regarding copyright ownership. The ASF licenses this file
|
|
|
|
// to you under the Apache License, Version 2.0 (the
|
|
|
|
// "License"); you may not use this file except in compliance
|
2021-10-14 15:44:34 +08:00
|
|
|
// with the License. You may obtain a copy of the License at
|
|
|
|
//
|
2021-10-25 19:44:37 +08:00
|
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
2021-10-14 15:44:34 +08:00
|
|
|
//
|
2021-10-25 19:44:37 +08:00
|
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
// See the License for the specific language governing permissions and
|
|
|
|
// limitations under the License.
|
2021-10-18 21:32:49 +08:00
|
|
|
|
2021-07-07 14:02:01 +08:00
|
|
|
package datacoord
|
|
|
|
|
|
|
|
import (
|
2021-08-19 14:08:10 +08:00
|
|
|
"time"
|
|
|
|
|
2021-10-14 15:44:34 +08:00
|
|
|
"github.com/golang/protobuf/proto"
|
2022-10-16 20:49:27 +08:00
|
|
|
"github.com/milvus-io/milvus-proto/go-api/commonpb"
|
2021-07-07 14:02:01 +08:00
|
|
|
"github.com/milvus-io/milvus/internal/proto/datapb"
|
|
|
|
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
|
|
|
)
|
|
|
|
|
2021-09-15 21:07:53 +08:00
|
|
|
// SegmentsInfo wraps a map, which maintains ID to SegmentInfo relation
|
2021-07-07 14:02:01 +08:00
|
|
|
type SegmentsInfo struct {
|
2021-07-12 17:24:25 +08:00
|
|
|
segments map[UniqueID]*SegmentInfo
|
|
|
|
}
|
|
|
|
|
2021-09-15 21:07:53 +08:00
|
|
|
// SegmentInfo wraps datapb.SegmentInfo and patches some extra info on it
|
2021-07-12 17:24:25 +08:00
|
|
|
type SegmentInfo struct {
|
|
|
|
*datapb.SegmentInfo
|
2021-08-19 14:08:10 +08:00
|
|
|
currRows int64
|
|
|
|
allocations []*Allocation
|
|
|
|
lastFlushTime time.Time
|
2021-11-05 22:25:00 +08:00
|
|
|
isCompacting bool
|
2022-06-15 23:14:10 +08:00
|
|
|
// a cache to avoid calculate twice
|
2022-09-20 20:54:50 +08:00
|
|
|
size int64
|
|
|
|
lastWrittenTime time.Time
|
2021-07-12 17:24:25 +08:00
|
|
|
}
|
|
|
|
|
2021-09-08 11:35:59 +08:00
|
|
|
// NewSegmentInfo create `SegmentInfo` wrapper from `datapb.SegmentInfo`
|
2022-08-19 19:50:50 +08:00
|
|
|
// assign current rows to last checkpoint and pre-allocate `allocations` slice
|
2021-09-08 11:35:59 +08:00
|
|
|
// Note that the allocation information is not preserved,
|
|
|
|
// the worst case scenario is to have a segment with twice size we expects
|
2021-07-12 17:24:25 +08:00
|
|
|
func NewSegmentInfo(info *datapb.SegmentInfo) *SegmentInfo {
|
|
|
|
return &SegmentInfo{
|
2021-10-20 15:02:36 +08:00
|
|
|
SegmentInfo: info,
|
2022-08-19 19:50:50 +08:00
|
|
|
currRows: info.GetNumOfRows(),
|
2021-10-20 15:02:36 +08:00
|
|
|
allocations: make([]*Allocation, 0, 16),
|
|
|
|
lastFlushTime: time.Now().Add(-1 * flushInterval),
|
2022-09-20 20:54:50 +08:00
|
|
|
// A growing segment from recovery can be also considered idle.
|
|
|
|
lastWrittenTime: getZeroTime(),
|
2021-07-12 17:24:25 +08:00
|
|
|
}
|
2021-07-07 14:02:01 +08:00
|
|
|
}
|
|
|
|
|
2021-12-17 18:54:26 +08:00
|
|
|
// NewSegmentsInfo creates a `SegmentsInfo` instance, which makes sure internal map is initialized
|
|
|
|
// note that no mutex is wrapped so external concurrent control is needed
|
2021-07-07 14:02:01 +08:00
|
|
|
func NewSegmentsInfo() *SegmentsInfo {
|
2021-07-12 17:24:25 +08:00
|
|
|
return &SegmentsInfo{segments: make(map[UniqueID]*SegmentInfo)}
|
2021-07-07 14:02:01 +08:00
|
|
|
}
|
|
|
|
|
2021-09-15 21:07:53 +08:00
|
|
|
// GetSegment returns SegmentInfo
|
2021-07-12 17:24:25 +08:00
|
|
|
func (s *SegmentsInfo) GetSegment(segmentID UniqueID) *SegmentInfo {
|
2021-07-07 14:02:01 +08:00
|
|
|
segment, ok := s.segments[segmentID]
|
|
|
|
if !ok {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
return segment
|
|
|
|
}
|
|
|
|
|
2021-09-15 21:07:53 +08:00
|
|
|
// GetSegments iterates internal map and returns all SegmentInfo in a slice
|
|
|
|
// no deep copy applied
|
2021-07-12 17:24:25 +08:00
|
|
|
func (s *SegmentsInfo) GetSegments() []*SegmentInfo {
|
|
|
|
segments := make([]*SegmentInfo, 0, len(s.segments))
|
2021-07-07 14:02:01 +08:00
|
|
|
for _, segment := range s.segments {
|
|
|
|
segments = append(segments, segment)
|
|
|
|
}
|
|
|
|
return segments
|
|
|
|
}
|
|
|
|
|
2021-09-15 21:07:53 +08:00
|
|
|
// DropSegment deletes provided segmentID
|
|
|
|
// no extra method is taken when segmentID not exists
|
2021-07-07 14:02:01 +08:00
|
|
|
func (s *SegmentsInfo) DropSegment(segmentID UniqueID) {
|
|
|
|
delete(s.segments, segmentID)
|
|
|
|
}
|
|
|
|
|
2021-09-15 21:07:53 +08:00
|
|
|
// SetSegment sets SegmentInfo with segmentID, perform overwrite if already exists
|
2021-07-12 17:24:25 +08:00
|
|
|
func (s *SegmentsInfo) SetSegment(segmentID UniqueID, segment *SegmentInfo) {
|
2021-07-07 14:02:01 +08:00
|
|
|
s.segments[segmentID] = segment
|
|
|
|
}
|
|
|
|
|
2021-09-15 21:07:53 +08:00
|
|
|
// SetRowCount sets rowCount info for SegmentInfo with provided segmentID
|
|
|
|
// if SegmentInfo not found, do nothing
|
2021-07-07 14:02:01 +08:00
|
|
|
func (s *SegmentsInfo) SetRowCount(segmentID UniqueID, rowCount int64) {
|
|
|
|
if segment, ok := s.segments[segmentID]; ok {
|
2021-09-24 12:53:54 +08:00
|
|
|
s.segments[segmentID] = segment.Clone(SetRowCount(rowCount))
|
2021-07-07 14:02:01 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-10-03 17:50:03 +08:00
|
|
|
// SetState sets Segment State info for SegmentInfo with provided segmentID
|
2021-09-15 21:07:53 +08:00
|
|
|
// if SegmentInfo not found, do nothing
|
2021-07-07 14:02:01 +08:00
|
|
|
func (s *SegmentsInfo) SetState(segmentID UniqueID, state commonpb.SegmentState) {
|
|
|
|
if segment, ok := s.segments[segmentID]; ok {
|
2021-09-24 12:53:54 +08:00
|
|
|
s.segments[segmentID] = segment.Clone(SetState(state))
|
2021-07-07 14:02:01 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-09-26 18:06:54 +08:00
|
|
|
// SetIsImporting sets the import status for a segment.
|
|
|
|
func (s *SegmentsInfo) SetIsImporting(segmentID UniqueID, isImporting bool) {
|
|
|
|
if segment, ok := s.segments[segmentID]; ok {
|
|
|
|
s.segments[segmentID] = segment.Clone(SetIsImporting(isImporting))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-09-15 21:07:53 +08:00
|
|
|
// SetDmlPosition sets DmlPosition info (checkpoint for recovery) for SegmentInfo with provided segmentID
|
|
|
|
// if SegmentInfo not found, do nothing
|
2021-08-19 13:00:12 +08:00
|
|
|
func (s *SegmentsInfo) SetDmlPosition(segmentID UniqueID, pos *internalpb.MsgPosition) {
|
2021-07-07 14:02:01 +08:00
|
|
|
if segment, ok := s.segments[segmentID]; ok {
|
2021-08-19 13:00:12 +08:00
|
|
|
s.segments[segmentID] = segment.Clone(SetDmlPosition(pos))
|
2021-07-07 14:02:01 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-09-15 21:07:53 +08:00
|
|
|
// SetStartPosition sets StartPosition info (recovery info when no checkout point found) for SegmentInfo with provided segmentID
|
|
|
|
// if SegmentInfo not found, do nothing
|
2021-07-07 14:02:01 +08:00
|
|
|
func (s *SegmentsInfo) SetStartPosition(segmentID UniqueID, pos *internalpb.MsgPosition) {
|
|
|
|
if segment, ok := s.segments[segmentID]; ok {
|
2021-08-19 13:00:12 +08:00
|
|
|
s.segments[segmentID] = segment.Clone(SetStartPosition(pos))
|
2021-07-07 14:02:01 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-10-05 15:29:03 +08:00
|
|
|
// SetAllocations sets allocations for segment with specified id
|
|
|
|
// if the segment id is not found, do nothing
|
|
|
|
// uses `ShadowClone` since internal SegmentInfo is not changed
|
2021-07-12 17:24:25 +08:00
|
|
|
func (s *SegmentsInfo) SetAllocations(segmentID UniqueID, allocations []*Allocation) {
|
|
|
|
if segment, ok := s.segments[segmentID]; ok {
|
2021-08-19 13:00:12 +08:00
|
|
|
s.segments[segmentID] = segment.ShadowClone(SetAllocations(allocations))
|
2021-07-12 17:24:25 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-10-05 15:29:03 +08:00
|
|
|
// AddAllocation adds a new allocation to specified segment
|
|
|
|
// if the segment is not found, do nothing
|
|
|
|
// uses `Clone` since internal SegmentInfo's LastExpireTime is changed
|
2021-07-12 17:24:25 +08:00
|
|
|
func (s *SegmentsInfo) AddAllocation(segmentID UniqueID, allocation *Allocation) {
|
|
|
|
if segment, ok := s.segments[segmentID]; ok {
|
2021-08-19 13:00:12 +08:00
|
|
|
s.segments[segmentID] = segment.Clone(AddAllocation(allocation))
|
2021-07-12 17:24:25 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-10-05 22:28:38 +08:00
|
|
|
// SetCurrentRows sets rows count for segment
|
2021-10-06 17:20:07 +08:00
|
|
|
// if the segment is not found, do nothing
|
|
|
|
// uses `ShadowClone` since internal SegmentInfo is not changed
|
2021-07-12 17:24:25 +08:00
|
|
|
func (s *SegmentsInfo) SetCurrentRows(segmentID UniqueID, rows int64) {
|
|
|
|
if segment, ok := s.segments[segmentID]; ok {
|
2021-08-19 13:00:12 +08:00
|
|
|
s.segments[segmentID] = segment.ShadowClone(SetCurrentRows(rows))
|
2021-07-12 17:24:25 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-10-05 22:34:49 +08:00
|
|
|
// SetBinlogs sets binlog paths for segment
|
2021-10-06 17:20:07 +08:00
|
|
|
// if the segment is not found, do nothing
|
|
|
|
// uses `Clone` since internal SegmentInfo's Binlogs is changed
|
2021-08-19 13:00:12 +08:00
|
|
|
func (s *SegmentsInfo) SetBinlogs(segmentID UniqueID, binlogs []*datapb.FieldBinlog) {
|
|
|
|
if segment, ok := s.segments[segmentID]; ok {
|
|
|
|
s.segments[segmentID] = segment.Clone(SetBinlogs(binlogs))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-10-05 22:36:48 +08:00
|
|
|
// SetFlushTime sets flush time for segment
|
2021-10-06 17:20:07 +08:00
|
|
|
// if the segment is not found, do nothing
|
|
|
|
// uses `ShadowClone` since internal SegmentInfo is not changed
|
2021-08-19 14:08:10 +08:00
|
|
|
func (s *SegmentsInfo) SetFlushTime(segmentID UniqueID, t time.Time) {
|
|
|
|
if segment, ok := s.segments[segmentID]; ok {
|
|
|
|
s.segments[segmentID] = segment.ShadowClone(SetFlushTime(t))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-10-07 18:32:50 +08:00
|
|
|
// AddSegmentBinlogs adds binlogs for segment
|
2021-10-08 17:31:50 +08:00
|
|
|
// if the segment is not found, do nothing
|
|
|
|
// uses `Clone` since internal SegmentInfo's Binlogs is changed
|
2021-12-19 20:00:42 +08:00
|
|
|
func (s *SegmentsInfo) AddSegmentBinlogs(segmentID UniqueID, field2Binlogs map[UniqueID][]*datapb.Binlog) {
|
2021-08-24 15:51:51 +08:00
|
|
|
if segment, ok := s.segments[segmentID]; ok {
|
|
|
|
s.segments[segmentID] = segment.Clone(addSegmentBinlogs(field2Binlogs))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-09-26 18:06:54 +08:00
|
|
|
// SetIsCompacting sets compaction status for segment
|
2021-11-05 22:25:00 +08:00
|
|
|
func (s *SegmentsInfo) SetIsCompacting(segmentID UniqueID, isCompacting bool) {
|
|
|
|
if segment, ok := s.segments[segmentID]; ok {
|
|
|
|
s.segments[segmentID] = segment.ShadowClone(SetIsCompacting(isCompacting))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-10-07 17:48:00 +08:00
|
|
|
// Clone deep clone the segment info and return a new instance
|
2021-08-19 13:00:12 +08:00
|
|
|
func (s *SegmentInfo) Clone(opts ...SegmentInfoOption) *SegmentInfo {
|
|
|
|
info := proto.Clone(s.SegmentInfo).(*datapb.SegmentInfo)
|
2021-07-12 17:24:25 +08:00
|
|
|
cloned := &SegmentInfo{
|
2021-08-19 14:08:10 +08:00
|
|
|
SegmentInfo: info,
|
|
|
|
currRows: s.currRows,
|
|
|
|
allocations: s.allocations,
|
|
|
|
lastFlushTime: s.lastFlushTime,
|
2022-07-13 10:08:28 +08:00
|
|
|
isCompacting: s.isCompacting,
|
|
|
|
//cannot copy size, since binlog may be changed
|
2022-09-20 20:54:50 +08:00
|
|
|
lastWrittenTime: s.lastWrittenTime,
|
2021-07-07 14:02:01 +08:00
|
|
|
}
|
|
|
|
for _, opt := range opts {
|
|
|
|
opt(cloned)
|
|
|
|
}
|
|
|
|
return cloned
|
|
|
|
}
|
|
|
|
|
2021-10-07 17:50:14 +08:00
|
|
|
// ShadowClone shadow clone the segment and return a new instance
|
2021-08-19 13:00:12 +08:00
|
|
|
func (s *SegmentInfo) ShadowClone(opts ...SegmentInfoOption) *SegmentInfo {
|
2021-07-12 17:24:25 +08:00
|
|
|
cloned := &SegmentInfo{
|
2022-09-20 20:54:50 +08:00
|
|
|
SegmentInfo: s.SegmentInfo,
|
|
|
|
currRows: s.currRows,
|
|
|
|
allocations: s.allocations,
|
|
|
|
lastFlushTime: s.lastFlushTime,
|
|
|
|
isCompacting: s.isCompacting,
|
|
|
|
size: s.size,
|
|
|
|
lastWrittenTime: s.lastWrittenTime,
|
2021-07-07 14:02:01 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
for _, opt := range opts {
|
|
|
|
opt(cloned)
|
|
|
|
}
|
|
|
|
return cloned
|
|
|
|
}
|
|
|
|
|
2021-10-07 17:52:17 +08:00
|
|
|
// SegmentInfoOption is the option to set fields in segment info
|
2021-07-12 17:24:25 +08:00
|
|
|
type SegmentInfoOption func(segment *SegmentInfo)
|
2021-07-07 14:02:01 +08:00
|
|
|
|
2021-10-07 17:54:12 +08:00
|
|
|
// SetRowCount is the option to set row count for segment info
|
2021-07-07 14:02:01 +08:00
|
|
|
func SetRowCount(rowCount int64) SegmentInfoOption {
|
2021-07-12 17:24:25 +08:00
|
|
|
return func(segment *SegmentInfo) {
|
2021-07-07 14:02:01 +08:00
|
|
|
segment.NumOfRows = rowCount
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-10-07 17:45:29 +08:00
|
|
|
// SetExpireTime is the option to set expire time for segment info
|
2021-07-07 14:02:01 +08:00
|
|
|
func SetExpireTime(expireTs Timestamp) SegmentInfoOption {
|
2021-07-12 17:24:25 +08:00
|
|
|
return func(segment *SegmentInfo) {
|
2021-07-07 14:02:01 +08:00
|
|
|
segment.LastExpireTime = expireTs
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-10-07 17:55:53 +08:00
|
|
|
// SetState is the option to set state for segment info
|
2021-07-07 14:02:01 +08:00
|
|
|
func SetState(state commonpb.SegmentState) SegmentInfoOption {
|
2021-07-12 17:24:25 +08:00
|
|
|
return func(segment *SegmentInfo) {
|
2021-07-07 14:02:01 +08:00
|
|
|
segment.State = state
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-09-26 18:06:54 +08:00
|
|
|
// SetIsImporting is the option to set import state for segment info.
|
|
|
|
func SetIsImporting(isImporting bool) SegmentInfoOption {
|
|
|
|
return func(segment *SegmentInfo) {
|
|
|
|
segment.IsImporting = isImporting
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-10-07 17:57:47 +08:00
|
|
|
// SetDmlPosition is the option to set dml position for segment info
|
2021-08-19 13:00:12 +08:00
|
|
|
func SetDmlPosition(pos *internalpb.MsgPosition) SegmentInfoOption {
|
2021-07-12 17:24:25 +08:00
|
|
|
return func(segment *SegmentInfo) {
|
2021-07-07 14:02:01 +08:00
|
|
|
segment.DmlPosition = pos
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-10-07 17:59:31 +08:00
|
|
|
// SetStartPosition is the option to set start position for segment info
|
2021-07-07 14:02:01 +08:00
|
|
|
func SetStartPosition(pos *internalpb.MsgPosition) SegmentInfoOption {
|
2021-07-12 17:24:25 +08:00
|
|
|
return func(segment *SegmentInfo) {
|
2021-07-07 14:02:01 +08:00
|
|
|
segment.StartPosition = pos
|
|
|
|
}
|
|
|
|
}
|
2021-07-12 17:24:25 +08:00
|
|
|
|
2021-10-09 00:13:21 +08:00
|
|
|
// SetAllocations is the option to set allocations for segment info
|
2021-07-12 17:24:25 +08:00
|
|
|
func SetAllocations(allocations []*Allocation) SegmentInfoOption {
|
|
|
|
return func(segment *SegmentInfo) {
|
|
|
|
segment.allocations = allocations
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-10-09 00:15:13 +08:00
|
|
|
// AddAllocation is the option to add allocation info for segment info
|
2021-07-12 17:24:25 +08:00
|
|
|
func AddAllocation(allocation *Allocation) SegmentInfoOption {
|
|
|
|
return func(segment *SegmentInfo) {
|
|
|
|
segment.allocations = append(segment.allocations, allocation)
|
2021-07-23 21:58:33 +08:00
|
|
|
segment.LastExpireTime = allocation.ExpireTime
|
2021-07-12 17:24:25 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-10-09 00:16:56 +08:00
|
|
|
// SetCurrentRows is the option to set current row count for segment info
|
2021-07-12 17:24:25 +08:00
|
|
|
func SetCurrentRows(rows int64) SegmentInfoOption {
|
|
|
|
return func(segment *SegmentInfo) {
|
|
|
|
segment.currRows = rows
|
2022-09-20 20:54:50 +08:00
|
|
|
segment.lastWrittenTime = time.Now()
|
2021-07-12 17:24:25 +08:00
|
|
|
}
|
|
|
|
}
|
2021-08-19 13:00:12 +08:00
|
|
|
|
2021-10-11 21:04:42 +08:00
|
|
|
// SetBinlogs is the option to set binlogs for segment info
|
2021-08-19 13:00:12 +08:00
|
|
|
func SetBinlogs(binlogs []*datapb.FieldBinlog) SegmentInfoOption {
|
|
|
|
return func(segment *SegmentInfo) {
|
|
|
|
segment.Binlogs = binlogs
|
|
|
|
}
|
|
|
|
}
|
2021-08-19 14:08:10 +08:00
|
|
|
|
2021-10-11 21:06:37 +08:00
|
|
|
// SetFlushTime is the option to set flush time for segment info
|
2021-08-19 14:08:10 +08:00
|
|
|
func SetFlushTime(t time.Time) SegmentInfoOption {
|
|
|
|
return func(segment *SegmentInfo) {
|
|
|
|
segment.lastFlushTime = t
|
|
|
|
}
|
|
|
|
}
|
2021-08-24 15:51:51 +08:00
|
|
|
|
2021-12-21 09:19:53 +08:00
|
|
|
// SetIsCompacting is the option to set compaction state for segment info
|
2021-11-05 22:25:00 +08:00
|
|
|
func SetIsCompacting(isCompacting bool) SegmentInfoOption {
|
|
|
|
return func(segment *SegmentInfo) {
|
|
|
|
segment.isCompacting = isCompacting
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-12-19 20:00:42 +08:00
|
|
|
func addSegmentBinlogs(field2Binlogs map[UniqueID][]*datapb.Binlog) SegmentInfoOption {
|
2021-08-24 15:51:51 +08:00
|
|
|
return func(segment *SegmentInfo) {
|
|
|
|
for fieldID, binlogPaths := range field2Binlogs {
|
|
|
|
found := false
|
|
|
|
for _, binlog := range segment.Binlogs {
|
|
|
|
if binlog.FieldID != fieldID {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
binlog.Binlogs = append(binlog.Binlogs, binlogPaths...)
|
|
|
|
found = true
|
|
|
|
break
|
|
|
|
}
|
|
|
|
if !found {
|
|
|
|
// if no field matched
|
|
|
|
segment.Binlogs = append(segment.Binlogs, &datapb.FieldBinlog{
|
|
|
|
FieldID: fieldID,
|
|
|
|
Binlogs: binlogPaths,
|
|
|
|
})
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2021-10-20 15:02:36 +08:00
|
|
|
|
2022-06-15 23:14:10 +08:00
|
|
|
func (s *SegmentInfo) getSegmentSize() int64 {
|
|
|
|
if s.size <= 0 {
|
|
|
|
var size int64
|
|
|
|
for _, binlogs := range s.GetBinlogs() {
|
|
|
|
for _, l := range binlogs.GetBinlogs() {
|
|
|
|
size += l.GetLogSize()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
for _, deltaLogs := range s.GetDeltalogs() {
|
|
|
|
for _, l := range deltaLogs.GetBinlogs() {
|
|
|
|
size += l.GetLogSize()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
for _, statsLogs := range s.GetStatslogs() {
|
|
|
|
for _, l := range statsLogs.GetBinlogs() {
|
|
|
|
size += l.GetLogSize()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
s.size = size
|
|
|
|
}
|
|
|
|
return s.size
|
|
|
|
}
|
|
|
|
|
2021-10-20 15:02:36 +08:00
|
|
|
// SegmentInfoSelector is the function type to select SegmentInfo from meta
|
|
|
|
type SegmentInfoSelector func(*SegmentInfo) bool
|