Add reader flowgraph about nodes and messages

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
bigsheeper 2020-11-02 19:30:12 +08:00 committed by yefu.chen
parent f75dc8cf82
commit f49c98ed85
17 changed files with 493 additions and 38 deletions

View File

@ -0,0 +1,24 @@
package reader
type deleteNode struct {
BaseNode
deleteMsg deleteMsg
}
func (dNode *deleteNode) Name() string {
return "dNode"
}
func (dNode *deleteNode) Operate(in []*Msg) []*Msg {
return in
}
func newDeleteNode() *deleteNode {
baseNode := BaseNode{}
baseNode.SetMaxQueueLength(maxQueueLength)
baseNode.SetMaxParallelism(maxParallelism)
return &deleteNode{
BaseNode: baseNode,
}
}

View File

@ -0,0 +1,24 @@
package reader
type deletePreprocessNode struct {
BaseNode
deletePreprocessMsg deletePreprocessMsg
}
func (dpNode *deletePreprocessNode) Name() string {
return "dpNode"
}
func (dpNode *deletePreprocessNode) Operate(in []*Msg) []*Msg {
return in
}
func newDeletePreprocessNode() *deletePreprocessNode {
baseNode := BaseNode{}
baseNode.SetMaxQueueLength(maxQueueLength)
baseNode.SetMaxParallelism(maxParallelism)
return &deletePreprocessNode{
BaseNode: baseNode,
}
}

View File

@ -0,0 +1,24 @@
package reader
type dmNode struct {
BaseNode
dmMsg dmMsg
}
func (dmNode *dmNode) Name() string {
return "dmNode"
}
func (dmNode *dmNode) Operate(in []*Msg) []*Msg {
return in
}
func newDmNode() *dmNode {
baseNode := BaseNode{}
baseNode.SetMaxQueueLength(maxQueueLength)
baseNode.SetMaxParallelism(maxParallelism)
return &dmNode{
BaseNode: baseNode,
}
}

View File

@ -0,0 +1,24 @@
package reader
type filteredDmNode struct {
BaseNode
filteredDmMsg filteredDmMsg
}
func (fdmNode *filteredDmNode) Name() string {
return "dmNode"
}
func (fdmNode *filteredDmNode) Operate(in []*Msg) []*Msg {
return in
}
func newFilteredDmNode() *filteredDmNode {
baseNode := BaseNode{}
baseNode.SetMaxQueueLength(maxQueueLength)
baseNode.SetMaxParallelism(maxParallelism)
return &filteredDmNode{
BaseNode: baseNode,
}
}

View File

@ -0,0 +1,24 @@
package reader
type insertNode struct {
BaseNode
insertMsg insertMsg
}
func (iNode *insertNode) Name() string {
return "iNode"
}
func (iNode *insertNode) Operate(in []*Msg) []*Msg {
return in
}
func newInsertNode() *insertNode {
baseNode := BaseNode{}
baseNode.SetMaxQueueLength(maxQueueLength)
baseNode.SetMaxParallelism(maxParallelism)
return &insertNode{
BaseNode: baseNode,
}
}

View File

@ -0,0 +1,24 @@
package reader
type key2SegNode struct {
BaseNode
key2SegMsg key2SegMsg
}
func (ksNode *key2SegNode) Name() string {
return "ksNode"
}
func (ksNode *key2SegNode) Operate(in []*Msg) []*Msg {
return in
}
func newKey2SegNode() *key2SegNode {
baseNode := BaseNode{}
baseNode.SetMaxQueueLength(maxQueueLength)
baseNode.SetMaxParallelism(maxParallelism)
return &key2SegNode{
BaseNode: baseNode,
}
}

View File

@ -1,12 +1,92 @@
package reader
import (
"context"
"fmt"
msgPb "github.com/zilliztech/milvus-distributed/internal/proto/message"
"github.com/zilliztech/milvus-distributed/internal/util/flowgraph"
"log"
"sync"
)
type manipulationService struct {
ctx context.Context
fg *flowgraph.TimeTickedFlowGraph
}
func (dmService *manipulationService) initNodes() {
dmService.fg = flowgraph.NewTimeTickedFlowGraph(dmService.ctx)
var msgStreamNode Node = newMsgStreamNode()
var dmNode Node = newDmNode()
// var key2SegNode Node = newKey2SegNode()
var schemaUpdateNode Node = newSchemaUpdateNode()
var filteredDmNode Node = newFilteredDmNode()
var insertNode Node = newInsertNode()
// var deletePreprocessNode Node = newDeletePreprocessNode()
// var deleteNode Node = newDeleteNode()
var serviceTimeNode Node = newServiceTimeNode()
dmService.fg.AddNode(&msgStreamNode)
dmService.fg.AddNode(&dmNode)
// fg.AddNode(&key2SegNode)
dmService.fg.AddNode(&schemaUpdateNode)
dmService.fg.AddNode(&filteredDmNode)
dmService.fg.AddNode(&insertNode)
// fg.AddNode(&deletePreprocessNode)
// fg.AddNode(&deleteNode)
dmService.fg.AddNode(&serviceTimeNode)
// TODO: add delete pipeline support
var err = dmService.fg.SetEdges(dmNode.Name(),
[]string{},
[]string{filteredDmNode.Name()},
)
if err != nil {
log.Fatal("set edges failed in node:", dmNode.Name())
}
err = dmService.fg.SetEdges(schemaUpdateNode.Name(),
[]string{},
[]string{filteredDmNode.Name()},
)
if err != nil {
log.Fatal("set edges failed in node:", schemaUpdateNode.Name())
}
err = dmService.fg.SetEdges(filteredDmNode.Name(),
[]string{dmNode.Name(), schemaUpdateNode.Name()},
[]string{insertNode.Name()},
)
if err != nil {
log.Fatal("set edges failed in node:", filteredDmNode.Name())
}
err = dmService.fg.SetEdges(insertNode.Name(),
[]string{filteredDmNode.Name()},
[]string{serviceTimeNode.Name()},
)
if err != nil {
log.Fatal("set edges failed in node:", insertNode.Name())
}
err = dmService.fg.SetEdges(serviceTimeNode.Name(),
[]string{insertNode.Name()},
[]string{},
)
if err != nil {
log.Fatal("set edges failed in node:", serviceTimeNode.Name())
}
// TODO: add top nodes's initialization
}
func (node *QueryNode) MessagesPreprocess(insertDeleteMessages []*msgPb.InsertOrDeleteMsg, timeRange TimeRange) msgPb.Status {
var tMax = timeRange.timestampMax

147
internal/reader/message.go Normal file
View File

@ -0,0 +1,147 @@
package reader
import (
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/util/flowgraph"
)
type Msg = flowgraph.Msg
type msgStreamMsg struct {
tsMessages []*msgstream.TsMsg
timeRange TimeRange
}
type dmMsg struct {
tsMessages []*msgstream.TsMsg
timeRange TimeRange
}
type key2SegMsg struct {
tsMessages []*msgstream.TsMsg
timeRange TimeRange
}
type schemaUpdateMsg struct {
timeRange TimeRange
}
type filteredDmMsg struct {
tsMessages []*msgstream.TsMsg
timeRange TimeRange
}
type insertMsg struct {
insertData InsertData
timeRange TimeRange
}
type deletePreprocessMsg struct {
deletePreprocessData DeletePreprocessData
timeRange TimeRange
}
type deleteMsg struct {
deleteData DeleteData
timeRange TimeRange
}
type serviceTimeMsg struct {
timeRange TimeRange
}
type InsertData struct {
insertIDs map[int64][]int64
insertTimestamps map[int64][]uint64
insertRecords map[int64][][]byte
insertOffset map[int64]int64
}
type DeleteData struct {
deleteIDs map[int64][]int64
deleteTimestamps map[int64][]uint64
deleteOffset map[int64]int64
}
type DeleteRecord struct {
entityID int64
timestamp uint64
segmentID int64
}
type DeletePreprocessData struct {
deleteRecords []*DeleteRecord
count int32
}
func (msMsg *msgStreamMsg) TimeTick() Timestamp {
return msMsg.timeRange.timestampMax
}
func (msMsg *msgStreamMsg) DownStreamNodeIdx() int {
return 0
}
func (dmMsg *dmMsg) TimeTick() Timestamp {
return dmMsg.timeRange.timestampMax
}
func (dmMsg *dmMsg) DownStreamNodeIdx() int {
return 0
}
func (ksMsg *key2SegMsg) TimeTick() Timestamp {
return ksMsg.timeRange.timestampMax
}
func (ksMsg *key2SegMsg) DownStreamNodeIdx() int {
return 0
}
func (suMsg *schemaUpdateMsg) TimeTick() Timestamp {
return suMsg.timeRange.timestampMax
}
func (suMsg *schemaUpdateMsg) DownStreamNodeIdx() int {
return 0
}
func (fdmMsg *filteredDmMsg) TimeTick() Timestamp {
return fdmMsg.timeRange.timestampMax
}
func (fdmMsg *filteredDmMsg) DownStreamNodeIdx() int {
return 0
}
func (iMsg *insertMsg) TimeTick() Timestamp {
return iMsg.timeRange.timestampMax
}
func (iMsg *insertMsg) DownStreamNodeIdx() int {
return 0
}
func (dMsg *deleteMsg) TimeTick() Timestamp {
return dMsg.timeRange.timestampMax
}
func (dMsg *deleteMsg) DownStreamNodeIdx() int {
return 0
}
func (dpMsg *deletePreprocessMsg) TimeTick() Timestamp {
return dpMsg.timeRange.timestampMax
}
func (dpMsg *deletePreprocessMsg) DownStreamNodeIdx() int {
return 0
}
func (stMsg *serviceTimeMsg) TimeTick() Timestamp {
return stMsg.timeRange.timestampMax
}
func (stMsg *serviceTimeMsg) DownStreamNodeIdx() int {
return 0
}

View File

@ -0,0 +1,24 @@
package reader
type msgStreamNode struct {
BaseNode
msgStreamMsg msgStreamMsg
}
func (msNode *msgStreamNode) Name() string {
return "msNode"
}
func (msNode *msgStreamNode) Operate(in []*Msg) []*Msg {
return in
}
func newMsgStreamNode() *msgStreamNode {
baseNode := BaseNode{}
baseNode.SetMaxQueueLength(maxQueueLength)
baseNode.SetMaxParallelism(maxParallelism)
return &msgStreamNode{
BaseNode: baseNode,
}
}

9
internal/reader/node.go Normal file
View File

@ -0,0 +1,9 @@
package reader
import "github.com/zilliztech/milvus-distributed/internal/util/flowgraph"
const maxQueueLength = 1024
const maxParallelism = 1024
type BaseNode = flowgraph.BaseNode
type Node = flowgraph.Node

View File

@ -15,6 +15,7 @@ import "C"
import (
"context"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
"time"
"github.com/zilliztech/milvus-distributed/internal/kv"
@ -22,29 +23,31 @@ import (
msgPb "github.com/zilliztech/milvus-distributed/internal/proto/message"
)
type InsertData struct {
insertIDs map[int64][]int64
insertTimestamps map[int64][]uint64
insertRecords map[int64][][]byte
insertOffset map[int64]int64
}
type Timestamp = typeutil.Timestamp
type DeleteData struct {
deleteIDs map[int64][]int64
deleteTimestamps map[int64][]uint64
deleteOffset map[int64]int64
}
type DeleteRecord struct {
entityID int64
timestamp uint64
segmentID int64
}
type DeletePreprocessData struct {
deleteRecords []*DeleteRecord
count int32
}
//type InsertData struct {
// insertIDs map[int64][]int64
// insertTimestamps map[int64][]uint64
// insertRecords map[int64][][]byte
// insertOffset map[int64]int64
//}
//
//type DeleteData struct {
// deleteIDs map[int64][]int64
// deleteTimestamps map[int64][]uint64
// deleteOffset map[int64]int64
//}
//
//type DeleteRecord struct {
// entityID int64
// timestamp uint64
// segmentID int64
//}
//
//type DeletePreprocessData struct {
// deleteRecords []*DeleteRecord
// count int32
//}
type QueryNodeDataBuffer struct {
InsertDeleteBuffer []*msgPb.InsertOrDeleteMsg

View File

@ -0,0 +1,24 @@
package reader
type schemaUpdateNode struct {
BaseNode
schemaUpdateMsg schemaUpdateMsg
}
func (suNode *schemaUpdateNode) Name() string {
return "suNode"
}
func (suNode *schemaUpdateNode) Operate(in []*Msg) []*Msg {
return in
}
func newSchemaUpdateNode() *schemaUpdateNode {
baseNode := BaseNode{}
baseNode.SetMaxQueueLength(maxQueueLength)
baseNode.SetMaxParallelism(maxParallelism)
return &schemaUpdateNode{
BaseNode: baseNode,
}
}

View File

@ -0,0 +1,24 @@
package reader
type serviceTimeNode struct {
BaseNode
serviceTimeMsg serviceTimeMsg
}
func (stNode *serviceTimeNode) Name() string {
return "iNode"
}
func (stNode *serviceTimeNode) Operate(in []*Msg) []*Msg {
return in
}
func newServiceTimeNode() *serviceTimeNode {
baseNode := BaseNode{}
baseNode.SetMaxQueueLength(maxQueueLength)
baseNode.SetMaxParallelism(maxParallelism)
return &serviceTimeNode{
BaseNode: baseNode,
}
}

View File

@ -14,22 +14,22 @@ import (
const ctxTimeInMillisecond = 3000
type nodeA struct {
baseNode
BaseNode
a float64
}
type nodeB struct {
baseNode
BaseNode
b float64
}
type nodeC struct {
baseNode
BaseNode
c float64
}
type nodeD struct {
baseNode
BaseNode
d float64
resChan chan float64
}
@ -43,7 +43,7 @@ func (m *intMsg) TimeTick() Timestamp {
return m.t
}
func (m *intMsg) DownStreamNodeIdx() int32 {
func (m *intMsg) DownStreamNodeIdx() int {
return 1
}
@ -178,22 +178,22 @@ func TestTimeTickedFlowGraph_Start(t *testing.T) {
fg := NewTimeTickedFlowGraph(ctx)
var a Node = &nodeA{
baseNode: baseNode{
BaseNode: BaseNode{
maxQueueLength: maxQueueLength,
},
}
var b Node = &nodeB{
baseNode: baseNode{
BaseNode: BaseNode{
maxQueueLength: maxQueueLength,
},
}
var c Node = &nodeC{
baseNode: baseNode{
BaseNode: BaseNode{
maxQueueLength: maxQueueLength,
},
}
var d Node = &nodeD{
baseNode: baseNode{
BaseNode: BaseNode{
maxQueueLength: maxQueueLength,
},
resChan: make(chan float64),

View File

@ -2,5 +2,5 @@ package flowgraph
type Msg interface {
TimeTick() Timestamp
DownStreamNodeIdx() int32
DownStreamNodeIdx() int
}

View File

@ -16,7 +16,7 @@ type Node interface {
Operate(in []*Msg) []*Msg
}
type baseNode struct {
type BaseNode struct {
maxQueueLength int32
maxParallelism int32
graphStates *flowGraphStates
@ -97,22 +97,22 @@ func (nodeCtx *nodeCtx) getMessagesFromChannel() {
}
}
func (node *baseNode) MaxQueueLength() int32 {
func (node *BaseNode) MaxQueueLength() int32 {
return node.maxQueueLength
}
func (node *baseNode) MaxParallelism() int32 {
func (node *BaseNode) MaxParallelism() int32 {
return node.maxParallelism
}
func (node *baseNode) SetMaxQueueLength(n int32) {
func (node *BaseNode) SetMaxQueueLength(n int32) {
node.maxQueueLength = n
}
func (node *baseNode) SetMaxParallelism(n int32) {
func (node *BaseNode) SetMaxParallelism(n int32) {
node.maxParallelism = n
}
func (node *baseNode) SetPipelineStates(states *flowGraphStates) {
func (node *BaseNode) SetPipelineStates(states *flowGraphStates) {
node.graphStates = states
}