2021-10-25 20:21:18 +08:00
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
2021-04-19 11:35:38 +08:00
// with the License. You may obtain a copy of the License at
//
2021-10-25 20:21:18 +08:00
// http://www.apache.org/licenses/LICENSE-2.0
2021-04-19 11:35:38 +08:00
//
2021-10-25 20:21:18 +08:00
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2021-10-04 17:16:54 +08:00
2021-10-05 22:33:01 +08:00
// Package datacoord contains core functions in datacoord
2021-06-22 10:42:07 +08:00
package datacoord
2021-01-15 17:09:41 +08:00
import (
2022-08-20 10:24:51 +08:00
"context"
2021-01-15 17:09:41 +08:00
"fmt"
2022-11-17 20:37:10 +08:00
"path"
2021-01-15 17:09:41 +08:00
"sync"
2021-08-19 14:08:10 +08:00
"time"
2021-01-15 17:09:41 +08:00
2022-10-10 20:31:22 +08:00
"github.com/golang/protobuf/proto"
2023-04-06 19:14:32 +08:00
"github.com/milvus-io/milvus/pkg/util/tsoutil"
2023-01-04 19:37:36 +08:00
"github.com/samber/lo"
"go.uber.org/zap"
2022-11-10 22:13:04 +08:00
"golang.org/x/exp/maps"
2022-08-25 15:48:54 +08:00
2023-06-09 01:28:37 +08:00
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
2022-08-20 10:24:51 +08:00
"github.com/milvus-io/milvus/internal/metastore"
2023-01-04 19:37:36 +08:00
"github.com/milvus-io/milvus/internal/metastore/model"
2021-04-22 14:45:57 +08:00
"github.com/milvus-io/milvus/internal/proto/datapb"
2022-11-17 20:37:10 +08:00
"github.com/milvus-io/milvus/internal/storage"
2023-02-27 10:41:46 +08:00
"github.com/milvus-io/milvus/internal/util/segmentutil"
2023-04-06 19:14:32 +08:00
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/metautil"
"github.com/milvus-io/milvus/pkg/util/timerecord"
"github.com/milvus-io/milvus/pkg/util/typeutil"
2021-04-09 09:55:04 +08:00
)
2021-06-22 18:24:08 +08:00
// meta is DataCoord's in-memory metadata cache, guarded by the embedded
// RWMutex and backed by catalog for persistence.
type meta struct {
	sync.RWMutex
	ctx     context.Context
	catalog metastore.DataCoordCatalog // persistence layer for all mutations

	collections map[UniqueID]*collectionInfo  // collection id to collection info
	segments    *SegmentsInfo                 // segment id to segment info
	channelCPs  map[string]*msgpb.MsgPosition // vChannel -> channel checkpoint/see position

	chunkManager storage.ChunkManager

	// collectionIndexes records which indexes are on the collection
	// collID -> indexID -> index
	indexes map[UniqueID]map[UniqueID]*model.Index
	// buildID2Meta records the meta information of the segment
	// buildID -> segmentIndex
	buildID2SegmentIndex map[UniqueID]*model.SegmentIndex
}
2022-11-22 19:21:13 +08:00
// segMetricMutation is a local cache of segment metric updates accumulated
// while mutating meta. Must call commit() to take effect.
type segMetricMutation struct {
	stateChange       map[string]int // segment state -> state change count (to increase or decrease).
	rowCountChange    int64          // Change in # of rows.
	rowCountAccChange int64          // Total # of historical added rows, accumulated.
}
2022-10-10 20:31:22 +08:00
// collectionInfo is DataCoord's cached view of a collection's metadata.
type collectionInfo struct {
	ID             int64                      // collection id
	Schema         *schemapb.CollectionSchema // collection schema
	Partitions     []int64                    // partition ids belonging to the collection
	StartPositions []*commonpb.KeyDataPair    // start positions per channel
	Properties     map[string]string          // collection-level properties
	CreatedAt      Timestamp
}
2021-12-09 22:07:10 +08:00
// NewMeta creates meta from provided `kv.TxnKV`
2023-01-06 14:33:36 +08:00
func newMeta ( ctx context . Context , catalog metastore . DataCoordCatalog , chunkManager storage . ChunkManager ) ( * meta , error ) {
2021-01-15 17:09:41 +08:00
mt := & meta {
2023-01-04 19:37:36 +08:00
ctx : ctx ,
2023-01-06 14:33:36 +08:00
catalog : catalog ,
2023-01-04 19:37:36 +08:00
collections : make ( map [ UniqueID ] * collectionInfo ) ,
segments : NewSegmentsInfo ( ) ,
2023-03-04 23:21:50 +08:00
channelCPs : make ( map [ string ] * msgpb . MsgPosition ) ,
2023-01-04 19:37:36 +08:00
chunkManager : chunkManager ,
indexes : make ( map [ UniqueID ] map [ UniqueID ] * model . Index ) ,
buildID2SegmentIndex : make ( map [ UniqueID ] * model . SegmentIndex ) ,
2021-01-15 17:09:41 +08:00
}
err := mt . reloadFromKV ( )
if err != nil {
return nil , err
}
return mt , nil
}
2021-12-09 22:07:10 +08:00
// reloadFromKV loads meta from KV storage: it restores segments, channel
// checkpoints, field indexes and segment indexes from the catalog, and
// re-initializes the related metrics from the loaded state.
func (m *meta) reloadFromKV() error {
	record := timerecord.NewTimeRecorder("datacoord")
	segments, err := m.catalog.ListSegments(m.ctx)
	if err != nil {
		return err
	}
	// Reset gauges to zero before re-counting from the freshly loaded state.
	metrics.DataCoordNumCollections.WithLabelValues().Set(0)
	metrics.DataCoordNumSegments.WithLabelValues(metrics.SealedSegmentLabel).Set(0)
	metrics.DataCoordNumSegments.WithLabelValues(metrics.GrowingSegmentLabel).Set(0)
	metrics.DataCoordNumSegments.WithLabelValues(metrics.FlushedSegmentLabel).Set(0)
	metrics.DataCoordNumSegments.WithLabelValues(metrics.FlushingSegmentLabel).Set(0)
	metrics.DataCoordNumSegments.WithLabelValues(metrics.DroppedSegmentLabel).Set(0)
	metrics.DataCoordNumStoredRows.WithLabelValues().Set(0)
	numStoredRows := int64(0)
	for _, segment := range segments {
		m.segments.SetSegment(segment.ID, NewSegmentInfo(segment))
		metrics.DataCoordNumSegments.WithLabelValues(segment.State.String()).Inc()
		if segment.State == commonpb.SegmentState_Flushed {
			numStoredRows += segment.NumOfRows

			// Observe per-segment binlog/statslog/deltalog file counts
			// for flushed segments only.
			insertFileNum := 0
			for _, fieldBinlog := range segment.GetBinlogs() {
				insertFileNum += len(fieldBinlog.GetBinlogs())
			}
			metrics.FlushedSegmentFileNum.WithLabelValues(metrics.InsertFileLabel).Observe(float64(insertFileNum))

			statFileNum := 0
			for _, fieldBinlog := range segment.GetStatslogs() {
				statFileNum += len(fieldBinlog.GetBinlogs())
			}
			metrics.FlushedSegmentFileNum.WithLabelValues(metrics.StatFileLabel).Observe(float64(statFileNum))

			deleteFileNum := 0
			for _, filedBinlog := range segment.GetDeltalogs() {
				deleteFileNum += len(filedBinlog.GetBinlogs())
			}
			metrics.FlushedSegmentFileNum.WithLabelValues(metrics.DeleteFileLabel).Observe(float64(deleteFileNum))
		}
	}
	metrics.DataCoordNumStoredRows.WithLabelValues().Set(float64(numStoredRows))
	metrics.DataCoordNumStoredRowsCounter.WithLabelValues().Add(float64(numStoredRows))

	channelCPs, err := m.catalog.ListChannelCheckpoint(m.ctx)
	if err != nil {
		return err
	}
	for vChannel, pos := range channelCPs {
		// for 2.2.2 issue https://github.com/milvus-io/milvus/issues/22181
		pos.ChannelName = vChannel
		m.channelCPs[vChannel] = pos
	}
	// load field indexes
	fieldIndexes, err := m.catalog.ListIndexes(m.ctx)
	if err != nil {
		log.Error("DataCoord meta reloadFromKV load field indexes fail", zap.Error(err))
		return err
	}
	for _, fieldIndex := range fieldIndexes {
		m.updateCollectionIndex(fieldIndex)
	}
	// load segment indexes
	segmentIndexes, err := m.catalog.ListSegmentIndexes(m.ctx)
	if err != nil {
		log.Error("DataCoord meta reloadFromKV load segment indexes fail", zap.Error(err))
		return err
	}
	for _, segIdx := range segmentIndexes {
		m.updateSegmentIndex(segIdx)
		metrics.FlushedSegmentFileNum.WithLabelValues(metrics.IndexFileLabel).Observe(float64(len(segIdx.IndexFileKeys)))
	}
	log.Info("DataCoord meta reloadFromKV done", zap.Duration("duration", record.ElapseSpan()))
	return nil
}
2021-12-09 22:07:10 +08:00
// AddCollection adds a collection into meta
2021-09-08 11:35:59 +08:00
// Note that collection info is just for caching and will not be set into etcd from datacoord
2022-10-10 20:31:22 +08:00
func ( m * meta ) AddCollection ( collection * collectionInfo ) {
2023-07-14 15:56:31 +08:00
log . Debug ( "meta update: add collection" , zap . Int64 ( "collectionID" , collection . ID ) )
2021-05-25 15:35:37 +08:00
m . Lock ( )
defer m . Unlock ( )
m . collections [ collection . ID ] = collection
2022-03-15 21:51:21 +08:00
metrics . DataCoordNumCollections . WithLabelValues ( ) . Set ( float64 ( len ( m . collections ) ) )
2023-07-14 15:56:31 +08:00
log . Info ( "meta update: add collection - complete" , zap . Int64 ( "collectionID" , collection . ID ) )
2021-01-15 17:09:41 +08:00
}
2021-12-15 16:57:23 +08:00
// GetCollection returns collection info with provided collection id from local cache
2022-10-10 20:31:22 +08:00
func ( m * meta ) GetCollection ( collectionID UniqueID ) * collectionInfo {
2021-05-25 15:35:37 +08:00
m . RLock ( )
defer m . RUnlock ( )
collection , ok := m . collections [ collectionID ]
2021-01-19 15:35:40 +08:00
if ! ok {
2021-07-07 14:02:01 +08:00
return nil
2021-01-19 15:35:40 +08:00
}
2021-07-07 14:02:01 +08:00
return collection
2021-01-19 15:35:40 +08:00
}
2021-01-15 17:09:41 +08:00
2022-10-10 20:31:22 +08:00
// GetClonedCollectionInfo returns a deep-enough copy of the cached collection
// info (schema, start positions and properties are cloned), or nil when the
// collection is not cached.
func (m *meta) GetClonedCollectionInfo(collectionID UniqueID) *collectionInfo {
	m.RLock()
	defer m.RUnlock()

	coll, ok := m.collections[collectionID]
	if !ok {
		return nil
	}

	// Copy the mutable property map so callers cannot mutate the cache.
	properties := make(map[string]string)
	maps.Copy(properties, coll.Properties)

	return &collectionInfo{
		ID:             coll.ID,
		Schema:         proto.Clone(coll.Schema).(*schemapb.CollectionSchema),
		Partitions:     coll.Partitions,
		StartPositions: common.CloneKeyDataPairs(coll.StartPositions),
		Properties:     properties,
	}
}
2022-01-04 17:33:22 +08:00
// chanPartSegments is an internal result struct, which is aggregates of SegmentInfos with same collectionID, partitionID and channelName
type chanPartSegments struct {
	collectionID UniqueID       // owning collection
	partitionID  UniqueID       // owning partition
	channelName  string         // insert channel shared by the segments
	segments     []*SegmentInfo // segments in this channel/partition bucket
}
2022-09-30 11:48:55 +08:00
// GetSegmentsChanPart returns segments organized in Channel-Partition dimension with selector applied
2021-11-05 22:25:00 +08:00
func ( m * meta ) GetSegmentsChanPart ( selector SegmentInfoSelector ) [ ] * chanPartSegments {
m . RLock ( )
defer m . RUnlock ( )
mDimEntry := make ( map [ string ] * chanPartSegments )
for _ , segmentInfo := range m . segments . segments {
if ! selector ( segmentInfo ) {
continue
}
dim := fmt . Sprintf ( "%d-%s" , segmentInfo . PartitionID , segmentInfo . InsertChannel )
entry , ok := mDimEntry [ dim ]
if ! ok {
entry = & chanPartSegments {
2022-09-30 11:48:55 +08:00
collectionID : segmentInfo . CollectionID ,
partitionID : segmentInfo . PartitionID ,
channelName : segmentInfo . InsertChannel ,
2021-11-05 22:25:00 +08:00
}
mDimEntry [ dim ] = entry
}
entry . segments = append ( entry . segments , segmentInfo )
}
result := make ( [ ] * chanPartSegments , 0 , len ( mDimEntry ) )
for _ , entry := range mDimEntry {
result = append ( result , entry )
}
return result
}
2021-09-08 11:35:59 +08:00
// GetNumRowsOfCollection returns total rows count of segments belongs to provided collection
2021-07-07 14:02:01 +08:00
func ( m * meta ) GetNumRowsOfCollection ( collectionID UniqueID ) int64 {
2021-05-25 15:35:37 +08:00
m . RLock ( )
defer m . RUnlock ( )
2021-12-14 15:31:07 +08:00
var ret int64
2021-07-07 14:02:01 +08:00
segments := m . segments . GetSegments ( )
for _ , segment := range segments {
2021-11-12 00:22:42 +08:00
if isSegmentHealthy ( segment ) && segment . GetCollectionID ( ) == collectionID {
2021-06-22 18:24:08 +08:00
ret += segment . GetNumOfRows ( )
2021-02-02 14:25:58 +08:00
}
}
2021-07-07 14:02:01 +08:00
return ret
2021-02-02 14:25:58 +08:00
}
2023-04-26 21:52:36 +08:00
// GetCollectionBinlogSize returns the total binlog size and binlog size of collections.
func ( m * meta ) GetCollectionBinlogSize ( ) ( int64 , map [ UniqueID ] int64 ) {
2022-10-12 10:03:22 +08:00
m . RLock ( )
defer m . RUnlock ( )
2023-04-26 21:52:36 +08:00
collectionBinlogSize := make ( map [ UniqueID ] int64 )
2022-10-12 10:03:22 +08:00
segments := m . segments . GetSegments ( )
2023-04-26 21:52:36 +08:00
var total int64
2022-10-12 10:03:22 +08:00
for _ , segment := range segments {
2023-04-26 21:52:36 +08:00
segmentSize := segment . getSegmentSize ( )
2022-11-14 21:03:09 +08:00
if isSegmentHealthy ( segment ) {
2023-04-26 21:52:36 +08:00
total += segmentSize
collectionBinlogSize [ segment . GetCollectionID ( ) ] += segmentSize
2023-05-31 16:45:29 +08:00
metrics . DataCoordStoredBinlogSize . WithLabelValues (
fmt . Sprint ( segment . GetCollectionID ( ) ) , fmt . Sprint ( segment . GetID ( ) ) ) . Set ( float64 ( segmentSize ) )
2022-11-14 21:03:09 +08:00
}
}
2023-04-26 21:52:36 +08:00
return total , collectionBinlogSize
2022-10-12 10:03:22 +08:00
}
2021-09-08 11:35:59 +08:00
// AddSegment records segment info, persisting info into kv store
2021-07-12 17:24:25 +08:00
func ( m * meta ) AddSegment ( segment * SegmentInfo ) error {
2023-07-14 15:56:31 +08:00
log . Debug ( "meta update: adding segment" , zap . Int64 ( "segmentID" , segment . GetID ( ) ) )
2021-05-25 15:35:37 +08:00
m . Lock ( )
defer m . Unlock ( )
2022-08-20 10:24:51 +08:00
if err := m . catalog . AddSegment ( m . ctx , segment . SegmentInfo ) ; err != nil {
2022-09-30 11:48:55 +08:00
log . Error ( "meta update: adding segment failed" ,
2023-07-14 15:56:31 +08:00
zap . Int64 ( "segmentID" , segment . GetID ( ) ) ,
2022-09-30 11:48:55 +08:00
zap . Error ( err ) )
2021-01-15 17:09:41 +08:00
return err
}
2022-07-08 13:48:21 +08:00
m . segments . SetSegment ( segment . GetID ( ) , segment )
2022-06-30 17:26:19 +08:00
metrics . DataCoordNumSegments . WithLabelValues ( segment . GetState ( ) . String ( ) ) . Inc ( )
2023-07-14 15:56:31 +08:00
log . Info ( "meta update: adding segment - complete" , zap . Int64 ( "segmentID" , segment . GetID ( ) ) )
2021-01-15 17:09:41 +08:00
return nil
}
2021-09-08 11:35:59 +08:00
// DropSegment remove segment with provided id, etcd persistence also removed
2021-05-25 15:35:37 +08:00
func ( m * meta ) DropSegment ( segmentID UniqueID ) error {
2023-07-14 15:56:31 +08:00
log . Debug ( "meta update: dropping segment" , zap . Int64 ( "segmentID" , segmentID ) )
2021-05-25 15:35:37 +08:00
m . Lock ( )
defer m . Unlock ( )
2021-07-07 14:02:01 +08:00
segment := m . segments . GetSegment ( segmentID )
2021-09-08 11:35:59 +08:00
if segment == nil {
2022-09-30 11:48:55 +08:00
log . Warn ( "meta update: dropping segment failed - segment not found" ,
2023-07-14 15:56:31 +08:00
zap . Int64 ( "segmentID" , segmentID ) )
2021-09-08 11:35:59 +08:00
return nil
}
2022-08-20 10:24:51 +08:00
if err := m . catalog . DropSegment ( m . ctx , segment . SegmentInfo ) ; err != nil {
2022-09-30 11:48:55 +08:00
log . Warn ( "meta update: dropping segment failed" ,
2023-07-14 15:56:31 +08:00
zap . Int64 ( "segmentID" , segmentID ) ,
2022-09-30 11:48:55 +08:00
zap . Error ( err ) )
2021-02-02 14:25:58 +08:00
return err
}
2022-11-22 19:21:13 +08:00
metrics . DataCoordNumSegments . WithLabelValues ( segment . GetState ( ) . String ( ) ) . Dec ( )
2021-11-23 11:23:15 +08:00
m . segments . DropSegment ( segmentID )
2022-10-25 19:31:30 +08:00
log . Info ( "meta update: dropping segment - complete" ,
2023-07-14 15:56:31 +08:00
zap . Int64 ( "segmentID" , segmentID ) )
2021-01-19 15:35:40 +08:00
return nil
}
2023-03-03 14:13:49 +08:00
// GetHealthySegment returns segment info with provided id
2021-09-08 11:35:59 +08:00
// if not segment is found, nil will be returned
2023-03-03 14:13:49 +08:00
func ( m * meta ) GetHealthySegment ( segID UniqueID ) * SegmentInfo {
2021-05-25 15:35:37 +08:00
m . RLock ( )
defer m . RUnlock ( )
2021-11-12 00:22:42 +08:00
segment := m . segments . GetSegment ( segID )
if segment != nil && isSegmentHealthy ( segment ) {
return segment
}
return nil
2021-01-15 17:09:41 +08:00
}
2023-03-03 14:13:49 +08:00
// GetSegment returns segment info with provided id
2022-10-15 15:13:24 +08:00
// include the unhealthy segment
2022-09-16 11:32:48 +08:00
// if not segment is found, nil will be returned
2023-03-03 14:13:49 +08:00
func ( m * meta ) GetSegment ( segID UniqueID ) * SegmentInfo {
2022-09-16 11:32:48 +08:00
m . RLock ( )
defer m . RUnlock ( )
return m . segments . GetSegment ( segID )
}
2022-10-15 15:13:24 +08:00
// GetAllSegmentsUnsafe returns all segments
func ( m * meta ) GetAllSegmentsUnsafe ( ) [ ] * SegmentInfo {
2022-06-17 18:24:12 +08:00
m . RLock ( )
defer m . RUnlock ( )
2022-10-15 15:13:24 +08:00
return m . segments . GetSegments ( )
2022-06-17 18:24:12 +08:00
}
2021-09-08 11:35:59 +08:00
// SetState sets the segment with the provided ID to targetState.
// The update is persisted through the catalog first; metrics and the
// in-memory cache are only mutated after persistence succeeds.
// Setting a missing segment to Dropped is treated as an idempotent no-op;
// any other target state on a missing segment is an error.
func (m *meta) SetState(segmentID UniqueID, targetState commonpb.SegmentState) error {
	log.Debug("meta update: setting segment state",
		zap.Int64("segmentID", segmentID),
		zap.Any("target state", targetState))
	m.Lock()
	defer m.Unlock()
	curSegInfo := m.segments.GetSegment(segmentID)
	if curSegInfo == nil {
		log.Warn("meta update: setting segment state - segment not found",
			zap.Int64("segmentID", segmentID),
			zap.Any("target state", targetState))
		// idempotent drop
		if targetState == commonpb.SegmentState_Dropped {
			return nil
		}
		return fmt.Errorf("segment is not exist with ID = %d", segmentID)
	}
	// Persist segment updates first.
	clonedSegment := curSegInfo.Clone()
	metricMutation := &segMetricMutation{
		stateChange: make(map[string]int),
	}
	if clonedSegment != nil && isSegmentHealthy(clonedSegment) {
		// Update segment state and prepare segment metric update.
		updateSegStateAndPrepareMetrics(clonedSegment, targetState, metricMutation)
		if err := m.catalog.AlterSegments(m.ctx, []*datapb.SegmentInfo{clonedSegment.SegmentInfo}); err != nil {
			log.Warn("meta update: setting segment state - failed to alter segments",
				zap.Int64("segmentID", segmentID),
				zap.String("target state", targetState.String()),
				zap.Error(err))
			return err
		}
		// Apply segment metric update after successful meta update.
		metricMutation.commit()
		// Update in-memory meta.
		m.segments.SetState(segmentID, targetState)
	}
	log.Info("meta update: setting segment state - complete",
		zap.Int64("segmentID", segmentID),
		zap.String("target state", targetState.String()))
	return nil
}
// UnsetIsImporting removes the `isImporting` flag of a segment.
func ( m * meta ) UnsetIsImporting ( segmentID UniqueID ) error {
2023-07-14 15:56:31 +08:00
log . Debug ( "meta update: unsetting isImport state of segment" ,
zap . Int64 ( "segmentID" , segmentID ) )
2022-09-26 18:06:54 +08:00
m . Lock ( )
defer m . Unlock ( )
curSegInfo := m . segments . GetSegment ( segmentID )
if curSegInfo == nil {
return fmt . Errorf ( "segment not found %d" , segmentID )
}
// Persist segment updates first.
clonedSegment := curSegInfo . Clone ( )
clonedSegment . IsImporting = false
if isSegmentHealthy ( clonedSegment ) {
2023-07-06 20:46:25 +08:00
if err := m . catalog . AlterSegments ( m . ctx , [ ] * datapb . SegmentInfo { clonedSegment . SegmentInfo } ) ; err != nil {
2023-07-14 15:56:31 +08:00
log . Warn ( "meta update: unsetting isImport state of segment - failed to unset segment isImporting state" ,
zap . Int64 ( "segmentID" , segmentID ) ,
2022-09-26 18:06:54 +08:00
zap . Error ( err ) )
return err
}
}
// Update in-memory meta.
m . segments . SetIsImporting ( segmentID , false )
2022-10-25 19:31:30 +08:00
log . Info ( "meta update: unsetting isImport state of segment - complete" ,
2023-07-14 15:56:31 +08:00
zap . Int64 ( "segmentID" , segmentID ) )
2021-01-22 11:07:07 +08:00
return nil
}
2021-09-08 11:35:59 +08:00
// UpdateFlushSegmentsInfo update segment partial/completed flush info
// `flushed` parameter indicating whether segment is flushed completely or partially
// `binlogs`, `checkpoints` and `statPositions` are persistence data for segment
2021-11-12 00:22:42 +08:00
func ( m * meta ) UpdateFlushSegmentsInfo (
segmentID UniqueID ,
flushed bool ,
dropped bool ,
2022-04-20 14:03:40 +08:00
importing bool ,
2021-12-19 20:00:42 +08:00
binlogs , statslogs , deltalogs [ ] * datapb . FieldBinlog ,
2021-11-12 00:22:42 +08:00
checkpoints [ ] * datapb . CheckPoint ,
startPositions [ ] * datapb . SegmentStartPosition ,
) error {
2023-07-14 15:56:31 +08:00
log . Debug ( "meta update: update flush segments info" ,
2022-09-26 18:06:54 +08:00
zap . Int64 ( "segmentId" , segmentID ) ,
2022-06-08 15:08:07 +08:00
zap . Int ( "binlog" , len ( binlogs ) ) ,
2022-09-26 18:06:54 +08:00
zap . Int ( "stats log" , len ( statslogs ) ) ,
zap . Int ( "delta logs" , len ( deltalogs ) ) ,
2022-06-08 15:08:07 +08:00
zap . Bool ( "flushed" , flushed ) ,
zap . Bool ( "dropped" , dropped ) ,
2022-09-26 18:06:54 +08:00
zap . Any ( "check points" , checkpoints ) ,
zap . Any ( "start position" , startPositions ) ,
2022-06-08 15:08:07 +08:00
zap . Bool ( "importing" , importing ) )
2022-09-30 11:48:55 +08:00
m . Lock ( )
defer m . Unlock ( )
2021-08-19 13:00:12 +08:00
segment := m . segments . GetSegment ( segmentID )
2021-11-12 00:22:42 +08:00
if segment == nil || ! isSegmentHealthy ( segment ) {
2022-09-30 11:48:55 +08:00
log . Warn ( "meta update: update flush segments info - segment not found" ,
2023-07-14 15:56:31 +08:00
zap . Int64 ( "segmentID" , segmentID ) ,
2022-09-30 11:48:55 +08:00
zap . Bool ( "segment nil" , segment == nil ) ,
zap . Bool ( "segment unhealthy" , ! isSegmentHealthy ( segment ) ) )
2021-08-19 13:00:12 +08:00
return nil
2021-05-31 18:47:32 +08:00
}
2021-08-19 13:00:12 +08:00
2022-11-22 19:21:13 +08:00
metricMutation := & segMetricMutation {
stateChange : make ( map [ string ] int ) ,
}
2021-10-12 23:46:35 +08:00
clonedSegment := segment . Clone ( )
modSegments := make ( map [ UniqueID ] * SegmentInfo )
2021-06-04 11:45:45 +08:00
if flushed {
2022-11-22 19:21:13 +08:00
// Update segment state and prepare metrics.
updateSegStateAndPrepareMetrics ( clonedSegment , commonpb . SegmentState_Flushing , metricMutation )
2021-10-12 23:46:35 +08:00
modSegments [ segmentID ] = clonedSegment
2021-06-04 11:45:45 +08:00
}
2021-11-12 00:22:42 +08:00
if dropped {
2022-11-22 19:21:13 +08:00
// Update segment state and prepare metrics.
updateSegStateAndPrepareMetrics ( clonedSegment , commonpb . SegmentState_Dropped , metricMutation )
2021-11-16 14:23:21 +08:00
clonedSegment . DroppedAt = uint64 ( time . Now ( ) . UnixNano ( ) )
2021-11-12 00:22:42 +08:00
modSegments [ segmentID ] = clonedSegment
}
2022-06-08 15:08:07 +08:00
// TODO add diff encoding and compression
2021-10-12 23:46:35 +08:00
currBinlogs := clonedSegment . GetBinlogs ( )
2021-08-19 13:00:12 +08:00
var getFieldBinlogs = func ( id UniqueID , binlogs [ ] * datapb . FieldBinlog ) * datapb . FieldBinlog {
for _ , binlog := range binlogs {
if id == binlog . GetFieldID ( ) {
return binlog
}
}
return nil
}
2021-10-19 14:32:41 +08:00
// binlogs
2021-08-19 13:00:12 +08:00
for _ , tBinlogs := range binlogs {
fieldBinlogs := getFieldBinlogs ( tBinlogs . GetFieldID ( ) , currBinlogs )
if fieldBinlogs == nil {
currBinlogs = append ( currBinlogs , tBinlogs )
} else {
fieldBinlogs . Binlogs = append ( fieldBinlogs . Binlogs , tBinlogs . Binlogs ... )
}
}
2021-10-12 23:46:35 +08:00
clonedSegment . Binlogs = currBinlogs
2022-10-31 17:41:34 +08:00
// statlogs
currStatsLogs := clonedSegment . GetStatslogs ( )
for _ , tStatsLogs := range statslogs {
fieldStatsLog := getFieldBinlogs ( tStatsLogs . GetFieldID ( ) , currStatsLogs )
if fieldStatsLog == nil {
currStatsLogs = append ( currStatsLogs , tStatsLogs )
} else {
fieldStatsLog . Binlogs = append ( fieldStatsLog . Binlogs , tStatsLogs . Binlogs ... )
}
2021-10-19 14:32:41 +08:00
}
2022-10-31 17:41:34 +08:00
clonedSegment . Statslogs = currStatsLogs
2021-10-19 14:32:41 +08:00
// deltalogs
2021-12-19 20:00:42 +08:00
currDeltaLogs := clonedSegment . GetDeltalogs ( )
for _ , tDeltaLogs := range deltalogs {
fieldDeltaLogs := getFieldBinlogs ( tDeltaLogs . GetFieldID ( ) , currDeltaLogs )
if fieldDeltaLogs == nil {
currDeltaLogs = append ( currDeltaLogs , tDeltaLogs )
} else {
fieldDeltaLogs . Binlogs = append ( fieldDeltaLogs . Binlogs , tDeltaLogs . Binlogs ... )
}
}
clonedSegment . Deltalogs = currDeltaLogs
2021-10-12 23:46:35 +08:00
modSegments [ segmentID ] = clonedSegment
2021-10-23 13:31:09 +08:00
var getClonedSegment = func ( segmentID UniqueID ) * SegmentInfo {
if s , ok := modSegments [ segmentID ] ; ok {
return s
}
2021-11-12 00:22:42 +08:00
if s := m . segments . GetSegment ( segmentID ) ; s != nil && isSegmentHealthy ( s ) {
2021-10-23 13:31:09 +08:00
return s . Clone ( )
}
return nil
}
2021-06-15 11:06:42 +08:00
for _ , pos := range startPositions {
2021-08-19 13:00:12 +08:00
if len ( pos . GetStartPosition ( ) . GetMsgID ( ) ) == 0 {
2021-06-15 11:06:42 +08:00
continue
}
2021-10-23 13:31:09 +08:00
s := getClonedSegment ( pos . GetSegmentID ( ) )
2021-10-12 23:46:35 +08:00
if s == nil {
continue
}
s . StartPosition = pos . GetStartPosition ( )
modSegments [ pos . GetSegmentID ( ) ] = s
2021-06-15 11:06:42 +08:00
}
2021-10-12 23:46:35 +08:00
2023-02-27 10:41:46 +08:00
if importing {
s := clonedSegment
count := segmentutil . CalcRowCountFromBinLog ( s . SegmentInfo )
if count != segment . currRows {
log . Info ( "check point reported inconsistent with bin log row count" ,
2023-07-14 15:56:31 +08:00
zap . Int64 ( "segmentID" , segment . GetID ( ) ) ,
2023-02-27 10:41:46 +08:00
zap . Int64 ( "current rows (wrong)" , segment . currRows ) ,
zap . Int64 ( "segment bin log row count (correct)" , count ) )
2021-06-04 11:45:45 +08:00
}
2023-02-27 10:41:46 +08:00
s . NumOfRows = count
modSegments [ segmentID ] = s
} else {
for _ , cp := range checkpoints {
if cp . SegmentID != segmentID {
// Don't think this is gonna to happen, ignore for now.
log . Warn ( "checkpoint in segment is not same as flush segment to update, igreo" , zap . Int64 ( "current" , segmentID ) , zap . Int64 ( "checkpoint segment" , cp . SegmentID ) )
continue
}
s := clonedSegment
if s . DmlPosition != nil && s . DmlPosition . Timestamp >= cp . Position . Timestamp {
log . Warn ( "checkpoint in segment is larger than reported" , zap . Any ( "current" , s . GetDmlPosition ( ) ) , zap . Any ( "reported" , cp . GetPosition ( ) ) )
// segment position in etcd is larger than checkpoint, then dont change it
continue
}
2021-10-12 23:46:35 +08:00
2023-02-27 10:41:46 +08:00
count := segmentutil . CalcRowCountFromBinLog ( s . SegmentInfo )
// count should smaller than or equal to cp reported
if count != cp . NumOfRows {
log . Info ( "check point reported inconsistent with bin log row count" ,
2023-07-14 15:56:31 +08:00
zap . Int64 ( "segmentID" , segment . GetID ( ) ) ,
2023-02-27 10:41:46 +08:00
zap . Int64 ( "check point (wrong)" , cp . NumOfRows ) ,
zap . Int64 ( "segment bin log row count (correct)" , count ) )
}
s . NumOfRows = count
s . DmlPosition = cp . GetPosition ( )
modSegments [ cp . GetSegmentID ( ) ] = s
}
2021-06-15 11:06:42 +08:00
}
2022-08-20 10:24:51 +08:00
segments := make ( [ ] * datapb . SegmentInfo , 0 , len ( modSegments ) )
for _ , seg := range modSegments {
segments = append ( segments , seg . SegmentInfo )
2021-10-12 23:46:35 +08:00
}
2023-07-06 18:20:26 +08:00
if err := m . catalog . AlterSegments ( m . ctx , segments ,
metastore . BinlogsIncrement {
2023-07-19 15:28:57 +08:00
Segment : clonedSegment . SegmentInfo ,
2023-07-06 18:20:26 +08:00
} ) ; err != nil {
2022-09-30 11:48:55 +08:00
log . Error ( "meta update: update flush segments info - failed to store flush segment info into Etcd" ,
zap . Error ( err ) )
2021-05-25 15:35:37 +08:00
return err
}
2022-11-22 19:21:13 +08:00
// Apply metric mutation after a successful meta update.
metricMutation . commit ( )
2021-10-12 23:46:35 +08:00
// update memory status
for id , s := range modSegments {
m . segments . SetSegment ( id , s )
}
2022-10-25 19:31:30 +08:00
log . Info ( "meta update: update flush segments info - update flush segments info successfully" ,
2023-07-14 15:56:31 +08:00
zap . Int64 ( "segmentID" , segmentID ) )
2021-05-25 15:35:37 +08:00
return nil
}
2021-11-29 22:35:41 +08:00
// UpdateDropChannelSegmentInfo updates segment checkpoints and binlogs before drop
// reusing segment info to pass segment id, binlogs, statslog, deltalog, start position and checkpoint.
// All segments of the channel (reported and cached) end up Dropped; the batch
// is persisted via batchSaveDropSegments and metrics commit only on success.
func (m *meta) UpdateDropChannelSegmentInfo(channel string, segments []*SegmentInfo) error {
	log.Debug("meta update: update drop channel segment info",
		zap.String("channel", channel))
	m.Lock()
	defer m.Unlock()

	// Prepare segment metric mutation.
	metricMutation := &segMetricMutation{
		stateChange: make(map[string]int),
	}
	modSegments := make(map[UniqueID]*SegmentInfo)
	// save new segments flushed from buffer data
	for _, seg2Drop := range segments {
		// NOTE(review): metricMutation is replaced on every iteration by the
		// value mergeDropSegment returns, so state changes collected for
		// earlier seg2Drop entries appear to be discarded — verify intended.
		var segment *SegmentInfo
		segment, metricMutation = m.mergeDropSegment(seg2Drop)
		if segment != nil {
			modSegments[seg2Drop.GetID()] = segment
		}
	}
	// set existed segments of channel to Dropped
	for _, seg := range m.segments.segments {
		if seg.InsertChannel != channel {
			continue
		}
		_, ok := modSegments[seg.ID]
		// seg inf mod segments are all in dropped state
		if !ok {
			clonedSeg := seg.Clone()
			updateSegStateAndPrepareMetrics(clonedSeg, commonpb.SegmentState_Dropped, metricMutation)
			modSegments[seg.ID] = clonedSeg
		}
	}
	err := m.batchSaveDropSegments(channel, modSegments)
	if err != nil {
		log.Warn("meta update: update drop channel segment info failed",
			zap.String("channel", channel),
			zap.Error(err))
	} else {
		log.Info("meta update: update drop channel segment info - complete",
			zap.String("channel", channel))
		// Apply segment metric mutation on successful meta update.
		metricMutation.commit()
	}
	return err
}
// mergeDropSegment merges drop segment information with meta segments
2022-11-22 19:21:13 +08:00
func ( m * meta ) mergeDropSegment ( seg2Drop * SegmentInfo ) ( * SegmentInfo , * segMetricMutation ) {
2022-11-24 17:37:13 +08:00
metricMutation := & segMetricMutation {
stateChange : make ( map [ string ] int ) ,
}
2021-11-29 22:35:41 +08:00
segment := m . segments . GetSegment ( seg2Drop . ID )
// healthy check makes sure the Idempotence
if segment == nil || ! isSegmentHealthy ( segment ) {
log . Warn ( "UpdateDropChannel skipping nil or unhealthy" , zap . Bool ( "is nil" , segment == nil ) ,
zap . Bool ( "isHealthy" , isSegmentHealthy ( segment ) ) )
2022-11-24 17:37:13 +08:00
return nil , metricMutation
2021-11-29 22:35:41 +08:00
}
clonedSegment := segment . Clone ( )
2022-11-22 19:21:13 +08:00
updateSegStateAndPrepareMetrics ( clonedSegment , commonpb . SegmentState_Dropped , metricMutation )
2021-11-29 22:35:41 +08:00
currBinlogs := clonedSegment . GetBinlogs ( )
var getFieldBinlogs = func ( id UniqueID , binlogs [ ] * datapb . FieldBinlog ) * datapb . FieldBinlog {
for _ , binlog := range binlogs {
if id == binlog . GetFieldID ( ) {
return binlog
}
}
return nil
}
// binlogs
for _ , tBinlogs := range seg2Drop . GetBinlogs ( ) {
fieldBinlogs := getFieldBinlogs ( tBinlogs . GetFieldID ( ) , currBinlogs )
if fieldBinlogs == nil {
currBinlogs = append ( currBinlogs , tBinlogs )
} else {
fieldBinlogs . Binlogs = append ( fieldBinlogs . Binlogs , tBinlogs . Binlogs ... )
}
}
clonedSegment . Binlogs = currBinlogs
// statlogs
currStatsLogs := clonedSegment . GetStatslogs ( )
for _ , tStatsLogs := range seg2Drop . GetStatslogs ( ) {
fieldStatsLog := getFieldBinlogs ( tStatsLogs . GetFieldID ( ) , currStatsLogs )
if fieldStatsLog == nil {
currStatsLogs = append ( currStatsLogs , tStatsLogs )
} else {
fieldStatsLog . Binlogs = append ( fieldStatsLog . Binlogs , tStatsLogs . Binlogs ... )
}
}
clonedSegment . Statslogs = currStatsLogs
// deltalogs
clonedSegment . Deltalogs = append ( clonedSegment . Deltalogs , seg2Drop . GetDeltalogs ( ) ... )
// start position
if seg2Drop . GetStartPosition ( ) != nil {
clonedSegment . StartPosition = seg2Drop . GetStartPosition ( )
}
// checkpoint
if seg2Drop . GetDmlPosition ( ) != nil {
clonedSegment . DmlPosition = seg2Drop . GetDmlPosition ( )
}
clonedSegment . currRows = seg2Drop . currRows
2022-11-22 19:21:13 +08:00
clonedSegment . NumOfRows = seg2Drop . currRows
return clonedSegment , metricMutation
2021-11-29 22:35:41 +08:00
}
// batchSaveDropSegments saves drop segments info with channel removal flag
// since the channel unwatching operation is not atomic here
// ** the removal flag is always with last batch
// ** the last batch must contains at least one segment
2023-03-03 14:13:49 +08:00
// 1. when failure occurs between batches, failover mechanism will continue with the earliest checkpoint of this channel
2022-11-03 12:15:35 +08:00
// since the flag is not marked so DataNode can re-consume the drop collection msg
// 2. when failure occurs between save meta and unwatch channel, the removal flag shall be check before let datanode watch this channel
2021-11-29 22:35:41 +08:00
func ( m * meta ) batchSaveDropSegments ( channel string , modSegments map [ int64 ] * SegmentInfo ) error {
2022-11-03 12:15:35 +08:00
var modSegIDs [ ] int64
for k := range modSegments {
modSegIDs = append ( modSegIDs , k )
}
log . Info ( "meta update: batch save drop segments" ,
zap . Int64s ( "drop segments" , modSegIDs ) )
2022-08-25 16:48:53 +08:00
segments := make ( [ ] * datapb . SegmentInfo , 0 )
for _ , seg := range modSegments {
segments = append ( segments , seg . SegmentInfo )
}
err := m . catalog . SaveDroppedSegmentsInBatch ( m . ctx , segments )
2021-11-29 22:35:41 +08:00
if err != nil {
return err
}
2022-08-25 16:48:53 +08:00
if err = m . catalog . MarkChannelDeleted ( m . ctx , channel ) ; err != nil {
return err
2022-08-20 10:24:51 +08:00
}
2021-11-29 22:35:41 +08:00
// update memory info
2022-08-25 16:48:53 +08:00
for id , segment := range modSegments {
m . segments . SetSegment ( id , segment )
2021-11-29 22:35:41 +08:00
}
2022-08-20 10:24:51 +08:00
return nil
2021-12-02 16:39:33 +08:00
}
2021-09-08 11:35:59 +08:00
// GetSegmentsByChannel returns all segment info which insert channel equals provided `dmlCh`
2021-07-12 17:24:25 +08:00
func ( m * meta ) GetSegmentsByChannel ( dmlCh string ) [ ] * SegmentInfo {
2021-06-23 12:20:06 +08:00
m . RLock ( )
defer m . RUnlock ( )
2021-07-12 17:24:25 +08:00
infos := make ( [ ] * SegmentInfo , 0 )
2021-07-07 14:02:01 +08:00
segments := m . segments . GetSegments ( )
for _ , segment := range segments {
2021-11-12 00:22:42 +08:00
if ! isSegmentHealthy ( segment ) || segment . InsertChannel != dmlCh {
2021-05-31 18:47:32 +08:00
continue
}
2021-07-07 14:02:01 +08:00
infos = append ( infos , segment )
2021-05-31 18:47:32 +08:00
}
return infos
}
2021-11-05 22:25:00 +08:00
// GetSegmentsOfCollection get all segments of collection
func ( m * meta ) GetSegmentsOfCollection ( collectionID UniqueID ) [ ] * SegmentInfo {
m . RLock ( )
defer m . RUnlock ( )
ret := make ( [ ] * SegmentInfo , 0 )
segments := m . segments . GetSegments ( )
2021-11-08 21:45:00 +08:00
for _ , segment := range segments {
2021-11-12 00:22:42 +08:00
if isSegmentHealthy ( segment ) && segment . GetCollectionID ( ) == collectionID {
2021-11-08 21:45:00 +08:00
ret = append ( ret , segment )
}
}
2021-11-05 22:25:00 +08:00
return ret
}
// GetSegmentsIDOfCollection returns all segment ids which collection equals to provided `collectionID`
func ( m * meta ) GetSegmentsIDOfCollection ( collectionID UniqueID ) [ ] UniqueID {
2021-05-25 15:35:37 +08:00
m . RLock ( )
defer m . RUnlock ( )
2021-01-19 15:35:40 +08:00
ret := make ( [ ] UniqueID , 0 )
2021-07-07 14:02:01 +08:00
segments := m . segments . GetSegments ( )
2021-11-12 00:22:42 +08:00
for _ , segment := range segments {
if isSegmentHealthy ( segment ) && segment . CollectionID == collectionID {
ret = append ( ret , segment . ID )
2021-01-19 15:35:40 +08:00
}
}
return ret
}
2022-09-16 11:32:48 +08:00
// GetSegmentsIDOfCollection returns all segment ids which collection equals to provided `collectionID`
func ( m * meta ) GetSegmentsIDOfCollectionWithDropped ( collectionID UniqueID ) [ ] UniqueID {
m . RLock ( )
defer m . RUnlock ( )
ret := make ( [ ] UniqueID , 0 )
segments := m . segments . GetSegments ( )
for _ , segment := range segments {
if segment != nil &&
segment . GetState ( ) != commonpb . SegmentState_SegmentStateNone &&
segment . GetState ( ) != commonpb . SegmentState_NotExist &&
segment . CollectionID == collectionID {
ret = append ( ret , segment . ID )
}
}
return ret
}
2021-11-05 22:25:00 +08:00
// GetSegmentsIDOfPartition returns all segments ids which collection & partition equals to provided `collectionID`, `partitionID`
func ( m * meta ) GetSegmentsIDOfPartition ( collectionID , partitionID UniqueID ) [ ] UniqueID {
2021-05-25 15:35:37 +08:00
m . RLock ( )
defer m . RUnlock ( )
2021-01-19 15:35:40 +08:00
ret := make ( [ ] UniqueID , 0 )
2021-07-07 14:02:01 +08:00
segments := m . segments . GetSegments ( )
2021-11-12 00:22:42 +08:00
for _ , segment := range segments {
if isSegmentHealthy ( segment ) && segment . CollectionID == collectionID && segment . PartitionID == partitionID {
ret = append ( ret , segment . ID )
2021-01-19 15:35:40 +08:00
}
}
return ret
}
2022-09-16 11:32:48 +08:00
// GetSegmentsIDOfPartition returns all segments ids which collection & partition equals to provided `collectionID`, `partitionID`
func ( m * meta ) GetSegmentsIDOfPartitionWithDropped ( collectionID , partitionID UniqueID ) [ ] UniqueID {
m . RLock ( )
defer m . RUnlock ( )
ret := make ( [ ] UniqueID , 0 )
segments := m . segments . GetSegments ( )
for _ , segment := range segments {
if segment != nil &&
segment . GetState ( ) != commonpb . SegmentState_SegmentStateNone &&
segment . GetState ( ) != commonpb . SegmentState_NotExist &&
segment . CollectionID == collectionID &&
segment . PartitionID == partitionID {
ret = append ( ret , segment . ID )
}
}
return ret
}
2021-09-08 11:35:59 +08:00
// GetNumRowsOfPartition returns row count of segments belongs to provided collection & partition
2021-07-07 14:02:01 +08:00
func ( m * meta ) GetNumRowsOfPartition ( collectionID UniqueID , partitionID UniqueID ) int64 {
2021-05-25 15:35:37 +08:00
m . RLock ( )
defer m . RUnlock ( )
2021-12-14 15:31:07 +08:00
var ret int64
2021-07-07 14:02:01 +08:00
segments := m . segments . GetSegments ( )
2021-11-12 00:22:42 +08:00
for _ , segment := range segments {
if isSegmentHealthy ( segment ) && segment . CollectionID == collectionID && segment . PartitionID == partitionID {
ret += segment . NumOfRows
2021-02-02 14:25:58 +08:00
}
}
2021-07-07 14:02:01 +08:00
return ret
2021-02-02 14:25:58 +08:00
}
2021-09-08 11:35:59 +08:00
// GetUnFlushedSegments get all segments which state is not `Flushing` nor `Flushed`
2021-07-12 17:24:25 +08:00
func ( m * meta ) GetUnFlushedSegments ( ) [ ] * SegmentInfo {
2021-05-25 15:35:37 +08:00
m . RLock ( )
defer m . RUnlock ( )
2021-07-12 17:24:25 +08:00
ret := make ( [ ] * SegmentInfo , 0 )
2021-07-07 14:02:01 +08:00
segments := m . segments . GetSegments ( )
2021-11-12 00:22:42 +08:00
for _ , segment := range segments {
if segment . State == commonpb . SegmentState_Growing || segment . State == commonpb . SegmentState_Sealed {
ret = append ( ret , segment )
2021-05-28 09:55:21 +08:00
}
}
2021-07-07 14:02:01 +08:00
return ret
2021-05-28 09:55:21 +08:00
}
2021-09-08 11:35:59 +08:00
// GetFlushingSegments get all segments which state is `Flushing`
2021-07-12 17:24:25 +08:00
func ( m * meta ) GetFlushingSegments ( ) [ ] * SegmentInfo {
2021-05-28 09:55:21 +08:00
m . RLock ( )
defer m . RUnlock ( )
2021-07-12 17:24:25 +08:00
ret := make ( [ ] * SegmentInfo , 0 )
2021-07-07 14:02:01 +08:00
segments := m . segments . GetSegments ( )
for _ , info := range segments {
2021-05-28 09:55:21 +08:00
if info . State == commonpb . SegmentState_Flushing {
2021-07-07 14:02:01 +08:00
ret = append ( ret , info )
2021-05-21 18:30:41 +08:00
}
}
2021-07-07 14:02:01 +08:00
return ret
2021-05-21 18:30:41 +08:00
}
2021-10-20 15:02:36 +08:00
// SelectSegments select segments with selector
func ( m * meta ) SelectSegments ( selector SegmentInfoSelector ) [ ] * SegmentInfo {
m . RLock ( )
defer m . RUnlock ( )
var ret [ ] * SegmentInfo
segments := m . segments . GetSegments ( )
for _ , info := range segments {
if selector ( info ) {
ret = append ( ret , info )
}
}
return ret
}
2021-09-08 11:35:59 +08:00
// AddAllocation add allocation in segment
2021-07-12 17:24:25 +08:00
func ( m * meta ) AddAllocation ( segmentID UniqueID , allocation * Allocation ) error {
2023-07-14 15:56:31 +08:00
log . Debug ( "meta update: add allocation" ,
2022-09-30 11:48:55 +08:00
zap . Int64 ( "segmentID" , segmentID ) ,
zap . Any ( "allocation" , allocation ) )
2021-07-12 17:24:25 +08:00
m . Lock ( )
defer m . Unlock ( )
2022-09-26 18:06:54 +08:00
curSegInfo := m . segments . GetSegment ( segmentID )
if curSegInfo == nil {
// TODO: Error handling.
2023-07-24 14:11:07 +08:00
log . Warn ( "meta update: add allocation failed - segment not found" , zap . Int64 ( "segmentID" , segmentID ) )
2022-09-26 18:06:54 +08:00
return nil
}
2023-07-24 14:11:07 +08:00
// As we use global segment lastExpire to guarantee data correctness after restart
// there is no need to persist allocation to meta store, only update allocation in-memory meta.
2022-09-26 18:06:54 +08:00
m . segments . AddAllocation ( segmentID , allocation )
2023-07-24 14:11:07 +08:00
log . Info ( "meta update: add allocation - complete" , zap . Int64 ( "segmentID" , segmentID ) )
2021-07-12 17:24:25 +08:00
return nil
}
// SetAllocations set Segment allocations, will overwrite ALL original allocations
// Note that allocations is not persisted in KV store
func (m *meta) SetAllocations(segmentID UniqueID, allocations []*Allocation) {
	m.Lock()
	defer m.Unlock()
	// delegate to the in-memory segments view; no catalog write happens here
	m.segments.SetAllocations(segmentID, allocations)
}
// SetCurrentRows set current row count for segment with provided `segmentID`
// Note that currRows is not persisted in KV store
func (m *meta) SetCurrentRows(segmentID UniqueID, rows int64) {
	m.Lock()
	defer m.Unlock()
	// in-memory only update; currRows is a runtime counter, not catalog state
	m.segments.SetCurrentRows(segmentID, rows)
}
2023-07-24 14:11:07 +08:00
// SetLastExpire set lastExpire time for segment
// Note that last is not necessary to store in KV meta
func ( m * meta ) SetLastExpire ( segmentID UniqueID , lastExpire uint64 ) {
m . Lock ( )
defer m . Unlock ( )
clonedSegment := m . segments . GetSegment ( segmentID ) . Clone ( )
clonedSegment . LastExpireTime = lastExpire
m . segments . SetSegment ( segmentID , clonedSegment )
}
// SetLastFlushTime set LastFlushTime for segment with provided `segmentID`
// Note that lastFlushTime is not persisted in KV store
func (m *meta) SetLastFlushTime(segmentID UniqueID, t time.Time) {
	m.Lock()
	defer m.Unlock()
	// in-memory only; used to decide when the segment should be flushed again
	m.segments.SetFlushTime(segmentID, t)
}
// SetSegmentCompacting sets compaction state for segment
func (m *meta) SetSegmentCompacting(segmentID UniqueID, compacting bool) {
	m.Lock()
	defer m.Unlock()
	// in-memory flag only; marks the segment as participating in a compaction task
	m.segments.SetIsCompacting(segmentID, compacting)
}
2022-11-03 12:15:35 +08:00
// PrepareCompleteCompactionMutation returns
2022-09-27 16:02:53 +08:00
// - the segment info of compactedFrom segments before compaction to revert
// - the segment info of compactedFrom segments after compaction to alter
// - the segment info of compactedTo segment after compaction to add
// The compactedTo segment could contain 0 numRows
2022-11-22 19:21:13 +08:00
func ( m * meta ) PrepareCompleteCompactionMutation ( compactionLogs [ ] * datapb . CompactionSegmentBinlogs ,
2022-11-24 10:19:12 +08:00
result * datapb . CompactionResult ) ( [ ] * SegmentInfo , [ ] * SegmentInfo , * SegmentInfo , * segMetricMutation , error ) {
2022-11-03 12:15:35 +08:00
log . Info ( "meta update: prepare for complete compaction mutation" )
2021-11-05 22:25:00 +08:00
m . Lock ( )
defer m . Unlock ( )
2022-09-27 16:02:53 +08:00
var (
2022-11-24 10:19:12 +08:00
oldSegments = make ( [ ] * SegmentInfo , 0 , len ( compactionLogs ) )
2022-09-27 16:02:53 +08:00
modSegments = make ( [ ] * SegmentInfo , 0 , len ( compactionLogs ) )
)
2022-11-22 19:21:13 +08:00
metricMutation := & segMetricMutation {
stateChange : make ( map [ string ] int ) ,
}
2021-11-05 22:25:00 +08:00
for _ , cl := range compactionLogs {
if segment := m . segments . GetSegment ( cl . GetSegmentID ( ) ) ; segment != nil {
2022-11-24 10:19:12 +08:00
oldSegments = append ( oldSegments , segment . Clone ( ) )
2022-09-27 16:02:53 +08:00
2021-11-05 22:25:00 +08:00
cloned := segment . Clone ( )
2022-11-22 19:21:13 +08:00
updateSegStateAndPrepareMetrics ( cloned , commonpb . SegmentState_Dropped , metricMutation )
2021-11-22 20:11:14 +08:00
cloned . DroppedAt = uint64 ( time . Now ( ) . UnixNano ( ) )
2022-09-27 16:02:53 +08:00
modSegments = append ( modSegments , cloned )
2021-11-05 22:25:00 +08:00
}
}
2023-03-04 23:21:50 +08:00
var startPosition , dmlPosition * msgpb . MsgPosition
2022-09-27 16:02:53 +08:00
for _ , s := range modSegments {
2022-07-20 17:56:30 +08:00
if dmlPosition == nil ||
s . GetDmlPosition ( ) != nil && s . GetDmlPosition ( ) . GetTimestamp ( ) < dmlPosition . GetTimestamp ( ) {
2021-11-05 22:25:00 +08:00
dmlPosition = s . GetDmlPosition ( )
}
2021-11-25 17:19:17 +08:00
2022-07-20 17:56:30 +08:00
if startPosition == nil ||
s . GetStartPosition ( ) != nil && s . GetStartPosition ( ) . GetTimestamp ( ) < startPosition . GetTimestamp ( ) {
2021-11-25 17:19:17 +08:00
startPosition = s . GetStartPosition ( )
}
2021-11-05 22:25:00 +08:00
}
// find new added delta logs when executing compaction
2021-12-19 20:00:42 +08:00
var originDeltalogs [ ] * datapb . FieldBinlog
2022-09-27 16:02:53 +08:00
for _ , s := range modSegments {
2021-11-05 22:25:00 +08:00
originDeltalogs = append ( originDeltalogs , s . GetDeltalogs ( ) ... )
}
2021-12-19 20:00:42 +08:00
var deletedDeltalogs [ ] * datapb . FieldBinlog
2021-11-05 22:25:00 +08:00
for _ , l := range compactionLogs {
deletedDeltalogs = append ( deletedDeltalogs , l . GetDeltalogs ( ) ... )
}
newAddedDeltalogs := m . updateDeltalogs ( originDeltalogs , deletedDeltalogs , nil )
2022-11-17 20:37:10 +08:00
copiedDeltalogs , err := m . copyDeltaFiles ( newAddedDeltalogs , modSegments [ 0 ] . CollectionID , modSegments [ 0 ] . PartitionID , result . GetSegmentID ( ) )
if err != nil {
2022-11-22 19:21:13 +08:00
return nil , nil , nil , nil , err
2022-11-17 20:37:10 +08:00
}
deltalogs := append ( result . GetDeltalogs ( ) , copiedDeltalogs ... )
2021-11-05 22:25:00 +08:00
2022-09-27 16:02:53 +08:00
compactionFrom := make ( [ ] UniqueID , 0 , len ( modSegments ) )
for _ , s := range modSegments {
2021-11-05 22:25:00 +08:00
compactionFrom = append ( compactionFrom , s . GetID ( ) )
}
2022-06-15 19:58:10 +08:00
segmentInfo := & datapb . SegmentInfo {
ID : result . GetSegmentID ( ) ,
2022-09-27 16:02:53 +08:00
CollectionID : modSegments [ 0 ] . CollectionID ,
PartitionID : modSegments [ 0 ] . PartitionID ,
InsertChannel : modSegments [ 0 ] . InsertChannel ,
2022-06-15 19:58:10 +08:00
NumOfRows : result . NumOfRows ,
State : commonpb . SegmentState_Flushing ,
2022-09-27 16:02:53 +08:00
MaxRowNum : modSegments [ 0 ] . MaxRowNum ,
2022-06-15 19:58:10 +08:00
Binlogs : result . GetInsertLogs ( ) ,
Statslogs : result . GetField2StatslogPaths ( ) ,
Deltalogs : deltalogs ,
StartPosition : startPosition ,
DmlPosition : dmlPosition ,
CreatedByCompaction : true ,
CompactionFrom : compactionFrom ,
}
segment := NewSegmentInfo ( segmentInfo )
2022-11-22 19:21:13 +08:00
metricMutation . addNewSeg ( segment . GetState ( ) , segment . GetNumOfRows ( ) )
2022-11-03 12:15:35 +08:00
log . Info ( "meta update: prepare for complete compaction mutation - complete" ,
2023-07-14 15:56:31 +08:00
zap . Int64 ( "collectionID" , segment . GetCollectionID ( ) ) ,
zap . Int64 ( "partitionID" , segment . GetPartitionID ( ) ) ,
2022-11-03 12:15:35 +08:00
zap . Int64 ( "new segment ID" , segment . GetID ( ) ) ,
zap . Int64 ( "new segment num of rows" , segment . GetNumOfRows ( ) ) ,
zap . Any ( "compacted from" , segment . GetCompactionFrom ( ) ) )
2022-07-04 11:10:20 +08:00
2022-11-22 19:21:13 +08:00
return oldSegments , modSegments , segment , metricMutation , nil
2022-11-17 20:37:10 +08:00
}
func ( m * meta ) copyDeltaFiles ( binlogs [ ] * datapb . FieldBinlog , collectionID , partitionID , targetSegmentID int64 ) ( [ ] * datapb . FieldBinlog , error ) {
ret := make ( [ ] * datapb . FieldBinlog , 0 , len ( binlogs ) )
for _ , fieldBinlog := range binlogs {
fieldBinlog = proto . Clone ( fieldBinlog ) . ( * datapb . FieldBinlog )
for _ , binlog := range fieldBinlog . Binlogs {
blobKey := metautil . JoinIDPath ( collectionID , partitionID , targetSegmentID , binlog . LogID )
blobPath := path . Join ( m . chunkManager . RootPath ( ) , common . SegmentDeltaLogPath , blobKey )
blob , err := m . chunkManager . Read ( m . ctx , binlog . LogPath )
if err != nil {
return nil , err
}
err = m . chunkManager . Write ( m . ctx , blobPath , blob )
if err != nil {
return nil , err
}
binlog . LogPath = blobPath
}
ret = append ( ret , fieldBinlog )
}
return ret , nil
2022-09-27 16:02:53 +08:00
}
2021-11-05 22:25:00 +08:00
2023-03-17 17:27:56 +08:00
func ( m * meta ) alterMetaStoreAfterCompaction ( segmentCompactTo * SegmentInfo , segmentsCompactFrom [ ] * SegmentInfo ) error {
2023-07-13 14:08:29 +08:00
modInfos := make ( [ ] * datapb . SegmentInfo , 0 , len ( segmentsCompactFrom ) )
for _ , segment := range segmentsCompactFrom {
modInfos = append ( modInfos , segment . SegmentInfo )
2023-03-17 17:27:56 +08:00
}
2023-07-13 14:08:29 +08:00
2023-03-17 17:27:56 +08:00
newSegment := segmentCompactTo . SegmentInfo
modSegIDs := lo . Map ( modInfos , func ( segment * datapb . SegmentInfo , _ int ) int64 { return segment . GetID ( ) } )
2023-03-03 14:13:49 +08:00
if newSegment . GetNumOfRows ( ) == 0 {
newSegment . State = commonpb . SegmentState_Dropped
2022-11-03 12:15:35 +08:00
}
2023-03-03 14:13:49 +08:00
2023-07-14 15:56:31 +08:00
log . Debug ( "meta update: alter meta store for compaction updates" ,
2022-11-03 12:15:35 +08:00
zap . Int64s ( "compact from segments (segments to be updated as dropped)" , modSegIDs ) ,
2023-07-14 15:56:31 +08:00
zap . Int64 ( "new segmentID" , newSegment . GetID ( ) ) ,
2022-12-26 16:35:30 +08:00
zap . Int ( "binlog" , len ( newSegment . GetBinlogs ( ) ) ) ,
zap . Int ( "stats log" , len ( newSegment . GetStatslogs ( ) ) ) ,
zap . Int ( "delta logs" , len ( newSegment . GetDeltalogs ( ) ) ) ,
2022-11-03 12:15:35 +08:00
zap . Int64 ( "compact to segment" , newSegment . GetID ( ) ) )
2022-11-24 10:19:12 +08:00
2023-07-13 14:08:29 +08:00
err := m . catalog . AlterSegments ( m . ctx , append ( modInfos , newSegment ) , metastore . BinlogsIncrement {
2023-07-19 15:28:57 +08:00
Segment : newSegment ,
2023-07-13 14:08:29 +08:00
} )
2023-03-17 17:27:56 +08:00
if err != nil {
log . Warn ( "fail to alter segments and new segment" , zap . Error ( err ) )
2022-11-24 10:19:12 +08:00
return err
}
2023-03-17 17:27:56 +08:00
var compactFromIDs [ ] int64
for _ , v := range segmentsCompactFrom {
compactFromIDs = append ( compactFromIDs , v . GetID ( ) )
2022-11-24 10:19:12 +08:00
}
2022-09-27 16:02:53 +08:00
m . Lock ( )
defer m . Unlock ( )
2023-03-17 17:27:56 +08:00
for _ , s := range segmentsCompactFrom {
2022-06-18 01:24:11 +08:00
m . segments . SetSegment ( s . GetID ( ) , s )
2021-11-05 22:25:00 +08:00
}
2023-03-17 17:27:56 +08:00
m . segments . SetSegment ( segmentCompactTo . GetID ( ) , segmentCompactTo )
log . Info ( "meta update: alter in memory meta after compaction - complete" ,
zap . Int64 ( "compact to segment ID" , segmentCompactTo . GetID ( ) ) ,
zap . Int64s ( "compact from segment IDs" , compactFromIDs ) )
2022-11-24 10:19:12 +08:00
return nil
2021-11-05 22:25:00 +08:00
}
func ( m * meta ) updateBinlogs ( origin [ ] * datapb . FieldBinlog , removes [ ] * datapb . FieldBinlog , adds [ ] * datapb . FieldBinlog ) [ ] * datapb . FieldBinlog {
2021-12-19 20:00:42 +08:00
fieldBinlogs := make ( map [ int64 ] map [ string ] * datapb . Binlog )
2021-11-05 22:25:00 +08:00
for _ , f := range origin {
fid := f . GetFieldID ( )
if _ , ok := fieldBinlogs [ fid ] ; ! ok {
2021-12-19 20:00:42 +08:00
fieldBinlogs [ fid ] = make ( map [ string ] * datapb . Binlog )
2021-11-05 22:25:00 +08:00
}
for _ , p := range f . GetBinlogs ( ) {
2021-12-19 20:00:42 +08:00
fieldBinlogs [ fid ] [ p . GetLogPath ( ) ] = p
2021-11-05 22:25:00 +08:00
}
}
for _ , f := range removes {
fid := f . GetFieldID ( )
if _ , ok := fieldBinlogs [ fid ] ; ! ok {
continue
}
for _ , p := range f . GetBinlogs ( ) {
2021-12-19 20:00:42 +08:00
delete ( fieldBinlogs [ fid ] , p . GetLogPath ( ) )
2021-11-05 22:25:00 +08:00
}
}
for _ , f := range adds {
fid := f . GetFieldID ( )
if _ , ok := fieldBinlogs [ fid ] ; ! ok {
2021-12-19 20:00:42 +08:00
fieldBinlogs [ fid ] = make ( map [ string ] * datapb . Binlog )
2021-11-05 22:25:00 +08:00
}
for _ , p := range f . GetBinlogs ( ) {
2021-12-19 20:00:42 +08:00
fieldBinlogs [ fid ] [ p . GetLogPath ( ) ] = p
2021-11-05 22:25:00 +08:00
}
}
res := make ( [ ] * datapb . FieldBinlog , 0 , len ( fieldBinlogs ) )
for fid , logs := range fieldBinlogs {
if len ( logs ) == 0 {
continue
}
2021-12-19 20:00:42 +08:00
binlogs := make ( [ ] * datapb . Binlog , 0 , len ( logs ) )
for _ , log := range logs {
binlogs = append ( binlogs , log )
2021-11-05 22:25:00 +08:00
}
field := & datapb . FieldBinlog { FieldID : fid , Binlogs : binlogs }
res = append ( res , field )
}
return res
}
2021-12-19 20:00:42 +08:00
func ( m * meta ) updateDeltalogs ( origin [ ] * datapb . FieldBinlog , removes [ ] * datapb . FieldBinlog , adds [ ] * datapb . FieldBinlog ) [ ] * datapb . FieldBinlog {
res := make ( [ ] * datapb . FieldBinlog , 0 , len ( origin ) )
for _ , fbl := range origin {
logs := make ( map [ string ] * datapb . Binlog )
for _ , d := range fbl . GetBinlogs ( ) {
logs [ d . GetLogPath ( ) ] = d
}
for _ , remove := range removes {
if remove . GetFieldID ( ) == fbl . GetFieldID ( ) {
for _ , r := range remove . GetBinlogs ( ) {
delete ( logs , r . GetLogPath ( ) )
}
}
}
binlogs := make ( [ ] * datapb . Binlog , 0 , len ( logs ) )
for _ , l := range logs {
binlogs = append ( binlogs , l )
}
if len ( binlogs ) > 0 {
res = append ( res , & datapb . FieldBinlog {
FieldID : fbl . GetFieldID ( ) ,
Binlogs : binlogs ,
} )
}
2021-11-05 22:25:00 +08:00
}
return res
}
2021-09-08 11:35:59 +08:00
// buildSegment utility function for compose datapb.SegmentInfo struct with provided info
2022-09-26 18:06:54 +08:00
func buildSegment ( collectionID UniqueID , partitionID UniqueID , segmentID UniqueID , channelName string , isImporting bool ) * SegmentInfo {
2021-07-12 17:24:25 +08:00
info := & datapb . SegmentInfo {
2021-04-09 09:55:04 +08:00
ID : segmentID ,
2021-02-02 18:53:10 +08:00
CollectionID : collectionID ,
PartitionID : partitionID ,
InsertChannel : channelName ,
2021-06-04 11:45:45 +08:00
NumOfRows : 0 ,
2021-03-11 14:14:29 +08:00
State : commonpb . SegmentState_Growing ,
2022-09-26 18:06:54 +08:00
IsImporting : isImporting ,
2021-07-07 14:02:01 +08:00
}
2021-07-12 17:24:25 +08:00
return NewSegmentInfo ( info )
2021-01-25 15:17:17 +08:00
}
2021-11-12 00:22:42 +08:00
func isSegmentHealthy ( segment * SegmentInfo ) bool {
2022-06-29 16:54:17 +08:00
return segment != nil &&
segment . GetState ( ) != commonpb . SegmentState_SegmentStateNone &&
2021-11-29 22:35:41 +08:00
segment . GetState ( ) != commonpb . SegmentState_NotExist &&
2021-11-12 00:22:42 +08:00
segment . GetState ( ) != commonpb . SegmentState_Dropped
}
2022-05-31 16:36:03 +08:00
func ( m * meta ) HasSegments ( segIDs [ ] UniqueID ) ( bool , error ) {
m . RLock ( )
defer m . RUnlock ( )
for _ , segID := range segIDs {
if _ , ok := m . segments . segments [ segID ] ; ! ok {
return false , fmt . Errorf ( "segment is not exist with ID = %d" , segID )
}
}
return true , nil
}
2022-11-01 21:01:34 +08:00
func ( m * meta ) GetCompactionTo ( segmentID int64 ) * SegmentInfo {
m . RLock ( )
defer m . RUnlock ( )
segments := m . segments . GetSegments ( )
for _ , segment := range segments {
parents := typeutil . NewUniqueSet ( segment . GetCompactionFrom ( ) ... )
if parents . Contain ( segmentID ) {
return segment
}
}
return nil
}
2022-11-10 22:13:04 +08:00
// UpdateChannelCheckpoint updates and saves channel checkpoint.
2023-03-04 23:21:50 +08:00
func ( m * meta ) UpdateChannelCheckpoint ( vChannel string , pos * msgpb . MsgPosition ) error {
2022-11-10 22:13:04 +08:00
if pos == nil {
return fmt . Errorf ( "channelCP is nil, vChannel=%s" , vChannel )
}
2022-11-12 21:09:04 +08:00
m . Lock ( )
defer m . Unlock ( )
2022-11-10 22:13:04 +08:00
oldPosition , ok := m . channelCPs [ vChannel ]
if ! ok || oldPosition . Timestamp < pos . Timestamp {
err := m . catalog . SaveChannelCheckpoint ( m . ctx , vChannel , pos )
if err != nil {
return err
}
m . channelCPs [ vChannel ] = pos
ts , _ := tsoutil . ParseTS ( pos . Timestamp )
2023-03-09 14:13:52 +08:00
log . Debug ( "UpdateChannelCheckpoint done" ,
zap . String ( "vChannel" , vChannel ) ,
zap . Uint64 ( "ts" , pos . Timestamp ) ,
zap . Time ( "time" , ts ) )
2022-11-10 22:13:04 +08:00
}
return nil
}
2023-03-04 23:21:50 +08:00
func ( m * meta ) GetChannelCheckpoint ( vChannel string ) * msgpb . MsgPosition {
2022-11-10 22:13:04 +08:00
m . RLock ( )
defer m . RUnlock ( )
if m . channelCPs [ vChannel ] == nil {
return nil
}
2023-03-04 23:21:50 +08:00
return proto . Clone ( m . channelCPs [ vChannel ] ) . ( * msgpb . MsgPosition )
2022-11-10 22:13:04 +08:00
}
// DropChannelCheckpoint removes the checkpoint of vChannel from the meta store
// and, on success, from the in-memory cache.
func (m *meta) DropChannelCheckpoint(vChannel string) error {
	m.Lock()
	defer m.Unlock()
	if err := m.catalog.DropChannelCheckpoint(m.ctx, vChannel); err != nil {
		return err
	}
	delete(m.channelCPs, vChannel)
	log.Debug("DropChannelCheckpoint done", zap.String("vChannel", vChannel))
	return nil
}
// GcConfirm delegates to the catalog to check whether garbage collection of the
// given collection/partition can be confirmed as finished.
func (m *meta) GcConfirm(ctx context.Context, collectionID, partitionID UniqueID) bool {
	return m.catalog.GcConfirm(ctx, collectionID, partitionID)
}
// addNewSeg update metrics update for a new segment.
func (s *segMetricMutation) addNewSeg(state commonpb.SegmentState, rowCount int64) {
	// one more segment in `state`; its rows count toward both the current and
	// the accumulative row counters
	s.stateChange[state.String()]++
	s.rowCountChange += rowCount
	s.rowCountAccChange += rowCount
}
// commit persists all updates in current segMetricMutation, should and must be called AFTER segment state change
// has persisted in Etcd.
func (s *segMetricMutation) commit() {
	// apply the per-state segment-count deltas
	for state, change := range s.stateChange {
		metrics.DataCoordNumSegments.WithLabelValues(state).Add(float64(change))
	}
	// rowCountChange may be negative (drops); rowCountAccChange only accumulates
	metrics.DataCoordNumStoredRows.WithLabelValues().Add(float64(s.rowCountChange))
	metrics.DataCoordNumStoredRowsCounter.WithLabelValues().Add(float64(s.rowCountAccChange))
}
// append updates current segMetricMutation when segment state change happens.
func (s *segMetricMutation) append(oldState, newState commonpb.SegmentState, rowCountUpdate int64) {
	if oldState != newState {
		// move one segment from the old state bucket into the new one
		s.stateChange[oldState.String()]--
		s.stateChange[newState.String()]++
	}
	// Update # of rows on new flush operations and drop operations.
	if isFlushState(newState) && !isFlushState(oldState) {
		// If new flush.
		s.rowCountChange += rowCountUpdate
		s.rowCountAccChange += rowCountUpdate
	} else if newState == commonpb.SegmentState_Dropped && oldState != newState {
		// If new drop.
		s.rowCountChange -= rowCountUpdate
	}
}
// isFlushState reports whether state is one of the flush states (Flushing or Flushed).
func isFlushState(state commonpb.SegmentState) bool {
	switch state {
	case commonpb.SegmentState_Flushing, commonpb.SegmentState_Flushed:
		return true
	default:
		return false
	}
}
// updateSegStateAndPrepareMetrics updates a segment's in-memory state and prepare for the corresponding metric update.
// Only the passed-in (typically cloned) SegmentInfo is mutated; persisting the change
// and committing metricMutation afterwards are the caller's responsibility.
func updateSegStateAndPrepareMetrics(segToUpdate *SegmentInfo, targetState commonpb.SegmentState, metricMutation *segMetricMutation) {
	log.Debug("updating segment state and updating metrics",
		zap.Int64("segmentID", segToUpdate.GetID()),
		zap.String("old state", segToUpdate.GetState().String()),
		zap.String("new state", targetState.String()),
		zap.Int64("# of rows", segToUpdate.GetNumOfRows()))
	// record the metric delta before the state is overwritten
	metricMutation.append(segToUpdate.GetState(), targetState, segToUpdate.GetNumOfRows())
	segToUpdate.State = targetState
}