milvus/internal/datanode/data_sync_service_test.go

362 lines
9.1 KiB
Go
Raw Normal View History

// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package datanode
import (
"bytes"
"context"
"encoding/binary"
"math"
"testing"
"time"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
)
func getVchanInfo(info *testInfo) *datapb.VchannelInfo {
var ufs []*datapb.SegmentInfo
var fs []*datapb.SegmentInfo
if info.isValidCase {
ufs = []*datapb.SegmentInfo{{
CollectionID: info.ufCollID,
PartitionID: 1,
InsertChannel: info.ufchanName,
ID: info.ufSegID,
NumOfRows: info.ufNor,
DmlPosition: &internalpb.MsgPosition{},
}}
fs = []*datapb.SegmentInfo{{
CollectionID: info.fCollID,
PartitionID: 1,
InsertChannel: info.fchanName,
ID: info.fSegID,
NumOfRows: info.fNor,
DmlPosition: &internalpb.MsgPosition{},
}}
} else {
ufs = []*datapb.SegmentInfo{}
}
vi := &datapb.VchannelInfo{
CollectionID: info.collID,
ChannelName: info.chanName,
SeekPosition: &internalpb.MsgPosition{},
UnflushedSegments: ufs,
FlushedSegments: fs,
}
return vi
}
type testInfo struct {
isValidCase bool
replicaNil bool
inMsgFactory msgstream.Factory
collID UniqueID
chanName string
ufCollID UniqueID
ufSegID UniqueID
ufchanName string
ufNor int64
fCollID UniqueID
fSegID UniqueID
fchanName string
fNor int64
description string
}
func TestDataSyncService_newDataSyncService(te *testing.T) {
ctx := context.Background()
tests := []*testInfo{
{false, false, &mockMsgStreamFactory{false, true},
0, "by-dev-rootcoord-dml_test",
0, 0, "", 0,
0, 0, "", 0,
"SetParamsReturnError"},
{true, false, &mockMsgStreamFactory{true, true},
0, "by-dev-rootcoord-dml_test",
1, 0, "", 0,
1, 1, "", 0,
"CollID 0 mismach with seginfo collID 1"},
{true, false, &mockMsgStreamFactory{true, true},
1, "by-dev-rootcoord-dml_1",
1, 0, "by-dev-rootcoord-dml_2", 0,
1, 1, "by-dev-rootcoord-dml_3", 0,
"chanName c1 mismach with seginfo chanName c2"},
{true, false, &mockMsgStreamFactory{true, true},
1, "by-dev-rootcoord-dml_1",
1, 0, "by-dev-rootcoord-dml_1", 0,
1, 1, "by-dev-rootcoord-dml_2", 0,
"add normal segments"},
{false, false, &mockMsgStreamFactory{true, false},
0, "by-dev-rootcoord-dml",
0, 0, "", 0,
0, 0, "", 0,
"error when newinsertbufernode"},
{false, true, &mockMsgStreamFactory{true, false},
0, "by-dev-rootcoord-dml",
0, 0, "", 0,
0, 0, "", 0,
"replica nil"},
}
for _, test := range tests {
te.Run(test.description, func(t *testing.T) {
df := &DataCoordFactory{}
replica, err := newReplica(context.Background(), &RootCoordFactory{}, test.collID)
assert.Nil(t, err)
if test.replicaNil {
replica = nil
}
ds, err := newDataSyncService(ctx,
make(chan flushMsg),
replica,
NewAllocatorFactory(),
test.inMsgFactory,
getVchanInfo(test),
make(chan UniqueID),
df,
newCache(),
)
if !test.isValidCase {
assert.Error(t, err)
assert.Nil(t, ds)
} else {
assert.NoError(t, err)
assert.NotNil(t, ds)
// start
ds.fg = nil
ds.start()
}
})
}
}
// NOTE: start pulsar before test
func TestDataSyncService_Start(t *testing.T) {
t.Skip()
const ctxTimeInMillisecond = 2000
delay := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
ctx, cancel := context.WithDeadline(context.Background(), delay)
defer cancel()
// init data node
pulsarURL := Params.PulsarAddress
Factory := &MetaFactory{}
collMeta := Factory.GetCollectionMeta(UniqueID(0), "coll1")
2021-06-21 17:28:03 +08:00
mockRootCoord := &RootCoordFactory{}
Fix bugs (#5676) * Remove redundant session startup Signed-off-by: sunby <bingyi.sun@zilliz.com> * Register datanode after start success Signed-off-by: sunby <bingyi.sun@zilliz.com> * fix meta snap shot Signed-off-by: yefu.chen <yefu.chen@zilliz.com> * fix datanode message stream channel Signed-off-by: yangxuan <xuan.yang@zilliz.com> * Fix bugs when drop empty collection Signed-off-by: sunby <bingyi.sun@zilliz.com> * Fix bug of getting pchan statistics from task scheduler Signed-off-by: dragondriver <jiquan.long@zilliz.com> * Fix i/dist/dataservice test code Signed-off-by: Congqi Xia <congqi.xia@zilliz.com> * Fix epoch lifetime not applied Signed-off-by: Congqi Xia <congqi.xia@zilliz.com> * fix datanode flowgraph dd node Signed-off-by: yangxuan <xuan.yang@zilliz.com> * Fix handle datanode timetick bug Signed-off-by: sunby <bingyi.sun@zilliz.com> * Remove repack function of dml stream Signed-off-by: dragondriver <jiquan.long@zilliz.com> * fix proxynode Signed-off-by: yefu.chen <yefu.chen@zilliz.com> * Apply extended seal policy Signed-off-by: Congqi Xia <congqi.xia@zilliz.com> * add check for time tick Signed-off-by: yefu.chen <yefu.chen@zilliz.com> * fix check Signed-off-by: yefu.chen <yefu.chen@zilliz.com> * Fix the repack function of dml stream Signed-off-by: dragondriver <jiquan.long@zilliz.com> * Fix the bug when send statistics of pchan Signed-off-by: dragondriver <jiquan.long@zilliz.com> * Fix the repack function when craete dml stream Signed-off-by: dragondriver <jiquan.long@zilliz.com> * fix bugs Signed-off-by: yefu.chen <yefu.chen@zilliz.com> * fix describe collection Signed-off-by: yefu.chen <yefu.chen@zilliz.com> * Fix bug when send timestamp statistics Signed-off-by: dragondriver <jiquan.long@zilliz.com> * fix data node Signed-off-by: yefu.chen <yefu.chen@zilliz.com> * Add length check before flush request Signed-off-by: Congqi Xia <congqi.xia@zilliz.com> * add log for data node Signed-off-by: yefu.chen <yefu.chen@zilliz.com> * Fix SaveBinlog bugs Signed-off-by: sunby <bingyi.sun@zilliz.com> * Add more log in datanode Signed-off-by: yangxuan <xuan.yang@zilliz.com> * Put SegmentState.Flushing as the last one in enum to fit the client Signed-off-by: sunby <bingyi.sun@zilliz.com> * Fix params in GetInsertBinlogPaths Signed-off-by: sunby <bingyi.sun@zilliz.com> * Rename policy Signed-off-by: sunby <bingyi.sun@zilliz.com> * Remove unused ddl functions and fields Signed-off-by: Congqi Xia <congqi.xia@zilliz.com> * Remove pchan when drop collection Signed-off-by: dragondriver <jiquan.long@zilliz.com> * Add balanced assignment policy Signed-off-by: sunby <bingyi.sun@zilliz.com> * fix master ut Signed-off-by: yefu.chen <yefu.chen@zilliz.com> * Add lock in session manager Signed-off-by: Congqi Xia <congqi.xia@zilliz.com> * add log for debug Signed-off-by: yefu.chen <yefu.chen@zilliz.com> * Fix some logic bug and typo Signed-off-by: Congqi Xia <congqi.xia@zilliz.com> * Fix recover bugs Signed-off-by: sunby <bingyi.sun@zilliz.com> * Get collection scheme of a specific timestamp Signed-off-by: yangxuan <xuan.yang@zilliz.com> * Change CheckPoint to SegmentInfo in VchannelInfo Signed-off-by: sunby <bingyi.sun@zilliz.com> * Recover Unflushed segment numOfRows Signed-off-by: yangxuan <xuan.yang@zilliz.com> * Fix dataservice unit tests Signed-off-by: sunby <bingyi.sun@zilliz.com> Co-authored-by: yefu.chen <yefu.chen@zilliz.com> Co-authored-by: yangxuan <xuan.yang@zilliz.com> Co-authored-by: dragondriver <jiquan.long@zilliz.com> Co-authored-by: Congqi Xia <congqi.xia@zilliz.com>
2021-06-08 19:25:37 +08:00
collectionID := UniqueID(1)
flushChan := make(chan flushMsg, 100)
replica, err := newReplica(context.Background(), mockRootCoord, collectionID)
assert.Nil(t, err)
Fix bugs (#5676) * Remove redundant session startup Signed-off-by: sunby <bingyi.sun@zilliz.com> * Register datanode after start success Signed-off-by: sunby <bingyi.sun@zilliz.com> * fix meta snap shot Signed-off-by: yefu.chen <yefu.chen@zilliz.com> * fix datanode message stream channel Signed-off-by: yangxuan <xuan.yang@zilliz.com> * Fix bugs when drop empty collection Signed-off-by: sunby <bingyi.sun@zilliz.com> * Fix bug of getting pchan statistics from task scheduler Signed-off-by: dragondriver <jiquan.long@zilliz.com> * Fix i/dist/dataservice test code Signed-off-by: Congqi Xia <congqi.xia@zilliz.com> * Fix epoch lifetime not applied Signed-off-by: Congqi Xia <congqi.xia@zilliz.com> * fix datanode flowgraph dd node Signed-off-by: yangxuan <xuan.yang@zilliz.com> * Fix handle datanode timetick bug Signed-off-by: sunby <bingyi.sun@zilliz.com> * Remove repack function of dml stream Signed-off-by: dragondriver <jiquan.long@zilliz.com> * fix proxynode Signed-off-by: yefu.chen <yefu.chen@zilliz.com> * Apply extended seal policy Signed-off-by: Congqi Xia <congqi.xia@zilliz.com> * add check for time tick Signed-off-by: yefu.chen <yefu.chen@zilliz.com> * fix check Signed-off-by: yefu.chen <yefu.chen@zilliz.com> * Fix the repack function of dml stream Signed-off-by: dragondriver <jiquan.long@zilliz.com> * Fix the bug when send statistics of pchan Signed-off-by: dragondriver <jiquan.long@zilliz.com> * Fix the repack function when craete dml stream Signed-off-by: dragondriver <jiquan.long@zilliz.com> * fix bugs Signed-off-by: yefu.chen <yefu.chen@zilliz.com> * fix describe collection Signed-off-by: yefu.chen <yefu.chen@zilliz.com> * Fix bug when send timestamp statistics Signed-off-by: dragondriver <jiquan.long@zilliz.com> * fix data node Signed-off-by: yefu.chen <yefu.chen@zilliz.com> * Add length check before flush request Signed-off-by: Congqi Xia <congqi.xia@zilliz.com> * add log for data node Signed-off-by: yefu.chen <yefu.chen@zilliz.com> * Fix SaveBinlog bugs Signed-off-by: sunby <bingyi.sun@zilliz.com> * Add more log in datanode Signed-off-by: yangxuan <xuan.yang@zilliz.com> * Put SegmentState.Flushing as the last one in enum to fit the client Signed-off-by: sunby <bingyi.sun@zilliz.com> * Fix params in GetInsertBinlogPaths Signed-off-by: sunby <bingyi.sun@zilliz.com> * Rename policy Signed-off-by: sunby <bingyi.sun@zilliz.com> * Remove unused ddl functions and fields Signed-off-by: Congqi Xia <congqi.xia@zilliz.com> * Remove pchan when drop collection Signed-off-by: dragondriver <jiquan.long@zilliz.com> * Add balanced assignment policy Signed-off-by: sunby <bingyi.sun@zilliz.com> * fix master ut Signed-off-by: yefu.chen <yefu.chen@zilliz.com> * Add lock in session manager Signed-off-by: Congqi Xia <congqi.xia@zilliz.com> * add log for debug Signed-off-by: yefu.chen <yefu.chen@zilliz.com> * Fix some logic bug and typo Signed-off-by: Congqi Xia <congqi.xia@zilliz.com> * Fix recover bugs Signed-off-by: sunby <bingyi.sun@zilliz.com> * Get collection scheme of a specific timestamp Signed-off-by: yangxuan <xuan.yang@zilliz.com> * Change CheckPoint to SegmentInfo in VchannelInfo Signed-off-by: sunby <bingyi.sun@zilliz.com> * Recover Unflushed segment numOfRows Signed-off-by: yangxuan <xuan.yang@zilliz.com> * Fix dataservice unit tests Signed-off-by: sunby <bingyi.sun@zilliz.com> Co-authored-by: yefu.chen <yefu.chen@zilliz.com> Co-authored-by: yangxuan <xuan.yang@zilliz.com> Co-authored-by: dragondriver <jiquan.long@zilliz.com> Co-authored-by: Congqi Xia <congqi.xia@zilliz.com>
2021-06-08 19:25:37 +08:00
allocFactory := NewAllocatorFactory(1)
msFactory := msgstream.NewPmsFactory()
m := map[string]interface{}{
"pulsarAddress": pulsarURL,
"receiveBufSize": 1024,
"pulsarBufSize": 1024}
err = msFactory.SetParams(m)
assert.Nil(t, err)
insertChannelName := "data_sync_service_test_dml"
ddlChannelName := "data_sync_service_test_ddl"
Params.FlushInsertBufferSize = 1
ufs := []*datapb.SegmentInfo{{
CollectionID: collMeta.ID,
InsertChannel: insertChannelName,
ID: 0,
NumOfRows: 0,
DmlPosition: &internalpb.MsgPosition{},
}}
fs := []*datapb.SegmentInfo{{
CollectionID: collMeta.ID,
PartitionID: 1,
InsertChannel: insertChannelName,
ID: 1,
NumOfRows: 0,
DmlPosition: &internalpb.MsgPosition{},
}}
vchan := &datapb.VchannelInfo{
CollectionID: collMeta.ID,
Fix bugs (#5676) * Remove redundant session startup Signed-off-by: sunby <bingyi.sun@zilliz.com> * Register datanode after start success Signed-off-by: sunby <bingyi.sun@zilliz.com> * fix meta snap shot Signed-off-by: yefu.chen <yefu.chen@zilliz.com> * fix datanode message stream channel Signed-off-by: yangxuan <xuan.yang@zilliz.com> * Fix bugs when drop empty collection Signed-off-by: sunby <bingyi.sun@zilliz.com> * Fix bug of getting pchan statistics from task scheduler Signed-off-by: dragondriver <jiquan.long@zilliz.com> * Fix i/dist/dataservice test code Signed-off-by: Congqi Xia <congqi.xia@zilliz.com> * Fix epoch lifetime not applied Signed-off-by: Congqi Xia <congqi.xia@zilliz.com> * fix datanode flowgraph dd node Signed-off-by: yangxuan <xuan.yang@zilliz.com> * Fix handle datanode timetick bug Signed-off-by: sunby <bingyi.sun@zilliz.com> * Remove repack function of dml stream Signed-off-by: dragondriver <jiquan.long@zilliz.com> * fix proxynode Signed-off-by: yefu.chen <yefu.chen@zilliz.com> * Apply extended seal policy Signed-off-by: Congqi Xia <congqi.xia@zilliz.com> * add check for time tick Signed-off-by: yefu.chen <yefu.chen@zilliz.com> * fix check Signed-off-by: yefu.chen <yefu.chen@zilliz.com> * Fix the repack function of dml stream Signed-off-by: dragondriver <jiquan.long@zilliz.com> * Fix the bug when send statistics of pchan Signed-off-by: dragondriver <jiquan.long@zilliz.com> * Fix the repack function when craete dml stream Signed-off-by: dragondriver <jiquan.long@zilliz.com> * fix bugs Signed-off-by: yefu.chen <yefu.chen@zilliz.com> * fix describe collection Signed-off-by: yefu.chen <yefu.chen@zilliz.com> * Fix bug when send timestamp statistics Signed-off-by: dragondriver <jiquan.long@zilliz.com> * fix data node Signed-off-by: yefu.chen <yefu.chen@zilliz.com> * Add length check before flush request Signed-off-by: Congqi Xia <congqi.xia@zilliz.com> * add log for data node Signed-off-by: yefu.chen <yefu.chen@zilliz.com> * Fix SaveBinlog bugs Signed-off-by: sunby <bingyi.sun@zilliz.com> * Add more log in datanode Signed-off-by: yangxuan <xuan.yang@zilliz.com> * Put SegmentState.Flushing as the last one in enum to fit the client Signed-off-by: sunby <bingyi.sun@zilliz.com> * Fix params in GetInsertBinlogPaths Signed-off-by: sunby <bingyi.sun@zilliz.com> * Rename policy Signed-off-by: sunby <bingyi.sun@zilliz.com> * Remove unused ddl functions and fields Signed-off-by: Congqi Xia <congqi.xia@zilliz.com> * Remove pchan when drop collection Signed-off-by: dragondriver <jiquan.long@zilliz.com> * Add balanced assignment policy Signed-off-by: sunby <bingyi.sun@zilliz.com> * fix master ut Signed-off-by: yefu.chen <yefu.chen@zilliz.com> * Add lock in session manager Signed-off-by: Congqi Xia <congqi.xia@zilliz.com> * add log for debug Signed-off-by: yefu.chen <yefu.chen@zilliz.com> * Fix some logic bug and typo Signed-off-by: Congqi Xia <congqi.xia@zilliz.com> * Fix recover bugs Signed-off-by: sunby <bingyi.sun@zilliz.com> * Get collection scheme of a specific timestamp Signed-off-by: yangxuan <xuan.yang@zilliz.com> * Change CheckPoint to SegmentInfo in VchannelInfo Signed-off-by: sunby <bingyi.sun@zilliz.com> * Recover Unflushed segment numOfRows Signed-off-by: yangxuan <xuan.yang@zilliz.com> * Fix dataservice unit tests Signed-off-by: sunby <bingyi.sun@zilliz.com> Co-authored-by: yefu.chen <yefu.chen@zilliz.com> Co-authored-by: yangxuan <xuan.yang@zilliz.com> Co-authored-by: dragondriver <jiquan.long@zilliz.com> Co-authored-by: Congqi Xia <congqi.xia@zilliz.com>
2021-06-08 19:25:37 +08:00
ChannelName: insertChannelName,
UnflushedSegments: ufs,
FlushedSegments: fs,
}
signalCh := make(chan UniqueID, 100)
sync, err := newDataSyncService(ctx, flushChan, replica, allocFactory, msFactory, vchan, signalCh, &DataCoordFactory{}, newCache())
assert.Nil(t, err)
Fix bugs (#5676) * Remove redundant session startup Signed-off-by: sunby <bingyi.sun@zilliz.com> * Register datanode after start success Signed-off-by: sunby <bingyi.sun@zilliz.com> * fix meta snap shot Signed-off-by: yefu.chen <yefu.chen@zilliz.com> * fix datanode message stream channel Signed-off-by: yangxuan <xuan.yang@zilliz.com> * Fix bugs when drop empty collection Signed-off-by: sunby <bingyi.sun@zilliz.com> * Fix bug of getting pchan statistics from task scheduler Signed-off-by: dragondriver <jiquan.long@zilliz.com> * Fix i/dist/dataservice test code Signed-off-by: Congqi Xia <congqi.xia@zilliz.com> * Fix epoch lifetime not applied Signed-off-by: Congqi Xia <congqi.xia@zilliz.com> * fix datanode flowgraph dd node Signed-off-by: yangxuan <xuan.yang@zilliz.com> * Fix handle datanode timetick bug Signed-off-by: sunby <bingyi.sun@zilliz.com> * Remove repack function of dml stream Signed-off-by: dragondriver <jiquan.long@zilliz.com> * fix proxynode Signed-off-by: yefu.chen <yefu.chen@zilliz.com> * Apply extended seal policy Signed-off-by: Congqi Xia <congqi.xia@zilliz.com> * add check for time tick Signed-off-by: yefu.chen <yefu.chen@zilliz.com> * fix check Signed-off-by: yefu.chen <yefu.chen@zilliz.com> * Fix the repack function of dml stream Signed-off-by: dragondriver <jiquan.long@zilliz.com> * Fix the bug when send statistics of pchan Signed-off-by: dragondriver <jiquan.long@zilliz.com> * Fix the repack function when craete dml stream Signed-off-by: dragondriver <jiquan.long@zilliz.com> * fix bugs Signed-off-by: yefu.chen <yefu.chen@zilliz.com> * fix describe collection Signed-off-by: yefu.chen <yefu.chen@zilliz.com> * Fix bug when send timestamp statistics Signed-off-by: dragondriver <jiquan.long@zilliz.com> * fix data node Signed-off-by: yefu.chen <yefu.chen@zilliz.com> * Add length check before flush request Signed-off-by: Congqi Xia <congqi.xia@zilliz.com> * add log for data node Signed-off-by: yefu.chen <yefu.chen@zilliz.com> * Fix SaveBinlog bugs Signed-off-by: sunby <bingyi.sun@zilliz.com> * Add more log in datanode Signed-off-by: yangxuan <xuan.yang@zilliz.com> * Put SegmentState.Flushing as the last one in enum to fit the client Signed-off-by: sunby <bingyi.sun@zilliz.com> * Fix params in GetInsertBinlogPaths Signed-off-by: sunby <bingyi.sun@zilliz.com> * Rename policy Signed-off-by: sunby <bingyi.sun@zilliz.com> * Remove unused ddl functions and fields Signed-off-by: Congqi Xia <congqi.xia@zilliz.com> * Remove pchan when drop collection Signed-off-by: dragondriver <jiquan.long@zilliz.com> * Add balanced assignment policy Signed-off-by: sunby <bingyi.sun@zilliz.com> * fix master ut Signed-off-by: yefu.chen <yefu.chen@zilliz.com> * Add lock in session manager Signed-off-by: Congqi Xia <congqi.xia@zilliz.com> * add log for debug Signed-off-by: yefu.chen <yefu.chen@zilliz.com> * Fix some logic bug and typo Signed-off-by: Congqi Xia <congqi.xia@zilliz.com> * Fix recover bugs Signed-off-by: sunby <bingyi.sun@zilliz.com> * Get collection scheme of a specific timestamp Signed-off-by: yangxuan <xuan.yang@zilliz.com> * Change CheckPoint to SegmentInfo in VchannelInfo Signed-off-by: sunby <bingyi.sun@zilliz.com> * Recover Unflushed segment numOfRows Signed-off-by: yangxuan <xuan.yang@zilliz.com> * Fix dataservice unit tests Signed-off-by: sunby <bingyi.sun@zilliz.com> Co-authored-by: yefu.chen <yefu.chen@zilliz.com> Co-authored-by: yangxuan <xuan.yang@zilliz.com> Co-authored-by: dragondriver <jiquan.long@zilliz.com> Co-authored-by: Congqi Xia <congqi.xia@zilliz.com>
2021-06-08 19:25:37 +08:00
// sync.replica.addCollection(collMeta.ID, collMeta.Schema)
sync.start()
timeRange := TimeRange{
timestampMin: 0,
timestampMax: math.MaxUint64,
}
dataFactory := NewDataFactory()
insertMessages := dataFactory.GetMsgStreamTsInsertMsgs(2, insertChannelName)
msgPack := msgstream.MsgPack{
BeginTs: timeRange.timestampMin,
EndTs: timeRange.timestampMax,
Msgs: insertMessages,
StartPositions: []*internalpb.MsgPosition{{
ChannelName: insertChannelName,
}},
EndPositions: []*internalpb.MsgPosition{{
ChannelName: insertChannelName,
}},
}
// generate timeTick
timeTickMsgPack := msgstream.MsgPack{}
timeTickMsg := &msgstream.TimeTickMsg{
BaseMsg: msgstream.BaseMsg{
BeginTimestamp: Timestamp(0),
EndTimestamp: Timestamp(0),
HashValues: []uint32{0},
},
TimeTickMsg: internalpb.TimeTickMsg{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_TimeTick,
MsgID: UniqueID(0),
Timestamp: math.MaxUint64,
SourceID: 0,
},
},
}
timeTickMsgPack.Msgs = append(timeTickMsgPack.Msgs, timeTickMsg)
// pulsar produce
assert.NoError(t, err)
insertStream, _ := msFactory.NewMsgStream(ctx)
insertStream.AsProducer([]string{insertChannelName})
ddStream, _ := msFactory.NewMsgStream(ctx)
ddStream.AsProducer([]string{ddlChannelName})
var insertMsgStream msgstream.MsgStream = insertStream
insertMsgStream.Start()
var ddMsgStream msgstream.MsgStream = ddStream
ddMsgStream.Start()
err = insertMsgStream.Produce(&msgPack)
assert.NoError(t, err)
err = insertMsgStream.Broadcast(&timeTickMsgPack)
assert.NoError(t, err)
err = ddMsgStream.Broadcast(&timeTickMsgPack)
assert.NoError(t, err)
// dataSync
<-sync.ctx.Done()
sync.close()
}
func genBytes() (rawData []byte) {
const DIM = 2
const N = 1
// Float vector
var fvector = [DIM]float32{1, 2}
for _, ele := range fvector {
buf := make([]byte, 4)
common.Endian.PutUint32(buf, math.Float32bits(ele))
rawData = append(rawData, buf...)
}
// Binary vector
// Dimension of binary vector is 32
// size := 4, = 32 / 8
var bvector = []byte{255, 255, 255, 0}
rawData = append(rawData, bvector...)
// Bool
var fieldBool = true
buf := new(bytes.Buffer)
if err := binary.Write(buf, common.Endian, fieldBool); err != nil {
panic(err)
}
rawData = append(rawData, buf.Bytes()...)
// int8
var dataInt8 int8 = 100
bint8 := new(bytes.Buffer)
if err := binary.Write(bint8, common.Endian, dataInt8); err != nil {
panic(err)
}
rawData = append(rawData, bint8.Bytes()...)
log.Debug("Rawdata length:", zap.Int("Length of rawData", len(rawData)))
return
}
func TestBytesReader(t *testing.T) {
rawData := genBytes()
// Bytes Reader is able to recording the position
rawDataReader := bytes.NewReader(rawData)
var fvector []float32 = make([]float32, 2)
err := binary.Read(rawDataReader, common.Endian, &fvector)
assert.Nil(t, err)
assert.ElementsMatch(t, fvector, []float32{1, 2})
var bvector []byte = make([]byte, 4)
err = binary.Read(rawDataReader, common.Endian, &bvector)
assert.Nil(t, err)
assert.ElementsMatch(t, bvector, []byte{255, 255, 255, 0})
var fieldBool bool
err = binary.Read(rawDataReader, common.Endian, &fieldBool)
assert.Nil(t, err)
assert.Equal(t, true, fieldBool)
var dataInt8 int8
err = binary.Read(rawDataReader, common.Endian, &dataInt8)
assert.Nil(t, err)
assert.Equal(t, int8(100), dataInt8)
}