Use segment statslog instead of separate statslog (#18775)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>

congqixia 2022-09-02 10:34:59 +08:00 committed by GitHub
parent 4c1e7bc1bd
commit 29c72cad15
13 changed files with 460 additions and 137 deletions
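At a glance, this commit replaces the per-field statslogs produced by the insert codec with a single segment-level statslog: the DataNode serializes the primary key stats (pk range plus bloom filter) once per segment and stores the blob under the primary key field. The sketch below is illustrative only and not part of the commit; buildSegmentStatslog is a hypothetical helper, but it uses only types and functions that appear in this diff (Segment, updatePKRange, getSegmentStatslog, JoinIDPath, common.SegmentStatslogPath, maxBloomFalsePositive).

package datanode

import (
	"path"

	"github.com/bits-and-blooms/bloom/v3"

	"github.com/milvus-io/milvus/internal/common"
	"github.com/milvus-io/milvus/internal/proto/schemapb"
	"github.com/milvus-io/milvus/internal/storage"
)

// buildSegmentStatslog sketches the new flow: collect pk values into one bloom
// filter, marshal a single storage.PrimaryKeyStats as JSON, and key the blob
// by the pk field under common.SegmentStatslogPath.
func buildSegmentStatslog(collID, partID, segID, pkID, logID UniqueID,
	pkType schemapb.DataType, rootPath string, pkData storage.FieldData) (string, []byte, error) {
	seg := &Segment{
		// sized the same way the compaction path in this commit sizes it
		pkFilter: bloom.NewWithEstimates(100000, maxBloomFalsePositive),
	}
	if err := seg.updatePKRange(pkData); err != nil {
		return "", nil, err
	}
	// one JSON-encoded PrimaryKeyStats per segment
	value, err := seg.getSegmentStatslog(pkID, pkType)
	if err != nil {
		return "", nil, err
	}
	key := path.Join(rootPath, common.SegmentStatslogPath,
		JoinIDPath(collID, partID, segID, pkID, logID))
	return key, value, nil
}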


@ -302,17 +302,10 @@ func (m *meta) UpdateFlushSegmentsInfo(
}
}
clonedSegment.Binlogs = currBinlogs
// statlogs
currStatsLogs := clonedSegment.GetStatslogs()
for _, tStatsLogs := range statslogs {
fieldStatsLog := getFieldBinlogs(tStatsLogs.GetFieldID(), currStatsLogs)
if fieldStatsLog == nil {
currStatsLogs = append(currStatsLogs, tStatsLogs)
} else {
fieldStatsLog.Binlogs = append(fieldStatsLog.Binlogs, tStatsLogs.Binlogs...)
}
// statslogs, overwrite the latest segment statslog
if len(statslogs) > 0 {
clonedSegment.Statslogs = statslogs
}
clonedSegment.Statslogs = currStatsLogs
// deltalogs
currDeltaLogs := clonedSegment.GetDeltalogs()
for _, tDeltaLogs := range deltalogs {


@ -27,6 +27,7 @@ import (
"syscall"
"time"
"github.com/blang/semver/v4"
"github.com/minio/minio-go/v7"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
@ -413,7 +414,8 @@ var getCheckBucketFn = func(cli *minio.Client) func() error {
}
func (s *Server) initServiceDiscovery() error {
sessions, rev, err := s.session.GetSessions(typeutil.DataNodeRole)
r := semver.MustParseRange(">=2.1.2")
sessions, rev, err := s.session.GetSessionsWithVersionRange(typeutil.DataNodeRole, r)
if err != nil {
log.Warn("DataCoord failed to init service discovery", zap.Error(err))
return err
@ -432,7 +434,7 @@ func (s *Server) initServiceDiscovery() error {
s.cluster.Startup(s.ctx, datanodes)
// TODO implement rewatch logic
s.dnEventCh = s.session.WatchServices(typeutil.DataNodeRole, rev+1, nil)
s.dnEventCh = s.session.WatchServicesWithVersionRange(typeutil.DataNodeRole, r, rev+1, nil)
//icSessions, icRevision, err := s.session.GetSessions(typeutil.IndexCoordRole)
//if err != nil {


@ -51,7 +51,7 @@ type uploader interface {
//
// errUploadToBlobStorage is returned if ctx is canceled from outside while an upload is in progress.
// Beware of the ctx here: if no timeout or cancel is applied to this ctx, this upload may retry forever.
upload(ctx context.Context, segID, partID UniqueID, iData []*InsertData, dData *DeleteData, meta *etcdpb.CollectionMeta) (*segPaths, error)
upload(ctx context.Context, segID, partID UniqueID, iData []*InsertData, segStats []byte, dData *DeleteData, meta *etcdpb.CollectionMeta) (*segPaths, error)
}
type binlogIO struct {
@ -110,6 +110,7 @@ func (b *binlogIO) upload(
ctx context.Context,
segID, partID UniqueID,
iDatas []*InsertData,
segStats []byte,
dData *DeleteData,
meta *etcdpb.CollectionMeta) (*segPaths, error) {
@ -131,7 +132,7 @@ func (b *binlogIO) upload(
continue
}
kv, inpaths, statspaths, err := b.genInsertBlobs(iData, partID, segID, meta)
kv, inpaths, err := b.genInsertBlobs(iData, partID, segID, meta)
if err != nil {
log.Warn("generate insert blobs wrong",
zap.Int64("collectionID", meta.GetID()),
@ -153,16 +154,25 @@ func (b *binlogIO) upload(
}
insertField2Path[fID] = tmpBinlog
}
}
for fID, path := range statspaths {
tmpBinlog, ok := statsField2Path[fID]
if !ok {
tmpBinlog = path
} else {
tmpBinlog.Binlogs = append(tmpBinlog.Binlogs, path.GetBinlogs()...)
}
statsField2Path[fID] = tmpBinlog
}
pkID := getPKID(meta)
if pkID == common.InvalidFieldID {
log.Error("get invalid field id when finding pk", zap.Int64("collectionID", meta.GetID()), zap.Any("fields", meta.GetSchema().GetFields()))
return nil, errors.New("invalid pk id")
}
logID, err := b.allocID()
if err != nil {
return nil, err
}
k := JoinIDPath(meta.GetID(), partID, segID, pkID, logID)
key := path.Join(b.ChunkManager.RootPath(), common.SegmentStatslogPath, k)
fileLen := len(segStats)
kvs[key] = segStats
statsField2Path[pkID] = &datapb.FieldBinlog{
FieldID: pkID,
Binlogs: []*datapb.Binlog{{LogSize: int64(fileLen), LogPath: key}},
}
for _, path := range insertField2Path {
@ -195,7 +205,7 @@ func (b *binlogIO) upload(
})
}
var err = errStart
err = errStart
for err != nil {
select {
case <-ctx.Done():
@ -237,25 +247,24 @@ func (b *binlogIO) genDeltaBlobs(data *DeleteData, collID, partID, segID UniqueI
}
// genInsertBlobs returns kvs and insert-paths; the segment statslog is now written by upload
func (b *binlogIO) genInsertBlobs(data *InsertData, partID, segID UniqueID, meta *etcdpb.CollectionMeta) (map[string][]byte, map[UniqueID]*datapb.FieldBinlog, map[UniqueID]*datapb.FieldBinlog, error) {
func (b *binlogIO) genInsertBlobs(data *InsertData, partID, segID UniqueID, meta *etcdpb.CollectionMeta) (map[string][]byte, map[UniqueID]*datapb.FieldBinlog, error) {
inCodec := storage.NewInsertCodec(meta)
inlogs, statslogs, err := inCodec.Serialize(partID, segID, data)
inlogs, _, err := inCodec.Serialize(partID, segID, data)
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}
var (
kvs = make(map[string][]byte, len(inlogs)+len(statslogs))
inpaths = make(map[UniqueID]*datapb.FieldBinlog)
statspaths = make(map[UniqueID]*datapb.FieldBinlog)
kvs = make(map[string][]byte, len(inlogs)+1)
inpaths = make(map[UniqueID]*datapb.FieldBinlog)
)
notifyGenIdx := make(chan struct{})
defer close(notifyGenIdx)
generator, err := b.idxGenerator(len(inlogs)+len(statslogs), notifyGenIdx)
generator, err := b.idxGenerator(len(inlogs)+1, notifyGenIdx)
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}
for _, blob := range inlogs {
@ -274,24 +283,7 @@ func (b *binlogIO) genInsertBlobs(data *InsertData, partID, segID UniqueID, meta
}
}
for _, blob := range statslogs {
// Blob Key is generated by Serialize from int64 fieldID in collection schema, which won't raise error in ParseInt
fID, _ := strconv.ParseInt(blob.GetKey(), 10, 64)
k := JoinIDPath(meta.GetID(), partID, segID, fID, <-generator)
key := path.Join(b.ChunkManager.RootPath(), common.SegmentStatslogPath, k)
value := blob.GetValue()
fileLen := len(value)
kvs[key] = value
statspaths[fID] = &datapb.FieldBinlog{
FieldID: fID,
Binlogs: []*datapb.Binlog{{LogSize: int64(fileLen), LogPath: key}},
}
}
return kvs, inpaths, statspaths, nil
return kvs, inpaths, nil
}
func (b *binlogIO) idxGenerator(n int, done <-chan struct{}) (<-chan UniqueID, error) {


@ -27,7 +27,6 @@ import (
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
@ -53,7 +52,7 @@ func TestBinlogIOInterfaceMethods(t *testing.T) {
Tss: []uint64{666666},
}
p, err := b.upload(context.TODO(), 1, 10, []*InsertData{iData}, dData, meta)
p, err := b.upload(context.TODO(), 1, 10, []*InsertData{iData}, []byte{}, dData, meta)
assert.NoError(t, err)
assert.Equal(t, 12, len(p.inPaths))
assert.Equal(t, 1, len(p.statsPaths))
@ -61,18 +60,18 @@ func TestBinlogIOInterfaceMethods(t *testing.T) {
assert.Equal(t, 1, len(p.statsPaths[0].GetBinlogs()))
assert.NotNil(t, p.deltaInfo)
p, err = b.upload(context.TODO(), 1, 10, []*InsertData{iData, iData}, dData, meta)
p, err = b.upload(context.TODO(), 1, 10, []*InsertData{iData, iData}, []byte{}, dData, meta)
assert.NoError(t, err)
assert.Equal(t, 12, len(p.inPaths))
assert.Equal(t, 1, len(p.statsPaths))
assert.Equal(t, 2, len(p.inPaths[0].GetBinlogs()))
assert.Equal(t, 2, len(p.statsPaths[0].GetBinlogs()))
assert.Equal(t, 1, len(p.statsPaths[0].GetBinlogs()))
assert.NotNil(t, p.deltaInfo)
ctx, cancel := context.WithCancel(context.Background())
cancel()
p, err = b.upload(ctx, 1, 10, []*InsertData{iData}, dData, meta)
p, err = b.upload(ctx, 1, 10, []*InsertData{iData}, []byte{}, dData, meta)
assert.EqualError(t, err, errUploadToBlobStorage.Error())
assert.Nil(t, p)
})
@ -86,17 +85,17 @@ func TestBinlogIOInterfaceMethods(t *testing.T) {
}
iData := genEmptyInsertData()
p, err := b.upload(context.TODO(), 1, 10, []*InsertData{iData}, dData, meta)
p, err := b.upload(context.TODO(), 1, 10, []*InsertData{iData}, []byte{}, dData, meta)
assert.NoError(t, err)
assert.Empty(t, p.inPaths)
assert.Empty(t, p.statsPaths)
assert.NotEmpty(t, p.statsPaths)
assert.Empty(t, p.deltaInfo)
iData = &InsertData{Data: make(map[int64]storage.FieldData)}
p, err = b.upload(context.TODO(), 1, 10, []*InsertData{iData}, dData, meta)
p, err = b.upload(context.TODO(), 1, 10, []*InsertData{iData}, []byte{}, dData, meta)
assert.NoError(t, err)
assert.Empty(t, p.inPaths)
assert.Empty(t, p.statsPaths)
assert.NotEmpty(t, p.statsPaths)
assert.Empty(t, p.deltaInfo)
iData = genInsertData()
@ -105,7 +104,7 @@ func TestBinlogIOInterfaceMethods(t *testing.T) {
Tss: []uint64{1},
RowCount: 1,
}
p, err = b.upload(context.TODO(), 1, 10, []*InsertData{iData}, dData, meta)
p, err = b.upload(context.TODO(), 1, 10, []*InsertData{iData}, []byte{}, dData, meta)
assert.Error(t, err)
assert.Empty(t, p)
@ -119,9 +118,23 @@ func TestBinlogIOInterfaceMethods(t *testing.T) {
RowCount: 1,
}
ctx, cancel := context.WithTimeout(context.TODO(), 20*time.Millisecond)
p, err = bin.upload(ctx, 1, 10, []*InsertData{iData}, dData, meta)
p, err = bin.upload(ctx, 1, 10, []*InsertData{iData}, []byte{}, dData, meta)
assert.Error(t, err)
assert.Empty(t, p)
alloc.isvalid = false
p, err = bin.upload(ctx, 1, 10, []*InsertData{iData}, []byte{}, dData, meta)
assert.Error(t, err)
assert.Empty(t, p)
alloc.isvalid = true
for _, field := range meta.GetSchema().GetFields() {
field.IsPrimaryKey = false
}
p, err = bin.upload(ctx, 1, 10, []*InsertData{iData}, []byte{}, dData, meta)
assert.Error(t, err)
assert.Empty(t, p)
cancel()
})
@ -254,60 +267,55 @@ func TestBinlogIOInnerMethods(t *testing.T) {
tests := []struct {
pkType schemapb.DataType
description string
expectError bool
}{
{schemapb.DataType_Int64, "int64PrimaryField"},
{schemapb.DataType_VarChar, "varCharPrimaryField"},
{schemapb.DataType_Int64, "int64PrimaryField", false},
{schemapb.DataType_VarChar, "varCharPrimaryField", false},
}
for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
meta := f.GetCollectionMeta(UniqueID(10001), "test_gen_blobs", test.pkType)
helper, err := typeutil.CreateSchemaHelper(meta.Schema)
assert.NoError(t, err)
primaryKeyFieldSchema, err := helper.GetPrimaryKeyField()
assert.NoError(t, err)
primaryKeyFieldID := primaryKeyFieldSchema.GetFieldID()
kvs, pin, pstats, err := b.genInsertBlobs(genInsertData(), 10, 1, meta)
kvs, pin, err := b.genInsertBlobs(genInsertData(), 10, 1, meta)
if test.expectError {
assert.Error(t, err)
return
}
assert.NoError(t, err)
assert.Equal(t, 1, len(pstats))
assert.Equal(t, 12, len(pin))
assert.Equal(t, 13, len(kvs))
assert.Equal(t, 12, len(kvs))
log.Debug("test paths",
zap.Any("kvs no.", len(kvs)),
zap.String("insert paths field0", pin[common.TimeStampField].GetBinlogs()[0].GetLogPath()),
zap.String("stats paths field0", pstats[primaryKeyFieldID].GetBinlogs()[0].GetLogPath()))
zap.String("insert paths field0", pin[common.TimeStampField].GetBinlogs()[0].GetLogPath()))
})
}
})
t.Run("Test genInsertBlobs error", func(t *testing.T) {
kvs, pin, pstats, err := b.genInsertBlobs(&InsertData{}, 1, 1, nil)
kvs, pin, err := b.genInsertBlobs(&InsertData{}, 1, 1, nil)
assert.Error(t, err)
assert.Empty(t, kvs)
assert.Empty(t, pin)
assert.Empty(t, pstats)
f := &MetaFactory{}
meta := f.GetCollectionMeta(UniqueID(10001), "test_gen_blobs", schemapb.DataType_Int64)
kvs, pin, pstats, err = b.genInsertBlobs(genEmptyInsertData(), 10, 1, meta)
kvs, pin, err = b.genInsertBlobs(genEmptyInsertData(), 10, 1, meta)
assert.Error(t, err)
assert.Empty(t, kvs)
assert.Empty(t, pin)
assert.Empty(t, pstats)
errAlloc := NewAllocatorFactory()
errAlloc.errAllocBatch = true
bin := &binlogIO{cm, errAlloc}
kvs, pin, pstats, err = bin.genInsertBlobs(genInsertData(), 10, 1, meta)
kvs, pin, err = bin.genInsertBlobs(genInsertData(), 10, 1, meta)
assert.Error(t, err)
assert.Empty(t, kvs)
assert.Empty(t, pin)
assert.Empty(t, pstats)
})
t.Run("Test idxGenerator", func(t *testing.T) {


@ -25,6 +25,7 @@ import (
"sync"
"time"
"github.com/bits-and-blooms/bloom/v3"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/proto/datapb"
@ -128,6 +129,27 @@ func (t *compactionTask) getChannelName() string {
return t.plan.GetChannel()
}
func (t *compactionTask) getPlanTargetEntryNumber() int64 {
if t.plan == nil {
// if the plan is empty, return the default bloom filter size
return int64(bloomFilterSize)
}
var result int64
for _, info := range t.plan.GetSegmentBinlogs() {
for _, fieldLog := range info.GetFieldBinlogs() {
for _, binlog := range fieldLog.GetBinlogs() {
result += binlog.GetEntriesNum()
}
}
}
// prevent the bloom filter from being too small
if result == 0 {
return int64(bloomFilterSize)
}
return result
}
func (t *compactionTask) mergeDeltalogs(dBlobs map[UniqueID][]*Blob, timetravelTs Timestamp) (map[interface{}]Timestamp, *DelDataBuf, error) {
mergeStart := time.Now()
dCodec := storage.NewDeleteCodec()
@ -186,7 +208,7 @@ func nano2Milli(nano time.Duration) float64 {
return float64(nano) / float64(time.Millisecond)
}
func (t *compactionTask) merge(mergeItr iterator, delta map[interface{}]Timestamp, schema *schemapb.CollectionSchema, currentTs Timestamp) ([]*InsertData, int64, error) {
func (t *compactionTask) merge(mergeItr iterator, delta map[interface{}]Timestamp, schema *schemapb.CollectionSchema, currentTs Timestamp) ([]*InsertData, []byte, int64, error) {
mergeStart := time.Now()
var (
@ -196,6 +218,11 @@ func (t *compactionTask) merge(mergeItr iterator, delta map[interface{}]Timestam
expired int64 // the number of expired entity
err error
// statslog generation
segment *Segment // empty segment used for bf generation
pkID UniqueID
pkType schemapb.DataType
iDatas = make([]*InsertData, 0)
fID2Type = make(map[UniqueID]schemapb.DataType)
fID2Content = make(map[UniqueID][]interface{})
@ -209,16 +236,27 @@ func (t *compactionTask) merge(mergeItr iterator, delta map[interface{}]Timestam
return false
}
// build an empty segment whose bloom filter is sized by the estimated target row count
targetRowCount := t.getPlanTargetEntryNumber()
log.Debug("merge estimate target row count", zap.Int64("row count", targetRowCount))
segment = &Segment{
pkFilter: bloom.NewWithEstimates(uint(targetRowCount), maxBloomFalsePositive),
}
// get pk field info and dim
for _, fs := range schema.GetFields() {
fID2Type[fs.GetFieldID()] = fs.GetDataType()
if fs.GetIsPrimaryKey() {
pkID = fs.GetFieldID()
pkType = fs.GetDataType()
}
if fs.GetDataType() == schemapb.DataType_FloatVector ||
fs.GetDataType() == schemapb.DataType_BinaryVector {
for _, t := range fs.GetTypeParams() {
if t.Key == "dim" {
if dim, err = strconv.Atoi(t.Value); err != nil {
log.Warn("strconv wrong on get dim", zap.Error(err))
return nil, 0, err
return nil, nil, 0, err
}
break
}
@ -234,7 +272,7 @@ func (t *compactionTask) merge(mergeItr iterator, delta map[interface{}]Timestam
v, ok := vInter.(*storage.Value)
if !ok {
log.Warn("transfer interface to Value wrong")
return nil, 0, errors.New("unexpected error")
return nil, nil, 0, errors.New("unexpected error")
}
if isDeletedValue(v) {
@ -251,7 +289,7 @@ func (t *compactionTask) merge(mergeItr iterator, delta map[interface{}]Timestam
row, ok := v.Value.(map[UniqueID]interface{})
if !ok {
log.Warn("transfer interface to map wrong")
return nil, 0, errors.New("unexpected error")
return nil, nil, 0, errors.New("unexpected error")
}
for fID, vInter := range row {
@ -278,7 +316,7 @@ func (t *compactionTask) merge(mergeItr iterator, delta map[interface{}]Timestam
tp, ok := fID2Type[fID]
if !ok {
log.Warn("no field ID in this schema", zap.Int64("fieldID", fID))
return nil, 0, errors.New("Unexpected error")
return nil, nil, 0, errors.New("Unexpected error")
}
for i := 0; i < numBinlogs; i++ {
@ -294,17 +332,30 @@ func (t *compactionTask) merge(mergeItr iterator, delta map[interface{}]Timestam
if err != nil {
log.Warn("transfer interface to FieldData wrong", zap.Error(err))
return nil, 0, err
return nil, nil, 0, err
}
if fID == pkID {
err = segment.updatePKRange(fData)
if err != nil {
log.Warn("update pk range failed", zap.Error(err))
return nil, nil, 0, err
}
}
iDatas[i].Data[fID] = fData
}
}
// marshal segment statslog
segStats, err := segment.getSegmentStatslog(pkID, pkType)
if err != nil {
log.Warn("failed to generate segment statslog", zap.Int64("pkID", pkID), zap.Error(err))
return nil, nil, 0, err
}
log.Debug("merge end", zap.Int64("planID", t.getPlanID()), zap.Int64("remaining insert numRows", numRows),
zap.Int64("expired entities", expired),
zap.Float64("elapse in ms", nano2Milli(time.Since(mergeStart))))
return iDatas, numRows, nil
return iDatas, segStats, numRows, nil
}
func (t *compactionTask) compact() (*datapb.CompactionResult, error) {
@ -471,14 +522,14 @@ func (t *compactionTask) compact() (*datapb.CompactionResult, error) {
return nil, err
}
iDatas, numRows, err := t.merge(mergeItr, deltaPk2Ts, meta.GetSchema(), t.GetCurrentTime())
iDatas, segStats, numRows, err := t.merge(mergeItr, deltaPk2Ts, meta.GetSchema(), t.GetCurrentTime())
if err != nil {
log.Error("compact wrong", zap.Int64("planID", t.plan.GetPlanID()), zap.Error(err))
return nil, err
}
uploadStart := time.Now()
segPaths, err := t.upload(ctxTimeout, targetSegID, partID, iDatas, deltaBuf.delData, meta)
segPaths, err := t.upload(ctxTimeout, targetSegID, partID, iDatas, segStats, deltaBuf.delData, meta)
if err != nil {
log.Error("compact wrong", zap.Int64("planID", t.plan.GetPlanID()), zap.Error(err))
return nil, err


@ -24,6 +24,7 @@ import (
"time"
memkv "github.com/milvus-io/milvus/internal/kv/mem"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
@ -264,11 +265,12 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
}
ct := &compactionTask{}
idata, numOfRow, err := ct.merge(mitr, dm, meta.GetSchema(), ct.GetCurrentTime())
idata, segStats, numOfRow, err := ct.merge(mitr, dm, meta.GetSchema(), ct.GetCurrentTime())
assert.NoError(t, err)
assert.Equal(t, int64(2), numOfRow)
assert.Equal(t, 1, len(idata))
assert.NotEmpty(t, idata[0].Data)
assert.NotEmpty(t, segStats)
})
t.Run("Merge without expiration2", func(t *testing.T) {
Params.CommonCfg.EntityExpirationTTL = 0
@ -291,11 +293,12 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
dm := map[interface{}]Timestamp{}
ct := &compactionTask{}
idata, numOfRow, err := ct.merge(mitr, dm, meta.GetSchema(), ct.GetCurrentTime())
idata, segStats, numOfRow, err := ct.merge(mitr, dm, meta.GetSchema(), ct.GetCurrentTime())
assert.NoError(t, err)
assert.Equal(t, int64(2), numOfRow)
assert.Equal(t, 2, len(idata))
assert.NotEmpty(t, idata[0].Data)
assert.NotEmpty(t, segStats)
})
t.Run("Merge with expiration", func(t *testing.T) {
@ -315,10 +318,63 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
}
ct := &compactionTask{}
idata, numOfRow, err := ct.merge(mitr, dm, meta.GetSchema(), genTimestamp())
idata, segStats, numOfRow, err := ct.merge(mitr, dm, meta.GetSchema(), genTimestamp())
assert.NoError(t, err)
assert.Equal(t, int64(1), numOfRow)
assert.Equal(t, 1, len(idata))
assert.NotEmpty(t, segStats)
})
t.Run("Merge with meta error", func(t *testing.T) {
Params.CommonCfg.EntityExpirationTTL = 0
iData := genInsertDataWithExpiredTS()
meta := NewMetaFactory().GetCollectionMeta(1, "test", schemapb.DataType_Int64)
iblobs, err := getInsertBlobs(100, iData, meta)
require.NoError(t, err)
iitr, err := storage.NewInsertBinlogIterator(iblobs, 106, schemapb.DataType_Int64)
require.NoError(t, err)
mitr := storage.NewMergeIterator([]iterator{iitr})
dm := map[interface{}]Timestamp{
1: 10000,
}
ct := &compactionTask{}
_, _, _, err = ct.merge(mitr, dm, &schemapb.CollectionSchema{Fields: []*schemapb.FieldSchema{
{DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{
{Key: "dim", Value: "64"},
}},
}}, ct.GetCurrentTime())
assert.Error(t, err)
})
t.Run("Merge with meta type param error", func(t *testing.T) {
Params.CommonCfg.EntityExpirationTTL = 0
iData := genInsertDataWithExpiredTS()
meta := NewMetaFactory().GetCollectionMeta(1, "test", schemapb.DataType_Int64)
iblobs, err := getInsertBlobs(100, iData, meta)
require.NoError(t, err)
iitr, err := storage.NewInsertBinlogIterator(iblobs, 106, schemapb.DataType_Int64)
require.NoError(t, err)
mitr := storage.NewMergeIterator([]iterator{iitr})
dm := map[interface{}]Timestamp{
1: 10000,
}
ct := &compactionTask{}
_, _, _, err = ct.merge(mitr, dm, &schemapb.CollectionSchema{Fields: []*schemapb.FieldSchema{
{DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{
{Key: "dim", Value: "dim"},
}},
}}, ct.GetCurrentTime())
assert.Error(t, err)
})
})
@ -503,7 +559,7 @@ func TestCompactorInterfaceMethods(t *testing.T) {
RowCount: 1,
}
cpaths, err := mockbIO.upload(context.TODO(), c.segID, c.parID, []*InsertData{iData}, dData, meta)
cpaths, err := mockbIO.upload(context.TODO(), c.segID, c.parID, []*InsertData{iData}, []byte{}, dData, meta)
require.NoError(t, err)
require.Equal(t, 12, len(cpaths.inPaths))
segBinlogs := []*datapb.CompactionSegmentBinlogs{
@ -553,7 +609,7 @@ func TestCompactorInterfaceMethods(t *testing.T) {
err = cm.RemoveWithPrefix("/")
require.NoError(t, err)
cpaths, err = mockbIO.upload(context.TODO(), c.segID, c.parID, []*InsertData{iData}, deleteAllData, meta)
cpaths, err = mockbIO.upload(context.TODO(), c.segID, c.parID, []*InsertData{iData}, []byte{}, deleteAllData, meta)
require.NoError(t, err)
plan.PlanID++
@ -568,7 +624,7 @@ func TestCompactorInterfaceMethods(t *testing.T) {
// Compact empty segment
err = cm.RemoveWithPrefix("/")
require.NoError(t, err)
cpaths, err = mockbIO.upload(context.TODO(), c.segID, c.parID, []*InsertData{iData}, dData, meta)
cpaths, err = mockbIO.upload(context.TODO(), c.segID, c.parID, []*InsertData{iData}, []byte{}, dData, meta)
require.NoError(t, err)
plan.PlanID = 999876
segmentBinlogsWithEmptySegment := []*datapb.CompactionSegmentBinlogs{
@ -585,7 +641,7 @@ func TestCompactorInterfaceMethods(t *testing.T) {
// Deltas in timetravel range
err = cm.RemoveWithPrefix("/")
require.NoError(t, err)
cpaths, err = mockbIO.upload(context.TODO(), c.segID, c.parID, []*InsertData{iData}, dData, meta)
cpaths, err = mockbIO.upload(context.TODO(), c.segID, c.parID, []*InsertData{iData}, []byte{}, dData, meta)
require.NoError(t, err)
plan.PlanID++
@ -601,7 +657,7 @@ func TestCompactorInterfaceMethods(t *testing.T) {
// Timeout
err = cm.RemoveWithPrefix("/")
require.NoError(t, err)
cpaths, err = mockbIO.upload(context.TODO(), c.segID, c.parID, []*InsertData{iData}, dData, meta)
cpaths, err = mockbIO.upload(context.TODO(), c.segID, c.parID, []*InsertData{iData}, []byte{}, dData, meta)
require.NoError(t, err)
plan.PlanID++
@ -678,11 +734,11 @@ func TestCompactorInterfaceMethods(t *testing.T) {
RowCount: 1,
}
cpaths1, err := mockbIO.upload(context.TODO(), c.segID1, c.parID, []*InsertData{iData1}, dData1, meta)
cpaths1, err := mockbIO.upload(context.TODO(), c.segID1, c.parID, []*InsertData{iData1}, []byte{}, dData1, meta)
require.NoError(t, err)
require.Equal(t, 12, len(cpaths1.inPaths))
cpaths2, err := mockbIO.upload(context.TODO(), c.segID2, c.parID, []*InsertData{iData2}, dData2, meta)
cpaths2, err := mockbIO.upload(context.TODO(), c.segID2, c.parID, []*InsertData{iData2}, []byte{}, dData2, meta)
require.NoError(t, err)
require.Equal(t, 12, len(cpaths2.inPaths))
@ -810,11 +866,11 @@ func TestCompactorInterfaceMethods(t *testing.T) {
RowCount: 0,
}
cpaths1, err := mockbIO.upload(context.TODO(), segID1, partID, []*InsertData{iData1}, dData1, meta)
cpaths1, err := mockbIO.upload(context.TODO(), segID1, partID, []*InsertData{iData1}, []byte{}, dData1, meta)
require.NoError(t, err)
require.Equal(t, 12, len(cpaths1.inPaths))
cpaths2, err := mockbIO.upload(context.TODO(), segID2, partID, []*InsertData{iData2}, dData2, meta)
cpaths2, err := mockbIO.upload(context.TODO(), segID2, partID, []*InsertData{iData2}, []byte{}, dData2, meta)
require.NoError(t, err)
require.Equal(t, 12, len(cpaths2.inPaths))
@ -862,7 +918,7 @@ type mockFlushManager struct {
var _ flushManager = (*mockFlushManager)(nil)
func (mfm *mockFlushManager) flushBufferData(data *BufferData, segmentID UniqueID, flushed bool, dropped bool, pos *internalpb.MsgPosition) error {
func (mfm *mockFlushManager) flushBufferData(data *BufferData, segStats []byte, segmentID UniqueID, flushed bool, dropped bool, pos *internalpb.MsgPosition) error {
if mfm.returnError {
return fmt.Errorf("mock error")
}


@ -384,8 +384,16 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
zap.Bool("dropped", task.dropped),
zap.Any("pos", endPositions[0]),
)
segStats, err := ibNode.replica.getSegmentStatslog(task.segmentID)
if err != nil {
log.Error("failed to get segment stats log", zap.Int64("segmentID", task.segmentID), zap.Error(err))
panic(err)
}
err = retry.Do(ibNode.ctx, func() error {
return ibNode.flushManager.flushBufferData(task.buffer,
segStats,
task.segmentID,
task.flushed,
task.dropped,


@ -41,7 +41,7 @@ import (
// flushManager defines a flush manager signature
type flushManager interface {
// notify the flush manager of insert buffer data
flushBufferData(data *BufferData, segmentID UniqueID, flushed bool, dropped bool, pos *internalpb.MsgPosition) error
flushBufferData(data *BufferData, segStats []byte, segmentID UniqueID, flushed bool, dropped bool, pos *internalpb.MsgPosition) error
// notify the flush manager of delete buffer data
flushDelData(data *DelDataBuf, segmentID UniqueID, pos *internalpb.MsgPosition) error
// injectFlush injects compaction or other blocking task before flush sync
@ -335,7 +335,7 @@ func (m *rendezvousFlushManager) handleDeleteTask(segmentID UniqueID, task flush
// flushBufferData notifies the flush manager of insert buffer data.
// This method will be retried on errors. Final errors will be propagated upstream and logged.
func (m *rendezvousFlushManager) flushBufferData(data *BufferData, segmentID UniqueID, flushed bool,
func (m *rendezvousFlushManager) flushBufferData(data *BufferData, segStats []byte, segmentID UniqueID, flushed bool,
dropped bool, pos *internalpb.MsgPosition) error {
tr := timerecord.NewTimeRecorder("flushDuration")
// empty flush
@ -360,12 +360,13 @@ func (m *rendezvousFlushManager) flushBufferData(data *BufferData, segmentID Uni
// encode data and convert output data
inCodec := storage.NewInsertCodec(meta)
binLogs, statsBinlogs, err := inCodec.Serialize(partID, segmentID, data.buffer)
binLogs, _, err := inCodec.Serialize(partID, segmentID, data.buffer)
if err != nil {
return err
}
start, _, err := m.allocIDBatch(uint32(len(binLogs)))
// binlogs + 1 statslog
start, _, err := m.allocIDBatch(uint32(len(binLogs) + 1))
if err != nil {
return err
}
@ -400,27 +401,22 @@ func (m *rendezvousFlushManager) flushBufferData(data *BufferData, segmentID Uni
field2Stats := make(map[UniqueID]*datapb.Binlog)
// write stats binlog
for _, blob := range statsBinlogs {
fieldID, err := strconv.ParseInt(blob.GetKey(), 10, 64)
if err != nil {
log.Error("Flush failed ... cannot parse string to fieldID ..", zap.Error(err))
return err
}
logidx := field2Logidx[fieldID]
pkID := getPKID(meta)
if pkID == common.InvalidFieldID {
return fmt.Errorf("failed to get pk id for segment %d", segmentID)
}
// no error raised if alloc=false
k := JoinIDPath(collID, partID, segmentID, fieldID, logidx)
key := path.Join(m.ChunkManager.RootPath(), common.SegmentStatslogPath, k)
kvs[key] = blob.Value
field2Stats[fieldID] = &datapb.Binlog{
EntriesNum: 0,
TimestampFrom: 0, //TODO
TimestampTo: 0, //TODO,
LogPath: key,
LogSize: int64(len(blob.Value)),
}
logidx := start + int64(len(binLogs))
k := JoinIDPath(collID, partID, segmentID, pkID, logidx)
key := path.Join(m.ChunkManager.RootPath(), common.SegmentStatslogPath, k)
kvs[key] = segStats
field2Stats[pkID] = &datapb.Binlog{
EntriesNum: 0,
TimestampFrom: 0, //TODO
TimestampTo: 0, //TODO,
LogPath: key,
LogSize: int64(len(segStats)),
}
m.updateSegmentCheckPoint(segmentID)


@ -166,7 +166,7 @@ func TestRendezvousFlushManager(t *testing.T) {
m.flushDelData(nil, 1, &internalpb.MsgPosition{
MsgID: ids[i],
})
m.flushBufferData(nil, 1, true, false, &internalpb.MsgPosition{
m.flushBufferData(nil, nil, 1, true, false, &internalpb.MsgPosition{
MsgID: ids[i],
})
wg.Done()
@ -213,7 +213,7 @@ func TestRendezvousFlushManager_Inject(t *testing.T) {
m.flushDelData(nil, 1, &internalpb.MsgPosition{
MsgID: ids[i],
})
m.flushBufferData(nil, 1, true, false, &internalpb.MsgPosition{
m.flushBufferData(nil, nil, 1, true, false, &internalpb.MsgPosition{
MsgID: ids[i],
})
wg.Done()
@ -228,10 +228,10 @@ func TestRendezvousFlushManager_Inject(t *testing.T) {
rand.Read(id)
id2 := make([]byte, 10)
rand.Read(id2)
m.flushBufferData(nil, 2, true, false, &internalpb.MsgPosition{
m.flushBufferData(nil, nil, 2, true, false, &internalpb.MsgPosition{
MsgID: id,
})
m.flushBufferData(nil, 3, true, false, &internalpb.MsgPosition{
m.flushBufferData(nil, nil, 3, true, false, &internalpb.MsgPosition{
MsgID: id2,
})
@ -256,7 +256,7 @@ func TestRendezvousFlushManager_Inject(t *testing.T) {
finish.Add(1)
rand.Read(id)
m.flushBufferData(nil, 2, false, false, &internalpb.MsgPosition{
m.flushBufferData(nil, nil, 2, false, false, &internalpb.MsgPosition{
MsgID: id,
})
ti = newTaskInjection(1, func(pack *segmentFlushPack) {
@ -340,7 +340,7 @@ func TestRendezvousFlushManager_waitForAllFlushQueue(t *testing.T) {
mut.RUnlock()
for i := 0; i < size/2; i++ {
m.flushBufferData(nil, 1, true, false, &internalpb.MsgPosition{
m.flushBufferData(nil, nil, 1, true, false, &internalpb.MsgPosition{
MsgID: ids[i],
})
}
@ -350,7 +350,7 @@ func TestRendezvousFlushManager_waitForAllFlushQueue(t *testing.T) {
mut.RUnlock()
for i := size / 2; i < size; i++ {
m.flushBufferData(nil, 1, true, false, &internalpb.MsgPosition{
m.flushBufferData(nil, nil, 1, true, false, &internalpb.MsgPosition{
MsgID: ids[i],
})
}
@ -384,7 +384,7 @@ func TestRendezvousFlushManager_dropMode(t *testing.T) {
})
halfMsgID := []byte{1, 1, 1}
m.flushBufferData(nil, -1, true, false, &internalpb.MsgPosition{
m.flushBufferData(nil, nil, -1, true, false, &internalpb.MsgPosition{
MsgID: halfMsgID,
})
@ -397,7 +397,7 @@ func TestRendezvousFlushManager_dropMode(t *testing.T) {
target := make(map[int64]struct{})
for i := 1; i < 11; i++ {
target[int64(i)] = struct{}{}
m.flushBufferData(nil, int64(i), true, false, &internalpb.MsgPosition{
m.flushBufferData(nil, nil, int64(i), true, false, &internalpb.MsgPosition{
MsgID: []byte{1},
})
m.flushDelData(nil, int64(i), &internalpb.MsgPosition{
@ -436,7 +436,7 @@ func TestRendezvousFlushManager_dropMode(t *testing.T) {
})
halfMsgID := []byte{1, 1, 1}
m.flushBufferData(nil, -1, true, false, &internalpb.MsgPosition{
m.flushBufferData(nil, nil, -1, true, false, &internalpb.MsgPosition{
MsgID: halfMsgID,
})
@ -457,7 +457,7 @@ func TestRendezvousFlushManager_dropMode(t *testing.T) {
})
for i := 1; i < 11; i++ {
m.flushBufferData(nil, int64(i), true, false, &internalpb.MsgPosition{
m.flushBufferData(nil, nil, int64(i), true, false, &internalpb.MsgPosition{
MsgID: []byte{1},
})
m.flushDelData(nil, int64(i), &internalpb.MsgPosition{
@ -504,7 +504,7 @@ func TestRendezvousFlushManager_close(t *testing.T) {
m.flushDelData(nil, 1, &internalpb.MsgPosition{
MsgID: ids[i],
})
m.flushBufferData(nil, 1, true, false, &internalpb.MsgPosition{
m.flushBufferData(nil, nil, 1, true, false, &internalpb.MsgPosition{
MsgID: ids[i],
})
wg.Done()


@ -16,7 +16,11 @@
package datanode
import "github.com/milvus-io/milvus/internal/proto/datapb"
import (
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
)
// reviseVChannelInfo will revise the datapb.VchannelInfo for upgrade compatibility from 2.0.2
func reviseVChannelInfo(vChannel *datapb.VchannelInfo) {
@ -61,3 +65,14 @@ func reviseVChannelInfo(vChannel *datapb.VchannelInfo) {
}
vChannel.DroppedSegmentIds = removeDuplicateSegmentIDFn(vChannel.GetDroppedSegmentIds())
}
// getPKID returns the primary key field id from collection meta.
func getPKID(meta *etcdpb.CollectionMeta) UniqueID {
for _, field := range meta.GetSchema().GetFields() {
if field.GetIsPrimaryKey() {
return field.GetFieldID()
}
}
return common.InvalidFieldID
}
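A short usage sketch for the new getPKID helper; resolvePKField below is hypothetical and only mirrors the check that binlogIO.upload performs in this diff (callers must treat common.InvalidFieldID as a missing primary key).

// Hypothetical caller in the datanode package; needs "errors",
// "github.com/milvus-io/milvus/internal/common" and
// "github.com/milvus-io/milvus/internal/proto/etcdpb".
func resolvePKField(meta *etcdpb.CollectionMeta) (UniqueID, error) {
	pkID := getPKID(meta)
	if pkID == common.InvalidFieldID {
		// same failure mode as the compaction upload path
		return 0, errors.New("invalid pk id")
	}
	return pkID, nil
}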


@ -18,6 +18,7 @@ package datanode
import (
"context"
"encoding/json"
"fmt"
"sync"
"sync/atomic"
@ -79,6 +80,7 @@ type Replica interface {
refreshFlushedSegStatistics(segID UniqueID, numRows int64)
getSegmentStatisticsUpdates(segID UniqueID) (*datapb.SegmentStats, error)
segmentFlushed(segID UniqueID)
getSegmentStatslog(segID UniqueID) ([]byte, error)
}
// Segment is the data structure of segments in data node replica.
@ -168,6 +170,18 @@ func (s *Segment) updatePKRange(ids storage.FieldData) error {
return nil
}
func (s *Segment) getSegmentStatslog(pkID UniqueID, pkType schemapb.DataType) ([]byte, error) {
pks := storage.PrimaryKeyStats{
FieldID: pkID,
PkType: int64(pkType),
MaxPk: s.maxPK,
MinPk: s.minPK,
BF: s.pkFilter,
}
return json.Marshal(pks)
}
var _ Replica = &SegmentReplica{}
func newReplica(ctx context.Context, rc types.RootCoord, cm storage.ChunkManager, collID UniqueID) (*SegmentReplica, error) {
@ -864,3 +878,37 @@ func (replica *SegmentReplica) listNotFlushedSegmentIDs() []UniqueID {
return segIDs
}
// getSegmentStatslog returns the segment statslog for the provided segment id.
func (replica *SegmentReplica) getSegmentStatslog(segID UniqueID) ([]byte, error) {
replica.segMu.RLock()
defer replica.segMu.RUnlock()
schema, err := replica.getCollectionSchema(replica.collectionID, 0)
if err != nil {
return nil, err
}
var pkID UniqueID
var pkType schemapb.DataType
for _, field := range schema.GetFields() {
if field.GetIsPrimaryKey() {
pkID = field.GetFieldID()
pkType = field.GetDataType()
}
}
if seg, ok := replica.newSegments[segID]; ok {
return seg.getSegmentStatslog(pkID, pkType)
}
if seg, ok := replica.normalSegments[segID]; ok {
return seg.getSegmentStatslog(pkID, pkType)
}
if seg, ok := replica.flushedSegments[segID]; ok {
return seg.getSegmentStatslog(pkID, pkType)
}
return nil, fmt.Errorf("segment not found: %d", segID)
}
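Since the statslog written above is plain JSON of storage.PrimaryKeyStats, a consumer can decode it with a single unmarshal. readSegmentStats below is a hypothetical reader, assuming the encoding/json and storage imports already present in this file; the test in the next file checks the decoded FieldID, PkType and pk range the same way, and the querynode loader at the end of this diff performs equivalent decoding via storage.DeserializeStats.

// readSegmentStats decodes the bytes produced by Segment.getSegmentStatslog.
func readSegmentStats(statBytes []byte) (*storage.PrimaryKeyStats, error) {
	pks := &storage.PrimaryKeyStats{}
	if err := json.Unmarshal(statBytes, pks); err != nil {
		return nil, err
	}
	return pks, nil
}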


@ -22,10 +22,12 @@ import (
"fmt"
"math/rand"
"testing"
"time"
"github.com/bits-and-blooms/bloom/v3"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/proto/datapb"
@ -793,6 +795,52 @@ func TestSegmentReplica_UpdatePKRange(t *testing.T) {
}
}
func TestSegment_getSegmentStatslog(t *testing.T) {
rand.Seed(time.Now().UnixNano())
cases := make([][]int64, 0, 100)
for i := 0; i < 100; i++ {
tc := make([]int64, 0, 10)
for j := 0; j < 100; j++ {
tc = append(tc, rand.Int63())
}
cases = append(cases, tc)
}
buf := make([]byte, 8)
for _, tc := range cases {
seg := &Segment{
pkFilter: bloom.NewWithEstimates(100000, 0.005),
}
seg.updatePKRange(&storage.Int64FieldData{
Data: tc,
})
statBytes, err := seg.getSegmentStatslog(1, schemapb.DataType_Int64)
assert.NoError(t, err)
pks := storage.PrimaryKeyStats{}
err = json.Unmarshal(statBytes, &pks)
require.NoError(t, err)
assert.Equal(t, int64(1), pks.FieldID)
assert.Equal(t, int64(schemapb.DataType_Int64), pks.PkType)
for _, v := range tc {
pk := newInt64PrimaryKey(v)
assert.True(t, pks.MinPk.LE(pk))
assert.True(t, pks.MaxPk.GE(pk))
common.Endian.PutUint64(buf, uint64(v))
assert.True(t, seg.pkFilter.Test(buf))
}
}
pks := &storage.PrimaryKeyStats{}
_, err := json.Marshal(pks)
assert.NoError(t, err)
}
func TestReplica_UpdatePKRange(t *testing.T) {
rc := &RootCoordFactory{
pkType: schemapb.DataType_Int64,
@ -844,3 +892,102 @@ func TestReplica_UpdatePKRange(t *testing.T) {
}
}
// SegmentReplicaSuite sets up the test suite for SegmentReplica
type SegmentReplicaSuite struct {
suite.Suite
sr *SegmentReplica
collID UniqueID
partID UniqueID
vchanName string
cm *storage.LocalChunkManager
}
func (s *SegmentReplicaSuite) SetupSuite() {
rc := &RootCoordFactory{
pkType: schemapb.DataType_Int64,
}
s.collID = 1
s.cm = storage.NewLocalChunkManager(storage.RootPath(segmentReplicaNodeTestDir))
var err error
s.sr, err = newReplica(context.Background(), rc, s.cm, s.collID)
s.Require().NoError(err)
}
func (s *SegmentReplicaSuite) TearDownSuite() {
s.cm.RemoveWithPrefix("")
}
func (s *SegmentReplicaSuite) SetupTest() {
var err error
err = s.sr.addNewSegment(1, s.collID, s.partID, s.vchanName, &internalpb.MsgPosition{}, nil)
s.Require().NoError(err)
err = s.sr.addNormalSegment(2, s.collID, s.partID, s.vchanName, 10, nil, nil, 0)
s.Require().NoError(err)
err = s.sr.addFlushedSegment(3, s.collID, s.partID, s.vchanName, 10, nil, 0)
s.Require().NoError(err)
}
func (s *SegmentReplicaSuite) TearDownTest() {
s.sr.removeSegments(1, 2, 3)
}
func (s *SegmentReplicaSuite) TestGetSegmentStatslog() {
bs, err := s.sr.getSegmentStatslog(1)
s.NoError(err)
segment, ok := s.getSegmentByID(1)
s.Require().True(ok)
expected, err := segment.getSegmentStatslog(106, schemapb.DataType_Int64)
s.Require().NoError(err)
s.Equal(expected, bs)
bs, err = s.sr.getSegmentStatslog(2)
s.NoError(err)
segment, ok = s.getSegmentByID(2)
s.Require().True(ok)
expected, err = segment.getSegmentStatslog(106, schemapb.DataType_Int64)
s.Require().NoError(err)
s.Equal(expected, bs)
bs, err = s.sr.getSegmentStatslog(3)
s.NoError(err)
segment, ok = s.getSegmentByID(3)
s.Require().True(ok)
expected, err = segment.getSegmentStatslog(106, schemapb.DataType_Int64)
s.Require().NoError(err)
s.Equal(expected, bs)
_, err = s.sr.getSegmentStatslog(4)
s.Error(err)
}
func (s *SegmentReplicaSuite) getSegmentByID(id UniqueID) (*Segment, bool) {
s.sr.segMu.RLock()
defer s.sr.segMu.RUnlock()
seg, ok := s.sr.newSegments[id]
if ok {
return seg, true
}
seg, ok = s.sr.normalSegments[id]
if ok {
return seg, true
}
seg, ok = s.sr.flushedSegments[id]
if ok {
return seg, true
}
return nil, false
}
func TestSegmentReplicaSuite(t *testing.T) {
suite.Run(t, new(SegmentReplicaSuite))
}


@ -600,8 +600,15 @@ func (loader *segmentLoader) loadSegmentBloomFilter(segment *Segment, binlogPath
stats, err := storage.DeserializeStats(blobs)
if err != nil {
log.Warn("failed to deserialize stats", zap.Error(err))
return err
}
// only one BF, use it directly
if len(stats) == 1 && stats[0].BF != nil {
segment.pkFilter = stats[0].BF
return nil
}
// legacy merge
for _, stat := range stats {
if stat.BF == nil {
log.Warn("stat log with nil bloom filter", zap.Int64("segmentID", segment.segmentID), zap.Any("stat", stat))