milvus/internal/datanode/flow_graph_insert_buffer_node.go
XuanYang-cn 6a13386393 Add DataNode package
Signed-off-by: XuanYang-cn <xuan.yang@zilliz.com>
2021-01-19 11:37:16 +08:00

661 lines
19 KiB
Go

package datanode
import (
"bytes"
"context"
"encoding/binary"
"fmt"
"log"
"path"
"strconv"
"unsafe"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/opentracing/opentracing-go"
oplog "github.com/opentracing/opentracing-go/log"
"github.com/zilliztech/milvus-distributed/internal/allocator"
"github.com/zilliztech/milvus-distributed/internal/kv"
miniokv "github.com/zilliztech/milvus-distributed/internal/kv/minio"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
"github.com/zilliztech/milvus-distributed/internal/storage"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
// Key prefixes for collection and segment entries.
// NOTE(review): neither constant is referenced in this file; presumably they
// are prefixes for metadata keys used elsewhere in the package - confirm.
const (
// CollectionPrefix is the path prefix "/collection/".
CollectionPrefix = "/collection/"
// SegmentPrefix is the path prefix "/segment/".
SegmentPrefix = "/segment/"
)
// InsertData is a package-local alias for storage.InsertData, the per-segment
// collection of buffered field columns.
type InsertData = storage.InsertData

// Blob is a package-local alias for storage.Blob.
type Blob = storage.Blob

// insertBufferNode is the flow-graph node that accumulates inserted rows per
// segment and writes them out as binlogs.
type insertBufferNode struct {
	BaseNode

	// insertBuffer holds the rows that have not been written out yet.
	insertBuffer *insertBuffer
	// minIOKV is the key-value store that serialized binlogs are saved to.
	minIOKV kv.Base
	// minioPrifex is the root path prepended to every binlog key.
	// (The misspelling is kept because other code refers to this field name.)
	minioPrifex string
	// idAllocator supplies the unique ID appended to each binlog key.
	idAllocator *allocator.IDAllocator
	// outCh receives one message per saved binlog path and one message when a
	// requested flush has completed.
	outCh chan *insertFlushSyncMsg
	// pulsarDataNodeTimeTickStream is the stream time-tick messages are
	// produced to.
	pulsarDataNodeTimeTickStream *msgstream.PulsarMsgStream
	// replica is used to look up collections (and their schemas) by name or ID.
	replica collectionReplica
}

// insertBuffer keeps buffered insert data keyed by segment.
type insertBuffer struct {
	// insertData maps SegmentID to the InsertData buffered for that segment.
	insertData map[UniqueID]*InsertData
	// maxSize is the row count at which a segment's buffer is reported full.
	maxSize int32
}
// size reports how many rows are buffered for segmentID.
//
// The result is the largest NumRows found among the segment's float-vector and
// binary-vector columns; columns of any other kind are not consulted. It is 0
// when nothing is buffered for the segment or when it has no vector columns.
func (ib *insertBuffer) size(segmentID UniqueID) int32 {
	// A lookup in a nil or empty map reports "not found", so no separate
	// emptiness check is needed.
	segData, found := ib.insertData[segmentID]
	if !found {
		return 0
	}

	var rows int32
	for _, column := range segData.Data {
		var n int32
		switch col := column.(type) {
		case *storage.FloatVectorFieldData:
			n = int32(col.NumRows)
		case *storage.BinaryVectorFieldData:
			n = int32(col.NumRows)
		default:
			continue
		}
		if n > rows {
			rows = n
		}
	}
	return rows
}
// full reports whether the number of rows buffered for segmentID has reached
// (or exceeded) the buffer's maxSize.
func (ib *insertBuffer) full(segmentID UniqueID) bool {
	buffered := ib.size(segmentID)
	return buffered >= ib.maxSize
}
// Name returns the fixed identifier of this flow-graph node.
func (ibNode *insertBufferNode) Name() string {
	const nodeName = "ibNode"
	return nodeName
}
// Operate processes one insertMsg handed over by the upstream flow-graph node.
//
// It performs three steps:
//
//  1. Every insert message is decoded field by field (following the collection
//     schema) and appended to the per-segment insert buffer. When a segment's
//     buffer is full, the buffered data is serialized into binlogs, saved to
//     the MinIO key-value store and the resulting paths are sent on outCh.
//  2. Every flush message writes out whatever is still buffered for its
//     segment in the same way and then sends a flushCompleted message on outCh.
//  3. A time-tick message carrying the batch's maximum timestamp is produced.
//
// in must contain exactly one element holding an *insertMsg. If in is empty,
// its first element is nil, or the element is not an *insertMsg, the problem is
// logged and nil is returned instead of dereferencing invalid input.
// NOTE(review): how the flow-graph driver treats an empty result is not visible
// from this file - confirm.
//
// On success the returned slice holds a single gcMsg carrying the input's
// gcRecord and timeRange.
func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
	// log.Println("=========== insert buffer Node Operating")

	if len(in) != 1 {
		log.Println("Error: Invalid operate message input in insertBuffertNode, input length = ", len(in))
		// TODO: add error handling
	}
	if len(in) == 0 {
		// in[0] below would panic.
		return nil
	}
	if in[0] == nil {
		log.Println("Error: nil message input in insertBufferNode")
		return nil
	}

	iMsg, ok := (*in[0]).(*insertMsg)
	if !ok {
		log.Println("Error: type assertion failed for insertMsg")
		// iMsg is nil here, so nothing below can run.
		return nil
	}

	// iMsg is insertMsg
	// 1. iMsg -> buffer
	for _, msg := range iMsg.insertMessages {
		ctx := msg.GetMsgContext()
		var span opentracing.Span
		if ctx != nil {
			span, _ = opentracing.StartSpanFromContext(ctx, fmt.Sprintf("insert buffer node, start time = %d", msg.BeginTs()))
		} else {
			span = opentracing.StartSpan(fmt.Sprintf("insert buffer node, start time = %d", msg.BeginTs()))
		}
		span.SetTag("hash keys", msg.HashKeys())
		span.SetTag("start time", msg.BeginTs())
		span.SetTag("end time", msg.EndTs())

		// Row IDs, timestamps and row payloads must describe the same rows.
		if len(msg.RowIDs) != len(msg.Timestamps) || len(msg.RowIDs) != len(msg.RowData) {
			log.Println("Error: misaligned messages detected")
			// Finish the span on every exit path so it is always reported.
			span.Finish()
			continue
		}

		currentSegID := msg.GetSegmentID()
		collectionName := msg.GetCollectionName()
		span.LogFields(oplog.Int("segment id", int(currentSegID)))

		idata, ok := ibNode.insertBuffer.insertData[currentSegID]
		if !ok {
			idata = &InsertData{
				Data: make(map[UniqueID]storage.FieldData),
			}
		}

		// 1.1 Look up the collection (ID and schema) in the replica
		collection, err := ibNode.replica.getCollectionByName(collectionName)
		if err != nil {
			// GOOSE TODO add error handler
			log.Println("bbb, Get meta wrong:", err)
			span.Finish()
			continue
		}
		collectionID := collection.ID()
		collSchema := collection.schema

		// 1.2 Get Fields
		// Each RowData blob is read as the row's field values laid out back to
		// back in schema order, little-endian. pos is the byte offset of the
		// current field inside a row; it advances by the field's width after
		// the field has been read from every row.
		pos := 0
		for _, field := range collSchema.Fields {
			switch field.DataType {
			case schemapb.DataType_VECTOR_FLOAT:
				var dim int
				for _, t := range field.TypeParams {
					if t.Key == "dim" {
						dim, err = strconv.Atoi(t.Value)
						if err != nil {
							log.Println("strconv wrong")
						}
						break
					}
				}
				if dim <= 0 {
					log.Println("invalid dim")
					// TODO: add error handling
				}

				if _, ok := idata.Data[field.FieldID]; !ok {
					idata.Data[field.FieldID] = &storage.FloatVectorFieldData{
						NumRows: 0,
						Data:    make([]float32, 0),
						Dim:     dim,
					}
				}
				fieldData := idata.Data[field.FieldID].(*storage.FloatVectorFieldData)

				// offset walks through the dim float32 components of one row.
				var offset int
				for _, blob := range msg.RowData {
					offset = 0
					for j := 0; j < dim; j++ {
						var v float32
						buf := bytes.NewBuffer(blob.GetValue()[pos+offset:])
						if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
							log.Println("binary.read float32 err:", err)
						}
						fieldData.Data = append(fieldData.Data, v)
						offset += int(unsafe.Sizeof(*(&v)))
					}
				}
				pos += offset
				fieldData.NumRows += len(msg.RowIDs)

			case schemapb.DataType_VECTOR_BINARY:
				var dim int
				for _, t := range field.TypeParams {
					if t.Key == "dim" {
						dim, err = strconv.Atoi(t.Value)
						if err != nil {
							log.Println("strconv wrong")
						}
						break
					}
				}
				if dim <= 0 {
					log.Println("invalid dim")
					// TODO: add error handling
				}

				if _, ok := idata.Data[field.FieldID]; !ok {
					idata.Data[field.FieldID] = &storage.BinaryVectorFieldData{
						NumRows: 0,
						Data:    make([]byte, 0),
						Dim:     dim,
					}
				}
				fieldData := idata.Data[field.FieldID].(*storage.BinaryVectorFieldData)

				var offset int
				for _, blob := range msg.RowData {
					// Every row stores this field at the same position, so the
					// slice always starts at pos. (Starting at pos+offset with
					// the previous row's offset produced an empty slice for
					// every second row.)
					bv := blob.GetValue()[pos : pos+(dim/8)]
					fieldData.Data = append(fieldData.Data, bv...)
					offset = len(bv)
				}
				pos += offset
				fieldData.NumRows += len(msg.RowData)

			case schemapb.DataType_BOOL:
				if _, ok := idata.Data[field.FieldID]; !ok {
					idata.Data[field.FieldID] = &storage.BoolFieldData{
						NumRows: 0,
						Data:    make([]bool, 0),
					}
				}
				fieldData := idata.Data[field.FieldID].(*storage.BoolFieldData)

				var v bool
				for _, blob := range msg.RowData {
					buf := bytes.NewReader(blob.GetValue()[pos:])
					if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
						log.Println("binary.Read bool failed:", err)
					}
					fieldData.Data = append(fieldData.Data, v)
				}
				pos += int(unsafe.Sizeof(*(&v)))
				fieldData.NumRows += len(msg.RowIDs)

			case schemapb.DataType_INT8:
				if _, ok := idata.Data[field.FieldID]; !ok {
					idata.Data[field.FieldID] = &storage.Int8FieldData{
						NumRows: 0,
						Data:    make([]int8, 0),
					}
				}
				fieldData := idata.Data[field.FieldID].(*storage.Int8FieldData)

				var v int8
				for _, blob := range msg.RowData {
					buf := bytes.NewReader(blob.GetValue()[pos:])
					if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
						log.Println("binary.Read int8 failed:", err)
					}
					fieldData.Data = append(fieldData.Data, v)
				}
				pos += int(unsafe.Sizeof(*(&v)))
				fieldData.NumRows += len(msg.RowIDs)

			case schemapb.DataType_INT16:
				if _, ok := idata.Data[field.FieldID]; !ok {
					idata.Data[field.FieldID] = &storage.Int16FieldData{
						NumRows: 0,
						Data:    make([]int16, 0),
					}
				}
				fieldData := idata.Data[field.FieldID].(*storage.Int16FieldData)

				var v int16
				for _, blob := range msg.RowData {
					buf := bytes.NewReader(blob.GetValue()[pos:])
					if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
						log.Println("binary.Read int16 failed:", err)
					}
					fieldData.Data = append(fieldData.Data, v)
				}
				pos += int(unsafe.Sizeof(*(&v)))
				fieldData.NumRows += len(msg.RowIDs)

			case schemapb.DataType_INT32:
				if _, ok := idata.Data[field.FieldID]; !ok {
					idata.Data[field.FieldID] = &storage.Int32FieldData{
						NumRows: 0,
						Data:    make([]int32, 0),
					}
				}
				fieldData := idata.Data[field.FieldID].(*storage.Int32FieldData)

				var v int32
				for _, blob := range msg.RowData {
					buf := bytes.NewReader(blob.GetValue()[pos:])
					if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
						log.Println("binary.Read int32 failed:", err)
					}
					fieldData.Data = append(fieldData.Data, v)
				}
				pos += int(unsafe.Sizeof(*(&v)))
				fieldData.NumRows += len(msg.RowIDs)

			case schemapb.DataType_INT64:
				if _, ok := idata.Data[field.FieldID]; !ok {
					idata.Data[field.FieldID] = &storage.Int64FieldData{
						NumRows: 0,
						Data:    make([]int64, 0),
					}
				}
				fieldData := idata.Data[field.FieldID].(*storage.Int64FieldData)

				// Field IDs 0 and 1 are filled from the message's RowIDs and
				// Timestamps rather than from the row payload, so they do not
				// advance pos.
				switch field.FieldID {
				case 0: // rowIDs
					fieldData.Data = append(fieldData.Data, msg.RowIDs...)
					fieldData.NumRows += len(msg.RowIDs)
				case 1: // Timestamps
					for _, ts := range msg.Timestamps {
						fieldData.Data = append(fieldData.Data, int64(ts))
					}
					fieldData.NumRows += len(msg.Timestamps)
				default:
					var v int64
					for _, blob := range msg.RowData {
						buf := bytes.NewBuffer(blob.GetValue()[pos:])
						if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
							log.Println("binary.Read int64 failed:", err)
						}
						fieldData.Data = append(fieldData.Data, v)
					}
					pos += int(unsafe.Sizeof(*(&v)))
					fieldData.NumRows += len(msg.RowIDs)
				}

			case schemapb.DataType_FLOAT:
				if _, ok := idata.Data[field.FieldID]; !ok {
					idata.Data[field.FieldID] = &storage.FloatFieldData{
						NumRows: 0,
						Data:    make([]float32, 0),
					}
				}
				fieldData := idata.Data[field.FieldID].(*storage.FloatFieldData)

				var v float32
				for _, blob := range msg.RowData {
					buf := bytes.NewBuffer(blob.GetValue()[pos:])
					if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
						log.Println("binary.Read float32 failed:", err)
					}
					fieldData.Data = append(fieldData.Data, v)
				}
				pos += int(unsafe.Sizeof(*(&v)))
				fieldData.NumRows += len(msg.RowIDs)

			case schemapb.DataType_DOUBLE:
				if _, ok := idata.Data[field.FieldID]; !ok {
					idata.Data[field.FieldID] = &storage.DoubleFieldData{
						NumRows: 0,
						Data:    make([]float64, 0),
					}
				}
				fieldData := idata.Data[field.FieldID].(*storage.DoubleFieldData)

				var v float64
				for _, blob := range msg.RowData {
					buf := bytes.NewBuffer(blob.GetValue()[pos:])
					if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
						log.Println("binary.Read float64 failed:", err)
					}
					fieldData.Data = append(fieldData.Data, v)
				}
				pos += int(unsafe.Sizeof(*(&v)))
				fieldData.NumRows += len(msg.RowIDs)
			}
		}

		// 1.3 store in buffer
		ibNode.insertBuffer.insertData[currentSegID] = idata
		span.LogFields(oplog.String("store in buffer", "store in buffer"))

		// 1.4 if full
		//   1.4.1 generate binlogs
		span.LogFields(oplog.String("generate binlogs", "generate binlogs"))
		if ibNode.insertBuffer.full(currentSegID) {
			log.Printf(". Insert Buffer full, auto flushing (%v) rows of data...", ibNode.insertBuffer.size(currentSegID))

			// partitionTag -> partitionID
			partitionTag := msg.GetPartitionName()
			partitionID, err := typeutil.Hash32String(partitionTag)
			if err != nil {
				log.Println("partitionTag to partitionID wrong")
				// TODO GOOSE add error handler
			}

			collMeta := &etcdpb.CollectionMeta{
				Schema: collSchema,
				ID:     collectionID,
			}
			inCodec := storage.NewInsertCodec(collMeta)

			// buffer data to binlogs
			binLogs, err := inCodec.Serialize(partitionID,
				currentSegID, ibNode.insertBuffer.insertData[currentSegID])
			if err != nil {
				log.Println("generate binlog wrong: ", err)
			}

			// clear buffer
			delete(ibNode.insertBuffer.insertData, currentSegID)
			log.Println(".. Clearing buffer")

			// 1.5.2 binLogs -> minIO/S3
			collIDStr := strconv.FormatInt(collectionID, 10)
			partitionIDStr := strconv.FormatInt(partitionID, 10)
			segIDStr := strconv.FormatInt(currentSegID, 10)
			keyPrefix := path.Join(ibNode.minioPrifex, collIDStr, partitionIDStr, segIDStr)

			log.Printf(".. Saving (%v) binlogs to MinIO ...", len(binLogs))
			for index, blob := range binLogs {
				uid, err := ibNode.idAllocator.AllocOne()
				if err != nil {
					log.Println("Allocate Id failed")
					// GOOSE TODO error handler
				}

				// Key layout: <root>/<collection>/<partition>/<segment>/<blob key>/<uid>
				key := path.Join(keyPrefix, blob.Key, strconv.FormatInt(uid, 10))
				err = ibNode.minIOKV.Save(key, string(blob.Value[:]))
				if err != nil {
					log.Println("Save to MinIO failed")
					// GOOSE TODO error handler
				}

				// The blob key is parsed back into the field ID; parse with the
				// full 64-bit range so large IDs are not rejected.
				fieldID, err := strconv.ParseInt(blob.Key, 10, 64)
				if err != nil {
					log.Println("string to fieldID wrong")
					// GOOSE TODO error handler
				}

				inBinlogMsg := &insertFlushSyncMsg{
					flushCompleted: false,
					insertBinlogPathMsg: insertBinlogPathMsg{
						ts:      iMsg.timeRange.timestampMax,
						segID:   currentSegID,
						fieldID: fieldID,
						paths:   []string{key},
					},
				}

				log.Println("... Appending binlog paths ...", index)
				ibNode.outCh <- inBinlogMsg
			}
		}
		span.Finish()
	}

	if len(iMsg.insertMessages) > 0 {
		log.Println("---insert buffer status---")
		// Report at most 10 segments to keep the log short.
		stopSign := 0
		for k := range ibNode.insertBuffer.insertData {
			if stopSign >= 10 {
				break
			}
			log.Printf("seg(%v) buffer size = (%v)", k, ibNode.insertBuffer.size(k))
			stopSign++
		}
	}

	// iMsg is Flush() msg from master
	//   1. insertBuffer(not empty) -> binLogs -> minIO/S3
	for _, msg := range iMsg.flushMessages {
		currentSegID := msg.GetSegmentID()
		flushTs := msg.Base.GetTimestamp()
		partitionTag := msg.GetPartitionTag()
		collectionID := msg.GetCollectionID()
		log.Printf(". Receiving flush message segID(%v)...", currentSegID)

		if ibNode.insertBuffer.size(currentSegID) > 0 {
			log.Println(".. Buffer not empty, flushing ...")
			collSchema, err := ibNode.getCollectionSchemaByID(collectionID)
			if err != nil {
				// GOOSE TODO add error handler
				log.Println("aaa, Get meta wrong: ", err)
			}
			collMeta := &etcdpb.CollectionMeta{
				Schema: collSchema,
				ID:     collectionID,
			}
			inCodec := storage.NewInsertCodec(collMeta)

			// partitionTag -> partitionID
			partitionID, err := typeutil.Hash32String(partitionTag)
			if err != nil {
				// GOOSE TODO add error handler
				log.Println("partitionTag to partitionID Wrong: ", err)
			}

			// buffer data to binlogs
			binLogs, err := inCodec.Serialize(partitionID,
				currentSegID, ibNode.insertBuffer.insertData[currentSegID])
			if err != nil {
				log.Println("generate binlog wrong: ", err)
			}

			// clear buffer
			delete(ibNode.insertBuffer.insertData, currentSegID)

			// binLogs -> minIO/S3
			collIDStr := strconv.FormatInt(collectionID, 10)
			partitionIDStr := strconv.FormatInt(partitionID, 10)
			segIDStr := strconv.FormatInt(currentSegID, 10)
			keyPrefix := path.Join(ibNode.minioPrifex, collIDStr, partitionIDStr, segIDStr)

			for _, blob := range binLogs {
				uid, err := ibNode.idAllocator.AllocOne()
				if err != nil {
					log.Println("Allocate Id failed")
					// GOOSE TODO error handler
				}

				key := path.Join(keyPrefix, blob.Key, strconv.FormatInt(uid, 10))
				err = ibNode.minIOKV.Save(key, string(blob.Value[:]))
				if err != nil {
					log.Println("Save to MinIO failed")
					// GOOSE TODO error handler
				}

				// Parse with the full 64-bit range, as in the auto-flush path.
				fieldID, err := strconv.ParseInt(blob.Key, 10, 64)
				if err != nil {
					log.Println("string to fieldID wrong")
					// GOOSE TODO error handler
				}

				// Append binlogs
				inBinlogMsg := &insertFlushSyncMsg{
					flushCompleted: false,
					insertBinlogPathMsg: insertBinlogPathMsg{
						ts:      flushTs,
						segID:   currentSegID,
						fieldID: fieldID,
						paths:   []string{key},
					},
				}
				ibNode.outCh <- inBinlogMsg
			}
		}

		// Flushed
		// The completion notice is sent even when nothing was buffered.
		log.Println(".. Flush finished ...")
		inBinlogMsg := &insertFlushSyncMsg{
			flushCompleted: true,
			insertBinlogPathMsg: insertBinlogPathMsg{
				ts:    flushTs,
				segID: currentSegID,
			},
		}

		ibNode.outCh <- inBinlogMsg
	}

	if err := ibNode.writeHardTimeTick(iMsg.timeRange.timestampMax); err != nil {
		log.Printf("Error: send hard time tick into pulsar channel failed, %s\n", err.Error())
	}

	var res Msg = &gcMsg{
		gcRecord:  iMsg.gcRecord,
		timeRange: iMsg.timeRange,
	}

	return []*Msg{&res}
}
// getCollectionSchemaByID returns the schema of the collection that the
// replica holds under collectionID, or the replica's lookup error.
func (ibNode *insertBufferNode) getCollectionSchemaByID(collectionID UniqueID) (*schemapb.CollectionSchema, error) {
	coll, lookupErr := ibNode.replica.getCollectionByID(collectionID)
	if lookupErr == nil {
		return coll.schema, nil
	}
	return nil, lookupErr
}
// getCollectionSchemaByName returns the schema of the collection that the
// replica holds under collectionName, or the replica's lookup error.
func (ibNode *insertBufferNode) getCollectionSchemaByName(collectionName string) (*schemapb.CollectionSchema, error) {
	coll, lookupErr := ibNode.replica.getCollectionByName(collectionName)
	if lookupErr == nil {
		return coll.schema, nil
	}
	return nil, lookupErr
}
// writeHardTimeTick produces a single time-tick message stamped with ts to the
// node's time-tick stream and returns the stream's Produce error, if any.
//
// ts is used as the begin timestamp, the end timestamp and the message-base
// timestamp; the message is sent with hash value 0 and this data node's ID as
// the source.
func (ibNode *insertBufferNode) writeHardTimeTick(ts Timestamp) error {
	tick := &msgstream.TimeTickMsg{
		BaseMsg: msgstream.BaseMsg{
			BeginTimestamp: ts,
			EndTimestamp:   ts,
			HashValues:     []uint32{0},
		},
		TimeTickMsg: internalpb2.TimeTickMsg{
			Base: &commonpb.MsgBase{
				MsgType:   commonpb.MsgType_kTimeTick,
				MsgID:     0,
				Timestamp: ts,
				SourceID:  Params.DataNodeID,
			},
		},
	}

	var pack msgstream.MsgPack
	pack.Msgs = append(pack.Msgs, tick)
	return ibNode.pulsarDataNodeTimeTickStream.Produce(&pack)
}
// newInsertBufferNode builds an insertBufferNode configured from Params.
//
// It creates, in this order, the MinIO key-value client, a started ID
// allocator and a Pulsar producer stream for the time-tick channel. It panics
// if the MinIO client cannot be created or the ID allocator cannot be created
// or started.
//
// outCh is the channel the node publishes binlog paths and flush completions
// to; replica is the collection replica used for schema lookups.
func newInsertBufferNode(ctx context.Context, outCh chan *insertFlushSyncMsg, replica collectionReplica) *insertBufferNode {
	var base BaseNode
	base.SetMaxQueueLength(Params.FlowGraphMaxQueueLength)
	base.SetMaxParallelism(Params.FlowGraphMaxParallelism)

	// MinIO
	store, err := miniokv.NewMinIOKV(ctx, &miniokv.Option{
		Address:           Params.MinioAddress,
		AccessKeyID:       Params.MinioAccessKeyID,
		SecretAccessKeyID: Params.MinioSecretAccessKey,
		UseSSL:            Params.MinioUseSSL,
		CreateBucket:      true,
		BucketName:        Params.MinioBucketName,
	})
	if err != nil {
		panic(err)
	}

	ids, err := allocator.NewIDAllocator(ctx, Params.MasterAddress)
	if err != nil {
		panic(err)
	}
	if err = ids.Start(); err != nil {
		panic(err)
	}

	// Producer stream for data node time ticks.
	tickStream := msgstream.NewPulsarMsgStream(ctx, 1024)
	tickStream.SetPulsarClient(Params.PulsarAddress)
	tickStream.CreatePulsarProducers([]string{Params.TimeTickChannelName})

	return &insertBufferNode{
		BaseNode: base,
		insertBuffer: &insertBuffer{
			insertData: make(map[UniqueID]*InsertData),
			maxSize:    Params.FlushInsertBufferSize,
		},
		minIOKV:                      store,
		minioPrifex:                  Params.InsertBinlogRootPath,
		idAllocator:                  ids,
		outCh:                        outCh,
		pulsarDataNodeTimeTickStream: tickStream,
		replica:                      replica,
	}
}