Add bloom filter for stats (#9630)

* Add bloom filter for stats

Signed-off-by: godchen <qingxiang.chen@zilliz.com>

* trigger GitHub actions

Signed-off-by: godchen <qingxiang.chen@zilliz.com>
godchen 2021-10-13 10:22:33 +08:00 committed by GitHub
parent 7384b6d798
commit 59ab0e441c
5 changed files with 76 additions and 22 deletions
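The change below adds a bloom filter (and the owning field ID) to each segment's Int64 field statistics next to the existing min/max, so a reader can cheaply ask whether a given row ID might be present. As orientation only, here is a minimal, hedged sketch of how the bits-and-blooms filter used in the diff behaves, with the same NewWithEstimates sizing constants and the 8-byte little-endian key encoding that StatsWriter.StatsInt64 applies; the sample IDs are illustrative, not taken from the commit.

package main

import (
    "encoding/binary"
    "fmt"

    "github.com/bits-and-blooms/bloom/v3"
)

func main() {
    // Same sizing as the new constants in stats.go: ~100k expected entries,
    // 0.5% target false-positive rate.
    bf := bloom.NewWithEstimates(100000, 0.005)

    // Row IDs are inserted as 8-byte little-endian values, mirroring
    // StatsWriter.StatsInt64 in the diff below.
    key := make([]byte, 8)
    for _, id := range []int64{1, 2, 3} {
        binary.LittleEndian.PutUint64(key, uint64(id))
        bf.Add(key)
    }

    binary.LittleEndian.PutUint64(key, uint64(2))
    fmt.Println(bf.Test(key)) // true: 2 was added

    binary.LittleEndian.PutUint64(key, uint64(42))
    // Almost certainly false: bloom filters can give false positives,
    // but never false negatives.
    fmt.Println(bf.Test(key))
}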

View File

@@ -260,7 +260,6 @@ func TestFlushSegment(t *testing.T) {
 	key := path.Join(Params.StatsBinlogRootPath, k)
 	_, values, _ := mockMinIO.LoadWithPrefix(key)
 	assert.Equal(t, len(values), 1)
-	assert.Equal(t, values[0], `{"max":9,"min":0}`)
 }
 
 func genCollectionMeta(collectionID UniqueID, collectionName string) *etcdpb.CollectionMeta {

View File

@@ -303,7 +303,7 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique
 	statsWriter := &StatsWriter{}
 	switch field.DataType {
 	case schemapb.DataType_Int64:
-		err = statsWriter.StatsInt64(singleData.(*Int64FieldData).Data)
+		err = statsWriter.StatsInt64(field.FieldID, singleData.(*Int64FieldData).Data)
 	}
 	if err != nil {
 		return nil, nil, err

View File

@@ -251,13 +251,13 @@ func TestInsertCodec(t *testing.T) {
 			},
 		},
 	}
-	Blobs1, _, err := insertCodec.Serialize(PartitionID, SegmentID, insertData1)
+	Blobs1, statsBlob1, err := insertCodec.Serialize(PartitionID, SegmentID, insertData1)
 	assert.Nil(t, err)
 	for _, blob := range Blobs1 {
 		blob.Key = fmt.Sprintf("1/insert_log/2/3/4/5/%d", 100)
 		assert.Equal(t, blob.GetKey(), blob.Key)
 	}
-	Blobs2, _, err := insertCodec.Serialize(PartitionID, SegmentID, insertData2)
+	Blobs2, statsBlob2, err := insertCodec.Serialize(PartitionID, SegmentID, insertData2)
 	assert.Nil(t, err)
 	for _, blob := range Blobs2 {
 		blob.Key = fmt.Sprintf("1/insert_log/2/3/4/5/%d", 99)
@@ -302,6 +302,12 @@ func TestInsertCodec(t *testing.T) {
 	assert.NotNil(t, err)
 	_, _, _, _, err = insertCodec.DeserializeAll(blobs)
 	assert.NotNil(t, err)
+	_, err = DeserializeStats(statsBlob1)
+	assert.Nil(t, err)
+	_, err = DeserializeStats(statsBlob2)
+	assert.Nil(t, err)
 }
 
 func TestDeleteCodec(t *testing.T) {

View File

@@ -12,12 +12,27 @@
 package storage
 
 import (
+	"encoding/binary"
 	"encoding/json"
+
+	"github.com/bits-and-blooms/bloom/v3"
+	"github.com/milvus-io/milvus/internal/common"
 )
 
+const (
+	// TODO silverxia maybe need set from config
+	bloomFilterSize       uint    = 100000
+	maxBloomFalsePositive float64 = 0.005
+)
+
+type Stats interface {
+}
+
 type Int64Stats struct {
-	Max int64 `json:"max"`
-	Min int64 `json:"min"`
+	FieldID int64              `json:"fieldID"`
+	Max     int64              `json:"max"`
+	Min     int64              `json:"min"`
+	BF      *bloom.BloomFilter `json:"bf"`
 }
 
 type StatsWriter struct {
@@ -28,15 +43,24 @@ func (sw *StatsWriter) GetBuffer() []byte {
 	return sw.buffer
 }
 
-func (sw *StatsWriter) StatsInt64(msgs []int64) error {
+func (sw *StatsWriter) StatsInt64(fieldID int64, msgs []int64) error {
 	if len(msgs) < 1 {
 		// return error: msgs must has one element at least
 		return nil
 	}
 	stats := &Int64Stats{
-		Max: msgs[len(msgs)-1],
-		Min: msgs[0],
+		FieldID: fieldID,
+		Max:     msgs[len(msgs)-1],
+		Min:     msgs[0],
+		BF:      bloom.NewWithEstimates(bloomFilterSize, maxBloomFalsePositive),
 	}
+	if fieldID == common.RowIDField {
+		b := make([]byte, 8)
+		for _, msg := range msgs {
+			binary.LittleEndian.PutUint64(b, uint64(msg))
+			stats.BF.Add(b)
+		}
+	}
 	b, err := json.Marshal(stats)
 	if err != nil {
@@ -55,8 +79,29 @@ func (sr *StatsReader) SetBuffer(buffer []byte) {
 	sr.buffer = buffer
 }
 
-func (sr *StatsReader) GetInt64Stats() Int64Stats {
-	stats := Int64Stats{}
-	json.Unmarshal(sr.buffer, &stats)
-	return stats
+func (sr *StatsReader) GetInt64Stats() (*Int64Stats, error) {
+	stats := &Int64Stats{}
+	err := json.Unmarshal(sr.buffer, &stats)
+	if err != nil {
+		return nil, err
+	}
+	return stats, nil
 }
+
+func DeserializeStats(blobs []*Blob) ([]*Int64Stats, error) {
+	results := make([]*Int64Stats, len(blobs))
+	for i, blob := range blobs {
+		if blob.Value == nil {
+			continue
+		}
+		sr := &StatsReader{}
+		sr.SetBuffer(blob.Value)
+		stats, err := sr.GetInt64Stats()
+		if err != nil {
+			return nil, err
+		}
+		results[i] = stats
+	}
+	return results, nil
+}
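Putting the new pieces together, the writer, reader, and DeserializeStats are meant to round-trip stats through blobs. Below is a hedged usage sketch within this storage package (not part of the diff); it assumes a Blob can be constructed with only its Value set, and that the input slice is sorted, since Min/Max are taken from the first and last elements above.

func exampleStatsRoundTrip() error {
    // Only the row-ID field populates the bloom filter in StatsInt64.
    sw := &StatsWriter{}
    if err := sw.StatsInt64(common.RowIDField, []int64{1, 2, 3, 4, 5}); err != nil {
        return err
    }

    // Wrap the serialized JSON in a blob; assumption: setting Value alone is enough here.
    blob := &Blob{Value: sw.GetBuffer()}

    statsList, err := DeserializeStats([]*Blob{blob})
    if err != nil {
        return err
    }

    // Min/Max come back as written; BF answers membership queries on the
    // little-endian encoding of each row ID.
    key := make([]byte, 8)
    binary.LittleEndian.PutUint64(key, uint64(3))
    _ = statsList[0].BF.Test(key) // expected: true
    return nil
}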

View File

@@ -12,30 +12,34 @@
 package storage
 
 import (
+	"encoding/binary"
 	"testing"
 
+	"github.com/milvus-io/milvus/internal/common"
+	"github.com/milvus-io/milvus/internal/rootcoord"
 	"github.com/stretchr/testify/assert"
 )
 
 func TestStatsWriter_StatsInt64(t *testing.T) {
 	data := []int64{1, 2, 3, 4, 5, 6, 7, 8, 9}
 	sw := &StatsWriter{}
-	err := sw.StatsInt64(data)
+	err := sw.StatsInt64(common.RowIDField, data)
 	assert.NoError(t, err)
 	b := sw.GetBuffer()
-	assert.Equal(t, string(b), `{"max":9,"min":1}`)
 	sr := &StatsReader{}
 	sr.SetBuffer(b)
-	stats := sr.GetInt64Stats()
-	expectedStats := Int64Stats{
-		Max: 9,
-		Min: 1,
+	stats, err := sr.GetInt64Stats()
+	assert.Nil(t, err)
+	assert.Equal(t, stats.Max, int64(9))
+	assert.Equal(t, stats.Min, int64(1))
+	buffer := make([]byte, 8)
+	for _, id := range data {
+		binary.LittleEndian.PutUint64(buffer, uint64(id))
+		assert.True(t, stats.BF.Test(buffer))
 	}
-	assert.Equal(t, stats, expectedStats)
 	msgs := []int64{}
-	err = sw.StatsInt64(msgs)
+	err = sw.StatsInt64(rootcoord.RowIDField, msgs)
 	assert.Nil(t, err)
 }