milvus/internal/querynode/search_service.go

166 lines
5.0 KiB
Go
Raw Normal View History

// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package querynode
import "C"
import (
"context"
"errors"
"github.com/zilliztech/milvus-distributed/internal/log"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/util/trace"
"go.uber.org/zap"
"strconv"
"strings"
)
type searchService struct {
ctx context.Context
cancel context.CancelFunc
replica ReplicaInterface
searchMsgStream msgstream.MsgStream
searchResultMsgStream msgstream.MsgStream
queryNodeID UniqueID
searchCollections map[UniqueID]*searchCollection
emptySearchCollection *searchCollection
}
func newSearchService(ctx context.Context, replica ReplicaInterface, factory msgstream.Factory) *searchService {
searchStream, _ := factory.NewQueryMsgStream(ctx)
searchResultStream, _ := factory.NewQueryMsgStream(ctx)
// query node doesn't need to consumer any search or search result channel actively.
consumeChannels := Params.SearchChannelNames
consumeSubName := Params.MsgChannelSubName
searchStream.AsConsumer(consumeChannels, consumeSubName)
log.Debug("query node AsConsumer: " + strings.Join(consumeChannels, ", ") + " : " + consumeSubName)
producerChannels := Params.SearchResultChannelNames
searchResultStream.AsProducer(producerChannels)
log.Debug("query node AsProducer: " + strings.Join(producerChannels, ", "))
searchServiceCtx, searchServiceCancel := context.WithCancel(ctx)
return &searchService{
ctx: searchServiceCtx,
cancel: searchServiceCancel,
replica: replica,
searchMsgStream: searchStream,
searchResultMsgStream: searchResultStream,
queryNodeID: Params.QueryNodeID,
searchCollections: make(map[UniqueID]*searchCollection),
}
}
func (s *searchService) start() {
s.searchMsgStream.Start()
s.searchResultMsgStream.Start()
s.startEmptySearchCollection()
s.consumeSearch()
}
func (s *searchService) collectionCheck(collectionID UniqueID) error {
// check if collection exists
if ok := s.replica.hasCollection(collectionID); !ok {
err := errors.New("no collection found, collectionID = " + strconv.FormatInt(collectionID, 10))
log.Error(err.Error())
return err
}
return nil
}
func (s *searchService) consumeSearch() {
for {
select {
case <-s.ctx.Done():
return
default:
msgPack := s.searchMsgStream.Consume()
if msgPack == nil || len(msgPack.Msgs) <= 0 {
continue
}
for _, msg := range msgPack.Msgs {
log.Debug("consume search message", zap.Int64("msgID", msg.ID()))
sm, ok := msg.(*msgstream.SearchMsg)
if !ok {
continue
}
sp, ctx := trace.StartSpanFromContext(sm.TraceCtx())
sm.SetTraceCtx(ctx)
err := s.collectionCheck(sm.CollectionID)
if err != nil {
s.emptySearchCollection.emptySearch(sm)
log.Debug("cannot found collection, do empty search done",
zap.Int64("msgID", sm.ID()),
zap.Int64("collectionID", sm.CollectionID))
continue
}
_, ok = s.searchCollections[sm.CollectionID]
if !ok {
s.startSearchCollection(sm.CollectionID)
log.Debug("new search collection, start search collection service",
zap.Int64("collectionID", sm.CollectionID))
}
s.searchCollections[sm.CollectionID].msgBuffer <- sm
sp.Finish()
}
}
}
}
func (s *searchService) close() {
if s.searchMsgStream != nil {
s.searchMsgStream.Close()
}
if s.searchResultMsgStream != nil {
s.searchResultMsgStream.Close()
}
for collectionID := range s.searchCollections {
s.stopSearchCollection(collectionID)
}
s.searchCollections = make(map[UniqueID]*searchCollection)
s.cancel()
}
func (s *searchService) startSearchCollection(collectionID UniqueID) {
ctx1, cancel := context.WithCancel(s.ctx)
sc := newSearchCollection(ctx1, cancel, collectionID, s.replica, s.searchResultMsgStream)
s.searchCollections[collectionID] = sc
sc.start()
}
func (s *searchService) startEmptySearchCollection() {
ctx1, cancel := context.WithCancel(s.ctx)
sc := newSearchCollection(ctx1, cancel, UniqueID(-1), s.replica, s.searchResultMsgStream)
s.emptySearchCollection = sc
sc.start()
}
func (s *searchService) hasSearchCollection(collectionID UniqueID) bool {
_, ok := s.searchCollections[collectionID]
return ok
}
func (s *searchService) stopSearchCollection(collectionID UniqueID) {
sc, ok := s.searchCollections[collectionID]
if !ok {
log.Error("stopSearchCollection failed, collection doesn't exist", zap.Int64("collectionID", collectionID))
}
sc.cancel()
delete(s.searchCollections, collectionID)
}