2021-11-17 19:43:50 +08:00
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
2021-06-15 12:41:40 +08:00
// with the License. You may obtain a copy of the License at
//
2021-11-17 19:43:50 +08:00
// http://www.apache.org/licenses/LICENSE-2.0
2021-06-15 12:41:40 +08:00
//
2021-11-17 19:43:50 +08:00
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2021-06-15 12:41:40 +08:00
2021-06-22 16:44:09 +08:00
package querycoord
2021-06-15 12:41:40 +08:00
import (
2021-10-22 19:07:15 +08:00
"context"
2021-06-15 12:41:40 +08:00
"errors"
2021-06-19 11:45:09 +08:00
"fmt"
"path/filepath"
2021-06-15 12:41:40 +08:00
"strconv"
2021-11-15 15:17:09 +08:00
"strings"
2021-06-15 12:41:40 +08:00
"sync"
2021-06-19 11:45:09 +08:00
"github.com/golang/protobuf/proto"
2021-06-15 12:41:40 +08:00
"go.uber.org/zap"
2021-09-29 09:56:04 +08:00
"github.com/milvus-io/milvus/internal/kv"
2021-06-15 12:41:40 +08:00
"github.com/milvus-io/milvus/internal/log"
2021-10-22 19:07:15 +08:00
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
2021-11-13 08:49:08 +08:00
"github.com/milvus-io/milvus/internal/proto/datapb"
2021-10-22 19:07:15 +08:00
"github.com/milvus-io/milvus/internal/proto/internalpb"
2021-06-15 12:41:40 +08:00
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
2021-11-17 12:13:10 +08:00
"github.com/milvus-io/milvus/internal/util"
2021-06-15 12:41:40 +08:00
)
2021-06-19 11:45:09 +08:00
// Prefixes of the etcd keys under which querycoord persists its metadata.
const (
	collectionMetaPrefix   = "queryCoord-collectionMeta"    // per-collection CollectionInfo
	dmChannelMetaPrefix    = "queryCoord-dmChannelWatchInfo" // per-dmChannel DmChannelWatchInfo
	deltaChannelMetaPrefix = "queryCoord-deltaChannel"       // per-collection delta VchannelInfos
)
2021-10-22 19:07:15 +08:00
// col2SegmentInfos maps a collection ID to the segment infos to be saved for it.
type col2SegmentInfos = map[UniqueID][]*querypb.SegmentInfo

// col2SealedSegmentChangeInfos maps a collection ID to the sealed-segment
// change info generated for it.
type col2SealedSegmentChangeInfos = map[UniqueID]*querypb.SealedSegmentsChangeInfo
2021-09-26 21:57:57 +08:00
// Meta contains information about all loaded collections and partitions, including segment information and vchannel information
type Meta interface {
	// reloadFromKV restores the in-memory meta from the backing kv store.
	reloadFromKV() error
	// setKvClient replaces the kv client used to persist meta.
	setKvClient(kv kv.MetaKv)

	// collection level accessors and mutators
	showCollections() []*querypb.CollectionInfo
	hasCollection(collectionID UniqueID) bool
	getCollectionInfoByID(collectionID UniqueID) (*querypb.CollectionInfo, error)
	addCollection(collectionID UniqueID, loadType querypb.LoadType, schema *schemapb.CollectionSchema) error
	releaseCollection(collectionID UniqueID) error

	// partition level accessors and mutators
	addPartitions(collectionID UniqueID, partitionIDs []UniqueID) error
	showPartitions(collectionID UniqueID) ([]*querypb.PartitionStates, error)
	hasPartition(collectionID UniqueID, partitionID UniqueID) bool
	hasReleasePartition(collectionID UniqueID, partitionID UniqueID) bool
	releasePartitions(collectionID UniqueID, partitionIDs []UniqueID) error

	// segment level accessors
	showSegmentInfos(collectionID UniqueID, partitionIDs []UniqueID) []*querypb.SegmentInfo
	getSegmentInfoByID(segmentID UniqueID) (*querypb.SegmentInfo, error)
	getSegmentInfosByNode(nodeID int64) []*querypb.SegmentInfo

	getPartitionStatesByID(collectionID UniqueID, partitionID UniqueID) (*querypb.PartitionStates, error)

	// dm channel watch info accessors and mutators
	getDmChannelInfosByNodeID(nodeID int64) []*querypb.DmChannelWatchInfo
	setDmChannelInfos(channelInfos []*querypb.DmChannelWatchInfo) error

	// delta channel accessors and mutators
	getDeltaChannelsByCollectionID(collectionID UniqueID) ([]*datapb.VchannelInfo, error)
	setDeltaChannel(collectionID UniqueID, info []*datapb.VchannelInfo) error

	// query channel accessors
	getQueryChannelInfoByID(collectionID UniqueID) *querypb.QueryChannelInfo
	getQueryStreamByID(collectionID UniqueID, queryChannel string) (msgstream.MsgStream, error)

	setLoadType(collectionID UniqueID, loadType querypb.LoadType) error
	setLoadPercentage(collectionID UniqueID, partitionID UniqueID, percentage int64, loadType querypb.LoadType) error
	//printMeta()

	// global sealed segment bookkeeping; both return the change infos that were
	// produced to the query channels so callers can react to them
	saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2SealedSegmentChangeInfos, error)
	removeGlobalSealedSegInfos(collectionID UniqueID, partitionIDs []UniqueID) (col2SealedSegmentChangeInfos, error)
	sendSealedSegmentChangeInfos(collectionID UniqueID, queryChannel string, changeInfos *querypb.SealedSegmentsChangeInfo) (*internalpb.MsgPosition, error)
}
2021-10-02 09:49:40 +08:00
// MetaReplica records the current load information on all querynodes
type MetaReplica struct {
	ctx         context.Context
	cancel      context.CancelFunc
	client      kv.MetaKv // client of a reliable kv service, i.e. etcd client
	msFactory   msgstream.Factory
	idAllocator func() (UniqueID, error) // allocates globally unique IDs (used as msgID of change-info messages)

	//sync.RWMutex
	// Each cache map below is guarded by its own RWMutex.
	collectionInfos map[UniqueID]*querypb.CollectionInfo // collectionID -> collection meta
	collectionMu    sync.RWMutex
	segmentInfos    map[UniqueID]*querypb.SegmentInfo // segmentID -> segment meta
	segmentMu       sync.RWMutex
	queryChannelInfos map[UniqueID]*querypb.QueryChannelInfo // collectionID -> query channel meta
	channelMu         sync.RWMutex
	deltaChannelInfos map[UniqueID][]*datapb.VchannelInfo // collectionID -> delta channels
	deltaChannelMu    sync.RWMutex
	dmChannelInfos map[string]*querypb.DmChannelWatchInfo // dm channel name -> watch info
	dmChannelMu    sync.RWMutex
	queryStreams map[UniqueID]msgstream.MsgStream // collectionID -> stream producing to the query channel
	streamMu     sync.RWMutex

	//partitionStates map[UniqueID]*querypb.PartitionStates
}
2021-10-22 19:07:15 +08:00
func newMeta ( ctx context . Context , kv kv . MetaKv , factory msgstream . Factory , idAllocator func ( ) ( UniqueID , error ) ) ( Meta , error ) {
childCtx , cancel := context . WithCancel ( ctx )
2021-06-15 12:41:40 +08:00
collectionInfos := make ( map [ UniqueID ] * querypb . CollectionInfo )
segmentInfos := make ( map [ UniqueID ] * querypb . SegmentInfo )
queryChannelInfos := make ( map [ UniqueID ] * querypb . QueryChannelInfo )
2021-11-13 08:49:08 +08:00
deltaChannelInfos := make ( map [ UniqueID ] [ ] * datapb . VchannelInfo )
2021-12-21 11:57:39 +08:00
dmChannelInfos := make ( map [ string ] * querypb . DmChannelWatchInfo )
2021-10-22 19:07:15 +08:00
queryMsgStream := make ( map [ UniqueID ] msgstream . MsgStream )
2021-06-19 11:45:09 +08:00
2021-08-02 22:39:25 +08:00
m := & MetaReplica {
2021-10-22 19:07:15 +08:00
ctx : childCtx ,
cancel : cancel ,
client : kv ,
msFactory : factory ,
idAllocator : idAllocator ,
2021-12-21 13:50:54 +08:00
collectionInfos : collectionInfos ,
segmentInfos : segmentInfos ,
queryChannelInfos : queryChannelInfos ,
deltaChannelInfos : deltaChannelInfos ,
dmChannelInfos : dmChannelInfos ,
queryStreams : queryMsgStream ,
2021-06-15 12:41:40 +08:00
}
2021-06-19 11:45:09 +08:00
err := m . reloadFromKV ( )
if err != nil {
return nil , err
}
return m , nil
}
2021-08-02 22:39:25 +08:00
func ( m * MetaReplica ) reloadFromKV ( ) error {
2021-12-01 16:39:31 +08:00
log . Debug ( "start reload from kv" )
2021-06-19 11:45:09 +08:00
collectionKeys , collectionValues , err := m . client . LoadWithPrefix ( collectionMetaPrefix )
if err != nil {
return err
}
for index := range collectionKeys {
collectionID , err := strconv . ParseInt ( filepath . Base ( collectionKeys [ index ] ) , 10 , 64 )
if err != nil {
return err
}
collectionInfo := & querypb . CollectionInfo { }
2021-09-29 20:26:00 +08:00
err = proto . Unmarshal ( [ ] byte ( collectionValues [ index ] ) , collectionInfo )
2021-06-19 11:45:09 +08:00
if err != nil {
return err
}
m . collectionInfos [ collectionID ] = collectionInfo
}
2021-11-17 12:13:10 +08:00
segmentKeys , segmentValues , err := m . client . LoadWithPrefix ( util . SegmentMetaPrefix )
2021-06-19 11:45:09 +08:00
if err != nil {
return err
}
for index := range segmentKeys {
segmentID , err := strconv . ParseInt ( filepath . Base ( segmentKeys [ index ] ) , 10 , 64 )
if err != nil {
return err
}
segmentInfo := & querypb . SegmentInfo { }
2021-09-29 20:26:00 +08:00
err = proto . Unmarshal ( [ ] byte ( segmentValues [ index ] ) , segmentInfo )
2021-06-19 11:45:09 +08:00
if err != nil {
return err
}
m . segmentInfos [ segmentID ] = segmentInfo
}
2021-11-13 08:49:08 +08:00
deltaChannelKeys , deltaChannelValues , err := m . client . LoadWithPrefix ( deltaChannelMetaPrefix )
if err != nil {
return nil
}
for index , value := range deltaChannelValues {
2021-11-15 15:17:09 +08:00
pathStrings := strings . Split ( deltaChannelKeys [ index ] , "/" )
collectionID , err := strconv . ParseInt ( pathStrings [ len ( pathStrings ) - 2 ] , 10 , 64 )
2021-11-13 08:49:08 +08:00
if err != nil {
return err
}
deltaChannelInfo := & datapb . VchannelInfo { }
err = proto . Unmarshal ( [ ] byte ( value ) , deltaChannelInfo )
if err != nil {
return err
}
m . deltaChannelInfos [ collectionID ] = append ( m . deltaChannelInfos [ collectionID ] , deltaChannelInfo )
}
2021-12-21 11:57:39 +08:00
dmChannelKeys , dmChannelValues , err := m . client . LoadWithPrefix ( dmChannelMetaPrefix )
if err != nil {
return err
}
for index := range dmChannelKeys {
dmChannel := filepath . Base ( dmChannelKeys [ index ] )
dmChannelWatchInfo := & querypb . DmChannelWatchInfo { }
err = proto . Unmarshal ( [ ] byte ( dmChannelValues [ index ] ) , dmChannelWatchInfo )
if err != nil {
return err
}
m . dmChannelInfos [ dmChannel ] = dmChannelWatchInfo
}
2021-06-19 11:45:09 +08:00
//TODO::update partition states
2021-12-01 16:39:31 +08:00
log . Debug ( "reload from kv finished" )
2021-06-19 11:45:09 +08:00
return nil
2021-06-15 12:41:40 +08:00
}
2021-10-22 19:07:15 +08:00
// setKvClient replaces the kv client that backs meta persistence.
func (m *MetaReplica) setKvClient(kv kv.MetaKv) {
	m.client = kv
}
2021-08-02 22:39:25 +08:00
func ( m * MetaReplica ) showCollections ( ) [ ] * querypb . CollectionInfo {
2021-10-22 19:07:15 +08:00
m . collectionMu . RLock ( )
defer m . collectionMu . RUnlock ( )
2021-06-15 12:41:40 +08:00
2021-08-02 22:39:25 +08:00
collections := make ( [ ] * querypb . CollectionInfo , 0 )
for _ , info := range m . collectionInfos {
collections = append ( collections , proto . Clone ( info ) . ( * querypb . CollectionInfo ) )
2021-06-15 12:41:40 +08:00
}
return collections
}
2021-08-02 22:39:25 +08:00
func ( m * MetaReplica ) showPartitions ( collectionID UniqueID ) ( [ ] * querypb . PartitionStates , error ) {
2021-10-22 19:07:15 +08:00
m . collectionMu . RLock ( )
defer m . collectionMu . RUnlock ( )
2021-06-15 12:41:40 +08:00
2021-06-24 21:10:13 +08:00
//TODO::should update after load collection
2021-08-02 22:39:25 +08:00
results := make ( [ ] * querypb . PartitionStates , 0 )
2021-06-15 12:41:40 +08:00
if info , ok := m . collectionInfos [ collectionID ] ; ok {
2021-08-02 22:39:25 +08:00
for _ , state := range info . PartitionStates {
results = append ( results , proto . Clone ( state ) . ( * querypb . PartitionStates ) )
}
return results , nil
2021-06-15 12:41:40 +08:00
}
return nil , errors . New ( "showPartitions: can't find collection in collectionInfos" )
}
2021-08-02 22:39:25 +08:00
func ( m * MetaReplica ) hasCollection ( collectionID UniqueID ) bool {
2021-10-22 19:07:15 +08:00
m . collectionMu . RLock ( )
defer m . collectionMu . RUnlock ( )
2021-06-15 12:41:40 +08:00
if _ , ok := m . collectionInfos [ collectionID ] ; ok {
return true
}
return false
}
2021-08-02 22:39:25 +08:00
func ( m * MetaReplica ) hasPartition ( collectionID UniqueID , partitionID UniqueID ) bool {
2021-10-22 19:07:15 +08:00
m . collectionMu . RLock ( )
defer m . collectionMu . RUnlock ( )
2021-06-15 12:41:40 +08:00
if info , ok := m . collectionInfos [ collectionID ] ; ok {
for _ , id := range info . PartitionIDs {
if partitionID == id {
return true
}
}
}
return false
}
2021-08-02 22:39:25 +08:00
func ( m * MetaReplica ) hasReleasePartition ( collectionID UniqueID , partitionID UniqueID ) bool {
2021-10-22 19:07:15 +08:00
m . collectionMu . RLock ( )
defer m . collectionMu . RUnlock ( )
2021-06-23 17:44:12 +08:00
if info , ok := m . collectionInfos [ collectionID ] ; ok {
for _ , id := range info . ReleasedPartitionIDs {
if partitionID == id {
return true
}
}
}
return false
}
2021-12-21 11:57:39 +08:00
func ( m * MetaReplica ) addCollection ( collectionID UniqueID , loadType querypb . LoadType , schema * schemapb . CollectionSchema ) error {
2021-10-22 19:07:15 +08:00
hasCollection := m . hasCollection ( collectionID )
if ! hasCollection {
2021-12-21 11:57:39 +08:00
var partitionIDs [ ] UniqueID
var partitionStates [ ] * querypb . PartitionStates
2021-06-15 12:41:40 +08:00
newCollection := & querypb . CollectionInfo {
2021-08-02 22:39:25 +08:00
CollectionID : collectionID ,
2021-12-21 11:57:39 +08:00
PartitionIDs : partitionIDs ,
2021-08-02 22:39:25 +08:00
PartitionStates : partitionStates ,
2021-12-21 11:57:39 +08:00
LoadType : loadType ,
2021-08-02 22:39:25 +08:00
Schema : schema ,
2021-06-15 12:41:40 +08:00
}
2021-08-02 22:39:25 +08:00
err := saveGlobalCollectionInfo ( collectionID , newCollection , m . client )
2021-06-19 11:45:09 +08:00
if err != nil {
log . Error ( "save collectionInfo error" , zap . Any ( "error" , err . Error ( ) ) , zap . Int64 ( "collectionID" , collectionID ) )
2021-08-02 22:39:25 +08:00
return err
2021-06-19 11:45:09 +08:00
}
2021-10-22 19:07:15 +08:00
m . collectionMu . Lock ( )
m . collectionInfos [ collectionID ] = newCollection
m . collectionMu . Unlock ( )
2021-06-15 12:41:40 +08:00
}
2021-08-02 22:39:25 +08:00
return nil
2021-06-15 12:41:40 +08:00
}
2021-12-21 11:57:39 +08:00
// addPartitions merges the given partitions into the collection's loaded set:
// new partitions start in the NotPresent state, already-loaded partitions keep
// their existing state, and any partition that becomes loaded is removed from
// the released list. The updated info is persisted to etcd before the cache is
// replaced, so a persistence failure leaves the in-memory meta unchanged.
func (m *MetaReplica) addPartitions(collectionID UniqueID, partitionIDs []UniqueID) error {
	m.collectionMu.Lock()
	defer m.collectionMu.Unlock()
	if info, ok := m.collectionInfos[collectionID]; ok {
		// work on a clone so the cached info is untouched until the save succeeds
		collectionInfo := proto.Clone(info).(*querypb.CollectionInfo)
		loadedPartitionID2State := make(map[UniqueID]*querypb.PartitionStates)
		for _, partitionID := range partitionIDs {
			loadedPartitionID2State[partitionID] = &querypb.PartitionStates{
				PartitionID: partitionID,
				State:       querypb.PartitionState_NotPresent,
			}
		}
		// existing partitions override the NotPresent default, keeping their
		// current state (PartitionStates is parallel to PartitionIDs by offset)
		for offset, partitionID := range collectionInfo.PartitionIDs {
			loadedPartitionID2State[partitionID] = collectionInfo.PartitionStates[offset]
		}
		// rebuild the parallel ID/state slices from the merged map
		newPartitionIDs := make([]UniqueID, 0)
		newPartitionStates := make([]*querypb.PartitionStates, 0)
		for partitionID, state := range loadedPartitionID2State {
			newPartitionIDs = append(newPartitionIDs, partitionID)
			newPartitionStates = append(newPartitionStates, state)
		}
		// a partition that is now loaded must no longer be listed as released
		newReleasedPartitionIDs := make([]UniqueID, 0)
		for _, releasedPartitionID := range collectionInfo.ReleasedPartitionIDs {
			if _, ok = loadedPartitionID2State[releasedPartitionID]; !ok {
				newReleasedPartitionIDs = append(newReleasedPartitionIDs, releasedPartitionID)
			}
		}
		collectionInfo.PartitionIDs = newPartitionIDs
		collectionInfo.PartitionStates = newPartitionStates
		collectionInfo.ReleasedPartitionIDs = newReleasedPartitionIDs
		log.Debug("add a partition to MetaReplica", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", collectionInfo.PartitionIDs))
		err := saveGlobalCollectionInfo(collectionID, collectionInfo, m.client)
		if err != nil {
			log.Error("save collectionInfo error", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", collectionInfo.PartitionIDs), zap.Any("error", err.Error()))
			return err
		}
		m.collectionInfos[collectionID] = collectionInfo
		return nil
	}
	return fmt.Errorf("addPartition: can't find collection %d when add partition", collectionID)
}
2021-12-21 11:57:39 +08:00
func ( m * MetaReplica ) releaseCollection ( collectionID UniqueID ) error {
err := removeCollectionMeta ( collectionID , m . client )
2021-10-22 19:07:15 +08:00
if err != nil {
2021-12-21 11:57:39 +08:00
log . Warn ( "remove collectionInfo from etcd failed" , zap . Int64 ( "collectionID" , collectionID ) , zap . Any ( "error" , err . Error ( ) ) )
2021-10-22 19:07:15 +08:00
return err
}
2021-12-21 11:57:39 +08:00
m . collectionMu . Lock ( )
delete ( m . collectionInfos , collectionID )
m . collectionMu . Unlock ( )
m . deltaChannelMu . Lock ( )
delete ( m . deltaChannelInfos , collectionID )
m . deltaChannelMu . Unlock ( )
m . dmChannelMu . Lock ( )
for dmChannel , info := range m . dmChannelInfos {
if info . CollectionID == collectionID {
delete ( m . dmChannelInfos , dmChannel )
}
2021-06-19 11:45:09 +08:00
}
2021-12-21 11:57:39 +08:00
m . dmChannelMu . Unlock ( )
2021-08-02 22:39:25 +08:00
return nil
2021-06-19 11:45:09 +08:00
}
2021-12-21 11:57:39 +08:00
// releasePartitions moves the given partitions of a collection into the
// released set: they are removed from the loaded ID/state slices and added to
// ReleasedPartitionIDs (deduplicated with those already released). The update
// is persisted to etcd before the cache is replaced. Releasing partitions of
// an unknown collection is a no-op.
func (m *MetaReplica) releasePartitions(collectionID UniqueID, releasedPartitionIDs []UniqueID) error {
	m.collectionMu.Lock()
	defer m.collectionMu.Unlock()
	info, ok := m.collectionInfos[collectionID]
	if !ok {
		return nil
	}
	// work on a clone so the cached info is untouched until the save succeeds
	collectionInfo := proto.Clone(info).(*querypb.CollectionInfo)
	// union of the newly released partitions and those already released
	releasedPartitionMap := make(map[UniqueID]struct{})
	for _, partitionID := range releasedPartitionIDs {
		releasedPartitionMap[partitionID] = struct{}{}
	}
	for _, partitionID := range collectionInfo.ReleasedPartitionIDs {
		releasedPartitionMap[partitionID] = struct{}{}
	}

	// keep only non-released partitions; states are matched to IDs by offset
	newPartitionIDs := make([]UniqueID, 0)
	newPartitionStates := make([]*querypb.PartitionStates, 0)
	for offset, partitionID := range collectionInfo.PartitionIDs {
		if _, ok = releasedPartitionMap[partitionID]; !ok {
			newPartitionIDs = append(newPartitionIDs, partitionID)
			newPartitionStates = append(newPartitionStates, collectionInfo.PartitionStates[offset])
		}
	}
	newReleasedPartitionIDs := make([]UniqueID, 0)
	for partitionID := range releasedPartitionMap {
		newReleasedPartitionIDs = append(newReleasedPartitionIDs, partitionID)
	}
	collectionInfo.PartitionIDs = newPartitionIDs
	collectionInfo.PartitionStates = newPartitionStates
	collectionInfo.ReleasedPartitionIDs = newReleasedPartitionIDs
	err := saveGlobalCollectionInfo(collectionID, collectionInfo, m.client)
	if err != nil {
		log.Error("releasePartition: remove partition infos error", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", releasedPartitionIDs), zap.Any("error", err.Error()))
		return err
	}

	m.collectionInfos[collectionID] = collectionInfo
	return nil
}
// saveGlobalSealedSegInfos records newly loaded sealed segments. Per
// collection it builds a SealedSegmentsChangeInfo (online segments plus any
// sealed segments they replace or were compacted from), produces it to the
// collection's query channel, then persists the segment infos and change
// infos to etcd and finally updates the in-memory caches. It returns the
// change infos per collection. Etcd write failures after the message has been
// produced panic, since meta and stream can no longer be kept consistent.
func (m *MetaReplica) saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2SealedSegmentChangeInfos, error) {
	if len(saves) == 0 {
		return nil, nil
	}
	// generate segment change info according segment info to updated
	col2SegmentChangeInfos := make(col2SealedSegmentChangeInfos)

	// sealed segments that were compacted into the ones being saved; they are
	// removed from etcd and the cache at the end
	segmentsCompactionFrom := make([]*querypb.SegmentInfo, 0)
	// get segmentInfos to colSegmentInfos
	for collectionID, onlineInfos := range saves {
		segmentsChangeInfo := &querypb.SealedSegmentsChangeInfo{
			Base: &commonpb.MsgBase{
				MsgType: commonpb.MsgType_SealedSegmentsChangeInfo,
			},
			Infos: []*querypb.SegmentChangeInfo{},
		}
		for _, info := range onlineInfos {
			segmentID := info.SegmentID
			onlineNodeID := info.NodeID
			changeInfo := &querypb.SegmentChangeInfo{
				OnlineNodeID:   onlineNodeID,
				OnlineSegments: []*querypb.SegmentInfo{info},
			}
			// a previously cached copy of the same segment goes offline
			offlineInfo, err := m.getSegmentInfoByID(segmentID)
			if err == nil {
				offlineNodeID := offlineInfo.NodeID
				// if the offline segment state is growing, it will not impact the global sealed segments
				if offlineInfo.SegmentState == commonpb.SegmentState_Sealed {
					changeInfo.OfflineNodeID = offlineNodeID
					changeInfo.OfflineSegments = []*querypb.SegmentInfo{offlineInfo}
				}
			}
			segmentsChangeInfo.Infos = append(segmentsChangeInfo.Infos, changeInfo)

			// generate offline segment change info if the loaded segment is compacted from other sealed segments
			for _, compactionSegmentID := range info.CompactionFrom {
				compactionSegmentInfo, err := m.getSegmentInfoByID(compactionSegmentID)
				if err == nil && compactionSegmentInfo.SegmentState == commonpb.SegmentState_Sealed {
					segmentsChangeInfo.Infos = append(segmentsChangeInfo.Infos, &querypb.SegmentChangeInfo{
						OfflineNodeID:   compactionSegmentInfo.NodeID,
						OfflineSegments: []*querypb.SegmentInfo{compactionSegmentInfo},
					})
					segmentsCompactionFrom = append(segmentsCompactionFrom, compactionSegmentInfo)
				} else {
					return nil, fmt.Errorf("saveGlobalSealedSegInfos: the compacted segment %d has not been loaded into memory", compactionSegmentID)
				}
			}
		}
		col2SegmentChangeInfos[collectionID] = segmentsChangeInfo
	}
	queryChannelInfosMap := make(map[UniqueID]*querypb.QueryChannelInfo)
	for collectionID, segmentChangeInfos := range col2SegmentChangeInfos {
		// get msgStream to produce sealedSegmentChangeInfos to query channel
		queryChannelInfo := m.getQueryChannelInfoByID(collectionID)
		msgPosition, err := m.sendSealedSegmentChangeInfos(collectionID, queryChannelInfo.QueryChannel, segmentChangeInfos)
		if err != nil {
			return nil, err
		}
		queryChannelInfo.SeekPosition = msgPosition

		// update segmentInfo, queryChannelInfo meta to cache and etcd
		// merge the saved segments into the channel's global sealed segment
		// set, keyed by segmentID so saves overwrite stale entries
		seg2Info := make(map[UniqueID]*querypb.SegmentInfo)
		for _, segmentInfo := range queryChannelInfo.GlobalSealedSegments {
			segmentID := segmentInfo.SegmentID
			seg2Info[segmentID] = segmentInfo
		}
		if infos, ok := saves[collectionID]; ok {
			for _, segmentInfo := range infos {
				segmentID := segmentInfo.SegmentID
				seg2Info[segmentID] = segmentInfo
			}
		}
		globalSealedSegmentInfos := make([]*querypb.SegmentInfo, 0)
		for _, info := range seg2Info {
			globalSealedSegmentInfos = append(globalSealedSegmentInfos, info)
		}
		queryChannelInfo.GlobalSealedSegments = globalSealedSegmentInfos
		queryChannelInfosMap[collectionID] = queryChannelInfo
	}

	// save segmentInfo to etcd
	segmentInfoKvs := make(map[string]string)
	for _, infos := range saves {
		for _, info := range infos {
			segmentInfoBytes, err := proto.Marshal(info)
			if err != nil {
				return col2SegmentChangeInfos, err
			}
			segmentKey := fmt.Sprintf("%s/%d/%d/%d", util.SegmentMetaPrefix, info.CollectionID, info.PartitionID, info.SegmentID)
			segmentInfoKvs[segmentKey] = string(segmentInfoBytes)
		}
	}
	for key, value := range segmentInfoKvs {
		err := m.client.Save(key, value)
		if err != nil {
			// the change info has already been produced; failing to persist
			// leaves meta inconsistent, so crash rather than continue
			panic(err)
		}
	}

	// remove compacted segment info from etcd
	for _, segmentInfo := range segmentsCompactionFrom {
		segmentKey := fmt.Sprintf("%s/%d/%d/%d", util.SegmentMetaPrefix, segmentInfo.CollectionID, segmentInfo.PartitionID, segmentInfo.SegmentID)
		err := m.client.Remove(segmentKey)
		if err != nil {
			panic(err)
		}
	}

	// save sealedSegmentsChangeInfo to etcd
	saveKvs := make(map[string]string)
	// save segmentChangeInfo into etcd, query node will deal the changeInfo if the msgID key exist in etcd
	// avoid the produce process success but save meta to etcd failed
	// then the msgID key will not exist, and changeInfo will be ignored by query node
	for _, changeInfos := range col2SegmentChangeInfos {
		changeInfoBytes, err := proto.Marshal(changeInfos)
		if err != nil {
			return col2SegmentChangeInfos, err
		}
		// TODO:: segmentChangeInfo clear in etcd with coord gc and queryNode watch the changeInfo meta to deal changeInfoMsg
		changeInfoKey := fmt.Sprintf("%s/%d", util.ChangeInfoMetaPrefix, changeInfos.Base.MsgID)
		saveKvs[changeInfoKey] = string(changeInfoBytes)
	}
	err := m.client.MultiSave(saveKvs)
	if err != nil {
		panic(err)
	}

	// finally mirror the persisted state into the in-memory caches
	m.segmentMu.Lock()
	for _, segmentInfos := range saves {
		for _, info := range segmentInfos {
			segmentID := info.SegmentID
			m.segmentInfos[segmentID] = info
		}
	}
	for _, segmentInfo := range segmentsCompactionFrom {
		delete(m.segmentInfos, segmentInfo.SegmentID)
	}
	m.segmentMu.Unlock()
	m.channelMu.Lock()
	for collectionID, channelInfo := range queryChannelInfosMap {
		m.queryChannelInfos[collectionID] = channelInfo
	}
	m.channelMu.Unlock()
	return col2SegmentChangeInfos, nil
}
2021-10-22 19:07:15 +08:00
// removeGlobalSealedSegInfos removes the sealed segments of the given
// partitions (or of the whole collection) from the global sealed segment set:
// it produces an offline-only SealedSegmentsChangeInfo to the collection's
// query channel, removes the segment metas from etcd, persists the change
// info, and updates the in-memory caches. Etcd write failures after the
// message has been produced panic, since meta and stream can no longer be
// kept consistent.
func (m *MetaReplica) removeGlobalSealedSegInfos(collectionID UniqueID, partitionIDs []UniqueID) (col2SealedSegmentChangeInfos, error) {
	removes := m.showSegmentInfos(collectionID, partitionIDs)
	if len(removes) == 0 {
		return nil, nil
	}
	// get segmentInfos to remove
	segmentChangeInfos := &querypb.SealedSegmentsChangeInfo{
		Base: &commonpb.MsgBase{
			MsgType: commonpb.MsgType_SealedSegmentsChangeInfo,
		},
		Infos: []*querypb.SegmentChangeInfo{},
	}
	for _, info := range removes {
		offlineNodeID := info.NodeID
		changeInfo := &querypb.SegmentChangeInfo{
			OfflineNodeID:   offlineNodeID,
			OfflineSegments: []*querypb.SegmentInfo{info},
		}
		segmentChangeInfos.Infos = append(segmentChangeInfos.Infos, changeInfo)
	}

	// produce sealedSegmentChangeInfos to query channel
	queryChannelInfo := m.getQueryChannelInfoByID(collectionID)
	msgPosition, err := m.sendSealedSegmentChangeInfos(collectionID, queryChannelInfo.QueryChannel, segmentChangeInfos)
	if err != nil {
		return nil, err
	}
	queryChannelInfo.SeekPosition = msgPosition

	// update segmentInfo, queryChannelInfo meta to cache and etcd
	// rebuild the channel's global sealed segment set without the removed ones
	seg2Info := make(map[UniqueID]*querypb.SegmentInfo)
	for _, segmentInfo := range queryChannelInfo.GlobalSealedSegments {
		segmentID := segmentInfo.SegmentID
		seg2Info[segmentID] = segmentInfo
	}
	for _, segmentInfo := range removes {
		segmentID := segmentInfo.SegmentID
		delete(seg2Info, segmentID)
	}
	globalSealedSegmentInfos := make([]*querypb.SegmentInfo, 0)
	for _, info := range seg2Info {
		globalSealedSegmentInfos = append(globalSealedSegmentInfos, info)
	}
	queryChannelInfo.GlobalSealedSegments = globalSealedSegmentInfos

	// remove meta from etcd
	for _, info := range removes {
		segmentKey := fmt.Sprintf("%s/%d/%d/%d", util.SegmentMetaPrefix, info.CollectionID, info.PartitionID, info.SegmentID)
		err = m.client.Remove(segmentKey)
		if err != nil {
			// the change info has already been produced; failing to persist
			// leaves meta inconsistent, so crash rather than continue
			panic(err)
		}
	}

	saveKvs := make(map[string]string)
	// save segmentChangeInfo into etcd, query node will deal the changeInfo if the msgID key exist in etcd
	// avoid the produce process success but save meta to etcd failed
	// then the msgID key will not exist, and changeInfo will be ignored by query node
	changeInfoBytes, err := proto.Marshal(segmentChangeInfos)
	if err != nil {
		return col2SealedSegmentChangeInfos{collectionID: segmentChangeInfos}, err
	}
	// TODO:: segmentChangeInfo clear in etcd with coord gc and queryNode watch the changeInfo meta to deal changeInfoMsg
	changeInfoKey := fmt.Sprintf("%s/%d", util.ChangeInfoMetaPrefix, segmentChangeInfos.Base.MsgID)
	saveKvs[changeInfoKey] = string(changeInfoBytes)

	err = m.client.MultiSave(saveKvs)
	if err != nil {
		panic(err)
	}

	// finally mirror the persisted state into the in-memory caches
	m.segmentMu.Lock()
	for _, info := range removes {
		delete(m.segmentInfos, info.SegmentID)
	}
	m.segmentMu.Unlock()
	m.channelMu.Lock()
	m.queryChannelInfos[collectionID] = queryChannelInfo
	m.channelMu.Unlock()
	return col2SealedSegmentChangeInfos{collectionID: segmentChangeInfos}, nil
}
2021-11-17 12:13:10 +08:00
// send sealed segment change infos into query channels
2021-12-21 13:50:54 +08:00
func ( m * MetaReplica ) sendSealedSegmentChangeInfos ( collectionID UniqueID , queryChannel string , changeInfos * querypb . SealedSegmentsChangeInfo ) ( * internalpb . MsgPosition , error ) {
2021-10-22 19:07:15 +08:00
// get msgStream to produce sealedSegmentChangeInfos to query channel
2021-12-21 13:50:54 +08:00
queryStream , err := m . getQueryStreamByID ( collectionID , queryChannel )
2021-10-22 19:07:15 +08:00
if err != nil {
2021-12-21 13:50:54 +08:00
log . Error ( "sendSealedSegmentChangeInfos: get query stream failed" , zap . Int64 ( "collectionID" , collectionID ) , zap . Error ( err ) )
return nil , err
2021-10-22 19:07:15 +08:00
}
var msgPack = & msgstream . MsgPack {
Msgs : [ ] msgstream . TsMsg { } ,
}
2021-10-26 15:18:22 +08:00
id , err := m . idAllocator ( )
if err != nil {
2021-12-21 13:50:54 +08:00
log . Error ( "sendSealedSegmentChangeInfos: allocator trigger taskID failed" , zap . Int64 ( "collectionID" , collectionID ) , zap . Error ( err ) )
return nil , err
2021-10-26 15:18:22 +08:00
}
changeInfos . Base . MsgID = id
segmentChangeMsg := & msgstream . SealedSegmentsChangeInfoMsg {
BaseMsg : msgstream . BaseMsg {
HashValues : [ ] uint32 { 0 } ,
} ,
SealedSegmentsChangeInfo : * changeInfos ,
2021-10-22 19:07:15 +08:00
}
2021-10-26 15:18:22 +08:00
msgPack . Msgs = append ( msgPack . Msgs , segmentChangeMsg )
2021-10-22 19:07:15 +08:00
messageIDInfos , err := queryStream . ProduceMark ( msgPack )
if err != nil {
2021-12-21 13:50:54 +08:00
log . Error ( "sendSealedSegmentChangeInfos: send sealed segment change info failed" , zap . Int64 ( "collectionID" , collectionID ) , zap . Error ( err ) )
return nil , err
}
messageIDs , ok := messageIDInfos [ queryChannel ]
if ! ok {
return nil , fmt . Errorf ( "sendSealedSegmentChangeInfos: send sealed segment change info to wrong query channel, collectionID = %d, query channel = %s" , collectionID , queryChannel )
}
// len(messageIDs) = 1
if len ( messageIDs ) != 1 {
return nil , fmt . Errorf ( "sendSealedSegmentChangeInfos: length of the positions in stream is not correct, collectionID = %d, query channel = %s, len = %d" , collectionID , queryChannel , len ( messageIDs ) )
2021-10-22 19:07:15 +08:00
}
2021-12-21 13:50:54 +08:00
log . Debug ( "updateGlobalSealedSegmentInfos: send sealed segment change info to queryChannel" , zap . Any ( "msgPack" , msgPack ) )
return & internalpb . MsgPosition {
ChannelName : queryChannel ,
MsgID : messageIDs [ 0 ] . Serialize ( ) ,
} , nil
2021-06-15 12:41:40 +08:00
}
2021-08-02 22:39:25 +08:00
func ( m * MetaReplica ) showSegmentInfos ( collectionID UniqueID , partitionIDs [ ] UniqueID ) [ ] * querypb . SegmentInfo {
2021-10-22 19:07:15 +08:00
m . segmentMu . RLock ( )
defer m . segmentMu . RUnlock ( )
2021-06-19 11:45:09 +08:00
2021-08-02 22:39:25 +08:00
results := make ( [ ] * querypb . SegmentInfo , 0 )
2021-06-15 12:41:40 +08:00
segmentInfos := make ( [ ] * querypb . SegmentInfo , 0 )
2021-08-02 22:39:25 +08:00
for _ , info := range m . segmentInfos {
if info . CollectionID == collectionID {
2021-06-19 11:45:09 +08:00
segmentInfos = append ( segmentInfos , proto . Clone ( info ) . ( * querypb . SegmentInfo ) )
2021-06-15 12:41:40 +08:00
}
}
2021-08-02 22:39:25 +08:00
if len ( partitionIDs ) == 0 {
return segmentInfos
}
2021-12-21 13:50:54 +08:00
partitionIDMap := getCompareMapFromSlice ( partitionIDs )
2021-08-02 22:39:25 +08:00
for _ , info := range segmentInfos {
2021-12-21 13:50:54 +08:00
partitionID := info . PartitionID
if _ , ok := partitionIDMap [ partitionID ] ; ok {
results = append ( results , info )
2021-08-02 22:39:25 +08:00
}
}
return results
2021-06-15 12:41:40 +08:00
}
2021-08-02 22:39:25 +08:00
func ( m * MetaReplica ) getSegmentInfoByID ( segmentID UniqueID ) ( * querypb . SegmentInfo , error ) {
2021-10-22 19:07:15 +08:00
m . segmentMu . RLock ( )
defer m . segmentMu . RUnlock ( )
2021-06-15 12:41:40 +08:00
if info , ok := m . segmentInfos [ segmentID ] ; ok {
2021-06-19 11:45:09 +08:00
return proto . Clone ( info ) . ( * querypb . SegmentInfo ) , nil
2021-06-15 12:41:40 +08:00
}
return nil , errors . New ( "getSegmentInfoByID: can't find segmentID in segmentInfos" )
}
2021-11-06 15:22:56 +08:00
// getSegmentInfosByNode returns deep copies of all segment infos currently
// loaded on the given query node.
func (m *MetaReplica) getSegmentInfosByNode(nodeID int64) []*querypb.SegmentInfo {
	m.segmentMu.RLock()
	defer m.segmentMu.RUnlock()

	res := make([]*querypb.SegmentInfo, 0)
	for _, segment := range m.segmentInfos {
		if segment.NodeID != nodeID {
			continue
		}
		res = append(res, proto.Clone(segment).(*querypb.SegmentInfo))
	}
	return res
}
2021-06-15 12:41:40 +08:00
2021-08-02 22:39:25 +08:00
func ( m * MetaReplica ) getCollectionInfoByID ( collectionID UniqueID ) ( * querypb . CollectionInfo , error ) {
2021-10-22 19:07:15 +08:00
m . collectionMu . RLock ( )
defer m . collectionMu . RUnlock ( )
2021-06-19 11:45:09 +08:00
if info , ok := m . collectionInfos [ collectionID ] ; ok {
return proto . Clone ( info ) . ( * querypb . CollectionInfo ) , nil
}
return nil , errors . New ( "getCollectionInfoByID: can't find collectionID in collectionInfo" )
}
2021-08-02 22:39:25 +08:00
func ( m * MetaReplica ) getPartitionStatesByID ( collectionID UniqueID , partitionID UniqueID ) ( * querypb . PartitionStates , error ) {
2021-10-22 19:07:15 +08:00
m . collectionMu . RLock ( )
defer m . collectionMu . RUnlock ( )
2021-06-15 12:41:40 +08:00
2021-08-02 22:39:25 +08:00
if info , ok := m . collectionInfos [ collectionID ] ; ok {
for offset , id := range info . PartitionIDs {
if id == partitionID {
return proto . Clone ( info . PartitionStates [ offset ] ) . ( * querypb . PartitionStates ) , nil
}
}
return nil , errors . New ( "getPartitionStateByID: can't find partitionID in partitionStates" )
2021-06-15 12:41:40 +08:00
}
2021-08-02 22:39:25 +08:00
return nil , errors . New ( "getPartitionStateByID: can't find collectionID in collectionInfo" )
2021-06-15 12:41:40 +08:00
}
2021-12-21 11:57:39 +08:00
func ( m * MetaReplica ) getDmChannelInfosByNodeID ( nodeID int64 ) [ ] * querypb . DmChannelWatchInfo {
m . dmChannelMu . RLock ( )
defer m . dmChannelMu . RUnlock ( )
2021-06-15 12:41:40 +08:00
2021-12-21 11:57:39 +08:00
var watchedDmChannelWatchInfo [ ] * querypb . DmChannelWatchInfo
for _ , channelInfo := range m . dmChannelInfos {
if channelInfo . NodeIDLoaded == nodeID {
watchedDmChannelWatchInfo = append ( watchedDmChannelWatchInfo , proto . Clone ( channelInfo ) . ( * querypb . DmChannelWatchInfo ) )
2021-06-15 12:41:40 +08:00
}
}
2021-12-21 11:57:39 +08:00
return watchedDmChannelWatchInfo
2021-06-15 12:41:40 +08:00
}
2021-12-21 11:57:39 +08:00
func ( m * MetaReplica ) setDmChannelInfos ( dmChannelWatchInfos [ ] * querypb . DmChannelWatchInfo ) error {
m . dmChannelMu . Lock ( )
defer m . dmChannelMu . Unlock ( )
2021-06-19 11:45:09 +08:00
2021-12-21 11:57:39 +08:00
err := saveDmChannelWatchInfos ( dmChannelWatchInfos , m . client )
if err != nil {
log . Error ( "save dmChannelWatchInfo error" , zap . Any ( "error" , err . Error ( ) ) )
return err
2021-06-19 11:45:09 +08:00
}
2021-12-21 11:57:39 +08:00
for _ , channelInfo := range dmChannelWatchInfos {
m . dmChannelInfos [ channelInfo . DmChannel ] = channelInfo
2021-06-15 12:41:40 +08:00
}
2021-12-21 11:57:39 +08:00
return nil
2021-06-15 12:41:40 +08:00
}
2021-12-21 13:50:54 +08:00
func ( m * MetaReplica ) createQueryChannel ( collectionID UniqueID ) * querypb . QueryChannelInfo {
// TODO::to remove
// all collection use the same query channel
colIDForAssignChannel := UniqueID ( 0 )
2021-06-15 12:41:40 +08:00
searchPrefix := Params . SearchChannelPrefix
searchResultPrefix := Params . SearchResultChannelPrefix
2021-12-21 13:50:54 +08:00
allocatedQueryChannel := searchPrefix + "-" + strconv . FormatInt ( colIDForAssignChannel , 10 )
allocatedQueryResultChannel := searchResultPrefix + "-" + strconv . FormatInt ( colIDForAssignChannel , 10 )
2021-06-22 16:44:09 +08:00
log . Debug ( "query coordinator create query channel" , zap . String ( "queryChannelName" , allocatedQueryChannel ) , zap . String ( "queryResultChannelName" , allocatedQueryResultChannel ) )
2021-06-15 12:41:40 +08:00
2021-10-22 19:07:15 +08:00
seekPosition := & internalpb . MsgPosition {
ChannelName : allocatedQueryChannel ,
}
2021-12-21 13:50:54 +08:00
segmentInfos := m . showSegmentInfos ( collectionID , nil )
2021-10-22 19:07:15 +08:00
info := & querypb . QueryChannelInfo {
2021-06-15 12:41:40 +08:00
CollectionID : collectionID ,
2021-12-15 16:53:12 +08:00
QueryChannel : allocatedQueryChannel ,
QueryResultChannel : allocatedQueryResultChannel ,
2021-12-21 13:50:54 +08:00
GlobalSealedSegments : segmentInfos ,
2021-10-22 19:07:15 +08:00
SeekPosition : seekPosition ,
}
return info
}
2021-11-13 08:49:08 +08:00
// Get delta channel info for collection, so far all the collection share the same query channel 0
func ( m * MetaReplica ) getDeltaChannelsByCollectionID ( collectionID UniqueID ) ( [ ] * datapb . VchannelInfo , error ) {
m . deltaChannelMu . RLock ( )
defer m . deltaChannelMu . RUnlock ( )
if infos , ok := m . deltaChannelInfos [ collectionID ] ; ok {
return infos , nil
}
2021-12-21 11:57:39 +08:00
return nil , fmt . Errorf ( "delta channel not exist in meta, collectionID = %d" , collectionID )
2021-11-13 08:49:08 +08:00
}
func ( m * MetaReplica ) setDeltaChannel ( collectionID UniqueID , infos [ ] * datapb . VchannelInfo ) error {
m . deltaChannelMu . Lock ( )
defer m . deltaChannelMu . Unlock ( )
_ , ok := m . deltaChannelInfos [ collectionID ]
if ok {
2021-12-01 16:39:31 +08:00
log . Debug ( "delta channel already exist" , zap . Any ( "collectionID" , collectionID ) )
2021-11-13 08:49:08 +08:00
return nil
}
err := saveDeltaChannelInfo ( collectionID , infos , m . client )
if err != nil {
2021-12-21 11:57:39 +08:00
log . Error ( "save delta channel info error" , zap . Int64 ( "collectionID" , collectionID ) , zap . Error ( err ) )
2021-11-13 08:49:08 +08:00
return err
}
2021-12-21 11:57:39 +08:00
log . Debug ( "save delta channel infos to meta" , zap . Any ( "collectionID" , collectionID ) )
2021-11-13 08:49:08 +08:00
m . deltaChannelInfos [ collectionID ] = infos
return nil
}
2021-11-05 14:57:44 +08:00
// Get Query channel info for collection, so far all the collection share the same query channel 0
2021-12-21 13:50:54 +08:00
func ( m * MetaReplica ) getQueryChannelInfoByID ( collectionID UniqueID ) * querypb . QueryChannelInfo {
2021-10-22 19:07:15 +08:00
m . channelMu . Lock ( )
defer m . channelMu . Unlock ( )
2021-12-21 13:50:54 +08:00
var channelInfo * querypb . QueryChannelInfo
2021-10-22 19:07:15 +08:00
if info , ok := m . queryChannelInfos [ collectionID ] ; ok {
2021-12-21 13:50:54 +08:00
channelInfo = proto . Clone ( info ) . ( * querypb . QueryChannelInfo )
} else {
channelInfo = m . createQueryChannel ( collectionID )
m . queryChannelInfos [ collectionID ] = channelInfo
2021-06-15 12:41:40 +08:00
}
2021-10-22 19:07:15 +08:00
2021-12-21 13:50:54 +08:00
return proto . Clone ( channelInfo ) . ( * querypb . QueryChannelInfo )
2021-06-15 12:41:40 +08:00
}
2021-06-19 11:45:09 +08:00
2021-12-21 13:50:54 +08:00
func ( m * MetaReplica ) getQueryStreamByID ( collectionID UniqueID , queryChannel string ) ( msgstream . MsgStream , error ) {
2021-10-22 19:07:15 +08:00
m . streamMu . Lock ( )
defer m . streamMu . Unlock ( )
2021-06-19 11:45:09 +08:00
2021-12-21 13:50:54 +08:00
var queryStream msgstream . MsgStream
var err error
if stream , ok := m . queryStreams [ collectionID ] ; ok {
queryStream = stream
} else {
queryStream , err = m . msFactory . NewMsgStream ( m . ctx )
2021-10-22 19:07:15 +08:00
if err != nil {
log . Error ( "updateGlobalSealedSegmentInfos: create msgStream failed" , zap . Error ( err ) )
return nil , err
}
2021-12-21 13:50:54 +08:00
queryStream . AsProducer ( [ ] string { queryChannel } )
m . queryStreams [ collectionID ] = queryStream
2021-10-22 19:07:15 +08:00
log . Debug ( "getQueryStreamByID: create query msgStream for collection" , zap . Int64 ( "collectionID" , collectionID ) )
}
2021-12-21 13:50:54 +08:00
return queryStream , nil
2021-10-22 19:07:15 +08:00
}
func ( m * MetaReplica ) setLoadType ( collectionID UniqueID , loadType querypb . LoadType ) error {
2021-12-21 11:57:39 +08:00
m . collectionMu . Lock ( )
defer m . collectionMu . Unlock ( )
if _ , ok := m . collectionInfos [ collectionID ] ; ok {
info := proto . Clone ( m . collectionInfos [ collectionID ] ) . ( * querypb . CollectionInfo )
2021-08-02 22:39:25 +08:00
info . LoadType = loadType
err := saveGlobalCollectionInfo ( collectionID , info , m . client )
if err != nil {
log . Error ( "save collectionInfo error" , zap . Any ( "error" , err . Error ( ) ) , zap . Int64 ( "collectionID" , collectionID ) )
return err
}
2021-10-22 19:07:15 +08:00
2021-12-21 11:57:39 +08:00
m . collectionInfos [ collectionID ] = info
2021-08-02 22:39:25 +08:00
return nil
}
2021-06-19 11:45:09 +08:00
2021-08-02 22:39:25 +08:00
return errors . New ( "setLoadType: can't find collection in collectionInfos" )
2021-06-19 11:45:09 +08:00
}
2021-08-02 22:39:25 +08:00
func ( m * MetaReplica ) setLoadPercentage ( collectionID UniqueID , partitionID UniqueID , percentage int64 , loadType querypb . LoadType ) error {
2021-12-21 11:57:39 +08:00
m . collectionMu . Lock ( )
defer m . collectionMu . Unlock ( )
if _ , ok := m . collectionInfos [ collectionID ] ; ! ok {
2021-08-02 22:39:25 +08:00
return errors . New ( "setLoadPercentage: can't find collection in collectionInfos" )
}
2021-12-21 11:57:39 +08:00
info := proto . Clone ( m . collectionInfos [ collectionID ] ) . ( * querypb . CollectionInfo )
2021-08-02 22:39:25 +08:00
if loadType == querypb . LoadType_loadCollection {
info . InMemoryPercentage = percentage
for _ , partitionState := range info . PartitionStates {
if percentage >= 100 {
partitionState . State = querypb . PartitionState_InMemory
} else {
partitionState . State = querypb . PartitionState_PartialInMemory
}
partitionState . InMemoryPercentage = percentage
}
err := saveGlobalCollectionInfo ( collectionID , info , m . client )
2021-06-19 11:45:09 +08:00
if err != nil {
log . Error ( "save collectionInfo error" , zap . Any ( "error" , err . Error ( ) ) , zap . Int64 ( "collectionID" , collectionID ) )
2021-08-02 22:39:25 +08:00
return err
2021-06-19 11:45:09 +08:00
}
2021-08-02 22:39:25 +08:00
} else {
2021-10-22 19:07:15 +08:00
findPartition := false
2021-08-02 22:39:25 +08:00
for _ , partitionState := range info . PartitionStates {
if partitionState . PartitionID == partitionID {
2021-10-22 19:07:15 +08:00
findPartition = true
2021-08-02 22:39:25 +08:00
if percentage >= 100 {
partitionState . State = querypb . PartitionState_InMemory
} else {
partitionState . State = querypb . PartitionState_PartialInMemory
}
partitionState . InMemoryPercentage = percentage
err := saveGlobalCollectionInfo ( collectionID , info , m . client )
if err != nil {
log . Error ( "save collectionInfo error" , zap . Any ( "error" , err . Error ( ) ) , zap . Int64 ( "collectionID" , collectionID ) )
return err
}
}
}
2021-10-22 19:07:15 +08:00
if ! findPartition {
return errors . New ( "setLoadPercentage: can't find partitionID in collectionInfos" )
}
2021-06-19 11:45:09 +08:00
}
2021-10-22 19:07:15 +08:00
m . collectionInfos [ collectionID ] = info
2021-08-02 22:39:25 +08:00
return nil
2021-06-19 11:45:09 +08:00
}
2021-09-29 09:56:04 +08:00
//func (m *MetaReplica) printMeta() {
// m.RLock()
// defer m.RUnlock()
// for id, info := range m.collectionInfos {
// log.Debug("query coordinator MetaReplica: collectionInfo", zap.Int64("collectionID", id), zap.Any("info", info))
// }
//
// for id, info := range m.segmentInfos {
// log.Debug("query coordinator MetaReplica: segmentInfo", zap.Int64("segmentID", id), zap.Any("info", info))
// }
//
// for id, info := range m.queryChannelInfos {
// log.Debug("query coordinator MetaReplica: queryChannelInfo", zap.Int64("collectionID", id), zap.Any("info", info))
// }
//}
2021-08-02 22:39:25 +08:00
2021-09-29 09:56:04 +08:00
func saveGlobalCollectionInfo ( collectionID UniqueID , info * querypb . CollectionInfo , kv kv . MetaKv ) error {
2021-09-29 20:26:00 +08:00
infoBytes , err := proto . Marshal ( info )
if err != nil {
return err
}
2021-08-02 22:39:25 +08:00
key := fmt . Sprintf ( "%s/%d" , collectionMetaPrefix , collectionID )
2021-09-29 20:26:00 +08:00
return kv . Save ( key , string ( infoBytes ) )
2021-08-02 22:39:25 +08:00
}
2021-12-21 11:57:39 +08:00
func saveDeltaChannelInfo ( collectionID UniqueID , infos [ ] * datapb . VchannelInfo , kv kv . MetaKv ) error {
2021-10-22 19:07:15 +08:00
kvs := make ( map [ string ] string )
2021-12-21 11:57:39 +08:00
for _ , info := range infos {
2021-10-22 19:07:15 +08:00
infoBytes , err := proto . Marshal ( info )
if err != nil {
return err
}
2021-12-21 11:57:39 +08:00
key := fmt . Sprintf ( "%s/%d/%s" , deltaChannelMetaPrefix , collectionID , info . ChannelName )
2021-10-22 19:07:15 +08:00
kvs [ key ] = string ( infoBytes )
2021-09-29 20:26:00 +08:00
}
2021-10-22 19:07:15 +08:00
return kv . MultiSave ( kvs )
2021-08-02 22:39:25 +08:00
}
2021-12-21 11:57:39 +08:00
func saveDmChannelWatchInfos ( infos [ ] * querypb . DmChannelWatchInfo , kv kv . MetaKv ) error {
2021-11-13 08:49:08 +08:00
kvs := make ( map [ string ] string )
for _ , info := range infos {
infoBytes , err := proto . Marshal ( info )
if err != nil {
return err
}
2021-12-21 11:57:39 +08:00
key := fmt . Sprintf ( "%s/%d/%s" , dmChannelMetaPrefix , info . CollectionID , info . DmChannel )
2021-11-13 08:49:08 +08:00
kvs [ key ] = string ( infoBytes )
}
return kv . MultiSave ( kvs )
}
2021-12-21 11:57:39 +08:00
// removeCollectionMeta removes all etcd meta belonging to the collection:
// collection info, dm channel infos, and delta channel infos.
// fix: the kv parameter was renamed to metaKv because it shadowed the
// imported kv package; the prefixes slice is built with a single literal.
func removeCollectionMeta(collectionID UniqueID, metaKv kv.MetaKv) error {
	prefixes := []string{
		fmt.Sprintf("%s/%d", collectionMetaPrefix, collectionID),
		fmt.Sprintf("%s/%d", dmChannelMetaPrefix, collectionID),
		fmt.Sprintf("%s/%d", deltaChannelMetaPrefix, collectionID),
	}
	return metaKv.MultiRemoveWithPrefix(prefixes)
}