Fix compaction bug (#11816)

This PR fixes:
- a timeout issue in compaction
- a bug where InsertBinlogIterator used the rowID as the PK field (see the sketch below)

See also: #11757

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
XuanYang-cn 2021-11-17 10:07:13 +08:00 committed by GitHub
parent 8a0ee27799
commit 9bd3664672
8 changed files with 56 additions and 34 deletions
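
For orientation before reading the diff: the PK fix means NewInsertBinlogIterator is now constructed with the PK field ID, each Value it yields carries the primary key separately from the auto-generated rowID, and merge() looks deleted rows up by delta[v.PK] instead of delta[v.ID]. The snippet below is a minimal, self-contained sketch of that behavior only; Value is simplified and keepAfterDeletes is a stand-in for the real merge loop, not code from this repository. The timeout side of the fix shows up further down: the compaction-start log now records the plan's TimeoutInSeconds, and the compaction task is built on node.ctx rather than the incoming request ctx, presumably so its lifetime is not tied to the RPC call.

package main

import "fmt"

// Value mirrors the iterator's post-fix return type: the primary key (PK)
// is carried separately from the auto-generated rowID (ID).
type Value struct {
	ID        int64
	PK        int64
	Timestamp int64
	IsDeleted bool
}

// keepAfterDeletes mimics the corrected merge step: a row is skipped when its
// PK (not its rowID) appears in the delete map built from the delta logs.
func keepAfterDeletes(rows []Value, delta map[int64]uint64) []Value {
	kept := make([]Value, 0, len(rows))
	for _, v := range rows {
		if _, deleted := delta[v.PK]; deleted { // the bug was checking delta[v.ID] here
			continue
		}
		kept = append(kept, v)
	}
	return kept
}

func main() {
	// Sample rows loosely modeled on the test data in this PR: rowIDs 11/22, PKs 1/2.
	rows := []Value{
		{ID: 11, PK: 1, Timestamp: 100},
		{ID: 22, PK: 2, Timestamp: 100},
	}
	delta := map[int64]uint64{1: 20000} // PK 1 was deleted at ts 20000
	fmt.Println(keepAfterDeletes(rows, delta))
}

With the pre-fix lookup by v.ID, neither sample row would match the delete entry, so the deleted PK=1 entity would wrongly survive compaction; looking up by v.PK drops it.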


@ -152,6 +152,8 @@ func (t *compactionTask) mergeDeltalogs(dBlobs map[UniqueID][]*Blob, timetravelT
}
dbuff.updateSize(dbuff.delData.RowCount)
log.Debug("mergeDeltalogs end", zap.Int64("PlanID", t.getPlanID()),
zap.Int("number of pks to compact in insert logs", len(pk2ts)))
return pk2ts, dbuff, nil
}
@ -196,7 +198,7 @@ func (t *compactionTask) merge(mergeItr iterator, delta map[UniqueID]Timestamp,
return nil, 0, errors.New("Unexpected error")
}
- if _, ok := delta[v.ID]; ok {
+ if _, ok := delta[v.PK]; ok {
continue
}
@ -249,6 +251,8 @@ func (t *compactionTask) merge(mergeItr iterator, delta map[UniqueID]Timestamp,
}
}
log.Debug("merge end", zap.Int64("planID", t.getPlanID()), zap.Int64("remaining insert numRows", numRows))
return iDatas, numRows, nil
}
@ -279,7 +283,7 @@ func (t *compactionTask) compact() error {
targetSegID = t.plan.GetSegmentBinlogs()[0].GetSegmentID()
}
log.Debug("compaction start", zap.Int64("planID", t.plan.GetPlanID()))
log.Debug("compaction start", zap.Int64("planID", t.plan.GetPlanID()), zap.Any("timeout in seconds", t.plan.GetTimeoutInSeconds()))
segIDs := make([]UniqueID, 0, len(t.plan.GetSegmentBinlogs()))
for _, s := range t.plan.GetSegmentBinlogs() {
segIDs = append(segIDs, s.GetSegmentID())
@ -311,8 +315,18 @@ func (t *compactionTask) compact() error {
// SegmentID to deltaBlobs
dblobs = make(map[UniqueID][]*Blob)
dmu sync.Mutex
+ PKfieldID UniqueID
)
+ // Get PK fieldID
+ for _, fs := range meta.GetSchema().GetFields() {
+ if fs.GetFieldID() >= 100 && fs.GetDataType() == schemapb.DataType_Int64 && fs.GetIsPrimaryKey() {
+ PKfieldID = fs.GetFieldID()
+ break
+ }
+ }
g, gCtx := errgroup.WithContext(ctxTimeout)
for _, s := range t.plan.GetSegmentBinlogs() {
@ -332,7 +346,7 @@ func (t *compactionTask) compact() error {
return err
}
- itr, err := storage.NewInsertBinlogIterator(bs)
+ itr, err := storage.NewInsertBinlogIterator(bs, PKfieldID)
if err != nil {
log.Warn("new insert binlogs Itr wrong")
return err


@ -242,7 +242,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
iblobs, err := getInsertBlobs(100, iData, meta)
require.NoError(t, err)
- iitr, err := storage.NewInsertBinlogIterator(iblobs)
+ iitr, err := storage.NewInsertBinlogIterator(iblobs, 106)
require.NoError(t, err)
mitr := storage.NewMergeIterator([]iterator{iitr})
@ -432,13 +432,13 @@ func TestCompactorInterfaceMethods(t *testing.T) {
require.True(t, replica.hasSegment(segID2, true))
meta := NewMetaFactory().GetCollectionMeta(collID, "test_compact_coll_name")
- iData1 := genInsertDataWithRowIDs([2]int64{1, 2})
+ iData1 := genInsertDataWithPKs([2]int64{1, 2})
dData1 := &DeleteData{
Pks: []UniqueID{1},
Tss: []Timestamp{20000},
RowCount: 1,
}
- iData2 := genInsertDataWithRowIDs([2]int64{9, 10})
+ iData2 := genInsertDataWithPKs([2]int64{9, 10})
dData2 := &DeleteData{
Pks: []UniqueID{9},
Tss: []Timestamp{30000},


@ -119,11 +119,6 @@ type DataNode struct {
msFactory msgstream.Factory
}
- type plan struct {
- channelName string
- cancel context.CancelFunc
- }
// NewDataNode will return a DataNode with abnormal state.
func NewDataNode(ctx context.Context, factory msgstream.Factory) *DataNode {
rand.Seed(time.Now().UnixNano())
@ -754,7 +749,7 @@ func (node *DataNode) Compaction(ctx context.Context, req *datapb.CompactionPlan
binlogIO := &binlogIO{node.blobKv, ds.idAllocator}
task := newCompactionTask(
- ctx,
+ node.ctx,
binlogIO, binlogIO,
ds.replica,
ds.flushManager,


@ -241,6 +241,7 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
log.Debug("insert seg buffer status", zap.Int("No.", k),
zap.Int64("segmentID", segID),
zap.String("vchannel name", ibNode.channelName),
zap.Int64("buffer size", bd.(*BufferData).size),
zap.Int64("buffer limit", bd.(*BufferData).limit))
}
@ -261,7 +262,9 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
if fgMsg.dropCollection {
segmentsToFlush := ibNode.replica.listAllSegmentIDs()
log.Debug("Recive drop collection req and flushing all segments",
zap.Any("segments", segmentsToFlush))
zap.Any("segments", segmentsToFlush),
zap.String("vchannel name", ibNode.channelName),
)
flushTaskList = make([]flushTask, 0, len(segmentsToFlush))
for _, seg2Flush := range segmentsToFlush {
@ -287,7 +290,10 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
for _, segToFlush := range seg2Upload {
// If full, auto flush
if bd, ok := ibNode.insertBuffer.Load(segToFlush); ok && bd.(*BufferData).effectiveCap() <= 0 {
log.Warn("Auto flush", zap.Int64("segment id", segToFlush))
log.Info("Auto flush",
zap.Int64("segment id", segToFlush),
zap.String("vchannel name", ibNode.channelName),
)
ibuffer := bd.(*BufferData)
flushTaskList = append(flushTaskList, flushTask{
@ -306,6 +312,7 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
log.Debug(". Receiving flush message",
zap.Int64("segmentID", fmsg.segmentID),
zap.Int64("collectionID", fmsg.collectionID),
zap.String("vchannel name", ibNode.channelName),
)
// merging auto&manual flush segment same segment id
dup := false
@ -708,7 +715,6 @@ func (ibNode *insertBufferNode) writeHardTimeTick(ts Timestamp) error {
//
// Currently, the statistics includes segment ID and its total number of rows in memory.
func (ibNode *insertBufferNode) uploadMemStates2Coord(segIDs []UniqueID) error {
log.Debug("Updating segments statistics...")
statsUpdates := make([]*internalpb.SegmentStatisticsUpdates, 0, len(segIDs))
for _, segID := range segIDs {
updates, err := ibNode.replica.getSegmentStatisticsUpdates(segID)
@ -718,8 +724,10 @@ func (ibNode *insertBufferNode) uploadMemStates2Coord(segIDs []UniqueID) error {
}
log.Debug("Segment Statistics to Update",
zap.Int64("Segment ID", updates.GetSegmentID()),
zap.Int64("NumOfRows", updates.GetNumRows()),
zap.Int64("segment ID", updates.GetSegmentID()),
zap.Int64("collection ID", ibNode.replica.getCollectionID()),
zap.String("vchannel name", ibNode.channelName),
zap.Int64("numOfRows", updates.GetNumRows()),
)
statsUpdates = append(statsUpdates, updates)


@ -691,9 +691,9 @@ func (f *FailMessageStreamFactory) NewTtMsgStream(ctx context.Context) (msgstrea
return nil, errors.New("mocked failure")
}
- func genInsertDataWithRowIDs(rowIDs [2]int64) *InsertData {
+ func genInsertDataWithPKs(PKs [2]int64) *InsertData {
iD := genInsertData()
- iD.Data[0].(*s.Int64FieldData).Data = rowIDs[:]
+ iD.Data[106].(*s.Int64FieldData).Data = PKs[:]
return iD
}
@ -703,7 +703,7 @@ func genInsertData() *InsertData {
Data: map[int64]s.FieldData{
0: &s.Int64FieldData{
NumRows: []int64{2},
- Data: []int64{1, 2},
+ Data: []int64{11, 22},
},
1: &s.Int64FieldData{
NumRows: []int64{2},
@ -737,7 +737,7 @@ func genInsertData() *InsertData {
},
106: &s.Int64FieldData{
NumRows: []int64{2},
- Data: []int64{11, 12},
+ Data: []int64{1, 2},
},
107: &s.FloatFieldData{
NumRows: []int64{2},


@ -554,7 +554,7 @@ func (replica *SegmentReplica) refreshFlushedSegStatistics(segID UniqueID, numRo
return
}
log.Warn("refesh numRow on not exists segment", zap.Int64("segID", segID))
log.Warn("refresh numRow on not exists segment", zap.Int64("segID", segID))
}
// updateStatistics updates the number of rows of a segment in replica.


@ -38,6 +38,7 @@ type Iterator interface {
// Value is the return value of HasNext
type Value struct {
ID int64
+ PK int64
Timestamp int64
IsDeleted bool
Value interface{}
@ -45,14 +46,14 @@ type Value struct {
// InsertBinlogIterator is the iterator of binlog
type InsertBinlogIterator struct {
- dispose int32 // 0: false, 1: true
- data *InsertData
- fieldID int64
- pos int
+ dispose int32 // 0: false, 1: true
+ data *InsertData
+ PKfieldID int64
+ pos int
}
// NewInsertBinlogIterator creates a new iterator
- func NewInsertBinlogIterator(blobs []*Blob) (*InsertBinlogIterator, error) {
+ func NewInsertBinlogIterator(blobs []*Blob, PKfieldID UniqueID) (*InsertBinlogIterator, error) {
// TODO: load part of file to read records other than loading all content
reader := NewInsertCodec(nil)
@ -63,7 +64,7 @@ func NewInsertBinlogIterator(blobs []*Blob) (*InsertBinlogIterator, error) {
return nil, err
}
- return &InsertBinlogIterator{data: serData}, nil
+ return &InsertBinlogIterator{data: serData, PKfieldID: PKfieldID}, nil
}
// HasNext returns true if the iterator have unread record
@ -89,6 +90,7 @@ func (itr *InsertBinlogIterator) Next() (interface{}, error) {
v := &Value{
ID: itr.data.Data[rootcoord.RowIDField].Get(itr.pos).(int64),
Timestamp: itr.data.Data[rootcoord.TimeStampField].Get(itr.pos).(int64),
+ PK: itr.data.Data[itr.PKfieldID].Get(itr.pos).(int64),
IsDeleted: false,
Value: m,
}


@ -56,7 +56,7 @@ func TestInsertlogIterator(t *testing.T) {
t.Run("test dispose", func(t *testing.T) {
blobs := generateTestData(t, 1)
- itr, err := NewInsertBinlogIterator(blobs)
+ itr, err := NewInsertBinlogIterator(blobs, rootcoord.RowIDField)
assert.Nil(t, err)
itr.Dispose()
@ -67,7 +67,7 @@ func TestInsertlogIterator(t *testing.T) {
t.Run("not empty iterator", func(t *testing.T) {
blobs := generateTestData(t, 3)
- itr, err := NewInsertBinlogIterator(blobs)
+ itr, err := NewInsertBinlogIterator(blobs, rootcoord.RowIDField)
assert.Nil(t, err)
for i := 1; i <= 3; i++ {
@ -76,6 +76,7 @@ func TestInsertlogIterator(t *testing.T) {
assert.Nil(t, err)
value := v.(*Value)
expected := &Value{
+ int64(i),
int64(i),
int64(i),
false,
@ -106,7 +107,7 @@ func TestMergeIterator(t *testing.T) {
t.Run("empty and non-empty iterators", func(t *testing.T) {
blobs := generateTestData(t, 3)
- insertItr, err := NewInsertBinlogIterator(blobs)
+ insertItr, err := NewInsertBinlogIterator(blobs, rootcoord.RowIDField)
assert.Nil(t, err)
iterators := []Iterator{
&InsertBinlogIterator{data: &InsertData{}},
@ -121,6 +122,7 @@ func TestMergeIterator(t *testing.T) {
assert.Nil(t, err)
value := v.(*Value)
expected := &Value{
+ int64(i),
int64(i),
int64(i),
false,
@ -135,15 +137,16 @@ func TestMergeIterator(t *testing.T) {
t.Run("non-empty iterators", func(t *testing.T) {
blobs := generateTestData(t, 3)
- itr1, err := NewInsertBinlogIterator(blobs)
+ itr1, err := NewInsertBinlogIterator(blobs, rootcoord.RowIDField)
assert.Nil(t, err)
- itr2, err := NewInsertBinlogIterator(blobs)
+ itr2, err := NewInsertBinlogIterator(blobs, rootcoord.RowIDField)
assert.Nil(t, err)
iterators := []Iterator{itr1, itr2}
itr := NewMergeIterator(iterators)
for i := 1; i <= 3; i++ {
expected := &Value{
+ int64(i),
int64(i),
int64(i),
false,
@ -165,7 +168,7 @@ func TestMergeIterator(t *testing.T) {
t.Run("test dispose", func(t *testing.T) {
blobs := generateTestData(t, 3)
- itr1, err := NewInsertBinlogIterator(blobs)
+ itr1, err := NewInsertBinlogIterator(blobs, rootcoord.RowIDField)
assert.Nil(t, err)
itr := NewMergeIterator([]Iterator{itr1})