Update query node bloom filter (#10109)

Signed-off-by: godchen <qingxiang.chen@zilliz.com>
This commit is contained in:
godchen 2021-10-18 20:08:42 +08:00 committed by GitHub
parent 86f711dd26
commit 24a6fc096b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 271 additions and 74 deletions

View File

@ -41,4 +41,7 @@ const (
// DefaultShardsNum defines the default number of shards when creating a collection
DefaultShardsNum = int32(2)
// InvalidPartitionID indicates that the partition is not specified. It will be set when the partitionName is empty
InvalidPartitionID = int64(-1)
)

View File

@ -4696,6 +4696,7 @@ func (dt *deleteTask) PreExecute(ctx context.Context) error {
}
dt.DeleteRequest.CollectionID = collID
// If partitionName is not empty, partitionID will be set.
if len(dt.req.PartitionName) > 0 {
partName := dt.req.PartitionName
if err := ValidatePartitionTag(partName, true); err != nil {
@ -4708,6 +4709,8 @@ func (dt *deleteTask) PreExecute(ctx context.Context) error {
return err
}
dt.DeleteRequest.PartitionID = partID
} else {
dt.DeleteRequest.PartitionID = common.InvalidPartitionID
}
schema, err := globalMetaCache.GetCollectionSchema(ctx, dt.req.CollectionName)

View File

@ -12,6 +12,8 @@
package querynode
import (
"encoding/binary"
"fmt"
"sync"
"github.com/opentracing/opentracing-go"
@ -128,6 +130,82 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
}
wg.Wait()
delData := &deleteData{
deleteIDs: make(map[UniqueID][]int64),
deleteTimestamps: make(map[UniqueID][]Timestamp),
deleteOffset: make(map[UniqueID]int64),
}
// 1. filter segment by bloom filter
for _, delMsg := range iMsg.deleteMessages {
var partitionIDs []UniqueID
var err error
if delMsg.PartitionID != -1 {
partitionIDs = []UniqueID{delMsg.PartitionID}
} else {
partitionIDs, err = iNode.replica.getPartitionIDs(delMsg.CollectionID)
if err != nil {
log.Warn(err.Error())
continue
}
}
resultSegmentIDs := make([]UniqueID, 0)
for _, partitionID := range partitionIDs {
segmentIDs, err := iNode.replica.getSegmentIDs(partitionID)
if err != nil {
log.Warn(err.Error())
continue
}
resultSegmentIDs = append(resultSegmentIDs, segmentIDs...)
}
for _, segmentID := range resultSegmentIDs {
segment, err := iNode.replica.getSegmentByID(segmentID)
if err != nil {
log.Warn(err.Error())
continue
}
exist, err := filterSegmentsByPKs(delMsg.PrimaryKeys, segment)
if err != nil {
log.Warn(err.Error())
continue
}
if exist {
offset := segment.segmentPreDelete(len(delMsg.PrimaryKeys))
if err != nil {
log.Warn(err.Error())
continue
}
delData.deleteIDs[segmentID] = append(delData.deleteIDs[segmentID], delMsg.PrimaryKeys...)
delData.deleteTimestamps[segmentID] = append(delData.deleteTimestamps[segmentID], delMsg.Timestamps...)
delData.deleteOffset[segmentID] = offset
}
}
}
// 2. do preDelete
for segmentID := range delData.deleteIDs {
var targetSegment, err = iNode.replica.getSegmentByID(segmentID)
if err != nil {
log.Warn(err.Error())
}
var numOfRecords = len(delData.deleteIDs[segmentID])
if targetSegment != nil {
offset := targetSegment.segmentPreDelete(numOfRecords)
if err != nil {
log.Warn(err.Error())
}
delData.deleteOffset[segmentID] = offset
log.Debug("insertNode operator", zap.Int("delete size", numOfRecords), zap.Int64("delete offset", offset), zap.Int64("segment id", segmentID))
}
}
// 3. do delete
for segmentID := range delData.deleteIDs {
wg.Add(1)
go iNode.delete(delData, segmentID, &wg)
}
wg.Wait()
var res Msg = &serviceTimeMsg{
timeRange: iMsg.timeRange,
}
@ -138,6 +216,22 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
return []Msg{res}
}
// filterSegmentsByPKs reports whether any of the given primary keys may be
// present in the segment, according to the segment's bloom filter.
//
// A true result can be a bloom-filter false positive, but a false result
// guarantees that none of the pks was ever inserted into this segment.
// An error is returned when pks or segment is nil.
func filterSegmentsByPKs(pks []int64, segment *Segment) (bool, error) {
	if pks == nil {
		return false, fmt.Errorf("pks is nil when filterSegmentsByPKs")
	}
	if segment == nil {
		return false, fmt.Errorf("segment is nil when filterSegmentsByPKs")
	}
	buf := make([]byte, 8)
	for _, pk := range pks {
		binary.BigEndian.PutUint64(buf, uint64(pk))
		// Bug fix: test every pk instead of returning after the first loop
		// iteration. The old code returned the test result of pks[0] only,
		// so a segment whose matching key appeared later in the slice was
		// wrongly skipped (dropping its deletes).
		if segment.pkFilter.Test(buf) {
			return true, nil
		}
	}
	return false, nil
}
func (iNode *insertNode) insert(iData *insertData, segmentID UniqueID, wg *sync.WaitGroup) {
log.Debug("QueryNode::iNode::insert", zap.Any("SegmentID", segmentID))
var targetSegment, err = iNode.replica.getSegmentByID(segmentID)

View File

@ -12,11 +12,14 @@
package querynode
import (
"encoding/binary"
"sync"
"testing"
"github.com/bits-and-blooms/bloom/v3"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/util/flowgraph"
@ -230,10 +233,92 @@ func TestFlowGraphInsertNode_operate(t *testing.T) {
msgInsertMsg, err := genSimpleInsertMsg()
assert.NoError(t, err)
msgDeleteMsg, err := genSimpleDeleteMsg()
assert.NoError(t, err)
iMsg := insertMsg{
insertMessages: []*msgstream.InsertMsg{
msgInsertMsg,
},
deleteMessages: []*msgstream.DeleteMsg{
msgDeleteMsg,
},
}
msg := []flowgraph.Msg{&iMsg}
insertNode.Operate(msg)
})
t.Run("test invalid partitionID", func(t *testing.T) {
replica, err := genSimpleReplica()
assert.NoError(t, err)
insertNode := newInsertNode(replica)
err = replica.addSegment(defaultSegmentID,
defaultPartitionID,
defaultCollectionID,
defaultVChannel,
segmentTypeGrowing,
true)
assert.NoError(t, err)
msgDeleteMsg, err := genSimpleDeleteMsg()
assert.NoError(t, err)
msgDeleteMsg.PartitionID = common.InvalidPartitionID
assert.NoError(t, err)
iMsg := insertMsg{
deleteMessages: []*msgstream.DeleteMsg{
msgDeleteMsg,
},
}
msg := []flowgraph.Msg{&iMsg}
insertNode.Operate(msg)
})
t.Run("test collection partition not exist", func(t *testing.T) {
replica, err := genSimpleReplica()
assert.NoError(t, err)
insertNode := newInsertNode(replica)
err = replica.addSegment(defaultSegmentID,
defaultPartitionID,
defaultCollectionID,
defaultVChannel,
segmentTypeGrowing,
true)
assert.NoError(t, err)
msgDeleteMsg, err := genSimpleDeleteMsg()
msgDeleteMsg.CollectionID = 9999
msgDeleteMsg.PartitionID = -1
assert.NoError(t, err)
iMsg := insertMsg{
deleteMessages: []*msgstream.DeleteMsg{
msgDeleteMsg,
},
}
msg := []flowgraph.Msg{&iMsg}
insertNode.Operate(msg)
})
t.Run("test partition not exist", func(t *testing.T) {
replica, err := genSimpleReplica()
assert.NoError(t, err)
insertNode := newInsertNode(replica)
err = replica.addSegment(defaultSegmentID,
defaultPartitionID,
defaultCollectionID,
defaultVChannel,
segmentTypeGrowing,
true)
assert.NoError(t, err)
msgDeleteMsg, err := genSimpleDeleteMsg()
msgDeleteMsg.PartitionID = 9999
assert.NoError(t, err)
iMsg := insertMsg{
deleteMessages: []*msgstream.DeleteMsg{
msgDeleteMsg,
},
}
msg := []flowgraph.Msg{&iMsg}
insertNode.Operate(msg)
@ -254,12 +339,41 @@ func TestFlowGraphInsertNode_operate(t *testing.T) {
msgInsertMsg, err := genSimpleInsertMsg()
assert.NoError(t, err)
msgDeleteMsg, err := genSimpleDeleteMsg()
assert.NoError(t, err)
iMsg := insertMsg{
insertMessages: []*msgstream.InsertMsg{
msgInsertMsg,
},
deleteMessages: []*msgstream.DeleteMsg{
msgDeleteMsg,
},
}
msg := []flowgraph.Msg{&iMsg, &iMsg}
insertNode.Operate(msg)
})
}
// TestGetSegmentsByPKs verifies bloom-filter based pk filtering on a segment.
func TestGetSegmentsByPKs(t *testing.T) {
	// Populate a bloom filter with primary keys 0, 1 and 2.
	filter := bloom.NewWithEstimates(1000000, 0.01)
	key := make([]byte, 8)
	for pk := 0; pk < 3; pk++ {
		binary.BigEndian.PutUint64(key, uint64(pk))
		filter.Add(key)
	}
	seg := &Segment{
		segmentID: 1,
		pkFilter:  filter,
	}

	// At least one of the pks is present in the filter.
	exist, err := filterSegmentsByPKs([]int64{0, 1, 2, 3, 4}, seg)
	assert.Nil(t, err)
	assert.True(t, exist)

	// An empty pk slice matches nothing.
	exist, err = filterSegmentsByPKs([]int64{}, seg)
	assert.Nil(t, err)
	assert.False(t, exist)

	// Nil arguments are rejected with an error.
	_, err = filterSegmentsByPKs(nil, seg)
	assert.NotNil(t, err)
	_, err = filterSegmentsByPKs([]int64{0, 1, 2, 3, 4}, nil)
	assert.NotNil(t, err)
}

View File

@ -1223,27 +1223,6 @@ func (q *queryCollection) retrieve(msg queryMsg) error {
return nil
}
// getSegmentsByPKs maps each segment id to the subset of pks whose bloom
// filter test is positive for that segment. Positive matches may include
// bloom-filter false positives; misses are definitive.
func getSegmentsByPKs(pks []int64, segments []*Segment) (map[int64][]int64, error) {
	if pks == nil {
		return nil, fmt.Errorf("pks is nil when getSegmentsByPKs")
	}
	if segments == nil {
		return nil, fmt.Errorf("segments is nil when getSegmentsByPKs")
	}
	hits := make(map[int64][]int64)
	key := make([]byte, 8)
	for _, seg := range segments {
		for _, pk := range pks {
			binary.BigEndian.PutUint64(key, uint64(pk))
			if seg.pkFilter.Test(key) {
				hits[seg.segmentID] = append(hits[seg.segmentID], pk)
			}
		}
	}
	return hits, nil
}
func mergeRetrieveResults(dataArr []*segcorepb.RetrieveResults) (*segcorepb.RetrieveResults, error) {
var final *segcorepb.RetrieveResults
for _, data := range dataArr {

View File

@ -9,7 +9,6 @@ import (
"testing"
"time"
"github.com/bits-and-blooms/bloom/v3"
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
@ -210,56 +209,6 @@ func TestQueryCollection_withoutVChannel(t *testing.T) {
streaming.close()
}
// TestGetSegmentsByPKs checks that pks are routed to exactly the segments
// whose bloom filters report a (possible) match.
func TestGetSegmentsByPKs(t *testing.T) {
	key := make([]byte, 8)
	// addRange inserts pks [lo, hi) into the given filter.
	addRange := func(f *bloom.BloomFilter, lo, hi int) {
		for i := lo; i < hi; i++ {
			binary.BigEndian.PutUint64(key, uint64(i))
			f.Add(key)
		}
	}
	lowFilter := bloom.NewWithEstimates(1000000, 0.01) // holds pks 0..2
	addRange(lowFilter, 0, 3)
	highFilter := bloom.NewWithEstimates(1000000, 0.01) // holds pks 3..4
	addRange(highFilter, 3, 5)

	// Segments 1-3 share lowFilter; segments 4-5 share highFilter.
	filters := []*bloom.BloomFilter{lowFilter, lowFilter, lowFilter, highFilter, highFilter}
	segments := make([]*Segment, 0, len(filters))
	for i, f := range filters {
		segments = append(segments, &Segment{
			segmentID: UniqueID(i + 1),
			pkFilter:  f,
		})
	}

	got, err := getSegmentsByPKs([]int64{0, 1, 2, 3, 4}, segments)
	assert.Nil(t, err)
	want := map[int64][]int64{
		1: {0, 1, 2},
		2: {0, 1, 2},
		3: {0, 1, 2},
		4: {3, 4},
		5: {3, 4},
	}
	assert.Equal(t, want, got)

	// Nil arguments are rejected with an error.
	_, err = getSegmentsByPKs(nil, segments)
	assert.NotNil(t, err)
	_, err = getSegmentsByPKs([]int64{0, 1, 2, 3, 4}, nil)
	assert.NotNil(t, err)
}
func TestQueryCollection_unsolvedMsg(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())

View File

@ -50,6 +50,11 @@ const (
segmentTypeIndexing
)
const (
bloomFilterSize uint = 100000
maxBloomFalsePositive float64 = 0.005
)
type VectorFieldInfo struct {
fieldBinlog *datapb.FieldBinlog
}
@ -198,6 +203,8 @@ func newSegment(collection *Collection, segmentID UniqueID, partitionID UniqueID
onService: onService,
indexInfos: make(map[int64]*indexInfo),
vectorFieldInfos: make(map[UniqueID]*VectorFieldInfo),
pkFilter: bloom.NewWithEstimates(bloomFilterSize, maxBloomFalsePositive),
}
return segment
@ -593,6 +600,13 @@ func (s *Segment) segmentInsert(offset int64, entityIDs *[]UniqueID, timestamps
if s.segmentPtr == nil {
return errors.New("null seg core pointer")
}
for _, id := range *entityIDs {
b := make([]byte, 8)
binary.BigEndian.PutUint64(b, uint64(id))
s.pkFilter.Add(b)
}
// Blobs to one big blob
var numOfRow = len(*entityIDs)
var sizeofPerRow = len((*records)[0].Value)

View File

@ -15,17 +15,19 @@ import (
"context"
"errors"
"fmt"
"path"
"strconv"
"github.com/golang/protobuf/proto"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
minioKV "github.com/milvus-io/milvus/internal/kv/minio"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/rootcoord"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/funcutil"
@ -182,6 +184,12 @@ func (loader *segmentLoader) loadSegmentInternal(collectionID UniqueID, segment
if err != nil {
return err
}
log.Debug("loading bloom filter...")
err = loader.loadSegmentBloomFilter(segment)
if err != nil {
return err
}
for _, id := range indexedFieldIDs {
log.Debug("loading index...")
err = loader.indexLoader.loadIndex(segment, id)
@ -296,7 +304,7 @@ func (loader *segmentLoader) loadSegmentFieldsData(segment *Segment, fieldBinlog
default:
return errors.New("unexpected field data type")
}
if fieldID == rootcoord.TimeStampField {
if fieldID == common.TimeStampField {
segment.setIDBinlogRowSizes(numRows)
}
totalNumRows := int64(0)
@ -312,6 +320,39 @@ func (loader *segmentLoader) loadSegmentFieldsData(segment *Segment, fieldBinlog
return nil
}
// loadSegmentBloomFilter restores the segment's primary-key bloom filter by
// reading all statistics blobs stored under the segment's stats-log prefix
// and merging each deserialized filter into segment.pkFilter.
func (loader *segmentLoader) loadSegmentBloomFilter(segment *Segment) error {
	// TODO: fetch the stats-log path from etcd instead of hard-coding it.
	prefix := path.Join("files/stats_log",
		JoinIDPath(segment.collectionID, segment.partitionID, segment.segmentID, common.RowIDField))
	keys, values, err := loader.minioKV.LoadWithPrefix(prefix)
	if err != nil {
		return err
	}
	blobs := make([]*storage.Blob, 0, len(keys))
	for i, key := range keys {
		blobs = append(blobs, &storage.Blob{Key: key, Value: []byte(values[i])})
	}
	stats, err := storage.DeserializeStats(blobs)
	if err != nil {
		return err
	}
	// Fold every per-blob filter into the segment-level filter.
	for _, stat := range stats {
		if err := segment.pkFilter.Merge(stat.BF); err != nil {
			return err
		}
	}
	return nil
}
// JoinIDPath joins ids into a slash-separated path segment, e.g.
// JoinIDPath(1, 2, 3) returns "1/2/3".
func JoinIDPath(ids ...UniqueID) string {
	// Bug fix: the slice was built with make([]string, len(ids)) followed by
	// append, which produced len(ids) leading empty elements. The output was
	// unaffected only because path.Join ignores empty elements, but the slice
	// was twice the needed size; allocate zero length with capacity instead.
	idStr := make([]string, 0, len(ids))
	for _, id := range ids {
		idStr = append(idStr, strconv.FormatInt(id, 10))
	}
	return path.Join(idStr...)
}
func newSegmentLoader(ctx context.Context, rootCoord types.RootCoord, indexCoord types.IndexCoord, replica ReplicaInterface, etcdKV *etcdkv.EtcdKV) *segmentLoader {
option := &minioKV.Option{