Fix query node panic when watching dm channels (#21402)

Signed-off-by: SimFG <bang.fu@zilliz.com>
This commit is contained in:
SimFG 2022-12-28 10:19:31 +08:00 committed by GitHub
parent 6a29a964df
commit 2aed48c433
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 28 additions and 7 deletions

View File

@ -377,8 +377,14 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, in *querypb.WatchDmC
ErrorCode: commonpb.ErrorCode_Success,
}, nil
})
ret, _ := future.Await()
return ret.(*commonpb.Status), nil
ret, err := future.Await()
if status, ok := ret.(*commonpb.Status); ok {
return status, nil
}
log.Warn("fail to convert the *commonpb.Status", zap.Any("ret", ret), zap.Error(err))
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
}, nil
}
func (node *QueryNode) UnsubDmChannel(ctx context.Context, req *querypb.UnsubDmChannelRequest) (*commonpb.Status, error) {

View File

@ -20,14 +20,11 @@ import (
"context"
"encoding/json"
"math/rand"
"runtime"
"sync"
"sync/atomic"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
"github.com/milvus-io/milvus/internal/common"
@ -35,9 +32,14 @@ import (
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
queryPb "github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util/concurrency"
"github.com/milvus-io/milvus/internal/util/etcd"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/panjf2000/ants/v2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
func TestImpl_GetComponentStates(t *testing.T) {
@ -115,6 +117,16 @@ func TestImpl_WatchDmChannels(t *testing.T) {
status, err := node.WatchDmChannels(ctx, req)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
originPool := node.taskPool
defer func() {
node.taskPool = originPool
}()
node.taskPool, _ = concurrency.NewPool(runtime.GOMAXPROCS(0), ants.WithPreAlloc(true))
node.taskPool.Release()
status, err = node.WatchDmChannels(ctx, req)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.ErrorCode)
})
t.Run("target not match", func(t *testing.T) {
@ -192,7 +204,6 @@ func TestImpl_WatchDmChannels(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.ErrorCode)
})
}
func TestImpl_UnsubDmChannel(t *testing.T) {

View File

@ -67,3 +67,7 @@ func (pool *Pool) Cap() int {
func (pool *Pool) Running() int {
return pool.inner.Running()
}
func (pool *Pool) Release() {
pool.inner.Release()
}