Check index periodically and load index, add indexing segment type
Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
parent 996111bf8c
commit 3573e01093
@@ -19,6 +19,7 @@ enum SegmentType {
     Invalid = 0,
     Growing = 1,
     Sealed = 2,
+    Indexing = 3,
 };
 
 typedef enum SegmentType SegmentType;
@@ -36,6 +36,7 @@ NewSegment(CCollection collection, uint64_t segment_id, SegmentType seg_type) {
             segment = milvus::segcore::CreateGrowingSegment(col->get_schema());
             break;
         case Sealed:
+        case Indexing:
             segment = milvus::segcore::CreateSealedSegment(col->get_schema());
             break;
         default:
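A note on the hunk above: stacking the new Indexing case label on the Sealed body means both segment types are constructed through CreateSealedSegment, i.e. an indexing segment reuses the sealed representation. A minimal Go sketch of the same shared-branch dispatch (names are illustrative, not from the repo):

    package main

    import "fmt"

    type segmentType int

    const (
        invalid segmentType = iota
        growing
        sealed
        indexing
    )

    // newSegmentKind mirrors NewSegment's dispatch: Sealed and Indexing share
    // one branch, so an Indexing segment gets the sealed representation.
    func newSegmentKind(t segmentType) string {
        switch t {
        case growing:
            return "growing segment"
        case sealed, indexing: // one body for both, like the stacked C case labels
            return "sealed segment"
        default:
            return "invalid segment type"
        }
    }

    func main() {
        fmt.Println(newSegmentKind(indexing)) // prints: sealed segment
    }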
@@ -67,6 +67,7 @@ type collectionReplica interface {
     getSegmentByID(segmentID UniqueID) (*Segment, error)
     hasSegment(segmentID UniqueID) bool
     getVecFieldsBySegmentID(segmentID UniqueID) (map[int64]string, error)
+    getSealedSegments() ([]UniqueID, []UniqueID)
 
     freeAll()
 }
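The new getSealedSegments method returns two parallel slices: collectionIDs[i] is the collection that owns segmentIDs[i]. A small sketch of how a caller pairs them, with a stub standing in for the real replica:

    package main

    import "fmt"

    // UniqueID mirrors the repo's alias; replicaStub is purely illustrative.
    type UniqueID = int64

    type replicaStub struct{}

    // getSealedSegments returns parallel slices: collectionIDs[i] owns segmentIDs[i].
    func (replicaStub) getSealedSegments() ([]UniqueID, []UniqueID) {
        return []UniqueID{100, 100, 101}, []UniqueID{1, 2, 3}
    }

    func main() {
        var replica replicaStub
        collectionIDs, segmentIDs := replica.getSealedSegments()
        for i := range collectionIDs {
            fmt.Printf("sealed segment %d in collection %d\n", segmentIDs[i], collectionIDs[i])
        }
    }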
@@ -425,11 +426,11 @@ func (colReplica *collectionReplicaImpl) getSegmentStatistics() []*internalpb2.S
             SegmentID:        segmentID,
             MemorySize:       currentMemSize,
             NumRows:          segmentNumOfRows,
-            RecentlyModified: segment.GetRecentlyModified(),
+            RecentlyModified: segment.getRecentlyModified(),
         }
 
         statisticData = append(statisticData, &stat)
-        segment.SetRecentlyModified(false)
+        segment.setRecentlyModified(false)
     }
 
     return statisticData
@@ -560,6 +561,22 @@ func (colReplica *collectionReplicaImpl) getVecFieldsBySegmentID(segmentID Uniqu
     return vecFields, nil
 }
 
+func (colReplica *collectionReplicaImpl) getSealedSegments() ([]UniqueID, []UniqueID) {
+    colReplica.mu.RLock()
+    defer colReplica.mu.RUnlock()
+
+    collectionIDs := make([]UniqueID, 0)
+    segmentIDs := make([]UniqueID, 0)
+    for k, v := range colReplica.segments {
+        if v.getType() == segTypeSealed {
+            collectionIDs = append(collectionIDs, v.collectionID)
+            segmentIDs = append(segmentIDs, k)
+        }
+    }
+
+    return collectionIDs, segmentIDs
+}
+
 //-----------------------------------------------------------------------------------------------------
 func (colReplica *collectionReplicaImpl) freeAll() {
     colReplica.mu.Lock()
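The implementation above only reads colReplica.segments, so a read lock is enough, and the IDs are copied out before the lock is released. The same snapshot-under-RLock pattern in isolation (registry is a stand-in for the replica):

    package main

    import (
        "fmt"
        "sync"
    )

    type registry struct {
        mu    sync.RWMutex
        items map[int64]string
    }

    // snapshot copies keys out under a read lock, so writers are blocked only
    // for the duration of the copy, never while the caller uses the result.
    func (r *registry) snapshot() []int64 {
        r.mu.RLock()
        defer r.mu.RUnlock()
        keys := make([]int64, 0, len(r.items))
        for k := range r.items {
            keys = append(keys, k)
        }
        return keys
    }

    func main() {
        r := &registry{items: map[int64]string{1: "sealed", 2: "growing"}}
        fmt.Println(r.snapshot())
    }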
@@ -20,12 +20,15 @@ import (
     "github.com/zilliztech/milvus-distributed/internal/storage"
 )
 
-type loadIndexService struct {
+const indexCheckInterval = 1
+
+type loadService struct {
     ctx    context.Context
     cancel context.CancelFunc
     client *minioKV.MinIOKV
 
-    replica collectionReplica
+    queryNodeID UniqueID
+    replica     collectionReplica
 
     fieldIndexes   map[string][]*internalpb2.IndexStats
     fieldStatsChan chan []*internalpb2.FieldStats
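indexCheckInterval above is an untyped constant (1); it only acquires its unit at the use site, where indexListener multiplies it by time.Second. An equivalent spelling, shown as a sketch rather than a change to the commit, carries the unit in the constant itself so a call site cannot forget it:

    package main

    import (
        "fmt"
        "time"
    )

    // Same one-second wait, but typed as a time.Duration from the start.
    const indexCheckInterval = 1 * time.Second

    func main() {
        fmt.Println("next index check in", indexCheckInterval)
        <-time.After(indexCheckInterval)
        fmt.Println("checked")
    }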
@@ -33,63 +36,10 @@ type loadIndexService struct {
     loadIndexReqChan   chan []msgstream.TsMsg
     loadIndexMsgStream msgstream.MsgStream
 
-    queryNodeID UniqueID
+    segManager *segmentManager
 }
 
-func newLoadIndexService(ctx context.Context, replica collectionReplica) *loadIndexService {
-    ctx1, cancel := context.WithCancel(ctx)
-
-    option := &minioKV.Option{
-        Address:           Params.MinioEndPoint,
-        AccessKeyID:       Params.MinioAccessKeyID,
-        SecretAccessKeyID: Params.MinioSecretAccessKey,
-        UseSSL:            Params.MinioUseSSLStr,
-        CreateBucket:      true,
-        BucketName:        Params.MinioBucketName,
-    }
-
-    // TODO: load bucketName from config
-    MinioKV, err := minioKV.NewMinIOKV(ctx1, option)
-    if err != nil {
-        panic(err)
-    }
-
-    // init msgStream
-    receiveBufSize := Params.LoadIndexReceiveBufSize
-    pulsarBufSize := Params.LoadIndexPulsarBufSize
-
-    msgStreamURL := Params.PulsarAddress
-
-    consumeChannels := Params.LoadIndexChannelNames
-    consumeSubName := Params.MsgChannelSubName
-
-    loadIndexStream := pulsarms.NewPulsarMsgStream(ctx, receiveBufSize)
-    loadIndexStream.SetPulsarClient(msgStreamURL)
-    unmarshalDispatcher := util.NewUnmarshalDispatcher()
-    loadIndexStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize)
-
-    var stream msgstream.MsgStream = loadIndexStream
-
-    // init index load requests channel size by message receive buffer size
-    indexLoadChanSize := receiveBufSize
-
-    return &loadIndexService{
-        ctx:    ctx1,
-        cancel: cancel,
-        client: MinioKV,
-
-        replica:        replica,
-        fieldIndexes:   make(map[string][]*internalpb2.IndexStats),
-        fieldStatsChan: make(chan []*internalpb2.FieldStats, 1),
-
-        loadIndexReqChan:   make(chan []msgstream.TsMsg, indexLoadChanSize),
-        loadIndexMsgStream: stream,
-
-        queryNodeID: Params.QueryNodeID,
-    }
-}
-
-func (lis *loadIndexService) consume() {
+func (lis *loadService) consume() {
     for {
         select {
         case <-lis.ctx.Done():
@@ -105,9 +55,37 @@ func (lis *loadIndexService) consume() {
         }
     }
 }
 
-func (lis *loadIndexService) start() {
+func (lis *loadService) indexListener() {
+    for {
+        select {
+        case <-lis.ctx.Done():
+            return
+        case <-time.After(indexCheckInterval * time.Second):
+            collectionIDs, segmentIDs := lis.replica.getSealedSegments()
+            for i := range collectionIDs {
+                // we don't need index id yet
+                _, buildID, err := lis.segManager.getIndexInfo(collectionIDs[i], segmentIDs[i])
+                if err != nil {
+                    indexPaths, err := lis.segManager.getIndexPaths(buildID)
+                    if err != nil {
+                        log.Println(err)
+                        continue
+                    }
+                    err = lis.segManager.loadIndex(segmentIDs[i], indexPaths)
+                    if err != nil {
+                        log.Println(err)
+                        continue
+                    }
+                }
+            }
+        }
+    }
+}
+
+func (lis *loadService) start() {
     lis.loadIndexMsgStream.Start()
     go lis.consume()
+    go lis.indexListener()
 
     for {
         select {
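indexListener is the "check index periodically" half of this commit: a select loop that wakes every indexCheckInterval seconds, asks the replica for sealed segments, and for each one tries to resolve a build ID, fetch the index paths, and load the index, logging and skipping on failure. A self-contained sketch of the same cancellable polling shape (the tick body here is a placeholder):

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    const checkInterval = 1 // seconds, mirroring indexCheckInterval

    // poll wakes up every checkInterval seconds until ctx is cancelled. Note
    // that time.After allocates a fresh timer each iteration; a time.Ticker
    // would avoid that, but this mirrors the commit's shape.
    func poll(ctx context.Context, check func()) {
        for {
            select {
            case <-ctx.Done():
                return
            case <-time.After(checkInterval * time.Second):
                check()
            }
        }
    }

    func main() {
        ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
        defer cancel()
        poll(ctx, func() { fmt.Println("checking sealed segments for ready indexes") })
    }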
@@ -132,7 +110,7 @@ func (lis *loadIndexService) start() {
     }
 }
 
-func (lis *loadIndexService) execute(msg msgstream.TsMsg) error {
+func (lis *loadService) execute(msg msgstream.TsMsg) error {
     indexMsg, ok := msg.(*msgstream.LoadIndexMsg)
     if !ok {
         return errors.New("type assertion failed for LoadIndexMsg")
@@ -174,21 +152,21 @@ func (lis *loadIndexService) execute(msg msgstream.TsMsg) error {
     return nil
 }
 
-func (lis *loadIndexService) close() {
+func (lis *loadService) close() {
     if lis.loadIndexMsgStream != nil {
         lis.loadIndexMsgStream.Close()
     }
     lis.cancel()
 }
 
-func (lis *loadIndexService) printIndexParams(index []*commonpb.KeyValuePair) {
+func (lis *loadService) printIndexParams(index []*commonpb.KeyValuePair) {
     fmt.Println("=================================================")
     for i := 0; i < len(index); i++ {
         fmt.Println(index[i])
     }
 }
 
-func (lis *loadIndexService) indexParamsEqual(index1 []*commonpb.KeyValuePair, index2 []*commonpb.KeyValuePair) bool {
+func (lis *loadService) indexParamsEqual(index1 []*commonpb.KeyValuePair, index2 []*commonpb.KeyValuePair) bool {
     if len(index1) != len(index2) {
         return false
     }
@@ -204,11 +182,11 @@ func (lis *loadIndexService) indexParamsEqual(index1 []*commonpb.KeyValuePair, i
     return true
 }
 
-func (lis *loadIndexService) fieldsStatsIDs2Key(collectionID UniqueID, fieldID UniqueID) string {
+func (lis *loadService) fieldsStatsIDs2Key(collectionID UniqueID, fieldID UniqueID) string {
     return strconv.FormatInt(collectionID, 10) + "/" + strconv.FormatInt(fieldID, 10)
 }
 
-func (lis *loadIndexService) fieldsStatsKey2IDs(key string) (UniqueID, UniqueID, error) {
+func (lis *loadService) fieldsStatsKey2IDs(key string) (UniqueID, UniqueID, error) {
     ids := strings.Split(key, "/")
     if len(ids) != 2 {
         return 0, 0, errors.New("illegal fieldsStatsKey")
@@ -224,7 +202,7 @@ func (lis *loadIndexService) fieldsStatsKey2IDs(key string) (UniqueID, UniqueID,
     return collectionID, fieldID, nil
 }
 
-func (lis *loadIndexService) updateSegmentIndexStats(indexParams indexParam, indexMsg *msgstream.LoadIndexMsg) error {
+func (lis *loadService) updateSegmentIndexStats(indexParams indexParam, indexMsg *msgstream.LoadIndexMsg) error {
     targetSegment, err := lis.replica.getSegmentByID(indexMsg.SegmentID)
     if err != nil {
         return err
@@ -265,12 +243,10 @@ func (lis *loadIndexService) updateSegmentIndexStats(indexParams indexParam, ind
             })
         }
     }
-    targetSegment.setIndexParam(indexMsg.FieldID, indexMsg.IndexParams)
-
-    return nil
+    return targetSegment.setIndexParam(indexMsg.FieldID, indexMsg.IndexParams)
 }
 
-func (lis *loadIndexService) loadIndex(indexPath []string) ([][]byte, indexParam, error) {
+func (lis *loadService) loadIndex(indexPath []string) ([][]byte, indexParam, error) {
     index := make([][]byte, 0)
 
     var indexParams indexParam
@@ -303,7 +279,7 @@ func (lis *loadIndexService) loadIndex(indexPath []string) ([][]byte, indexParam
     return index, indexParams, nil
 }
 
-func (lis *loadIndexService) updateSegmentIndex(indexParams indexParam, bytesIndex [][]byte, loadIndexMsg *msgstream.LoadIndexMsg) error {
+func (lis *loadService) updateSegmentIndex(indexParams indexParam, bytesIndex [][]byte, loadIndexMsg *msgstream.LoadIndexMsg) error {
     segment, err := lis.replica.getSegmentByID(loadIndexMsg.SegmentID)
     if err != nil {
         return err
@@ -328,15 +304,10 @@ func (lis *loadIndexService) updateSegmentIndex(indexParams indexParam, bytesInd
     if err != nil {
         return err
     }
-    err = segment.updateSegmentIndex(loadIndexInfo)
-    if err != nil {
-        return err
-    }
-
-    return nil
+    return segment.updateSegmentIndex(loadIndexInfo)
 }
 
-func (lis *loadIndexService) sendQueryNodeStats() error {
+func (lis *loadService) sendQueryNodeStats() error {
     resultFieldsStats := make([]*internalpb2.FieldStats, 0)
     for fieldStatsKey, indexStats := range lis.fieldIndexes {
         colID, fieldID, err := lis.fieldsStatsKey2IDs(fieldStatsKey)
@@ -356,7 +327,7 @@ func (lis *loadIndexService) sendQueryNodeStats() error {
     return nil
 }
 
-func (lis *loadIndexService) checkIndexReady(indexParams indexParam, loadIndexMsg *msgstream.LoadIndexMsg) (bool, error) {
+func (lis *loadService) checkIndexReady(indexParams indexParam, loadIndexMsg *msgstream.LoadIndexMsg) (bool, error) {
     segment, err := lis.replica.getSegmentByID(loadIndexMsg.SegmentID)
     if err != nil {
         return false, err
@@ -367,3 +338,60 @@ func (lis *loadIndexService) checkIndexReady(indexParams indexParam, loadIndexMs
     return true, nil
 
 }
+
+func newLoadService(ctx context.Context, masterClient MasterServiceInterface, dataClient DataServiceInterface, indexClient IndexServiceInterface, replica collectionReplica, dmStream msgstream.MsgStream) *loadService {
+    ctx1, cancel := context.WithCancel(ctx)
+
+    option := &minioKV.Option{
+        Address:           Params.MinioEndPoint,
+        AccessKeyID:       Params.MinioAccessKeyID,
+        SecretAccessKeyID: Params.MinioSecretAccessKey,
+        UseSSL:            Params.MinioUseSSLStr,
+        CreateBucket:      true,
+        BucketName:        Params.MinioBucketName,
+    }
+
+    MinioKV, err := minioKV.NewMinIOKV(ctx1, option)
+    if err != nil {
+        panic(err)
+    }
+
+    // init msgStream
+    receiveBufSize := Params.LoadIndexReceiveBufSize
+    pulsarBufSize := Params.LoadIndexPulsarBufSize
+
+    msgStreamURL := Params.PulsarAddress
+
+    consumeChannels := Params.LoadIndexChannelNames
+    consumeSubName := Params.MsgChannelSubName
+
+    loadIndexStream := pulsarms.NewPulsarMsgStream(ctx, receiveBufSize)
+    loadIndexStream.SetPulsarClient(msgStreamURL)
+    unmarshalDispatcher := util.NewUnmarshalDispatcher()
+    loadIndexStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize)
+
+    var stream msgstream.MsgStream = loadIndexStream
+
+    // init index load requests channel size by message receive buffer size
+    indexLoadChanSize := receiveBufSize
+
+    // init segment manager
+    loadIndexReqChan := make(chan []msgstream.TsMsg, indexLoadChanSize)
+    manager := newSegmentManager(ctx1, masterClient, dataClient, indexClient, replica, dmStream, loadIndexReqChan)
+
+    return &loadService{
+        ctx:    ctx1,
+        cancel: cancel,
+        client: MinioKV,
+
+        replica:        replica,
+        queryNodeID:    Params.QueryNodeID,
+        fieldIndexes:   make(map[string][]*internalpb2.IndexStats),
+        fieldStatsChan: make(chan []*internalpb2.FieldStats, 1),
+
+        loadIndexReqChan:   loadIndexReqChan,
+        loadIndexMsgStream: stream,
+
+        segManager: manager,
+    }
+}
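newLoadService now also builds the segmentManager and hands it the same loadIndexReqChan that the Pulsar consumer feeds, so index-load requests flow from the manager into the service without either side holding a reference to the other. A stripped-down sketch of that shared-channel wiring (all names are illustrative stand-ins):

    package main

    import "fmt"

    type request struct{ segmentID int64 }

    // manager only sees the send side of the channel.
    type manager struct{ out chan<- request }

    func (m *manager) emit(id int64) { m.out <- request{segmentID: id} }

    // service only sees the receive side.
    type service struct {
        in      <-chan request
        manager *manager
    }

    // newService creates the channel itself and wires both halves, the way
    // newLoadService creates loadIndexReqChan and passes it to newSegmentManager.
    func newService() *service {
        reqChan := make(chan request, 8) // buffered, like loadIndexReqChan
        return &service{in: reqChan, manager: &manager{out: reqChan}}
    }

    func main() {
        s := newService()
        go s.manager.emit(42)
        fmt.Println("got request for segment", (<-s.in).segmentID)
    }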
@@ -25,13 +25,13 @@ import (
     "github.com/zilliztech/milvus-distributed/internal/storage"
 )
 
-func TestLoadIndexService_FloatVector(t *testing.T) {
+func TestLoadService_LoadIndex_FloatVector(t *testing.T) {
     node := newQueryNodeMock()
     collectionID := rand.Int63n(1000000)
     segmentID := rand.Int63n(1000000)
     initTestMeta(t, node, "collection0", collectionID, segmentID)
 
-    // loadIndexService and statsService
+    // loadService and statsService
     suffix := "-test-search" + strconv.FormatInt(rand.Int63n(1000000), 10)
     oldSearchChannelNames := Params.SearchChannelNames
     newSearchChannelNames := makeNewChannelNames(oldSearchChannelNames, suffix)
@@ -321,7 +321,7 @@ func TestLoadIndexService_FloatVector(t *testing.T) {
         params := indexStats0.IndexParams
         // sort index params by key
         sort.Slice(indexParamsKV, func(i, j int) bool { return indexParamsKV[i].Key < indexParamsKV[j].Key })
-        indexEqual := node.loadIndexService.indexParamsEqual(params, indexParamsKV)
+        indexEqual := node.loadService.indexParamsEqual(params, indexParamsKV)
         assert.Equal(t, indexEqual, true)
     }
 
@@ -354,13 +354,13 @@ func TestLoadIndexService_FloatVector(t *testing.T) {
     node.Stop()
 }
 
-func TestLoadIndexService_BinaryVector(t *testing.T) {
+func TestLoadService_LoadIndex_BinaryVector(t *testing.T) {
     node := newQueryNodeMock()
     collectionID := rand.Int63n(1000000)
     segmentID := rand.Int63n(1000000)
     initTestMeta(t, node, "collection0", collectionID, segmentID, true)
 
-    // loadIndexService and statsService
+    // loadService and statsService
     suffix := "-test-search-binary" + strconv.FormatInt(rand.Int63n(1000000), 10)
     oldSearchChannelNames := Params.SearchChannelNames
     newSearchChannelNames := makeNewChannelNames(oldSearchChannelNames, suffix)
@@ -640,7 +640,7 @@ func TestLoadIndexService_BinaryVector(t *testing.T) {
         params := indexStats0.IndexParams
         // sort index params by key
         sort.Slice(indexParamsKV, func(i, j int) bool { return indexParamsKV[i].Key < indexParamsKV[j].Key })
-        indexEqual := node.loadIndexService.indexParamsEqual(params, indexParamsKV)
+        indexEqual := node.loadService.indexParamsEqual(params, indexParamsKV)
         assert.Equal(t, indexEqual, true)
     }
 
@@ -56,13 +56,11 @@ type QueryNode struct {
     replica collectionReplica
 
     // internal services
-    dataSyncService  *dataSyncService
-    metaService      *metaService
-    searchService    *searchService
-    loadIndexService *loadIndexService
-    statsService     *statsService
-
-    segManager *segmentManager
+    dataSyncService *dataSyncService
+    metaService     *metaService
+    searchService   *searchService
+    loadService     *loadService
+    statsService    *statsService
 
     //opentracing
     tracer opentracing.Tracer
@@ -86,7 +84,6 @@ func NewQueryNode(ctx context.Context, queryNodeID uint64) *QueryNode {
         metaService:   nil,
         searchService: nil,
         statsService:  nil,
-        segManager:    nil,
     }
 
     var err error
@@ -128,7 +125,6 @@ func NewQueryNodeWithoutID(ctx context.Context) *QueryNode {
         metaService:   nil,
         searchService: nil,
         statsService:  nil,
-        segManager:    nil,
     }
 
     var err error
@@ -167,10 +163,6 @@ func Init() {
 
 func (node *QueryNode) Init() error {
     Params.Init()
-    return nil
-}
-
-func (node *QueryNode) Start() error {
     registerReq := &queryPb.RegisterNodeRequest{
         Address: &commonpb.Address{
             Ip: Params.QueryNodeIP,
@@ -189,6 +181,10 @@ func (node *QueryNode) Start() error {
     Params.QueryNodeID = response.InitParams.NodeID
     fmt.Println("QueryNodeID is", Params.QueryNodeID)
 
+    if node.masterClient == nil {
+        log.Println("WARN: null master service detected")
+    }
+
     if node.indexClient == nil {
         log.Println("WARN: null index service detected")
     }
@@ -197,20 +193,22 @@ func (node *QueryNode) Start() error {
         log.Println("WARN: null data service detected")
     }
 
+    // todo add connectMaster logic
+    return nil
+}
+
+func (node *QueryNode) Start() error {
     // init services and manager
     node.dataSyncService = newDataSyncService(node.queryNodeLoopCtx, node.replica)
     node.searchService = newSearchService(node.queryNodeLoopCtx, node.replica)
     node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica)
-    node.loadIndexService = newLoadIndexService(node.queryNodeLoopCtx, node.replica)
-    node.statsService = newStatsService(node.queryNodeLoopCtx, node.replica, node.loadIndexService.fieldStatsChan)
-    node.segManager = newSegmentManager(node.queryNodeLoopCtx, node.masterClient, node.dataClient, node.indexClient, node.replica, node.dataSyncService.dmStream, node.loadIndexService.loadIndexReqChan)
+    node.loadService = newLoadService(node.queryNodeLoopCtx, node.masterClient, node.dataClient, node.indexClient, node.replica, node.dataSyncService.dmStream)
+    node.statsService = newStatsService(node.queryNodeLoopCtx, node.replica, node.loadService.fieldStatsChan)
 
     // start services
     go node.dataSyncService.start()
     go node.searchService.start()
     go node.metaService.start()
-    go node.loadIndexService.start()
+    go node.loadService.start()
     go node.statsService.start()
 
     node.stateCode.Store(internalpb2.StateCode_HEALTHY)
@@ -232,8 +230,8 @@ func (node *QueryNode) Stop() error {
     if node.searchService != nil {
         node.searchService.close()
     }
-    if node.loadIndexService != nil {
-        node.loadIndexService.close()
+    if node.loadService != nil {
+        node.loadService.close()
     }
     if node.statsService != nil {
         node.statsService.close()
@@ -457,7 +455,7 @@ func (node *QueryNode) LoadSegments(in *queryPb.LoadSegmentRequest) (*commonpb.S
     if in.LastSegmentState.State == datapb.SegmentState_SegmentGrowing {
         segmentNum := len(segmentIDs)
         positions := in.LastSegmentState.StartPositions
-        err = node.segManager.seekSegment(positions)
+        err = node.loadService.segManager.seekSegment(positions)
         if err != nil {
             status := &commonpb.Status{
                 ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
@@ -468,7 +466,7 @@ func (node *QueryNode) LoadSegments(in *queryPb.LoadSegmentRequest) (*commonpb.S
         segmentIDs = segmentIDs[:segmentNum-1]
     }
 
-    err = node.segManager.loadSegment(collectionID, partitionID, segmentIDs, fieldIDs)
+    err = node.loadService.segManager.loadSegment(collectionID, partitionID, segmentIDs, fieldIDs)
     if err != nil {
         status := &commonpb.Status{
             ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
@@ -493,7 +491,7 @@ func (node *QueryNode) ReleaseSegments(in *queryPb.ReleaseSegmentRequest) (*comm
 
     // release all fields in the segments
     for _, id := range in.SegmentIDs {
-        err := node.segManager.releaseSegment(id)
+        err := node.loadService.segManager.releaseSegment(id)
         if err != nil {
             status := &commonpb.Status{
                 ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
@@ -23,17 +23,18 @@ import (
 )
 
 const (
-    segTypeInvalid = C.Invalid
-    segTypeGrowing = C.Growing
-    segTypeSealed  = C.Sealed
+    segTypeInvalid  = C.Invalid
+    segTypeGrowing  = C.Growing
+    segTypeSealed   = C.Sealed
+    segTypeIndexing = C.Indexing
 )
 
 type segmentType = C.SegmentType
 type indexParam = map[string]string
 
 type Segment struct {
-    segmentPtr  C.CSegmentInterface
-    segmentType C.SegmentType
+    segmentPtr C.CSegmentInterface
 
     segmentID    UniqueID
     partitionTag string // TODO: use partitionID
     partitionID  UniqueID
@@ -44,6 +45,9 @@ type Segment struct {
     rmMutex          sync.Mutex // guards recentlyModified
     recentlyModified bool
 
+    typeMu      sync.Mutex // guards builtIndex
+    segmentType C.SegmentType
+
     paramMutex sync.RWMutex // guards indexParam
     indexParam map[int64]indexParam
 }
@@ -53,22 +57,30 @@ func (s *Segment) ID() UniqueID {
     return s.segmentID
 }
 
-func (s *Segment) Type() segmentType {
-    return s.segmentType
-}
-
-func (s *Segment) SetRecentlyModified(modify bool) {
+func (s *Segment) setRecentlyModified(modify bool) {
     s.rmMutex.Lock()
     defer s.rmMutex.Unlock()
     s.recentlyModified = modify
 }
 
-func (s *Segment) GetRecentlyModified() bool {
+func (s *Segment) getRecentlyModified() bool {
     s.rmMutex.Lock()
     defer s.rmMutex.Unlock()
     return s.recentlyModified
 }
 
+func (s *Segment) setType(segType segmentType) {
+    s.typeMu.Lock()
+    defer s.typeMu.Unlock()
+    s.segmentType = segType
+}
+
+func (s *Segment) getType() segmentType {
+    s.typeMu.Lock()
+    defer s.typeMu.Unlock()
+    return s.segmentType
+}
+
 func newSegment2(collection *Collection, segmentID int64, partitionTag string, collectionID UniqueID, segType segmentType) *Segment {
     /*
         CSegmentInterface
@@ -195,7 +207,7 @@ func (s *Segment) fillTargetEntry(plan *Plan,
     return nil
 }
 
-// segment, err := loadIndexService.replica.getSegmentByID(segmentID)
+// segment, err := loadService.replica.getSegmentByID(segmentID)
 func (s *Segment) updateSegmentIndex(loadIndexInfo *LoadIndexInfo) error {
     var status C.CStatus
 
@@ -215,6 +227,8 @@ func (s *Segment) updateSegmentIndex(loadIndexInfo *LoadIndexInfo) error {
         return errors.New("updateSegmentIndex failed, C runtime error detected, error code = " + strconv.Itoa(int(errorCode)) + ", error msg = " + errorMsg)
     }
 
+    s.setType(segTypeIndexing)
+
     return nil
 }
 
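Setting the type to segTypeIndexing here closes the loop with the periodic listener: getSealedSegments matches only segTypeSealed, so once a segment's index is loaded it drops out of the candidate set and is not loaded again. A sketch of that state progression (constant names mirror the Go side; the struct is illustrative):

    package main

    import "fmt"

    type segmentType int

    const (
        segTypeInvalid segmentType = iota
        segTypeGrowing
        segTypeSealed
        segTypeIndexing
    )

    type segment struct{ typ segmentType }

    // needsIndexLoad reflects the getSealedSegments filter.
    func (s *segment) needsIndexLoad() bool { return s.typ == segTypeSealed }

    func main() {
        seg := &segment{typ: segTypeSealed}
        fmt.Println("candidate before load:", seg.needsIndexLoad()) // true
        seg.typ = segTypeIndexing // what setType(segTypeIndexing) accomplishes
        fmt.Println("candidate after load: ", seg.needsIndexLoad()) // false
    }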
@@ -324,7 +338,7 @@ func (s *Segment) segmentInsert(offset int64, entityIDs *[]UniqueID, timestamps
         return errors.New("Insert failed, C runtime error detected, error code = " + strconv.Itoa(int(errorCode)) + ", error msg = " + errorMsg)
     }
 
-    s.SetRecentlyModified(true)
+    s.setRecentlyModified(true)
     return nil
 }
 
@@ -41,7 +41,6 @@ func (s *segmentManager) seekSegment(positions []*internalPb.MsgPosition) error
     return nil
 }
 
-//TODO, index params
 func (s *segmentManager) getIndexInfo(collectionID UniqueID, segmentID UniqueID) (UniqueID, UniqueID, error) {
     req := &milvuspb.DescribeSegmentRequest{
         Base: &commonpb.MsgBase{
@@ -73,7 +72,7 @@ func (s *segmentManager) loadSegment(collectionID UniqueID, partitionID UniqueID
     // we don't need index id yet
     _, buildID, err := s.getIndexInfo(collectionID, segmentID)
     if err == nil {
-        // we don't need load vector fields
+        // we don't need load to vector fields
         vectorFields, err := s.replica.getVecFieldsBySegmentID(segmentID)
         if err != nil {
             return err
@@ -229,11 +228,11 @@ func (s *segmentManager) loadSegmentFieldsData(segmentID UniqueID, targetFields
             numRows = fieldData.NumRows
             data = fieldData.Data
         case *storage.FloatVectorFieldData:
-            numRows = fieldData.NumRows
-            data = fieldData.Data
+            // segment to be loaded doesn't need vector field,
+            // so we ignore the type of vector field data
+            continue
         case *storage.BinaryVectorFieldData:
-            numRows = fieldData.NumRows
-            data = fieldData.Data
+            continue
         default:
             return errors.New("unexpected field data type")
         }
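The type switch above now skips vector field data with continue: a segment loaded through this path is expected to serve vector fields from the loaded index, so only scalar fields are read out of the binlogs. The filtering idea in isolation (stand-in types, not the real storage package):

    package main

    import "fmt"

    // Illustrative stand-ins for the storage field-data types.
    type int64FieldData struct{ data []int64 }
    type floatVectorFieldData struct{ data []float32 }

    // keepField mirrors the post-commit logic in loadSegmentFieldsData:
    // scalar fields are loaded, vector fields are skipped because their
    // content will come from the index rather than from raw binlogs.
    func keepField(fieldData interface{}) bool {
        switch fieldData.(type) {
        case *floatVectorFieldData:
            return false // skipped via continue in the original
        default:
            return true
        }
    }

    func main() {
        fmt.Println(keepField(&int64FieldData{data: nil}))       // true
        fmt.Println(keepField(&floatVectorFieldData{data: nil})) // false
    }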
@@ -282,7 +281,7 @@ func (s *segmentManager) loadIndex(segmentID UniqueID, indexPaths []string) erro
         return err
     }
     for id, name := range vecFieldIDs {
-        // non-blocking send
+        // non-blocking sending
         go s.sendLoadIndex(indexPaths, segmentID, id, name)
     }
 
@@ -406,9 +406,8 @@ func TestSegmentManager_load_release_and_search(t *testing.T) {
     defer node.Stop()
 
     ctx := node.queryNodeLoopCtx
-    node.loadIndexService = newLoadIndexService(ctx, node.replica)
-    node.segManager = newSegmentManager(ctx, nil, nil, nil, node.replica, nil, node.loadIndexService.loadIndexReqChan)
-    go node.loadIndexService.start()
+    node.loadService = newLoadService(ctx, nil, nil, nil, node.replica, nil)
+    go node.loadService.start()
 
     collectionName := "collection0"
     initTestMeta(t, node, collectionName, collectionID, 0)
@@ -422,16 +421,16 @@ func TestSegmentManager_load_release_and_search(t *testing.T) {
     paths, srcFieldIDs, err := generateInsertBinLog(collectionID, partitionID, segmentID, keyPrefix)
     assert.NoError(t, err)
 
-    fieldsMap := node.segManager.getTargetFields(paths, srcFieldIDs, fieldIDs)
+    fieldsMap := node.loadService.segManager.getTargetFields(paths, srcFieldIDs, fieldIDs)
     assert.Equal(t, len(fieldsMap), 2)
 
-    err = node.segManager.loadSegmentFieldsData(segmentID, fieldsMap)
+    err = node.loadService.segManager.loadSegmentFieldsData(segmentID, fieldsMap)
     assert.NoError(t, err)
 
     indexPaths, err := generateIndex(segmentID)
     assert.NoError(t, err)
 
-    err = node.segManager.loadIndex(segmentID, indexPaths)
+    err = node.loadService.segManager.loadIndex(segmentID, indexPaths)
     assert.NoError(t, err)
 
     // do search