diff --git a/internal/datanode/flow_graph_insert_buffer_node_test.go b/internal/datanode/flow_graph_insert_buffer_node_test.go index 009e3c477a..2ac47bff8f 100644 --- a/internal/datanode/flow_graph_insert_buffer_node_test.go +++ b/internal/datanode/flow_graph_insert_buffer_node_test.go @@ -260,7 +260,6 @@ func TestFlushSegment(t *testing.T) { key := path.Join(Params.StatsBinlogRootPath, k) _, values, _ := mockMinIO.LoadWithPrefix(key) assert.Equal(t, len(values), 1) - assert.Equal(t, values[0], `{"max":9,"min":0}`) } func genCollectionMeta(collectionID UniqueID, collectionName string) *etcdpb.CollectionMeta { diff --git a/internal/storage/data_codec.go b/internal/storage/data_codec.go index 14d908d08a..4790ea73f9 100644 --- a/internal/storage/data_codec.go +++ b/internal/storage/data_codec.go @@ -303,7 +303,7 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique statsWriter := &StatsWriter{} switch field.DataType { case schemapb.DataType_Int64: - err = statsWriter.StatsInt64(singleData.(*Int64FieldData).Data) + err = statsWriter.StatsInt64(field.FieldID, singleData.(*Int64FieldData).Data) } if err != nil { return nil, nil, err diff --git a/internal/storage/data_codec_test.go b/internal/storage/data_codec_test.go index 42a4f1b647..7045d0ffc0 100644 --- a/internal/storage/data_codec_test.go +++ b/internal/storage/data_codec_test.go @@ -251,13 +251,13 @@ func TestInsertCodec(t *testing.T) { }, }, } - Blobs1, _, err := insertCodec.Serialize(PartitionID, SegmentID, insertData1) + Blobs1, statsBlob1, err := insertCodec.Serialize(PartitionID, SegmentID, insertData1) assert.Nil(t, err) for _, blob := range Blobs1 { blob.Key = fmt.Sprintf("1/insert_log/2/3/4/5/%d", 100) assert.Equal(t, blob.GetKey(), blob.Key) } - Blobs2, _, err := insertCodec.Serialize(PartitionID, SegmentID, insertData2) + Blobs2, statsBlob2, err := insertCodec.Serialize(PartitionID, SegmentID, insertData2) assert.Nil(t, err) for _, blob := range Blobs2 { blob.Key = fmt.Sprintf("1/insert_log/2/3/4/5/%d", 99) @@ -302,6 +302,12 @@ func TestInsertCodec(t *testing.T) { assert.NotNil(t, err) _, _, _, _, err = insertCodec.DeserializeAll(blobs) assert.NotNil(t, err) + + _, err = DeserializeStats(statsBlob1) + assert.Nil(t, err) + + _, err = DeserializeStats(statsBlob2) + assert.Nil(t, err) } func TestDeleteCodec(t *testing.T) { diff --git a/internal/storage/stats.go b/internal/storage/stats.go index d192074d8b..3f008c7de5 100644 --- a/internal/storage/stats.go +++ b/internal/storage/stats.go @@ -12,12 +12,27 @@ package storage import ( + "encoding/binary" "encoding/json" + + "github.com/bits-and-blooms/bloom/v3" + "github.com/milvus-io/milvus/internal/common" ) +const ( + // TODO silverxia maybe need set from config + bloomFilterSize uint = 100000 + maxBloomFalsePositive float64 = 0.005 +) + +type Stats interface { +} + type Int64Stats struct { - Max int64 `json:"max"` - Min int64 `json:"min"` + FieldID int64 `json:"fieldID"` + Max int64 `json:"max"` + Min int64 `json:"min"` + BF *bloom.BloomFilter `json:"bf"` } type StatsWriter struct { @@ -28,15 +43,24 @@ func (sw *StatsWriter) GetBuffer() []byte { return sw.buffer } -func (sw *StatsWriter) StatsInt64(msgs []int64) error { +func (sw *StatsWriter) StatsInt64(fieldID int64, msgs []int64) error { if len(msgs) < 1 { // return error: msgs must has one element at least return nil } stats := &Int64Stats{ - Max: msgs[len(msgs)-1], - Min: msgs[0], + FieldID: fieldID, + Max: msgs[len(msgs)-1], + Min: msgs[0], + BF: bloom.NewWithEstimates(bloomFilterSize, maxBloomFalsePositive), + } + if fieldID == common.RowIDField { + b := make([]byte, 8) + for _, msg := range msgs { + binary.LittleEndian.PutUint64(b, uint64(msg)) + stats.BF.Add(b) + } } b, err := json.Marshal(stats) if err != nil { @@ -55,8 +79,29 @@ func (sr *StatsReader) SetBuffer(buffer []byte) { sr.buffer = buffer } -func (sr *StatsReader) GetInt64Stats() Int64Stats { - stats := Int64Stats{} - json.Unmarshal(sr.buffer, &stats) - return stats +func (sr *StatsReader) GetInt64Stats() (*Int64Stats, error) { + stats := &Int64Stats{} + err := json.Unmarshal(sr.buffer, &stats) + if err != nil { + return nil, err + } + return stats, nil +} + +func DeserializeStats(blobs []*Blob) ([]*Int64Stats, error) { + results := make([]*Int64Stats, len(blobs)) + for i, blob := range blobs { + if blob.Value == nil { + continue + } + sr := &StatsReader{} + sr.SetBuffer(blob.Value) + stats, err := sr.GetInt64Stats() + if err != nil { + return nil, err + } + results[i] = stats + } + return results, nil + } diff --git a/internal/storage/stats_test.go b/internal/storage/stats_test.go index db544ce0f6..78c2869e1f 100644 --- a/internal/storage/stats_test.go +++ b/internal/storage/stats_test.go @@ -12,30 +12,34 @@ package storage import ( + "encoding/binary" "testing" + "github.com/milvus-io/milvus/internal/common" + "github.com/milvus-io/milvus/internal/rootcoord" "github.com/stretchr/testify/assert" ) func TestStatsWriter_StatsInt64(t *testing.T) { data := []int64{1, 2, 3, 4, 5, 6, 7, 8, 9} sw := &StatsWriter{} - err := sw.StatsInt64(data) + err := sw.StatsInt64(common.RowIDField, data) assert.NoError(t, err) b := sw.GetBuffer() - assert.Equal(t, string(b), `{"max":9,"min":1}`) - sr := &StatsReader{} sr.SetBuffer(b) - stats := sr.GetInt64Stats() - expectedStats := Int64Stats{ - Max: 9, - Min: 1, + stats, err := sr.GetInt64Stats() + assert.Nil(t, err) + assert.Equal(t, stats.Max, int64(9)) + assert.Equal(t, stats.Min, int64(1)) + buffer := make([]byte, 8) + for _, id := range data { + binary.LittleEndian.PutUint64(buffer, uint64(id)) + assert.True(t, stats.BF.Test(buffer)) } - assert.Equal(t, stats, expectedStats) msgs := []int64{} - err = sw.StatsInt64(msgs) + err = sw.StatsInt64(rootcoord.RowIDField, msgs) assert.Nil(t, err) }