milvus/internal/master/segment_manager_test.go
neza2017 ae26a81550 Change to use Marshal as text
Signed-off-by: neza2017 <yefu.chen@zilliz.com>
2020-11-26 10:13:23 +08:00

370 lines
10 KiB
Go

package master
import (
"context"
"log"
"sync/atomic"
"testing"
"time"
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/kv"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
pb "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
"github.com/zilliztech/milvus-distributed/internal/proto/servicepb"
"github.com/zilliztech/milvus-distributed/internal/util/tsoutil"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
"go.etcd.io/etcd/clientv3"
"google.golang.org/grpc"
)
var mt *metaTable
var segMgr *SegmentManager
var collName = "coll_segmgr_test"
var collID = int64(1001)
var partitionTag = "test"
var kvBase *kv.EtcdKV
var master *Master
var masterCancelFunc context.CancelFunc
func setup() {
Params.Init()
etcdAddress := Params.EtcdAddress
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddress}})
if err != nil {
panic(err)
}
rootPath := "/test/root"
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
_, err = cli.Delete(ctx, rootPath, clientv3.WithPrefix())
if err != nil {
panic(err)
}
kvBase = kv.NewEtcdKV(cli, rootPath)
tmpMt, err := NewMetaTable(kvBase)
if err != nil {
panic(err)
}
mt = tmpMt
if mt.HasCollection(collID) {
err := mt.DeleteCollection(collID)
if err != nil {
panic(err)
}
}
err = mt.AddCollection(&pb.CollectionMeta{
ID: collID,
Schema: &schemapb.CollectionSchema{
Name: collName,
},
CreateTime: 0,
SegmentIDs: []UniqueID{},
PartitionTags: []string{},
})
if err != nil {
panic(err)
}
err = mt.AddPartition(collID, partitionTag)
if err != nil {
panic(err)
}
var cnt int64
Params.TopicNum = 5
Params.QueryNodeNum = 3
Params.SegmentSize = 536870912 / 1024 / 1024
Params.SegmentSizeFactor = 0.75
Params.DefaultRecordSize = 1024
Params.MinSegIDAssignCnt = 1048576 / 1024
Params.SegIDAssignExpiration = 2000
segMgr = NewSegmentManager(mt,
func() (UniqueID, error) {
val := atomic.AddInt64(&cnt, 1)
return val, nil
},
func() (Timestamp, error) {
val := atomic.AddInt64(&cnt, 1)
phy := time.Now().UnixNano() / int64(time.Millisecond)
ts := tsoutil.ComposeTS(phy, val)
return ts, nil
},
)
}
func teardown() {
err := mt.DeleteCollection(collID)
if err != nil {
log.Fatalf(err.Error())
}
kvBase.Close()
}
func TestSegmentManager_AssignSegmentID(t *testing.T) {
setup()
defer teardown()
reqs := []*internalpb.SegIDRequest{
{CollName: collName, PartitionTag: partitionTag, Count: 25000, ChannelID: 0},
{CollName: collName, PartitionTag: partitionTag, Count: 10000, ChannelID: 1},
{CollName: collName, PartitionTag: partitionTag, Count: 30000, ChannelID: 2},
{CollName: collName, PartitionTag: partitionTag, Count: 25000, ChannelID: 3},
{CollName: collName, PartitionTag: partitionTag, Count: 10000, ChannelID: 4},
}
segAssigns, err := segMgr.AssignSegmentID(reqs)
assert.Nil(t, err)
assert.Equal(t, uint32(25000), segAssigns[0].Count)
assert.Equal(t, uint32(10000), segAssigns[1].Count)
assert.Equal(t, uint32(30000), segAssigns[2].Count)
assert.Equal(t, uint32(25000), segAssigns[3].Count)
assert.Equal(t, uint32(10000), segAssigns[4].Count)
assert.Equal(t, segAssigns[0].SegID, segAssigns[1].SegID)
assert.Equal(t, segAssigns[2].SegID, segAssigns[3].SegID)
newReqs := []*internalpb.SegIDRequest{
{CollName: collName, PartitionTag: partitionTag, Count: 500000, ChannelID: 0},
}
// test open a new segment
newAssign, err := segMgr.AssignSegmentID(newReqs)
assert.Nil(t, err)
assert.NotNil(t, newAssign)
assert.Equal(t, uint32(500000), newAssign[0].Count)
assert.NotEqual(t, segAssigns[0].SegID, newAssign[0].SegID)
// test assignment expiration
time.Sleep(3 * time.Second)
assignAfterExpiration, err := segMgr.AssignSegmentID(newReqs)
assert.Nil(t, err)
assert.NotNil(t, assignAfterExpiration)
assert.Equal(t, uint32(500000), assignAfterExpiration[0].Count)
assert.Equal(t, segAssigns[0].SegID, assignAfterExpiration[0].SegID)
// test invalid params
newReqs[0].CollName = "wrong_collname"
_, err = segMgr.AssignSegmentID(newReqs)
assert.Error(t, errors.Errorf("can not find collection with id=%d", collID), err)
newReqs[0].Count = 1000000
_, err = segMgr.AssignSegmentID(newReqs)
assert.Error(t, errors.Errorf("request with count %d need about %d mem size which is larger than segment threshold",
1000000, 1024*1000000), err)
}
func TestSegmentManager_SegmentStats(t *testing.T) {
setup()
defer teardown()
ts, err := segMgr.globalTSOAllocator()
assert.Nil(t, err)
err = mt.AddSegment(&pb.SegmentMeta{
SegmentID: 100,
CollectionID: collID,
PartitionTag: partitionTag,
ChannelStart: 0,
ChannelEnd: 1,
OpenTime: ts,
})
assert.Nil(t, err)
stats := internalpb.QueryNodeSegStats{
MsgType: internalpb.MsgType_kQueryNodeSegStats,
PeerID: 1,
SegStats: []*internalpb.SegmentStats{
{SegmentID: 100, MemorySize: 2500000, NumRows: 25000, RecentlyModified: true},
},
}
baseMsg := msgstream.BaseMsg{
BeginTimestamp: 0,
EndTimestamp: 0,
HashValues: []int32{1},
}
msg := msgstream.QueryNodeSegStatsMsg{
QueryNodeSegStats: stats,
BaseMsg: baseMsg,
}
var tsMsg msgstream.TsMsg = &msg
msgPack := msgstream.MsgPack{
Msgs: make([]msgstream.TsMsg, 0),
}
msgPack.Msgs = append(msgPack.Msgs, tsMsg)
err = segMgr.HandleQueryNodeMsgPack(&msgPack)
assert.Nil(t, err)
segMeta, _ := mt.GetSegmentByID(100)
assert.Equal(t, int64(100), segMeta.SegmentID)
assert.Equal(t, int64(2500000), segMeta.MemSize)
assert.Equal(t, int64(25000), segMeta.NumRows)
// close segment
stats.SegStats[0].NumRows = 600000
stats.SegStats[0].MemorySize = int64(0.8 * segMgr.segmentThreshold)
err = segMgr.HandleQueryNodeMsgPack(&msgPack)
assert.Nil(t, err)
segMeta, _ = mt.GetSegmentByID(100)
assert.Equal(t, int64(100), segMeta.SegmentID)
assert.NotEqual(t, uint64(0), segMeta.CloseTime)
}
func startupMaster() {
Params.Init()
etcdAddress := Params.EtcdAddress
rootPath := "/test/root"
ctx, cancel := context.WithCancel(context.TODO())
masterCancelFunc = cancel
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddress}})
if err != nil {
panic(err)
}
_, err = cli.Delete(ctx, rootPath, clientv3.WithPrefix())
if err != nil {
panic(err)
}
Params = ParamTable{
Address: Params.Address,
Port: Params.Port,
EtcdAddress: Params.EtcdAddress,
EtcdRootPath: rootPath,
PulsarAddress: Params.PulsarAddress,
ProxyIDList: []typeutil.UniqueID{1, 2},
WriteNodeIDList: []typeutil.UniqueID{3, 4},
TopicNum: 5,
QueryNodeNum: 3,
SoftTimeTickBarrierInterval: 300,
// segment
SegmentSize: 536870912 / 1024 / 1024,
SegmentSizeFactor: 0.75,
DefaultRecordSize: 1024,
MinSegIDAssignCnt: 1048576 / 1024,
MaxSegIDAssignCnt: Params.MaxSegIDAssignCnt,
SegIDAssignExpiration: 2000,
// msgChannel
ProxyTimeTickChannelNames: []string{"proxy1", "proxy2"},
WriteNodeTimeTickChannelNames: []string{"write3", "write4"},
InsertChannelNames: []string{"dm0", "dm1"},
K2SChannelNames: []string{"k2s0", "k2s1"},
QueryNodeStatsChannelName: "statistic",
MsgChannelSubName: Params.MsgChannelSubName,
}
master, err = CreateServer(ctx)
if err != nil {
panic(err)
}
err = master.Run(10013)
if err != nil {
panic(err)
}
}
func shutdownMaster() {
masterCancelFunc()
master.Close()
}
func TestSegmentManager_RPC(t *testing.T) {
startupMaster()
defer shutdownMaster()
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
dialContext, err := grpc.DialContext(ctx, "127.0.0.1:10013", grpc.WithInsecure(), grpc.WithBlock())
assert.Nil(t, err)
defer dialContext.Close()
client := masterpb.NewMasterClient(dialContext)
schema := &schemapb.CollectionSchema{
Name: collName,
Description: "test coll",
AutoID: false,
Fields: []*schemapb.FieldSchema{},
}
schemaBytes, err := proto.Marshal(schema)
assert.Nil(t, err)
_, err = client.CreateCollection(ctx, &internalpb.CreateCollectionRequest{
MsgType: internalpb.MsgType_kCreateCollection,
ReqID: 1,
Timestamp: 100,
ProxyID: 1,
Schema: &commonpb.Blob{Value: schemaBytes},
})
assert.Nil(t, err)
_, err = client.CreatePartition(ctx, &internalpb.CreatePartitionRequest{
MsgType: internalpb.MsgType_kCreatePartition,
ReqID: 2,
Timestamp: 101,
ProxyID: 1,
PartitionName: &servicepb.PartitionName{
CollectionName: collName,
Tag: partitionTag,
},
})
assert.Nil(t, err)
resp, err := client.AssignSegmentID(ctx, &internalpb.AssignSegIDRequest{
PeerID: 1,
Role: internalpb.PeerRole_Proxy,
PerChannelReq: []*internalpb.SegIDRequest{
{Count: 10000, ChannelID: 0, CollName: collName, PartitionTag: partitionTag},
},
})
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_SUCCESS, resp.Status.ErrorCode)
assignments := resp.GetPerChannelAssignment()
assert.Equal(t, 1, len(assignments))
assert.Equal(t, collName, assignments[0].CollName)
assert.Equal(t, partitionTag, assignments[0].PartitionTag)
assert.Equal(t, int32(0), assignments[0].ChannelID)
assert.Equal(t, uint32(10000), assignments[0].Count)
// test stats
segID := assignments[0].SegID
pulsarAddress := Params.PulsarAddress
ms := msgstream.NewPulsarMsgStream(ctx, 1024)
ms.SetPulsarClient(pulsarAddress)
ms.CreatePulsarProducers([]string{"statistic"})
ms.Start()
defer ms.Close()
err = ms.Produce(&msgstream.MsgPack{
BeginTs: 102,
EndTs: 104,
Msgs: []msgstream.TsMsg{
&msgstream.QueryNodeSegStatsMsg{
QueryNodeSegStats: internalpb.QueryNodeSegStats{
MsgType: internalpb.MsgType_kQueryNodeSegStats,
PeerID: 1,
SegStats: []*internalpb.SegmentStats{
{SegmentID: segID, MemorySize: 600000000, NumRows: 1000000, RecentlyModified: true},
},
},
BaseMsg: msgstream.BaseMsg{
HashValues: []int32{0},
},
},
},
})
assert.Nil(t, err)
time.Sleep(500 * time.Millisecond)
segMeta, err := master.metaTable.GetSegmentByID(segID)
assert.Nil(t, err)
assert.NotEqual(t, uint64(0), segMeta.GetCloseTime())
assert.Equal(t, int64(600000000), segMeta.GetMemSize())
}