mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-02 03:48:37 +08:00
Fix wrong error codes & names (#26904)
Signed-off-by: yah01 <yah2er0ne@outlook.com>
This commit is contained in:
parent
a6c6d7301c
commit
45d9fb5929
@ -55,7 +55,7 @@ func (h *Handlers) checkDatabase(c *gin.Context, dbName string) bool {
|
||||
return true
|
||||
}
|
||||
}
|
||||
c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: Code(merr.ErrDatabaseNotfound), HTTPReturnMessage: merr.ErrDatabaseNotfound.Error()})
|
||||
c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: Code(merr.ErrDatabaseNotFound), HTTPReturnMessage: merr.ErrDatabaseNotFound.Error()})
|
||||
return false
|
||||
}
|
||||
|
||||
|
@ -1168,7 +1168,7 @@ func TestDatabaseNotFound(t *testing.T) {
|
||||
assert.Equal(t, w.Body.String(), "{\"code\":200,\"data\":[]}")
|
||||
})
|
||||
|
||||
errorStr := PrintErr(merr.ErrDatabaseNotfound)
|
||||
errorStr := PrintErr(merr.ErrDatabaseNotFound)
|
||||
paths := map[string][]string{
|
||||
errorStr: {
|
||||
URIPrefix + VectorCollectionsPath + "?dbName=test",
|
||||
|
@ -31,7 +31,6 @@ import (
|
||||
"github.com/milvus-io/milvus/internal/kv"
|
||||
rocksdbkv "github.com/milvus-io/milvus/internal/kv/rocksdb"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper"
|
||||
"github.com/milvus-io/milvus/pkg/util/hardware"
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
@ -942,7 +941,7 @@ func (rmq *rocksmq) Seek(topicName string, groupName string, msgID UniqueID) err
|
||||
/* Step I: Check if key exists */
|
||||
ll, ok := topicMu.Load(topicName)
|
||||
if !ok {
|
||||
return fmt.Errorf("topic %s not exist, %w", topicName, mqwrapper.ErrTopicNotExist)
|
||||
return merr.WrapErrMqTopicNotFound(topicName)
|
||||
}
|
||||
lock, ok := ll.(*sync.Mutex)
|
||||
if !ok {
|
||||
@ -968,7 +967,7 @@ func (rmq *rocksmq) ForceSeek(topicName string, groupName string, msgID UniqueID
|
||||
/* Step I: Check if key exists */
|
||||
ll, ok := topicMu.Load(topicName)
|
||||
if !ok {
|
||||
return fmt.Errorf("topic %s not exist, %w", topicName, mqwrapper.ErrTopicNotExist)
|
||||
return merr.WrapErrMqTopicNotFound(topicName)
|
||||
}
|
||||
lock, ok := ll.(*sync.Mutex)
|
||||
if !ok {
|
||||
@ -1159,7 +1158,7 @@ func (rmq *rocksmq) CheckTopicValid(topic string) error {
|
||||
|
||||
_, ok := topicMu.Load(topic)
|
||||
if !ok {
|
||||
return merr.WrapErrTopicNotFound(topic, "failed to get topic")
|
||||
return merr.WrapErrMqTopicNotFound(topic, "failed to get topic")
|
||||
}
|
||||
|
||||
latestMsgID, err := rmq.GetLatestMsg(topic)
|
||||
@ -1168,7 +1167,7 @@ func (rmq *rocksmq) CheckTopicValid(topic string) error {
|
||||
}
|
||||
|
||||
if latestMsgID != DefaultMessageID {
|
||||
return merr.WrapErrTopicNotEmpty(topic, "topic is not empty")
|
||||
return merr.WrapErrMqTopicNotEmpty(topic, "topic is not empty")
|
||||
}
|
||||
log.Info("created topic is empty")
|
||||
return nil
|
||||
|
@ -1189,7 +1189,7 @@ func TestRocksmq_CheckPreTopicValid(t *testing.T) {
|
||||
channelName1 := "topic1"
|
||||
// topic not exist
|
||||
err = rmq.CheckTopicValid(channelName1)
|
||||
assert.Equal(t, true, errors.Is(err, merr.ErrTopicNotFound))
|
||||
assert.Equal(t, true, errors.Is(err, merr.ErrMqTopicNotFound))
|
||||
|
||||
channelName2 := "topic2"
|
||||
// topic is not empty
|
||||
@ -1208,7 +1208,7 @@ func TestRocksmq_CheckPreTopicValid(t *testing.T) {
|
||||
assert.NoError(t, err)
|
||||
|
||||
err = rmq.CheckTopicValid(channelName2)
|
||||
assert.Equal(t, true, errors.Is(err, merr.ErrTopicNotEmpty))
|
||||
assert.Equal(t, true, errors.Is(err, merr.ErrMqTopicNotEmpty))
|
||||
|
||||
channelName3 := "topic3"
|
||||
// pass
|
||||
|
@ -810,7 +810,7 @@ func (ms *MqTtMsgStream) Seek(ctx context.Context, msgPositions []*msgpb.MsgPosi
|
||||
if err != nil {
|
||||
log.Warn("Failed to seek", zap.String("channel", mp.ChannelName), zap.Error(err))
|
||||
// stop retry if consumer topic not exist
|
||||
if errors.Is(err, mqwrapper.ErrTopicNotExist) {
|
||||
if errors.Is(err, merr.ErrMqTopicNotFound) {
|
||||
return retry.Unrecoverable(err)
|
||||
}
|
||||
return err
|
||||
|
@ -1,6 +0,0 @@
|
||||
package mqwrapper
|
||||
|
||||
import "github.com/cockroachdb/errors"
|
||||
|
||||
// ErrTopicNotExist topic not exist error.
|
||||
var ErrTopicNotExist = errors.New("topic not exist")
|
@ -235,8 +235,11 @@ func (kc *Consumer) CheckTopicValid(topic string) error {
|
||||
if err != nil {
|
||||
switch v := err.(type) {
|
||||
case kafka.Error:
|
||||
if v.Code() == kafka.ErrUnknownTopic || v.Code() == kafka.ErrUnknownPartition || v.Code() == kafka.ErrUnknownTopicOrPart {
|
||||
return merr.WrapErrTopicNotFound(topic, "topic get latest msg ID failed, topic or partition does not exists")
|
||||
|
||||
if v.Code() == kafka.ErrUnknownTopic || v.Code() == kafka.ErrUnknownTopicOrPart {
|
||||
return merr.WrapErrMqTopicNotFound(topic, err.Error())
|
||||
} else {
|
||||
return merr.WrapErrMqInternal(err)
|
||||
}
|
||||
default:
|
||||
return err
|
||||
@ -245,7 +248,7 @@ func (kc *Consumer) CheckTopicValid(topic string) error {
|
||||
|
||||
// check topic is empty
|
||||
if !latestMsgID.AtEarliestPosition() {
|
||||
return merr.WrapErrTopicNotEmpty(topic, "topic is not empty")
|
||||
return merr.WrapErrMqTopicNotEmpty(topic, "topic is not empty")
|
||||
}
|
||||
log.Info("created topic is empty")
|
||||
|
||||
|
@ -164,7 +164,7 @@ func (nc *Consumer) CheckTopicValid(topic string) error {
|
||||
// check if topic valid or exist.
|
||||
streamInfo, err := nc.js.StreamInfo(topic)
|
||||
if errors.Is(err, nats.ErrStreamNotFound) {
|
||||
return merr.WrapErrTopicNotFound(topic, err.Error())
|
||||
return merr.WrapErrMqTopicNotFound(topic, err.Error())
|
||||
} else if err != nil {
|
||||
log.Warn("fail to get stream info of nats", zap.String("topic", nc.topic), zap.Error(err))
|
||||
return errors.Wrap(err, "failed to get stream info of nats jetstream")
|
||||
@ -172,7 +172,7 @@ func (nc *Consumer) CheckTopicValid(topic string) error {
|
||||
|
||||
// check if topic stream is empty.
|
||||
if streamInfo.State.Msgs > 0 {
|
||||
return merr.WrapErrTopicNotEmpty(topic, "stream in nats is not empty")
|
||||
return merr.WrapErrMqTopicNotEmpty(topic, "stream in nats is not empty")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -230,7 +230,7 @@ func TestCheckTopicValid(t *testing.T) {
|
||||
assert.NoError(t, err)
|
||||
|
||||
err = consumer.CheckTopicValid(topic)
|
||||
assert.ErrorIs(t, err, merr.ErrTopicNotEmpty)
|
||||
assert.ErrorIs(t, err, merr.ErrMqTopicNotEmpty)
|
||||
|
||||
consumer.Close()
|
||||
err = consumer.CheckTopicValid(topic)
|
||||
|
@ -165,7 +165,7 @@ func (pc *Consumer) CheckTopicValid(topic string) error {
|
||||
}
|
||||
|
||||
if !latestMsgID.AtEarliestPosition() {
|
||||
return merr.WrapErrTopicNotEmpty(topic, "topic is not empty")
|
||||
return merr.WrapErrMqTopicNotEmpty(topic, "topic is not empty")
|
||||
}
|
||||
log.Info("created topic is empty", zap.String("topic", topic))
|
||||
return nil
|
||||
|
@ -48,9 +48,9 @@ var (
|
||||
ErrCollectionNotFullyLoaded = newMilvusError("collection not fully loaded", 103, true)
|
||||
|
||||
// Partition related
|
||||
ErrPartitionNotFound = newMilvusError("partition not found", 202, false)
|
||||
ErrPartitionNotLoaded = newMilvusError("partition not loaded", 203, false)
|
||||
ErrPartitionNotFullyLoaded = newMilvusError("collection not fully loaded", 103, true)
|
||||
ErrPartitionNotFound = newMilvusError("partition not found", 200, false)
|
||||
ErrPartitionNotLoaded = newMilvusError("partition not loaded", 201, false)
|
||||
ErrPartitionNotFullyLoaded = newMilvusError("partition not fully loaded", 202, true)
|
||||
|
||||
// ResourceGroup related
|
||||
ErrResourceGroupNotFound = newMilvusError("resource group not found", 300, false)
|
||||
@ -75,9 +75,9 @@ var (
|
||||
ErrIndexNotFound = newMilvusError("index not found", 700, false)
|
||||
|
||||
// Database related
|
||||
ErrDatabaseNotfound = newMilvusError("database not found", 800, false)
|
||||
ErrDatabaseNotFound = newMilvusError("database not found", 800, false)
|
||||
ErrDatabaseNumLimitExceeded = newMilvusError("exceeded the limit number of database", 801, false)
|
||||
ErrInvalidedDatabaseName = newMilvusError("invalided database name", 802, false)
|
||||
ErrDatabaseInvalidName = newMilvusError("invalid database name", 802, false)
|
||||
|
||||
// Node related
|
||||
ErrNodeNotFound = newMilvusError("node not found", 901, false)
|
||||
@ -96,15 +96,16 @@ var (
|
||||
// Metrics related
|
||||
ErrMetricNotFound = newMilvusError("metric not found", 1200, false)
|
||||
|
||||
// Topic related
|
||||
ErrTopicNotFound = newMilvusError("topic not found", 1300, false)
|
||||
ErrTopicNotEmpty = newMilvusError("topic not empty", 1301, false)
|
||||
// Message queue related
|
||||
ErrMqTopicNotFound = newMilvusError("topic not found", 1300, false)
|
||||
ErrMqTopicNotEmpty = newMilvusError("topic not empty", 1301, false)
|
||||
ErrMqInternal = newMilvusError("message queue internal error", 1302, false)
|
||||
|
||||
// field related
|
||||
ErrFieldNotFound = newMilvusError("field not found", 1700, false)
|
||||
|
||||
// high-level restful api related
|
||||
ErrNeedAuthenticate = newMilvusError("user hasn't authenticate", 1800, false)
|
||||
ErrNeedAuthenticate = newMilvusError("user hasn't authenticated", 1800, false)
|
||||
ErrIncorrectParameterFormat = newMilvusError("can only accept json format request", 1801, false)
|
||||
ErrMissingRequiredParameters = newMilvusError("missing required parameters", 1802, false)
|
||||
ErrMarshalCollectionSchema = newMilvusError("fail to marshal collection schema", 1803, false)
|
||||
|
@ -126,9 +126,10 @@ func (s *ErrSuite) TestWrap() {
|
||||
// Metrics related
|
||||
s.ErrorIs(WrapErrMetricNotFound("unknown", "failed to get metric"), ErrMetricNotFound)
|
||||
|
||||
// Topic related
|
||||
s.ErrorIs(WrapErrTopicNotFound("unknown", "failed to get topic"), ErrTopicNotFound)
|
||||
s.ErrorIs(WrapErrTopicNotEmpty("unknown", "topic is not empty"), ErrTopicNotEmpty)
|
||||
// Message queue related
|
||||
s.ErrorIs(WrapErrMqTopicNotFound("unknown", "failed to get topic"), ErrMqTopicNotFound)
|
||||
s.ErrorIs(WrapErrMqTopicNotEmpty("unknown", "topic is not empty"), ErrMqTopicNotEmpty)
|
||||
s.ErrorIs(WrapErrMqInternal(errors.New("unknown"), "failed to consume"), ErrMqInternal)
|
||||
|
||||
// field related
|
||||
s.ErrorIs(WrapErrFieldNotFound("meta", "failed to get field"), ErrFieldNotFound)
|
||||
|
@ -205,7 +205,7 @@ func WrapErrServiceDiskLimitExceeded(predict, limit float32, msg ...string) erro
|
||||
}
|
||||
|
||||
func WrapErrDatabaseNotFound(database any, msg ...string) error {
|
||||
err := wrapWithField(ErrDatabaseNotfound, "database", database)
|
||||
err := wrapWithField(ErrDatabaseNotFound, "database", database)
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "; "))
|
||||
}
|
||||
@ -221,7 +221,7 @@ func WrapErrDatabaseResourceLimitExceeded(msg ...string) error {
|
||||
}
|
||||
|
||||
func WrapErrInvalidedDatabaseName(database any, msg ...string) error {
|
||||
err := wrapWithField(ErrInvalidedDatabaseName, "database", database)
|
||||
err := wrapWithField(ErrDatabaseInvalidName, "database", database)
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "; "))
|
||||
}
|
||||
@ -484,17 +484,25 @@ func WrapErrMetricNotFound(name string, msg ...string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// Topic related
|
||||
func WrapErrTopicNotFound(name string, msg ...string) error {
|
||||
err := errors.Wrapf(ErrTopicNotFound, "topic=%s", name)
|
||||
// Message queue related
|
||||
func WrapErrMqTopicNotFound(name string, msg ...string) error {
|
||||
err := errors.Wrapf(ErrMqTopicNotFound, "topic=%s", name)
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "; "))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func WrapErrTopicNotEmpty(name string, msg ...string) error {
|
||||
err := errors.Wrapf(ErrTopicNotEmpty, "topic=%s", name)
|
||||
func WrapErrMqTopicNotEmpty(name string, msg ...string) error {
|
||||
err := errors.Wrapf(ErrMqTopicNotEmpty, "topic=%s", name)
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "; "))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func WrapErrMqInternal(err error, msg ...string) error {
|
||||
err = errors.Wrapf(ErrMqInternal, "internal=%v", err)
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "; "))
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user