2021-01-22 14:28:06 +08:00
|
|
|
package queryservice
|
2021-01-15 15:28:54 +08:00
|
|
|
|
2021-01-16 15:31:10 +08:00
|
|
|
import (
|
2021-01-22 14:28:06 +08:00
|
|
|
"context"
|
2021-01-27 09:50:52 +08:00
|
|
|
"fmt"
|
2021-01-26 15:13:20 +08:00
|
|
|
"sort"
|
2021-01-22 14:28:06 +08:00
|
|
|
"strconv"
|
2021-02-18 16:26:02 +08:00
|
|
|
"sync"
|
2021-01-22 14:28:06 +08:00
|
|
|
"sync/atomic"
|
|
|
|
|
2021-01-26 15:13:20 +08:00
|
|
|
nodeclient "github.com/zilliztech/milvus-distributed/internal/distributed/querynode/client"
|
2021-01-22 14:28:06 +08:00
|
|
|
"github.com/zilliztech/milvus-distributed/internal/errors"
|
2021-02-08 14:30:54 +08:00
|
|
|
"github.com/zilliztech/milvus-distributed/internal/msgstream"
|
2021-01-22 14:28:06 +08:00
|
|
|
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
|
2021-01-26 15:13:20 +08:00
|
|
|
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
|
2021-01-16 15:31:10 +08:00
|
|
|
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
|
2021-01-26 15:13:20 +08:00
|
|
|
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
|
2021-01-16 15:31:10 +08:00
|
|
|
"github.com/zilliztech/milvus-distributed/internal/proto/querypb"
|
|
|
|
)
|
2021-01-15 15:28:54 +08:00
|
|
|
|
2021-01-26 15:13:20 +08:00
|
|
|
type MasterServiceInterface interface {
|
|
|
|
ShowPartitions(in *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error)
|
|
|
|
ShowSegments(in *milvuspb.ShowSegmentRequest) (*milvuspb.ShowSegmentResponse, error)
|
|
|
|
}
|
|
|
|
|
|
|
|
type DataServiceInterface interface {
|
|
|
|
GetSegmentStates(req *datapb.SegmentStatesRequest) (*datapb.SegmentStatesResponse, error)
|
2021-02-04 19:34:35 +08:00
|
|
|
GetInsertChannels(req *datapb.InsertChannelRequest) (*internalpb2.StringList, error)
|
2021-01-26 15:13:20 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
type QueryNodeInterface interface {
|
|
|
|
GetComponentStates() (*internalpb2.ComponentStates, error)
|
|
|
|
|
|
|
|
AddQueryChannel(in *querypb.AddQueryChannelsRequest) (*commonpb.Status, error)
|
|
|
|
RemoveQueryChannel(in *querypb.RemoveQueryChannelsRequest) (*commonpb.Status, error)
|
|
|
|
WatchDmChannels(in *querypb.WatchDmChannelsRequest) (*commonpb.Status, error)
|
|
|
|
LoadSegments(in *querypb.LoadSegmentRequest) (*commonpb.Status, error)
|
|
|
|
ReleaseSegments(in *querypb.ReleaseSegmentRequest) (*commonpb.Status, error)
|
2021-02-04 11:40:14 +08:00
|
|
|
GetSegmentInfo(req *querypb.SegmentInfoRequest) (*querypb.SegmentInfoResponse, error)
|
2021-01-26 15:13:20 +08:00
|
|
|
}
|
2021-01-22 14:28:06 +08:00
|
|
|
|
2021-02-18 16:26:02 +08:00
|
|
|
type queryChannelInfo struct {
|
|
|
|
requestChannel string
|
|
|
|
responseChannel string
|
|
|
|
}
|
|
|
|
|
2021-01-15 15:28:54 +08:00
|
|
|
type QueryService struct {
|
2021-01-22 14:28:06 +08:00
|
|
|
loopCtx context.Context
|
|
|
|
loopCancel context.CancelFunc
|
|
|
|
|
2021-01-26 15:13:20 +08:00
|
|
|
queryServiceID uint64
|
2021-01-22 14:28:06 +08:00
|
|
|
replica metaReplica
|
|
|
|
|
2021-01-26 15:13:20 +08:00
|
|
|
dataServiceClient DataServiceInterface
|
|
|
|
masterServiceClient MasterServiceInterface
|
2021-02-18 16:26:02 +08:00
|
|
|
queryNodes map[int64]*queryNodeInfo
|
|
|
|
queryChannels []*queryChannelInfo
|
|
|
|
qcMutex *sync.Mutex
|
2021-01-22 14:28:06 +08:00
|
|
|
|
2021-01-26 15:13:20 +08:00
|
|
|
stateCode atomic.Value
|
|
|
|
isInit atomic.Value
|
|
|
|
enableGrpc bool
|
2021-02-08 14:30:54 +08:00
|
|
|
|
|
|
|
msFactory msgstream.Factory
|
2021-01-22 14:28:06 +08:00
|
|
|
}
|
|
|
|
|
2021-01-21 10:01:29 +08:00
|
|
|
func (qs *QueryService) Init() error {
|
2021-01-26 15:13:20 +08:00
|
|
|
Params.Init()
|
2021-01-22 14:28:06 +08:00
|
|
|
qs.isInit.Store(true)
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-01-21 10:01:29 +08:00
|
|
|
func (qs *QueryService) Start() error {
|
2021-01-22 14:28:06 +08:00
|
|
|
isInit := qs.isInit.Load().(bool)
|
2021-02-05 17:57:41 +08:00
|
|
|
|
|
|
|
switch {
|
|
|
|
case !isInit:
|
2021-01-22 14:28:06 +08:00
|
|
|
return errors.New("call start before init")
|
2021-02-05 17:57:41 +08:00
|
|
|
case qs.dataServiceClient == nil:
|
|
|
|
return errors.New("dataService Client not set")
|
|
|
|
case qs.masterServiceClient == nil:
|
|
|
|
return errors.New("masterService Client not set")
|
2021-01-22 14:28:06 +08:00
|
|
|
}
|
2021-02-05 17:57:41 +08:00
|
|
|
|
2021-01-22 14:28:06 +08:00
|
|
|
qs.stateCode.Store(internalpb2.StateCode_HEALTHY)
|
|
|
|
return nil
|
2021-01-16 15:31:10 +08:00
|
|
|
}
|
|
|
|
|
2021-01-21 10:01:29 +08:00
|
|
|
func (qs *QueryService) Stop() error {
|
2021-01-22 14:28:06 +08:00
|
|
|
qs.loopCancel()
|
|
|
|
qs.stateCode.Store(internalpb2.StateCode_ABNORMAL)
|
|
|
|
return nil
|
2021-01-16 15:31:10 +08:00
|
|
|
}
|
|
|
|
|
2021-01-20 11:02:29 +08:00
|
|
|
func (qs *QueryService) GetComponentStates() (*internalpb2.ComponentStates, error) {
|
2021-01-22 14:28:06 +08:00
|
|
|
serviceComponentInfo := &internalpb2.ComponentInfo{
|
|
|
|
NodeID: Params.QueryServiceID,
|
|
|
|
StateCode: qs.stateCode.Load().(internalpb2.StateCode),
|
|
|
|
}
|
|
|
|
subComponentInfos := make([]*internalpb2.ComponentInfo, 0)
|
2021-01-26 15:13:20 +08:00
|
|
|
for nodeID, node := range qs.queryNodes {
|
|
|
|
componentStates, err := node.GetComponentStates()
|
2021-01-22 14:28:06 +08:00
|
|
|
if err != nil {
|
|
|
|
subComponentInfos = append(subComponentInfos, &internalpb2.ComponentInfo{
|
2021-02-04 17:47:19 +08:00
|
|
|
NodeID: nodeID,
|
2021-01-22 14:28:06 +08:00
|
|
|
StateCode: internalpb2.StateCode_ABNORMAL,
|
|
|
|
})
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
subComponentInfos = append(subComponentInfos, componentStates.State)
|
|
|
|
}
|
|
|
|
return &internalpb2.ComponentStates{
|
2021-01-26 15:13:20 +08:00
|
|
|
Status: &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_SUCCESS,
|
|
|
|
},
|
2021-01-22 14:28:06 +08:00
|
|
|
State: serviceComponentInfo,
|
|
|
|
SubcomponentStates: subComponentInfos,
|
|
|
|
}, nil
|
2021-01-16 15:31:10 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
func (qs *QueryService) GetTimeTickChannel() (string, error) {
|
2021-01-22 14:28:06 +08:00
|
|
|
return Params.TimeTickChannelName, nil
|
2021-01-16 15:31:10 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
func (qs *QueryService) GetStatisticsChannel() (string, error) {
|
2021-01-22 14:28:06 +08:00
|
|
|
return Params.StatsChannelName, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (qs *QueryService) RegisterNode(req *querypb.RegisterNodeRequest) (*querypb.RegisterNodeResponse, error) {
|
2021-01-27 09:50:52 +08:00
|
|
|
fmt.Println("register query node =", req.Address)
|
2021-01-27 13:52:01 +08:00
|
|
|
// TODO:: add mutex
|
2021-02-18 16:26:02 +08:00
|
|
|
nodeID := req.Base.SourceID
|
|
|
|
if _, ok := qs.queryNodes[nodeID]; ok {
|
|
|
|
err := errors.New("nodeID already exists")
|
|
|
|
return &querypb.RegisterNodeResponse{
|
|
|
|
Status: &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
|
|
|
|
Reason: err.Error(),
|
|
|
|
},
|
|
|
|
}, err
|
|
|
|
}
|
2021-01-22 14:28:06 +08:00
|
|
|
|
|
|
|
registerNodeAddress := req.Address.Ip + ":" + strconv.FormatInt(req.Address.Port, 10)
|
2021-02-18 16:26:02 +08:00
|
|
|
client := nodeclient.NewClient(registerNodeAddress)
|
|
|
|
if err := client.Init(); err != nil {
|
|
|
|
return &querypb.RegisterNodeResponse{
|
|
|
|
Status: &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
|
|
|
|
},
|
|
|
|
InitParams: new(internalpb2.InitParams),
|
|
|
|
}, err
|
2021-01-22 14:28:06 +08:00
|
|
|
}
|
2021-02-18 16:26:02 +08:00
|
|
|
if err := client.Start(); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
qs.queryNodes[nodeID] = newQueryNodeInfo(client)
|
2021-01-22 14:28:06 +08:00
|
|
|
|
2021-02-02 11:52:41 +08:00
|
|
|
//TODO::return init params to queryNode
|
2021-02-18 16:26:02 +08:00
|
|
|
startParams := []*commonpb.KeyValuePair{
|
|
|
|
{Key: "StatsChannelName", Value: Params.StatsChannelName},
|
|
|
|
{Key: "TimeTickChannelName", Value: Params.TimeTickChannelName},
|
|
|
|
}
|
|
|
|
qs.qcMutex.Lock()
|
|
|
|
for _, queryChannel := range qs.queryChannels {
|
|
|
|
startParams = append(startParams, &commonpb.KeyValuePair{
|
|
|
|
Key: "QueryChannelName",
|
|
|
|
Value: queryChannel.requestChannel,
|
|
|
|
})
|
|
|
|
startParams = append(startParams, &commonpb.KeyValuePair{
|
|
|
|
Key: "QueryResultChannelName",
|
|
|
|
Value: queryChannel.responseChannel,
|
|
|
|
})
|
|
|
|
}
|
|
|
|
qs.qcMutex.Unlock()
|
|
|
|
|
2021-01-22 14:28:06 +08:00
|
|
|
return &querypb.RegisterNodeResponse{
|
|
|
|
Status: &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_SUCCESS,
|
|
|
|
},
|
|
|
|
InitParams: &internalpb2.InitParams{
|
2021-02-18 16:26:02 +08:00
|
|
|
NodeID: nodeID,
|
|
|
|
StartParams: startParams,
|
2021-01-22 14:28:06 +08:00
|
|
|
},
|
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (qs *QueryService) ShowCollections(req *querypb.ShowCollectionRequest) (*querypb.ShowCollectionResponse, error) {
|
|
|
|
dbID := req.DbID
|
2021-02-09 17:09:26 +08:00
|
|
|
fmt.Println("show collection start, dbID = ", dbID)
|
2021-02-02 11:52:41 +08:00
|
|
|
collections, err := qs.replica.getCollections(dbID)
|
|
|
|
collectionIDs := make([]UniqueID, 0)
|
|
|
|
for _, collection := range collections {
|
|
|
|
collectionIDs = append(collectionIDs, collection.id)
|
|
|
|
}
|
2021-01-22 14:28:06 +08:00
|
|
|
if err != nil {
|
2021-02-02 11:52:41 +08:00
|
|
|
return &querypb.ShowCollectionResponse{
|
|
|
|
Status: &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
|
|
|
|
Reason: err.Error(),
|
|
|
|
},
|
|
|
|
}, err
|
2021-01-22 14:28:06 +08:00
|
|
|
}
|
2021-02-09 17:09:26 +08:00
|
|
|
fmt.Println("show collection end")
|
2021-01-22 14:28:06 +08:00
|
|
|
return &querypb.ShowCollectionResponse{
|
|
|
|
Status: &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_SUCCESS,
|
|
|
|
},
|
|
|
|
CollectionIDs: collectionIDs,
|
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (qs *QueryService) LoadCollection(req *querypb.LoadCollectionRequest) (*commonpb.Status, error) {
|
2021-01-26 15:13:20 +08:00
|
|
|
dbID := req.DbID
|
|
|
|
collectionID := req.CollectionID
|
2021-02-07 09:30:48 +08:00
|
|
|
schema := req.Schema
|
2021-01-26 15:13:20 +08:00
|
|
|
fn := func(err error) *commonpb.Status {
|
2021-02-02 11:52:41 +08:00
|
|
|
if err != nil {
|
|
|
|
return &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
|
|
|
|
Reason: err.Error(),
|
|
|
|
}
|
|
|
|
}
|
2021-01-26 15:13:20 +08:00
|
|
|
return &commonpb.Status{
|
2021-02-02 11:52:41 +08:00
|
|
|
ErrorCode: commonpb.ErrorCode_SUCCESS,
|
2021-01-26 15:13:20 +08:00
|
|
|
}
|
|
|
|
}
|
2021-02-04 17:47:19 +08:00
|
|
|
|
2021-02-07 09:30:48 +08:00
|
|
|
fmt.Println("load collection start, collectionID = ", collectionID)
|
|
|
|
_, err := qs.replica.getCollectionByID(dbID, collectionID)
|
|
|
|
if err == nil {
|
|
|
|
fmt.Println("load collection end, collection already exist, collectionID = ", collectionID)
|
2021-02-02 11:52:41 +08:00
|
|
|
return fn(nil), nil
|
|
|
|
}
|
2021-01-26 15:13:20 +08:00
|
|
|
|
2021-02-07 09:30:48 +08:00
|
|
|
err = qs.replica.addCollection(dbID, collectionID, schema)
|
|
|
|
if err != nil {
|
|
|
|
return fn(err), err
|
2021-02-04 19:34:35 +08:00
|
|
|
}
|
2021-02-07 09:30:48 +08:00
|
|
|
|
|
|
|
err = qs.watchDmChannels(dbID, collectionID)
|
2021-02-04 17:47:19 +08:00
|
|
|
if err != nil {
|
|
|
|
return fn(err), err
|
|
|
|
}
|
|
|
|
|
2021-01-26 15:13:20 +08:00
|
|
|
// get partitionIDs
|
|
|
|
showPartitionRequest := &milvuspb.ShowPartitionRequest{
|
|
|
|
Base: &commonpb.MsgBase{
|
|
|
|
MsgType: commonpb.MsgType_kShowPartitions,
|
|
|
|
},
|
|
|
|
CollectionID: collectionID,
|
|
|
|
}
|
|
|
|
|
|
|
|
showPartitionResponse, err := qs.masterServiceClient.ShowPartitions(showPartitionRequest)
|
|
|
|
if err != nil {
|
|
|
|
return fn(err), err
|
|
|
|
}
|
|
|
|
if showPartitionResponse.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
|
2021-02-02 11:52:41 +08:00
|
|
|
return showPartitionResponse.Status, err
|
2021-01-26 15:13:20 +08:00
|
|
|
}
|
|
|
|
partitionIDs := showPartitionResponse.PartitionIDs
|
|
|
|
|
|
|
|
loadPartitionsRequest := &querypb.LoadPartitionRequest{
|
|
|
|
Base: req.Base,
|
|
|
|
DbID: dbID,
|
|
|
|
CollectionID: collectionID,
|
|
|
|
PartitionIDs: partitionIDs,
|
2021-02-07 09:30:48 +08:00
|
|
|
Schema: schema,
|
2021-01-26 15:13:20 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
status, err := qs.LoadPartitions(loadPartitionsRequest)
|
|
|
|
|
2021-02-07 09:30:48 +08:00
|
|
|
fmt.Println("load collection end, collectionID = ", collectionID)
|
2021-01-26 15:13:20 +08:00
|
|
|
return status, err
|
2021-01-16 15:31:10 +08:00
|
|
|
}
|
|
|
|
|
2021-01-22 14:28:06 +08:00
|
|
|
func (qs *QueryService) ReleaseCollection(req *querypb.ReleaseCollectionRequest) (*commonpb.Status, error) {
|
2021-01-26 15:13:20 +08:00
|
|
|
dbID := req.DbID
|
|
|
|
collectionID := req.CollectionID
|
2021-02-07 15:56:57 +08:00
|
|
|
fmt.Println("release collection start, collectionID = ", collectionID)
|
2021-02-02 11:52:41 +08:00
|
|
|
partitions, err := qs.replica.getPartitions(dbID, collectionID)
|
2021-01-26 15:13:20 +08:00
|
|
|
if err != nil {
|
2021-02-02 11:52:41 +08:00
|
|
|
return &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
|
|
|
|
Reason: err.Error(),
|
|
|
|
}, err
|
2021-01-26 15:13:20 +08:00
|
|
|
}
|
2021-02-02 11:52:41 +08:00
|
|
|
|
|
|
|
partitionIDs := make([]UniqueID, 0)
|
|
|
|
for _, partition := range partitions {
|
|
|
|
partitionIDs = append(partitionIDs, partition.id)
|
|
|
|
}
|
|
|
|
|
2021-01-26 15:13:20 +08:00
|
|
|
releasePartitionRequest := &querypb.ReleasePartitionRequest{
|
|
|
|
Base: req.Base,
|
|
|
|
DbID: dbID,
|
|
|
|
CollectionID: collectionID,
|
2021-02-02 11:52:41 +08:00
|
|
|
PartitionIDs: partitionIDs,
|
2021-01-26 15:13:20 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
status, err := qs.ReleasePartitions(releasePartitionRequest)
|
|
|
|
|
2021-02-04 17:47:19 +08:00
|
|
|
err = qs.replica.releaseCollection(dbID, collectionID)
|
|
|
|
if err != nil {
|
|
|
|
return &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
|
|
|
|
Reason: err.Error(),
|
|
|
|
}, err
|
|
|
|
}
|
2021-02-07 15:56:57 +08:00
|
|
|
|
|
|
|
fmt.Println("release collection end")
|
2021-02-04 17:47:19 +08:00
|
|
|
//TODO:: queryNode cancel subscribe dmChannels
|
2021-01-26 15:13:20 +08:00
|
|
|
return status, err
|
2021-01-15 15:28:54 +08:00
|
|
|
}
|
|
|
|
|
2021-01-22 14:28:06 +08:00
|
|
|
func (qs *QueryService) ShowPartitions(req *querypb.ShowPartitionRequest) (*querypb.ShowPartitionResponse, error) {
|
|
|
|
dbID := req.DbID
|
|
|
|
collectionID := req.CollectionID
|
2021-02-02 11:52:41 +08:00
|
|
|
partitions, err := qs.replica.getPartitions(dbID, collectionID)
|
|
|
|
partitionIDs := make([]UniqueID, 0)
|
|
|
|
for _, partition := range partitions {
|
|
|
|
partitionIDs = append(partitionIDs, partition.id)
|
|
|
|
}
|
2021-01-22 14:28:06 +08:00
|
|
|
if err != nil {
|
2021-02-02 11:52:41 +08:00
|
|
|
return &querypb.ShowPartitionResponse{
|
|
|
|
Status: &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
|
|
|
|
Reason: err.Error(),
|
|
|
|
},
|
|
|
|
}, err
|
2021-01-22 14:28:06 +08:00
|
|
|
}
|
|
|
|
return &querypb.ShowPartitionResponse{
|
|
|
|
Status: &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_SUCCESS,
|
|
|
|
},
|
|
|
|
PartitionIDs: partitionIDs,
|
|
|
|
}, nil
|
2021-01-15 15:28:54 +08:00
|
|
|
}
|
|
|
|
|
2021-01-22 14:28:06 +08:00
|
|
|
func (qs *QueryService) LoadPartitions(req *querypb.LoadPartitionRequest) (*commonpb.Status, error) {
|
2021-02-02 11:52:41 +08:00
|
|
|
//TODO::suggest different partitions have different dm channel
|
2021-01-26 15:13:20 +08:00
|
|
|
dbID := req.DbID
|
|
|
|
collectionID := req.CollectionID
|
|
|
|
partitionIDs := req.PartitionIDs
|
2021-02-07 09:30:48 +08:00
|
|
|
schema := req.Schema
|
2021-02-04 17:47:19 +08:00
|
|
|
|
2021-01-26 15:13:20 +08:00
|
|
|
fn := func(err error) *commonpb.Status {
|
2021-02-04 17:47:19 +08:00
|
|
|
if err != nil {
|
|
|
|
return &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
|
|
|
|
Reason: err.Error(),
|
|
|
|
}
|
|
|
|
}
|
2021-01-26 15:13:20 +08:00
|
|
|
return &commonpb.Status{
|
2021-02-04 17:47:19 +08:00
|
|
|
ErrorCode: commonpb.ErrorCode_SUCCESS,
|
2021-01-26 15:13:20 +08:00
|
|
|
}
|
|
|
|
}
|
2021-02-07 09:30:48 +08:00
|
|
|
fmt.Println("load partitions start, partitionIDs = ", partitionIDs)
|
2021-01-26 15:13:20 +08:00
|
|
|
|
2021-02-04 17:47:19 +08:00
|
|
|
if len(partitionIDs) == 0 {
|
|
|
|
err := errors.New("partitionIDs are empty")
|
|
|
|
return fn(err), err
|
|
|
|
}
|
|
|
|
|
2021-02-07 09:30:48 +08:00
|
|
|
_, err := qs.replica.getCollectionByID(dbID, collectionID)
|
|
|
|
if err != nil {
|
|
|
|
err = qs.replica.addCollection(dbID, collectionID, schema)
|
|
|
|
if err != nil {
|
|
|
|
return fn(err), err
|
|
|
|
}
|
|
|
|
err = qs.watchDmChannels(dbID, collectionID)
|
|
|
|
if err != nil {
|
|
|
|
return fn(err), err
|
|
|
|
}
|
2021-02-04 17:47:19 +08:00
|
|
|
}
|
|
|
|
|
2021-01-26 15:13:20 +08:00
|
|
|
for _, partitionID := range partitionIDs {
|
2021-02-07 09:30:48 +08:00
|
|
|
_, err = qs.replica.getPartitionByID(dbID, collectionID, partitionID)
|
|
|
|
if err == nil {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
err = qs.replica.addPartition(dbID, collectionID, partitionID)
|
2021-02-04 17:47:19 +08:00
|
|
|
if err != nil {
|
|
|
|
return fn(err), err
|
|
|
|
}
|
|
|
|
|
2021-01-26 15:13:20 +08:00
|
|
|
showSegmentRequest := &milvuspb.ShowSegmentRequest{
|
|
|
|
Base: &commonpb.MsgBase{
|
|
|
|
MsgType: commonpb.MsgType_kShowSegment,
|
|
|
|
},
|
|
|
|
CollectionID: collectionID,
|
|
|
|
PartitionID: partitionID,
|
|
|
|
}
|
|
|
|
showSegmentResponse, err := qs.masterServiceClient.ShowSegments(showSegmentRequest)
|
|
|
|
if err != nil {
|
|
|
|
return fn(err), err
|
|
|
|
}
|
|
|
|
segmentIDs := showSegmentResponse.SegmentIDs
|
2021-02-07 09:30:48 +08:00
|
|
|
if len(segmentIDs) == 0 {
|
|
|
|
loadSegmentRequest := &querypb.LoadSegmentRequest{
|
|
|
|
CollectionID: collectionID,
|
|
|
|
PartitionID: partitionID,
|
|
|
|
Schema: schema,
|
|
|
|
}
|
|
|
|
for _, node := range qs.queryNodes {
|
|
|
|
_, err := node.LoadSegments(loadSegmentRequest)
|
|
|
|
if err != nil {
|
|
|
|
return fn(err), nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
qs.replica.updatePartitionState(dbID, collectionID, partitionID, querypb.PartitionState_PartialInMemory)
|
|
|
|
|
2021-02-02 10:58:39 +08:00
|
|
|
segmentStates := make(map[UniqueID]*datapb.SegmentStateInfo)
|
2021-02-04 17:47:19 +08:00
|
|
|
channel2segs := make(map[string][]UniqueID)
|
2021-02-02 10:58:39 +08:00
|
|
|
resp, err := qs.dataServiceClient.GetSegmentStates(&datapb.SegmentStatesRequest{
|
|
|
|
SegmentIDs: segmentIDs,
|
|
|
|
})
|
|
|
|
if err != nil {
|
2021-02-04 17:47:19 +08:00
|
|
|
return fn(err), err
|
2021-02-02 10:58:39 +08:00
|
|
|
}
|
|
|
|
for _, state := range resp.States {
|
2021-02-09 17:09:26 +08:00
|
|
|
fmt.Println("segment ", state.SegmentID, " 's state is ", state.StartPosition)
|
2021-02-02 10:58:39 +08:00
|
|
|
segmentID := state.SegmentID
|
2021-01-26 15:13:20 +08:00
|
|
|
segmentStates[segmentID] = state
|
2021-02-04 17:47:19 +08:00
|
|
|
channelName := state.StartPosition.ChannelName
|
|
|
|
if _, ok := channel2segs[channelName]; !ok {
|
|
|
|
segments := make([]UniqueID, 0)
|
|
|
|
segments = append(segments, segmentID)
|
|
|
|
channel2segs[channelName] = segments
|
2021-01-26 15:13:20 +08:00
|
|
|
} else {
|
2021-02-04 17:47:19 +08:00
|
|
|
channel2segs[channelName] = append(channel2segs[channelName], segmentID)
|
2021-01-26 15:13:20 +08:00
|
|
|
}
|
|
|
|
}
|
2021-02-04 17:47:19 +08:00
|
|
|
|
|
|
|
for channel, segmentIDs := range channel2segs {
|
|
|
|
sort.Slice(segmentIDs, func(i, j int) bool {
|
|
|
|
return segmentStates[segmentIDs[i]].StartPosition.Timestamp < segmentStates[segmentIDs[j]].StartPosition.Timestamp
|
|
|
|
})
|
|
|
|
|
2021-02-07 09:30:48 +08:00
|
|
|
states := make([]*datapb.SegmentStateInfo, 0)
|
|
|
|
for _, id := range segmentIDs {
|
|
|
|
states = append(states, segmentStates[id])
|
|
|
|
}
|
2021-02-04 17:47:19 +08:00
|
|
|
loadSegmentRequest := &querypb.LoadSegmentRequest{
|
2021-02-07 09:30:48 +08:00
|
|
|
CollectionID: collectionID,
|
|
|
|
PartitionID: partitionID,
|
|
|
|
SegmentIDs: segmentIDs,
|
|
|
|
SegmentStates: states,
|
|
|
|
Schema: schema,
|
2021-02-04 17:47:19 +08:00
|
|
|
}
|
2021-02-07 09:30:48 +08:00
|
|
|
nodeID, err := qs.replica.getAssignedNodeIDByChannelName(dbID, collectionID, channel)
|
|
|
|
if err != nil {
|
|
|
|
return fn(err), err
|
|
|
|
}
|
|
|
|
queryNode := qs.queryNodes[nodeID]
|
|
|
|
//TODO:: seek when loadSegment may cause more msgs consumed
|
|
|
|
//TODO:: all query node should load partition's msg
|
|
|
|
status, err := queryNode.LoadSegments(loadSegmentRequest)
|
|
|
|
if err != nil {
|
|
|
|
return status, err
|
2021-01-26 15:13:20 +08:00
|
|
|
}
|
|
|
|
}
|
2021-02-07 09:30:48 +08:00
|
|
|
|
|
|
|
qs.replica.updatePartitionState(dbID, collectionID, partitionID, querypb.PartitionState_InMemory)
|
2021-01-26 15:13:20 +08:00
|
|
|
}
|
2021-02-04 17:47:19 +08:00
|
|
|
|
2021-02-07 09:30:48 +08:00
|
|
|
fmt.Println("load partitions end, partitionIDs = ", partitionIDs)
|
2021-01-26 15:13:20 +08:00
|
|
|
return &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_SUCCESS,
|
|
|
|
}, nil
|
2021-01-15 15:28:54 +08:00
|
|
|
}
|
|
|
|
|
2021-01-22 14:28:06 +08:00
|
|
|
func (qs *QueryService) ReleasePartitions(req *querypb.ReleasePartitionRequest) (*commonpb.Status, error) {
|
2021-01-26 15:13:20 +08:00
|
|
|
dbID := req.DbID
|
|
|
|
collectionID := req.CollectionID
|
|
|
|
partitionIDs := req.PartitionIDs
|
|
|
|
segmentIDs := make([]UniqueID, 0)
|
2021-02-07 15:56:57 +08:00
|
|
|
fmt.Println("start release partitions start, partitionIDs = ", partitionIDs)
|
2021-01-26 15:13:20 +08:00
|
|
|
for _, partitionID := range partitionIDs {
|
2021-02-09 10:36:31 +08:00
|
|
|
showSegmentRequest := &milvuspb.ShowSegmentRequest{
|
|
|
|
Base: &commonpb.MsgBase{
|
|
|
|
MsgType: commonpb.MsgType_kShowSegment,
|
|
|
|
},
|
|
|
|
CollectionID: collectionID,
|
|
|
|
PartitionID: partitionID,
|
|
|
|
}
|
|
|
|
showSegmentResponse, err := qs.masterServiceClient.ShowSegments(showSegmentRequest)
|
2021-01-26 15:13:20 +08:00
|
|
|
if err != nil {
|
2021-02-02 11:52:41 +08:00
|
|
|
return &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
|
|
|
|
Reason: err.Error(),
|
|
|
|
}, err
|
2021-01-26 15:13:20 +08:00
|
|
|
}
|
2021-02-02 11:52:41 +08:00
|
|
|
|
2021-02-09 10:36:31 +08:00
|
|
|
segmentIDs = append(segmentIDs, showSegmentResponse.SegmentIDs...)
|
2021-02-04 17:47:19 +08:00
|
|
|
err = qs.replica.releasePartition(dbID, collectionID, partitionID)
|
|
|
|
if err != nil {
|
|
|
|
return &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
|
|
|
|
Reason: err.Error(),
|
|
|
|
}, err
|
|
|
|
}
|
2021-01-26 15:13:20 +08:00
|
|
|
}
|
|
|
|
releaseSegmentRequest := &querypb.ReleaseSegmentRequest{
|
|
|
|
Base: req.Base,
|
|
|
|
DbID: dbID,
|
|
|
|
CollectionID: collectionID,
|
|
|
|
PartitionIDs: partitionIDs,
|
|
|
|
SegmentIDs: segmentIDs,
|
|
|
|
}
|
|
|
|
|
|
|
|
for _, node := range qs.queryNodes {
|
|
|
|
status, err := node.client.ReleaseSegments(releaseSegmentRequest)
|
|
|
|
if err != nil {
|
|
|
|
return status, err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-02-07 15:56:57 +08:00
|
|
|
fmt.Println("start release partitions end")
|
2021-02-04 17:47:19 +08:00
|
|
|
//TODO:: queryNode cancel subscribe dmChannels
|
2021-01-26 15:13:20 +08:00
|
|
|
return &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_SUCCESS,
|
|
|
|
}, nil
|
2021-01-15 15:28:54 +08:00
|
|
|
}
|
|
|
|
|
2021-01-22 14:28:06 +08:00
|
|
|
func (qs *QueryService) CreateQueryChannel() (*querypb.CreateQueryChannelResponse, error) {
|
2021-02-18 16:26:02 +08:00
|
|
|
channelID := len(qs.queryChannels)
|
2021-01-22 14:28:06 +08:00
|
|
|
allocatedQueryChannel := "query-" + strconv.FormatInt(int64(channelID), 10)
|
|
|
|
allocatedQueryResultChannel := "queryResult-" + strconv.FormatInt(int64(channelID), 10)
|
2021-01-15 15:28:54 +08:00
|
|
|
|
2021-02-18 16:26:02 +08:00
|
|
|
qs.qcMutex.Lock()
|
|
|
|
qs.queryChannels = append(qs.queryChannels, &queryChannelInfo{
|
|
|
|
requestChannel: allocatedQueryChannel,
|
|
|
|
responseChannel: allocatedQueryResultChannel,
|
|
|
|
})
|
|
|
|
|
2021-02-09 17:09:26 +08:00
|
|
|
addQueryChannelsRequest := &querypb.AddQueryChannelsRequest{
|
|
|
|
RequestChannelID: allocatedQueryChannel,
|
|
|
|
ResultChannelID: allocatedQueryResultChannel,
|
|
|
|
}
|
2021-02-18 16:26:02 +08:00
|
|
|
fmt.Println("query service create query channel, queryChannelName = ", allocatedQueryChannel)
|
|
|
|
for nodeID, node := range qs.queryNodes {
|
|
|
|
fmt.Println("node ", nodeID, " watch query channel")
|
2021-02-09 17:09:26 +08:00
|
|
|
_, err := node.AddQueryChannel(addQueryChannelsRequest)
|
|
|
|
if err != nil {
|
2021-02-18 16:26:02 +08:00
|
|
|
qs.qcMutex.Unlock()
|
2021-02-09 17:09:26 +08:00
|
|
|
return &querypb.CreateQueryChannelResponse{
|
|
|
|
Status: &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
|
|
|
|
Reason: err.Error(),
|
|
|
|
},
|
|
|
|
}, err
|
|
|
|
}
|
|
|
|
}
|
2021-02-18 16:26:02 +08:00
|
|
|
qs.qcMutex.Unlock()
|
2021-02-09 17:09:26 +08:00
|
|
|
|
2021-01-22 14:28:06 +08:00
|
|
|
return &querypb.CreateQueryChannelResponse{
|
|
|
|
Status: &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_SUCCESS,
|
|
|
|
},
|
|
|
|
RequestChannel: allocatedQueryChannel,
|
|
|
|
ResultChannel: allocatedQueryResultChannel,
|
|
|
|
}, nil
|
2021-01-15 15:28:54 +08:00
|
|
|
}
|
|
|
|
|
2021-01-22 14:28:06 +08:00
|
|
|
func (qs *QueryService) GetPartitionStates(req *querypb.PartitionStatesRequest) (*querypb.PartitionStatesResponse, error) {
|
2021-01-26 15:13:20 +08:00
|
|
|
states, err := qs.replica.getPartitionStates(req.DbID, req.CollectionID, req.PartitionIDs)
|
|
|
|
if err != nil {
|
|
|
|
return &querypb.PartitionStatesResponse{
|
|
|
|
Status: &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
|
|
|
|
Reason: err.Error(),
|
|
|
|
},
|
|
|
|
PartitionDescriptions: states,
|
|
|
|
}, err
|
|
|
|
}
|
|
|
|
return &querypb.PartitionStatesResponse{
|
|
|
|
Status: &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_SUCCESS,
|
|
|
|
},
|
|
|
|
PartitionDescriptions: states,
|
|
|
|
}, nil
|
2021-01-16 15:31:10 +08:00
|
|
|
}
|
|
|
|
|
2021-02-04 11:40:14 +08:00
|
|
|
func (qs *QueryService) GetSegmentInfo(req *querypb.SegmentInfoRequest) (*querypb.SegmentInfoResponse, error) {
|
|
|
|
segmentInfos := make([]*querypb.SegmentInfo, 0)
|
|
|
|
for _, node := range qs.queryNodes {
|
|
|
|
segmentInfo, err := node.client.GetSegmentInfo(req)
|
|
|
|
if err != nil {
|
|
|
|
return &querypb.SegmentInfoResponse{
|
|
|
|
Status: &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
|
|
|
|
Reason: err.Error(),
|
|
|
|
},
|
|
|
|
}, err
|
|
|
|
}
|
|
|
|
segmentInfos = append(segmentInfos, segmentInfo.Infos...)
|
|
|
|
}
|
|
|
|
return &querypb.SegmentInfoResponse{
|
|
|
|
Status: &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_SUCCESS,
|
|
|
|
},
|
|
|
|
Infos: segmentInfos,
|
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
|
2021-02-08 14:30:54 +08:00
|
|
|
func NewQueryService(ctx context.Context, factory msgstream.Factory) (*QueryService, error) {
|
2021-02-18 16:26:02 +08:00
|
|
|
nodes := make(map[int64]*queryNodeInfo)
|
|
|
|
queryChannels := make([]*queryChannelInfo, 0)
|
2021-01-22 14:28:06 +08:00
|
|
|
ctx1, cancel := context.WithCancel(ctx)
|
2021-01-26 15:13:20 +08:00
|
|
|
replica := newMetaReplica()
|
2021-01-22 14:28:06 +08:00
|
|
|
service := &QueryService{
|
2021-02-18 16:26:02 +08:00
|
|
|
loopCtx: ctx1,
|
|
|
|
loopCancel: cancel,
|
|
|
|
replica: replica,
|
|
|
|
queryNodes: nodes,
|
|
|
|
queryChannels: queryChannels,
|
|
|
|
qcMutex: &sync.Mutex{},
|
|
|
|
msFactory: factory,
|
2021-01-22 14:28:06 +08:00
|
|
|
}
|
|
|
|
service.stateCode.Store(internalpb2.StateCode_INITIALIZING)
|
|
|
|
service.isInit.Store(false)
|
|
|
|
return service, nil
|
2021-01-15 15:28:54 +08:00
|
|
|
}
|
2021-01-26 15:13:20 +08:00
|
|
|
|
|
|
|
func (qs *QueryService) SetMasterService(masterService MasterServiceInterface) {
|
|
|
|
qs.masterServiceClient = masterService
|
|
|
|
}
|
|
|
|
|
|
|
|
func (qs *QueryService) SetDataService(dataService DataServiceInterface) {
|
|
|
|
qs.dataServiceClient = dataService
|
|
|
|
}
|
|
|
|
|
2021-02-07 09:30:48 +08:00
|
|
|
func (qs *QueryService) watchDmChannels(dbID UniqueID, collectionID UniqueID) error {
|
|
|
|
collection, err := qs.replica.getCollectionByID(0, collectionID)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
channelRequest := datapb.InsertChannelRequest{
|
|
|
|
DbID: dbID,
|
|
|
|
CollectionID: collectionID,
|
|
|
|
}
|
|
|
|
resp, err := qs.dataServiceClient.GetInsertChannels(&channelRequest)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if len(resp.Values) == 0 {
|
|
|
|
err = errors.New("haven't assign dm channel to collection")
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
dmChannels := resp.Values
|
2021-02-18 16:26:02 +08:00
|
|
|
watchedChannels2NodeID := make(map[string]int64)
|
2021-02-07 15:47:10 +08:00
|
|
|
unWatchedChannels := make([]string, 0)
|
|
|
|
for _, channel := range dmChannels {
|
|
|
|
findChannel := false
|
|
|
|
for nodeID, node := range qs.queryNodes {
|
|
|
|
watchedChannels := node.dmChannelNames
|
|
|
|
for _, watchedChannel := range watchedChannels {
|
|
|
|
if channel == watchedChannel {
|
|
|
|
findChannel = true
|
|
|
|
watchedChannels2NodeID[channel] = nodeID
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if !findChannel {
|
|
|
|
unWatchedChannels = append(unWatchedChannels, channel)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
channels2NodeID := qs.shuffleChannelsToQueryNode(unWatchedChannels)
|
2021-02-07 09:30:48 +08:00
|
|
|
err = qs.replica.addDmChannels(dbID, collection.id, channels2NodeID)
|
2021-02-04 17:47:19 +08:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2021-02-07 15:47:10 +08:00
|
|
|
err = qs.replica.addDmChannels(dbID, collection.id, watchedChannels2NodeID)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2021-02-18 16:26:02 +08:00
|
|
|
node2channels := make(map[int64][]string)
|
2021-02-07 09:30:48 +08:00
|
|
|
for channel, nodeID := range channels2NodeID {
|
|
|
|
if _, ok := node2channels[nodeID]; ok {
|
|
|
|
node2channels[nodeID] = append(node2channels[nodeID], channel)
|
|
|
|
} else {
|
|
|
|
channels := make([]string, 0)
|
|
|
|
channels = append(channels, channel)
|
|
|
|
node2channels[nodeID] = channels
|
2021-02-04 17:47:19 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
for nodeID, channels := range node2channels {
|
|
|
|
node := qs.queryNodes[nodeID]
|
|
|
|
request := &querypb.WatchDmChannelsRequest{
|
|
|
|
ChannelIDs: channels,
|
|
|
|
}
|
|
|
|
_, err := node.WatchDmChannels(request)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2021-02-07 09:30:48 +08:00
|
|
|
fmt.Println("query node ", nodeID, "watch channels = ", channels)
|
|
|
|
node.AddDmChannels(channels)
|
2021-02-04 17:47:19 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-02-18 16:26:02 +08:00
|
|
|
func (qs *QueryService) shuffleChannelsToQueryNode(dmChannels []string) map[string]int64 {
|
2021-02-04 17:47:19 +08:00
|
|
|
maxNumDMChannel := 0
|
2021-02-18 16:26:02 +08:00
|
|
|
res := make(map[string]int64)
|
2021-02-07 15:47:10 +08:00
|
|
|
if len(dmChannels) == 0 {
|
|
|
|
return res
|
|
|
|
}
|
2021-02-18 16:26:02 +08:00
|
|
|
node2lens := make(map[int64]int)
|
2021-02-04 17:47:19 +08:00
|
|
|
for id, node := range qs.queryNodes {
|
|
|
|
node2lens[id] = len(node.dmChannelNames)
|
|
|
|
}
|
|
|
|
offset := 0
|
|
|
|
for {
|
|
|
|
lastOffset := offset
|
|
|
|
for id, len := range node2lens {
|
|
|
|
if len >= maxNumDMChannel {
|
|
|
|
maxNumDMChannel = len
|
|
|
|
} else {
|
2021-02-07 09:30:48 +08:00
|
|
|
res[dmChannels[offset]] = id
|
2021-02-04 17:47:19 +08:00
|
|
|
node2lens[id]++
|
|
|
|
offset++
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if lastOffset == offset {
|
|
|
|
for id := range node2lens {
|
2021-02-07 09:30:48 +08:00
|
|
|
res[dmChannels[offset]] = id
|
2021-02-04 17:47:19 +08:00
|
|
|
node2lens[id]++
|
|
|
|
offset++
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if offset == len(dmChannels) {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return res
|
|
|
|
}
|