enhance: wal adaptor implementation (#34122)

issue: #33285

- add an adaptor that adapts walimpls implementations into the wal interface.
- implement a time-tick-sorted and filtering scanner.
- add tests for wal.

---------

Signed-off-by: chyezh <chyezh@outlook.com>
chyezh 2024-07-04 15:23:08 +08:00 committed by GitHub
parent e4cece8de8
commit 7611128e57
33 changed files with 2342 additions and 31 deletions
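
For orientation, here is a minimal usage sketch of the pieces added in this change. It is not part of the diff: it mirrors TestWAL in the wal test file further below, the channel name and payload are made up, and the setup (resource.InitForTest, the walimplstest backend, the message builder) is taken from that test.

package adaptor_test

import (
	"context"
	"testing"

	"github.com/golang/protobuf/proto"
	"github.com/stretchr/testify/assert"

	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
	"github.com/milvus-io/milvus/internal/streamingnode/server/resource"
	"github.com/milvus-io/milvus/internal/streamingnode/server/resource/timestamp"
	"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
	"github.com/milvus-io/milvus/internal/streamingnode/server/wal/registry"
	"github.com/milvus-io/milvus/pkg/streaming/util/message"
	"github.com/milvus-io/milvus/pkg/streaming/util/options"
	"github.com/milvus-io/milvus/pkg/streaming/util/types"
	"github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/walimplstest"
)

// TestWALAdaptorSketch is a sketch only: open a wal through the adaptor,
// append one message and read it back via the time-tick sorted scanner.
func TestWALAdaptorSketch(t *testing.T) {
	// The timetick interceptor needs the resource setup, as in TestWAL below.
	resource.InitForTest(resource.OptRootCoordClient(timestamp.NewMockRootCoordClient(t)))

	opener, err := registry.MustGetBuilder(walimplstest.WALName).Build()
	assert.NoError(t, err)
	defer opener.Close()

	w, err := opener.Open(context.Background(), &wal.OpenOption{
		Channel: types.PChannelInfo{Name: "sketch-pchannel", Term: 1, ServerID: 1},
	})
	assert.NoError(t, err)
	defer w.Close()

	// Append goes through the interceptor chain (the timetick interceptor
	// registered in builder.go below).
	payload, _ := proto.Marshal(&commonpb.MsgHeader{
		Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_Insert, MsgID: 0},
	})
	msg := message.NewMutableMessageBuilder().
		WithMessageType(message.MessageTypeUnknown).
		WithPayload(payload).
		WithProperties(map[string]string{"id": "0"}).
		BuildMutable()
	_, err = w.Append(context.Background(), msg)
	assert.NoError(t, err)

	// The scanner delivers messages sorted and filtered by time tick.
	s, err := w.Read(context.Background(), wal.ReadOption{DeliverPolicy: options.DeliverPolicyAll()})
	assert.NoError(t, err)
	defer s.Close()
	got := <-s.Chan()
	assert.NotNil(t, got)
}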

go.mod

@@ -65,6 +65,7 @@ require (
require github.com/milvus-io/milvus-storage/go v0.0.0-20231227072638-ebd0b8e56d70
require (
github.com/cockroachdb/redact v1.1.3
github.com/greatroar/blobloom v0.0.0-00010101000000-000000000000
github.com/jolestar/go-commons-pool/v2 v2.1.2
github.com/milvus-io/milvus/pkg v0.0.0-00010101000000-000000000000
@@ -102,7 +103,6 @@ require (
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect
github.com/cilium/ebpf v0.11.0 // indirect
github.com/cockroachdb/logtags v0.0.0-20211118104740-dabe8e521a4f // indirect
github.com/cockroachdb/redact v1.1.3 // indirect
github.com/confluentinc/confluent-kafka-go v1.9.1 // indirect
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd/v22 v22.3.2 // indirect

@@ -0,0 +1,302 @@
// Code generated by mockery v2.32.4. DO NOT EDIT.
package mock_grpc
import (
context "context"
metadata "google.golang.org/grpc/metadata"
mock "github.com/stretchr/testify/mock"
)
// MockClientStream is an autogenerated mock type for the ClientStream type
type MockClientStream struct {
mock.Mock
}
type MockClientStream_Expecter struct {
mock *mock.Mock
}
func (_m *MockClientStream) EXPECT() *MockClientStream_Expecter {
return &MockClientStream_Expecter{mock: &_m.Mock}
}
// CloseSend provides a mock function with given fields:
func (_m *MockClientStream) CloseSend() error {
ret := _m.Called()
var r0 error
if rf, ok := ret.Get(0).(func() error); ok {
r0 = rf()
} else {
r0 = ret.Error(0)
}
return r0
}
// MockClientStream_CloseSend_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CloseSend'
type MockClientStream_CloseSend_Call struct {
*mock.Call
}
// CloseSend is a helper method to define mock.On call
func (_e *MockClientStream_Expecter) CloseSend() *MockClientStream_CloseSend_Call {
return &MockClientStream_CloseSend_Call{Call: _e.mock.On("CloseSend")}
}
func (_c *MockClientStream_CloseSend_Call) Run(run func()) *MockClientStream_CloseSend_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockClientStream_CloseSend_Call) Return(_a0 error) *MockClientStream_CloseSend_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockClientStream_CloseSend_Call) RunAndReturn(run func() error) *MockClientStream_CloseSend_Call {
_c.Call.Return(run)
return _c
}
// Context provides a mock function with given fields:
func (_m *MockClientStream) Context() context.Context {
ret := _m.Called()
var r0 context.Context
if rf, ok := ret.Get(0).(func() context.Context); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(context.Context)
}
}
return r0
}
// MockClientStream_Context_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Context'
type MockClientStream_Context_Call struct {
*mock.Call
}
// Context is a helper method to define mock.On call
func (_e *MockClientStream_Expecter) Context() *MockClientStream_Context_Call {
return &MockClientStream_Context_Call{Call: _e.mock.On("Context")}
}
func (_c *MockClientStream_Context_Call) Run(run func()) *MockClientStream_Context_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockClientStream_Context_Call) Return(_a0 context.Context) *MockClientStream_Context_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockClientStream_Context_Call) RunAndReturn(run func() context.Context) *MockClientStream_Context_Call {
_c.Call.Return(run)
return _c
}
// Header provides a mock function with given fields:
func (_m *MockClientStream) Header() (metadata.MD, error) {
ret := _m.Called()
var r0 metadata.MD
var r1 error
if rf, ok := ret.Get(0).(func() (metadata.MD, error)); ok {
return rf()
}
if rf, ok := ret.Get(0).(func() metadata.MD); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(metadata.MD)
}
}
if rf, ok := ret.Get(1).(func() error); ok {
r1 = rf()
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockClientStream_Header_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Header'
type MockClientStream_Header_Call struct {
*mock.Call
}
// Header is a helper method to define mock.On call
func (_e *MockClientStream_Expecter) Header() *MockClientStream_Header_Call {
return &MockClientStream_Header_Call{Call: _e.mock.On("Header")}
}
func (_c *MockClientStream_Header_Call) Run(run func()) *MockClientStream_Header_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockClientStream_Header_Call) Return(_a0 metadata.MD, _a1 error) *MockClientStream_Header_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockClientStream_Header_Call) RunAndReturn(run func() (metadata.MD, error)) *MockClientStream_Header_Call {
_c.Call.Return(run)
return _c
}
// RecvMsg provides a mock function with given fields: m
func (_m *MockClientStream) RecvMsg(m interface{}) error {
ret := _m.Called(m)
var r0 error
if rf, ok := ret.Get(0).(func(interface{}) error); ok {
r0 = rf(m)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockClientStream_RecvMsg_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RecvMsg'
type MockClientStream_RecvMsg_Call struct {
*mock.Call
}
// RecvMsg is a helper method to define mock.On call
// - m interface{}
func (_e *MockClientStream_Expecter) RecvMsg(m interface{}) *MockClientStream_RecvMsg_Call {
return &MockClientStream_RecvMsg_Call{Call: _e.mock.On("RecvMsg", m)}
}
func (_c *MockClientStream_RecvMsg_Call) Run(run func(m interface{})) *MockClientStream_RecvMsg_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(interface{}))
})
return _c
}
func (_c *MockClientStream_RecvMsg_Call) Return(_a0 error) *MockClientStream_RecvMsg_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockClientStream_RecvMsg_Call) RunAndReturn(run func(interface{}) error) *MockClientStream_RecvMsg_Call {
_c.Call.Return(run)
return _c
}
// SendMsg provides a mock function with given fields: m
func (_m *MockClientStream) SendMsg(m interface{}) error {
ret := _m.Called(m)
var r0 error
if rf, ok := ret.Get(0).(func(interface{}) error); ok {
r0 = rf(m)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockClientStream_SendMsg_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SendMsg'
type MockClientStream_SendMsg_Call struct {
*mock.Call
}
// SendMsg is a helper method to define mock.On call
// - m interface{}
func (_e *MockClientStream_Expecter) SendMsg(m interface{}) *MockClientStream_SendMsg_Call {
return &MockClientStream_SendMsg_Call{Call: _e.mock.On("SendMsg", m)}
}
func (_c *MockClientStream_SendMsg_Call) Run(run func(m interface{})) *MockClientStream_SendMsg_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(interface{}))
})
return _c
}
func (_c *MockClientStream_SendMsg_Call) Return(_a0 error) *MockClientStream_SendMsg_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockClientStream_SendMsg_Call) RunAndReturn(run func(interface{}) error) *MockClientStream_SendMsg_Call {
_c.Call.Return(run)
return _c
}
// Trailer provides a mock function with given fields:
func (_m *MockClientStream) Trailer() metadata.MD {
ret := _m.Called()
var r0 metadata.MD
if rf, ok := ret.Get(0).(func() metadata.MD); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(metadata.MD)
}
}
return r0
}
// MockClientStream_Trailer_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Trailer'
type MockClientStream_Trailer_Call struct {
*mock.Call
}
// Trailer is a helper method to define mock.On call
func (_e *MockClientStream_Expecter) Trailer() *MockClientStream_Trailer_Call {
return &MockClientStream_Trailer_Call{Call: _e.mock.On("Trailer")}
}
func (_c *MockClientStream_Trailer_Call) Run(run func()) *MockClientStream_Trailer_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockClientStream_Trailer_Call) Return(_a0 metadata.MD) *MockClientStream_Trailer_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockClientStream_Trailer_Call) RunAndReturn(run func() metadata.MD) *MockClientStream_Trailer_Call {
_c.Call.Return(run)
return _c
}
// NewMockClientStream creates a new instance of MockClientStream. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockClientStream(t interface {
mock.TestingT
Cleanup(func())
}) *MockClientStream {
mock := &MockClientStream{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

@@ -34,6 +34,7 @@ message VChannelInfo {
string name = 1;
}
// DeliverPolicy is the policy to deliver message.
message DeliverPolicy {
oneof policy {
google.protobuf.Empty all = 1; // deliver all messages.
@@ -42,3 +43,48 @@ message DeliverPolicy {
MessageID start_after = 4; // deliver message after this message id. (startAfter, ...]
}
}
// DeliverFilter is the filter to deliver message.
message DeliverFilter {
oneof filter {
DeliverFilterTimeTickGT time_tick_gt = 1;
DeliverFilterTimeTickGTE time_tick_gte = 2;
DeliverFilterVChannel vchannel = 3;
}
}
// DeliverFilterTimeTickGT is the filter to deliver message with time tick greater than this value.
message DeliverFilterTimeTickGT {
uint64 time_tick = 1; // deliver message with time tick greater than this value.
}
// DeliverFilterTimeTickGTE is the filter to deliver message with time tick greater than or equal to this value.
message DeliverFilterTimeTickGTE {
uint64 time_tick = 1; // deliver message with time tick greater than or equal to this value.
}
// DeliverFilterVChannel is the filter to deliver message with vchannel name.
message DeliverFilterVChannel {
string vchannel = 1; // deliver message with vchannel name.
}
// StreamingCode is the error code for log internal component.
enum StreamingCode {
STREAMING_CODE_OK = 0;
STREAMING_CODE_CHANNEL_EXIST = 1; // channel already exist
STREAMING_CODE_CHANNEL_NOT_EXIST = 2; // channel not exist
STREAMING_CODE_CHANNEL_FENCED = 3; // channel is fenced
STREAMING_CODE_ON_SHUTDOWN = 4; // component is on shutdown
STREAMING_CODE_INVALID_REQUEST_SEQ = 5; // invalid request sequence
STREAMING_CODE_UNMATCHED_CHANNEL_TERM = 6; // unmatched channel term
STREAMING_CODE_IGNORED_OPERATION = 7; // ignored operation
STREAMING_CODE_INNER = 8; // underlying service failure.
STREAMING_CODE_EOF = 9; // end of stream, generated by grpc status.
STREAMING_CODE_UNKNOWN = 999; // unknown error
}
// StreamingError is the error type for log internal component.
message StreamingError {
StreamingCode code = 1;
string cause = 2;
}
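
As a rough illustration of how a DeliverFilter is meant to be populated, a sketch follows. It is not part of the commit, and the Go wrapper names for the oneof members are an assumption based on standard protoc-gen-go naming rather than taken from the generated code in this change.

package example

import "github.com/milvus-io/milvus/internal/proto/streamingpb"

// newDeliverFilters is a hypothetical helper (sketch only) that builds filters
// to deliver only messages on one vchannel with time tick >= ts.
// The DeliverFilter_* oneof wrapper names assume standard protoc-gen-go output.
func newDeliverFilters(ts uint64, vchannel string) []*streamingpb.DeliverFilter {
	return []*streamingpb.DeliverFilter{
		// Keep only messages whose time tick is greater than or equal to ts.
		{Filter: &streamingpb.DeliverFilter_TimeTickGte{
			TimeTickGte: &streamingpb.DeliverFilterTimeTickGTE{TimeTick: ts},
		}},
		// Keep only messages belonging to the given vchannel.
		{Filter: &streamingpb.DeliverFilter_Vchannel{
			Vchannel: &streamingpb.DeliverFilterVChannel{Vchannel: vchannel},
		}},
	}
}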

@@ -2,6 +2,8 @@ package adaptor
import (
"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick"
"github.com/milvus-io/milvus/pkg/streaming/walimpls"
)
@@ -22,11 +24,12 @@ func (b builderAdaptorImpl) Name() string {
}
func (b builderAdaptorImpl) Build() (wal.Opener, error) {
_, err := b.builder.Build()
o, err := b.builder.Build()
if err != nil {
return nil, err
}
return nil, nil
// TODO: wait for implementation.
// return adaptImplsToOpener(o), nil
// Add all interceptor here.
return adaptImplsToOpener(o, []interceptors.InterceptorBuilder{
timetick.NewInterceptorBuilder(),
}), nil
}

@@ -0,0 +1,86 @@
package adaptor
import (
"context"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors"
"github.com/milvus-io/milvus/internal/util/streamingutil/status"
"github.com/milvus-io/milvus/internal/util/streamingutil/util"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/streaming/walimpls"
"github.com/milvus-io/milvus/pkg/util/lifetime"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
var _ wal.Opener = (*openerAdaptorImpl)(nil)
// adaptImplsToOpener creates a new wal opener with opener impls.
func adaptImplsToOpener(opener walimpls.OpenerImpls, builders []interceptors.InterceptorBuilder) wal.Opener {
return &openerAdaptorImpl{
lifetime: lifetime.NewLifetime(lifetime.Working),
opener: opener,
idAllocator: util.NewIDAllocator(),
walInstances: typeutil.NewConcurrentMap[int64, wal.WAL](),
interceptorBuilders: builders,
}
}
// openerAdaptorImpl is the wrapper of OpenerImpls to Opener.
type openerAdaptorImpl struct {
lifetime lifetime.Lifetime[lifetime.State]
opener walimpls.OpenerImpls
idAllocator *util.IDAllocator
walInstances *typeutil.ConcurrentMap[int64, wal.WAL] // store all wal instances allocated by this allocator.
interceptorBuilders []interceptors.InterceptorBuilder
}
// Open opens a wal instance for the channel.
func (o *openerAdaptorImpl) Open(ctx context.Context, opt *wal.OpenOption) (wal.WAL, error) {
if o.lifetime.Add(lifetime.IsWorking) != nil {
return nil, status.NewOnShutdownError("wal opener is on shutdown")
}
defer o.lifetime.Done()
id := o.idAllocator.Allocate()
log := log.With(zap.Any("channel", opt.Channel), zap.Int64("id", id))
l, err := o.opener.Open(ctx, &walimpls.OpenOption{
Channel: opt.Channel,
})
if err != nil {
log.Warn("open wal failed", zap.Error(err))
return nil, err
}
// wrap the wal into the wal adaptor with cleanup function and interceptors.
wal := adaptImplsToWAL(l, o.interceptorBuilders, func() {
o.walInstances.Remove(id)
log.Info("wal deleted from allocator")
})
o.walInstances.Insert(id, wal)
log.Info("new wal created")
metrics.StreamingNodeWALTotal.WithLabelValues(paramtable.GetStringNodeID()).Inc()
return wal, nil
}
// Close the wal opener, release the underlying resources.
func (o *openerAdaptorImpl) Close() {
o.lifetime.SetState(lifetime.Stopped)
o.lifetime.Wait()
o.lifetime.Close()
// close all wal instances.
o.walInstances.Range(func(id int64, l wal.WAL) bool {
l.Close()
log.Info("close wal by opener", zap.Int64("id", id), zap.Any("channel", l.Channel()))
return true
})
// close the opener
o.opener.Close()
}

@@ -0,0 +1,117 @@
package adaptor
import (
"context"
"fmt"
"sync"
"testing"
"time"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
"github.com/milvus-io/milvus/pkg/mocks/streaming/mock_walimpls"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/streaming/walimpls"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
func TestMain(m *testing.M) {
paramtable.Init()
m.Run()
}
func TestOpenerAdaptorFailure(t *testing.T) {
basicOpener := mock_walimpls.NewMockOpenerImpls(t)
errExpected := errors.New("test")
basicOpener.EXPECT().Open(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, boo *walimpls.OpenOption) (walimpls.WALImpls, error) {
return nil, errExpected
})
opener := adaptImplsToOpener(basicOpener, nil)
l, err := opener.Open(context.Background(), &wal.OpenOption{})
assert.ErrorIs(t, err, errExpected)
assert.Nil(t, l)
}
func TestOpenerAdaptor(t *testing.T) {
// Build basic opener.
basicOpener := mock_walimpls.NewMockOpenerImpls(t)
basicOpener.EXPECT().Open(mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, boo *walimpls.OpenOption) (walimpls.WALImpls, error) {
wal := mock_walimpls.NewMockWALImpls(t)
wal.EXPECT().Channel().Return(boo.Channel)
wal.EXPECT().Append(mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, mm message.MutableMessage) (message.MessageID, error) {
return nil, nil
})
wal.EXPECT().Close().Run(func() {})
return wal, nil
})
basicOpener.EXPECT().Close().Run(func() {})
// Create an opener with the mock basic opener.
opener := adaptImplsToOpener(basicOpener, nil)
// Test in concurrency env.
wg := sync.WaitGroup{}
for i := 0; i < 10; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
wal, err := opener.Open(context.Background(), &wal.OpenOption{
Channel: types.PChannelInfo{
Name: fmt.Sprintf("test_%d", i),
Term: int64(i),
ServerID: 1,
},
})
if err != nil {
assert.Nil(t, wal)
assertShutdownError(t, err)
return
}
assert.NotNil(t, wal)
for {
msgID, err := wal.Append(context.Background(), nil)
time.Sleep(time.Millisecond * 10)
if err != nil {
assert.Nil(t, msgID)
assertShutdownError(t, err)
return
}
}
}(i)
}
time.Sleep(time.Second * 1)
opener.Close()
// All wal should be closed with Opener.
ch := make(chan struct{})
go func() {
wg.Wait()
close(ch)
}()
select {
case <-time.After(time.Second * 3):
t.Errorf("opener close should be fast")
case <-ch:
}
// open a wal after opener closed should return shutdown error.
_, err := opener.Open(context.Background(), &wal.OpenOption{
Channel: types.PChannelInfo{
Name: "test_after_close",
Term: int64(1),
ServerID: 1,
},
})
assertShutdownError(t, err)
}

@@ -0,0 +1,121 @@
package adaptor
import (
"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/utility"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/walimpls"
"github.com/milvus-io/milvus/pkg/streaming/walimpls/helper"
)
var _ wal.Scanner = (*scannerAdaptorImpl)(nil)
// newScannerAdaptor creates a new scanner adaptor.
func newScannerAdaptor(
name string,
l walimpls.WALImpls,
readOption wal.ReadOption,
cleanup func(),
) wal.Scanner {
s := &scannerAdaptorImpl{
innerWAL: l,
readOption: readOption,
sendingCh: make(chan message.ImmutableMessage, 1),
reorderBuffer: utility.NewReOrderBuffer(),
pendingQueue: utility.NewImmutableMessageQueue(),
cleanup: cleanup,
ScannerHelper: helper.NewScannerHelper(name),
}
go s.executeConsume()
return s
}
// scannerAdaptorImpl is a wrapper of ScannerImpls to extend it into a Scanner interface.
type scannerAdaptorImpl struct {
*helper.ScannerHelper
innerWAL walimpls.WALImpls
readOption wal.ReadOption
sendingCh chan message.ImmutableMessage
reorderBuffer *utility.ReOrderByTimeTickBuffer // only support time tick reorder now.
pendingQueue *utility.ImmutableMessageQueue // messages ready to be sent, already sorted by time tick.
cleanup func()
}
// Chan returns the channel of message.
func (s *scannerAdaptorImpl) Chan() <-chan message.ImmutableMessage {
return s.sendingCh
}
// Close closes the scanner and releases the underlying resources.
// It returns the same error as `Error`.
func (s *scannerAdaptorImpl) Close() error {
err := s.ScannerHelper.Close()
if s.cleanup != nil {
s.cleanup()
}
return err
}
func (s *scannerAdaptorImpl) executeConsume() {
defer close(s.sendingCh)
innerScanner, err := s.innerWAL.Read(s.Context(), walimpls.ReadOption{
Name: s.Name(),
DeliverPolicy: s.readOption.DeliverPolicy,
})
if err != nil {
s.Finish(err)
return
}
defer innerScanner.Close()
for {
// generate the event channel and do the event loop.
// TODO: Consume from local cache.
upstream, sending := s.getEventCh(innerScanner)
select {
case <-s.Context().Done():
s.Finish(err)
return
case msg, ok := <-upstream:
if !ok {
s.Finish(innerScanner.Error())
return
}
s.handleUpstream(msg)
case sending <- s.pendingQueue.Next():
s.pendingQueue.UnsafeAdvance()
}
}
}
func (s *scannerAdaptorImpl) getEventCh(scanner walimpls.ScannerImpls) (<-chan message.ImmutableMessage, chan<- message.ImmutableMessage) {
if s.pendingQueue.Len() == 0 {
// If the pending queue is empty,
// no more messages can be sent,
// so we always need to receive messages from upstream to avoid starvation.
return scanner.Chan(), nil
}
// TODO: configurable pending count.
if s.pendingQueue.Len()+s.reorderBuffer.Len() > 1024 {
return nil, s.sendingCh
}
return scanner.Chan(), s.sendingCh
}
func (s *scannerAdaptorImpl) handleUpstream(msg message.ImmutableMessage) {
if msg.MessageType() == message.MessageTypeTimeTick {
// If a time tick message comes in,
// the reorder buffer can be drained into the pending queue up to the latest time tick.
// TODO: !!! should we drop messages that unexpectedly break the time tick ordering rule?
s.pendingQueue.Add(s.reorderBuffer.PopUtilTimeTick(msg.TimeTick()))
return
}
// Filtering the message if needed.
if s.readOption.MessageFilter != nil && !s.readOption.MessageFilter(msg) {
return
}
// otherwise add message into reorder buffer directly.
s.reorderBuffer.Push(msg)
}
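
The scanner above is where the time-tick sorting promised in the commit message happens: out-of-order messages sit in the reorder buffer until a time tick message flushes everything at or below that tick into the pending queue. A small sketch of that mechanism in isolation (not part of the commit; it reuses the utility types added in this change and the message mocks used by the utility tests further below):

package adaptor

import (
	"testing"

	"github.com/stretchr/testify/assert"

	"github.com/milvus-io/milvus/internal/streamingnode/server/wal/utility"
	"github.com/milvus-io/milvus/pkg/mocks/streaming/util/mock_message"
)

// TestReorderSketch shows how the reorder buffer and pending queue cooperate.
func TestReorderSketch(t *testing.T) {
	reorder := utility.NewReOrderBuffer()
	pending := utility.NewImmutableMessageQueue()

	// Messages arrive out of order with time ticks 3, 1, 2.
	for _, tt := range []uint64{3, 1, 2} {
		msg := mock_message.NewMockImmutableMessage(t)
		msg.EXPECT().TimeTick().Return(tt)
		reorder.Push(msg)
	}

	// A time tick message with tick 3 drains everything <= 3 into the
	// pending queue, already sorted by time tick.
	pending.Add(reorder.PopUtilTimeTick(3))
	assert.Equal(t, 0, reorder.Len())

	last := uint64(0)
	for pending.Len() > 0 {
		m := pending.Next()
		assert.Greater(t, m.TimeTick(), last)
		last = m.TimeTick()
		pending.UnsafeAdvance()
	}
}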

@@ -0,0 +1,29 @@
package adaptor
import (
"testing"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
"github.com/milvus-io/milvus/pkg/mocks/streaming/mock_walimpls"
"github.com/milvus-io/milvus/pkg/streaming/util/options"
)
func TestScannerAdaptorReadError(t *testing.T) {
err := errors.New("read error")
l := mock_walimpls.NewMockWALImpls(t)
l.EXPECT().Read(mock.Anything, mock.Anything).Return(nil, err)
s := newScannerAdaptor("scanner", l, wal.ReadOption{
DeliverPolicy: options.DeliverPolicyAll(),
MessageFilter: nil,
}, func() {})
defer s.Close()
<-s.Chan()
<-s.Done()
assert.ErrorIs(t, s.Error(), err)
}

@@ -0,0 +1,31 @@
package adaptor
import (
"fmt"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
"github.com/milvus-io/milvus/internal/util/streamingutil/util"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
)
type scannerRegistry struct {
channel types.PChannelInfo
idAllocator *util.IDAllocator
}
// AllocateScannerName allocates a scanner name for a scanner.
// The scanner name should be persisted in meta for garbage cleanup.
func (m *scannerRegistry) AllocateScannerName() (string, error) {
name := m.newSubscriptionName()
// TODO: persist the subscription name in meta.
return name, nil
}
func (m *scannerRegistry) RegisterNewScanner(string, wal.Scanner) {
}
// newSubscriptionName generates a new subscription name.
func (m *scannerRegistry) newSubscriptionName() string {
id := m.idAllocator.Allocate()
return fmt.Sprintf("%s/%d/%d", m.channel.Name, m.channel.Term, id)
}

@@ -0,0 +1,152 @@
package adaptor
import (
"context"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors"
"github.com/milvus-io/milvus/internal/util/streamingutil/status"
"github.com/milvus-io/milvus/internal/util/streamingutil/util"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/streaming/walimpls"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/lifetime"
"github.com/milvus-io/milvus/pkg/util/syncutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
var _ wal.WAL = (*walAdaptorImpl)(nil)
// adaptImplsToWAL creates a new wal from wal impls.
func adaptImplsToWAL(
basicWAL walimpls.WALImpls,
builders []interceptors.InterceptorBuilder,
cleanup func(),
) wal.WAL {
param := interceptors.InterceptorBuildParam{
WALImpls: basicWAL,
WAL: syncutil.NewFuture[wal.WAL](),
}
interceptor := buildInterceptor(builders, param)
wal := &walAdaptorImpl{
lifetime: lifetime.NewLifetime(lifetime.Working),
idAllocator: util.NewIDAllocator(),
inner: basicWAL,
// TODO: make the pool size configurable.
appendExecutionPool: conc.NewPool[struct{}](10),
interceptor: interceptor,
scannerRegistry: scannerRegistry{
channel: basicWAL.Channel(),
idAllocator: util.NewIDAllocator(),
},
scanners: typeutil.NewConcurrentMap[int64, wal.Scanner](),
cleanup: cleanup,
}
param.WAL.Set(wal)
return wal
}
// walAdaptorImpl is a wrapper of WALImpls to extend it into a WAL interface.
type walAdaptorImpl struct {
lifetime lifetime.Lifetime[lifetime.State]
idAllocator *util.IDAllocator
inner walimpls.WALImpls
appendExecutionPool *conc.Pool[struct{}]
interceptor interceptors.InterceptorWithReady
scannerRegistry scannerRegistry
scanners *typeutil.ConcurrentMap[int64, wal.Scanner]
cleanup func()
}
// Channel returns the channel info of wal.
func (w *walAdaptorImpl) Channel() types.PChannelInfo {
return w.inner.Channel()
}
// Append writes a record to the log.
func (w *walAdaptorImpl) Append(ctx context.Context, msg message.MutableMessage) (message.MessageID, error) {
if w.lifetime.Add(lifetime.IsWorking) != nil {
return nil, status.NewOnShutdownError("wal is on shutdown")
}
defer w.lifetime.Done()
// Check if interceptor is ready.
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-w.interceptor.Ready():
}
// Execute the interceptor and wal append.
return w.interceptor.DoAppend(ctx, msg, w.inner.Append)
}
// AppendAsync writes a record to the log asynchronously.
func (w *walAdaptorImpl) AppendAsync(ctx context.Context, msg message.MutableMessage, cb func(message.MessageID, error)) {
if w.lifetime.Add(lifetime.IsWorking) != nil {
cb(nil, status.NewOnShutdownError("wal is on shutdown"))
return
}
// Submit async append to a background execution pool.
_ = w.appendExecutionPool.Submit(func() (struct{}, error) {
defer w.lifetime.Done()
msgID, err := w.inner.Append(ctx, msg)
cb(msgID, err)
return struct{}{}, nil
})
}
// Read returns a scanner for reading records from the wal.
func (w *walAdaptorImpl) Read(ctx context.Context, opts wal.ReadOption) (wal.Scanner, error) {
if w.lifetime.Add(lifetime.IsWorking) != nil {
return nil, status.NewOnShutdownError("wal is on shutdown")
}
defer w.lifetime.Done()
name, err := w.scannerRegistry.AllocateScannerName()
if err != nil {
return nil, err
}
// wrap the scanner with cleanup function.
id := w.idAllocator.Allocate()
s := newScannerAdaptor(name, w.inner, opts, func() {
w.scanners.Remove(id)
})
w.scanners.Insert(id, s)
return s, nil
}
// Close closes the wal and releases all scanners and underlying resources.
func (w *walAdaptorImpl) Close() {
w.lifetime.SetState(lifetime.Stopped)
w.lifetime.Wait()
w.lifetime.Close()
// close all scanner instances.
w.scanners.Range(func(id int64, s wal.Scanner) bool {
s.Close()
log.Info("close scanner by wal extend", zap.Int64("id", id), zap.Any("channel", w.Channel()))
return true
})
w.inner.Close()
w.interceptor.Close()
w.appendExecutionPool.Free()
w.cleanup()
}
// buildInterceptor builds a chained interceptor from the given interceptor builders.
func buildInterceptor(builders []interceptors.InterceptorBuilder, param interceptors.InterceptorBuildParam) interceptors.InterceptorWithReady {
// Build all interceptors.
builtIterceptors := make([]interceptors.BasicInterceptor, 0, len(builders))
for _, b := range builders {
builtIterceptors = append(builtIterceptors, b.Build(param))
}
return interceptors.NewChainedInterceptor(builtIterceptors...)
}

@@ -0,0 +1,163 @@
package adaptor
import (
"context"
"sync"
"testing"
"time"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/milvus-io/milvus/internal/mocks/streamingnode/server/wal/mock_interceptors"
"github.com/milvus-io/milvus/internal/proto/streamingpb"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors"
"github.com/milvus-io/milvus/internal/util/streamingutil/status"
"github.com/milvus-io/milvus/pkg/mocks/streaming/mock_walimpls"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/streaming/walimpls"
)
func TestWalAdaptorReadFail(t *testing.T) {
l := mock_walimpls.NewMockWALImpls(t)
expectedErr := errors.New("test")
l.EXPECT().Channel().Return(types.PChannelInfo{})
l.EXPECT().Read(mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, ro walimpls.ReadOption) (walimpls.ScannerImpls, error) {
return nil, expectedErr
})
lAdapted := adaptImplsToWAL(l, nil, func() {})
scanner, err := lAdapted.Read(context.Background(), wal.ReadOption{})
assert.NoError(t, err)
assert.NotNil(t, scanner)
assert.ErrorIs(t, scanner.Error(), expectedErr)
}
func TestWALAdaptor(t *testing.T) {
// Create a mock WAL implementation
l := mock_walimpls.NewMockWALImpls(t)
l.EXPECT().Channel().Return(types.PChannelInfo{})
l.EXPECT().Append(mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, mm message.MutableMessage) (message.MessageID, error) {
return nil, nil
})
l.EXPECT().Read(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, ro walimpls.ReadOption) (walimpls.ScannerImpls, error) {
scanner := mock_walimpls.NewMockScannerImpls(t)
ch := make(chan message.ImmutableMessage, 1)
scanner.EXPECT().Chan().Return(ch)
scanner.EXPECT().Close().RunAndReturn(func() error {
close(ch)
return nil
})
return scanner, nil
})
l.EXPECT().Close().Return()
lAdapted := adaptImplsToWAL(l, nil, func() {})
assert.NotNil(t, lAdapted.Channel())
_, err := lAdapted.Append(context.Background(), nil)
assert.NoError(t, err)
lAdapted.AppendAsync(context.Background(), nil, func(mi message.MessageID, err error) {
assert.Nil(t, err)
})
// Test in concurrency env.
wg := sync.WaitGroup{}
for i := 0; i < 10; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
scanner, err := lAdapted.Read(context.Background(), wal.ReadOption{})
if err != nil {
assertShutdownError(t, err)
return
}
assert.NoError(t, err)
<-scanner.Chan()
}(i)
}
time.Sleep(time.Second * 1)
lAdapted.Close()
// All scanners should be closed with the wal.
ch := make(chan struct{})
go func() {
wg.Wait()
close(ch)
}()
select {
case <-time.After(time.Second * 3):
t.Errorf("wal close should be fast")
case <-ch:
}
_, err = lAdapted.Append(context.Background(), nil)
assertShutdownError(t, err)
lAdapted.AppendAsync(context.Background(), nil, func(mi message.MessageID, err error) {
assertShutdownError(t, err)
})
_, err = lAdapted.Read(context.Background(), wal.ReadOption{})
assertShutdownError(t, err)
}
func assertShutdownError(t *testing.T, err error) {
e := status.AsStreamingError(err)
assert.Equal(t, e.Code, streamingpb.StreamingCode_STREAMING_CODE_ON_SHUTDOWN)
}
func TestNoInterceptor(t *testing.T) {
l := mock_walimpls.NewMockWALImpls(t)
l.EXPECT().Channel().Return(types.PChannelInfo{})
l.EXPECT().Append(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, mm message.MutableMessage) (message.MessageID, error) {
return nil, nil
})
l.EXPECT().Close().Run(func() {})
lWithInterceptors := adaptImplsToWAL(l, nil, func() {})
_, err := lWithInterceptors.Append(context.Background(), nil)
assert.NoError(t, err)
lWithInterceptors.Close()
}
func TestWALWithInterceptor(t *testing.T) {
l := mock_walimpls.NewMockWALImpls(t)
l.EXPECT().Channel().Return(types.PChannelInfo{})
l.EXPECT().Append(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, mm message.MutableMessage) (message.MessageID, error) {
return nil, nil
})
l.EXPECT().Close().Run(func() {})
b := mock_interceptors.NewMockInterceptorBuilder(t)
readyCh := make(chan struct{})
b.EXPECT().Build(mock.Anything).RunAndReturn(func(ibp interceptors.InterceptorBuildParam) interceptors.BasicInterceptor {
interceptor := mock_interceptors.NewMockInterceptorWithReady(t)
interceptor.EXPECT().Ready().Return(readyCh)
interceptor.EXPECT().DoAppend(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, mm message.MutableMessage, f func(context.Context, message.MutableMessage) (message.MessageID, error)) (message.MessageID, error) {
return f(ctx, mm)
})
interceptor.EXPECT().Close().Run(func() {})
return interceptor
})
lWithInterceptors := adaptImplsToWAL(l, []interceptors.InterceptorBuilder{b}, func() {})
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
// Interceptor is not ready, so the append/read will be blocked until timeout.
_, err := lWithInterceptors.Append(ctx, nil)
assert.ErrorIs(t, err, context.DeadlineExceeded)
// Interceptor is ready, so the append/read will return soon.
close(readyCh)
_, err = lWithInterceptors.Append(context.Background(), nil)
assert.NoError(t, err)
lWithInterceptors.Close()
}

@@ -0,0 +1,339 @@
package adaptor_test
import (
"context"
"fmt"
"math/rand"
"sort"
"strconv"
"sync"
"testing"
"time"
"github.com/golang/protobuf/proto"
"github.com/remeh/sizedwaitgroup"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/streamingnode/server/resource"
"github.com/milvus-io/milvus/internal/streamingnode/server/resource/timestamp"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/registry"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/options"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/walimplstest"
)
type walTestFramework struct {
b wal.OpenerBuilder
t *testing.T
messageCount int
}
func TestWAL(t *testing.T) {
rc := timestamp.NewMockRootCoordClient(t)
resource.InitForTest(resource.OptRootCoordClient(rc))
b := registry.MustGetBuilder(walimplstest.WALName)
f := &walTestFramework{
b: b,
t: t,
messageCount: 1000,
}
f.Run()
}
func (f *walTestFramework) Run() {
wg := sync.WaitGroup{}
loopCnt := 3
wg.Add(loopCnt)
o, err := f.b.Build()
assert.NoError(f.t, err)
assert.NotNil(f.t, o)
defer o.Close()
for i := 0; i < loopCnt; i++ {
go func(i int) {
defer wg.Done()
f.runOnce(fmt.Sprintf("pchannel-%d", i), o)
}(i)
}
wg.Wait()
}
func (f *walTestFramework) runOnce(pchannel string, o wal.Opener) {
f2 := &testOneWALFramework{
t: f.t,
opener: o,
pchannel: pchannel,
messageCount: f.messageCount,
term: 1,
}
f2.Run()
}
type testOneWALFramework struct {
t *testing.T
opener wal.Opener
written []message.ImmutableMessage
pchannel string
messageCount int
term int
}
func (f *testOneWALFramework) Run() {
ctx := context.Background()
for ; f.term <= 3; f.term++ {
pChannel := types.PChannelInfo{
Name: f.pchannel,
Term: int64(f.term),
ServerID: 1,
}
w, err := f.opener.Open(ctx, &wal.OpenOption{
Channel: pChannel,
})
assert.NoError(f.t, err)
assert.NotNil(f.t, w)
assert.Equal(f.t, pChannel.Name, w.Channel().Name)
assert.Equal(f.t, pChannel.ServerID, w.Channel().ServerID)
f.testReadAndWrite(ctx, w)
// close the wal
w.Close()
}
}
func (f *testOneWALFramework) testReadAndWrite(ctx context.Context, w wal.WAL) {
// Test read and write.
wg := sync.WaitGroup{}
wg.Add(3)
var newWritten []message.ImmutableMessage
var read1, read2 []message.ImmutableMessage
go func() {
defer wg.Done()
var err error
newWritten, err = f.testAppend(ctx, w)
assert.NoError(f.t, err)
}()
go func() {
defer wg.Done()
var err error
read1, err = f.testRead(ctx, w)
assert.NoError(f.t, err)
}()
go func() {
defer wg.Done()
var err error
read2, err = f.testRead(ctx, w)
assert.NoError(f.t, err)
}()
wg.Wait()
// read result should be sorted by timetick.
f.assertSortByTimeTickMessageList(read1)
f.assertSortByTimeTickMessageList(read2)
// all written messages should be read.
sort.Sort(sortByMessageID(newWritten))
f.written = append(f.written, newWritten...)
sort.Sort(sortByMessageID(read1))
sort.Sort(sortByMessageID(read2))
f.assertEqualMessageList(f.written, read1)
f.assertEqualMessageList(f.written, read2)
// test read with option
f.testReadWithOption(ctx, w)
}
func (f *testOneWALFramework) testAppend(ctx context.Context, w wal.WAL) ([]message.ImmutableMessage, error) {
messages := make([]message.ImmutableMessage, f.messageCount)
swg := sizedwaitgroup.New(10)
for i := 0; i < f.messageCount-1; i++ {
swg.Add()
go func(i int) {
defer swg.Done()
time.Sleep(time.Duration(5+rand.Int31n(10)) * time.Millisecond)
// ...rocksmq has a dirty implementation of properties;
// without commonpb.MsgHeader, it cannot work.
header := commonpb.MsgHeader{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Insert,
MsgID: int64(i),
},
}
payload, err := proto.Marshal(&header)
if err != nil {
panic(err)
}
properties := map[string]string{
"id": fmt.Sprintf("%d", i),
"const": "t",
}
typ := message.MessageTypeUnknown
msg := message.NewMutableMessageBuilder().
WithMessageType(typ).
WithPayload(payload).
WithProperties(properties).
BuildMutable()
id, err := w.Append(ctx, msg)
assert.NoError(f.t, err)
assert.NotNil(f.t, id)
messages[i] = msg.IntoImmutableMessage(id)
}(i)
}
swg.Wait()
// send a final hint message
header := commonpb.MsgHeader{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Insert,
MsgID: int64(f.messageCount - 1),
},
}
payload, err := proto.Marshal(&header)
if err != nil {
panic(err)
}
properties := map[string]string{
"id": fmt.Sprintf("%d", f.messageCount-1),
"const": "t",
"term": strconv.FormatInt(int64(f.term), 10),
}
msg := message.NewMutableMessageBuilder().
WithPayload(payload).
WithProperties(properties).
WithMessageType(message.MessageTypeUnknown).
BuildMutable()
id, err := w.Append(ctx, msg)
assert.NoError(f.t, err)
messages[f.messageCount-1] = msg.IntoImmutableMessage(id)
return messages, nil
}
func (f *testOneWALFramework) testRead(ctx context.Context, w wal.WAL) ([]message.ImmutableMessage, error) {
s, err := w.Read(ctx, wal.ReadOption{
DeliverPolicy: options.DeliverPolicyAll(),
})
assert.NoError(f.t, err)
defer s.Close()
expectedCnt := f.messageCount + len(f.written)
msgs := make([]message.ImmutableMessage, 0, expectedCnt)
for {
msg, ok := <-s.Chan()
assert.NotNil(f.t, msg)
assert.True(f.t, ok)
msgs = append(msgs, msg)
termString, ok := msg.Properties().Get("term")
if !ok {
continue
}
term, err := strconv.ParseInt(termString, 10, 64)
if err != nil {
panic(err)
}
if int(term) == f.term {
break
}
}
return msgs, nil
}
func (f *testOneWALFramework) testReadWithOption(ctx context.Context, w wal.WAL) {
loopCount := 5
wg := sync.WaitGroup{}
wg.Add(loopCount)
for i := 0; i < loopCount; i++ {
go func() {
defer wg.Done()
idx := rand.Int31n(int32(len(f.written)))
// Test other read options.
// Start reading from some message and keep only time ticks greater than or equal to it.
readFromMsg := f.written[idx]
s, err := w.Read(ctx, wal.ReadOption{
DeliverPolicy: options.DeliverPolicyStartFrom(readFromMsg.LastConfirmedMessageID()),
MessageFilter: func(im message.ImmutableMessage) bool {
return im.TimeTick() >= readFromMsg.TimeTick()
},
})
assert.NoError(f.t, err)
maxTimeTick := f.maxTimeTickWritten()
msgCount := 0
lastTimeTick := readFromMsg.TimeTick() - 1
for {
msg, ok := <-s.Chan()
msgCount++
assert.NotNil(f.t, msg)
assert.True(f.t, ok)
assert.Greater(f.t, msg.TimeTick(), lastTimeTick)
lastTimeTick = msg.TimeTick()
if msg.TimeTick() >= maxTimeTick {
break
}
}
// shouldn't lose any message.
assert.Equal(f.t, f.countTheTimeTick(readFromMsg.TimeTick()), msgCount)
s.Close()
}()
}
wg.Wait()
}
func (f *testOneWALFramework) assertSortByTimeTickMessageList(msgs []message.ImmutableMessage) {
for i := 1; i < len(msgs); i++ {
assert.Less(f.t, msgs[i-1].TimeTick(), msgs[i].TimeTick())
}
}
func (f *testOneWALFramework) assertEqualMessageList(msgs1 []message.ImmutableMessage, msgs2 []message.ImmutableMessage) {
assert.Equal(f.t, len(msgs2), len(msgs1))
for i := 0; i < len(msgs1); i++ {
assert.True(f.t, msgs1[i].MessageID().EQ(msgs2[i].MessageID()))
// assert.True(f.t, bytes.Equal(msgs1[i].Payload(), msgs2[i].Payload()))
id1, ok1 := msgs1[i].Properties().Get("id")
id2, ok2 := msgs2[i].Properties().Get("id")
assert.True(f.t, ok1)
assert.True(f.t, ok2)
assert.Equal(f.t, id1, id2)
id1, ok1 = msgs1[i].Properties().Get("const")
id2, ok2 = msgs2[i].Properties().Get("const")
assert.True(f.t, ok1)
assert.True(f.t, ok2)
assert.Equal(f.t, id1, id2)
}
}
func (f *testOneWALFramework) countTheTimeTick(begin uint64) int {
cnt := 0
for _, m := range f.written {
if m.TimeTick() >= begin {
cnt++
}
}
return cnt
}
func (f *testOneWALFramework) maxTimeTickWritten() uint64 {
maxTimeTick := uint64(0)
for _, m := range f.written {
if m.TimeTick() > maxTimeTick {
maxTimeTick = m.TimeTick()
}
}
return maxTimeTick
}
type sortByMessageID []message.ImmutableMessage
func (a sortByMessageID) Len() int {
return len(a)
}
func (a sortByMessageID) Swap(i, j int) {
a[i], a[j] = a[j], a[i]
}
func (a sortByMessageID) Less(i, j int) bool {
return a[i].MessageID().LT(a[j].MessageID())
}

@@ -11,6 +11,7 @@ import (
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick/ack"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/walimpls"
)
var _ interceptors.AppendInterceptor = (*timeTickAppendInterceptor)(nil)
@@ -65,32 +66,11 @@ func (impl *timeTickAppendInterceptor) executeSyncTimeTick(interval time.Duratio
logger.Info("start to sync time tick...")
defer logger.Info("sync time tick stopped")
// Send first timetick message to wal before interceptor is ready.
for count := 0; ; count++ {
// Sent first timetick message to wal before ready.
// New TT is always greater than all tt on previous streamingnode.
// A fencing operation of underlying WAL is needed to make exclusive produce of topic.
// Otherwise, the TT principle may be violated.
// And sendTsMsg must be done, to help ackManager to get first LastConfirmedMessageID
// !!! Send a timetick message into walimpls directly is safe.
select {
case <-impl.ctx.Done():
return
default:
}
if err := impl.sendTsMsg(impl.ctx, underlyingWALImpls.Append); err != nil {
log.Warn("send first timestamp message failed", zap.Error(err), zap.Int("retryCount", count))
// TODO: exponential backoff.
time.Sleep(50 * time.Millisecond)
continue
}
break
if err := impl.blockUntilSyncTimeTickReady(underlyingWALImpls); err != nil {
logger.Warn("sync first time tick failed", zap.Error(err))
return
}
// interceptor is ready now.
close(impl.ready)
logger.Info("start to sync time ready")
// interceptor is ready, wait for the final wal object is ready to use.
wal := param.WAL.Get()
@@ -111,6 +91,38 @@ func (impl *timeTickAppendInterceptor) executeSyncTimeTick(interval time.Duratio
}
}
// blockUntilSyncTimeTickReady blocks until the first time tick message is sent.
func (impl *timeTickAppendInterceptor) blockUntilSyncTimeTickReady(underlyingWALImpls walimpls.WALImpls) error {
logger := log.With(zap.Any("channel", underlyingWALImpls.Channel()))
logger.Info("start to sync first time tick")
defer logger.Info("sync first time tick done")
// Send first timetick message to wal before interceptor is ready.
for count := 0; ; count++ {
// Send first timetick message to wal before ready.
// New TT is always greater than all tt on previous streamingnode.
// A fencing operation of underlying WAL is needed to make exclusive produce of topic.
// Otherwise, the TT principle may be violated.
// And sendTsMsg must be done, to help ackManager to get first LastConfirmedMessageID
// !!! Send a timetick message into walimpls directly is safe.
select {
case <-impl.ctx.Done():
return impl.ctx.Err()
default:
}
if err := impl.sendTsMsg(impl.ctx, underlyingWALImpls.Append); err != nil {
logger.Warn("send first timestamp message failed", zap.Error(err), zap.Int("retryCount", count))
// TODO: exponential backoff.
time.Sleep(50 * time.Millisecond)
continue
}
break
}
// interceptor is ready now.
close(impl.ready)
return nil
}
// syncAcknowledgedDetails syncs the timestamp acknowledged details.
func (impl *timeTickAppendInterceptor) syncAcknowledgedDetails() {
// Sync up and get last confirmed timestamp.

@@ -5,10 +5,12 @@ import (
"github.com/milvus-io/milvus/pkg/streaming/util/options"
)
type MessageFilter = func(message.ImmutableMessage) bool
// ReadOption is the option for reading records from the wal.
type ReadOption struct {
DeliverPolicy options.DeliverPolicy
DeliverOrder options.DeliverOrder
MessageFilter MessageFilter
}
// Scanner is the interface for reading records from the wal.
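
MessageFilter is a plain predicate over immutable messages; combined with a DeliverPolicy it gives the "start from a known position and drop anything older" behaviour exercised in testReadWithOption of the wal test earlier in this diff. A sketch (the helper name is hypothetical; the option fields and calls are the ones added in this change):

package wal

import (
	"github.com/milvus-io/milvus/pkg/streaming/util/message"
	"github.com/milvus-io/milvus/pkg/streaming/util/options"
)

// NewResumeReadOption is a hypothetical helper (sketch only) that resumes a
// scan from a known message and filters out anything written before it.
func NewResumeReadOption(resumeFrom message.ImmutableMessage) ReadOption {
	return ReadOption{
		DeliverPolicy: options.DeliverPolicyStartFrom(resumeFrom.LastConfirmedMessageID()),
		MessageFilter: func(im message.ImmutableMessage) bool {
			return im.TimeTick() >= resumeFrom.TimeTick()
		},
	}
}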

@@ -0,0 +1,51 @@
package utility
import "github.com/milvus-io/milvus/pkg/streaming/util/message"
// NewImmutableMessageQueue creates a new immutable message queue.
func NewImmutableMessageQueue() *ImmutableMessageQueue {
return &ImmutableMessageQueue{
pendings: make([][]message.ImmutableMessage, 0),
cnt: 0,
}
}
// ImmutableMessageQueue is a queue of messages.
type ImmutableMessageQueue struct {
pendings [][]message.ImmutableMessage
cnt int
}
// Len returns the queue size.
func (pq *ImmutableMessageQueue) Len() int {
return pq.cnt
}
// Add adds a slice of messages to the pending queue.
func (pq *ImmutableMessageQueue) Add(msgs []message.ImmutableMessage) {
if len(msgs) == 0 {
return
}
pq.pendings = append(pq.pendings, msgs)
pq.cnt += len(msgs)
}
// Next returns the next message in the pending queue.
func (pq *ImmutableMessageQueue) Next() message.ImmutableMessage {
if len(pq.pendings) != 0 && len(pq.pendings[0]) != 0 {
return pq.pendings[0][0]
}
return nil
}
// UnsafeAdvance advances the queue without any check.
// !!! Should only be called when `Next` does not return nil.
func (pq *ImmutableMessageQueue) UnsafeAdvance() {
if len(pq.pendings[0]) == 1 {
pq.pendings = pq.pendings[1:]
pq.cnt--
return
}
pq.pendings[0] = pq.pendings[0][1:]
pq.cnt--
}

@@ -0,0 +1,25 @@
package utility
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/pkg/mocks/streaming/util/mock_message"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
)
func TestImmutableMessageQueue(t *testing.T) {
q := NewImmutableMessageQueue()
for i := 0; i < 100; i++ {
q.Add([]message.ImmutableMessage{
mock_message.NewMockImmutableMessage(t),
})
assert.Equal(t, i+1, q.Len())
}
for i := 100; i > 0; i-- {
assert.NotNil(t, q.Next())
q.UnsafeAdvance()
assert.Equal(t, i-1, q.Len())
}
}

@@ -0,0 +1,45 @@
package utility
import (
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
var _ typeutil.HeapInterface = (*immutableMessageHeap)(nil)
// immutableMessageHeap is the underlying heap representation of immutable messages, ordered by time tick.
type immutableMessageHeap []message.ImmutableMessage
// Len returns the length of the heap.
func (h immutableMessageHeap) Len() int {
return len(h)
}
// Less returns true if the element at index i is less than the element at index j.
func (h immutableMessageHeap) Less(i, j int) bool {
return h[i].TimeTick() < h[j].TimeTick()
}
// Swap swaps the elements at indexes i and j.
func (h immutableMessageHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
// Push appends an element at the end of the slice.
func (h *immutableMessageHeap) Push(x interface{}) {
// Push and Pop use pointer receivers because they modify the slice's length,
// not just its contents.
*h = append(*h, x.(message.ImmutableMessage))
}
// Pop removes and returns the last element of the slice.
func (h *immutableMessageHeap) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}
// Peek returns the element at the top of the heap.
func (h *immutableMessageHeap) Peek() interface{} {
return (*h)[0]
}

@@ -0,0 +1,29 @@
package utility
import (
"math/rand"
"testing"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/pkg/mocks/streaming/util/mock_message"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
func TestImmutableMessageHeap(t *testing.T) {
h := typeutil.NewHeap[message.ImmutableMessage](&immutableMessageHeap{})
timeticks := rand.Perm(25)
for _, timetick := range timeticks {
msg := mock_message.NewMockImmutableMessage(t)
msg.EXPECT().TimeTick().Return(uint64(timetick + 1))
h.Push(msg)
}
lastOneTimeTick := uint64(0)
for h.Len() != 0 {
msg := h.Pop()
assert.Greater(t, msg.TimeTick(), lastOneTimeTick)
lastOneTimeTick = msg.TimeTick()
}
}

@@ -0,0 +1,38 @@
package utility
import (
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
// ReOrderByTimeTickBuffer is a buffer that stores messages and pops them in order of time tick.
type ReOrderByTimeTickBuffer struct {
messageHeap typeutil.Heap[message.ImmutableMessage]
}
// NewReOrderBuffer creates a new ReOrderBuffer.
func NewReOrderBuffer() *ReOrderByTimeTickBuffer {
return &ReOrderByTimeTickBuffer{
messageHeap: typeutil.NewHeap[message.ImmutableMessage](&immutableMessageHeap{}),
}
}
// Push pushes a message into the buffer.
func (r *ReOrderByTimeTickBuffer) Push(msg message.ImmutableMessage) {
r.messageHeap.Push(msg)
}
// PopUtilTimeTick pops all messages whose time tick is less than or equal to the given time tick.
// The result is sorted by time tick in ascending order.
func (r *ReOrderByTimeTickBuffer) PopUtilTimeTick(timetick uint64) []message.ImmutableMessage {
var res []message.ImmutableMessage
for r.messageHeap.Len() > 0 && r.messageHeap.Peek().TimeTick() <= timetick {
res = append(res, r.messageHeap.Pop())
}
return res
}
// Len returns the number of messages in the buffer.
func (r *ReOrderByTimeTickBuffer) Len() int {
return r.messageHeap.Len()
}

@@ -0,0 +1,43 @@
package utility
import (
"math/rand"
"testing"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/pkg/mocks/streaming/util/mock_message"
)
func TestReOrderByTimeTickBuffer(t *testing.T) {
buf := NewReOrderBuffer()
timeticks := rand.Perm(25)
for i, timetick := range timeticks {
msg := mock_message.NewMockImmutableMessage(t)
msg.EXPECT().TimeTick().Return(uint64(timetick + 1))
buf.Push(msg)
assert.Equal(t, i+1, buf.Len())
}
result := buf.PopUtilTimeTick(0)
assert.Len(t, result, 0)
result = buf.PopUtilTimeTick(1)
assert.Len(t, result, 1)
for _, msg := range result {
assert.LessOrEqual(t, msg.TimeTick(), uint64(1))
}
result = buf.PopUtilTimeTick(10)
assert.Len(t, result, 9)
for _, msg := range result {
assert.LessOrEqual(t, msg.TimeTick(), uint64(10))
assert.Greater(t, msg.TimeTick(), uint64(1))
}
result = buf.PopUtilTimeTick(25)
assert.Len(t, result, 15)
for _, msg := range result {
assert.Greater(t, msg.TimeTick(), uint64(10))
assert.LessOrEqual(t, msg.TimeTick(), uint64(25))
}
}

@@ -11,3 +11,11 @@ packages:
Opener:
Scanner:
WAL:
github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors:
interfaces:
Interceptor:
InterceptorWithReady:
InterceptorBuilder:
google.golang.org/grpc:
interfaces:
ClientStream:

@@ -0,0 +1,47 @@
package status
import (
"context"
"github.com/cockroachdb/errors"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// IsCanceled checks whether the error is a cancellation error.
// Used on the client side.
func IsCanceled(err error) bool {
if err == nil {
return false
}
if errors.Is(err, context.DeadlineExceeded) {
return true
}
if errors.Is(err, context.Canceled) {
return true
}
if se, ok := err.(interface {
GRPCStatus() *status.Status
}); ok {
switch se.GRPCStatus().Code() {
case codes.Canceled, codes.DeadlineExceeded:
return true
// It may be a special unavailable error, but we don't enable here.
// From etcd implementation:
// case codes.Unavailable:
// msg := se.GRPCStatus().Message()
// // client-side context cancel or deadline exceeded with TLS ("http2.errClientDisconnected")
// // "rpc error: code = Unavailable desc = client disconnected"
// if msg == "client disconnected" {
// return true
// }
// // "grpc/transport.ClientTransport.CloseStream" on canceled streams
// // "rpc error: code = Unavailable desc = stream error: stream ID 21; CANCEL")
// if strings.HasPrefix(msg, "stream error: ") && strings.HasSuffix(msg, "; CANCEL") {
// return true
// }
}
}
return false
}

@@ -0,0 +1,19 @@
package status
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
func TestIsCanceled(t *testing.T) {
assert.False(t, IsCanceled(nil))
assert.True(t, IsCanceled(context.DeadlineExceeded))
assert.True(t, IsCanceled(context.Canceled))
assert.True(t, IsCanceled(status.Error(codes.Canceled, "test")))
assert.True(t, IsCanceled(ConvertStreamingError("test", status.Error(codes.Canceled, "test"))))
assert.False(t, IsCanceled(ConvertStreamingError("test", status.Error(codes.Unknown, "test"))))
}

@@ -0,0 +1,34 @@
package status
import (
"google.golang.org/grpc"
)
// NewClientStreamWrapper returns a grpc.ClientStream that wraps the given stream.
func NewClientStreamWrapper(method string, stream grpc.ClientStream) grpc.ClientStream {
if stream == nil {
return nil
}
return &clientStreamWrapper{
method: method,
ClientStream: stream,
}
}
// clientStreamWrapper wraps a grpc.ClientStream and converts errors to Status.
type clientStreamWrapper struct {
method string
grpc.ClientStream
}
// Convert the error to a Status and return it.
func (s *clientStreamWrapper) SendMsg(m interface{}) error {
err := s.ClientStream.SendMsg(m)
return ConvertStreamingError(s.method, err)
}
// Convert the error to a Status and return it.
func (s *clientStreamWrapper) RecvMsg(m interface{}) error {
err := s.ClientStream.RecvMsg(m)
return ConvertStreamingError(s.method, err)
}

@@ -0,0 +1,33 @@
package status
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/milvus-io/milvus/internal/mocks/google.golang.org/mock_grpc"
"github.com/milvus-io/milvus/internal/proto/streamingpb"
)
func TestClientStreamWrapper(t *testing.T) {
s := mock_grpc.NewMockClientStream(t)
s.EXPECT().SendMsg(mock.Anything).Return(NewGRPCStatusFromStreamingError(NewOnShutdownError("test")).Err())
s.EXPECT().RecvMsg(mock.Anything).Return(NewGRPCStatusFromStreamingError(NewOnShutdownError("test")).Err())
w := NewClientStreamWrapper("method", s)
err := w.SendMsg(context.Background())
assert.NotNil(t, err)
streamingErr := AsStreamingError(err)
assert.Equal(t, streamingpb.StreamingCode_STREAMING_CODE_ON_SHUTDOWN, streamingErr.Code)
assert.Contains(t, streamingErr.Cause, "test")
err = w.RecvMsg(context.Background())
assert.NotNil(t, err)
streamingErr = AsStreamingError(err)
assert.Equal(t, streamingpb.StreamingCode_STREAMING_CODE_ON_SHUTDOWN, streamingErr.Code)
assert.Contains(t, streamingErr.Cause, "test")
assert.Nil(t, NewClientStreamWrapper("method", nil))
}

@@ -0,0 +1,101 @@
package status
import (
"context"
"fmt"
"io"
"github.com/cockroachdb/errors"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/milvus-io/milvus/internal/proto/streamingpb"
)
var streamingErrorToGRPCStatus = map[streamingpb.StreamingCode]codes.Code{
streamingpb.StreamingCode_STREAMING_CODE_OK: codes.OK,
streamingpb.StreamingCode_STREAMING_CODE_CHANNEL_EXIST: codes.AlreadyExists,
streamingpb.StreamingCode_STREAMING_CODE_CHANNEL_NOT_EXIST: codes.FailedPrecondition,
streamingpb.StreamingCode_STREAMING_CODE_CHANNEL_FENCED: codes.FailedPrecondition,
streamingpb.StreamingCode_STREAMING_CODE_ON_SHUTDOWN: codes.FailedPrecondition,
streamingpb.StreamingCode_STREAMING_CODE_INVALID_REQUEST_SEQ: codes.FailedPrecondition,
streamingpb.StreamingCode_STREAMING_CODE_UNMATCHED_CHANNEL_TERM: codes.FailedPrecondition,
streamingpb.StreamingCode_STREAMING_CODE_IGNORED_OPERATION: codes.FailedPrecondition,
streamingpb.StreamingCode_STREAMING_CODE_INNER: codes.Unavailable,
streamingpb.StreamingCode_STREAMING_CODE_UNKNOWN: codes.Unknown,
}
// NewGRPCStatusFromStreamingError converts StreamingError to grpc status.
// Should be called at server-side.
func NewGRPCStatusFromStreamingError(e *StreamingError) *status.Status {
if e == nil || e.Code == streamingpb.StreamingCode_STREAMING_CODE_OK {
return status.New(codes.OK, "")
}
code, ok := streamingErrorToGRPCStatus[e.Code]
if !ok {
code = codes.Unknown
}
// Attach streaming error to detail.
st := status.New(code, "")
newST, err := st.WithDetails(e.AsPBError())
if err != nil {
return status.New(code, fmt.Sprintf("convert streaming error failed, detail: %s", e.Cause))
}
return newST
}
// StreamingClientStatus is a wrapper of grpc status.
// Should be used in client side.
type StreamingClientStatus struct {
*status.Status
method string
}
// ConvertStreamingError converts an error into a StreamingClientStatus error.
// Used on the client side.
func ConvertStreamingError(method string, err error) error {
if err == nil {
return nil
}
if errors.IsAny(err, context.DeadlineExceeded, context.Canceled, io.EOF) {
return err
}
rpcStatus := status.Convert(err)
e := &StreamingClientStatus{
Status: rpcStatus,
method: method,
}
return e
}
// TryIntoStreamingError tries to convert a StreamingClientStatus into a StreamingError.
func (s *StreamingClientStatus) TryIntoStreamingError() *StreamingError {
if s == nil {
return nil
}
for _, detail := range s.Details() {
if detail, ok := detail.(*streamingpb.StreamingError); ok {
return New(detail.Code, detail.Cause)
}
}
return nil
}
// GRPCStatus is used for converting with status.Status.
// !!! DO NOT delete this method; the IsCanceled function relies on it.
func (s *StreamingClientStatus) GRPCStatus() *status.Status {
if s == nil {
return nil
}
return s.Status
}
// Error implements StreamingClientStatus as an error.
func (s *StreamingClientStatus) Error() string {
if streamingErr := s.TryIntoStreamingError(); streamingErr != nil {
return fmt.Sprintf("%s; streaming error: code = %s, cause = %s; rpc error: code = %s, desc = %s", s.method, streamingErr.Code.String(), streamingErr.Cause, s.Code(), s.Message())
}
return fmt.Sprintf("%s; rpc error: code = %s, desc = %s", s.method, s.Code(), s.Message())
}
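A client-side sketch of the conversion flow, assuming a hypothetical produce call passed in as a closure: the raw gRPC error is wrapped by ConvertStreamingError, after which the streaming detail can be recovered with TryIntoStreamingError.

// Illustrative only: produce stands in for an arbitrary streaming RPC call.
func callProduce(ctx context.Context, produce func(context.Context) error) error {
	err := produce(ctx)
	if err == nil {
		return nil
	}
	err = ConvertStreamingError("Produce", err)
	var scs *StreamingClientStatus
	if errors.As(err, &scs) {
		if streamingErr := scs.TryIntoStreamingError(); streamingErr != nil {
			// The wire error carried a streamingpb.StreamingError detail;
			// scs.Error() renders both the streaming code/cause and the rpc status.
		}
	}
	return err
}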

View File

@ -0,0 +1,48 @@
package status
import (
"context"
"testing"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc/codes"
"github.com/milvus-io/milvus/internal/proto/streamingpb"
)
func TestStreamingStatus(t *testing.T) {
err := ConvertStreamingError("test", nil)
assert.Nil(t, err)
err = ConvertStreamingError("test", errors.Wrap(context.DeadlineExceeded, "test"))
assert.NotNil(t, err)
assert.ErrorIs(t, err, context.DeadlineExceeded)
err = ConvertStreamingError("test", errors.New("test"))
assert.NotNil(t, err)
streamingErr := AsStreamingError(err)
assert.Equal(t, streamingpb.StreamingCode_STREAMING_CODE_UNKNOWN, streamingErr.Code)
assert.Contains(t, streamingErr.Cause, "test; rpc error: code = Unknown, desc = test")
err = ConvertStreamingError("test", NewGRPCStatusFromStreamingError(NewOnShutdownError("test")).Err())
assert.NotNil(t, err)
streamingErr = AsStreamingError(err)
assert.Equal(t, streamingpb.StreamingCode_STREAMING_CODE_ON_SHUTDOWN, streamingErr.Code)
assert.Contains(t, streamingErr.Cause, "test")
assert.Contains(t, err.Error(), "streaming error")
}
func TestNewGRPCStatusFromStreamingError(t *testing.T) {
st := NewGRPCStatusFromStreamingError(nil)
assert.Equal(t, codes.OK, st.Code())
st = NewGRPCStatusFromStreamingError(
NewOnShutdownError("test"),
)
assert.Equal(t, codes.FailedPrecondition, st.Code())
st = NewGRPCStatusFromStreamingError(
New(10086, "test"),
)
assert.Equal(t, codes.Unknown, st.Code())
}

View File

@ -0,0 +1,119 @@
package status
import (
"fmt"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
"github.com/milvus-io/milvus/internal/proto/streamingpb"
)
var _ error = (*StreamingError)(nil)
// StreamingError is the error type for the streaming internal module.
// Should be used at the logic layer.
type (
StreamingError streamingpb.StreamingError
StreamingCode streamingpb.StreamingCode
)
// Error implements the error interface for StreamingError.
func (e *StreamingError) Error() string {
return fmt.Sprintf("code: %s, cause: %s", e.Code.String(), e.Cause)
}
// AsPBError converts the StreamingError into a streamingpb.StreamingError.
func (e *StreamingError) AsPBError() *streamingpb.StreamingError {
return (*streamingpb.StreamingError)(e)
}
// IsWrongStreamingNode returns true if the error is caused by a wrong streamingnode.
// Clients should report these errors to coord and block until a new assignment term arrives.
func (e *StreamingError) IsWrongStreamingNode() bool {
return e.Code == streamingpb.StreamingCode_STREAMING_CODE_UNMATCHED_CHANNEL_TERM || // channel term does not match
e.Code == streamingpb.StreamingCode_STREAMING_CODE_CHANNEL_NOT_EXIST || // channel does not exist on the streamingnode
e.Code == streamingpb.StreamingCode_STREAMING_CODE_CHANNEL_FENCED // channel fenced on this node.
}
// NewOnShutdownError creates a new StreamingError with code STREAMING_CODE_ON_SHUTDOWN.
func NewOnShutdownError(format string, args ...interface{}) *StreamingError {
return New(streamingpb.StreamingCode_STREAMING_CODE_ON_SHUTDOWN, format, args...)
}
// NewUnknownError creates a new StreamingError with code STREAMING_CODE_UNKNOWN.
func NewUnknownError(format string, args ...interface{}) *StreamingError {
return New(streamingpb.StreamingCode_STREAMING_CODE_UNKNOWN, format, args...)
}
// NewInvalidRequestSeq creates a new StreamingError with code STREAMING_CODE_INVALID_REQUEST_SEQ.
func NewInvalidRequestSeq(format string, args ...interface{}) *StreamingError {
return New(streamingpb.StreamingCode_STREAMING_CODE_INVALID_REQUEST_SEQ, format, args...)
}
// NewChannelExist creates a new StreamingError with code StreamingCode_STREAMING_CODE_CHANNEL_EXIST.
func NewChannelExist(format string, args ...interface{}) *StreamingError {
return New(streamingpb.StreamingCode_STREAMING_CODE_CHANNEL_EXIST, format, args...)
}
// NewChannelNotExist creates a new StreamingError with code STREAMING_CODE_CHANNEL_NOT_EXIST.
func NewChannelNotExist(format string, args ...interface{}) *StreamingError {
return New(streamingpb.StreamingCode_STREAMING_CODE_CHANNEL_NOT_EXIST, format, args...)
}
// NewUnmatchedChannelTerm creates a new StreamingError with code StreamingCode_STREAMING_CODE_UNMATCHED_CHANNEL_TERM.
func NewUnmatchedChannelTerm(format string, args ...interface{}) *StreamingError {
return New(streamingpb.StreamingCode_STREAMING_CODE_UNMATCHED_CHANNEL_TERM, format, args...)
}
// NewIgnoreOperation creates a new StreamingError with code STREAMING_CODE_IGNORED_OPERATION.
func NewIgnoreOperation(format string, args ...interface{}) *StreamingError {
return New(streamingpb.StreamingCode_STREAMING_CODE_IGNORED_OPERATION, format, args...)
}
// NewInner creates a new StreamingError with code STREAMING_CODE_INNER.
func NewInner(format string, args ...interface{}) *StreamingError {
return New(streamingpb.StreamingCode_STREAMING_CODE_INNER, format, args...)
}
// New creates a new StreamingError with the given code and cause.
func New(code streamingpb.StreamingCode, format string, args ...interface{}) *StreamingError {
if len(args) == 0 {
return &StreamingError{
Code: code,
Cause: format,
}
}
return &StreamingError{
Code: code,
Cause: redact.Sprintf(format, args...).StripMarkers(),
}
}
// AsStreamingError converts any error into a *StreamingError.
// Errors carrying no streaming information fall back to STREAMING_CODE_UNKNOWN.
func AsStreamingError(err error) *StreamingError {
if err == nil {
return nil
}
// If the error is a StreamingError, return it directly.
var e *StreamingError
if errors.As(err, &e) {
return e
}
// If the error is a StreamingClientStatus, try to extract the StreamingError from its details.
var st *StreamingClientStatus
if errors.As(err, &st) {
e = st.TryIntoStreamingError()
if e != nil {
return e
}
}
// Return a default StreamingError.
return &StreamingError{
Code: streamingpb.StreamingCode_STREAMING_CODE_UNKNOWN,
Cause: err.Error(),
}
}
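A sketch of how a caller might act on the recovered error; the refresh and retry hooks are assumed callbacks, not part of this patch.

// Illustrative only: refreshAssignment and retry are assumed callbacks.
func handleProduceError(err error, refreshAssignment func(), retry func() error) error {
	streamingErr := AsStreamingError(err)
	if streamingErr == nil { // nil in, nil out
		return nil
	}
	if streamingErr.IsWrongStreamingNode() {
		// The channel moved, was fenced, or the term is stale; fetch the new
		// assignment from coord before retrying on the right streamingnode.
		refreshAssignment()
		return retry()
	}
	return err
}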

View File

@ -0,0 +1,65 @@
package status
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/internal/proto/streamingpb"
)
func TestStreamingError(t *testing.T) {
streamingErr := NewOnShutdownError("test")
assert.Contains(t, streamingErr.Error(), "code: STREAMING_CODE_ON_SHUTDOWN, cause: test")
assert.False(t, streamingErr.IsWrongStreamingNode())
pbErr := streamingErr.AsPBError()
assert.Equal(t, streamingpb.StreamingCode_STREAMING_CODE_ON_SHUTDOWN, pbErr.Code)
streamingErr = NewUnknownError("test")
assert.Contains(t, streamingErr.Error(), "code: STREAMING_CODE_UNKNOWN, cause: test")
assert.False(t, streamingErr.IsWrongStreamingNode())
pbErr = streamingErr.AsPBError()
assert.Equal(t, streamingpb.StreamingCode_STREAMING_CODE_UNKNOWN, pbErr.Code)
streamingErr = NewInvalidRequestSeq("test")
assert.Contains(t, streamingErr.Error(), "code: STREAMING_CODE_INVALID_REQUEST_SEQ, cause: test")
assert.False(t, streamingErr.IsWrongStreamingNode())
pbErr = streamingErr.AsPBError()
assert.Equal(t, streamingpb.StreamingCode_STREAMING_CODE_INVALID_REQUEST_SEQ, pbErr.Code)
streamingErr = NewChannelExist("test")
assert.Contains(t, streamingErr.Error(), "code: STREAMING_CODE_CHANNEL_EXIST, cause: test")
assert.False(t, streamingErr.IsWrongStreamingNode())
pbErr = streamingErr.AsPBError()
assert.Equal(t, streamingpb.StreamingCode_STREAMING_CODE_CHANNEL_EXIST, pbErr.Code)
streamingErr = NewChannelNotExist("test")
assert.Contains(t, streamingErr.Error(), "code: STREAMING_CODE_CHANNEL_NOT_EXIST, cause: test")
assert.True(t, streamingErr.IsWrongStreamingNode())
pbErr = streamingErr.AsPBError()
assert.Equal(t, streamingpb.StreamingCode_STREAMING_CODE_CHANNEL_NOT_EXIST, pbErr.Code)
streamingErr = NewUnmatchedChannelTerm("test")
assert.Contains(t, streamingErr.Error(), "code: STREAMING_CODE_UNMATCHED_CHANNEL_TERM, cause: test")
assert.True(t, streamingErr.IsWrongStreamingNode())
pbErr = streamingErr.AsPBError()
assert.Equal(t, streamingpb.StreamingCode_STREAMING_CODE_UNMATCHED_CHANNEL_TERM, pbErr.Code)
streamingErr = NewIgnoreOperation("test")
assert.Contains(t, streamingErr.Error(), "code: STREAMING_CODE_IGNORED_OPERATION, cause: test")
assert.False(t, streamingErr.IsWrongStreamingNode())
pbErr = streamingErr.AsPBError()
assert.Equal(t, streamingpb.StreamingCode_STREAMING_CODE_IGNORED_OPERATION, pbErr.Code)
streamingErr = NewInner("test")
assert.Contains(t, streamingErr.Error(), "code: STREAMING_CODE_INNER, cause: test")
assert.False(t, streamingErr.IsWrongStreamingNode())
pbErr = streamingErr.AsPBError()
assert.Equal(t, streamingpb.StreamingCode_STREAMING_CODE_INNER, pbErr.Code)
streamingErr = NewOnShutdownError("test, %d", 1)
assert.Contains(t, streamingErr.Error(), "code: STREAMING_CODE_ON_SHUTDOWN, cause: test, 1")
assert.False(t, streamingErr.IsWrongStreamingNode())
pbErr = streamingErr.AsPBError()
assert.Equal(t, streamingpb.StreamingCode_STREAMING_CODE_ON_SHUTDOWN, pbErr.Code)
}

View File

@ -0,0 +1,17 @@
package util
import (
"go.uber.org/atomic"
)
// NewIDAllocator creates an allocator that hands out monotonically increasing ids.
func NewIDAllocator() *IDAllocator {
return &IDAllocator{}
}
// IDAllocator is a concurrency-safe incremental id allocator.
type IDAllocator struct {
underlying atomic.Int64
}
// Allocate returns the next id; the first call returns 1.
func (ida *IDAllocator) Allocate() int64 {
return ida.underlying.Inc()
}

View File

@ -29,6 +29,7 @@ const (
AbandonLabel = "abandon"
SuccessLabel = "success"
FailLabel = "fail"
CancelLabel = "cancel"
TotalLabel = "total"
HybridSearchLabel = "hybrid_search"

View File

@ -0,0 +1,180 @@
package metrics
import (
"sync"
"github.com/prometheus/client_golang/prometheus"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
const (
subsystemStreamingServiceClient = "streaming"
StreamingServiceClientProducerAvailable = "available"
StreamingServiceClientProducerUnAvailable = "unavailable"
)
var (
logServiceClientRegisterOnce sync.Once
// from 64 bytes to 5MB
bytesBuckets = prometheus.ExponentialBucketsRange(64, 5242880, 10)
// from 1ms to 5s
secondsBuckets = prometheus.ExponentialBucketsRange(0.001, 5, 10)
// Client side metrics
StreamingServiceClientProducerTotal = newStreamingServiceClientGaugeVec(prometheus.GaugeOpts{
Name: "producer_total",
Help: "Total of producers",
}, statusLabelName)
StreamingServiceClientConsumerTotal = newStreamingServiceClientGaugeVec(prometheus.GaugeOpts{
Name: "consumer_total",
Help: "Total of consumers",
}, statusLabelName)
StreamingServiceClientProduceBytes = newStreamingServiceClientHistogramVec(prometheus.HistogramOpts{
Name: "produce_bytes",
Help: "Bytes of produced message",
Buckets: bytesBuckets,
})
StreamingServiceClientConsumeBytes = newStreamingServiceClientHistogramVec(prometheus.HistogramOpts{
Name: "consume_bytes",
Help: "Bytes of consumed message",
Buckets: bytesBuckets,
})
StreamingServiceClientProduceDurationSeconds = newStreamingServiceClientHistogramVec(
prometheus.HistogramOpts{
Name: "produce_duration_seconds",
Help: "Duration of client produce",
Buckets: secondsBuckets,
},
statusLabelName,
)
// StreamingCoord metrics
StreamingCoordPChannelTotal = newStreamingCoordGaugeVec(prometheus.GaugeOpts{
Name: "pchannel_total",
Help: "Total of pchannels",
})
// StreamingCoordVChannelTotal = newStreamingCoordGaugeVec(prometheus.GaugeOpts{
// Name: "vchannel_total",
// Help: "Total of vchannels",
// })
StreamingCoordAssignmentListenerTotal = newStreamingCoordGaugeVec(prometheus.GaugeOpts{
Name: "assignment_listener_total",
Help: "Total of assignment listener",
})
StreamingCoordAssignmentInfo = newStreamingCoordGaugeVec(prometheus.GaugeOpts{
Name: "assignment_info",
Help: "Info of assignment",
}, "global_version", "local_version")
// StreamingNode metrics
StreamingNodeWALTotal = newStreamingNodeGaugeVec(prometheus.GaugeOpts{
Name: "wal_total",
Help: "Total of wal",
})
StreamingNodeProducerTotal = newStreamingNodeGaugeVec(prometheus.GaugeOpts{
Name: "producer_total",
Help: "Total of producers",
})
StreamingNodeConsumerTotal = newStreamingNodeGaugeVec(prometheus.GaugeOpts{
Name: "consumer_total",
Help: "Total of consumers",
})
StreamingNodeProduceBytes = newStreamingNodeHistogramVec(prometheus.HistogramOpts{
Name: "produce_bytes",
Help: "Bytes of produced message",
Buckets: bytesBuckets,
})
StreamingNodeConsumeBytes = newStreamingNodeHistogramVec(prometheus.HistogramOpts{
Name: "consume_bytes",
Help: "Bytes of consumed message",
Buckets: bytesBuckets,
})
StreamingNodeProduceDurationSeconds = newStreamingNodeHistogramVec(prometheus.HistogramOpts{
Name: "produce_duration_seconds",
Help: "Duration of producing message",
Buckets: secondsBuckets,
}, statusLabelName)
)
// RegisterStreamingServiceClient registers the client-side streaming service metrics.
func RegisterStreamingServiceClient(registry *prometheus.Registry) {
logServiceClientRegisterOnce.Do(func() {
registry.MustRegister(StreamingServiceClientProducerTotal)
registry.MustRegister(StreamingServiceClientConsumerTotal)
registry.MustRegister(StreamingServiceClientProduceBytes)
registry.MustRegister(StreamingServiceClientConsumeBytes)
registry.MustRegister(StreamingServiceClientProduceDurationSeconds)
})
}
// RegisterStreamingCoord registers the streaming coord metrics.
func RegisterStreamingCoord(registry *prometheus.Registry) {
registry.MustRegister(StreamingCoordPChannelTotal)
registry.MustRegister(StreamingCoordAssignmentListenerTotal)
registry.MustRegister(StreamingCoordAssignmentInfo)
}
// RegisterStreamingNode registers the streaming node metrics.
func RegisterStreamingNode(registry *prometheus.Registry) {
registry.MustRegister(StreamingNodeWALTotal)
registry.MustRegister(StreamingNodeProducerTotal)
registry.MustRegister(StreamingNodeConsumerTotal)
registry.MustRegister(StreamingNodeProduceBytes)
registry.MustRegister(StreamingNodeConsumeBytes)
registry.MustRegister(StreamingNodeProduceDurationSeconds)
}
func newStreamingCoordGaugeVec(opts prometheus.GaugeOpts, extra ...string) *prometheus.GaugeVec {
opts.Namespace = milvusNamespace
opts.Subsystem = typeutil.StreamingCoordRole
labels := mergeLabel(extra...)
return prometheus.NewGaugeVec(opts, labels)
}
func newStreamingServiceClientGaugeVec(opts prometheus.GaugeOpts, extra ...string) *prometheus.GaugeVec {
opts.Namespace = milvusNamespace
opts.Subsystem = subsystemStreamingServiceClient
labels := mergeLabel(extra...)
return prometheus.NewGaugeVec(opts, labels)
}
func newStreamingServiceClientHistogramVec(opts prometheus.HistogramOpts, extra ...string) *prometheus.HistogramVec {
opts.Namespace = milvusNamespace
opts.Subsystem = subsystemStreamingServiceClient
labels := mergeLabel(extra...)
return prometheus.NewHistogramVec(opts, labels)
}
func newStreamingNodeGaugeVec(opts prometheus.GaugeOpts, extra ...string) *prometheus.GaugeVec {
opts.Namespace = milvusNamespace
opts.Subsystem = typeutil.StreamingNodeRole
labels := mergeLabel(extra...)
return prometheus.NewGaugeVec(opts, labels)
}
func newStreamingNodeHistogramVec(opts prometheus.HistogramOpts, extra ...string) *prometheus.HistogramVec {
opts.Namespace = milvusNamespace
opts.Subsystem = typeutil.StreamingNodeRole
labels := mergeLabel(extra...)
return prometheus.NewHistogramVec(opts, labels)
}
func mergeLabel(extra ...string) []string {
labels := make([]string, 0, 1+len(extra))
labels = append(labels, nodeIDLabelName)
labels = append(labels, extra...)
return labels
}
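A usage sketch for the client-side metrics, assuming node id "1" and that the status label values below match how callers report outcomes; neither assumption is taken from this patch.

// Illustrative only.
func registerAndRecord() {
	registry := prometheus.NewRegistry()
	RegisterStreamingServiceClient(registry)
	// Gauge with nodeID + status labels.
	StreamingServiceClientProducerTotal.WithLabelValues("1", StreamingServiceClientProducerAvailable).Inc()
	// Histogram with the nodeID label only.
	StreamingServiceClientProduceBytes.WithLabelValues("1").Observe(1024)
	// Histogram with nodeID + status labels; SuccessLabel comes from the metrics constants.
	StreamingServiceClientProduceDurationSeconds.WithLabelValues("1", SuccessLabel).Observe(0.002)
}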

View File

@ -48,6 +48,10 @@ const (
IndexNodeRole = "indexnode"
// MixtureRole is a constant that represents the Mixture running mode
MixtureRole = "mixture"
// StreamingCoordRole is a constant that represents the StreamingCoord role
StreamingCoordRole = "streamingcoord"
// StreamingNodeRole is a constant that represents the StreamingNode role
StreamingNodeRole = "streamingnode"
)
var (
@ -60,6 +64,7 @@ var (
IndexNodeRole,
DataCoordRole,
DataNodeRole,
StreamingNodeRole,
)
serverTypeList = serverTypeSet.Collect()
)