2020-11-26 16:01:31 +08:00
|
|
|
package querynode
|
2020-11-09 16:27:11 +08:00
|
|
|
|
|
|
|
import (
|
|
|
|
"fmt"
|
|
|
|
"log"
|
|
|
|
"sync"
|
2020-11-12 12:04:12 +08:00
|
|
|
|
|
|
|
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
|
2020-11-09 16:27:11 +08:00
|
|
|
)
|
|
|
|
|
|
|
|
// insertNode is a flow-graph node that consumes insertMsg batches and
// writes their rows into the segments held by the collection replica.
type insertNode struct {
	BaseNode
	// replica provides segment lookup/creation and collection metadata
	// for the insert path (see Operate and insert below).
	replica *collectionReplica
}
|
|
|
|
|
|
|
|
// InsertData accumulates one Operate call's worth of rows, grouped by
// segment ID, so each segment can be pre-allocated and inserted as a unit.
type InsertData struct {
	// insertIDs maps segment ID -> row IDs to insert into that segment.
	insertIDs map[UniqueID][]UniqueID
	// insertTimestamps maps segment ID -> per-row timestamps (parallel to insertIDs).
	insertTimestamps map[UniqueID][]Timestamp
	// insertRecords maps segment ID -> serialized row data blobs (parallel to insertIDs).
	insertRecords map[UniqueID][]*commonpb.Blob
	// insertOffset maps segment ID -> write offset reserved by segmentPreInsert.
	insertOffset map[UniqueID]int64
}
|
|
|
|
|
|
|
|
func (iNode *insertNode) Name() string {
|
|
|
|
return "iNode"
|
|
|
|
}
|
|
|
|
|
|
|
|
func (iNode *insertNode) Operate(in []*Msg) []*Msg {
|
|
|
|
// fmt.Println("Do insertNode operation")
|
|
|
|
|
|
|
|
if len(in) != 1 {
|
2020-11-12 12:04:12 +08:00
|
|
|
log.Println("Invalid operate message input in insertNode, input length = ", len(in))
|
2020-11-09 16:27:11 +08:00
|
|
|
// TODO: add error handling
|
|
|
|
}
|
|
|
|
|
|
|
|
iMsg, ok := (*in[0]).(*insertMsg)
|
|
|
|
if !ok {
|
|
|
|
log.Println("type assertion failed for insertMsg")
|
|
|
|
// TODO: add error handling
|
|
|
|
}
|
|
|
|
|
|
|
|
insertData := InsertData{
|
|
|
|
insertIDs: make(map[int64][]int64),
|
|
|
|
insertTimestamps: make(map[int64][]uint64),
|
|
|
|
insertRecords: make(map[int64][]*commonpb.Blob),
|
|
|
|
insertOffset: make(map[int64]int64),
|
|
|
|
}
|
|
|
|
|
|
|
|
// 1. hash insertMessages to insertData
|
|
|
|
for _, task := range iMsg.insertMessages {
|
2020-11-13 15:17:18 +08:00
|
|
|
if len(task.RowIDs) != len(task.Timestamps) || len(task.RowIDs) != len(task.RowData) {
|
2020-11-09 16:27:11 +08:00
|
|
|
// TODO: what if the messages are misaligned?
|
|
|
|
// Here, we ignore those messages and print error
|
|
|
|
log.Println("Error, misaligned messages detected")
|
|
|
|
continue
|
|
|
|
}
|
2020-11-13 15:17:18 +08:00
|
|
|
insertData.insertIDs[task.SegmentID] = append(insertData.insertIDs[task.SegmentID], task.RowIDs...)
|
|
|
|
insertData.insertTimestamps[task.SegmentID] = append(insertData.insertTimestamps[task.SegmentID], task.Timestamps...)
|
|
|
|
insertData.insertRecords[task.SegmentID] = append(insertData.insertRecords[task.SegmentID], task.RowData...)
|
2020-11-16 17:01:10 +08:00
|
|
|
|
|
|
|
// check if segment exists, if not, create this segment
|
2020-12-07 15:22:20 +08:00
|
|
|
if !(*iNode.replica).hasSegment(task.SegmentID) {
|
|
|
|
collection, err := (*iNode.replica).getCollectionByName(task.CollectionName)
|
2020-11-16 17:01:10 +08:00
|
|
|
if err != nil {
|
|
|
|
log.Println(err)
|
|
|
|
continue
|
|
|
|
}
|
2020-12-07 15:22:20 +08:00
|
|
|
err = (*iNode.replica).addSegment(task.SegmentID, task.PartitionTag, collection.ID())
|
2020-11-16 17:01:10 +08:00
|
|
|
if err != nil {
|
|
|
|
log.Println(err)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
}
|
2020-11-09 16:27:11 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
// 2. do preInsert
|
|
|
|
for segmentID := range insertData.insertRecords {
|
2020-12-07 15:22:20 +08:00
|
|
|
var targetSegment, err = (*iNode.replica).getSegmentByID(segmentID)
|
2020-11-09 16:27:11 +08:00
|
|
|
if err != nil {
|
|
|
|
log.Println("preInsert failed")
|
|
|
|
// TODO: add error handling
|
|
|
|
}
|
|
|
|
|
|
|
|
var numOfRecords = len(insertData.insertRecords[segmentID])
|
|
|
|
if targetSegment != nil {
|
|
|
|
var offset = targetSegment.segmentPreInsert(numOfRecords)
|
|
|
|
insertData.insertOffset[segmentID] = offset
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// 3. do insert
|
|
|
|
wg := sync.WaitGroup{}
|
|
|
|
for segmentID := range insertData.insertRecords {
|
|
|
|
wg.Add(1)
|
|
|
|
go iNode.insert(&insertData, segmentID, &wg)
|
|
|
|
}
|
|
|
|
wg.Wait()
|
|
|
|
|
|
|
|
var res Msg = &serviceTimeMsg{
|
|
|
|
timeRange: iMsg.timeRange,
|
|
|
|
}
|
|
|
|
return []*Msg{&res}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (iNode *insertNode) insert(insertData *InsertData, segmentID int64, wg *sync.WaitGroup) {
|
2020-12-07 15:22:20 +08:00
|
|
|
var targetSegment, err = (*iNode.replica).getSegmentByID(segmentID)
|
2020-11-09 16:27:11 +08:00
|
|
|
if err != nil {
|
|
|
|
log.Println("cannot find segment:", segmentID)
|
|
|
|
// TODO: add error handling
|
2020-11-26 16:01:31 +08:00
|
|
|
wg.Done()
|
2020-11-09 16:27:11 +08:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
ids := insertData.insertIDs[segmentID]
|
|
|
|
timestamps := insertData.insertTimestamps[segmentID]
|
|
|
|
records := insertData.insertRecords[segmentID]
|
|
|
|
offsets := insertData.insertOffset[segmentID]
|
|
|
|
|
|
|
|
err = targetSegment.segmentInsert(offsets, &ids, ×tamps, &records)
|
|
|
|
if err != nil {
|
2020-11-26 16:01:31 +08:00
|
|
|
log.Println(err)
|
2020-11-09 16:27:11 +08:00
|
|
|
// TODO: add error handling
|
2020-11-26 16:01:31 +08:00
|
|
|
wg.Done()
|
2020-11-09 16:27:11 +08:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
fmt.Println("Do insert done, len = ", len(insertData.insertIDs[segmentID]))
|
|
|
|
wg.Done()
|
|
|
|
}
|
|
|
|
|
2020-12-07 15:22:20 +08:00
|
|
|
func newInsertNode(replica *collectionReplica) *insertNode {
|
2020-11-19 10:46:17 +08:00
|
|
|
maxQueueLength := Params.flowGraphMaxQueueLength()
|
|
|
|
maxParallelism := Params.flowGraphMaxParallelism()
|
2020-11-18 17:32:52 +08:00
|
|
|
|
2020-11-09 16:27:11 +08:00
|
|
|
baseNode := BaseNode{}
|
|
|
|
baseNode.SetMaxQueueLength(maxQueueLength)
|
|
|
|
baseNode.SetMaxParallelism(maxParallelism)
|
|
|
|
|
|
|
|
return &insertNode{
|
2020-11-17 20:00:23 +08:00
|
|
|
BaseNode: baseNode,
|
|
|
|
replica: replica,
|
2020-11-09 16:27:11 +08:00
|
|
|
}
|
|
|
|
}
|