diff --git a/internal/distributed/proxy/httpserver/handler_v1.go b/internal/distributed/proxy/httpserver/handler_v1.go index df4e4b83cb..9ae787ac2b 100644 --- a/internal/distributed/proxy/httpserver/handler_v1.go +++ b/internal/distributed/proxy/httpserver/handler_v1.go @@ -55,7 +55,7 @@ func (h *Handlers) checkDatabase(c *gin.Context, dbName string) bool { return true } } - c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: Code(merr.ErrDatabaseNotfound), HTTPReturnMessage: merr.ErrDatabaseNotfound.Error()}) + c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: Code(merr.ErrDatabaseNotFound), HTTPReturnMessage: merr.ErrDatabaseNotFound.Error()}) return false } diff --git a/internal/distributed/proxy/httpserver/handler_v1_test.go b/internal/distributed/proxy/httpserver/handler_v1_test.go index 4fdcfe3ec3..d83fa80d21 100644 --- a/internal/distributed/proxy/httpserver/handler_v1_test.go +++ b/internal/distributed/proxy/httpserver/handler_v1_test.go @@ -1168,7 +1168,7 @@ func TestDatabaseNotFound(t *testing.T) { assert.Equal(t, w.Body.String(), "{\"code\":200,\"data\":[]}") }) - errorStr := PrintErr(merr.ErrDatabaseNotfound) + errorStr := PrintErr(merr.ErrDatabaseNotFound) paths := map[string][]string{ errorStr: { URIPrefix + VectorCollectionsPath + "?dbName=test", diff --git a/internal/mq/mqimpl/rocksmq/server/rocksmq_impl.go b/internal/mq/mqimpl/rocksmq/server/rocksmq_impl.go index 312aa534b2..0a380c66eb 100644 --- a/internal/mq/mqimpl/rocksmq/server/rocksmq_impl.go +++ b/internal/mq/mqimpl/rocksmq/server/rocksmq_impl.go @@ -31,7 +31,6 @@ import ( "github.com/milvus-io/milvus/internal/kv" rocksdbkv "github.com/milvus-io/milvus/internal/kv/rocksdb" "github.com/milvus-io/milvus/pkg/log" - "github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper" "github.com/milvus-io/milvus/pkg/util/hardware" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" @@ -942,7 +941,7 @@ func (rmq *rocksmq) Seek(topicName string, groupName string, msgID UniqueID) err /* Step I: Check if key exists */ ll, ok := topicMu.Load(topicName) if !ok { - return fmt.Errorf("topic %s not exist, %w", topicName, mqwrapper.ErrTopicNotExist) + return merr.WrapErrMqTopicNotFound(topicName) } lock, ok := ll.(*sync.Mutex) if !ok { @@ -968,7 +967,7 @@ func (rmq *rocksmq) ForceSeek(topicName string, groupName string, msgID UniqueID /* Step I: Check if key exists */ ll, ok := topicMu.Load(topicName) if !ok { - return fmt.Errorf("topic %s not exist, %w", topicName, mqwrapper.ErrTopicNotExist) + return merr.WrapErrMqTopicNotFound(topicName) } lock, ok := ll.(*sync.Mutex) if !ok { @@ -1159,7 +1158,7 @@ func (rmq *rocksmq) CheckTopicValid(topic string) error { _, ok := topicMu.Load(topic) if !ok { - return merr.WrapErrTopicNotFound(topic, "failed to get topic") + return merr.WrapErrMqTopicNotFound(topic, "failed to get topic") } latestMsgID, err := rmq.GetLatestMsg(topic) @@ -1168,7 +1167,7 @@ func (rmq *rocksmq) CheckTopicValid(topic string) error { } if latestMsgID != DefaultMessageID { - return merr.WrapErrTopicNotEmpty(topic, "topic is not empty") + return merr.WrapErrMqTopicNotEmpty(topic, "topic is not empty") } log.Info("created topic is empty") return nil diff --git a/internal/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go b/internal/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go index b089857d10..3c89f7381b 100644 --- a/internal/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go +++ b/internal/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go @@ -1189,7 +1189,7 @@ func TestRocksmq_CheckPreTopicValid(t *testing.T) { channelName1 := "topic1" // topic not exist err = rmq.CheckTopicValid(channelName1) - assert.Equal(t, true, errors.Is(err, merr.ErrTopicNotFound)) + assert.Equal(t, true, errors.Is(err, merr.ErrMqTopicNotFound)) channelName2 := "topic2" // topic is not empty @@ -1208,7 +1208,7 @@ func TestRocksmq_CheckPreTopicValid(t *testing.T) { assert.NoError(t, err) err = rmq.CheckTopicValid(channelName2) - assert.Equal(t, true, errors.Is(err, merr.ErrTopicNotEmpty)) + assert.Equal(t, true, errors.Is(err, merr.ErrMqTopicNotEmpty)) channelName3 := "topic3" // pass diff --git a/pkg/mq/msgstream/mq_msgstream.go b/pkg/mq/msgstream/mq_msgstream.go index 0e5921926c..864d3aab53 100644 --- a/pkg/mq/msgstream/mq_msgstream.go +++ b/pkg/mq/msgstream/mq_msgstream.go @@ -810,7 +810,7 @@ func (ms *MqTtMsgStream) Seek(ctx context.Context, msgPositions []*msgpb.MsgPosi if err != nil { log.Warn("Failed to seek", zap.String("channel", mp.ChannelName), zap.Error(err)) // stop retry if consumer topic not exist - if errors.Is(err, mqwrapper.ErrTopicNotExist) { + if errors.Is(err, merr.ErrMqTopicNotFound) { return retry.Unrecoverable(err) } return err diff --git a/pkg/mq/msgstream/mqwrapper/errors.go b/pkg/mq/msgstream/mqwrapper/errors.go deleted file mode 100644 index 9683d9f297..0000000000 --- a/pkg/mq/msgstream/mqwrapper/errors.go +++ /dev/null @@ -1,6 +0,0 @@ -package mqwrapper - -import "github.com/cockroachdb/errors" - -// ErrTopicNotExist topic not exist error. -var ErrTopicNotExist = errors.New("topic not exist") diff --git a/pkg/mq/msgstream/mqwrapper/kafka/kafka_consumer.go b/pkg/mq/msgstream/mqwrapper/kafka/kafka_consumer.go index 2b49792299..edfadea737 100644 --- a/pkg/mq/msgstream/mqwrapper/kafka/kafka_consumer.go +++ b/pkg/mq/msgstream/mqwrapper/kafka/kafka_consumer.go @@ -235,8 +235,11 @@ func (kc *Consumer) CheckTopicValid(topic string) error { if err != nil { switch v := err.(type) { case kafka.Error: - if v.Code() == kafka.ErrUnknownTopic || v.Code() == kafka.ErrUnknownPartition || v.Code() == kafka.ErrUnknownTopicOrPart { - return merr.WrapErrTopicNotFound(topic, "topic get latest msg ID failed, topic or partition does not exists") + + if v.Code() == kafka.ErrUnknownTopic || v.Code() == kafka.ErrUnknownTopicOrPart { + return merr.WrapErrMqTopicNotFound(topic, err.Error()) + } else { + return merr.WrapErrMqInternal(err) } default: return err @@ -245,7 +248,7 @@ func (kc *Consumer) CheckTopicValid(topic string) error { // check topic is empty if !latestMsgID.AtEarliestPosition() { - return merr.WrapErrTopicNotEmpty(topic, "topic is not empty") + return merr.WrapErrMqTopicNotEmpty(topic, "topic is not empty") } log.Info("created topic is empty") diff --git a/pkg/mq/msgstream/mqwrapper/nmq/nmq_consumer.go b/pkg/mq/msgstream/mqwrapper/nmq/nmq_consumer.go index c972638f91..132b71571f 100644 --- a/pkg/mq/msgstream/mqwrapper/nmq/nmq_consumer.go +++ b/pkg/mq/msgstream/mqwrapper/nmq/nmq_consumer.go @@ -164,7 +164,7 @@ func (nc *Consumer) CheckTopicValid(topic string) error { // check if topic valid or exist. streamInfo, err := nc.js.StreamInfo(topic) if errors.Is(err, nats.ErrStreamNotFound) { - return merr.WrapErrTopicNotFound(topic, err.Error()) + return merr.WrapErrMqTopicNotFound(topic, err.Error()) } else if err != nil { log.Warn("fail to get stream info of nats", zap.String("topic", nc.topic), zap.Error(err)) return errors.Wrap(err, "failed to get stream info of nats jetstream") @@ -172,7 +172,7 @@ func (nc *Consumer) CheckTopicValid(topic string) error { // check if topic stream is empty. if streamInfo.State.Msgs > 0 { - return merr.WrapErrTopicNotEmpty(topic, "stream in nats is not empty") + return merr.WrapErrMqTopicNotEmpty(topic, "stream in nats is not empty") } return nil } diff --git a/pkg/mq/msgstream/mqwrapper/nmq/nmq_consumer_test.go b/pkg/mq/msgstream/mqwrapper/nmq/nmq_consumer_test.go index 11f500546b..acc68e07b9 100644 --- a/pkg/mq/msgstream/mqwrapper/nmq/nmq_consumer_test.go +++ b/pkg/mq/msgstream/mqwrapper/nmq/nmq_consumer_test.go @@ -230,7 +230,7 @@ func TestCheckTopicValid(t *testing.T) { assert.NoError(t, err) err = consumer.CheckTopicValid(topic) - assert.ErrorIs(t, err, merr.ErrTopicNotEmpty) + assert.ErrorIs(t, err, merr.ErrMqTopicNotEmpty) consumer.Close() err = consumer.CheckTopicValid(topic) diff --git a/pkg/mq/msgstream/mqwrapper/pulsar/pulsar_consumer.go b/pkg/mq/msgstream/mqwrapper/pulsar/pulsar_consumer.go index eb6c99d4c7..f2904bb27b 100644 --- a/pkg/mq/msgstream/mqwrapper/pulsar/pulsar_consumer.go +++ b/pkg/mq/msgstream/mqwrapper/pulsar/pulsar_consumer.go @@ -165,7 +165,7 @@ func (pc *Consumer) CheckTopicValid(topic string) error { } if !latestMsgID.AtEarliestPosition() { - return merr.WrapErrTopicNotEmpty(topic, "topic is not empty") + return merr.WrapErrMqTopicNotEmpty(topic, "topic is not empty") } log.Info("created topic is empty", zap.String("topic", topic)) return nil diff --git a/pkg/util/merr/errors.go b/pkg/util/merr/errors.go index 84c92b2e94..a36a9ef9ba 100644 --- a/pkg/util/merr/errors.go +++ b/pkg/util/merr/errors.go @@ -48,9 +48,9 @@ var ( ErrCollectionNotFullyLoaded = newMilvusError("collection not fully loaded", 103, true) // Partition related - ErrPartitionNotFound = newMilvusError("partition not found", 202, false) - ErrPartitionNotLoaded = newMilvusError("partition not loaded", 203, false) - ErrPartitionNotFullyLoaded = newMilvusError("collection not fully loaded", 103, true) + ErrPartitionNotFound = newMilvusError("partition not found", 200, false) + ErrPartitionNotLoaded = newMilvusError("partition not loaded", 201, false) + ErrPartitionNotFullyLoaded = newMilvusError("partition not fully loaded", 202, true) // ResourceGroup related ErrResourceGroupNotFound = newMilvusError("resource group not found", 300, false) @@ -75,9 +75,9 @@ var ( ErrIndexNotFound = newMilvusError("index not found", 700, false) // Database related - ErrDatabaseNotfound = newMilvusError("database not found", 800, false) + ErrDatabaseNotFound = newMilvusError("database not found", 800, false) ErrDatabaseNumLimitExceeded = newMilvusError("exceeded the limit number of database", 801, false) - ErrInvalidedDatabaseName = newMilvusError("invalided database name", 802, false) + ErrDatabaseInvalidName = newMilvusError("invalid database name", 802, false) // Node related ErrNodeNotFound = newMilvusError("node not found", 901, false) @@ -96,15 +96,16 @@ var ( // Metrics related ErrMetricNotFound = newMilvusError("metric not found", 1200, false) - // Topic related - ErrTopicNotFound = newMilvusError("topic not found", 1300, false) - ErrTopicNotEmpty = newMilvusError("topic not empty", 1301, false) + // Message queue related + ErrMqTopicNotFound = newMilvusError("topic not found", 1300, false) + ErrMqTopicNotEmpty = newMilvusError("topic not empty", 1301, false) + ErrMqInternal = newMilvusError("message queue internal error", 1302, false) // field related ErrFieldNotFound = newMilvusError("field not found", 1700, false) // high-level restful api related - ErrNeedAuthenticate = newMilvusError("user hasn't authenticate", 1800, false) + ErrNeedAuthenticate = newMilvusError("user hasn't authenticated", 1800, false) ErrIncorrectParameterFormat = newMilvusError("can only accept json format request", 1801, false) ErrMissingRequiredParameters = newMilvusError("missing required parameters", 1802, false) ErrMarshalCollectionSchema = newMilvusError("fail to marshal collection schema", 1803, false) diff --git a/pkg/util/merr/errors_test.go b/pkg/util/merr/errors_test.go index 4710ff7eb2..5cdf35c9ee 100644 --- a/pkg/util/merr/errors_test.go +++ b/pkg/util/merr/errors_test.go @@ -126,9 +126,10 @@ func (s *ErrSuite) TestWrap() { // Metrics related s.ErrorIs(WrapErrMetricNotFound("unknown", "failed to get metric"), ErrMetricNotFound) - // Topic related - s.ErrorIs(WrapErrTopicNotFound("unknown", "failed to get topic"), ErrTopicNotFound) - s.ErrorIs(WrapErrTopicNotEmpty("unknown", "topic is not empty"), ErrTopicNotEmpty) + // Message queue related + s.ErrorIs(WrapErrMqTopicNotFound("unknown", "failed to get topic"), ErrMqTopicNotFound) + s.ErrorIs(WrapErrMqTopicNotEmpty("unknown", "topic is not empty"), ErrMqTopicNotEmpty) + s.ErrorIs(WrapErrMqInternal(errors.New("unknown"), "failed to consume"), ErrMqInternal) // field related s.ErrorIs(WrapErrFieldNotFound("meta", "failed to get field"), ErrFieldNotFound) diff --git a/pkg/util/merr/utils.go b/pkg/util/merr/utils.go index 74250fc1f8..79fa1ea639 100644 --- a/pkg/util/merr/utils.go +++ b/pkg/util/merr/utils.go @@ -205,7 +205,7 @@ func WrapErrServiceDiskLimitExceeded(predict, limit float32, msg ...string) erro } func WrapErrDatabaseNotFound(database any, msg ...string) error { - err := wrapWithField(ErrDatabaseNotfound, "database", database) + err := wrapWithField(ErrDatabaseNotFound, "database", database) if len(msg) > 0 { err = errors.Wrap(err, strings.Join(msg, "; ")) } @@ -221,7 +221,7 @@ func WrapErrDatabaseResourceLimitExceeded(msg ...string) error { } func WrapErrInvalidedDatabaseName(database any, msg ...string) error { - err := wrapWithField(ErrInvalidedDatabaseName, "database", database) + err := wrapWithField(ErrDatabaseInvalidName, "database", database) if len(msg) > 0 { err = errors.Wrap(err, strings.Join(msg, "; ")) } @@ -484,17 +484,25 @@ func WrapErrMetricNotFound(name string, msg ...string) error { return err } -// Topic related -func WrapErrTopicNotFound(name string, msg ...string) error { - err := errors.Wrapf(ErrTopicNotFound, "topic=%s", name) +// Message queue related +func WrapErrMqTopicNotFound(name string, msg ...string) error { + err := errors.Wrapf(ErrMqTopicNotFound, "topic=%s", name) if len(msg) > 0 { err = errors.Wrap(err, strings.Join(msg, "; ")) } return err } -func WrapErrTopicNotEmpty(name string, msg ...string) error { - err := errors.Wrapf(ErrTopicNotEmpty, "topic=%s", name) +func WrapErrMqTopicNotEmpty(name string, msg ...string) error { + err := errors.Wrapf(ErrMqTopicNotEmpty, "topic=%s", name) + if len(msg) > 0 { + err = errors.Wrap(err, strings.Join(msg, "; ")) + } + return err +} + +func WrapErrMqInternal(err error, msg ...string) error { + err = errors.Wrapf(ErrMqInternal, "internal=%v", err) if len(msg) > 0 { err = errors.Wrap(err, strings.Join(msg, "; ")) }