milvus/internal/proxy/replicate_stream_manager_test.go

86 lines
2.4 KiB
Go
Raw Normal View History

package proxy
import (
"context"
"testing"
"time"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/resource"
)
func TestReplicateManager(t *testing.T) {
factory := newMockMsgStreamFactory()
resourceManager := resource.NewManager(time.Second, 2*time.Second, nil)
manager := NewReplicateStreamManager(context.Background(), factory, resourceManager)
{
factory.f = func(ctx context.Context) (msgstream.MsgStream, error) {
return nil, errors.New("mock msgstream fail")
}
_, err := manager.GetReplicateMsgStream(context.Background(), "test")
assert.Error(t, err)
}
{
mockMsgStream := newMockMsgStream()
i := 0
mockMsgStream.setRepack = func(repackFunc msgstream.RepackFunc) {
i++
}
mockMsgStream.asProducer = func(producers []string) {
i++
}
mockMsgStream.enableProduce = func(b bool) {
i++
}
mockMsgStream.close = func() {
i++
}
factory.f = func(ctx context.Context) (msgstream.MsgStream, error) {
return mockMsgStream, nil
}
_, err := manager.GetReplicateMsgStream(context.Background(), "test")
assert.NoError(t, err)
assert.Equal(t, 3, i)
time.Sleep(time.Second)
_, err = manager.GetReplicateMsgStream(context.Background(), "test")
assert.NoError(t, err)
assert.Equal(t, 3, i)
res := resourceManager.Delete(ReplicateMsgStreamTyp, "test")
assert.NotNil(t, res)
assert.Eventually(t, func() bool {
return resourceManager.Delete(ReplicateMsgStreamTyp, "test") == nil
}, time.Second*4, time.Millisecond*500)
_, err = manager.GetReplicateMsgStream(context.Background(), "test")
assert.NoError(t, err)
assert.Equal(t, 7, i)
}
{
res := resourceManager.Delete(ReplicateMsgStreamTyp, "test")
assert.NotNil(t, res)
assert.Eventually(t, func() bool {
return resourceManager.Delete(ReplicateMsgStreamTyp, "test") == nil
}, time.Second*4, time.Millisecond*500)
res, err := resourceManager.Get(ReplicateMsgStreamTyp, "test", func() (resource.Resource, error) {
return resource.NewResource(resource.WithObj("str")), nil
})
assert.NoError(t, err)
assert.Equal(t, "str", res.Get())
_, err = manager.GetReplicateMsgStream(context.Background(), "test")
assert.ErrorIs(t, err, merr.ErrInvalidStreamObj)
}
{
assert.NotNil(t, manager.GetMsgDispatcher())
}
}