2022-04-20 16:15:41 +08:00
|
|
|
package querynode
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"testing"
|
|
|
|
|
2022-09-02 10:42:59 +08:00
|
|
|
"github.com/milvus-io/milvus/internal/common"
|
2022-04-25 11:51:46 +08:00
|
|
|
"github.com/milvus-io/milvus/internal/proto/querypb"
|
2022-04-20 16:15:41 +08:00
|
|
|
"github.com/milvus-io/milvus/internal/util/sessionutil"
|
|
|
|
"github.com/stretchr/testify/assert"
|
2022-04-25 11:51:46 +08:00
|
|
|
"github.com/stretchr/testify/require"
|
2022-04-20 16:15:41 +08:00
|
|
|
"go.etcd.io/etcd/server/v3/etcdserver/api/v3client"
|
|
|
|
)
|
|
|
|
|
|
|
|
func TestShardClusterService(t *testing.T) {
|
|
|
|
client := v3client.New(embedetcdServer.Server)
|
|
|
|
defer client.Close()
|
|
|
|
session := sessionutil.NewSession(context.Background(), "/by-dev/sessions/unittest/querynode/", client)
|
|
|
|
clusterService := newShardClusterService(client, session, nil)
|
|
|
|
|
|
|
|
assert.NotPanics(t, func() {
|
2022-11-15 13:21:07 +08:00
|
|
|
clusterService.addShardCluster(defaultCollectionID, defaultReplicaID, defaultDMLChannel, defaultVersion)
|
2022-04-20 16:15:41 +08:00
|
|
|
})
|
|
|
|
|
|
|
|
shardCluster, ok := clusterService.getShardCluster(defaultDMLChannel)
|
|
|
|
assert.True(t, ok)
|
|
|
|
assert.NotNil(t, shardCluster)
|
|
|
|
|
|
|
|
_, ok = clusterService.getShardCluster("non-exist-channel")
|
|
|
|
assert.False(t, ok)
|
|
|
|
|
|
|
|
err := clusterService.releaseShardCluster(defaultDMLChannel)
|
|
|
|
assert.NoError(t, err)
|
|
|
|
|
|
|
|
err = clusterService.releaseShardCluster("non-exist-channel")
|
|
|
|
assert.Error(t, err)
|
|
|
|
}
|
2022-04-25 11:51:46 +08:00
|
|
|
|
2022-04-27 10:41:46 +08:00
|
|
|
func TestShardClusterService_SyncReplicaSegments(t *testing.T) {
|
|
|
|
qn, err := genSimpleQueryNode(context.Background())
|
|
|
|
require.NoError(t, err)
|
2022-11-17 14:23:08 +08:00
|
|
|
defer qn.Stop()
|
2022-04-27 10:41:46 +08:00
|
|
|
|
|
|
|
client := v3client.New(embedetcdServer.Server)
|
|
|
|
defer client.Close()
|
|
|
|
session := sessionutil.NewSession(context.Background(), "/by-dev/sessions/unittest/querynode/", client)
|
|
|
|
clusterService := newShardClusterService(client, session, qn)
|
|
|
|
|
|
|
|
t.Run("sync non-exist shard cluster", func(t *testing.T) {
|
|
|
|
err := clusterService.SyncReplicaSegments(defaultDMLChannel, nil)
|
|
|
|
assert.Error(t, err)
|
|
|
|
})
|
|
|
|
|
2022-10-20 16:35:28 +08:00
|
|
|
t.Run("sync initailizing shard cluster", func(t *testing.T) {
|
2022-11-15 13:21:07 +08:00
|
|
|
clusterService.addShardCluster(defaultCollectionID, defaultReplicaID, defaultDMLChannel, defaultVersion)
|
2022-10-20 16:35:28 +08:00
|
|
|
|
|
|
|
sc, ok := clusterService.getShardCluster(defaultDMLChannel)
|
|
|
|
require.True(t, ok)
|
|
|
|
assert.NotPanics(t, func() {
|
|
|
|
err := clusterService.SyncReplicaSegments(defaultDMLChannel, []*querypb.ReplicaSegmentsInfo{
|
|
|
|
{
|
|
|
|
NodeId: 1,
|
|
|
|
PartitionId: defaultPartitionID,
|
|
|
|
|
|
|
|
SegmentIds: []int64{1},
|
|
|
|
Versions: []int64{1},
|
|
|
|
},
|
|
|
|
})
|
|
|
|
|
|
|
|
assert.NoError(t, err)
|
|
|
|
assert.Nil(t, sc.currentVersion)
|
|
|
|
})
|
|
|
|
})
|
|
|
|
|
2022-04-27 10:41:46 +08:00
|
|
|
t.Run("sync shard cluster", func(t *testing.T) {
|
2022-11-15 13:21:07 +08:00
|
|
|
clusterService.addShardCluster(defaultCollectionID, defaultReplicaID, defaultDMLChannel, defaultVersion)
|
2022-04-27 10:41:46 +08:00
|
|
|
|
2022-10-20 16:35:28 +08:00
|
|
|
sc, ok := clusterService.getShardCluster(defaultDMLChannel)
|
|
|
|
require.True(t, ok)
|
|
|
|
sc.SetupFirstVersion()
|
|
|
|
|
2022-04-27 10:41:46 +08:00
|
|
|
err := clusterService.SyncReplicaSegments(defaultDMLChannel, []*querypb.ReplicaSegmentsInfo{
|
|
|
|
{
|
|
|
|
NodeId: 1,
|
|
|
|
PartitionId: defaultPartitionID,
|
|
|
|
|
|
|
|
SegmentIds: []int64{1},
|
2022-09-28 12:10:54 +08:00
|
|
|
Versions: []int64{1},
|
2022-04-27 10:41:46 +08:00
|
|
|
},
|
|
|
|
})
|
|
|
|
|
|
|
|
assert.NoError(t, err)
|
|
|
|
|
|
|
|
cs, ok := clusterService.getShardCluster(defaultDMLChannel)
|
|
|
|
require.True(t, ok)
|
|
|
|
segment, ok := cs.getSegment(1)
|
|
|
|
assert.True(t, ok)
|
2022-09-02 10:42:59 +08:00
|
|
|
assert.Equal(t, common.InvalidNodeID, segment.nodeID)
|
2022-04-27 10:41:46 +08:00
|
|
|
assert.Equal(t, defaultPartitionID, segment.partitionID)
|
|
|
|
assert.Equal(t, segmentStateLoaded, segment.state)
|
|
|
|
})
|
|
|
|
}
|
2022-11-02 14:59:34 +08:00
|
|
|
|
|
|
|
func TestShardClusterService_close(t *testing.T) {
|
|
|
|
client := v3client.New(embedetcdServer.Server)
|
|
|
|
defer client.Close()
|
|
|
|
session := sessionutil.NewSession(context.Background(), "/by-dev/sessions/unittest/querynode/", client)
|
|
|
|
clusterService := newShardClusterService(client, session, nil)
|
|
|
|
|
|
|
|
t.Run("close ok", func(t *testing.T) {
|
2022-11-15 13:21:07 +08:00
|
|
|
clusterService.addShardCluster(defaultCollectionID, defaultReplicaID, defaultDMLChannel, defaultVersion)
|
2022-11-02 14:59:34 +08:00
|
|
|
|
|
|
|
cnt := 0
|
|
|
|
clusterService.clusters.Range(func(key, value any) bool {
|
|
|
|
cnt++
|
|
|
|
return true
|
|
|
|
})
|
|
|
|
assert.Equal(t, 1, cnt)
|
|
|
|
|
|
|
|
err := clusterService.close()
|
|
|
|
assert.NoError(t, err)
|
|
|
|
})
|
|
|
|
|
|
|
|
t.Run("close fail", func(t *testing.T) {
|
|
|
|
clusterService.clusters.Store("key", "error")
|
|
|
|
err := clusterService.close()
|
|
|
|
assert.Error(t, err)
|
|
|
|
})
|
|
|
|
}
|