2021-04-19 15:16:33 +08:00
|
|
|
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
|
|
|
|
//
|
|
|
|
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
|
|
|
|
// with the License. You may obtain a copy of the License at
|
|
|
|
//
|
|
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
//
|
|
|
|
// Unless required by applicable law or agreed to in writing, software distributed under the License
|
|
|
|
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
|
|
|
// or implied. See the License for the specific language governing permissions and limitations under the License.
|
|
|
|
|
2021-01-19 11:37:16 +08:00
|
|
|
package datanode
|
|
|
|
|
|
|
|
import (
|
|
|
|
"bytes"
|
|
|
|
"context"
|
|
|
|
"encoding/binary"
|
2021-05-18 19:45:00 +08:00
|
|
|
"errors"
|
2021-01-19 11:37:16 +08:00
|
|
|
"path"
|
|
|
|
"strconv"
|
2021-03-23 18:50:13 +08:00
|
|
|
"sync"
|
2021-01-19 11:37:16 +08:00
|
|
|
"unsafe"
|
|
|
|
|
2021-02-26 10:13:36 +08:00
|
|
|
"go.uber.org/zap"
|
2021-01-19 11:37:16 +08:00
|
|
|
|
2021-04-22 14:45:57 +08:00
|
|
|
"github.com/milvus-io/milvus/internal/kv"
|
|
|
|
miniokv "github.com/milvus-io/milvus/internal/kv/minio"
|
|
|
|
"github.com/milvus-io/milvus/internal/log"
|
|
|
|
"github.com/milvus-io/milvus/internal/msgstream"
|
|
|
|
"github.com/milvus-io/milvus/internal/storage"
|
|
|
|
"github.com/milvus-io/milvus/internal/util/flowgraph"
|
|
|
|
"github.com/milvus-io/milvus/internal/util/trace"
|
2021-03-25 14:41:46 +08:00
|
|
|
"github.com/opentracing/opentracing-go"
|
2021-04-22 14:45:57 +08:00
|
|
|
|
|
|
|
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
|
|
|
"github.com/milvus-io/milvus/internal/proto/etcdpb"
|
|
|
|
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
|
|
|
"github.com/milvus-io/milvus/internal/proto/schemapb"
|
2021-01-19 11:37:16 +08:00
|
|
|
)
|
|
|
|
|
|
|
|
const (
	// CollectionPrefix is the key prefix for collection metadata entries.
	// NOTE(review): usage is not visible in this chunk — presumably etcd/KV key paths; confirm against callers.
	CollectionPrefix = "/collection/"
	// SegmentPrefix is the key prefix for segment metadata entries.
	SegmentPrefix = "/segment/"
)
|
|
|
|
|
|
|
|
type (
	// InsertData aliases storage.InsertData so the rest of this package can
	// refer to buffered insert payloads without importing storage directly.
	InsertData = storage.InsertData

	// Blob aliases storage.Blob (a key/value binary payload).
	Blob = storage.Blob
)
|
2021-03-25 14:41:46 +08:00
|
|
|
// insertBufferNode is a flowgraph node that buffers insert messages per
// segment and flushes them as binlogs to MinIO when a buffer fills up or an
// explicit flush message arrives.
type insertBufferNode struct {
	BaseNode
	// insertBuffer accumulates per-segment InsertData until flush.
	insertBuffer *insertBuffer
	// replica tracks segment/collection state for this node.
	replica   Replica
	flushMeta *binlogMeta // GOOSE TODO remove
	// idAllocator hands out unique IDs used to build binlog keys.
	idAllocator allocatorInterface
	// flushMap holds segment data handed off to flushing goroutines,
	// keyed by segment ID (concurrent access from flushSegment).
	flushMap sync.Map

	// minIOKV is the object-store client binlogs are written to.
	minIOKV kv.BaseKV

	// Outgoing message streams.
	timeTickStream          msgstream.MsgStream
	segmentStatisticsStream msgstream.MsgStream
	completeFlushStream     msgstream.MsgStream
}
|
|
|
|
|
|
|
|
// insertBuffer holds not-yet-flushed insert data for each segment.
type insertBuffer struct {
	insertData map[UniqueID]*InsertData // SegmentID to InsertData
	// maxSize is the row-count threshold at which a segment buffer is
	// considered full (see full()).
	maxSize int32
}
|
2021-01-19 11:37:16 +08:00
|
|
|
|
|
|
|
func (ib *insertBuffer) size(segmentID UniqueID) int32 {
|
|
|
|
if ib.insertData == nil || len(ib.insertData) <= 0 {
|
|
|
|
return 0
|
|
|
|
}
|
|
|
|
idata, ok := ib.insertData[segmentID]
|
|
|
|
if !ok {
|
|
|
|
return 0
|
|
|
|
}
|
|
|
|
|
|
|
|
var maxSize int32 = 0
|
|
|
|
for _, data := range idata.Data {
|
|
|
|
fdata, ok := data.(*storage.FloatVectorFieldData)
|
|
|
|
if ok && int32(fdata.NumRows) > maxSize {
|
|
|
|
maxSize = int32(fdata.NumRows)
|
|
|
|
}
|
|
|
|
|
|
|
|
bdata, ok := data.(*storage.BinaryVectorFieldData)
|
|
|
|
if ok && int32(bdata.NumRows) > maxSize {
|
|
|
|
maxSize = int32(bdata.NumRows)
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
return maxSize
|
|
|
|
}
|
|
|
|
|
|
|
|
func (ib *insertBuffer) full(segmentID UniqueID) bool {
|
|
|
|
return ib.size(segmentID) >= ib.maxSize
|
|
|
|
}
|
|
|
|
|
|
|
|
func (ibNode *insertBufferNode) Name() string {
|
|
|
|
return "ibNode"
|
|
|
|
}
|
|
|
|
|
2021-03-25 14:41:46 +08:00
|
|
|
// Operate is the flowgraph tick for the insert buffer node. It expects
// exactly one input message (an *insertMsg) and, per tick:
//  1. registers any new segments and records start positions,
//  2. updates per-segment row statistics and publishes them,
//  3. decodes each insert message's row data field-by-field into the
//     per-segment insert buffer,
//  4. handles explicit flush messages by handing buffered data to a
//     flushSegment goroutine,
//  5. auto-flushes any segment whose buffer reached maxSize,
//  6. emits a hard time tick, and returns a gcMsg downstream.
//
// NOTE(review): errors from malformed input are logged but not propagated
// (the TODOs below); the function continues on a best-effort basis.
func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {

	if len(in) != 1 {
		log.Error("Invalid operate message input in insertBufferNode", zap.Int("input length", len(in)))
		// TODO: add error handling
	}

	iMsg, ok := in[0].(*insertMsg)
	if !ok {
		log.Error("type assertion failed for insertMsg")
		// TODO: add error handling
	}

	if iMsg == nil {
		return []Msg{}
	}

	// Open a tracing span per insert message; all spans are finished
	// together at the end of this tick.
	var spans []opentracing.Span
	for _, msg := range iMsg.insertMessages {
		sp, ctx := trace.StartSpanFromContext(msg.TraceCtx())
		spans = append(spans, sp)
		msg.SetTraceCtx(ctx)
	}

	// Updating segment statistics
	// uniqueSeg accumulates the number of incoming rows per segment ID.
	uniqueSeg := make(map[UniqueID]int64)
	for _, msg := range iMsg.insertMessages {

		currentSegID := msg.GetSegmentID()
		collID := msg.GetCollectionID()
		partitionID := msg.GetPartitionID()

		// First time we see this segment: register it and record its
		// start position (matched to the segment's channel name).
		if !ibNode.replica.hasSegment(currentSegID) {
			err := ibNode.replica.addSegment(currentSegID, collID, partitionID, msg.GetChannelID())
			if err != nil {
				log.Error("add segment wrong", zap.Error(err))
			}

			switch {
			case iMsg.startPositions == nil || len(iMsg.startPositions) <= 0:
				log.Error("insert Msg StartPosition empty")
			default:
				segment, err := ibNode.replica.getSegmentByID(currentSegID)
				if err != nil {
					log.Error("get segment wrong", zap.Error(err))
				}
				var startPosition *internalpb.MsgPosition = nil
				for _, pos := range iMsg.startPositions {
					if pos.ChannelName == segment.channelName {
						startPosition = pos
						break
					}
				}
				if startPosition == nil {
					log.Error("get position wrong", zap.Error(err))
				} else {
					ibNode.replica.setStartPosition(currentSegID, startPosition)
				}
			}
		}

		segNum := uniqueSeg[currentSegID]
		uniqueSeg[currentSegID] = segNum + int64(len(msg.RowIDs))
	}

	// Push the accumulated row counts into the replica and remember which
	// segments changed (segToUpdate also drives auto-flush below).
	segToUpdate := make([]UniqueID, 0, len(uniqueSeg))
	for id, num := range uniqueSeg {
		segToUpdate = append(segToUpdate, id)

		err := ibNode.replica.updateStatistics(id, num)
		if err != nil {
			log.Error("update Segment Row number wrong", zap.Error(err))
		}
	}

	if len(segToUpdate) > 0 {
		err := ibNode.updateSegStatistics(segToUpdate)
		if err != nil {
			log.Error("update segment statistics error", zap.Error(err))
		}
	}

	// iMsg is insertMsg
	// 1. iMsg -> buffer
	for _, msg := range iMsg.insertMessages {
		// RowIDs, Timestamps and RowData must be parallel arrays.
		if len(msg.RowIDs) != len(msg.Timestamps) || len(msg.RowIDs) != len(msg.RowData) {
			log.Error("misaligned messages detected")
			continue
		}
		currentSegID := msg.GetSegmentID()
		collectionID := msg.GetCollectionID()

		idata, ok := ibNode.insertBuffer.insertData[currentSegID]
		if !ok {
			idata = &InsertData{
				Data: make(map[UniqueID]storage.FieldData),
			}
		}

		// 1.1 Get CollectionMeta
		collection, err := ibNode.replica.getCollectionByID(collectionID)
		if err != nil {
			// GOOSE TODO add error handler
			log.Error("Get meta wrong:", zap.Error(err))
			continue
		}

		collSchema := collection.schema
		// 1.2 Get Fields
		// pos tracks the byte offset inside each row blob as fields are
		// decoded in schema order; rows are assumed to share one layout.
		var pos int = 0 // Record position of blob
		log.Debug("DataNode flow_graph_insert_buffer_node", zap.Any("Fields", collSchema.Fields))
		var fieldIDs []int64
		var fieldTypes []schemapb.DataType
		for _, field := range collSchema.Fields {
			fieldIDs = append(fieldIDs, field.FieldID)
			fieldTypes = append(fieldTypes, field.DataType)
		}

		log.Debug("DataNode flow_graph_insert_buffer_node", zap.Any("FieldIDs", fieldIDs))
		log.Debug("DataNode flow_graph_insert_buffer_node", zap.Any("fieldTypes", fieldTypes))

		// Decode each schema field out of the row blobs and append to the
		// segment's buffered FieldData of the matching concrete type.
		for _, field := range collSchema.Fields {
			switch field.DataType {
			case schemapb.DataType_FloatVector:
				var dim int
				for _, t := range field.TypeParams {
					if t.Key == "dim" {
						dim, err = strconv.Atoi(t.Value)
						if err != nil {
							log.Error("strconv wrong")
						}
						break
					}
				}
				if dim <= 0 {
					log.Error("invalid dim")
					continue
					// TODO: add error handling
				}

				if _, ok := idata.Data[field.FieldID]; !ok {
					idata.Data[field.FieldID] = &storage.FloatVectorFieldData{
						NumRows: 0,
						Data:    make([]float32, 0),
						Dim:     dim,
					}
				}

				fieldData := idata.Data[field.FieldID].(*storage.FloatVectorFieldData)

				// Read dim little-endian float32s per row starting at pos.
				var offset int
				for _, blob := range msg.RowData {
					offset = 0
					for j := 0; j < dim; j++ {
						var v float32
						buf := bytes.NewBuffer(blob.GetValue()[pos+offset:])
						if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
							log.Error("binary.read float32 wrong", zap.Error(err))
						}
						fieldData.Data = append(fieldData.Data, v)
						offset += int(unsafe.Sizeof(*(&v)))
					}
				}
				pos += offset
				fieldData.NumRows += len(msg.RowIDs)

			case schemapb.DataType_BinaryVector:
				var dim int
				for _, t := range field.TypeParams {
					if t.Key == "dim" {
						dim, err = strconv.Atoi(t.Value)
						if err != nil {
							log.Error("strconv wrong")
						}
						break
					}
				}
				if dim <= 0 {
					log.Error("invalid dim")
					// TODO: add error handling
				}

				if _, ok := idata.Data[field.FieldID]; !ok {
					idata.Data[field.FieldID] = &storage.BinaryVectorFieldData{
						NumRows: 0,
						Data:    make([]byte, 0),
						Dim:     dim,
					}
				}
				fieldData := idata.Data[field.FieldID].(*storage.BinaryVectorFieldData)

				// Binary vectors pack dim bits, i.e. dim/8 bytes per row.
				var offset int
				for _, blob := range msg.RowData {
					bv := blob.GetValue()[pos : pos+(dim/8)]
					fieldData.Data = append(fieldData.Data, bv...)
					offset = len(bv)
				}
				pos += offset
				fieldData.NumRows += len(msg.RowData)

			case schemapb.DataType_Bool:
				if _, ok := idata.Data[field.FieldID]; !ok {
					idata.Data[field.FieldID] = &storage.BoolFieldData{
						NumRows: 0,
						Data:    make([]bool, 0),
					}
				}

				fieldData := idata.Data[field.FieldID].(*storage.BoolFieldData)
				var v bool
				for _, blob := range msg.RowData {
					buf := bytes.NewReader(blob.GetValue()[pos:])
					if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
						log.Error("binary.Read bool wrong", zap.Error(err))
					}
					fieldData.Data = append(fieldData.Data, v)

				}
				pos += int(unsafe.Sizeof(*(&v)))
				fieldData.NumRows += len(msg.RowIDs)

			case schemapb.DataType_Int8:
				if _, ok := idata.Data[field.FieldID]; !ok {
					idata.Data[field.FieldID] = &storage.Int8FieldData{
						NumRows: 0,
						Data:    make([]int8, 0),
					}
				}

				fieldData := idata.Data[field.FieldID].(*storage.Int8FieldData)
				var v int8
				for _, blob := range msg.RowData {
					buf := bytes.NewReader(blob.GetValue()[pos:])
					if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
						log.Error("binary.Read int8 wrong", zap.Error(err))
					}
					fieldData.Data = append(fieldData.Data, v)
				}
				pos += int(unsafe.Sizeof(*(&v)))
				fieldData.NumRows += len(msg.RowIDs)

			case schemapb.DataType_Int16:
				if _, ok := idata.Data[field.FieldID]; !ok {
					idata.Data[field.FieldID] = &storage.Int16FieldData{
						NumRows: 0,
						Data:    make([]int16, 0),
					}
				}

				fieldData := idata.Data[field.FieldID].(*storage.Int16FieldData)
				var v int16
				for _, blob := range msg.RowData {
					buf := bytes.NewReader(blob.GetValue()[pos:])
					if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
						log.Error("binary.Read int16 wrong", zap.Error(err))
					}
					fieldData.Data = append(fieldData.Data, v)
				}
				pos += int(unsafe.Sizeof(*(&v)))
				fieldData.NumRows += len(msg.RowIDs)

			case schemapb.DataType_Int32:
				if _, ok := idata.Data[field.FieldID]; !ok {
					idata.Data[field.FieldID] = &storage.Int32FieldData{
						NumRows: 0,
						Data:    make([]int32, 0),
					}
				}

				fieldData := idata.Data[field.FieldID].(*storage.Int32FieldData)
				var v int32
				for _, blob := range msg.RowData {
					buf := bytes.NewReader(blob.GetValue()[pos:])
					if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
						log.Error("binary.Read int32 wrong", zap.Error(err))
					}
					fieldData.Data = append(fieldData.Data, v)
				}
				pos += int(unsafe.Sizeof(*(&v)))
				fieldData.NumRows += len(msg.RowIDs)

			case schemapb.DataType_Int64:
				if _, ok := idata.Data[field.FieldID]; !ok {
					idata.Data[field.FieldID] = &storage.Int64FieldData{
						NumRows: 0,
						Data:    make([]int64, 0),
					}
				}

				fieldData := idata.Data[field.FieldID].(*storage.Int64FieldData)
				// Field IDs 0 and 1 are reserved: 0 carries the message's
				// RowIDs and 1 its Timestamps; neither lives in the row blob.
				switch field.FieldID {
				case 0: // rowIDs
					fieldData.Data = append(fieldData.Data, msg.RowIDs...)
					fieldData.NumRows += len(msg.RowIDs)
				case 1: // Timestamps
					for _, ts := range msg.Timestamps {
						fieldData.Data = append(fieldData.Data, int64(ts))
					}
					fieldData.NumRows += len(msg.Timestamps)
				default:
					var v int64
					for _, blob := range msg.RowData {
						buf := bytes.NewBuffer(blob.GetValue()[pos:])
						if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
							log.Error("binary.Read int64 wrong", zap.Error(err))
						}
						fieldData.Data = append(fieldData.Data, v)
					}
					pos += int(unsafe.Sizeof(*(&v)))
					fieldData.NumRows += len(msg.RowIDs)
				}

			case schemapb.DataType_Float:
				if _, ok := idata.Data[field.FieldID]; !ok {
					idata.Data[field.FieldID] = &storage.FloatFieldData{
						NumRows: 0,
						Data:    make([]float32, 0),
					}
				}

				fieldData := idata.Data[field.FieldID].(*storage.FloatFieldData)
				var v float32
				for _, blob := range msg.RowData {
					buf := bytes.NewBuffer(blob.GetValue()[pos:])
					if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
						log.Error("binary.Read float32 wrong", zap.Error(err))
					}
					fieldData.Data = append(fieldData.Data, v)
				}
				pos += int(unsafe.Sizeof(*(&v)))
				fieldData.NumRows += len(msg.RowIDs)

			case schemapb.DataType_Double:
				if _, ok := idata.Data[field.FieldID]; !ok {
					idata.Data[field.FieldID] = &storage.DoubleFieldData{
						NumRows: 0,
						Data:    make([]float64, 0),
					}
				}

				fieldData := idata.Data[field.FieldID].(*storage.DoubleFieldData)
				var v float64
				for _, blob := range msg.RowData {
					buf := bytes.NewBuffer(blob.GetValue()[pos:])
					if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
						log.Error("binary.Read float64 wrong", zap.Error(err))
					}
					fieldData.Data = append(fieldData.Data, v)
				}

				pos += int(unsafe.Sizeof(*(&v)))
				fieldData.NumRows += len(msg.RowIDs)
			}
		}

		// 1.3 store in buffer
		ibNode.insertBuffer.insertData[currentSegID] = idata

		// Record the end position for this segment's channel.
		switch {
		case iMsg.endPositions == nil || len(iMsg.endPositions) <= 0:
			log.Error("insert Msg EndPosition empty")
		default:
			segment, err := ibNode.replica.getSegmentByID(currentSegID)
			if err != nil {
				log.Error("get segment wrong", zap.Error(err))
			}
			var endPosition *internalpb.MsgPosition = nil
			for _, pos := range iMsg.endPositions {
				if pos.ChannelName == segment.channelName {
					endPosition = pos
				}
			}
			if endPosition == nil {
				log.Error("get position wrong", zap.Error(err))
			}
			ibNode.replica.setEndPosition(currentSegID, endPosition)
		}

	}

	// Debug dump of up to 10 segment buffer sizes.
	if len(iMsg.insertMessages) > 0 {
		log.Debug("---insert buffer status---")
		var stopSign int = 0
		for k := range ibNode.insertBuffer.insertData {
			if stopSign >= 10 {
				log.Debug("......")
				break
			}
			log.Debug("seg buffer status", zap.Int64("segmentID", k), zap.Int32("buffer size", ibNode.insertBuffer.size(k)))
			stopSign++
		}
	}

	// iMsg is Flush() msg from dataservice
	// 1. insertBuffer(not empty) -> binLogs -> minIO/S3
	for _, msg := range iMsg.flushMessages {
		for _, currentSegID := range msg.segmentIDs {
			log.Debug(". Receiving flush message", zap.Int64("segmentID", currentSegID))

			// completeFlush waits on finishCh for the field->binlog-path map
			// (nil on failure, empty map when there was nothing to flush).
			// finishCh := make(chan bool)
			finishCh := make(chan map[UniqueID]string)
			go ibNode.completeFlush(currentSegID, finishCh)

			if ibNode.insertBuffer.size(currentSegID) <= 0 {
				log.Debug(".. Buffer empty ...")
				finishCh <- make(map[UniqueID]string)
				continue
			}

			log.Debug(".. Buffer not empty, flushing ..")
			// Hand the buffered data to the flush goroutine via flushMap and
			// drop it from the live buffer.
			ibNode.flushMap.Store(currentSegID, ibNode.insertBuffer.insertData[currentSegID])
			delete(ibNode.insertBuffer.insertData, currentSegID)
			clearFn := func() {
				finishCh <- nil
				log.Debug(".. Clearing flush Buffer ..")
				ibNode.flushMap.Delete(currentSegID)
			}

			seg, err := ibNode.replica.getSegmentByID(currentSegID)
			if err != nil {
				log.Error("Flush failed .. cannot get segment ..", zap.Error(err))
				clearFn()
				continue
			}

			collSch, err := ibNode.getCollectionSchemaByID(seg.collectionID)
			if err != nil {
				log.Error("Flush failed .. cannot get collection schema ..", zap.Error(err))
				clearFn()
				continue
			}

			collMeta := &etcdpb.CollectionMeta{
				Schema: collSch,
				ID:     seg.collectionID,
			}

			go flushSegment(collMeta, currentSegID, seg.partitionID, seg.collectionID,
				&ibNode.flushMap, ibNode.minIOKV, finishCh, ibNode.idAllocator)
		}
	}

	for _, segToFlush := range segToUpdate {
		// If full, auto flush
		if ibNode.insertBuffer.full(segToFlush) {
			log.Debug(". Insert Buffer full, auto flushing ",
				zap.Int32("num of rows", ibNode.insertBuffer.size(segToFlush)))

			collMeta, err := ibNode.getCollMetabySegID(segToFlush)
			if err != nil {
				log.Error("Auto flush failed .. cannot get collection meta ..", zap.Error(err))
				continue
			}

			ibNode.flushMap.Store(segToFlush, ibNode.insertBuffer.insertData[segToFlush])
			delete(ibNode.insertBuffer.insertData, segToFlush)

			collID, partitionID, err := ibNode.getCollectionandPartitionIDbySegID(segToFlush)
			if err != nil {
				log.Error("Auto flush failed .. cannot get collection ID or partition ID..", zap.Error(err))
				continue
			}

			finishCh := make(chan map[UniqueID]string)
			go flushSegment(collMeta, segToFlush, partitionID, collID,
				&ibNode.flushMap, ibNode.minIOKV, finishCh, ibNode.idAllocator)
			go ibNode.bufferAutoFlushPaths(finishCh, segToFlush)
		}
	}

	if err := ibNode.writeHardTimeTick(iMsg.timeRange.timestampMax); err != nil {
		log.Error("send hard time tick into pulsar channel failed", zap.Error(err))
	}

	var res Msg = &gcMsg{
		gcRecord:  iMsg.gcRecord,
		timeRange: iMsg.timeRange,
	}
	for _, sp := range spans {
		sp.Finish()
	}

	return []Msg{res}
}
|
|
|
|
|
2021-05-18 19:45:00 +08:00
|
|
|
func flushSegment(collMeta *etcdpb.CollectionMeta, segID, partitionID, collID UniqueID,
|
|
|
|
insertData *sync.Map, kv kv.BaseKV, field2PathCh chan<- map[UniqueID]string, idAllocator allocatorInterface) {
|
2021-01-22 09:36:40 +08:00
|
|
|
|
2021-03-30 09:47:27 +08:00
|
|
|
clearFn := func(isSuccess bool) {
|
2021-05-18 19:45:00 +08:00
|
|
|
if !isSuccess {
|
|
|
|
field2PathCh <- nil
|
|
|
|
}
|
|
|
|
|
2021-03-23 18:50:13 +08:00
|
|
|
log.Debug(".. Clearing flush Buffer ..")
|
|
|
|
insertData.Delete(segID)
|
2021-03-30 09:47:27 +08:00
|
|
|
}
|
2021-01-22 09:36:40 +08:00
|
|
|
|
|
|
|
inCodec := storage.NewInsertCodec(collMeta)
|
|
|
|
|
|
|
|
// buffer data to binlogs
|
2021-03-23 18:50:13 +08:00
|
|
|
data, ok := insertData.Load(segID)
|
|
|
|
if !ok {
|
|
|
|
log.Error("Flush failed ... cannot load insertData ..")
|
2021-03-30 09:47:27 +08:00
|
|
|
clearFn(false)
|
2021-03-23 18:50:13 +08:00
|
|
|
return
|
|
|
|
}
|
2021-01-22 09:36:40 +08:00
|
|
|
|
2021-05-20 18:38:45 +08:00
|
|
|
binLogs, statsBinlogs, err := inCodec.Serialize(partitionID, segID, data.(*InsertData))
|
2021-01-22 09:36:40 +08:00
|
|
|
if err != nil {
|
2021-03-23 18:50:13 +08:00
|
|
|
log.Error("Flush failed ... cannot generate binlog ..", zap.Error(err))
|
2021-03-30 09:47:27 +08:00
|
|
|
clearFn(false)
|
2021-03-23 18:50:13 +08:00
|
|
|
return
|
2021-01-22 09:36:40 +08:00
|
|
|
}
|
|
|
|
|
2021-03-23 18:50:13 +08:00
|
|
|
log.Debug(".. Saving binlogs to MinIO ..", zap.Int("number", len(binLogs)))
|
|
|
|
field2Path := make(map[UniqueID]string, len(binLogs))
|
|
|
|
kvs := make(map[string]string, len(binLogs))
|
|
|
|
paths := make([]string, 0, len(binLogs))
|
2021-05-20 18:38:45 +08:00
|
|
|
field2Logidx := make(map[UniqueID]UniqueID, len(binLogs))
|
|
|
|
|
|
|
|
// write insert binlog
|
2021-03-23 18:50:13 +08:00
|
|
|
for _, blob := range binLogs {
|
|
|
|
fieldID, err := strconv.ParseInt(blob.GetKey(), 10, 64)
|
2021-01-22 09:36:40 +08:00
|
|
|
if err != nil {
|
2021-03-23 18:50:13 +08:00
|
|
|
log.Error("Flush failed ... cannot parse string to fieldID ..", zap.Error(err))
|
2021-03-30 09:47:27 +08:00
|
|
|
clearFn(false)
|
2021-03-23 18:50:13 +08:00
|
|
|
return
|
2021-01-22 09:36:40 +08:00
|
|
|
}
|
|
|
|
|
2021-05-20 18:38:45 +08:00
|
|
|
logidx, err := idAllocator.allocID()
|
2021-01-22 09:36:40 +08:00
|
|
|
if err != nil {
|
2021-03-23 18:50:13 +08:00
|
|
|
log.Error("Flush failed ... cannot alloc ID ..", zap.Error(err))
|
2021-03-30 09:47:27 +08:00
|
|
|
clearFn(false)
|
2021-03-23 18:50:13 +08:00
|
|
|
return
|
2021-01-22 09:36:40 +08:00
|
|
|
}
|
|
|
|
|
2021-05-20 18:38:45 +08:00
|
|
|
// no error raise if alloc=false
|
|
|
|
k, _ := idAllocator.genKey(false, collID, partitionID, segID, fieldID, logidx)
|
|
|
|
|
2021-03-23 18:50:13 +08:00
|
|
|
key := path.Join(Params.InsertBinlogRootPath, k)
|
|
|
|
paths = append(paths, key)
|
|
|
|
kvs[key] = string(blob.Value[:])
|
|
|
|
field2Path[fieldID] = key
|
2021-05-20 18:38:45 +08:00
|
|
|
field2Logidx[fieldID] = logidx
|
|
|
|
}
|
|
|
|
|
|
|
|
// write stats binlog
|
|
|
|
for _, blob := range statsBinlogs {
|
|
|
|
fieldID, err := strconv.ParseInt(blob.GetKey(), 10, 64)
|
|
|
|
if err != nil {
|
|
|
|
log.Error("Flush failed ... cannot parse string to fieldID ..", zap.Error(err))
|
|
|
|
clearFn(false)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
logidx := field2Logidx[fieldID]
|
|
|
|
|
|
|
|
// no error raise if alloc=false
|
|
|
|
k, _ := idAllocator.genKey(false, collID, partitionID, segID, fieldID, logidx)
|
|
|
|
|
|
|
|
key := path.Join(Params.StatsBinlogRootPath, k)
|
|
|
|
kvs[key] = string(blob.Value[:])
|
2021-03-23 18:50:13 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
err = kv.MultiSave(kvs)
|
|
|
|
if err != nil {
|
|
|
|
log.Error("Flush failed ... cannot save to MinIO ..", zap.Error(err))
|
|
|
|
_ = kv.MultiRemove(paths)
|
2021-03-30 09:47:27 +08:00
|
|
|
clearFn(false)
|
2021-03-23 18:50:13 +08:00
|
|
|
return
|
|
|
|
}
|
2021-01-22 09:36:40 +08:00
|
|
|
|
2021-05-18 19:45:00 +08:00
|
|
|
field2PathCh <- field2Path
|
|
|
|
clearFn(true)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (ibNode *insertBufferNode) bufferAutoFlushPaths(wait <-chan map[UniqueID]string, segID UniqueID) error {
|
|
|
|
field2Path := <-wait
|
|
|
|
if field2Path == nil {
|
|
|
|
return errors.New("Nil field2Path")
|
2021-01-22 09:36:40 +08:00
|
|
|
}
|
2021-03-23 18:50:13 +08:00
|
|
|
|
2021-05-18 19:45:00 +08:00
|
|
|
return ibNode.replica.bufferAutoFlushBinlogPaths(segID, field2Path)
|
2021-01-22 09:36:40 +08:00
|
|
|
}
|
|
|
|
|
2021-05-18 19:45:00 +08:00
|
|
|
func (ibNode *insertBufferNode) completeFlush(segID UniqueID, wait <-chan map[UniqueID]string) {
|
|
|
|
field2Path := <-wait
|
|
|
|
|
|
|
|
if field2Path == nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// TODO Call DataService RPC SaveBinlogPaths
|
|
|
|
ibNode.replica.bufferAutoFlushBinlogPaths(segID, field2Path)
|
|
|
|
bufferField2Paths, err := ibNode.replica.getBufferPaths(segID)
|
|
|
|
if err != nil {
|
|
|
|
log.Error("Flush failed ... cannot get buffered paths", zap.Error(err))
|
|
|
|
}
|
|
|
|
|
|
|
|
// GOOSE TODO remove the below
|
|
|
|
log.Debug(".. Saving binlog paths to etcd ..", zap.Int("number of fields", len(field2Path)))
|
|
|
|
err = ibNode.flushMeta.SaveSegmentBinlogMetaTxn(segID, bufferField2Paths)
|
|
|
|
if err != nil {
|
|
|
|
log.Error("Flush failed ... cannot save binlog paths ..", zap.Error(err))
|
2021-03-23 18:50:13 +08:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
log.Debug(".. Segment flush completed ..")
|
2021-03-16 17:55:42 +08:00
|
|
|
ibNode.replica.setIsFlushed(segID)
|
|
|
|
ibNode.updateSegStatistics([]UniqueID{segID})
|
|
|
|
|
2021-01-22 19:36:09 +08:00
|
|
|
msgPack := msgstream.MsgPack{}
|
2021-03-12 14:22:09 +08:00
|
|
|
completeFlushMsg := internalpb.SegmentFlushCompletedMsg{
|
2021-01-22 19:36:09 +08:00
|
|
|
Base: &commonpb.MsgBase{
|
2021-03-10 14:45:35 +08:00
|
|
|
MsgType: commonpb.MsgType_SegmentFlushDone,
|
2021-01-22 19:36:09 +08:00
|
|
|
MsgID: 0, // GOOSE TODO
|
|
|
|
Timestamp: 0, // GOOSE TODO
|
2021-01-24 21:20:11 +08:00
|
|
|
SourceID: Params.NodeID,
|
2021-01-22 19:36:09 +08:00
|
|
|
},
|
|
|
|
SegmentID: segID,
|
|
|
|
}
|
|
|
|
var msg msgstream.TsMsg = &msgstream.FlushCompletedMsg{
|
|
|
|
BaseMsg: msgstream.BaseMsg{
|
|
|
|
HashValues: []uint32{0},
|
|
|
|
},
|
|
|
|
SegmentFlushCompletedMsg: completeFlushMsg,
|
|
|
|
}
|
|
|
|
|
|
|
|
msgPack.Msgs = append(msgPack.Msgs, msg)
|
2021-05-18 19:45:00 +08:00
|
|
|
err = ibNode.completeFlushStream.Produce(&msgPack)
|
2021-03-23 18:50:13 +08:00
|
|
|
if err != nil {
|
|
|
|
log.Error(".. Produce complete flush msg failed ..", zap.Error(err))
|
|
|
|
}
|
2021-01-22 19:36:09 +08:00
|
|
|
}
|
|
|
|
|
2021-01-19 11:37:16 +08:00
|
|
|
func (ibNode *insertBufferNode) writeHardTimeTick(ts Timestamp) error {
|
|
|
|
msgPack := msgstream.MsgPack{}
|
|
|
|
timeTickMsg := msgstream.TimeTickMsg{
|
|
|
|
BaseMsg: msgstream.BaseMsg{
|
|
|
|
BeginTimestamp: ts,
|
|
|
|
EndTimestamp: ts,
|
|
|
|
HashValues: []uint32{0},
|
|
|
|
},
|
2021-03-12 14:22:09 +08:00
|
|
|
TimeTickMsg: internalpb.TimeTickMsg{
|
2021-01-19 11:37:16 +08:00
|
|
|
Base: &commonpb.MsgBase{
|
2021-03-10 14:45:35 +08:00
|
|
|
MsgType: commonpb.MsgType_TimeTick,
|
2021-01-22 19:36:09 +08:00
|
|
|
MsgID: 0, // GOOSE TODO
|
|
|
|
Timestamp: ts, // GOOSE TODO
|
2021-01-24 21:20:11 +08:00
|
|
|
SourceID: Params.NodeID,
|
2021-01-19 11:37:16 +08:00
|
|
|
},
|
|
|
|
},
|
|
|
|
}
|
|
|
|
msgPack.Msgs = append(msgPack.Msgs, &timeTickMsg)
|
2021-03-25 14:41:46 +08:00
|
|
|
return ibNode.timeTickStream.Produce(&msgPack)
|
2021-01-19 11:37:16 +08:00
|
|
|
}
|
|
|
|
|
2021-03-16 17:55:42 +08:00
|
|
|
func (ibNode *insertBufferNode) updateSegStatistics(segIDs []UniqueID) error {
|
2021-02-26 10:13:36 +08:00
|
|
|
log.Debug("Updating segments statistics...")
|
2021-03-12 14:22:09 +08:00
|
|
|
statsUpdates := make([]*internalpb.SegmentStatisticsUpdates, 0, len(segIDs))
|
2021-01-21 09:55:25 +08:00
|
|
|
for _, segID := range segIDs {
|
|
|
|
updates, err := ibNode.replica.getSegmentStatisticsUpdates(segID)
|
|
|
|
if err != nil {
|
2021-02-26 10:13:36 +08:00
|
|
|
log.Error("get segment statistics updates wrong", zap.Int64("segmentID", segID), zap.Error(err))
|
2021-01-21 09:55:25 +08:00
|
|
|
continue
|
|
|
|
}
|
|
|
|
statsUpdates = append(statsUpdates, updates)
|
|
|
|
}
|
|
|
|
|
2021-03-12 14:22:09 +08:00
|
|
|
segStats := internalpb.SegmentStatistics{
|
2021-01-21 09:55:25 +08:00
|
|
|
Base: &commonpb.MsgBase{
|
2021-03-10 14:45:35 +08:00
|
|
|
MsgType: commonpb.MsgType_SegmentStatistics,
|
2021-01-21 09:55:25 +08:00
|
|
|
MsgID: UniqueID(0), // GOOSE TODO
|
|
|
|
Timestamp: Timestamp(0), // GOOSE TODO
|
2021-01-24 21:20:11 +08:00
|
|
|
SourceID: Params.NodeID,
|
2021-01-21 09:55:25 +08:00
|
|
|
},
|
|
|
|
SegStats: statsUpdates,
|
|
|
|
}
|
|
|
|
|
|
|
|
var msg msgstream.TsMsg = &msgstream.SegmentStatisticsMsg{
|
|
|
|
BaseMsg: msgstream.BaseMsg{
|
2021-02-04 11:19:48 +08:00
|
|
|
HashValues: []uint32{0}, // GOOSE TODO
|
2021-01-21 09:55:25 +08:00
|
|
|
},
|
|
|
|
SegmentStatistics: segStats,
|
|
|
|
}
|
|
|
|
|
|
|
|
var msgPack = msgstream.MsgPack{
|
|
|
|
Msgs: []msgstream.TsMsg{msg},
|
|
|
|
}
|
2021-03-25 14:41:46 +08:00
|
|
|
return ibNode.segmentStatisticsStream.Produce(&msgPack)
|
2021-01-21 09:55:25 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
func (ibNode *insertBufferNode) getCollectionSchemaByID(collectionID UniqueID) (*schemapb.CollectionSchema, error) {
|
|
|
|
ret, err := ibNode.replica.getCollectionByID(collectionID)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
return ret.schema, nil
|
|
|
|
}
|
|
|
|
|
2021-05-18 19:45:00 +08:00
|
|
|
func (ibNode *insertBufferNode) getCollMetabySegID(segmentID UniqueID) (meta *etcdpb.CollectionMeta, err error) {
|
|
|
|
ret, err := ibNode.replica.getSegmentByID(segmentID)
|
|
|
|
if err != nil {
|
|
|
|
return
|
|
|
|
}
|
2021-05-21 16:21:08 +08:00
|
|
|
meta = &etcdpb.CollectionMeta{}
|
2021-05-18 19:45:00 +08:00
|
|
|
meta.ID = ret.collectionID
|
|
|
|
|
|
|
|
coll, err := ibNode.replica.getCollectionByID(ret.collectionID)
|
|
|
|
if err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
meta.Schema = coll.GetSchema()
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
func (ibNode *insertBufferNode) getCollectionandPartitionIDbySegID(segmentID UniqueID) (collID, partitionID UniqueID, err error) {
|
|
|
|
seg, err := ibNode.replica.getSegmentByID(segmentID)
|
|
|
|
if err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
collID = seg.collectionID
|
|
|
|
partitionID = seg.partitionID
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2021-03-23 18:50:13 +08:00
|
|
|
func newInsertBufferNode(ctx context.Context, flushMeta *binlogMeta,
|
2021-05-18 19:45:00 +08:00
|
|
|
replica Replica, factory msgstream.Factory, idAllocator allocatorInterface) *insertBufferNode {
|
2021-01-19 11:37:16 +08:00
|
|
|
maxQueueLength := Params.FlowGraphMaxQueueLength
|
|
|
|
maxParallelism := Params.FlowGraphMaxParallelism
|
|
|
|
|
|
|
|
baseNode := BaseNode{}
|
|
|
|
baseNode.SetMaxQueueLength(maxQueueLength)
|
|
|
|
baseNode.SetMaxParallelism(maxParallelism)
|
|
|
|
|
|
|
|
maxSize := Params.FlushInsertBufferSize
|
|
|
|
iBuffer := &insertBuffer{
|
|
|
|
insertData: make(map[UniqueID]*InsertData),
|
|
|
|
maxSize: maxSize,
|
|
|
|
}
|
|
|
|
|
|
|
|
// MinIO
|
|
|
|
option := &miniokv.Option{
|
|
|
|
Address: Params.MinioAddress,
|
|
|
|
AccessKeyID: Params.MinioAccessKeyID,
|
|
|
|
SecretAccessKeyID: Params.MinioSecretAccessKey,
|
|
|
|
UseSSL: Params.MinioUseSSL,
|
|
|
|
CreateBucket: true,
|
|
|
|
BucketName: Params.MinioBucketName,
|
|
|
|
}
|
|
|
|
|
|
|
|
minIOKV, err := miniokv.NewMinIOKV(ctx, option)
|
|
|
|
if err != nil {
|
|
|
|
panic(err)
|
|
|
|
}
|
|
|
|
|
2021-01-21 09:55:25 +08:00
|
|
|
//input stream, data node time tick
|
2021-02-04 14:37:12 +08:00
|
|
|
wTt, _ := factory.NewMsgStream(ctx)
|
|
|
|
wTt.AsProducer([]string{Params.TimeTickChannelName})
|
2021-03-05 18:16:50 +08:00
|
|
|
log.Debug("datanode AsProducer: " + Params.TimeTickChannelName)
|
2021-01-21 09:55:25 +08:00
|
|
|
var wTtMsgStream msgstream.MsgStream = wTt
|
2021-01-24 21:20:11 +08:00
|
|
|
wTtMsgStream.Start()
|
2021-01-21 09:55:25 +08:00
|
|
|
|
|
|
|
// update statistics channel
|
2021-02-04 14:37:12 +08:00
|
|
|
segS, _ := factory.NewMsgStream(ctx)
|
|
|
|
segS.AsProducer([]string{Params.SegmentStatisticsChannelName})
|
2021-03-05 18:16:50 +08:00
|
|
|
log.Debug("datanode AsProducer: " + Params.SegmentStatisticsChannelName)
|
2021-01-21 09:55:25 +08:00
|
|
|
var segStatisticsMsgStream msgstream.MsgStream = segS
|
2021-01-24 21:20:11 +08:00
|
|
|
segStatisticsMsgStream.Start()
|
2021-01-22 19:36:09 +08:00
|
|
|
|
|
|
|
// segment flush completed channel
|
2021-02-04 14:37:12 +08:00
|
|
|
cf, _ := factory.NewMsgStream(ctx)
|
|
|
|
cf.AsProducer([]string{Params.CompleteFlushChannelName})
|
2021-03-05 18:16:50 +08:00
|
|
|
log.Debug("datanode AsProducer: " + Params.CompleteFlushChannelName)
|
2021-01-22 19:36:09 +08:00
|
|
|
var completeFlushStream msgstream.MsgStream = cf
|
2021-01-24 21:20:11 +08:00
|
|
|
completeFlushStream.Start()
|
2021-01-19 11:37:16 +08:00
|
|
|
|
|
|
|
return &insertBufferNode{
|
2021-01-22 09:36:40 +08:00
|
|
|
BaseNode: baseNode,
|
|
|
|
insertBuffer: iBuffer,
|
|
|
|
minIOKV: minIOKV,
|
|
|
|
timeTickStream: wTtMsgStream,
|
|
|
|
segmentStatisticsStream: segStatisticsMsgStream,
|
2021-01-22 19:36:09 +08:00
|
|
|
completeFlushStream: completeFlushStream,
|
2021-01-22 09:36:40 +08:00
|
|
|
replica: replica,
|
|
|
|
flushMeta: flushMeta,
|
2021-03-23 18:50:13 +08:00
|
|
|
flushMap: sync.Map{},
|
2021-05-18 19:45:00 +08:00
|
|
|
idAllocator: idAllocator,
|
2021-01-19 11:37:16 +08:00
|
|
|
}
|
|
|
|
}
|