mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-02 03:48:37 +08:00
Fix querynodev2 search/query segments return unexpected grpc err (#26341)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
ec23b81f23
commit
123ad921e0
@ -685,12 +685,13 @@ func (node *QueryNode) SearchSegments(ctx context.Context, req *querypb.SearchRe
|
||||
zap.String("scope", req.GetScope().String()),
|
||||
)
|
||||
|
||||
failRet := WrapSearchResult(commonpb.ErrorCode_UnexpectedError, "")
|
||||
if !node.lifetime.Add(commonpbutil.IsHealthy) {
|
||||
return nil, merr.WrapErrServiceNotReady(fmt.Sprintf("node id: %d is unhealthy", paramtable.GetNodeID()))
|
||||
failRet.Status = merr.Status(merr.WrapErrServiceNotReady(fmt.Sprintf("node id: %d is unhealthy", paramtable.GetNodeID())))
|
||||
return failRet, nil
|
||||
}
|
||||
defer node.lifetime.Done()
|
||||
|
||||
failRet := WrapSearchResult(commonpb.ErrorCode_UnexpectedError, "")
|
||||
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SearchLabel, metrics.TotalLabel, metrics.FromLeader).Inc()
|
||||
defer func() {
|
||||
if failRet.Status.ErrorCode != commonpb.ErrorCode_Success {
|
||||
@ -712,21 +713,21 @@ func (node *QueryNode) SearchSegments(ctx context.Context, req *querypb.SearchRe
|
||||
err := merr.WrapErrCollectionNotLoaded(req.GetReq().GetCollectionID())
|
||||
log.Warn("failed to search segments", zap.Error(err))
|
||||
failRet.Status = merr.Status(err)
|
||||
return failRet, err
|
||||
return failRet, nil
|
||||
}
|
||||
|
||||
task := tasks.NewSearchTask(searchCtx, collection, node.manager, req)
|
||||
if err := node.scheduler.Add(task); err != nil {
|
||||
log.Warn("failed to search channel", zap.Error(err))
|
||||
failRet.Status.Reason = err.Error()
|
||||
return failRet, err
|
||||
return failRet, nil
|
||||
}
|
||||
|
||||
err := task.Wait()
|
||||
if err != nil {
|
||||
log.Warn("failed to search segments", zap.Error(err))
|
||||
failRet.Status.Reason = err.Error()
|
||||
return failRet, err
|
||||
return failRet, nil
|
||||
}
|
||||
|
||||
tr.CtxElapse(ctx, fmt.Sprintf("search segments done, channel = %s, segmentIDs = %v",
|
||||
@ -904,19 +905,21 @@ func (node *QueryNode) QuerySegments(ctx context.Context, req *querypb.QueryRequ
|
||||
tr := timerecord.NewTimeRecorder("querySegments")
|
||||
collection := node.manager.Collection.Get(req.Req.GetCollectionID())
|
||||
if collection == nil {
|
||||
return nil, merr.WrapErrCollectionNotFound(req.Req.GetCollectionID())
|
||||
failRet.Status = merr.Status(merr.WrapErrCollectionNotLoaded(req.Req.GetCollectionID()))
|
||||
return failRet, nil
|
||||
}
|
||||
|
||||
// Send task to scheduler and wait until it finished.
|
||||
task := tasks.NewQueryTask(queryCtx, collection, node.manager, req)
|
||||
if err := node.scheduler.Add(task); err != nil {
|
||||
log.Warn("failed to add query task into scheduler", zap.Error(err))
|
||||
return nil, err
|
||||
failRet.Status = merr.Status(err)
|
||||
return failRet, nil
|
||||
}
|
||||
err := task.Wait()
|
||||
if err != nil {
|
||||
log.Warn("failed to query channel", zap.Error(err))
|
||||
failRet.Status.Reason = err.Error()
|
||||
failRet.Status = merr.Status(err)
|
||||
return failRet, nil
|
||||
}
|
||||
|
||||
|
@ -1191,6 +1191,52 @@ func (suite *ServiceSuite) TestSearch_Failed() {
|
||||
suite.Equal(commonpb.ErrorCode_NotReadyServe, resp.Status.GetErrorCode())
|
||||
}
|
||||
|
||||
func (suite *ServiceSuite) TestSearchSegments_Unhealthy() {
|
||||
ctx := context.Background()
|
||||
|
||||
suite.node.UpdateStateCode(commonpb.StateCode_Abnormal)
|
||||
|
||||
req := &querypb.SearchRequest{
|
||||
FromShardLeader: true,
|
||||
DmlChannels: []string{suite.vchannel},
|
||||
TotalChannelNum: 2,
|
||||
}
|
||||
|
||||
rsp, err := suite.node.SearchSegments(ctx, req)
|
||||
suite.NoError(err)
|
||||
suite.Equal(commonpb.ErrorCode_NotReadyServe, rsp.GetStatus().GetErrorCode())
|
||||
suite.Equal(merr.Code(merr.ErrServiceNotReady), rsp.GetStatus().GetCode())
|
||||
}
|
||||
|
||||
func (suite *ServiceSuite) TestSearchSegments_Failed() {
|
||||
ctx := context.Background()
|
||||
|
||||
// collection found
|
||||
req := &querypb.SearchRequest{
|
||||
Req: &internalpb.SearchRequest{
|
||||
CollectionID: -1, // not exist collection id
|
||||
},
|
||||
FromShardLeader: true,
|
||||
DmlChannels: []string{suite.vchannel},
|
||||
TotalChannelNum: 2,
|
||||
}
|
||||
|
||||
rsp, err := suite.node.SearchSegments(ctx, req)
|
||||
suite.NoError(err)
|
||||
suite.Equal(commonpb.ErrorCode_UnexpectedError, rsp.GetStatus().GetErrorCode())
|
||||
suite.Equal(merr.Code(merr.ErrCollectionNotLoaded), rsp.GetStatus().GetCode())
|
||||
|
||||
suite.TestWatchDmChannelsInt64()
|
||||
|
||||
req.Req.CollectionID = suite.collectionID
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
cancel()
|
||||
rsp, err = suite.node.SearchSegments(ctx, req)
|
||||
suite.NoError(err)
|
||||
suite.Equal(commonpb.ErrorCode_UnexpectedError, rsp.GetStatus().GetErrorCode())
|
||||
}
|
||||
|
||||
func (suite *ServiceSuite) TestSearchSegments_Normal() {
|
||||
ctx := context.Background()
|
||||
// pre
|
||||
@ -1290,6 +1336,35 @@ func (suite *ServiceSuite) TestQuery_Failed() {
|
||||
suite.Equal(commonpb.ErrorCode_NotReadyServe, resp.Status.GetErrorCode())
|
||||
}
|
||||
|
||||
func (suite *ServiceSuite) TestQuerySegments_Failed() {
|
||||
ctx := context.Background()
|
||||
|
||||
req := &querypb.QueryRequest{
|
||||
Req: &internalpb.RetrieveRequest{
|
||||
CollectionID: -1,
|
||||
},
|
||||
FromShardLeader: true,
|
||||
DmlChannels: []string{suite.vchannel},
|
||||
}
|
||||
|
||||
rsp, err := suite.node.QuerySegments(ctx, req)
|
||||
suite.NoError(err)
|
||||
|
||||
suite.Equal(commonpb.ErrorCode_UnexpectedError, rsp.GetStatus().GetErrorCode())
|
||||
suite.Equal(merr.Code(merr.ErrCollectionNotLoaded), rsp.GetStatus().GetCode())
|
||||
|
||||
suite.TestWatchDmChannelsInt64()
|
||||
|
||||
req.Req.CollectionID = suite.collectionID
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
cancel()
|
||||
|
||||
rsp, err = suite.node.QuerySegments(ctx, req)
|
||||
suite.NoError(err)
|
||||
suite.Equal(commonpb.ErrorCode_UnexpectedError, rsp.GetStatus().GetErrorCode())
|
||||
}
|
||||
|
||||
func (suite *ServiceSuite) TestQuerySegments_Normal() {
|
||||
ctx := context.Background()
|
||||
// pre
|
||||
|
Loading…
Reference in New Issue
Block a user