mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-01 03:18:29 +08:00
parent
7064937dc3
commit
74c73fa75f
@ -87,18 +87,18 @@ func (dsService *dataSyncService) initNodes(vchanPair *datapb.VchannelInfo) {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
saveBinlog := func(fu *autoFlushUnit) error {
|
||||
saveBinlog := func(fu *segmentFlushUnit) error {
|
||||
id2path := []*datapb.ID2PathList{}
|
||||
checkPoints := []*datapb.CheckPoint{}
|
||||
for k, v := range fu.field2Path {
|
||||
id2path = append(id2path, &datapb.ID2PathList{ID: k, Paths: []string{v}})
|
||||
}
|
||||
for k, v := range fu.openSegCheckpoints {
|
||||
for k, v := range fu.checkPoint {
|
||||
v := v
|
||||
checkPoints = append(checkPoints, &datapb.CheckPoint{
|
||||
SegmentID: k,
|
||||
NumOfRows: fu.numRows[k],
|
||||
Position: &v,
|
||||
NumOfRows: v.numRows,
|
||||
Position: &v.pos,
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -60,18 +60,22 @@ type insertBufferNode struct {
|
||||
timeTickStream msgstream.MsgStream
|
||||
segmentStatisticsStream msgstream.MsgStream
|
||||
|
||||
dsSaveBinlog func(fu *autoFlushUnit) error
|
||||
openSegList map[UniqueID]bool
|
||||
openSegLock sync.Mutex
|
||||
dsSaveBinlog func(fu *segmentFlushUnit) error
|
||||
segmentCheckPoints map[UniqueID]segmentCheckPoint
|
||||
segmentCheckPointLock sync.Mutex
|
||||
}
|
||||
|
||||
type autoFlushUnit struct {
|
||||
collID UniqueID
|
||||
segID UniqueID
|
||||
field2Path map[UniqueID]string
|
||||
openSegCheckpoints map[UniqueID]internalpb.MsgPosition
|
||||
numRows map[UniqueID]int64
|
||||
flushed bool
|
||||
type segmentCheckPoint struct {
|
||||
numRows int64
|
||||
pos internalpb.MsgPosition
|
||||
}
|
||||
|
||||
type segmentFlushUnit struct {
|
||||
collID UniqueID
|
||||
segID UniqueID
|
||||
field2Path map[UniqueID]string
|
||||
checkPoint map[UniqueID]segmentCheckPoint
|
||||
flushed bool
|
||||
}
|
||||
|
||||
type insertBuffer struct {
|
||||
@ -155,6 +159,7 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
|
||||
// this position is the start position of current segment, not start position of current MsgPack
|
||||
// so setStartPositions will only call once when meet new segment
|
||||
ibNode.replica.setStartPositions(currentSegID, iMsg.startPositions)
|
||||
ibNode.setSegmentCheckPoint(currentSegID, segmentCheckPoint{0, *iMsg.startPositions[0]})
|
||||
}
|
||||
|
||||
segNum := uniqueSeg[currentSegID]
|
||||
@ -471,7 +476,7 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
|
||||
}
|
||||
}
|
||||
|
||||
finishCh := make(chan autoFlushUnit, len(segToUpdate))
|
||||
finishCh := make(chan segmentFlushUnit, len(segToUpdate))
|
||||
finishCnt := sync.WaitGroup{}
|
||||
for _, segToFlush := range segToUpdate {
|
||||
// If full, auto flush
|
||||
@ -506,7 +511,7 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
|
||||
log.Debug("segment is empty")
|
||||
continue
|
||||
}
|
||||
fu.openSegCheckpoints, fu.numRows = ibNode.listOpenSegmentCheckPointAndNumRows()
|
||||
fu.checkPoint = ibNode.listSegmentCheckPoints()
|
||||
fu.flushed = false
|
||||
if err := ibNode.dsSaveBinlog(&fu); err != nil {
|
||||
log.Debug("data service save bin log path failed", zap.Error(err))
|
||||
@ -522,25 +527,23 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
|
||||
|
||||
if ibNode.insertBuffer.size(currentSegID) <= 0 {
|
||||
log.Debug(".. Buffer empty ...")
|
||||
c, n := ibNode.listOpenSegmentCheckPointAndNumRows()
|
||||
ibNode.dsSaveBinlog(&autoFlushUnit{
|
||||
collID: fmsg.collectionID,
|
||||
segID: currentSegID,
|
||||
numRows: n,
|
||||
field2Path: map[UniqueID]string{},
|
||||
openSegCheckpoints: c,
|
||||
flushed: true,
|
||||
ibNode.dsSaveBinlog(&segmentFlushUnit{
|
||||
collID: fmsg.collectionID,
|
||||
segID: currentSegID,
|
||||
field2Path: map[UniqueID]string{},
|
||||
checkPoint: ibNode.listSegmentCheckPoints(),
|
||||
flushed: true,
|
||||
})
|
||||
ibNode.removeSegmentCheckPoint(fmsg.segmentID)
|
||||
fmsg.dmlFlushedCh <- []*datapb.ID2PathList{{ID: currentSegID, Paths: []string{}}}
|
||||
} else {
|
||||
log.Debug(".. Buffer not empty, flushing ..")
|
||||
finishCh := make(chan autoFlushUnit, 1)
|
||||
finishCh := make(chan segmentFlushUnit, 1)
|
||||
|
||||
ibNode.flushMap.Store(currentSegID, ibNode.insertBuffer.insertData[currentSegID])
|
||||
delete(ibNode.insertBuffer.insertData, currentSegID)
|
||||
clearFn := func() {
|
||||
finishCh <- autoFlushUnit{field2Path: nil}
|
||||
finishCh <- segmentFlushUnit{field2Path: nil}
|
||||
log.Debug(".. Clearing flush Buffer ..")
|
||||
ibNode.flushMap.Delete(currentSegID)
|
||||
close(finishCh)
|
||||
@ -575,7 +578,7 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
|
||||
fu := <-finishCh
|
||||
close(finishCh)
|
||||
if fu.field2Path != nil {
|
||||
fu.openSegCheckpoints, fu.numRows = ibNode.listOpenSegmentCheckPointAndNumRows()
|
||||
fu.checkPoint = ibNode.listSegmentCheckPoints()
|
||||
fu.flushed = true
|
||||
if ibNode.dsSaveBinlog(&fu) != nil {
|
||||
log.Debug("data service save bin log path failed", zap.Error(err))
|
||||
@ -603,7 +606,7 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
|
||||
}
|
||||
|
||||
func flushSegment(collMeta *etcdpb.CollectionMeta, segID, partitionID, collID UniqueID,
|
||||
insertData *sync.Map, kv kv.BaseKV, flushUnit chan<- autoFlushUnit, wgFinish *sync.WaitGroup,
|
||||
insertData *sync.Map, kv kv.BaseKV, flushUnit chan<- segmentFlushUnit, wgFinish *sync.WaitGroup,
|
||||
ibNode *insertBufferNode, idAllocator allocatorInterface) {
|
||||
if wgFinish != nil {
|
||||
defer wgFinish.Done()
|
||||
@ -611,7 +614,7 @@ func flushSegment(collMeta *etcdpb.CollectionMeta, segID, partitionID, collID Un
|
||||
|
||||
clearFn := func(isSuccess bool) {
|
||||
if !isSuccess {
|
||||
flushUnit <- autoFlushUnit{field2Path: nil}
|
||||
flushUnit <- segmentFlushUnit{field2Path: nil}
|
||||
}
|
||||
|
||||
log.Debug(".. Clearing flush Buffer ..")
|
||||
@ -695,29 +698,31 @@ func flushSegment(collMeta *etcdpb.CollectionMeta, segID, partitionID, collID Un
|
||||
return
|
||||
}
|
||||
|
||||
ibNode.setSegmentCheckPoint(segID)
|
||||
flushUnit <- autoFlushUnit{collID: collID, segID: segID, field2Path: field2Path}
|
||||
_, ep := ibNode.replica.getSegmentPositions(segID)
|
||||
sta, _ := ibNode.replica.getSegmentStatisticsUpdates(segID)
|
||||
ibNode.setSegmentCheckPoint(segID, segmentCheckPoint{sta.NumRows, *ep[0]})
|
||||
flushUnit <- segmentFlushUnit{collID: collID, segID: segID, field2Path: field2Path}
|
||||
clearFn(true)
|
||||
}
|
||||
|
||||
func (ibNode *insertBufferNode) setSegmentCheckPoint(segID UniqueID) {
|
||||
ibNode.openSegLock.Lock()
|
||||
defer ibNode.openSegLock.Unlock()
|
||||
ibNode.openSegList[segID] = true
|
||||
func (ibNode *insertBufferNode) setSegmentCheckPoint(segID UniqueID, chk segmentCheckPoint) {
|
||||
ibNode.segmentCheckPointLock.Lock()
|
||||
defer ibNode.segmentCheckPointLock.Unlock()
|
||||
ibNode.segmentCheckPoints[segID] = chk
|
||||
}
|
||||
func (ibNode *insertBufferNode) removeSegmentCheckPoint(segID UniqueID) {
|
||||
ibNode.openSegLock.Lock()
|
||||
defer ibNode.openSegLock.Unlock()
|
||||
delete(ibNode.openSegList, segID)
|
||||
ibNode.segmentCheckPointLock.Lock()
|
||||
defer ibNode.segmentCheckPointLock.Unlock()
|
||||
delete(ibNode.segmentCheckPoints, segID)
|
||||
}
|
||||
func (ibNode *insertBufferNode) listOpenSegmentCheckPointAndNumRows() (map[UniqueID]internalpb.MsgPosition, map[UniqueID]int64) {
|
||||
ibNode.openSegLock.Lock()
|
||||
defer ibNode.openSegLock.Unlock()
|
||||
segs := make([]UniqueID, 0, len(ibNode.openSegList))
|
||||
for k := range ibNode.openSegList {
|
||||
segs = append(segs, k)
|
||||
func (ibNode *insertBufferNode) listSegmentCheckPoints() map[UniqueID]segmentCheckPoint {
|
||||
ibNode.segmentCheckPointLock.Lock()
|
||||
defer ibNode.segmentCheckPointLock.Unlock()
|
||||
segs := make(map[UniqueID]segmentCheckPoint)
|
||||
for k, v := range ibNode.segmentCheckPoints {
|
||||
segs[k] = v
|
||||
}
|
||||
return ibNode.replica.listOpenSegmentCheckPointAndNumRows(segs)
|
||||
return segs
|
||||
}
|
||||
|
||||
func (ibNode *insertBufferNode) writeHardTimeTick(ts Timestamp) error {
|
||||
@ -816,7 +821,7 @@ func newInsertBufferNode(
|
||||
factory msgstream.Factory,
|
||||
idAllocator allocatorInterface,
|
||||
flushCh <-chan *flushMsg,
|
||||
saveBinlog func(*autoFlushUnit) error,
|
||||
saveBinlog func(*segmentFlushUnit) error,
|
||||
) *insertBufferNode {
|
||||
|
||||
maxQueueLength := Params.FlowGraphMaxQueueLength
|
||||
@ -869,11 +874,11 @@ func newInsertBufferNode(
|
||||
timeTickStream: wTtMsgStream,
|
||||
segmentStatisticsStream: segStatisticsMsgStream,
|
||||
|
||||
replica: replica,
|
||||
flushMap: sync.Map{},
|
||||
flushChan: flushCh,
|
||||
idAllocator: idAllocator,
|
||||
dsSaveBinlog: saveBinlog,
|
||||
openSegList: make(map[UniqueID]bool),
|
||||
replica: replica,
|
||||
flushMap: sync.Map{},
|
||||
flushChan: flushCh,
|
||||
idAllocator: idAllocator,
|
||||
dsSaveBinlog: saveBinlog,
|
||||
segmentCheckPoints: make(map[UniqueID]segmentCheckPoint),
|
||||
}
|
||||
}
|
||||
|
@ -62,7 +62,7 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
|
||||
err = msFactory.SetParams(m)
|
||||
assert.Nil(t, err)
|
||||
|
||||
saveBinlog := func(fu *autoFlushUnit) error {
|
||||
saveBinlog := func(fu *segmentFlushUnit) error {
|
||||
t.Log(fu)
|
||||
return nil
|
||||
}
|
||||
@ -135,9 +135,13 @@ func TestFlushSegment(t *testing.T) {
|
||||
collMeta := genCollectionMeta(collectionID, "test_flush_segment_txn")
|
||||
flushMap := sync.Map{}
|
||||
replica := newReplica()
|
||||
err := replica.addCollection(collMeta.ID, collMeta.Schema)
|
||||
require.NoError(t, err)
|
||||
err = replica.addSegment(segmentID, collMeta.ID, 0, Params.InsertChannelNames[0])
|
||||
require.NoError(t, err)
|
||||
replica.setEndPositions(segmentID, []*internalpb.MsgPosition{{ChannelName: "TestChannel"}})
|
||||
|
||||
finishCh := make(chan autoFlushUnit, 1)
|
||||
finishCh := make(chan segmentFlushUnit, 1)
|
||||
|
||||
insertData := &InsertData{
|
||||
Data: make(map[storage.FieldID]storage.FieldData),
|
||||
@ -161,10 +165,10 @@ func TestFlushSegment(t *testing.T) {
|
||||
"receiveBufSize": 1024,
|
||||
"pulsarAddress": Params.PulsarAddress,
|
||||
"pulsarBufSize": 1024}
|
||||
err := msFactory.SetParams(m)
|
||||
err = msFactory.SetParams(m)
|
||||
assert.Nil(t, err)
|
||||
flushChan := make(chan *flushMsg, 100)
|
||||
saveBinlog := func(*autoFlushUnit) error {
|
||||
saveBinlog := func(*segmentFlushUnit) error {
|
||||
return nil
|
||||
}
|
||||
ibNode := newInsertBufferNode(ctx, replica, msFactory, NewAllocatorFactory(), flushChan, saveBinlog)
|
||||
@ -263,8 +267,6 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
|
||||
}
|
||||
err = colRep.addCollection(collMeta.ID, collMeta.Schema)
|
||||
require.NoError(t, err)
|
||||
err = colRep.addSegment(1, collMeta.ID, 0, Params.InsertChannelNames[0])
|
||||
require.NoError(t, err)
|
||||
|
||||
msFactory := msgstream.NewPmsFactory()
|
||||
m := map[string]interface{}{
|
||||
@ -274,8 +276,8 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
|
||||
err = msFactory.SetParams(m)
|
||||
assert.Nil(t, err)
|
||||
|
||||
flushUnit := []autoFlushUnit{}
|
||||
saveBinlog := func(fu *autoFlushUnit) error {
|
||||
flushUnit := []segmentFlushUnit{}
|
||||
saveBinlog := func(fu *segmentFlushUnit) error {
|
||||
flushUnit = append(flushUnit, *fu)
|
||||
return nil
|
||||
}
|
||||
@ -289,6 +291,7 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
|
||||
for i := range inMsg.insertMessages {
|
||||
inMsg.insertMessages[i].SegmentID = int64(i%2) + 1
|
||||
}
|
||||
inMsg.startPositions = []*internalpb.MsgPosition{{Timestamp: 100}}
|
||||
inMsg.endPositions = []*internalpb.MsgPosition{{Timestamp: 123}}
|
||||
|
||||
var iMsg flowgraph.Msg = &inMsg
|
||||
@ -296,27 +299,44 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
|
||||
assert.Equal(t, len(colRep.endPositions), 2)
|
||||
assert.Equal(t, colRep.endPositions[1][0].Timestamp, Timestamp(123))
|
||||
assert.Equal(t, colRep.endPositions[2][0].Timestamp, Timestamp(123))
|
||||
assert.Equal(t, len(iBNode.openSegList), 0)
|
||||
assert.Equal(t, len(iBNode.segmentCheckPoints), 2)
|
||||
assert.Equal(t, iBNode.segmentCheckPoints[1].numRows, int64(0))
|
||||
assert.Equal(t, iBNode.segmentCheckPoints[2].numRows, int64(0))
|
||||
assert.Equal(t, iBNode.segmentCheckPoints[1].pos.Timestamp, Timestamp(100))
|
||||
assert.Equal(t, iBNode.segmentCheckPoints[2].pos.Timestamp, Timestamp(100))
|
||||
assert.Equal(t, len(iBNode.insertBuffer.insertData), 2)
|
||||
assert.Equal(t, iBNode.insertBuffer.size(1), int32(50+16000))
|
||||
assert.Equal(t, iBNode.insertBuffer.size(2), int32(50+16000))
|
||||
assert.Equal(t, len(flushUnit), 0)
|
||||
|
||||
for i := range inMsg.insertMessages {
|
||||
inMsg.insertMessages[i].SegmentID = int64(i%2) + 2
|
||||
}
|
||||
inMsg.startPositions = []*internalpb.MsgPosition{{Timestamp: 123}}
|
||||
inMsg.endPositions = []*internalpb.MsgPosition{{Timestamp: 234}}
|
||||
iBNode.Operate([]flowgraph.Msg{iMsg})
|
||||
assert.Equal(t, len(colRep.endPositions), 3)
|
||||
assert.Equal(t, colRep.endPositions[1][0].Timestamp, Timestamp(123))
|
||||
assert.Equal(t, colRep.endPositions[2][0].Timestamp, Timestamp(234))
|
||||
assert.Equal(t, colRep.endPositions[3][0].Timestamp, Timestamp(234))
|
||||
assert.Equal(t, iBNode.openSegList, map[UniqueID]bool{2: true})
|
||||
assert.Equal(t, len(iBNode.segmentCheckPoints), 3)
|
||||
assert.Equal(t, iBNode.segmentCheckPoints[1].numRows, int64(0))
|
||||
assert.Equal(t, iBNode.segmentCheckPoints[2].numRows, int64(100+32000))
|
||||
assert.Equal(t, iBNode.segmentCheckPoints[3].numRows, int64(0))
|
||||
assert.Equal(t, iBNode.segmentCheckPoints[1].pos.Timestamp, Timestamp(100))
|
||||
assert.Equal(t, iBNode.segmentCheckPoints[2].pos.Timestamp, Timestamp(234))
|
||||
assert.Equal(t, iBNode.segmentCheckPoints[3].pos.Timestamp, Timestamp(123))
|
||||
|
||||
assert.Equal(t, len(flushUnit), 1)
|
||||
assert.Equal(t, flushUnit[0].segID, int64(2))
|
||||
assert.Equal(t, len(flushUnit[0].numRows), 1)
|
||||
assert.Equal(t, flushUnit[0].numRows[2], int64(100+32000))
|
||||
assert.Equal(t, len(flushUnit[0].openSegCheckpoints), 1)
|
||||
assert.Equal(t, flushUnit[0].openSegCheckpoints[2].Timestamp, Timestamp(234))
|
||||
assert.Equal(t, len(flushUnit[0].checkPoint), 3)
|
||||
assert.Equal(t, flushUnit[0].checkPoint[1].numRows, int64(0))
|
||||
assert.Equal(t, flushUnit[0].checkPoint[2].numRows, int64(100+32000))
|
||||
assert.Equal(t, flushUnit[0].checkPoint[3].numRows, int64(0))
|
||||
assert.Equal(t, flushUnit[0].checkPoint[1].pos.Timestamp, Timestamp(100))
|
||||
assert.Equal(t, flushUnit[0].checkPoint[2].pos.Timestamp, Timestamp(234))
|
||||
assert.Equal(t, flushUnit[0].checkPoint[3].pos.Timestamp, Timestamp(123))
|
||||
|
||||
assert.Greater(t, len(flushUnit[0].field2Path), 0)
|
||||
assert.False(t, flushUnit[0].flushed)
|
||||
assert.Equal(t, len(iBNode.insertBuffer.insertData), 2)
|
||||
@ -326,21 +346,30 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
|
||||
for i := range inMsg.insertMessages {
|
||||
inMsg.insertMessages[i].SegmentID = 1
|
||||
}
|
||||
inMsg.startPositions = []*internalpb.MsgPosition{{Timestamp: 234}}
|
||||
inMsg.endPositions = []*internalpb.MsgPosition{{Timestamp: 345}}
|
||||
iBNode.Operate([]flowgraph.Msg{iMsg})
|
||||
assert.Equal(t, len(colRep.endPositions), 3)
|
||||
assert.Equal(t, colRep.endPositions[1][0].Timestamp, Timestamp(345))
|
||||
assert.Equal(t, colRep.endPositions[2][0].Timestamp, Timestamp(234))
|
||||
assert.Equal(t, colRep.endPositions[3][0].Timestamp, Timestamp(234))
|
||||
assert.Equal(t, iBNode.openSegList, map[UniqueID]bool{1: true, 2: true})
|
||||
assert.Equal(t, len(iBNode.segmentCheckPoints), 3)
|
||||
assert.Equal(t, iBNode.segmentCheckPoints[1].numRows, int64(50+16000+100+32000))
|
||||
assert.Equal(t, iBNode.segmentCheckPoints[2].numRows, int64(100+32000))
|
||||
assert.Equal(t, iBNode.segmentCheckPoints[3].numRows, int64(0))
|
||||
assert.Equal(t, iBNode.segmentCheckPoints[1].pos.Timestamp, Timestamp(345))
|
||||
assert.Equal(t, iBNode.segmentCheckPoints[2].pos.Timestamp, Timestamp(234))
|
||||
assert.Equal(t, iBNode.segmentCheckPoints[3].pos.Timestamp, Timestamp(123))
|
||||
|
||||
assert.Equal(t, len(flushUnit), 2)
|
||||
assert.Equal(t, flushUnit[1].segID, int64(1))
|
||||
assert.Equal(t, len(flushUnit[1].numRows), 2)
|
||||
assert.Equal(t, flushUnit[1].numRows[2], int64(100+32000))
|
||||
assert.Equal(t, flushUnit[1].numRows[1], int64(50+16000+100+32000))
|
||||
assert.Equal(t, len(flushUnit[1].openSegCheckpoints), 2)
|
||||
assert.Equal(t, flushUnit[1].openSegCheckpoints[1].Timestamp, Timestamp(345))
|
||||
assert.Equal(t, flushUnit[1].openSegCheckpoints[2].Timestamp, Timestamp(234))
|
||||
assert.Equal(t, len(flushUnit[1].checkPoint), 3)
|
||||
assert.Equal(t, flushUnit[1].checkPoint[1].numRows, int64(50+16000+100+32000))
|
||||
assert.Equal(t, flushUnit[1].checkPoint[2].numRows, int64(100+32000))
|
||||
assert.Equal(t, flushUnit[1].checkPoint[3].numRows, int64(0))
|
||||
assert.Equal(t, flushUnit[1].checkPoint[1].pos.Timestamp, Timestamp(345))
|
||||
assert.Equal(t, flushUnit[1].checkPoint[2].pos.Timestamp, Timestamp(234))
|
||||
assert.Equal(t, flushUnit[1].checkPoint[3].pos.Timestamp, Timestamp(123))
|
||||
assert.False(t, flushUnit[1].flushed)
|
||||
assert.Greater(t, len(flushUnit[1].field2Path), 0)
|
||||
assert.Equal(t, len(iBNode.insertBuffer.insertData), 1)
|
||||
@ -357,6 +386,7 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
|
||||
}
|
||||
|
||||
inMsg.insertMessages = []*msgstream.InsertMsg{}
|
||||
inMsg.endPositions = []*internalpb.MsgPosition{{Timestamp: 345}}
|
||||
inMsg.endPositions = []*internalpb.MsgPosition{{Timestamp: 456}}
|
||||
iBNode.Operate([]flowgraph.Msg{iMsg})
|
||||
|
||||
@ -369,19 +399,26 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
|
||||
assert.Equal(t, colRep.endPositions[1][0].Timestamp, Timestamp(345))
|
||||
assert.Equal(t, colRep.endPositions[2][0].Timestamp, Timestamp(234))
|
||||
assert.Equal(t, colRep.endPositions[3][0].Timestamp, Timestamp(234))
|
||||
assert.Equal(t, iBNode.openSegList, map[UniqueID]bool{2: true})
|
||||
assert.Equal(t, len(iBNode.segmentCheckPoints), 2)
|
||||
assert.Equal(t, iBNode.segmentCheckPoints[2].numRows, int64(100+32000))
|
||||
assert.Equal(t, iBNode.segmentCheckPoints[3].numRows, int64(0))
|
||||
assert.Equal(t, iBNode.segmentCheckPoints[2].pos.Timestamp, Timestamp(234))
|
||||
assert.Equal(t, iBNode.segmentCheckPoints[3].pos.Timestamp, Timestamp(123))
|
||||
|
||||
assert.Equal(t, len(flushUnit), 3)
|
||||
assert.Equal(t, flushUnit[2].segID, int64(1))
|
||||
assert.Equal(t, len(flushUnit[2].numRows), 2)
|
||||
assert.Equal(t, flushUnit[2].numRows[2], int64(100+32000))
|
||||
assert.Equal(t, flushUnit[2].numRows[1], int64(50+16000+100+32000))
|
||||
t.Log(flushUnit[2].openSegCheckpoints)
|
||||
assert.Equal(t, len(flushUnit[2].openSegCheckpoints), 2)
|
||||
assert.Equal(t, flushUnit[2].openSegCheckpoints[1].Timestamp, Timestamp(345))
|
||||
assert.Equal(t, flushUnit[2].openSegCheckpoints[2].Timestamp, Timestamp(234))
|
||||
assert.Equal(t, len(flushUnit[2].checkPoint), 3)
|
||||
assert.Equal(t, flushUnit[2].checkPoint[1].numRows, int64(50+16000+100+32000))
|
||||
assert.Equal(t, flushUnit[2].checkPoint[2].numRows, int64(100+32000))
|
||||
assert.Equal(t, flushUnit[2].checkPoint[3].numRows, int64(0))
|
||||
assert.Equal(t, flushUnit[2].checkPoint[1].pos.Timestamp, Timestamp(345))
|
||||
assert.Equal(t, flushUnit[2].checkPoint[2].pos.Timestamp, Timestamp(234))
|
||||
assert.Equal(t, flushUnit[2].checkPoint[3].pos.Timestamp, Timestamp(123))
|
||||
assert.Equal(t, len(flushUnit[2].field2Path), 0)
|
||||
assert.NotNil(t, flushUnit[2].field2Path)
|
||||
assert.True(t, flushUnit[2].flushed)
|
||||
assert.Equal(t, len(iBNode.insertBuffer.insertData), 1)
|
||||
assert.Equal(t, iBNode.insertBuffer.size(3), int32(50+16000))
|
||||
|
||||
flushChan <- &flushMsg{
|
||||
msgID: 4,
|
||||
@ -400,12 +437,16 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
|
||||
assert.Equal(t, colRep.endPositions[1][0].Timestamp, Timestamp(345))
|
||||
assert.Equal(t, colRep.endPositions[2][0].Timestamp, Timestamp(234))
|
||||
assert.Equal(t, colRep.endPositions[3][0].Timestamp, Timestamp(234))
|
||||
assert.Equal(t, iBNode.openSegList, map[UniqueID]bool{2: true})
|
||||
assert.Equal(t, len(iBNode.segmentCheckPoints), 1)
|
||||
assert.Equal(t, iBNode.segmentCheckPoints[2].numRows, int64(100+32000))
|
||||
assert.Equal(t, iBNode.segmentCheckPoints[2].pos.Timestamp, Timestamp(234))
|
||||
assert.Equal(t, len(flushUnit), 4)
|
||||
assert.Equal(t, flushUnit[3].segID, int64(3))
|
||||
assert.Equal(t, len(flushUnit[3].numRows), 2)
|
||||
assert.Equal(t, flushUnit[3].numRows[3], int64(50+16000))
|
||||
assert.Equal(t, flushUnit[3].numRows[2], int64(100+32000))
|
||||
assert.Equal(t, len(flushUnit[3].checkPoint), 2)
|
||||
assert.Equal(t, flushUnit[3].checkPoint[3].numRows, int64(50+16000))
|
||||
assert.Equal(t, flushUnit[3].checkPoint[2].numRows, int64(100+32000))
|
||||
assert.Equal(t, flushUnit[3].checkPoint[3].pos.Timestamp, Timestamp(234))
|
||||
assert.Equal(t, flushUnit[3].checkPoint[2].pos.Timestamp, Timestamp(234))
|
||||
assert.Greater(t, len(flushUnit[3].field2Path), 0)
|
||||
assert.NotNil(t, flushUnit[3].field2Path)
|
||||
assert.True(t, flushUnit[3].flushed)
|
||||
|
Loading…
Reference in New Issue
Block a user