Fix Load returning an incorrect error code (#19318)

- Return code 5 when the load parameters are mismatched
- Check whether there are enough nodes to load

Signed-off-by: yah01 <yang.cen@zilliz.com>

Signed-off-by: yah01 <yang.cen@zilliz.com>
This commit is contained in:
yah01 2022-09-21 17:54:50 +08:00 committed by GitHub
parent d379a23dae
commit 10c0ff1211
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 106 additions and 16 deletions

View File

@ -2,6 +2,7 @@ package querycoordv2
import (
"context"
"errors"
"fmt"
"sync"
"time"
@ -10,6 +11,7 @@ import (
"github.com/milvus-io/milvus/api/milvuspb"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/job"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/querycoordv2/task"
@ -286,3 +288,10 @@ func (s *Server) fillReplicaInfo(replica *meta.Replica, withShardNodes bool) (*m
}
return info, nil
}
// errCode translates an error produced by a load job into the status code
// returned to the client. An error that matches job.ErrLoadParameterMismatched
// (per errors.Is) is reported as IllegalArgument; every other error is
// reported as UnexpectedError.
func errCode(err error) commonpb.ErrorCode {
	switch {
	case errors.Is(err, job.ErrLoadParameterMismatched):
		return commonpb.ErrorCode_IllegalArgument
	default:
		return commonpb.ErrorCode_UnexpectedError
	}
}

View File

@ -9,4 +9,5 @@ var (
// Load errors
ErrCollectionLoaded = errors.New("CollectionLoaded")
ErrLoadParameterMismatched = errors.New("LoadParameterMismatched")
ErrNoEnoughNode = errors.New("NoEnoughNode")
)

View File

@ -133,7 +133,7 @@ func (job *LoadCollectionJob) PreExecute() error {
if job.meta.Exist(req.GetCollectionID()) {
old := job.meta.GetCollection(req.GetCollectionID())
if old == nil {
msg := "collection with different load type existed, please release it first"
msg := "load the partition after load collection is not supported"
log.Warn(msg)
return utils.WrapError(msg, ErrLoadParameterMismatched)
}
@ -147,6 +147,12 @@ func (job *LoadCollectionJob) PreExecute() error {
return ErrCollectionLoaded
}
if len(job.nodeMgr.GetAll()) < int(job.req.GetReplicaNumber()) {
msg := "no enough nodes to create replicas"
log.Warn(msg)
return utils.WrapError(msg, ErrNoEnoughNode)
}
return nil
}
@ -324,7 +330,7 @@ func (job *LoadPartitionJob) PreExecute() error {
if job.meta.Exist(req.GetCollectionID()) {
old := job.meta.GetCollection(req.GetCollectionID())
if old != nil {
msg := "collection with different load type existed, please release it first"
msg := "load the partition after load collection is not supported"
log.Warn(msg)
return utils.WrapError(msg, ErrLoadParameterMismatched)
}
@ -348,6 +354,12 @@ func (job *LoadPartitionJob) PreExecute() error {
return ErrCollectionLoaded
}
if len(job.nodeMgr.GetAll()) < int(job.req.GetReplicaNumber()) {
msg := "no enough nodes to create replicas"
log.Warn(msg)
return utils.WrapError(msg, ErrNoEnoughNode)
}
return nil
}

View File

@ -105,6 +105,7 @@ func (suite *JobSuite) SetupTest() {
suite.meta = meta.NewMeta(RandomIncrementIDAllocator(), suite.store)
suite.targetMgr = meta.NewTargetManager()
suite.nodeMgr = session.NewNodeManager()
suite.nodeMgr.Add(&session.NodeInfo{})
suite.handoffObserver = observers.NewHandoffObserver(
suite.store,
suite.meta,
@ -240,6 +241,35 @@ func (suite *JobSuite) TestLoadCollection() {
}
}
// TestLoadCollectionWithReplicas submits a load-collection job asking for
// 3 replicas for every collection whose load type is LoadCollection, and
// expects each job to fail with ErrNoEnoughNode.
// NOTE(review): this presumably relies on the suite's node manager holding
// fewer than 3 nodes — confirm against SetupTest.
func (suite *JobSuite) TestLoadCollectionWithReplicas() {
	ctx := context.Background()

	for _, collectionID := range suite.collections {
		// Only collections configured for whole-collection loading are exercised.
		if suite.loadTypes[collectionID] == querypb.LoadType_LoadCollection {
			// Request 3 replicas.
			loadJob := NewLoadCollectionJob(
				ctx,
				&querypb.LoadCollectionRequest{
					CollectionID:  collectionID,
					ReplicaNumber: 3,
				},
				suite.dist,
				suite.meta,
				suite.targetMgr,
				suite.broker,
				suite.nodeMgr,
				suite.handoffObserver,
			)
			suite.scheduler.Add(loadJob)
			suite.ErrorIs(loadJob.Wait(), ErrNoEnoughNode)
		}
	}
}
func (suite *JobSuite) TestLoadPartition() {
ctx := context.Background()
@ -375,6 +405,36 @@ func (suite *JobSuite) TestLoadPartition() {
}
}
// TestLoadPartitionWithReplicas submits a load-partitions job asking for
// 3 replicas for every collection whose load type is LoadPartition, and
// expects each job to fail with ErrNoEnoughNode.
// NOTE(review): this presumably relies on the suite's node manager holding
// fewer than 3 nodes — confirm against SetupTest.
func (suite *JobSuite) TestLoadPartitionWithReplicas() {
	ctx := context.Background()

	for _, collectionID := range suite.collections {
		// Only collections configured for partition loading are exercised.
		if suite.loadTypes[collectionID] == querypb.LoadType_LoadPartition {
			// Request 3 replicas for all of the collection's partitions.
			loadJob := NewLoadPartitionJob(
				ctx,
				&querypb.LoadPartitionsRequest{
					CollectionID:  collectionID,
					PartitionIDs:  suite.partitions[collectionID],
					ReplicaNumber: 3,
				},
				suite.dist,
				suite.meta,
				suite.targetMgr,
				suite.broker,
				suite.nodeMgr,
				suite.handoffObserver,
			)
			suite.scheduler.Add(loadJob)
			suite.ErrorIs(loadJob.Wait(), ErrNoEnoughNode)
		}
	}
}
func (suite *JobSuite) TestReleaseCollection() {
ctx := context.Background()

View File

@ -38,6 +38,7 @@ func (s *Server) ShowCollections(ctx context.Context, req *querypb.ShowCollectio
}, nil
}
isGetAll := false
collectionSet := typeutil.NewUniqueSet(req.GetCollectionIDs()...)
if len(req.GetCollectionIDs()) == 0 {
for _, collection := range s.meta.GetAllCollections() {
@ -46,28 +47,35 @@ func (s *Server) ShowCollections(ctx context.Context, req *querypb.ShowCollectio
for _, partition := range s.meta.GetAllPartitions() {
collectionSet.Insert(partition.GetCollectionID())
}
isGetAll = true
}
collections := collectionSet.Collect()
resp := &querypb.ShowCollectionsResponse{
Status: successStatus,
CollectionIDs: collections,
InMemoryPercentages: make([]int64, len(collectionSet)),
QueryServiceAvailable: make([]bool, len(collectionSet)),
CollectionIDs: make([]int64, 0, len(collectionSet)),
InMemoryPercentages: make([]int64, 0, len(collectionSet)),
QueryServiceAvailable: make([]bool, 0, len(collectionSet)),
}
for i, collectionID := range collections {
for _, collectionID := range collections {
log := log.With(zap.Int64("collectionID", collectionID))
percentage := s.meta.CollectionManager.GetLoadPercentage(collectionID)
if percentage < 0 {
if isGetAll {
// The collection is released during this,
// ignore it
continue
}
err := fmt.Errorf("collection %d has not been loaded to memory or load failed", collectionID)
log.Warn("show collection failed", zap.Error(err))
return &querypb.ShowCollectionsResponse{
Status: utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, err.Error()),
}, nil
}
resp.InMemoryPercentages[i] = int64(percentage)
resp.QueryServiceAvailable[i] = s.checkAnyReplicaAvailable(collectionID)
resp.CollectionIDs = append(resp.CollectionIDs, collectionID)
resp.InMemoryPercentages = append(resp.InMemoryPercentages, int64(percentage))
resp.QueryServiceAvailable = append(resp.QueryServiceAvailable, s.checkAnyReplicaAvailable(collectionID))
}
return resp, nil
@ -79,7 +87,7 @@ func (s *Server) ShowPartitions(ctx context.Context, req *querypb.ShowPartitions
zap.Int64("collectionID", req.GetCollectionID()),
)
log.Info("show partitions", zap.Int64s("partitions", req.GetPartitionIDs()))
log.Info("show partitions request received", zap.Int64s("partitions", req.GetPartitionIDs()))
if s.status.Load() != internalpb.StateCode_Healthy {
msg := "failed to show partitions"
@ -184,7 +192,7 @@ func (s *Server) LoadCollection(ctx context.Context, req *querypb.LoadCollection
msg := "failed to load collection"
log.Warn(msg, zap.Error(err))
metrics.QueryCoordLoadCount.WithLabelValues(metrics.FailLabel).Inc()
return utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, msg, err), nil
return utils.WrapStatus(errCode(err), msg, err), nil
}
metrics.QueryCoordLoadCount.WithLabelValues(metrics.SuccessLabel).Inc()
@ -264,7 +272,7 @@ func (s *Server) LoadPartitions(ctx context.Context, req *querypb.LoadPartitions
msg := "failed to load partitions"
log.Warn(msg, zap.Error(err))
metrics.QueryCoordLoadCount.WithLabelValues(metrics.FailLabel).Inc()
return utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, msg, err), nil
return utils.WrapStatus(errCode(err), msg, err), nil
}
metrics.QueryCoordLoadCount.WithLabelValues(metrics.SuccessLabel).Inc()

View File

@ -271,7 +271,7 @@ func (suite *ServiceSuite) TestLoadCollectionFailed() {
}
resp, err := server.LoadCollection(ctx, req)
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_UnexpectedError, resp.ErrorCode)
suite.Equal(commonpb.ErrorCode_IllegalArgument, resp.ErrorCode)
suite.Contains(resp.Reason, job.ErrLoadParameterMismatched.Error())
}
@ -286,7 +286,7 @@ func (suite *ServiceSuite) TestLoadCollectionFailed() {
}
resp, err := server.LoadCollection(ctx, req)
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_UnexpectedError, resp.ErrorCode)
suite.Equal(commonpb.ErrorCode_IllegalArgument, resp.ErrorCode)
suite.Contains(resp.Reason, job.ErrLoadParameterMismatched.Error())
}
}
@ -345,7 +345,7 @@ func (suite *ServiceSuite) TestLoadPartitionFailed() {
}
resp, err := server.LoadPartitions(ctx, req)
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_UnexpectedError, resp.ErrorCode)
suite.Equal(commonpb.ErrorCode_IllegalArgument, resp.ErrorCode)
suite.Contains(resp.Reason, job.ErrLoadParameterMismatched.Error())
}
@ -360,7 +360,7 @@ func (suite *ServiceSuite) TestLoadPartitionFailed() {
}
resp, err := server.LoadPartitions(ctx, req)
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_UnexpectedError, resp.ErrorCode)
suite.Equal(commonpb.ErrorCode_IllegalArgument, resp.ErrorCode)
suite.Contains(resp.Reason, job.ErrLoadParameterMismatched.Error())
}
@ -375,7 +375,7 @@ func (suite *ServiceSuite) TestLoadPartitionFailed() {
}
resp, err := server.LoadPartitions(ctx, req)
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_UnexpectedError, resp.ErrorCode)
suite.Equal(commonpb.ErrorCode_IllegalArgument, resp.ErrorCode)
suite.Contains(resp.Reason, job.ErrLoadParameterMismatched.Error())
}
}