mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-02 20:09:57 +08:00
82ccd4cec0
* Rename module Signed-off-by: Xiangyu Wang <xiangyu.wang@zilliz.com>
217 lines
5.9 KiB
Go
217 lines
5.9 KiB
Go
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
|
|
// with the License. You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software distributed under the License
|
|
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
|
// or implied. See the License for the specific language governing permissions and limitations under the License.
|
|
|
|
package querynode
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"go.uber.org/zap"
|
|
|
|
"github.com/milvus-io/milvus/internal/log"
|
|
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
|
"github.com/milvus-io/milvus/internal/types"
|
|
)
|
|
|
|
const loadingCheckInterval = 3
|
|
|
|
type loadService struct {
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
|
|
segLoader *segmentLoader
|
|
}
|
|
|
|
// -------------------------------------------- load index -------------------------------------------- //
|
|
func (s *loadService) start() {
|
|
wg := &sync.WaitGroup{}
|
|
for {
|
|
select {
|
|
case <-s.ctx.Done():
|
|
return
|
|
case <-time.After(loadingCheckInterval * time.Second):
|
|
wg.Add(2)
|
|
//go s.segLoader.indexLoader.doLoadIndex(wg)
|
|
go s.loadSegmentActively(wg)
|
|
wg.Wait()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *loadService) close() {
|
|
s.cancel()
|
|
}
|
|
|
|
func (s *loadService) loadSegmentActively(wg *sync.WaitGroup) {
|
|
collectionIDs, partitionIDs, segmentIDs := s.segLoader.replica.getSegmentsToLoadBySegmentType(segmentTypeGrowing)
|
|
if len(collectionIDs) <= 0 {
|
|
wg.Done()
|
|
return
|
|
}
|
|
log.Debug("do load segment for growing segments:", zap.String("segmentIDs", fmt.Sprintln(segmentIDs)))
|
|
for i := range collectionIDs {
|
|
collection, err := s.segLoader.replica.getCollectionByID(collectionIDs[i])
|
|
if err != nil {
|
|
log.Warn(err.Error())
|
|
}
|
|
|
|
fieldIDs, err := s.segLoader.replica.getFieldIDsByCollectionID(collectionIDs[i])
|
|
if err != nil {
|
|
log.Error(err.Error())
|
|
continue
|
|
}
|
|
segment := newSegment(collection, segmentIDs[i], partitionIDs[i], collectionIDs[i], segmentTypeSealed)
|
|
segment.setLoadBinLogEnable(true)
|
|
err = s.loadSegmentInternal(collectionIDs[i], segment, fieldIDs)
|
|
if err == nil {
|
|
// replace segment
|
|
err = s.segLoader.replica.replaceGrowingSegmentBySealedSegment(segment)
|
|
}
|
|
if err != nil {
|
|
deleteSegment(segment)
|
|
log.Error(err.Error())
|
|
}
|
|
}
|
|
// sendQueryNodeStats
|
|
err := s.segLoader.indexLoader.sendQueryNodeStats()
|
|
if err != nil {
|
|
log.Error(err.Error())
|
|
wg.Done()
|
|
return
|
|
}
|
|
|
|
wg.Done()
|
|
}
|
|
|
|
// load segment passively
|
|
func (s *loadService) loadSegmentPassively(collectionID UniqueID, partitionID UniqueID, segmentIDs []UniqueID, fieldIDs []int64) error {
|
|
// TODO: interim solution
|
|
if len(fieldIDs) == 0 {
|
|
var err error
|
|
fieldIDs, err = s.segLoader.replica.getFieldIDsByCollectionID(collectionID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
for _, segmentID := range segmentIDs {
|
|
collection, err := s.segLoader.replica.getCollectionByID(collectionID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
_, err = s.segLoader.replica.getPartitionByID(partitionID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
segment := newSegment(collection, segmentID, partitionID, collectionID, segmentTypeSealed)
|
|
segment.setLoadBinLogEnable(true)
|
|
err = s.loadSegmentInternal(collectionID, segment, fieldIDs)
|
|
if err == nil {
|
|
err = s.segLoader.replica.setSegment(segment)
|
|
}
|
|
if err != nil {
|
|
log.Warn(err.Error())
|
|
err = s.addSegmentToLoadBuffer(segment)
|
|
if err != nil {
|
|
log.Warn(err.Error())
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *loadService) addSegmentToLoadBuffer(segment *Segment) error {
|
|
segmentID := segment.segmentID
|
|
partitionID := segment.partitionID
|
|
collectionID := segment.collectionID
|
|
deleteSegment(segment)
|
|
err := s.segLoader.replica.addSegment(segmentID, partitionID, collectionID, segmentTypeGrowing)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = s.segLoader.replica.setSegmentEnableLoadBinLog(segmentID, true)
|
|
if err != nil {
|
|
s.segLoader.replica.removeSegment(segmentID)
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
func (s *loadService) loadSegmentInternal(collectionID UniqueID, segment *Segment, fieldIDs []int64) error {
|
|
// create segment
|
|
statesResp, err := s.segLoader.GetSegmentStates(segment.segmentID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if statesResp.States[0].State != commonpb.SegmentState_Flushed {
|
|
return errors.New("segment not flush done")
|
|
}
|
|
|
|
insertBinlogPaths, srcFieldIDs, err := s.segLoader.getInsertBinlogPaths(segment.segmentID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
vectorFieldIDs, err := s.segLoader.replica.getVecFieldIDsByCollectionID(collectionID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
loadIndexFieldIDs := make([]int64, 0)
|
|
for _, vecFieldID := range vectorFieldIDs {
|
|
err = s.segLoader.indexLoader.setIndexInfo(collectionID, segment, vecFieldID)
|
|
if err != nil {
|
|
log.Warn(err.Error())
|
|
continue
|
|
}
|
|
loadIndexFieldIDs = append(loadIndexFieldIDs, vecFieldID)
|
|
}
|
|
// we don't need load to vector fields
|
|
fieldIDs = s.segLoader.filterOutVectorFields(fieldIDs, loadIndexFieldIDs)
|
|
|
|
//log.Debug("srcFieldIDs in internal:", srcFieldIDs)
|
|
//log.Debug("dstFieldIDs in internal:", fieldIDs)
|
|
targetFields, err := s.segLoader.checkTargetFields(insertBinlogPaths, srcFieldIDs, fieldIDs)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
log.Debug("loading insert...")
|
|
err = s.segLoader.loadSegmentFieldsData(segment, targetFields)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, id := range loadIndexFieldIDs {
|
|
log.Debug("loading index...")
|
|
err = s.segLoader.indexLoader.loadIndex(segment, id)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func newLoadService(ctx context.Context, masterService types.MasterService, dataService types.DataService, indexService types.IndexService, replica ReplicaInterface) *loadService {
|
|
ctx1, cancel := context.WithCancel(ctx)
|
|
|
|
segLoader := newSegmentLoader(ctx1, masterService, indexService, dataService, replica)
|
|
|
|
return &loadService{
|
|
ctx: ctx1,
|
|
cancel: cancel,
|
|
|
|
segLoader: segLoader,
|
|
}
|
|
}
|