2021-01-16 10:12:14 +08:00
|
|
|
package querynode
|
2020-12-24 20:55:40 +08:00
|
|
|
|
|
|
|
import (
|
2021-01-06 18:19:44 +08:00
|
|
|
"encoding/binary"
|
|
|
|
"fmt"
|
|
|
|
"log"
|
|
|
|
"math"
|
2020-12-24 20:55:40 +08:00
|
|
|
"math/rand"
|
|
|
|
"sort"
|
2021-01-06 18:19:44 +08:00
|
|
|
"strconv"
|
2020-12-24 20:55:40 +08:00
|
|
|
"testing"
|
|
|
|
|
2021-01-06 18:19:44 +08:00
|
|
|
"github.com/golang/protobuf/proto"
|
2020-12-24 20:55:40 +08:00
|
|
|
"github.com/stretchr/testify/assert"
|
|
|
|
|
2021-01-15 14:38:36 +08:00
|
|
|
"github.com/zilliztech/milvus-distributed/internal/indexnode"
|
2020-12-29 14:43:40 +08:00
|
|
|
minioKV "github.com/zilliztech/milvus-distributed/internal/kv/minio"
|
2020-12-24 20:55:40 +08:00
|
|
|
"github.com/zilliztech/milvus-distributed/internal/msgstream"
|
2021-01-20 10:02:59 +08:00
|
|
|
"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
|
|
|
|
"github.com/zilliztech/milvus-distributed/internal/msgstream/util"
|
2020-12-24 20:55:40 +08:00
|
|
|
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
|
2021-01-18 19:32:08 +08:00
|
|
|
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
|
2021-01-22 09:36:18 +08:00
|
|
|
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
|
2020-12-26 14:16:51 +08:00
|
|
|
"github.com/zilliztech/milvus-distributed/internal/querynode/client"
|
2021-01-29 15:22:24 +08:00
|
|
|
"github.com/zilliztech/milvus-distributed/internal/storage"
|
2020-12-24 20:55:40 +08:00
|
|
|
)
|
|
|
|
|
2021-01-30 16:02:10 +08:00
|
|
|
func TestLoadService_LoadIndex_FloatVector(t *testing.T) {
|
2021-01-15 15:28:54 +08:00
|
|
|
node := newQueryNodeMock()
|
2020-12-24 20:55:40 +08:00
|
|
|
collectionID := rand.Int63n(1000000)
|
|
|
|
segmentID := rand.Int63n(1000000)
|
|
|
|
initTestMeta(t, node, "collection0", collectionID, segmentID)
|
|
|
|
|
2021-01-30 16:02:10 +08:00
|
|
|
// loadService and statsService
|
2021-01-12 18:03:24 +08:00
|
|
|
suffix := "-test-search" + strconv.FormatInt(rand.Int63n(1000000), 10)
|
2021-01-06 18:19:44 +08:00
|
|
|
oldSearchChannelNames := Params.SearchChannelNames
|
2021-01-12 18:03:24 +08:00
|
|
|
newSearchChannelNames := makeNewChannelNames(oldSearchChannelNames, suffix)
|
2021-01-06 18:19:44 +08:00
|
|
|
Params.SearchChannelNames = newSearchChannelNames
|
|
|
|
|
|
|
|
oldSearchResultChannelNames := Params.SearchChannelNames
|
2021-01-12 18:03:24 +08:00
|
|
|
newSearchResultChannelNames := makeNewChannelNames(oldSearchResultChannelNames, suffix)
|
2021-01-06 18:19:44 +08:00
|
|
|
Params.SearchResultChannelNames = newSearchResultChannelNames
|
2021-01-12 18:03:24 +08:00
|
|
|
|
|
|
|
oldLoadIndexChannelNames := Params.LoadIndexChannelNames
|
|
|
|
newLoadIndexChannelNames := makeNewChannelNames(oldLoadIndexChannelNames, suffix)
|
|
|
|
Params.LoadIndexChannelNames = newLoadIndexChannelNames
|
|
|
|
|
|
|
|
oldStatsChannelName := Params.StatsChannelName
|
|
|
|
newStatsChannelNames := makeNewChannelNames([]string{oldStatsChannelName}, suffix)
|
|
|
|
Params.StatsChannelName = newStatsChannelNames[0]
|
2021-01-06 18:19:44 +08:00
|
|
|
go node.Start()
|
|
|
|
|
|
|
|
//generate insert data
|
|
|
|
const msgLength = 1000
|
|
|
|
const receiveBufSize = 1024
|
|
|
|
const DIM = 16
|
|
|
|
var insertRowBlob []*commonpb.Blob
|
|
|
|
var timestamps []uint64
|
|
|
|
var rowIDs []int64
|
|
|
|
var hashValues []uint32
|
|
|
|
for n := 0; n < msgLength; n++ {
|
|
|
|
rowData := make([]byte, 0)
|
|
|
|
for i := 0; i < DIM; i++ {
|
|
|
|
vec := make([]byte, 4)
|
|
|
|
binary.LittleEndian.PutUint32(vec, math.Float32bits(float32(n*i)))
|
|
|
|
rowData = append(rowData, vec...)
|
|
|
|
}
|
|
|
|
age := make([]byte, 4)
|
|
|
|
binary.LittleEndian.PutUint32(age, 1)
|
|
|
|
rowData = append(rowData, age...)
|
|
|
|
blob := &commonpb.Blob{
|
|
|
|
Value: rowData,
|
|
|
|
}
|
|
|
|
insertRowBlob = append(insertRowBlob, blob)
|
|
|
|
timestamps = append(timestamps, uint64(n))
|
|
|
|
rowIDs = append(rowIDs, int64(n))
|
|
|
|
hashValues = append(hashValues, uint32(n))
|
|
|
|
}
|
|
|
|
|
|
|
|
var insertMsg msgstream.TsMsg = &msgstream.InsertMsg{
|
|
|
|
BaseMsg: msgstream.BaseMsg{
|
|
|
|
HashValues: hashValues,
|
|
|
|
},
|
2021-01-18 19:32:08 +08:00
|
|
|
InsertRequest: internalpb2.InsertRequest{
|
|
|
|
Base: &commonpb.MsgBase{
|
|
|
|
MsgType: commonpb.MsgType_kInsert,
|
|
|
|
MsgID: 0,
|
|
|
|
Timestamp: timestamps[0],
|
|
|
|
SourceID: 0,
|
|
|
|
},
|
2021-01-06 18:19:44 +08:00
|
|
|
CollectionName: "collection0",
|
2021-01-18 19:32:08 +08:00
|
|
|
PartitionName: "default",
|
2021-01-06 18:19:44 +08:00
|
|
|
SegmentID: segmentID,
|
2021-01-18 19:32:08 +08:00
|
|
|
ChannelID: "0",
|
2021-01-06 18:19:44 +08:00
|
|
|
Timestamps: timestamps,
|
|
|
|
RowIDs: rowIDs,
|
|
|
|
RowData: insertRowBlob,
|
|
|
|
},
|
|
|
|
}
|
|
|
|
insertMsgPack := msgstream.MsgPack{
|
|
|
|
BeginTs: 0,
|
|
|
|
EndTs: math.MaxUint64,
|
|
|
|
Msgs: []msgstream.TsMsg{insertMsg},
|
|
|
|
}
|
|
|
|
|
|
|
|
// generate timeTick
|
|
|
|
timeTickMsg := &msgstream.TimeTickMsg{
|
|
|
|
BaseMsg: msgstream.BaseMsg{
|
|
|
|
BeginTimestamp: 0,
|
|
|
|
EndTimestamp: 0,
|
|
|
|
HashValues: []uint32{0},
|
|
|
|
},
|
2021-01-18 19:32:08 +08:00
|
|
|
TimeTickMsg: internalpb2.TimeTickMsg{
|
|
|
|
Base: &commonpb.MsgBase{
|
|
|
|
MsgType: commonpb.MsgType_kTimeTick,
|
|
|
|
MsgID: 0,
|
|
|
|
Timestamp: math.MaxUint64,
|
|
|
|
SourceID: 0,
|
|
|
|
},
|
2021-01-06 18:19:44 +08:00
|
|
|
},
|
|
|
|
}
|
|
|
|
timeTickMsgPack := &msgstream.MsgPack{
|
|
|
|
Msgs: []msgstream.TsMsg{timeTickMsg},
|
|
|
|
}
|
|
|
|
|
|
|
|
// pulsar produce
|
|
|
|
insertChannels := Params.InsertChannelNames
|
|
|
|
ddChannels := Params.DDChannelNames
|
|
|
|
|
2021-01-20 10:02:59 +08:00
|
|
|
insertStream := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize)
|
2021-01-06 18:19:44 +08:00
|
|
|
insertStream.SetPulsarClient(Params.PulsarAddress)
|
|
|
|
insertStream.CreatePulsarProducers(insertChannels)
|
2021-01-20 10:02:59 +08:00
|
|
|
ddStream := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize)
|
2021-01-06 18:19:44 +08:00
|
|
|
ddStream.SetPulsarClient(Params.PulsarAddress)
|
|
|
|
ddStream.CreatePulsarProducers(ddChannels)
|
|
|
|
|
|
|
|
var insertMsgStream msgstream.MsgStream = insertStream
|
|
|
|
insertMsgStream.Start()
|
|
|
|
var ddMsgStream msgstream.MsgStream = ddStream
|
|
|
|
ddMsgStream.Start()
|
|
|
|
|
|
|
|
err := insertMsgStream.Produce(&insertMsgPack)
|
|
|
|
assert.NoError(t, err)
|
|
|
|
err = insertMsgStream.Broadcast(timeTickMsgPack)
|
|
|
|
assert.NoError(t, err)
|
|
|
|
err = ddMsgStream.Broadcast(timeTickMsgPack)
|
|
|
|
assert.NoError(t, err)
|
|
|
|
|
|
|
|
// generator searchRowData
|
|
|
|
var searchRowData []float32
|
|
|
|
for i := 0; i < DIM; i++ {
|
|
|
|
searchRowData = append(searchRowData, float32(42*i))
|
|
|
|
}
|
|
|
|
|
|
|
|
//generate search data and send search msg
|
|
|
|
dslString := "{\"bool\": { \n\"vector\": {\n \"vec\": {\n \"metric_type\": \"L2\", \n \"params\": {\n \"nprobe\": 10 \n},\n \"query\": \"$0\",\"topk\": 10 \n } \n } \n } \n }"
|
|
|
|
var searchRowByteData []byte
|
|
|
|
for i := range searchRowData {
|
|
|
|
vec := make([]byte, 4)
|
|
|
|
binary.LittleEndian.PutUint32(vec, math.Float32bits(searchRowData[i]))
|
|
|
|
searchRowByteData = append(searchRowByteData, vec...)
|
|
|
|
}
|
2021-01-22 09:36:18 +08:00
|
|
|
placeholderValue := milvuspb.PlaceholderValue{
|
2021-01-06 18:19:44 +08:00
|
|
|
Tag: "$0",
|
2021-01-22 09:36:18 +08:00
|
|
|
Type: milvuspb.PlaceholderType_VECTOR_FLOAT,
|
2021-01-06 18:19:44 +08:00
|
|
|
Values: [][]byte{searchRowByteData},
|
|
|
|
}
|
2021-01-22 09:36:18 +08:00
|
|
|
placeholderGroup := milvuspb.PlaceholderGroup{
|
|
|
|
Placeholders: []*milvuspb.PlaceholderValue{&placeholderValue},
|
2021-01-06 18:19:44 +08:00
|
|
|
}
|
|
|
|
placeGroupByte, err := proto.Marshal(&placeholderGroup)
|
|
|
|
if err != nil {
|
|
|
|
log.Print("marshal placeholderGroup failed")
|
|
|
|
}
|
2021-01-22 09:36:18 +08:00
|
|
|
query := milvuspb.SearchRequest{
|
2021-01-06 18:19:44 +08:00
|
|
|
CollectionName: "collection0",
|
2021-01-22 09:36:18 +08:00
|
|
|
PartitionNames: []string{"default"},
|
2021-01-06 18:19:44 +08:00
|
|
|
Dsl: dslString,
|
|
|
|
PlaceholderGroup: placeGroupByte,
|
|
|
|
}
|
|
|
|
queryByte, err := proto.Marshal(&query)
|
|
|
|
if err != nil {
|
|
|
|
log.Print("marshal query failed")
|
|
|
|
}
|
|
|
|
blob := commonpb.Blob{
|
|
|
|
Value: queryByte,
|
|
|
|
}
|
|
|
|
fn := func(n int64) *msgstream.MsgPack {
|
|
|
|
searchMsg := &msgstream.SearchMsg{
|
|
|
|
BaseMsg: msgstream.BaseMsg{
|
|
|
|
HashValues: []uint32{0},
|
|
|
|
},
|
2021-01-18 19:32:08 +08:00
|
|
|
SearchRequest: internalpb2.SearchRequest{
|
|
|
|
Base: &commonpb.MsgBase{
|
|
|
|
MsgType: commonpb.MsgType_kSearch,
|
|
|
|
MsgID: n,
|
|
|
|
Timestamp: uint64(msgLength),
|
|
|
|
SourceID: 1,
|
|
|
|
},
|
|
|
|
ResultChannelID: "0",
|
2021-01-06 18:19:44 +08:00
|
|
|
Query: &blob,
|
|
|
|
},
|
|
|
|
}
|
|
|
|
return &msgstream.MsgPack{
|
|
|
|
Msgs: []msgstream.TsMsg{searchMsg},
|
|
|
|
}
|
|
|
|
}
|
2021-01-20 10:02:59 +08:00
|
|
|
searchStream := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize)
|
2021-01-06 18:19:44 +08:00
|
|
|
searchStream.SetPulsarClient(Params.PulsarAddress)
|
|
|
|
searchStream.CreatePulsarProducers(newSearchChannelNames)
|
|
|
|
searchStream.Start()
|
|
|
|
err = searchStream.Produce(fn(1))
|
|
|
|
assert.NoError(t, err)
|
|
|
|
|
|
|
|
//get search result
|
2021-01-20 10:02:59 +08:00
|
|
|
searchResultStream := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize)
|
2021-01-06 18:19:44 +08:00
|
|
|
searchResultStream.SetPulsarClient(Params.PulsarAddress)
|
2021-01-20 10:02:59 +08:00
|
|
|
unmarshalDispatcher := util.NewUnmarshalDispatcher()
|
2021-01-06 18:19:44 +08:00
|
|
|
searchResultStream.CreatePulsarConsumers(newSearchResultChannelNames, "loadIndexTestSubSearchResult", unmarshalDispatcher, receiveBufSize)
|
|
|
|
searchResultStream.Start()
|
|
|
|
searchResult := searchResultStream.Consume()
|
|
|
|
assert.NotNil(t, searchResult)
|
2021-01-22 09:36:18 +08:00
|
|
|
unMarshaledHit := milvuspb.Hits{}
|
2021-01-06 18:19:44 +08:00
|
|
|
err = proto.Unmarshal(searchResult.Msgs[0].(*msgstream.SearchResultMsg).Hits[0], &unMarshaledHit)
|
|
|
|
assert.Nil(t, err)
|
2020-12-24 20:55:40 +08:00
|
|
|
|
|
|
|
// gen load index message pack
|
2020-12-29 14:43:40 +08:00
|
|
|
indexParams := make(map[string]string)
|
|
|
|
indexParams["index_type"] = "IVF_PQ"
|
|
|
|
indexParams["index_mode"] = "cpu"
|
|
|
|
indexParams["dim"] = "16"
|
|
|
|
indexParams["k"] = "10"
|
|
|
|
indexParams["nlist"] = "100"
|
2021-01-06 18:19:44 +08:00
|
|
|
indexParams["nprobe"] = "10"
|
2020-12-29 14:43:40 +08:00
|
|
|
indexParams["m"] = "4"
|
|
|
|
indexParams["nbits"] = "8"
|
|
|
|
indexParams["metric_type"] = "L2"
|
|
|
|
indexParams["SLICE_SIZE"] = "4"
|
|
|
|
|
|
|
|
var indexParamsKV []*commonpb.KeyValuePair
|
|
|
|
for key, value := range indexParams {
|
|
|
|
indexParamsKV = append(indexParamsKV, &commonpb.KeyValuePair{
|
|
|
|
Key: key,
|
|
|
|
Value: value,
|
|
|
|
})
|
2020-12-24 20:55:40 +08:00
|
|
|
}
|
|
|
|
|
2020-12-29 14:43:40 +08:00
|
|
|
// generator index
|
|
|
|
typeParams := make(map[string]string)
|
|
|
|
typeParams["dim"] = "16"
|
|
|
|
var indexRowData []float32
|
2021-01-06 18:19:44 +08:00
|
|
|
for n := 0; n < msgLength; n++ {
|
|
|
|
for i := 0; i < DIM; i++ {
|
|
|
|
indexRowData = append(indexRowData, float32(n*i))
|
2020-12-29 14:43:40 +08:00
|
|
|
}
|
2020-12-24 20:55:40 +08:00
|
|
|
}
|
2021-01-15 14:38:36 +08:00
|
|
|
index, err := indexnode.NewCIndex(typeParams, indexParams)
|
2021-01-06 18:19:44 +08:00
|
|
|
assert.Nil(t, err)
|
2020-12-29 14:43:40 +08:00
|
|
|
err = index.BuildFloatVecIndexWithoutIds(indexRowData)
|
|
|
|
assert.Equal(t, err, nil)
|
|
|
|
|
2021-01-06 14:45:50 +08:00
|
|
|
option := &minioKV.Option{
|
|
|
|
Address: Params.MinioEndPoint,
|
|
|
|
AccessKeyID: Params.MinioAccessKeyID,
|
|
|
|
SecretAccessKeyID: Params.MinioSecretAccessKey,
|
|
|
|
UseSSL: Params.MinioUseSSLStr,
|
|
|
|
BucketName: Params.MinioBucketName,
|
|
|
|
CreateBucket: true,
|
|
|
|
}
|
|
|
|
|
|
|
|
minioKV, err := minioKV.NewMinIOKV(node.queryNodeLoopCtx, option)
|
2020-12-29 14:43:40 +08:00
|
|
|
assert.Equal(t, err, nil)
|
2021-01-06 18:19:44 +08:00
|
|
|
//save index to minio
|
|
|
|
binarySet, err := index.Serialize()
|
|
|
|
assert.Equal(t, err, nil)
|
2020-12-29 14:43:40 +08:00
|
|
|
indexPaths := make([]string, 0)
|
2021-01-29 15:22:24 +08:00
|
|
|
var indexCodec storage.IndexCodec
|
|
|
|
binarySet, err = indexCodec.Serialize(binarySet, indexParams)
|
|
|
|
assert.NoError(t, err)
|
2020-12-29 14:43:40 +08:00
|
|
|
for _, index := range binarySet {
|
2021-01-06 18:19:44 +08:00
|
|
|
path := strconv.Itoa(int(segmentID)) + "/" + index.Key
|
|
|
|
indexPaths = append(indexPaths, path)
|
|
|
|
minioKV.Save(path, string(index.Value))
|
2020-12-24 20:55:40 +08:00
|
|
|
}
|
|
|
|
|
2021-01-06 18:19:44 +08:00
|
|
|
//test index search result
|
|
|
|
indexResult, err := index.QueryOnFloatVecIndexWithParam(searchRowData, indexParams)
|
|
|
|
assert.Equal(t, err, nil)
|
|
|
|
|
2020-12-29 14:43:40 +08:00
|
|
|
// create loadIndexClient
|
2021-01-04 10:13:01 +08:00
|
|
|
fieldID := UniqueID(100)
|
2020-12-24 20:55:40 +08:00
|
|
|
loadIndexChannelNames := Params.LoadIndexChannelNames
|
2021-01-15 15:28:54 +08:00
|
|
|
client := client.NewQueryNodeClient(node.queryNodeLoopCtx, Params.PulsarAddress, loadIndexChannelNames)
|
2021-01-04 10:13:01 +08:00
|
|
|
client.LoadIndex(indexPaths, segmentID, fieldID, "vec", indexParams)
|
2020-12-24 20:55:40 +08:00
|
|
|
|
|
|
|
// init message stream consumer and do checks
|
2021-01-20 10:02:59 +08:00
|
|
|
statsMs := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, Params.StatsReceiveBufSize)
|
2021-01-06 18:19:44 +08:00
|
|
|
statsMs.SetPulsarClient(Params.PulsarAddress)
|
2021-01-20 10:02:59 +08:00
|
|
|
statsMs.CreatePulsarConsumers([]string{Params.StatsChannelName}, Params.MsgChannelSubName, util.NewUnmarshalDispatcher(), Params.StatsReceiveBufSize)
|
2020-12-24 20:55:40 +08:00
|
|
|
statsMs.Start()
|
|
|
|
|
2020-12-26 14:16:51 +08:00
|
|
|
findFiledStats := false
|
|
|
|
for {
|
|
|
|
receiveMsg := msgstream.MsgStream(statsMs).Consume()
|
|
|
|
assert.NotNil(t, receiveMsg)
|
|
|
|
assert.NotEqual(t, len(receiveMsg.Msgs), 0)
|
|
|
|
|
|
|
|
for _, msg := range receiveMsg.Msgs {
|
|
|
|
statsMsg, ok := msg.(*msgstream.QueryNodeStatsMsg)
|
|
|
|
if statsMsg.FieldStats == nil || len(statsMsg.FieldStats) == 0 {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
findFiledStats = true
|
|
|
|
assert.Equal(t, ok, true)
|
|
|
|
assert.Equal(t, len(statsMsg.FieldStats), 1)
|
|
|
|
fieldStats0 := statsMsg.FieldStats[0]
|
2021-01-04 10:13:01 +08:00
|
|
|
assert.Equal(t, fieldStats0.FieldID, fieldID)
|
2020-12-26 14:16:51 +08:00
|
|
|
assert.Equal(t, fieldStats0.CollectionID, collectionID)
|
|
|
|
assert.Equal(t, len(fieldStats0.IndexStats), 1)
|
|
|
|
indexStats0 := fieldStats0.IndexStats[0]
|
|
|
|
params := indexStats0.IndexParams
|
|
|
|
// sort index params by key
|
2020-12-29 14:43:40 +08:00
|
|
|
sort.Slice(indexParamsKV, func(i, j int) bool { return indexParamsKV[i].Key < indexParamsKV[j].Key })
|
2021-01-30 16:02:10 +08:00
|
|
|
indexEqual := node.loadService.indexParamsEqual(params, indexParamsKV)
|
2020-12-26 14:16:51 +08:00
|
|
|
assert.Equal(t, indexEqual, true)
|
|
|
|
}
|
|
|
|
|
|
|
|
if findFiledStats {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
2020-12-24 20:55:40 +08:00
|
|
|
|
2021-01-06 18:19:44 +08:00
|
|
|
err = searchStream.Produce(fn(2))
|
|
|
|
assert.NoError(t, err)
|
|
|
|
searchResult = searchResultStream.Consume()
|
|
|
|
assert.NotNil(t, searchResult)
|
|
|
|
err = proto.Unmarshal(searchResult.Msgs[0].(*msgstream.SearchResultMsg).Hits[0], &unMarshaledHit)
|
|
|
|
assert.Nil(t, err)
|
|
|
|
|
|
|
|
idsIndex := indexResult.IDs()
|
|
|
|
idsSegment := unMarshaledHit.IDs
|
|
|
|
assert.Equal(t, len(idsIndex), len(idsSegment))
|
|
|
|
for i := 0; i < len(idsIndex); i++ {
|
|
|
|
assert.Equal(t, idsIndex[i], idsSegment[i])
|
|
|
|
}
|
|
|
|
Params.SearchChannelNames = oldSearchChannelNames
|
|
|
|
Params.SearchResultChannelNames = oldSearchResultChannelNames
|
2021-01-12 18:03:24 +08:00
|
|
|
Params.LoadIndexChannelNames = oldLoadIndexChannelNames
|
|
|
|
Params.StatsChannelName = oldStatsChannelName
|
2021-01-06 18:19:44 +08:00
|
|
|
fmt.Println("loadIndex floatVector test Done!")
|
|
|
|
|
2020-12-26 14:16:51 +08:00
|
|
|
defer assert.Equal(t, findFiledStats, true)
|
2020-12-24 20:55:40 +08:00
|
|
|
<-node.queryNodeLoopCtx.Done()
|
2021-01-21 15:20:23 +08:00
|
|
|
node.Stop()
|
2020-12-24 20:55:40 +08:00
|
|
|
}
|
2021-01-12 18:03:24 +08:00
|
|
|
|
2021-01-30 16:02:10 +08:00
|
|
|
func TestLoadService_LoadIndex_BinaryVector(t *testing.T) {
|
2021-01-15 15:28:54 +08:00
|
|
|
node := newQueryNodeMock()
|
2021-01-12 18:03:24 +08:00
|
|
|
collectionID := rand.Int63n(1000000)
|
|
|
|
segmentID := rand.Int63n(1000000)
|
|
|
|
initTestMeta(t, node, "collection0", collectionID, segmentID, true)
|
|
|
|
|
2021-01-30 16:02:10 +08:00
|
|
|
// loadService and statsService
|
2021-01-12 18:03:24 +08:00
|
|
|
suffix := "-test-search-binary" + strconv.FormatInt(rand.Int63n(1000000), 10)
|
|
|
|
oldSearchChannelNames := Params.SearchChannelNames
|
|
|
|
newSearchChannelNames := makeNewChannelNames(oldSearchChannelNames, suffix)
|
|
|
|
Params.SearchChannelNames = newSearchChannelNames
|
|
|
|
|
|
|
|
oldSearchResultChannelNames := Params.SearchChannelNames
|
|
|
|
newSearchResultChannelNames := makeNewChannelNames(oldSearchResultChannelNames, suffix)
|
|
|
|
Params.SearchResultChannelNames = newSearchResultChannelNames
|
|
|
|
|
|
|
|
oldLoadIndexChannelNames := Params.LoadIndexChannelNames
|
|
|
|
newLoadIndexChannelNames := makeNewChannelNames(oldLoadIndexChannelNames, suffix)
|
|
|
|
Params.LoadIndexChannelNames = newLoadIndexChannelNames
|
|
|
|
|
|
|
|
oldStatsChannelName := Params.StatsChannelName
|
|
|
|
newStatsChannelNames := makeNewChannelNames([]string{oldStatsChannelName}, suffix)
|
|
|
|
Params.StatsChannelName = newStatsChannelNames[0]
|
|
|
|
go node.Start()
|
|
|
|
|
|
|
|
const msgLength = 1000
|
|
|
|
const receiveBufSize = 1024
|
|
|
|
const DIM = 128
|
|
|
|
|
|
|
|
// generator index data
|
|
|
|
var indexRowData []byte
|
|
|
|
for n := 0; n < msgLength; n++ {
|
|
|
|
for i := 0; i < DIM/8; i++ {
|
|
|
|
indexRowData = append(indexRowData, byte(rand.Intn(8)))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
//generator insert data
|
|
|
|
var insertRowBlob []*commonpb.Blob
|
|
|
|
var timestamps []uint64
|
|
|
|
var rowIDs []int64
|
|
|
|
var hashValues []uint32
|
|
|
|
offset := 0
|
|
|
|
for n := 0; n < msgLength; n++ {
|
|
|
|
rowData := make([]byte, 0)
|
|
|
|
rowData = append(rowData, indexRowData[offset:offset+(DIM/8)]...)
|
|
|
|
offset += DIM / 8
|
|
|
|
age := make([]byte, 4)
|
|
|
|
binary.LittleEndian.PutUint32(age, 1)
|
|
|
|
rowData = append(rowData, age...)
|
|
|
|
blob := &commonpb.Blob{
|
|
|
|
Value: rowData,
|
|
|
|
}
|
|
|
|
insertRowBlob = append(insertRowBlob, blob)
|
|
|
|
timestamps = append(timestamps, uint64(n))
|
|
|
|
rowIDs = append(rowIDs, int64(n))
|
|
|
|
hashValues = append(hashValues, uint32(n))
|
|
|
|
}
|
|
|
|
|
|
|
|
var insertMsg msgstream.TsMsg = &msgstream.InsertMsg{
|
|
|
|
BaseMsg: msgstream.BaseMsg{
|
|
|
|
HashValues: hashValues,
|
|
|
|
},
|
2021-01-18 19:32:08 +08:00
|
|
|
InsertRequest: internalpb2.InsertRequest{
|
|
|
|
Base: &commonpb.MsgBase{
|
|
|
|
MsgType: commonpb.MsgType_kInsert,
|
|
|
|
MsgID: 0,
|
|
|
|
Timestamp: timestamps[0],
|
|
|
|
SourceID: 0,
|
|
|
|
},
|
2021-01-12 18:03:24 +08:00
|
|
|
CollectionName: "collection0",
|
2021-01-18 19:32:08 +08:00
|
|
|
PartitionName: "default",
|
2021-01-12 18:03:24 +08:00
|
|
|
SegmentID: segmentID,
|
2021-01-18 19:32:08 +08:00
|
|
|
ChannelID: "0",
|
2021-01-12 18:03:24 +08:00
|
|
|
Timestamps: timestamps,
|
|
|
|
RowIDs: rowIDs,
|
|
|
|
RowData: insertRowBlob,
|
|
|
|
},
|
|
|
|
}
|
|
|
|
insertMsgPack := msgstream.MsgPack{
|
|
|
|
BeginTs: 0,
|
|
|
|
EndTs: math.MaxUint64,
|
|
|
|
Msgs: []msgstream.TsMsg{insertMsg},
|
|
|
|
}
|
|
|
|
|
|
|
|
// generate timeTick
|
|
|
|
timeTickMsg := &msgstream.TimeTickMsg{
|
|
|
|
BaseMsg: msgstream.BaseMsg{
|
|
|
|
BeginTimestamp: 0,
|
|
|
|
EndTimestamp: 0,
|
|
|
|
HashValues: []uint32{0},
|
|
|
|
},
|
2021-01-18 19:32:08 +08:00
|
|
|
TimeTickMsg: internalpb2.TimeTickMsg{
|
|
|
|
Base: &commonpb.MsgBase{
|
|
|
|
MsgType: commonpb.MsgType_kTimeTick,
|
|
|
|
MsgID: 0,
|
|
|
|
Timestamp: math.MaxUint64,
|
|
|
|
SourceID: 0,
|
|
|
|
},
|
2021-01-12 18:03:24 +08:00
|
|
|
},
|
|
|
|
}
|
|
|
|
timeTickMsgPack := &msgstream.MsgPack{
|
|
|
|
Msgs: []msgstream.TsMsg{timeTickMsg},
|
|
|
|
}
|
|
|
|
|
|
|
|
// pulsar produce
|
|
|
|
insertChannels := Params.InsertChannelNames
|
|
|
|
ddChannels := Params.DDChannelNames
|
|
|
|
|
2021-01-20 10:02:59 +08:00
|
|
|
insertStream := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize)
|
2021-01-12 18:03:24 +08:00
|
|
|
insertStream.SetPulsarClient(Params.PulsarAddress)
|
|
|
|
insertStream.CreatePulsarProducers(insertChannels)
|
2021-01-20 10:02:59 +08:00
|
|
|
ddStream := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize)
|
2021-01-12 18:03:24 +08:00
|
|
|
ddStream.SetPulsarClient(Params.PulsarAddress)
|
|
|
|
ddStream.CreatePulsarProducers(ddChannels)
|
|
|
|
|
|
|
|
var insertMsgStream msgstream.MsgStream = insertStream
|
|
|
|
insertMsgStream.Start()
|
|
|
|
var ddMsgStream msgstream.MsgStream = ddStream
|
|
|
|
ddMsgStream.Start()
|
|
|
|
|
|
|
|
err := insertMsgStream.Produce(&insertMsgPack)
|
|
|
|
assert.NoError(t, err)
|
|
|
|
err = insertMsgStream.Broadcast(timeTickMsgPack)
|
|
|
|
assert.NoError(t, err)
|
|
|
|
err = ddMsgStream.Broadcast(timeTickMsgPack)
|
|
|
|
assert.NoError(t, err)
|
|
|
|
|
|
|
|
//generate search data and send search msg
|
|
|
|
searchRowData := indexRowData[42*(DIM/8) : 43*(DIM/8)]
|
|
|
|
dslString := "{\"bool\": { \n\"vector\": {\n \"vec\": {\n \"metric_type\": \"JACCARD\", \n \"params\": {\n \"nprobe\": 10 \n},\n \"query\": \"$0\",\"topk\": 10 \n } \n } \n } \n }"
|
2021-01-22 09:36:18 +08:00
|
|
|
placeholderValue := milvuspb.PlaceholderValue{
|
2021-01-12 18:03:24 +08:00
|
|
|
Tag: "$0",
|
2021-01-22 09:36:18 +08:00
|
|
|
Type: milvuspb.PlaceholderType_VECTOR_BINARY,
|
2021-01-12 18:03:24 +08:00
|
|
|
Values: [][]byte{searchRowData},
|
|
|
|
}
|
2021-01-22 09:36:18 +08:00
|
|
|
placeholderGroup := milvuspb.PlaceholderGroup{
|
|
|
|
Placeholders: []*milvuspb.PlaceholderValue{&placeholderValue},
|
2021-01-12 18:03:24 +08:00
|
|
|
}
|
|
|
|
placeGroupByte, err := proto.Marshal(&placeholderGroup)
|
|
|
|
if err != nil {
|
|
|
|
log.Print("marshal placeholderGroup failed")
|
|
|
|
}
|
2021-01-22 09:36:18 +08:00
|
|
|
query := milvuspb.SearchRequest{
|
2021-01-12 18:03:24 +08:00
|
|
|
CollectionName: "collection0",
|
2021-01-22 09:36:18 +08:00
|
|
|
PartitionNames: []string{"default"},
|
2021-01-12 18:03:24 +08:00
|
|
|
Dsl: dslString,
|
|
|
|
PlaceholderGroup: placeGroupByte,
|
|
|
|
}
|
|
|
|
queryByte, err := proto.Marshal(&query)
|
|
|
|
if err != nil {
|
|
|
|
log.Print("marshal query failed")
|
|
|
|
}
|
|
|
|
blob := commonpb.Blob{
|
|
|
|
Value: queryByte,
|
|
|
|
}
|
|
|
|
fn := func(n int64) *msgstream.MsgPack {
|
|
|
|
searchMsg := &msgstream.SearchMsg{
|
|
|
|
BaseMsg: msgstream.BaseMsg{
|
|
|
|
HashValues: []uint32{0},
|
|
|
|
},
|
2021-01-18 19:32:08 +08:00
|
|
|
SearchRequest: internalpb2.SearchRequest{
|
|
|
|
Base: &commonpb.MsgBase{
|
|
|
|
MsgType: commonpb.MsgType_kSearch,
|
|
|
|
MsgID: n,
|
|
|
|
Timestamp: uint64(msgLength),
|
|
|
|
SourceID: 1,
|
|
|
|
},
|
|
|
|
ResultChannelID: "0",
|
2021-01-12 18:03:24 +08:00
|
|
|
Query: &blob,
|
|
|
|
},
|
|
|
|
}
|
|
|
|
return &msgstream.MsgPack{
|
|
|
|
Msgs: []msgstream.TsMsg{searchMsg},
|
|
|
|
}
|
|
|
|
}
|
2021-01-20 10:02:59 +08:00
|
|
|
searchStream := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize)
|
2021-01-12 18:03:24 +08:00
|
|
|
searchStream.SetPulsarClient(Params.PulsarAddress)
|
|
|
|
searchStream.CreatePulsarProducers(newSearchChannelNames)
|
|
|
|
searchStream.Start()
|
|
|
|
err = searchStream.Produce(fn(1))
|
|
|
|
assert.NoError(t, err)
|
|
|
|
|
|
|
|
//get search result
|
2021-01-20 10:02:59 +08:00
|
|
|
searchResultStream := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize)
|
2021-01-12 18:03:24 +08:00
|
|
|
searchResultStream.SetPulsarClient(Params.PulsarAddress)
|
2021-01-20 10:02:59 +08:00
|
|
|
unmarshalDispatcher := util.NewUnmarshalDispatcher()
|
2021-01-12 18:03:24 +08:00
|
|
|
searchResultStream.CreatePulsarConsumers(newSearchResultChannelNames, "loadIndexTestSubSearchResult2", unmarshalDispatcher, receiveBufSize)
|
|
|
|
searchResultStream.Start()
|
|
|
|
searchResult := searchResultStream.Consume()
|
|
|
|
assert.NotNil(t, searchResult)
|
2021-01-22 09:36:18 +08:00
|
|
|
unMarshaledHit := milvuspb.Hits{}
|
2021-01-12 18:03:24 +08:00
|
|
|
err = proto.Unmarshal(searchResult.Msgs[0].(*msgstream.SearchResultMsg).Hits[0], &unMarshaledHit)
|
|
|
|
assert.Nil(t, err)
|
|
|
|
|
|
|
|
// gen load index message pack
|
|
|
|
indexParams := make(map[string]string)
|
|
|
|
indexParams["index_type"] = "BIN_IVF_FLAT"
|
|
|
|
indexParams["index_mode"] = "cpu"
|
|
|
|
indexParams["dim"] = "128"
|
|
|
|
indexParams["k"] = "10"
|
|
|
|
indexParams["nlist"] = "100"
|
|
|
|
indexParams["nprobe"] = "10"
|
|
|
|
indexParams["m"] = "4"
|
|
|
|
indexParams["nbits"] = "8"
|
|
|
|
indexParams["metric_type"] = "JACCARD"
|
|
|
|
indexParams["SLICE_SIZE"] = "4"
|
|
|
|
|
|
|
|
var indexParamsKV []*commonpb.KeyValuePair
|
|
|
|
for key, value := range indexParams {
|
|
|
|
indexParamsKV = append(indexParamsKV, &commonpb.KeyValuePair{
|
|
|
|
Key: key,
|
|
|
|
Value: value,
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
// generator index
|
|
|
|
typeParams := make(map[string]string)
|
|
|
|
typeParams["dim"] = "128"
|
2021-01-15 14:38:36 +08:00
|
|
|
index, err := indexnode.NewCIndex(typeParams, indexParams)
|
2021-01-12 18:03:24 +08:00
|
|
|
assert.Nil(t, err)
|
|
|
|
err = index.BuildBinaryVecIndexWithoutIds(indexRowData)
|
|
|
|
assert.Equal(t, err, nil)
|
|
|
|
|
|
|
|
option := &minioKV.Option{
|
|
|
|
Address: Params.MinioEndPoint,
|
|
|
|
AccessKeyID: Params.MinioAccessKeyID,
|
|
|
|
SecretAccessKeyID: Params.MinioSecretAccessKey,
|
|
|
|
UseSSL: Params.MinioUseSSLStr,
|
|
|
|
BucketName: Params.MinioBucketName,
|
|
|
|
CreateBucket: true,
|
|
|
|
}
|
|
|
|
|
|
|
|
minioKV, err := minioKV.NewMinIOKV(node.queryNodeLoopCtx, option)
|
|
|
|
assert.Equal(t, err, nil)
|
|
|
|
//save index to minio
|
|
|
|
binarySet, err := index.Serialize()
|
|
|
|
assert.Equal(t, err, nil)
|
2021-01-29 15:22:24 +08:00
|
|
|
var indexCodec storage.IndexCodec
|
|
|
|
binarySet, err = indexCodec.Serialize(binarySet, indexParams)
|
|
|
|
assert.NoError(t, err)
|
2021-01-12 18:03:24 +08:00
|
|
|
indexPaths := make([]string, 0)
|
|
|
|
for _, index := range binarySet {
|
|
|
|
path := strconv.Itoa(int(segmentID)) + "/" + index.Key
|
|
|
|
indexPaths = append(indexPaths, path)
|
|
|
|
minioKV.Save(path, string(index.Value))
|
|
|
|
}
|
|
|
|
|
|
|
|
//test index search result
|
|
|
|
indexResult, err := index.QueryOnBinaryVecIndexWithParam(searchRowData, indexParams)
|
|
|
|
assert.Equal(t, err, nil)
|
|
|
|
|
|
|
|
// create loadIndexClient
|
|
|
|
fieldID := UniqueID(100)
|
|
|
|
loadIndexChannelNames := Params.LoadIndexChannelNames
|
2021-01-15 15:28:54 +08:00
|
|
|
client := client.NewQueryNodeClient(node.queryNodeLoopCtx, Params.PulsarAddress, loadIndexChannelNames)
|
2021-01-12 18:03:24 +08:00
|
|
|
client.LoadIndex(indexPaths, segmentID, fieldID, "vec", indexParams)
|
|
|
|
|
|
|
|
// init message stream consumer and do checks
|
2021-01-20 10:02:59 +08:00
|
|
|
statsMs := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, Params.StatsReceiveBufSize)
|
2021-01-12 18:03:24 +08:00
|
|
|
statsMs.SetPulsarClient(Params.PulsarAddress)
|
2021-01-20 10:02:59 +08:00
|
|
|
statsMs.CreatePulsarConsumers([]string{Params.StatsChannelName}, Params.MsgChannelSubName, util.NewUnmarshalDispatcher(), Params.StatsReceiveBufSize)
|
2021-01-12 18:03:24 +08:00
|
|
|
statsMs.Start()
|
|
|
|
|
|
|
|
findFiledStats := false
|
|
|
|
for {
|
|
|
|
receiveMsg := msgstream.MsgStream(statsMs).Consume()
|
|
|
|
assert.NotNil(t, receiveMsg)
|
|
|
|
assert.NotEqual(t, len(receiveMsg.Msgs), 0)
|
|
|
|
|
|
|
|
for _, msg := range receiveMsg.Msgs {
|
|
|
|
statsMsg, ok := msg.(*msgstream.QueryNodeStatsMsg)
|
|
|
|
if statsMsg.FieldStats == nil || len(statsMsg.FieldStats) == 0 {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
findFiledStats = true
|
|
|
|
assert.Equal(t, ok, true)
|
|
|
|
assert.Equal(t, len(statsMsg.FieldStats), 1)
|
|
|
|
fieldStats0 := statsMsg.FieldStats[0]
|
|
|
|
assert.Equal(t, fieldStats0.FieldID, fieldID)
|
|
|
|
assert.Equal(t, fieldStats0.CollectionID, collectionID)
|
|
|
|
assert.Equal(t, len(fieldStats0.IndexStats), 1)
|
|
|
|
indexStats0 := fieldStats0.IndexStats[0]
|
|
|
|
params := indexStats0.IndexParams
|
|
|
|
// sort index params by key
|
|
|
|
sort.Slice(indexParamsKV, func(i, j int) bool { return indexParamsKV[i].Key < indexParamsKV[j].Key })
|
2021-01-30 16:02:10 +08:00
|
|
|
indexEqual := node.loadService.indexParamsEqual(params, indexParamsKV)
|
2021-01-12 18:03:24 +08:00
|
|
|
assert.Equal(t, indexEqual, true)
|
|
|
|
}
|
|
|
|
|
|
|
|
if findFiledStats {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
err = searchStream.Produce(fn(2))
|
|
|
|
assert.NoError(t, err)
|
|
|
|
searchResult = searchResultStream.Consume()
|
|
|
|
assert.NotNil(t, searchResult)
|
|
|
|
err = proto.Unmarshal(searchResult.Msgs[0].(*msgstream.SearchResultMsg).Hits[0], &unMarshaledHit)
|
|
|
|
assert.Nil(t, err)
|
|
|
|
|
|
|
|
idsIndex := indexResult.IDs()
|
|
|
|
idsSegment := unMarshaledHit.IDs
|
|
|
|
assert.Equal(t, len(idsIndex), len(idsSegment))
|
|
|
|
for i := 0; i < len(idsIndex); i++ {
|
|
|
|
assert.Equal(t, idsIndex[i], idsSegment[i])
|
|
|
|
}
|
|
|
|
Params.SearchChannelNames = oldSearchChannelNames
|
|
|
|
Params.SearchResultChannelNames = oldSearchResultChannelNames
|
|
|
|
Params.LoadIndexChannelNames = oldLoadIndexChannelNames
|
|
|
|
Params.StatsChannelName = oldStatsChannelName
|
|
|
|
fmt.Println("loadIndex binaryVector test Done!")
|
|
|
|
|
|
|
|
defer assert.Equal(t, findFiledStats, true)
|
|
|
|
<-node.queryNodeLoopCtx.Done()
|
2021-01-21 15:20:23 +08:00
|
|
|
node.Stop()
|
2021-01-12 18:03:24 +08:00
|
|
|
}
|