2021-10-15 18:07:09 +08:00
|
|
|
// Licensed to the LF AI & Data foundation under one
|
|
|
|
// or more contributor license agreements. See the NOTICE file
|
|
|
|
// distributed with this work for additional information
|
|
|
|
// regarding copyright ownership. The ASF licenses this file
|
|
|
|
// to you under the Apache License, Version 2.0 (the
|
|
|
|
// "License"); you may not use this file except in compliance
|
2021-04-19 15:16:33 +08:00
|
|
|
// with the License. You may obtain a copy of the License at
|
|
|
|
//
|
2021-10-15 18:07:09 +08:00
|
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
2021-04-19 15:16:33 +08:00
|
|
|
//
|
2021-10-15 18:07:09 +08:00
|
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
// See the License for the specific language governing permissions and
|
|
|
|
// limitations under the License.
|
2021-04-19 15:16:33 +08:00
|
|
|
|
2021-01-19 11:37:16 +08:00
|
|
|
package datanode
|
|
|
|
|
|
|
|
import (
|
|
|
|
"bytes"
|
|
|
|
"context"
|
|
|
|
"encoding/binary"
|
2021-09-09 15:00:00 +08:00
|
|
|
"errors"
|
2021-06-16 19:03:57 +08:00
|
|
|
"fmt"
|
2021-10-13 21:32:33 +08:00
|
|
|
"io"
|
2021-01-19 11:37:16 +08:00
|
|
|
"strconv"
|
2021-03-23 18:50:13 +08:00
|
|
|
"sync"
|
2021-01-19 11:37:16 +08:00
|
|
|
|
2021-09-11 11:36:22 +08:00
|
|
|
"github.com/golang/protobuf/proto"
|
2021-09-17 16:27:56 +08:00
|
|
|
"github.com/opentracing/opentracing-go"
|
2021-11-04 15:40:14 +08:00
|
|
|
"go.uber.org/atomic"
|
2021-02-26 10:13:36 +08:00
|
|
|
"go.uber.org/zap"
|
2021-01-19 11:37:16 +08:00
|
|
|
|
2021-04-22 14:45:57 +08:00
|
|
|
"github.com/milvus-io/milvus/internal/log"
|
|
|
|
"github.com/milvus-io/milvus/internal/msgstream"
|
|
|
|
"github.com/milvus-io/milvus/internal/storage"
|
|
|
|
"github.com/milvus-io/milvus/internal/util/trace"
|
2021-11-04 15:40:14 +08:00
|
|
|
"github.com/milvus-io/milvus/internal/util/tsoutil"
|
2021-04-22 14:45:57 +08:00
|
|
|
|
2021-11-02 18:16:32 +08:00
|
|
|
"github.com/milvus-io/milvus/internal/common"
|
2021-04-22 14:45:57 +08:00
|
|
|
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
2021-05-26 12:09:03 +08:00
|
|
|
"github.com/milvus-io/milvus/internal/proto/datapb"
|
2021-04-22 14:45:57 +08:00
|
|
|
"github.com/milvus-io/milvus/internal/proto/etcdpb"
|
|
|
|
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
|
|
|
"github.com/milvus-io/milvus/internal/proto/schemapb"
|
2021-01-19 11:37:16 +08:00
|
|
|
)
|
|
|
|
|
|
|
|
type (
|
2021-09-23 19:05:55 +08:00
|
|
|
// InsertData of storage
|
2021-01-19 11:37:16 +08:00
|
|
|
InsertData = storage.InsertData
|
2021-09-23 19:05:55 +08:00
|
|
|
|
|
|
|
// Blob of storage
|
|
|
|
Blob = storage.Blob
|
2021-01-19 11:37:16 +08:00
|
|
|
)
|
2021-09-17 16:27:56 +08:00
|
|
|
|
2021-03-25 14:41:46 +08:00
|
|
|
type insertBufferNode struct {
|
|
|
|
BaseNode
|
2021-06-08 19:25:37 +08:00
|
|
|
channelName string
|
2021-09-26 20:55:59 +08:00
|
|
|
insertBuffer sync.Map // SegmentID to BufferData
|
|
|
|
replica Replica
|
|
|
|
idAllocator allocatorInterface
|
2021-09-18 14:25:50 +08:00
|
|
|
|
2021-09-23 16:03:54 +08:00
|
|
|
flushMap sync.Map
|
2021-10-18 12:34:34 +08:00
|
|
|
flushChan <-chan flushMsg
|
2021-09-23 16:03:54 +08:00
|
|
|
flushingSegCache *Cache
|
2021-10-19 11:04:34 +08:00
|
|
|
flushManager flushManager
|
2021-03-25 14:41:46 +08:00
|
|
|
|
|
|
|
timeTickStream msgstream.MsgStream
|
|
|
|
segmentStatisticsStream msgstream.MsgStream
|
2021-11-04 15:40:14 +08:00
|
|
|
ttLogger timeTickLogger
|
2021-11-05 14:59:32 +08:00
|
|
|
ttMerger *mergedTimeTickerSender
|
2021-11-04 15:40:14 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
type timeTickLogger struct {
|
|
|
|
start atomic.Uint64
|
|
|
|
counter atomic.Int32
|
|
|
|
}
|
|
|
|
|
|
|
|
func (l *timeTickLogger) LogTs(ts Timestamp) {
|
|
|
|
if l.counter.Load() == 0 {
|
|
|
|
l.start.Store(ts)
|
|
|
|
}
|
|
|
|
l.counter.Inc()
|
|
|
|
if l.counter.Load() == 1000 {
|
|
|
|
min := l.start.Load()
|
|
|
|
l.start.Store(ts)
|
|
|
|
l.counter.Store(0)
|
|
|
|
go l.printLogs(min, ts)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (l *timeTickLogger) printLogs(start, end Timestamp) {
|
|
|
|
t1, _ := tsoutil.ParseTS(start)
|
|
|
|
t2, _ := tsoutil.ParseTS(end)
|
|
|
|
log.Debug("IBN timetick log", zap.Time("from", t1), zap.Time("to", t2), zap.Duration("elapsed", t2.Sub(t1)), zap.Uint64("start", start), zap.Uint64("end", end))
|
2021-06-04 16:31:34 +08:00
|
|
|
}
|
|
|
|
|
2021-06-06 13:21:37 +08:00
|
|
|
type segmentCheckPoint struct {
|
|
|
|
numRows int64
|
|
|
|
pos internalpb.MsgPosition
|
|
|
|
}
|
|
|
|
|
|
|
|
type segmentFlushUnit struct {
|
2021-06-11 19:15:48 +08:00
|
|
|
collID UniqueID
|
|
|
|
segID UniqueID
|
|
|
|
field2Path map[UniqueID]string
|
|
|
|
checkPoint map[UniqueID]segmentCheckPoint
|
|
|
|
startPositions []*datapb.SegmentStartPosition
|
|
|
|
flushed bool
|
2021-03-25 14:41:46 +08:00
|
|
|
}
|
|
|
|
|
2021-09-23 20:15:54 +08:00
|
|
|
// BufferData buffers insert data, monitoring buffer size and limit
|
2021-09-26 20:55:59 +08:00
|
|
|
// size and limit both indicate numOfRows
|
2021-09-18 14:25:50 +08:00
|
|
|
type BufferData struct {
|
|
|
|
buffer *InsertData
|
|
|
|
size int64
|
2021-09-26 20:55:59 +08:00
|
|
|
limit int64
|
2021-09-18 14:25:50 +08:00
|
|
|
}
|
|
|
|
|
2021-09-26 20:55:59 +08:00
|
|
|
// newBufferData needs an input dimension to calculate the limit of this buffer
|
|
|
|
//
|
|
|
|
// `limit` is the segment numOfRows a buffer can buffer at most.
|
|
|
|
//
|
|
|
|
// For a float32 vector field:
|
|
|
|
// limit = 16 * 2^20 Byte [By default] / (dimension * 4 Byte)
|
|
|
|
//
|
|
|
|
// For a binary vector field:
|
|
|
|
// limit = 16 * 2^20 Byte [By default]/ (dimension / 8 Byte)
|
|
|
|
//
|
|
|
|
// But since the buffer of binary vector fields is larger than the float32 one
|
|
|
|
// with the same dimension, newBufferData takes the smaller buffer limit
|
|
|
|
// to fit in both types of vector fields
|
|
|
|
//
|
|
|
|
// * This need to change for string field support and multi-vector fields support.
|
2021-09-18 14:25:50 +08:00
|
|
|
func newBufferData(dimension int64) (*BufferData, error) {
|
|
|
|
if dimension == 0 {
|
|
|
|
return nil, errors.New("Invalid dimension")
|
|
|
|
}
|
|
|
|
|
2021-09-26 20:55:59 +08:00
|
|
|
limit := Params.FlushInsertBufferSize / (dimension * 4)
|
2021-09-18 14:25:50 +08:00
|
|
|
|
2021-09-26 20:55:59 +08:00
|
|
|
return &BufferData{&InsertData{Data: make(map[UniqueID]storage.FieldData)}, 0, limit}, nil
|
2021-03-25 14:41:46 +08:00
|
|
|
}
|
2021-01-19 11:37:16 +08:00
|
|
|
|
2021-09-26 20:55:59 +08:00
|
|
|
func (bd *BufferData) effectiveCap() int64 {
|
|
|
|
return bd.limit - bd.size
|
2021-01-19 11:37:16 +08:00
|
|
|
}
|
|
|
|
|
2021-09-26 20:55:59 +08:00
|
|
|
func (bd *BufferData) updateSize(no int64) {
|
|
|
|
bd.size += no
|
2021-01-19 11:37:16 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
func (ibNode *insertBufferNode) Name() string {
|
|
|
|
return "ibNode"
|
|
|
|
}
|
|
|
|
|
2021-09-17 16:27:56 +08:00
|
|
|
func (ibNode *insertBufferNode) Close() {
|
2021-11-05 14:59:32 +08:00
|
|
|
ibNode.ttMerger.close()
|
|
|
|
|
2021-09-17 16:27:56 +08:00
|
|
|
if ibNode.timeTickStream != nil {
|
|
|
|
ibNode.timeTickStream.Close()
|
|
|
|
}
|
|
|
|
|
|
|
|
if ibNode.segmentStatisticsStream != nil {
|
|
|
|
ibNode.segmentStatisticsStream.Close()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
|
2021-06-08 19:25:37 +08:00
|
|
|
// log.Debug("InsertBufferNode Operating")
|
|
|
|
|
2021-01-19 11:37:16 +08:00
|
|
|
if len(in) != 1 {
|
2021-02-26 10:13:36 +08:00
|
|
|
log.Error("Invalid operate message input in insertBufferNode", zap.Int("input length", len(in)))
|
2021-09-17 16:27:56 +08:00
|
|
|
return []Msg{}
|
2021-01-19 11:37:16 +08:00
|
|
|
}
|
|
|
|
|
2021-09-26 10:43:57 +08:00
|
|
|
fgMsg, ok := in[0].(*flowGraphMsg)
|
2021-01-19 11:37:16 +08:00
|
|
|
if !ok {
|
2021-11-01 11:01:49 +08:00
|
|
|
log.Warn("type assertion failed for flowGraphMsg")
|
2021-09-17 16:27:56 +08:00
|
|
|
ibNode.Close()
|
2021-03-25 14:41:46 +08:00
|
|
|
return []Msg{}
|
|
|
|
}
|
|
|
|
|
|
|
|
var spans []opentracing.Span
|
2021-09-26 10:43:57 +08:00
|
|
|
for _, msg := range fgMsg.insertMessages {
|
2021-03-25 14:41:46 +08:00
|
|
|
sp, ctx := trace.StartSpanFromContext(msg.TraceCtx())
|
|
|
|
spans = append(spans, sp)
|
|
|
|
msg.SetTraceCtx(ctx)
|
2021-03-23 01:49:50 +08:00
|
|
|
}
|
|
|
|
|
2021-06-18 16:02:05 +08:00
|
|
|
// replace pchannel with vchannel
|
2021-09-26 10:43:57 +08:00
|
|
|
startPositions := make([]*internalpb.MsgPosition, 0, len(fgMsg.startPositions))
|
|
|
|
for idx := range fgMsg.startPositions {
|
|
|
|
pos := proto.Clone(fgMsg.startPositions[idx]).(*internalpb.MsgPosition)
|
2021-06-18 16:02:05 +08:00
|
|
|
pos.ChannelName = ibNode.channelName
|
2021-09-11 11:36:22 +08:00
|
|
|
startPositions = append(startPositions, pos)
|
2021-06-18 16:02:05 +08:00
|
|
|
}
|
2021-09-26 10:43:57 +08:00
|
|
|
endPositions := make([]*internalpb.MsgPosition, 0, len(fgMsg.endPositions))
|
|
|
|
for idx := range fgMsg.endPositions {
|
|
|
|
pos := proto.Clone(fgMsg.endPositions[idx]).(*internalpb.MsgPosition)
|
2021-06-18 16:02:05 +08:00
|
|
|
pos.ChannelName = ibNode.channelName
|
2021-09-11 11:36:22 +08:00
|
|
|
endPositions = append(endPositions, pos)
|
2021-06-18 16:02:05 +08:00
|
|
|
}
|
|
|
|
|
2021-09-17 16:27:56 +08:00
|
|
|
// Updating segment statistics in replica
|
2021-09-26 10:43:57 +08:00
|
|
|
seg2Upload, err := ibNode.updateSegStatesInReplica(fgMsg.insertMessages, startPositions[0], endPositions[0])
|
2021-09-17 16:27:56 +08:00
|
|
|
if err != nil {
|
|
|
|
log.Warn("update segment states in Replica wrong", zap.Error(err))
|
|
|
|
return []Msg{}
|
2021-01-21 09:55:25 +08:00
|
|
|
}
|
2021-02-04 11:19:48 +08:00
|
|
|
|
2021-09-17 16:27:56 +08:00
|
|
|
if len(seg2Upload) > 0 {
|
|
|
|
err := ibNode.uploadMemStates2Coord(seg2Upload)
|
2021-03-16 17:55:42 +08:00
|
|
|
if err != nil {
|
2021-09-17 16:27:56 +08:00
|
|
|
log.Error("upload segment statistics to coord error", zap.Error(err))
|
2021-02-04 11:19:48 +08:00
|
|
|
}
|
2021-01-21 09:55:25 +08:00
|
|
|
}
|
|
|
|
|
2021-09-17 16:27:56 +08:00
|
|
|
// insert messages -> buffer
|
2021-09-26 10:43:57 +08:00
|
|
|
for _, msg := range fgMsg.insertMessages {
|
2021-09-18 14:25:50 +08:00
|
|
|
err := ibNode.bufferInsertMsg(msg, endPositions[0])
|
2021-01-19 11:37:16 +08:00
|
|
|
if err != nil {
|
2021-09-09 15:00:00 +08:00
|
|
|
log.Warn("msg to buffer failed", zap.Error(err))
|
2021-06-05 16:21:36 +08:00
|
|
|
}
|
2021-01-19 11:37:16 +08:00
|
|
|
}
|
|
|
|
|
2021-09-26 20:55:59 +08:00
|
|
|
// Find and return the smaller input
|
|
|
|
min := func(former, latter int) (smaller int) {
|
|
|
|
if former <= latter {
|
|
|
|
return former
|
2021-01-19 11:37:16 +08:00
|
|
|
}
|
2021-09-26 20:55:59 +08:00
|
|
|
return latter
|
|
|
|
}
|
|
|
|
|
|
|
|
displaySize := min(10, len(seg2Upload))
|
|
|
|
|
2021-10-06 22:20:05 +08:00
|
|
|
// Log the segment statistics in mem
|
2021-09-26 20:55:59 +08:00
|
|
|
for k, segID := range seg2Upload[:displaySize] {
|
|
|
|
bd, ok := ibNode.insertBuffer.Load(segID)
|
|
|
|
if !ok {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
log.Debug("insert seg buffer status", zap.Int("No.", k),
|
|
|
|
zap.Int64("segmentID", segID),
|
|
|
|
zap.Int64("buffer size", bd.(*BufferData).size),
|
|
|
|
zap.Int64("buffer limit", bd.(*BufferData).limit))
|
2021-01-19 11:37:16 +08:00
|
|
|
}
|
|
|
|
|
2021-10-18 12:34:34 +08:00
|
|
|
segmentsToFlush := make([]UniqueID, 0, len(seg2Upload)+1) //auto flush number + possible manual flush
|
|
|
|
|
2021-10-25 18:03:42 +08:00
|
|
|
type flushTask struct {
|
|
|
|
buffer *BufferData
|
|
|
|
segmentID UniqueID
|
|
|
|
flushed bool
|
|
|
|
}
|
|
|
|
flushTaskList := make([]flushTask, 0, len(seg2Upload)+1)
|
|
|
|
|
2021-09-17 16:27:56 +08:00
|
|
|
// Auto Flush
|
|
|
|
for _, segToFlush := range seg2Upload {
|
2021-05-25 15:35:37 +08:00
|
|
|
// If full, auto flush
|
2021-09-26 20:55:59 +08:00
|
|
|
if bd, ok := ibNode.insertBuffer.Load(segToFlush); ok && bd.(*BufferData).effectiveCap() <= 0 {
|
2021-10-19 11:04:34 +08:00
|
|
|
log.Warn("Auto flush", zap.Int64("segment id", segToFlush))
|
2021-09-26 20:55:59 +08:00
|
|
|
ibuffer := bd.(*BufferData)
|
2021-10-25 18:03:42 +08:00
|
|
|
|
|
|
|
flushTaskList = append(flushTaskList, flushTask{
|
|
|
|
buffer: ibuffer,
|
|
|
|
segmentID: segToFlush,
|
|
|
|
flushed: false,
|
|
|
|
})
|
2021-05-25 15:35:37 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-09-30 17:42:11 +08:00
|
|
|
// Manual Flush
|
2021-06-02 15:58:33 +08:00
|
|
|
select {
|
|
|
|
case fmsg := <-ibNode.flushChan:
|
2021-06-08 19:25:37 +08:00
|
|
|
log.Debug(". Receiving flush message",
|
2021-10-25 18:03:42 +08:00
|
|
|
zap.Int64("segmentID", fmsg.segmentID),
|
2021-06-08 19:25:37 +08:00
|
|
|
zap.Int64("collectionID", fmsg.collectionID),
|
|
|
|
)
|
2021-10-25 18:03:42 +08:00
|
|
|
// merging auto&manual flush segment same segment id
|
|
|
|
dup := false
|
|
|
|
for i, task := range flushTaskList {
|
|
|
|
if task.segmentID == fmsg.segmentID {
|
|
|
|
flushTaskList[i].flushed = fmsg.flushed
|
|
|
|
dup = true
|
|
|
|
break
|
|
|
|
}
|
2021-10-19 19:06:37 +08:00
|
|
|
}
|
2021-10-25 18:03:42 +08:00
|
|
|
// if merged, skip load buffer and create task
|
|
|
|
if !dup {
|
|
|
|
currentSegID := fmsg.segmentID
|
|
|
|
bd, ok := ibNode.insertBuffer.Load(currentSegID)
|
|
|
|
var buf *BufferData
|
|
|
|
if ok {
|
|
|
|
buf = bd.(*BufferData)
|
|
|
|
}
|
|
|
|
flushTaskList = append(flushTaskList, flushTask{
|
|
|
|
buffer: buf,
|
|
|
|
segmentID: currentSegID,
|
|
|
|
flushed: fmsg.flushed,
|
|
|
|
})
|
2021-10-19 11:04:34 +08:00
|
|
|
}
|
2021-10-25 18:03:42 +08:00
|
|
|
default:
|
|
|
|
}
|
|
|
|
|
|
|
|
for _, task := range flushTaskList {
|
|
|
|
err := ibNode.flushManager.flushBufferData(task.buffer, task.segmentID, task.flushed, endPositions[0])
|
2021-10-19 11:04:34 +08:00
|
|
|
if err != nil {
|
2021-10-25 18:03:42 +08:00
|
|
|
log.Warn("failed to invoke flushBufferData", zap.Error(err))
|
2021-10-19 11:04:34 +08:00
|
|
|
} else {
|
2021-10-25 18:03:42 +08:00
|
|
|
segmentsToFlush = append(segmentsToFlush, task.segmentID)
|
|
|
|
if task.flushed {
|
|
|
|
ibNode.replica.segmentFlushed(task.segmentID)
|
2021-10-20 15:02:36 +08:00
|
|
|
}
|
2021-10-25 18:03:42 +08:00
|
|
|
ibNode.insertBuffer.Delete(task.segmentID)
|
2021-05-18 19:45:00 +08:00
|
|
|
}
|
2021-01-19 11:37:16 +08:00
|
|
|
}
|
|
|
|
|
2021-09-26 10:43:57 +08:00
|
|
|
if err := ibNode.writeHardTimeTick(fgMsg.timeRange.timestampMax); err != nil {
|
2021-06-04 16:31:34 +08:00
|
|
|
log.Error("send hard time tick into pulsar channel failed", zap.Error(err))
|
|
|
|
}
|
2021-01-19 11:37:16 +08:00
|
|
|
|
2021-10-11 16:31:44 +08:00
|
|
|
res := flowGraphMsg{
|
2021-10-18 12:34:34 +08:00
|
|
|
deleteMessages: fgMsg.deleteMessages,
|
|
|
|
timeRange: fgMsg.timeRange,
|
|
|
|
startPositions: fgMsg.startPositions,
|
|
|
|
endPositions: fgMsg.endPositions,
|
|
|
|
segmentsToFlush: segmentsToFlush,
|
2021-10-11 16:31:44 +08:00
|
|
|
}
|
|
|
|
|
2021-03-25 14:41:46 +08:00
|
|
|
for _, sp := range spans {
|
|
|
|
sp.Finish()
|
|
|
|
}
|
2021-01-19 11:37:16 +08:00
|
|
|
|
2021-10-11 16:31:44 +08:00
|
|
|
// send delete msg to DeleteNode
|
|
|
|
return []Msg{&res}
|
2021-01-19 11:37:16 +08:00
|
|
|
}
|
|
|
|
|
2021-10-06 22:20:05 +08:00
|
|
|
// updateSegStatesInReplica updates statistics in replica for the segments in insertMsgs.
|
|
|
|
// If the segment doesn't exist, a new segment will be created.
|
|
|
|
// The segment number of rows will be updated in mem, waiting to be uploaded to DataCoord.
|
2021-09-17 16:27:56 +08:00
|
|
|
func (ibNode *insertBufferNode) updateSegStatesInReplica(insertMsgs []*msgstream.InsertMsg, startPos, endPos *internalpb.MsgPosition) (seg2Upload []UniqueID, err error) {
|
|
|
|
uniqueSeg := make(map[UniqueID]int64)
|
|
|
|
for _, msg := range insertMsgs {
|
|
|
|
|
|
|
|
currentSegID := msg.GetSegmentID()
|
|
|
|
collID := msg.GetCollectionID()
|
|
|
|
partitionID := msg.GetPartitionID()
|
|
|
|
|
|
|
|
if !ibNode.replica.hasSegment(currentSegID, true) {
|
2021-09-27 10:01:59 +08:00
|
|
|
err = ibNode.replica.addNewSegment(currentSegID, collID, partitionID, msg.GetShardName(),
|
2021-09-17 16:27:56 +08:00
|
|
|
startPos, endPos)
|
|
|
|
if err != nil {
|
2021-09-18 09:13:50 +08:00
|
|
|
log.Error("add segment wrong",
|
|
|
|
zap.Int64("segID", currentSegID),
|
|
|
|
zap.Int64("collID", collID),
|
|
|
|
zap.Int64("partID", partitionID),
|
2021-09-27 10:01:59 +08:00
|
|
|
zap.String("chanName", msg.GetShardName()),
|
2021-09-18 09:13:50 +08:00
|
|
|
zap.Error(err))
|
2021-09-17 16:27:56 +08:00
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
segNum := uniqueSeg[currentSegID]
|
|
|
|
uniqueSeg[currentSegID] = segNum + int64(len(msg.RowIDs))
|
|
|
|
}
|
|
|
|
|
|
|
|
seg2Upload = make([]UniqueID, 0, len(uniqueSeg))
|
|
|
|
for id, num := range uniqueSeg {
|
|
|
|
seg2Upload = append(seg2Upload, id)
|
|
|
|
ibNode.replica.updateStatistics(id, num)
|
|
|
|
}
|
|
|
|
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2021-09-18 14:45:50 +08:00
|
|
|
/* #nosec G103 */
|
2021-09-09 15:00:00 +08:00
|
|
|
// bufferInsertMsg put InsertMsg into buffer
|
|
|
|
// 1.1 fetch related schema from replica
|
|
|
|
// 1.2 Get buffer data and put data into each field buffer
|
|
|
|
// 1.3 Put back into buffer
|
|
|
|
// 1.4 Update related statistics
|
2021-09-18 14:25:50 +08:00
|
|
|
func (ibNode *insertBufferNode) bufferInsertMsg(msg *msgstream.InsertMsg, endPos *internalpb.MsgPosition) error {
|
2021-09-09 15:00:00 +08:00
|
|
|
if len(msg.RowIDs) != len(msg.Timestamps) || len(msg.RowIDs) != len(msg.RowData) {
|
|
|
|
return errors.New("misaligned messages detected")
|
|
|
|
}
|
|
|
|
currentSegID := msg.GetSegmentID()
|
|
|
|
collectionID := msg.GetCollectionID()
|
|
|
|
|
2021-09-26 20:55:59 +08:00
|
|
|
collSchema, err := ibNode.replica.getCollectionSchema(collectionID, msg.EndTs())
|
|
|
|
if err != nil {
|
|
|
|
log.Error("Get schema wrong:", zap.Error(err))
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// Get Dimension
|
|
|
|
// TODO GOOSE: under assumption that there's only 1 Vector field in one collection schema
|
|
|
|
var dimension int
|
|
|
|
for _, field := range collSchema.Fields {
|
|
|
|
if field.DataType == schemapb.DataType_FloatVector ||
|
|
|
|
field.DataType == schemapb.DataType_BinaryVector {
|
|
|
|
|
|
|
|
for _, t := range field.TypeParams {
|
|
|
|
if t.Key == "dim" {
|
|
|
|
dimension, err = strconv.Atoi(t.Value)
|
|
|
|
if err != nil {
|
|
|
|
log.Error("strconv wrong on get dim", zap.Error(err))
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
break
|
2021-09-09 15:00:00 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-09-26 20:55:59 +08:00
|
|
|
newbd, err := newBufferData(int64(dimension))
|
2021-09-09 15:00:00 +08:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2021-09-26 20:55:59 +08:00
|
|
|
bd, _ := ibNode.insertBuffer.LoadOrStore(currentSegID, newbd)
|
|
|
|
|
|
|
|
buffer := bd.(*BufferData)
|
|
|
|
idata := buffer.buffer
|
2021-09-09 15:00:00 +08:00
|
|
|
|
|
|
|
// 1.2 Get Fields
|
|
|
|
var fieldIDs []int64
|
|
|
|
var fieldTypes []schemapb.DataType
|
|
|
|
for _, field := range collSchema.Fields {
|
|
|
|
fieldIDs = append(fieldIDs, field.FieldID)
|
|
|
|
fieldTypes = append(fieldTypes, field.DataType)
|
|
|
|
}
|
|
|
|
|
2021-10-13 21:32:33 +08:00
|
|
|
blobReaders := make([]io.Reader, 0)
|
|
|
|
for _, blob := range msg.RowData {
|
|
|
|
blobReaders = append(blobReaders, bytes.NewReader(blob.GetValue()))
|
|
|
|
}
|
|
|
|
|
2021-09-09 15:00:00 +08:00
|
|
|
for _, field := range collSchema.Fields {
|
|
|
|
switch field.DataType {
|
|
|
|
case schemapb.DataType_FloatVector:
|
|
|
|
var dim int
|
|
|
|
for _, t := range field.TypeParams {
|
|
|
|
if t.Key == "dim" {
|
|
|
|
dim, err = strconv.Atoi(t.Value)
|
|
|
|
if err != nil {
|
|
|
|
log.Error("strconv wrong on get dim", zap.Error(err))
|
2021-09-26 20:55:59 +08:00
|
|
|
break
|
2021-09-09 15:00:00 +08:00
|
|
|
}
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if _, ok := idata.Data[field.FieldID]; !ok {
|
|
|
|
idata.Data[field.FieldID] = &storage.FloatVectorFieldData{
|
|
|
|
NumRows: make([]int64, 0, 1),
|
|
|
|
Data: make([]float32, 0),
|
|
|
|
Dim: dim,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fieldData := idata.Data[field.FieldID].(*storage.FloatVectorFieldData)
|
2021-10-13 21:32:33 +08:00
|
|
|
for _, r := range blobReaders {
|
|
|
|
var v []float32 = make([]float32, dim)
|
2021-09-09 15:00:00 +08:00
|
|
|
|
2021-10-13 21:32:33 +08:00
|
|
|
readBinary(r, &v, field.DataType)
|
|
|
|
|
|
|
|
fieldData.Data = append(fieldData.Data, v...)
|
2021-09-09 15:00:00 +08:00
|
|
|
}
|
2021-10-13 21:32:33 +08:00
|
|
|
|
2021-09-09 15:00:00 +08:00
|
|
|
fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
|
|
|
|
|
|
|
|
case schemapb.DataType_BinaryVector:
|
|
|
|
var dim int
|
|
|
|
for _, t := range field.TypeParams {
|
|
|
|
if t.Key == "dim" {
|
|
|
|
dim, err = strconv.Atoi(t.Value)
|
|
|
|
if err != nil {
|
2021-09-26 20:55:59 +08:00
|
|
|
log.Error("strconv wrong on get dim", zap.Error(err))
|
|
|
|
return err
|
2021-09-09 15:00:00 +08:00
|
|
|
}
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if _, ok := idata.Data[field.FieldID]; !ok {
|
|
|
|
idata.Data[field.FieldID] = &storage.BinaryVectorFieldData{
|
|
|
|
NumRows: make([]int64, 0, 1),
|
|
|
|
Data: make([]byte, 0),
|
|
|
|
Dim: dim,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
fieldData := idata.Data[field.FieldID].(*storage.BinaryVectorFieldData)
|
|
|
|
|
2021-10-13 21:32:33 +08:00
|
|
|
for _, r := range blobReaders {
|
|
|
|
var v []byte = make([]byte, dim/8)
|
|
|
|
readBinary(r, &v, field.DataType)
|
|
|
|
|
|
|
|
fieldData.Data = append(fieldData.Data, v...)
|
2021-09-09 15:00:00 +08:00
|
|
|
}
|
2021-10-13 21:32:33 +08:00
|
|
|
|
2021-09-09 15:00:00 +08:00
|
|
|
fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
|
|
|
|
|
|
|
|
case schemapb.DataType_Bool:
|
|
|
|
if _, ok := idata.Data[field.FieldID]; !ok {
|
|
|
|
idata.Data[field.FieldID] = &storage.BoolFieldData{
|
|
|
|
NumRows: make([]int64, 0, 1),
|
|
|
|
Data: make([]bool, 0),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fieldData := idata.Data[field.FieldID].(*storage.BoolFieldData)
|
2021-10-13 21:32:33 +08:00
|
|
|
for _, r := range blobReaders {
|
|
|
|
var v bool
|
|
|
|
readBinary(r, &v, field.DataType)
|
|
|
|
|
2021-09-09 15:00:00 +08:00
|
|
|
fieldData.Data = append(fieldData.Data, v)
|
|
|
|
}
|
2021-10-13 21:32:33 +08:00
|
|
|
|
2021-09-09 15:00:00 +08:00
|
|
|
fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
|
|
|
|
|
|
|
|
case schemapb.DataType_Int8:
|
|
|
|
if _, ok := idata.Data[field.FieldID]; !ok {
|
|
|
|
idata.Data[field.FieldID] = &storage.Int8FieldData{
|
|
|
|
NumRows: make([]int64, 0, 1),
|
|
|
|
Data: make([]int8, 0),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fieldData := idata.Data[field.FieldID].(*storage.Int8FieldData)
|
2021-10-13 21:32:33 +08:00
|
|
|
for _, r := range blobReaders {
|
|
|
|
var v int8
|
|
|
|
readBinary(r, &v, field.DataType)
|
|
|
|
|
2021-09-09 15:00:00 +08:00
|
|
|
fieldData.Data = append(fieldData.Data, v)
|
|
|
|
}
|
2021-10-13 21:32:33 +08:00
|
|
|
|
2021-09-09 15:00:00 +08:00
|
|
|
fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
|
|
|
|
|
|
|
|
case schemapb.DataType_Int16:
|
|
|
|
if _, ok := idata.Data[field.FieldID]; !ok {
|
|
|
|
idata.Data[field.FieldID] = &storage.Int16FieldData{
|
|
|
|
NumRows: make([]int64, 0, 1),
|
|
|
|
Data: make([]int16, 0),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fieldData := idata.Data[field.FieldID].(*storage.Int16FieldData)
|
2021-10-13 21:32:33 +08:00
|
|
|
for _, r := range blobReaders {
|
|
|
|
var v int16
|
|
|
|
readBinary(r, &v, field.DataType)
|
|
|
|
|
2021-09-09 15:00:00 +08:00
|
|
|
fieldData.Data = append(fieldData.Data, v)
|
|
|
|
}
|
2021-10-13 21:32:33 +08:00
|
|
|
|
2021-09-09 15:00:00 +08:00
|
|
|
fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
|
|
|
|
|
|
|
|
case schemapb.DataType_Int32:
|
|
|
|
if _, ok := idata.Data[field.FieldID]; !ok {
|
|
|
|
idata.Data[field.FieldID] = &storage.Int32FieldData{
|
|
|
|
NumRows: make([]int64, 0, 1),
|
|
|
|
Data: make([]int32, 0),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fieldData := idata.Data[field.FieldID].(*storage.Int32FieldData)
|
2021-10-13 21:32:33 +08:00
|
|
|
for _, r := range blobReaders {
|
|
|
|
var v int32
|
|
|
|
readBinary(r, &v, field.DataType)
|
|
|
|
|
2021-09-09 15:00:00 +08:00
|
|
|
fieldData.Data = append(fieldData.Data, v)
|
|
|
|
}
|
2021-10-13 21:32:33 +08:00
|
|
|
|
2021-09-09 15:00:00 +08:00
|
|
|
fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
|
|
|
|
|
|
|
|
case schemapb.DataType_Int64:
|
|
|
|
if _, ok := idata.Data[field.FieldID]; !ok {
|
|
|
|
idata.Data[field.FieldID] = &storage.Int64FieldData{
|
|
|
|
NumRows: make([]int64, 0, 1),
|
|
|
|
Data: make([]int64, 0),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fieldData := idata.Data[field.FieldID].(*storage.Int64FieldData)
|
|
|
|
switch field.FieldID {
|
|
|
|
case 0: // rowIDs
|
|
|
|
fieldData.Data = append(fieldData.Data, msg.RowIDs...)
|
|
|
|
fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
|
|
|
|
case 1: // Timestamps
|
|
|
|
for _, ts := range msg.Timestamps {
|
|
|
|
fieldData.Data = append(fieldData.Data, int64(ts))
|
|
|
|
}
|
|
|
|
fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
|
|
|
|
default:
|
2021-10-13 21:32:33 +08:00
|
|
|
for _, r := range blobReaders {
|
|
|
|
var v int64
|
|
|
|
readBinary(r, &v, field.DataType)
|
|
|
|
|
2021-09-09 15:00:00 +08:00
|
|
|
fieldData.Data = append(fieldData.Data, v)
|
|
|
|
}
|
2021-10-13 21:32:33 +08:00
|
|
|
|
2021-09-09 15:00:00 +08:00
|
|
|
fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
|
|
|
|
}
|
2021-10-19 20:18:47 +08:00
|
|
|
if field.IsPrimaryKey {
|
|
|
|
// update segment pk filter
|
|
|
|
ibNode.replica.updateSegmentPKRange(currentSegID, fieldData.Data)
|
|
|
|
}
|
2021-09-09 15:00:00 +08:00
|
|
|
|
|
|
|
case schemapb.DataType_Float:
|
|
|
|
if _, ok := idata.Data[field.FieldID]; !ok {
|
|
|
|
idata.Data[field.FieldID] = &storage.FloatFieldData{
|
|
|
|
NumRows: make([]int64, 0, 1),
|
|
|
|
Data: make([]float32, 0),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fieldData := idata.Data[field.FieldID].(*storage.FloatFieldData)
|
2021-10-13 21:32:33 +08:00
|
|
|
|
|
|
|
for _, r := range blobReaders {
|
|
|
|
var v float32
|
|
|
|
readBinary(r, &v, field.DataType)
|
|
|
|
|
2021-09-09 15:00:00 +08:00
|
|
|
fieldData.Data = append(fieldData.Data, v)
|
|
|
|
}
|
|
|
|
fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
|
|
|
|
|
|
|
|
case schemapb.DataType_Double:
|
|
|
|
if _, ok := idata.Data[field.FieldID]; !ok {
|
|
|
|
idata.Data[field.FieldID] = &storage.DoubleFieldData{
|
|
|
|
NumRows: make([]int64, 0, 1),
|
|
|
|
Data: make([]float64, 0),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fieldData := idata.Data[field.FieldID].(*storage.DoubleFieldData)
|
2021-10-13 21:32:33 +08:00
|
|
|
|
|
|
|
for _, r := range blobReaders {
|
|
|
|
var v float64
|
|
|
|
readBinary(r, &v, field.DataType)
|
|
|
|
|
2021-09-09 15:00:00 +08:00
|
|
|
fieldData.Data = append(fieldData.Data, v)
|
|
|
|
}
|
|
|
|
fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-09-26 20:55:59 +08:00
|
|
|
// update buffer size
|
|
|
|
buffer.updateSize(int64(len(msg.RowData)))
|
|
|
|
|
|
|
|
// store in buffer
|
|
|
|
ibNode.insertBuffer.Store(currentSegID, buffer)
|
2021-09-09 15:00:00 +08:00
|
|
|
|
|
|
|
// store current endPositions as Segment->EndPostion
|
2021-09-18 14:25:50 +08:00
|
|
|
ibNode.replica.updateSegmentEndPosition(currentSegID, endPos)
|
|
|
|
|
2021-09-09 15:00:00 +08:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-10-06 22:28:11 +08:00
|
|
|
// readBinary read data in bytes and write it into receiver.
|
|
|
|
// The receiver can be any type in int8, int16, int32, int64, float32, float64 and bool
|
|
|
|
// readBinary uses LittleEndian ByteOrder.
|
2021-10-13 21:32:33 +08:00
|
|
|
func readBinary(reader io.Reader, receiver interface{}, dataType schemapb.DataType) {
|
2021-11-02 18:16:32 +08:00
|
|
|
err := binary.Read(reader, common.Endian, receiver)
|
2021-09-09 15:00:00 +08:00
|
|
|
if err != nil {
|
|
|
|
log.Error("binary.Read failed", zap.Any("data type", dataType), zap.Error(err))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-10-06 23:30:21 +08:00
|
|
|
// writeHardTimeTick writes timetick once insertBufferNode operates.
|
2021-01-19 11:37:16 +08:00
|
|
|
func (ibNode *insertBufferNode) writeHardTimeTick(ts Timestamp) error {
|
2021-11-04 15:40:14 +08:00
|
|
|
ibNode.ttLogger.LogTs(ts)
|
2021-11-05 14:59:32 +08:00
|
|
|
ibNode.ttMerger.bufferTs(ts)
|
|
|
|
return nil
|
2021-01-19 11:37:16 +08:00
|
|
|
}
|
|
|
|
|
2021-10-06 23:30:21 +08:00
|
|
|
// uploadMemStates2Coord uploads latest changed segments statistics in DataNode memory to DataCoord
|
|
|
|
// through a msgStream channel.
|
|
|
|
//
|
|
|
|
// Currently, the statistics includes segment ID and its total number of rows in memory.
|
2021-09-17 16:27:56 +08:00
|
|
|
func (ibNode *insertBufferNode) uploadMemStates2Coord(segIDs []UniqueID) error {
|
2021-02-26 10:13:36 +08:00
|
|
|
log.Debug("Updating segments statistics...")
|
2021-03-12 14:22:09 +08:00
|
|
|
statsUpdates := make([]*internalpb.SegmentStatisticsUpdates, 0, len(segIDs))
|
2021-01-21 09:55:25 +08:00
|
|
|
for _, segID := range segIDs {
|
|
|
|
updates, err := ibNode.replica.getSegmentStatisticsUpdates(segID)
|
|
|
|
if err != nil {
|
2021-02-26 10:13:36 +08:00
|
|
|
log.Error("get segment statistics updates wrong", zap.Int64("segmentID", segID), zap.Error(err))
|
2021-01-21 09:55:25 +08:00
|
|
|
continue
|
|
|
|
}
|
2021-06-09 17:31:48 +08:00
|
|
|
|
|
|
|
log.Debug("Segment Statistics to Update",
|
|
|
|
zap.Int64("Segment ID", updates.GetSegmentID()),
|
|
|
|
zap.Int64("NumOfRows", updates.GetNumRows()),
|
|
|
|
)
|
|
|
|
|
2021-01-21 09:55:25 +08:00
|
|
|
statsUpdates = append(statsUpdates, updates)
|
|
|
|
}
|
|
|
|
|
2021-03-12 14:22:09 +08:00
|
|
|
segStats := internalpb.SegmentStatistics{
|
2021-01-21 09:55:25 +08:00
|
|
|
Base: &commonpb.MsgBase{
|
2021-03-10 14:45:35 +08:00
|
|
|
MsgType: commonpb.MsgType_SegmentStatistics,
|
2021-01-21 09:55:25 +08:00
|
|
|
MsgID: UniqueID(0), // GOOSE TODO
|
|
|
|
Timestamp: Timestamp(0), // GOOSE TODO
|
2021-01-24 21:20:11 +08:00
|
|
|
SourceID: Params.NodeID,
|
2021-01-21 09:55:25 +08:00
|
|
|
},
|
|
|
|
SegStats: statsUpdates,
|
|
|
|
}
|
|
|
|
|
|
|
|
var msg msgstream.TsMsg = &msgstream.SegmentStatisticsMsg{
|
|
|
|
BaseMsg: msgstream.BaseMsg{
|
2021-02-04 11:19:48 +08:00
|
|
|
HashValues: []uint32{0}, // GOOSE TODO
|
2021-01-21 09:55:25 +08:00
|
|
|
},
|
|
|
|
SegmentStatistics: segStats,
|
|
|
|
}
|
|
|
|
|
|
|
|
var msgPack = msgstream.MsgPack{
|
|
|
|
Msgs: []msgstream.TsMsg{msg},
|
|
|
|
}
|
2021-03-25 14:41:46 +08:00
|
|
|
return ibNode.segmentStatisticsStream.Produce(&msgPack)
|
2021-01-21 09:55:25 +08:00
|
|
|
}
|
|
|
|
|
2021-06-08 19:25:37 +08:00
|
|
|
func (ibNode *insertBufferNode) getCollMetabySegID(segmentID UniqueID, ts Timestamp) (meta *etcdpb.CollectionMeta, err error) {
|
2021-08-11 14:24:09 +08:00
|
|
|
if !ibNode.replica.hasSegment(segmentID, true) {
|
2021-06-16 19:03:57 +08:00
|
|
|
return nil, fmt.Errorf("No such segment %d in the replica", segmentID)
|
2021-05-18 19:45:00 +08:00
|
|
|
}
|
|
|
|
|
2021-06-16 19:03:57 +08:00
|
|
|
collID := ibNode.replica.getCollectionID()
|
|
|
|
sch, err := ibNode.replica.getCollectionSchema(collID, ts)
|
2021-05-18 19:45:00 +08:00
|
|
|
if err != nil {
|
2021-08-30 10:03:58 +08:00
|
|
|
return nil, err
|
2021-05-18 19:45:00 +08:00
|
|
|
}
|
2021-06-08 19:25:37 +08:00
|
|
|
|
|
|
|
meta = &etcdpb.CollectionMeta{
|
2021-06-16 19:03:57 +08:00
|
|
|
ID: collID,
|
|
|
|
Schema: sch,
|
2021-06-08 19:25:37 +08:00
|
|
|
}
|
2021-05-18 19:45:00 +08:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
func (ibNode *insertBufferNode) getCollectionandPartitionIDbySegID(segmentID UniqueID) (collID, partitionID UniqueID, err error) {
|
2021-06-21 16:00:22 +08:00
|
|
|
return ibNode.replica.getCollectionAndPartitionID(segmentID)
|
2021-05-18 19:45:00 +08:00
|
|
|
}
|
|
|
|
|
2021-10-19 11:04:34 +08:00
|
|
|
func newInsertBufferNode(ctx context.Context, flushCh <-chan flushMsg, fm flushManager,
|
2021-10-13 11:16:32 +08:00
|
|
|
flushingSegCache *Cache, config *nodeConfig) (*insertBufferNode, error) {
|
2021-01-19 11:37:16 +08:00
|
|
|
|
|
|
|
baseNode := BaseNode{}
|
2021-10-13 11:16:32 +08:00
|
|
|
baseNode.SetMaxQueueLength(config.maxQueueLength)
|
|
|
|
baseNode.SetMaxParallelism(config.maxParallelism)
|
2021-01-19 11:37:16 +08:00
|
|
|
|
2021-01-21 09:55:25 +08:00
|
|
|
//input stream, data node time tick
|
2021-10-13 11:16:32 +08:00
|
|
|
wTt, err := config.msFactory.NewMsgStream(ctx)
|
2021-08-30 10:03:58 +08:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2021-02-04 14:37:12 +08:00
|
|
|
wTt.AsProducer([]string{Params.TimeTickChannelName})
|
2021-08-30 10:03:58 +08:00
|
|
|
log.Debug("datanode AsProducer", zap.String("TimeTickChannelName", Params.TimeTickChannelName))
|
2021-01-21 09:55:25 +08:00
|
|
|
var wTtMsgStream msgstream.MsgStream = wTt
|
2021-01-24 21:20:11 +08:00
|
|
|
wTtMsgStream.Start()
|
2021-01-21 09:55:25 +08:00
|
|
|
|
|
|
|
// update statistics channel
|
2021-10-13 11:16:32 +08:00
|
|
|
segS, err := config.msFactory.NewMsgStream(ctx)
|
2021-08-30 10:03:58 +08:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2021-02-04 14:37:12 +08:00
|
|
|
segS.AsProducer([]string{Params.SegmentStatisticsChannelName})
|
2021-08-30 10:03:58 +08:00
|
|
|
log.Debug("datanode AsProducer", zap.String("SegmentStatisChannelName", Params.SegmentStatisticsChannelName))
|
2021-01-21 09:55:25 +08:00
|
|
|
var segStatisticsMsgStream msgstream.MsgStream = segS
|
2021-01-24 21:20:11 +08:00
|
|
|
segStatisticsMsgStream.Start()
|
2021-01-22 19:36:09 +08:00
|
|
|
|
2021-11-05 14:59:32 +08:00
|
|
|
mt := newMergedTimeTickerSender(func(ts Timestamp) error {
|
|
|
|
msgPack := msgstream.MsgPack{}
|
|
|
|
timeTickMsg := msgstream.DataNodeTtMsg{
|
|
|
|
BaseMsg: msgstream.BaseMsg{
|
|
|
|
BeginTimestamp: ts,
|
|
|
|
EndTimestamp: ts,
|
|
|
|
HashValues: []uint32{0},
|
|
|
|
},
|
|
|
|
DataNodeTtMsg: datapb.DataNodeTtMsg{
|
|
|
|
Base: &commonpb.MsgBase{
|
|
|
|
MsgType: commonpb.MsgType_DataNodeTt,
|
|
|
|
MsgID: 0,
|
|
|
|
Timestamp: ts,
|
|
|
|
},
|
|
|
|
ChannelName: config.vChannelName,
|
|
|
|
Timestamp: ts,
|
|
|
|
},
|
|
|
|
}
|
|
|
|
msgPack.Msgs = append(msgPack.Msgs, &timeTickMsg)
|
|
|
|
return wTtMsgStream.Produce(&msgPack)
|
|
|
|
})
|
|
|
|
|
2021-01-19 11:37:16 +08:00
|
|
|
return &insertBufferNode{
|
2021-06-02 15:58:33 +08:00
|
|
|
BaseNode: baseNode,
|
2021-09-26 20:55:59 +08:00
|
|
|
insertBuffer: sync.Map{},
|
2021-06-02 15:58:33 +08:00
|
|
|
|
2021-01-22 09:36:40 +08:00
|
|
|
timeTickStream: wTtMsgStream,
|
|
|
|
segmentStatisticsStream: segStatisticsMsgStream,
|
2021-06-02 15:58:33 +08:00
|
|
|
|
2021-09-23 16:03:54 +08:00
|
|
|
flushMap: sync.Map{},
|
|
|
|
flushChan: flushCh,
|
|
|
|
flushingSegCache: flushingSegCache,
|
2021-10-19 11:04:34 +08:00
|
|
|
flushManager: fm,
|
2021-10-13 11:16:32 +08:00
|
|
|
|
|
|
|
replica: config.replica,
|
|
|
|
idAllocator: config.allocator,
|
|
|
|
channelName: config.vChannelName,
|
2021-11-05 14:59:32 +08:00
|
|
|
ttMerger: mt,
|
2021-08-30 10:03:58 +08:00
|
|
|
}, nil
|
2021-01-19 11:37:16 +08:00
|
|
|
}
|