Signed-off-by: GuoRentong <rentong.guo@zilliz.com>
This commit is contained in:
GuoRentong 2020-10-29 20:42:47 +08:00 committed by yefu.chen
parent 6eee7d1342
commit f7bd6ebfad
12 changed files with 898 additions and 33 deletions

View File

@ -25,7 +25,6 @@ services:
command: &ubuntu-command >
/bin/bash -c "
/milvus-distributed/scripts/core_build.sh -u && \
go build -o /milvus-distributed/cmd/writer/writer /milvus-distributed/cmd/writer/writer.go && \
go build -o /milvus-distributed/cmd/reader/reader /milvus-distributed/cmd/reader/reader.go && \
go build -o /milvus-distributed/cmd/master/master /milvus-distributed/cmd/master/main.go && \
go build -o /milvus-distributed/cmd/proxy/proxy /milvus-distributed/cmd/proxy/proxy.go"

View File

@ -224,12 +224,11 @@ func (tso *timestampOracle) loadTimestamp() error
#### 4.2 Timestamp Allocator
```go
type TimestampAllocator struct {
Alloc(count uint32) ([]Timestamp, error)
}
type TimestampAllocator struct {}
func (allocator *TimestampAllocator) Start() error
func (allocator *TimestampAllocator) Close() error
func (allocator *TimestampAllocator) Alloc(count uint32) ([]Timestamp, error)
func NewTimestampAllocator() *TimestampAllocator
```
@ -303,8 +302,15 @@ func (gparams *GlobalParamsTable) Remove(key string) error
``` go
type MsgType uint32
const {
USER_REQUEST MsgType = 1
TIME_TICK = 2
kInsert MsgType = 400
kDelete MsgType = 401
kSearch MsgType = 500
KSearchResult MsgType = 1000
kSegStatistics MsgType = 1100
kTimeTick MsgType = 1200
kTimeSync MsgType = 1201
}
type TsMsg interface {
@ -370,24 +376,105 @@ func (ms *PulsarMsgStream) Consume() *MsgPack //return messages in one time tick
#### 5.4 ID Allocator
#### 5.4 Time Ticked Flow Graph
###### 5.4.1 Flow Graph States
```go
type IdAllocator struct {
Alloc(count uint32) ([]int64, error)
type flowGraphStates struct {
startTick Timestamp
numActiveTasks map[string]int32
numCompletedTasks map[string]int64
}
```
func (allocator *IdAllocator) Start() error
func (allocator *IdAllocator) Close() error
###### 5.4.2 Message
func NewIdAllocator() *IdAllocator
```go
type Msg interface {
TimeTick() Timestamp
DownStreamNodeIdx() int32
}
```
###### 5.4.3 Node
```go
type Node interface {
Name() string
MaxQueueLength() int32
MaxParallelism() int32
SetPipelineStates(states *flowGraphStates)
Operate([]*Msg) []*Msg
}
```
#### 5.4 KV
```go
type baseNode struct {
Name string
maxQueueLength int32
maxParallelism int32
graphStates *flowGraphStates
}
func (node *baseNode) MaxQueueLength() int32
func (node *baseNode) MaxParallelism() int32
func (node *baseNode) SetMaxQueueLength(n int32)
func (node *baseNode) SetMaxParallelism(n int32)
func (node *baseNode) SetPipelineStates(states *flowGraphStates)
```
###### 5.4.1 KV Base
###### 5.4.4 Flow Graph
```go
type nodeCtx struct {
node *Node
inputChans [](*chan *Msg)
outputChans [](*chan *Msg)
inputMsgs [](*Msg List)
downstreams []*nodeCtx
}
func (nodeCtx *nodeCtx) Start(ctx context.Context) error
```
*Start()* will enter a loop. In each iteration, it tries to collect input messages from *inputChans*, then prepares the node's input. When the input is ready, it triggers *node.Operate*. When *node.Operate* returns, it sends the returned *Msg* to *outputChans*, which connect to the downstreams' *inputChans*.
```go
type TimeTickedFlowGraph struct {
states *flowGraphStates
nodeCtx map[string]*nodeCtx
}
func (*pipeline TimeTickedFlowGraph) AddNode(node *Node)
func (*pipeline TimeTickedFlowGraph) SetEdges(nodeName string, in []string, out []string)
func (*pipeline TimeTickedFlowGraph) Start() error
func (*pipeline TimeTickedFlowGraph) Close() error
func NewTimeTickedFlowGraph(ctx context.Context) *TimeTickedFlowGraph
```
#### 5.5 ID Allocator
```go
type IdAllocator struct {
}
func (allocator *IdAllocator) Start() error
func (allocator *IdAllocator) Close() error
func (allocator *IdAllocator) Alloc(count uint32) ([]int64, error)
func NewIdAllocator(ctx context.Context) *IdAllocator
```
#### 5.6 KV
###### 5.6.1 KV Base
```go
type KVBase interface {
@ -409,7 +496,7 @@ type KVBase interface {
###### 5.4.2 Etcd KV
###### 5.6.2 Etcd KV
```go
type EtcdKV struct {
@ -854,6 +941,8 @@ type task interface {
}
```
A task example is as follows. In this example, we wrap a CreateCollectionRequest (a proto) as a createCollectionTask. The wrapper needs to implement the task interface.
``` go

View File

@ -0,0 +1,194 @@
package msgstream
import (
"github.com/golang/protobuf/proto"
commonPb "github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
)
// TsMsgMarshaler converts a TsMsg to and from its serialized byte form.
// Both methods report their outcome through a commonPb.Status value
// instead of a Go error.
type TsMsgMarshaler interface {
// Marshal serializes the message referenced by input.
Marshal(input *TsMsg) ([]byte, commonPb.Status)
// Unmarshal decodes input and returns a pointer to the resulting message.
Unmarshal(input []byte) (*TsMsg, commonPb.Status)
}
// GetMarshalers resolves two marshalers in one call: the first result is the
// marshaler for inputMsgType, the second the marshaler for outputMsgType.
// Each result is whatever GetMarshaler returns for that type, so either may
// be nil.
func GetMarshalers(inputMsgType MsgType, outputMsgType MsgType) (*TsMsgMarshaler, *TsMsgMarshaler) {
	in := GetMarshaler(inputMsgType)
	out := GetMarshaler(outputMsgType)
	return in, out
}
// GetMarshaler returns a pointer to the TsMsgMarshaler that handles msgType.
// It returns nil when no marshaler is registered for msgType.
func GetMarshaler(msgType MsgType) *TsMsgMarshaler {
	// The parameter is deliberately not named MsgType: that would shadow the
	// MsgType type inside the function body.
	var marshaler TsMsgMarshaler
	switch msgType {
	case kInsert:
		marshaler = &InsertMarshaler{}
	case kDelete:
		marshaler = &DeleteMarshaler{}
	case kSearch:
		marshaler = &SearchMarshaler{}
	case kSearchResult:
		marshaler = &SearchResultMarshaler{}
	case kTimeSync:
		marshaler = &TimeSyncMarshaler{}
	default:
		return nil
	}
	return &marshaler
}
//////////////////////////////////////Insert///////////////////////////////////////////////
// InsertMarshaler serializes InsertTask messages through their embedded
// InsertRequest.
type InsertMarshaler struct{}

// Marshal encodes the InsertRequest embedded in the InsertTask that input
// points to. It returns an UNEXPECTED_ERROR status when input is nil, when the
// message is not an InsertTask, or when protobuf encoding fails.
func (im *InsertMarshaler) Marshal(input *TsMsg) ([]byte, commonPb.Status) {
	if input == nil {
		return nil, commonPb.Status{ErrorCode: commonPb.ErrorCode_UNEXPECTED_ERROR}
	}
	// Checked assertion: a message of another type is reported through the
	// status instead of panicking.
	insertTask, ok := (*input).(InsertTask)
	if !ok {
		return nil, commonPb.Status{ErrorCode: commonPb.ErrorCode_UNEXPECTED_ERROR}
	}
	mb, err := proto.Marshal(&insertTask.InsertRequest)
	if err != nil {
		return nil, commonPb.Status{ErrorCode: commonPb.ErrorCode_UNEXPECTED_ERROR}
	}
	return mb, commonPb.Status{ErrorCode: commonPb.ErrorCode_SUCCESS}
}

// Unmarshal decodes input into an InsertRequest and returns it wrapped as an
// InsertTask. It returns a nil message and an UNEXPECTED_ERROR status when
// decoding fails.
func (im *InsertMarshaler) Unmarshal(input []byte) (*TsMsg, commonPb.Status) {
	insertRequest := internalPb.InsertRequest{}
	if err := proto.Unmarshal(input, &insertRequest); err != nil {
		return nil, commonPb.Status{ErrorCode: commonPb.ErrorCode_UNEXPECTED_ERROR}
	}
	var tsMsg TsMsg = InsertTask{insertRequest}
	return &tsMsg, commonPb.Status{ErrorCode: commonPb.ErrorCode_SUCCESS}
}
/////////////////////////////////////Delete//////////////////////////////////////////////
// DeleteMarshaler serializes DeleteTask messages through their embedded
// DeleteRequest.
type DeleteMarshaler struct{}

// Marshal encodes the DeleteRequest embedded in the DeleteTask that input
// points to. It returns an UNEXPECTED_ERROR status when input is nil, when the
// message is not a DeleteTask, or when protobuf encoding fails.
func (dm *DeleteMarshaler) Marshal(input *TsMsg) ([]byte, commonPb.Status) {
	if input == nil {
		return nil, commonPb.Status{ErrorCode: commonPb.ErrorCode_UNEXPECTED_ERROR}
	}
	// Checked assertion: a message of another type is reported through the
	// status instead of panicking.
	deleteTask, ok := (*input).(DeleteTask)
	if !ok {
		return nil, commonPb.Status{ErrorCode: commonPb.ErrorCode_UNEXPECTED_ERROR}
	}
	mb, err := proto.Marshal(&deleteTask.DeleteRequest)
	if err != nil {
		return nil, commonPb.Status{ErrorCode: commonPb.ErrorCode_UNEXPECTED_ERROR}
	}
	return mb, commonPb.Status{ErrorCode: commonPb.ErrorCode_SUCCESS}
}

// Unmarshal decodes input into a DeleteRequest and returns it wrapped as a
// DeleteTask. It returns a nil message and an UNEXPECTED_ERROR status when
// decoding fails.
func (dm *DeleteMarshaler) Unmarshal(input []byte) (*TsMsg, commonPb.Status) {
	deleteRequest := internalPb.DeleteRequest{}
	if err := proto.Unmarshal(input, &deleteRequest); err != nil {
		return nil, commonPb.Status{ErrorCode: commonPb.ErrorCode_UNEXPECTED_ERROR}
	}
	var tsMsg TsMsg = DeleteTask{deleteRequest}
	return &tsMsg, commonPb.Status{ErrorCode: commonPb.ErrorCode_SUCCESS}
}
/////////////////////////////////////Search///////////////////////////////////////////////
// SearchMarshaler serializes SearchTask messages through their embedded
// SearchRequest.
type SearchMarshaler struct{}

// Marshal encodes the SearchRequest embedded in the SearchTask that input
// points to. It returns an UNEXPECTED_ERROR status when input is nil, when the
// message is not a SearchTask, or when protobuf encoding fails.
func (sm *SearchMarshaler) Marshal(input *TsMsg) ([]byte, commonPb.Status) {
	if input == nil {
		return nil, commonPb.Status{ErrorCode: commonPb.ErrorCode_UNEXPECTED_ERROR}
	}
	// Checked assertion: a message of another type is reported through the
	// status instead of panicking.
	searchTask, ok := (*input).(SearchTask)
	if !ok {
		return nil, commonPb.Status{ErrorCode: commonPb.ErrorCode_UNEXPECTED_ERROR}
	}
	mb, err := proto.Marshal(&searchTask.SearchRequest)
	if err != nil {
		return nil, commonPb.Status{ErrorCode: commonPb.ErrorCode_UNEXPECTED_ERROR}
	}
	return mb, commonPb.Status{ErrorCode: commonPb.ErrorCode_SUCCESS}
}

// Unmarshal decodes input into a SearchRequest and returns it wrapped as a
// SearchTask. It returns a nil message and an UNEXPECTED_ERROR status when
// decoding fails.
func (sm *SearchMarshaler) Unmarshal(input []byte) (*TsMsg, commonPb.Status) {
	searchRequest := internalPb.SearchRequest{}
	if err := proto.Unmarshal(input, &searchRequest); err != nil {
		return nil, commonPb.Status{ErrorCode: commonPb.ErrorCode_UNEXPECTED_ERROR}
	}
	var tsMsg TsMsg = SearchTask{searchRequest}
	return &tsMsg, commonPb.Status{ErrorCode: commonPb.ErrorCode_SUCCESS}
}
/////////////////////////////////////SearchResult///////////////////////////////////////////////
// SearchResultMarshaler serializes SearchResultTask messages through their
// embedded SearchResult.
type SearchResultMarshaler struct{}

// Marshal encodes the SearchResult embedded in the SearchResultTask that input
// points to. It returns an UNEXPECTED_ERROR status when input is nil, when the
// message is not a SearchResultTask, or when protobuf encoding fails.
func (srm *SearchResultMarshaler) Marshal(input *TsMsg) ([]byte, commonPb.Status) {
	if input == nil {
		return nil, commonPb.Status{ErrorCode: commonPb.ErrorCode_UNEXPECTED_ERROR}
	}
	// Checked assertion: a message of another type is reported through the
	// status instead of panicking.
	searchResultTask, ok := (*input).(SearchResultTask)
	if !ok {
		return nil, commonPb.Status{ErrorCode: commonPb.ErrorCode_UNEXPECTED_ERROR}
	}
	mb, err := proto.Marshal(&searchResultTask.SearchResult)
	if err != nil {
		return nil, commonPb.Status{ErrorCode: commonPb.ErrorCode_UNEXPECTED_ERROR}
	}
	return mb, commonPb.Status{ErrorCode: commonPb.ErrorCode_SUCCESS}
}

// Unmarshal decodes input into a SearchResult and returns it wrapped as a
// SearchResultTask. It returns a nil message and an UNEXPECTED_ERROR status
// when decoding fails.
func (srm *SearchResultMarshaler) Unmarshal(input []byte) (*TsMsg, commonPb.Status) {
	searchResult := internalPb.SearchResult{}
	if err := proto.Unmarshal(input, &searchResult); err != nil {
		return nil, commonPb.Status{ErrorCode: commonPb.ErrorCode_UNEXPECTED_ERROR}
	}
	var tsMsg TsMsg = SearchResultTask{searchResult}
	return &tsMsg, commonPb.Status{ErrorCode: commonPb.ErrorCode_SUCCESS}
}
/////////////////////////////////////TimeSync///////////////////////////////////////////////
// TimeSyncMarshaler serializes TimeSyncTask messages through their embedded
// TimeSyncMsg.
type TimeSyncMarshaler struct{}

// Marshal encodes the TimeSyncMsg embedded in the TimeSyncTask that input
// points to. It returns an UNEXPECTED_ERROR status when input is nil, when the
// message is not a TimeSyncTask, or when protobuf encoding fails.
func (tm *TimeSyncMarshaler) Marshal(input *TsMsg) ([]byte, commonPb.Status) {
	if input == nil {
		return nil, commonPb.Status{ErrorCode: commonPb.ErrorCode_UNEXPECTED_ERROR}
	}
	// Checked assertion: a message of another type is reported through the
	// status instead of panicking.
	timeSyncTask, ok := (*input).(TimeSyncTask)
	if !ok {
		return nil, commonPb.Status{ErrorCode: commonPb.ErrorCode_UNEXPECTED_ERROR}
	}
	mb, err := proto.Marshal(&timeSyncTask.TimeSyncMsg)
	if err != nil {
		return nil, commonPb.Status{ErrorCode: commonPb.ErrorCode_UNEXPECTED_ERROR}
	}
	return mb, commonPb.Status{ErrorCode: commonPb.ErrorCode_SUCCESS}
}

// Unmarshal decodes input into a TimeSyncMsg and returns it wrapped as a
// TimeSyncTask. It returns a nil message and an UNEXPECTED_ERROR status when
// decoding fails.
func (tm *TimeSyncMarshaler) Unmarshal(input []byte) (*TsMsg, commonPb.Status) {
	timeSyncMsg := internalPb.TimeSyncMsg{}
	if err := proto.Unmarshal(input, &timeSyncMsg); err != nil {
		return nil, commonPb.Status{ErrorCode: commonPb.ErrorCode_UNEXPECTED_ERROR}
	}
	var tsMsg TsMsg = TimeSyncTask{timeSyncMsg}
	return &tsMsg, commonPb.Status{ErrorCode: commonPb.ErrorCode_SUCCESS}
}
///////////////////////////////////////Key2Seg///////////////////////////////////////////////
//
//type Key2SegMarshaler struct{}
//
//func (km *Key2SegMarshaler) Marshal(input *TsMsg) ([]byte, commonPb.Status) {
// key2SegTask := (*input).(Key2SegTask)
// key2SegMsg := &key2SegTask.Key2SegMsg
// mb, err := proto.Marshal(key2SegMsg)
// if err != nil{
// return nil, commonPb.Status{ErrorCode: commonPb.ErrorCode_UNEXPECTED_ERROR}
// }
// return mb, commonPb.Status{ErrorCode: commonPb.ErrorCode_SUCCESS}
//}
//
//func (km *Key2SegMarshaler) Unmarshal(input []byte) (*TsMsg, commonPb.Status) {
// key2SegMsg := internalPb.Key2SegMsg{}
// err := proto.Unmarshal(input, &key2SegMsg)
// key2SegTask := Key2SegTask{key2SegMsg}
// if err != nil{
// return nil, commonPb.Status{ErrorCode: commonPb.ErrorCode_UNEXPECTED_ERROR}
// }
// var tsMsg TsMsg = key2SegTask
// return &tsMsg, commonPb.Status{ErrorCode: commonPb.ErrorCode_SUCCESS}
//}

View File

@ -0,0 +1,230 @@
package msgstream
import (
"context"
"github.com/apache/pulsar-client-go/pulsar"
"github.com/zilliztech/milvus-distributed/internal/msgclient"
commonPb "github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"log"
"sync"
)
// PulsarChannelLength is the buffer size of the message channel that
// SetConsumers creates for each Pulsar consumer.
const PulsarChannelLength = 100
// TimeStamp is the timestamp type carried by messages in this package.
type TimeStamp uint64
// MsgPack is a batch of messages together with a begin/end timestamp pair.
type MsgPack struct {
BeginTs TimeStamp
EndTs TimeStamp
Msgs []*TsMsg
}
// HashFunc splits a MsgPack into several packs keyed by a uint32 value.
type HashFunc func(*MsgPack) map[uint32]*MsgPack
// MsgStream is the producer/consumer abstraction over a message transport.
type MsgStream interface {
// SetMsgMarshaler installs the marshaler used when producing and the
// unmarshaler used when consuming.
SetMsgMarshaler(marshal *TsMsgMarshaler, unmarshal *TsMsgMarshaler)
// Produce sends the messages of the given pack and reports a status.
Produce(*MsgPack) commonPb.Status
Consume() *MsgPack // message can be consumed exactly once
}
// PulsarMsgStream is a message stream backed by Pulsar producers and
// consumers. The Set* methods populate it field by field.
type PulsarMsgStream struct {
// client is assigned by SetPulsarCient.
client *pulsar.Client
// producers and consumers are appended to by SetProducers / SetConsumers,
// one entry per channel.
producers []*pulsar.Producer
consumers []*pulsar.Consumer
msgHashFunc HashFunc // return a map from produceChannel idx to *MsgPack
msgMarshaler *TsMsgMarshaler
msgUnmarshaler *TsMsgMarshaler
// NOTE(review): inputChannel and outputChannel are not referenced by any
// method visible in this file.
inputChannel chan *MsgPack
outputChannel chan *MsgPack
}
// SetPulsarCient connects to the Pulsar service at address and stores the
// resulting client on the stream. When the connection fails the error is
// logged and the previously stored client (if any) is left untouched, so the
// stream never ends up holding a pointer to a nil client.
func (ms *PulsarMsgStream) SetPulsarCient(address string) {
	client, err := pulsar.NewClient(pulsar.ClientOptions{URL: address})
	if err != nil {
		log.Printf("connect pulsar failed, %v", err)
		return
	}
	ms.client = &client
}
// SetProducers creates one Pulsar producer per entry of channels and appends
// it to the stream's producer list. A channel whose producer cannot be created
// is logged and skipped, so the list never contains a nil producer. When no
// client has been set, nothing is created.
func (ms *PulsarMsgStream) SetProducers(channels []string) {
	if ms.client == nil || *ms.client == nil {
		log.Printf("failed to create producers, pulsar client is not set")
		return
	}
	for _, channel := range channels {
		pp, err := (*ms.client).CreateProducer(pulsar.ProducerOptions{Topic: channel})
		if err != nil {
			log.Printf("failed to create reader producer %s, error = %v", channel, err)
			continue
		}
		ms.producers = append(ms.producers, &pp)
	}
}
// SetConsumers subscribes to every entry of channels under the subscription
// name subName and appends the resulting consumers to the stream. Each
// consumer gets its own receive channel buffered to PulsarChannelLength.
// A channel that cannot be subscribed to is logged and skipped, so the list
// never contains a nil consumer. When no client has been set, nothing is
// created.
func (ms *PulsarMsgStream) SetConsumers(channels []string, subName string) {
	if ms.client == nil || *ms.client == nil {
		log.Printf("failed to create consumers, pulsar client is not set")
		return
	}
	for _, channel := range channels {
		receiveChannel := make(chan pulsar.ConsumerMessage, PulsarChannelLength)
		pc, err := (*ms.client).Subscribe(pulsar.ConsumerOptions{
			Topic:                       channel,
			SubscriptionName:            subName,
			Type:                        pulsar.KeyShared,
			SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest,
			MessageChannel:              receiveChannel,
		})
		if err != nil {
			log.Printf("failed to subscribe topic, error = %v", err)
			continue
		}
		ms.consumers = append(ms.consumers, &pc)
	}
}
// SetMsgMarshaler records marshal as the marshaler used by Produce and
// unmarshal as the unmarshaler used by Consume. Either may be nil.
func (ms *PulsarMsgStream) SetMsgMarshaler(marshal *TsMsgMarshaler, unmarshal *TsMsgMarshaler) {
	ms.msgMarshaler, ms.msgUnmarshaler = marshal, unmarshal
}
// SetHashFunc installs the function Produce uses to route messages.
// The installed function calls hashFunc and then folds every returned key
// onto a producer index (key modulo the number of producers), merging the
// messages of all keys that land on the same index.
//
// The producer count is read when the installed function runs, not when
// SetHashFunc is called. If there are no producers at that point the function
// logs and returns an empty map instead of dividing by zero.
func (ms *PulsarMsgStream) SetHashFunc(hashFunc HashFunc) {
	ms.msgHashFunc = func(pack *MsgPack) map[uint32]*MsgPack {
		bucketResult := make(map[uint32]*MsgPack)
		producerCount := uint32(len(ms.producers))
		if producerCount == 0 {
			log.Printf("hash messages failed, no producer is set")
			return bucketResult
		}
		for k, v := range hashFunc(pack) {
			if v == nil {
				// Nothing to route for this key.
				continue
			}
			channelIndex := k % producerCount
			bucket, ok := bucketResult[channelIndex]
			if !ok {
				bucket = &MsgPack{}
				bucketResult[channelIndex] = bucket
			}
			bucket.Msgs = append(bucket.Msgs, v.Msgs...)
		}
		return bucketResult
	}
}
// Produce routes the messages of msg with the installed hash function,
// marshals each one and sends it through the producer selected for it.
//
// A message that fails to marshal or to send is logged and skipped; the
// remaining messages are still attempted. The returned status is SUCCESS only
// when every message was sent, and UNEXPECTED_ERROR otherwise (including when
// the hash function or the marshaler has not been set).
func (ms *PulsarMsgStream) Produce(msg *MsgPack) commonPb.Status {
	if ms.msgHashFunc == nil || ms.msgMarshaler == nil {
		log.Printf("produce failed, hash function or marshaler is not set")
		return commonPb.Status{ErrorCode: commonPb.ErrorCode_UNEXPECTED_ERROR}
	}
	errorCode := commonPb.ErrorCode_SUCCESS
	for k, v := range ms.msgHashFunc(msg) {
		if v == nil {
			continue
		}
		// Guard against a hash function that yields an index without producer.
		if k >= uint32(len(ms.producers)) {
			log.Printf("produce failed, no producer for channel index %d", k)
			errorCode = commonPb.ErrorCode_UNEXPECTED_ERROR
			continue
		}
		for _, tsMsg := range v.Msgs {
			mb, status := (*ms.msgMarshaler).Marshal(tsMsg)
			if status.ErrorCode != commonPb.ErrorCode_SUCCESS {
				log.Printf("Marshal ManipulationReqMsg failed, error ")
				errorCode = commonPb.ErrorCode_UNEXPECTED_ERROR
				continue
			}
			if _, err := (*ms.producers[k]).Send(
				context.Background(),
				&pulsar.ProducerMessage{Payload: mb},
			); err != nil {
				log.Printf("post into pulsar filed, error = %v", err)
				errorCode = commonPb.ErrorCode_UNEXPECTED_ERROR
			}
		}
	}
	return commonPb.Status{ErrorCode: errorCode}
}
// Consume receives exactly one message from every consumer, in consumer
// order, acknowledges it and returns the decoded messages as one MsgPack.
// It blocks until each consumer has delivered a message.
//
// A payload that cannot be decoded is logged and left out of the pack, so the
// returned Msgs never contains a nil entry. A closed consumer channel
// terminates the process via log.Fatal.
func (ms *PulsarMsgStream) Consume() *MsgPack {
	tsMsgList := make([]*TsMsg, 0, len(ms.consumers))
	for _, consumer := range ms.consumers {
		pulsarMsg, ok := <-(*consumer).Chan()
		if !ok {
			log.Fatal("consumer closed!")
		}
		(*consumer).AckID(pulsarMsg.ID())
		tsMsg, status := (*ms.msgUnmarshaler).Unmarshal(pulsarMsg.Payload())
		if status.ErrorCode != commonPb.ErrorCode_SUCCESS {
			log.Printf("Marshal ManipulationReqMsg failed, error ")
			continue
		}
		tsMsgList = append(tsMsgList, tsMsg)
	}
	return &MsgPack{Msgs: tsMsgList}
}
// PulsarTtMsgStream extends PulsarMsgStream with a Consume that groups
// messages by time tick.
type PulsarTtMsgStream struct {
PulsarMsgStream
// inputBuf collects messages appended by findTimeTick; Consume empties it.
inputBuf []*TsMsg
// unsolvedBuf holds messages that Consume kept back for a later call.
unsolvedBuf []*TsMsg
// NOTE(review): msgPacks is not referenced by any method visible in this file.
msgPacks []*MsgPack
// lastTimeStamp is used as BeginTs of the pack returned by Consume.
lastTimeStamp TimeStamp
}
// Consume returns the messages that belong to one time tick.
//
// It starts one findTimeTick goroutine per consumer, waits for all of them,
// and requires that they agree on a single time-tick timestamp (otherwise the
// process is terminated via log.Fatal). Buffered messages whose timestamp is
// not later than the tick are returned; later ones are kept in unsolvedBuf for
// a following call. The returned pack spans (previous tick, current tick] and
// the current tick becomes the BeginTs of the next pack.
func (ms *PulsarTtMsgStream) Consume() *MsgPack { //return messages in one time tick
	wg := sync.WaitGroup{}
	wg.Add(len(ms.consumers))
	eofMsgTimeStamp := make(map[int]TimeStamp)
	mu := sync.Mutex{}
	for i := 0; i < len(ms.consumers); i++ {
		go ms.findTimeTick(context.Background(), i, eofMsgTimeStamp, &wg, &mu)
	}
	wg.Wait()
	timeStamp, ok := checkTimeTickMsg(eofMsgTimeStamp)
	if !ok {
		log.Fatal("timeTick err")
	}

	// Re-examine the messages held back by earlier calls together with the
	// newly received ones.
	ms.inputBuf = append(ms.inputBuf, ms.unsolvedBuf...)
	ms.unsolvedBuf = ms.unsolvedBuf[:0]

	timeTickBuf := make([]*TsMsg, 0, len(ms.inputBuf))
	for _, v := range ms.inputBuf {
		if v == nil {
			continue
		}
		// A tick at timeStamp closes every message stamped at or before it;
		// anything later has to wait for a future tick.
		if (*v).Ts() <= timeStamp {
			timeTickBuf = append(timeTickBuf, v)
		} else {
			ms.unsolvedBuf = append(ms.unsolvedBuf, v)
		}
	}
	ms.inputBuf = ms.inputBuf[:0]

	msgPack := MsgPack{
		BeginTs: ms.lastTimeStamp,
		EndTs:   timeStamp,
		Msgs:    timeTickBuf,
	}
	// Advance the window so the next pack starts where this one ended.
	ms.lastTimeStamp = timeStamp
	return &msgPack
}
// findTimeTick reads from the consumer at channelIndex until it sees a
// time-tick message or ctx is cancelled.
//
// Ordinary messages are appended to ms.inputBuf; the timestamp of the
// time-tick message is stored in eofMsgMap[channelIndex]. Both shared
// structures are only touched while mu is held, because one goroutine per
// consumer runs this function concurrently. wg.Done is called on every return
// path. Payloads that cannot be decoded are logged and skipped. A closed
// consumer channel terminates the process via log.Fatal.
func (ms *PulsarTtMsgStream) findTimeTick(ctx context.Context,
	channelIndex int,
	eofMsgMap map[int]TimeStamp,
	wg *sync.WaitGroup,
	mu *sync.Mutex) {
	defer wg.Done()
	for {
		select {
		case <-ctx.Done():
			return
		case pulsarMsg, ok := <-(*ms.consumers[channelIndex]).Chan():
			if !ok {
				log.Fatal("consumer closed!")
			}
			(*ms.consumers[channelIndex]).Ack(pulsarMsg)
			tsMsg, status := (*ms.msgUnmarshaler).Unmarshal(pulsarMsg.Payload())
			// Check the status before touching tsMsg: it is nil on failure.
			if status.ErrorCode != commonPb.ErrorCode_SUCCESS || tsMsg == nil {
				log.Printf("Marshal ManipulationReqMsg failed, error ")
				continue
			}
			// TODO:: Find the EOF
			// NOTE(review): msgclient.kTimeTick names an unexported identifier
			// of another package; presumably this package's own kTimeTick is
			// meant. Confirm and adjust together with the msgclient import.
			if (*tsMsg).Type() == msgclient.kTimeTick {
				mu.Lock()
				eofMsgMap[channelIndex] = (*tsMsg).Ts()
				mu.Unlock()
				// return, not break: break would only leave the select and
				// the loop would keep consuming forever.
				return
			}
			mu.Lock()
			ms.inputBuf = append(ms.inputBuf, tsMsg)
			mu.Unlock()
		}
	}
}
// checkTimeTickMsg verifies that every channel reported the same time-tick
// timestamp. It returns that timestamp and true when msg is non-empty and all
// of its values are equal; otherwise it returns 0 and false.
func checkTimeTickMsg(msg map[int]TimeStamp) (TimeStamp, bool) {
	var (
		tick TimeStamp
		seen bool
	)
	for _, ts := range msg {
		if !seen {
			tick, seen = ts, true
			continue
		}
		if ts != tick {
			return 0, false
		}
	}
	return tick, seen
}

View File

@ -0,0 +1,215 @@
package msgstream
import (
"fmt"
commonPb "github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
"log"
"testing"
)
// produceHashTopic is the HashFunc used by the stream tests. It groups the
// messages of input by a 32-bit hash of an id taken from each message: ReqId
// for insert, delete, search and search-result messages, PeerId for time-sync
// messages. An unknown message type or a hashing error aborts via log.Fatal.
func produceHashTopic(input *MsgPack) map[uint32]*MsgPack {
	buckets := make(map[uint32]*MsgPack)
	for _, msg := range input.Msgs {
		var (
			key uint32
			err error
		)
		switch (*msg).Type() {
		case kInsert:
			key, err = typeutil.Hash32Int64((*msg).(InsertTask).ReqId)
		case kDelete:
			key, err = typeutil.Hash32Int64((*msg).(DeleteTask).ReqId)
		case kSearch:
			key, err = typeutil.Hash32Int64((*msg).(SearchTask).ReqId)
		case kSearchResult:
			key, err = typeutil.Hash32Int64((*msg).(SearchResultTask).ReqId)
		case kTimeSync:
			key, err = typeutil.Hash32Int64((*msg).(TimeSyncTask).PeerId)
		default:
			log.Fatal("con't find msgType")
		}
		if err != nil {
			log.Fatal(err)
		}
		pack, found := buckets[key]
		if !found {
			pack = &MsgPack{}
			buckets[key] = pack
		}
		pack.Msgs = append(pack.Msgs, msg)
	}
	return buckets
}
// baseTest runs one produce/consume round trip against the Pulsar service at
// pulsarAddress: it publishes msgPack on producerChannels through a stream
// that marshals with inputMsgType's marshaler, then polls a second stream
// subscribed to consumerChannels (subscription consumerSubName, unmarshaling
// with outputMsgType's marshaler) until a non-empty pack arrives, and prints
// every message of that pack.
func baseTest(pulsarAddress string,
	producerChannels []string,
	consumerChannels []string,
	consumerSubName string,
	msgPack *MsgPack,
	inputMsgType MsgType,
	outputMsgType MsgType) {

	// Producing side.
	producer := PulsarMsgStream{}
	producer.SetPulsarCient(pulsarAddress)
	producer.SetMsgMarshaler(GetMarshaler(inputMsgType), nil)
	producer.SetProducers(producerChannels)
	producer.SetHashFunc(produceHashTopic)

	// Consuming side.
	consumer := PulsarMsgStream{}
	consumer.SetPulsarCient(pulsarAddress)
	consumer.SetMsgMarshaler(nil, GetMarshaler(outputMsgType))
	consumer.SetConsumers(consumerChannels, consumerSubName)

	producer.Produce(msgPack)

	// Poll until the first non-empty pack shows up, print it and stop.
	for {
		received := consumer.Consume()
		if len(received.Msgs) == 0 {
			continue
		}
		for _, m := range received.Msgs {
			fmt.Println("msg type: ", (*m).Type(), ", msg value: ", *m)
		}
		return
	}
}
// TestStream_Insert sends one insert message through the "insert" topic and
// reads it back. It needs a Pulsar service at pulsar://localhost:6650.
func TestStream_Insert(t *testing.T) {
	// Build the message pack holding a single insert request.
	var tsMsg TsMsg = InsertTask{
		InsertRequest: internalPb.InsertRequest{
			ReqType:        internalPb.ReqType_kInsert,
			ReqId:          1,
			CollectionName: "Collection",
			PartitionTag:   "Partition",
			SegmentId:      1,
			ChannelId:      1,
			ProxyId:        1,
			Timestamp:      1,
		},
	}
	msgPack := MsgPack{Msgs: []*TsMsg{&tsMsg}}

	// Run the round trip.
	baseTest(
		"pulsar://localhost:6650",
		[]string{"insert"},
		[]string{"insert"},
		"subInsert",
		&msgPack,
		kInsert,
		kInsert,
	)
}
// TestStream_Delete sends one delete message through the "delete" topic and
// reads it back. It needs a Pulsar service at pulsar://localhost:6650.
func TestStream_Delete(t *testing.T) {
	pulsarAddress := "pulsar://localhost:6650"
	producerChannels := []string{"delete"}
	consumerChannels := []string{"delete"}
	consumerSubName := "subDelete"

	//pack tsmsg
	deleteRequest := internalPb.DeleteRequest{
		// A delete request carries the delete request type (was kInsert,
		// copied from the insert test).
		ReqType:        internalPb.ReqType_kDelete,
		ReqId:          1,
		CollectionName: "Collection",
		ChannelId:      1,
		ProxyId:        1,
		Timestamp:      1,
		PrimaryKeys:    []int64{1},
	}
	deleteMsg := DeleteTask{
		deleteRequest,
	}
	var tsMsg TsMsg = deleteMsg
	msgPack := MsgPack{}
	msgPack.Msgs = append(msgPack.Msgs, &tsMsg)

	//run stream
	baseTest(pulsarAddress, producerChannels, consumerChannels, consumerSubName, &msgPack, kDelete, kDelete)
}
// TestStream_Search sends one search message through the "search" topic and
// reads it back. It needs a Pulsar service at pulsar://localhost:6650.
func TestStream_Search(t *testing.T) {
	// Build the message pack holding a single search request.
	var tsMsg TsMsg = SearchTask{
		SearchRequest: internalPb.SearchRequest{
			ReqType:         internalPb.ReqType_kSearch,
			ReqId:           1,
			ProxyId:         1,
			Timestamp:       1,
			ResultChannelId: 1,
		},
	}
	msgPack := MsgPack{Msgs: []*TsMsg{&tsMsg}}

	// Run the round trip.
	baseTest(
		"pulsar://localhost:6650",
		[]string{"search"},
		[]string{"search"},
		"subSearch",
		&msgPack,
		kSearch,
		kSearch,
	)
}
// TestStream_SearchResult sends one search-result message through its own
// topic and reads it back. It needs a Pulsar service at
// pulsar://localhost:6650.
func TestStream_SearchResult(t *testing.T) {
	pulsarAddress := "pulsar://localhost:6650"
	// Use a dedicated topic and subscription: sharing "search"/"subSearch"
	// with TestStream_Search lets this test compete with that test's consumer
	// and decode its messages with the wrong unmarshaler.
	producerChannels := []string{"searchResult"}
	consumerChannels := []string{"searchResult"}
	consumerSubName := "subSearchResult"

	//pack tsmsg
	searchResult := internalPb.SearchResult{
		Status:          &commonPb.Status{ErrorCode: commonPb.ErrorCode_SUCCESS},
		ReqId:           1,
		ProxyId:         1,
		QueryNodeId:     1,
		Timestamp:       1,
		ResultChannelId: 1,
	}
	searchResultMsg := SearchResultTask{
		searchResult,
	}
	var tsMsg TsMsg = searchResultMsg
	msgPack := MsgPack{}
	msgPack.Msgs = append(msgPack.Msgs, &tsMsg)

	//run stream
	baseTest(pulsarAddress, producerChannels, consumerChannels, consumerSubName, &msgPack, kSearchResult, kSearchResult)
}
// TestStream_TimeSync sends one time-sync message through its own topic and
// reads it back. It needs a Pulsar service at pulsar://localhost:6650.
func TestStream_TimeSync(t *testing.T) {
	pulsarAddress := "pulsar://localhost:6650"
	// Use a dedicated topic and subscription: sharing "search"/"subSearch"
	// with TestStream_Search lets this test compete with that test's consumer
	// and decode its messages with the wrong unmarshaler.
	producerChannels := []string{"timeSync"}
	consumerChannels := []string{"timeSync"}
	consumerSubName := "subTimeSync"

	//pack tsmsg
	timeSyncResult := internalPb.TimeSyncMsg{
		PeerId:    1,
		Timestamp: 1,
	}
	timeSyncMsg := TimeSyncTask{
		timeSyncResult,
	}
	var tsMsg TsMsg = timeSyncMsg
	msgPack := MsgPack{}
	msgPack.Msgs = append(msgPack.Msgs, &tsMsg)

	//run stream
	baseTest(pulsarAddress, producerChannels, consumerChannels, consumerSubName, &msgPack, kTimeSync, kTimeSync)
}

133
internal/msgstream/task.go Normal file
View File

@ -0,0 +1,133 @@
package msgstream
import (
internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
)
// MsgType identifies the kind of a TsMsg.
type MsgType uint32
// Message kinds known to this package. Marshalers are looked up by these
// values in GetMarshaler.
const (
kInsert MsgType = 1
kDelete MsgType = 2
kSearch MsgType = 3
kSearchResult MsgType = 4
kTimeTick MsgType = 5
kSegmentStatics MsgType = 6
kTimeSync MsgType = 7
)
// TsMsg is a message that carries a timestamp and reports its kind.
type TsMsg interface {
// SetTs sets the message timestamp.
SetTs(ts TimeStamp)
// Ts returns the message timestamp.
Ts() TimeStamp
// Type returns the message kind.
Type() MsgType
}
/////////////////////////////////////////Insert//////////////////////////////////////////
// InsertTask wraps an InsertRequest so that it satisfies TsMsg.
type InsertTask struct {
	internalPb.InsertRequest
}

// SetTs assigns ts to the Timestamp field.
// NOTE(review): the receiver is a value, so the assignment changes only this
// method's copy and is not visible to the caller.
func (it InsertTask) SetTs(ts TimeStamp) {
	it.Timestamp = uint64(ts)
}

// Ts returns the request's Timestamp as a TimeStamp.
func (it InsertTask) Ts() TimeStamp {
	return TimeStamp(it.Timestamp)
}

// Type returns kInsert, except that a request whose ReqType is kNone is
// reported as kTimeTick.
func (it InsertTask) Type() MsgType {
	if it.ReqType != internalPb.ReqType_kNone {
		return kInsert
	}
	return kTimeTick
}
/////////////////////////////////////////Delete//////////////////////////////////////////
// DeleteTask wraps a DeleteRequest so that it satisfies TsMsg.
type DeleteTask struct {
	internalPb.DeleteRequest
}

// SetTs assigns ts to the Timestamp field.
// NOTE(review): the receiver is a value, so the assignment changes only this
// method's copy and is not visible to the caller.
func (dt DeleteTask) SetTs(ts TimeStamp) {
	dt.Timestamp = uint64(ts)
}

// Ts returns the request's Timestamp as a TimeStamp.
func (dt DeleteTask) Ts() TimeStamp {
	return TimeStamp(dt.Timestamp)
}

// Type returns kDelete, except that a request whose ReqType is kNone is
// reported as kTimeTick.
func (dt DeleteTask) Type() MsgType {
	if dt.ReqType != internalPb.ReqType_kNone {
		return kDelete
	}
	return kTimeTick
}
/////////////////////////////////////////Search//////////////////////////////////////////
// SearchTask wraps a SearchRequest so that it satisfies TsMsg.
type SearchTask struct {
	internalPb.SearchRequest
}

// SetTs assigns ts to the Timestamp field.
// NOTE(review): the receiver is a value, so the assignment changes only this
// method's copy and is not visible to the caller.
func (st SearchTask) SetTs(ts TimeStamp) {
	st.Timestamp = uint64(ts)
}

// Ts returns the request's Timestamp as a TimeStamp.
func (st SearchTask) Ts() TimeStamp {
	return TimeStamp(st.Timestamp)
}

// Type returns kSearch, except that a request whose ReqType is kNone is
// reported as kTimeTick.
func (st SearchTask) Type() MsgType {
	if st.ReqType != internalPb.ReqType_kNone {
		return kSearch
	}
	return kTimeTick
}
/////////////////////////////////////////SearchResult//////////////////////////////////////////
// SearchResultTask wraps a SearchResult so that it satisfies TsMsg.
type SearchResultTask struct {
	internalPb.SearchResult
}

// SetTs assigns ts to the Timestamp field.
// NOTE(review): the receiver is a value, so the assignment changes only this
// method's copy and is not visible to the caller.
func (sr SearchResultTask) SetTs(ts TimeStamp) {
	sr.Timestamp = uint64(ts)
}

// Ts returns the result's Timestamp as a TimeStamp.
func (sr SearchResultTask) Ts() TimeStamp {
	return TimeStamp(sr.Timestamp)
}

// Type always returns kSearchResult.
func (SearchResultTask) Type() MsgType {
	return kSearchResult
}
/////////////////////////////////////////TimeSync//////////////////////////////////////////
// TimeSyncTask wraps a TimeSyncMsg so that it satisfies TsMsg.
type TimeSyncTask struct {
	internalPb.TimeSyncMsg
}

// SetTs assigns ts to the Timestamp field.
// NOTE(review): the receiver is a value, so the assignment changes only this
// method's copy and is not visible to the caller.
func (t TimeSyncTask) SetTs(ts TimeStamp) {
	t.Timestamp = uint64(ts)
}

// Ts returns the message's Timestamp as a TimeStamp.
func (t TimeSyncTask) Ts() TimeStamp {
	return TimeStamp(t.Timestamp)
}

// Type always returns kTimeSync.
func (TimeSyncTask) Type() MsgType {
	return kTimeSync
}
///////////////////////////////////////////Key2Seg//////////////////////////////////////////
//type Key2SegTask struct {
// internalPb.Key2SegMsg
//}
//
////TODO::Key2SegMsg don't have timestamp
//func (k2st Key2SegTask) SetTs(ts TimeStamp) {}
//
//func (k2st Key2SegTask) Ts() TimeStamp {
// return TimeStamp(0)
//}
//
//func (k2st Key2SegTask) Type() MsgType {
// return
//}

View File

@ -24,9 +24,13 @@ enum ReqType {
/* Manipulation Requests */
kInsert = 400;
kDelete = 401;
/* Query */
kSearch = 500;
/* System Control */
kTimeTick = 1200
}
enum PeerRole {

View File

@ -164,7 +164,7 @@ func TestProxyNode(t *testing.T) {
Id: 100,
Schema: nil,
CreateTime: 0,
SegmentIds: []uint64{101, 102},
SegmentIds: []int64{101, 102},
PartitionTags: nil,
}
sm101 := etcdpb.SegmentMeta{
@ -307,7 +307,7 @@ func TestProxyNode(t *testing.T) {
assert.Equal(t, insertR.EntityIdArray[i], int64(i+10))
}
var insertPrimaryKey []uint64
var insertPrimaryKey []int64
readerM1, ok := <-reader.Chan()
assert.True(t, ok)

View File

@ -18,6 +18,7 @@ import (
pb "github.com/zilliztech/milvus-distributed/internal/proto/message"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
"github.com/zilliztech/milvus-distributed/internal/proto/servicepb"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
etcd "go.etcd.io/etcd/clientv3"
"go.uber.org/atomic"
"google.golang.org/grpc"
@ -195,7 +196,7 @@ func (s *proxyServer) Insert(ctx context.Context, req *servicepb.RowBatch) (*ser
for i := 0; i < len(req.HashValues); i++ {
key := int64(req.HashValues[i])
hash, err := Hash32_Int64(key)
hash, err := typeutil.Hash32Int64(key)
if err != nil {
return nil, status.Errorf(codes.Unknown, "hash failed on %d", key)
}

View File

@ -125,7 +125,7 @@ func TestProxyServer_WatchEtcd(t *testing.T) {
col1 := etcdpb.CollectionMeta{
Id: 1,
SegmentIds: []uint64{2, 3},
SegmentIds: []int64{2, 3},
}
seg2 := etcdpb.SegmentMeta{
SegmentId: 2,
@ -174,7 +174,7 @@ func TestProxyServer_WatchEtcd(t *testing.T) {
col4 := etcdpb.CollectionMeta{
Id: 4,
SegmentIds: []uint64{5},
SegmentIds: []int64{5},
}
seg5 := etcdpb.SegmentMeta{
SegmentId: 5,
@ -206,7 +206,7 @@ func TestProxyServer_InsertAndDelete(t *testing.T) {
Id: 10,
Schema: nil,
CreateTime: 0,
SegmentIds: []uint64{11, 12},
SegmentIds: []int64{11, 12},
PartitionTags: nil,
}
seg11 := etcdpb.SegmentMeta{
@ -323,7 +323,7 @@ func TestProxyServer_InsertAndDelete(t *testing.T) {
assert.Nil(t, err)
assert.Equalf(t, dr.ErrorCode, pb.ErrorCode_SUCCESS, "delete failed, error code = %d, reason = %s", dr.ErrorCode, dr.Reason)
var primaryKey []uint64
var primaryKey []int64
isbreak = false
for {
if isbreak {

View File

@ -1,4 +1,4 @@
package proxy
package typeutil
import (
"encoding/binary"
@ -6,7 +6,7 @@ import (
"unsafe"
)
func Hash32_Bytes(b []byte) (uint32, error) {
func Hash32Bytes(b []byte) (uint32, error) {
h := murmur3.New32()
if _, err := h.Write(b); err != nil {
return 0, err
@ -14,13 +14,13 @@ func Hash32_Bytes(b []byte) (uint32, error) {
return h.Sum32() & 0x7fffffff, nil
}
func Hash32_Uint64(v uint64) (uint32, error) {
func Hash32Uint64(v uint64) (uint32, error) {
b := make([]byte, unsafe.Sizeof(v))
binary.LittleEndian.PutUint64(b, v)
return Hash32_Bytes(b)
return Hash32Bytes(b)
}
func Hash32_Int64(v int64) (uint32, error) {
return Hash32_Uint64(uint64(v))
func Hash32Int64(v int64) (uint32, error) {
return Hash32Uint64(uint64(v))
}

View File

@ -1,5 +1,4 @@
package proxy
package typeutil
import (
"github.com/stretchr/testify/assert"
"testing"
@ -15,16 +14,17 @@ func TestUint64(t *testing.T) {
func TestHash32_Uint64(t *testing.T) {
var u uint64 = 0x12
h, err := Hash32_Uint64(u)
h, err := Hash32Uint64(u)
assert.Nil(t, err)
t.Log(h)
b := make([]byte, unsafe.Sizeof(u))
b[0] = 0x12
h2, err := Hash32_Bytes(b)
h2, err := Hash32Bytes(b)
assert.Nil(t, err)
t.Log(h2)
assert.Equal(t, h, h2)
}