diff --git a/internal/common/common.go b/internal/common/common.go
index 6f6e9b010a..3e7d4e7ab8 100644
--- a/internal/common/common.go
+++ b/internal/common/common.go
@@ -41,4 +41,7 @@ const (
 
 	// DefaultShardsNum defines the default number of shards when creating a collection
 	DefaultShardsNum = int32(2)
+
+	// InvalidPartitionID indicates that the partition is not specified. It is set when the partition name is empty.
+	InvalidPartitionID = int64(-1)
 )
diff --git a/internal/proxy/task.go b/internal/proxy/task.go
index bd332f9c5b..adfa71a546 100644
--- a/internal/proxy/task.go
+++ b/internal/proxy/task.go
@@ -4696,6 +4696,7 @@ func (dt *deleteTask) PreExecute(ctx context.Context) error {
 	}
 	dt.DeleteRequest.CollectionID = collID
 
+	// If partitionName is not empty, resolve and set the partition ID; otherwise mark it invalid.
 	if len(dt.req.PartitionName) > 0 {
 		partName := dt.req.PartitionName
 		if err := ValidatePartitionTag(partName, true); err != nil {
@@ -4708,6 +4709,8 @@ func (dt *deleteTask) PreExecute(ctx context.Context) error {
 			return err
 		}
 		dt.DeleteRequest.PartitionID = partID
+	} else {
+		dt.DeleteRequest.PartitionID = common.InvalidPartitionID
 	}
 
 	schema, err := globalMetaCache.GetCollectionSchema(ctx, dt.req.CollectionName)
diff --git a/internal/querynode/flow_graph_insert_node.go b/internal/querynode/flow_graph_insert_node.go
index a437634680..800032188e 100644
--- a/internal/querynode/flow_graph_insert_node.go
+++ b/internal/querynode/flow_graph_insert_node.go
@@ -12,6 +12,8 @@ package querynode
 
 import (
+	"encoding/binary"
+	"fmt"
 	"sync"
 
 	"github.com/opentracing/opentracing-go"
@@ -128,6 +130,73 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
 	}
 	wg.Wait()
 
+	delData := &deleteData{
+		deleteIDs:        make(map[UniqueID][]int64),
+		deleteTimestamps: make(map[UniqueID][]Timestamp),
+		deleteOffset:     make(map[UniqueID]int64),
+	}
+	// 1. filter candidate segments by bloom filter
+	for _, delMsg := range iMsg.deleteMessages {
+		var partitionIDs []UniqueID
+		var err error
+		// A PartitionID of -1 (common.InvalidPartitionID) means the delete is not scoped to one partition.
+		if delMsg.PartitionID != -1 {
+			partitionIDs = []UniqueID{delMsg.PartitionID}
+		} else {
+			partitionIDs, err = iNode.replica.getPartitionIDs(delMsg.CollectionID)
+			if err != nil {
+				log.Warn(err.Error())
+				continue
+			}
+		}
+		resultSegmentIDs := make([]UniqueID, 0)
+		for _, partitionID := range partitionIDs {
+			segmentIDs, err := iNode.replica.getSegmentIDs(partitionID)
+			if err != nil {
+				log.Warn(err.Error())
+				continue
+			}
+			resultSegmentIDs = append(resultSegmentIDs, segmentIDs...)
+		}
+		for _, segmentID := range resultSegmentIDs {
+			segment, err := iNode.replica.getSegmentByID(segmentID)
+			if err != nil {
+				log.Warn(err.Error())
+				continue
+			}
+			exist, err := filterSegmentsByPKs(delMsg.PrimaryKeys, segment)
+			if err != nil {
+				log.Warn(err.Error())
+				continue
+			}
+			if exist {
+				delData.deleteIDs[segmentID] = append(delData.deleteIDs[segmentID], delMsg.PrimaryKeys...)
+				delData.deleteTimestamps[segmentID] = append(delData.deleteTimestamps[segmentID], delMsg.Timestamps...)
+			}
+		}
+	}
+
+	// 2. do preDelete
+	for segmentID := range delData.deleteIDs {
+		targetSegment, err := iNode.replica.getSegmentByID(segmentID)
+		if err != nil {
+			log.Warn(err.Error())
+			continue
+		}
+
+		numOfRecords := len(delData.deleteIDs[segmentID])
+		offset := targetSegment.segmentPreDelete(numOfRecords)
+		delData.deleteOffset[segmentID] = offset
+		log.Debug("insertNode operator", zap.Int("delete size", numOfRecords), zap.Int64("delete offset", offset), zap.Int64("segment id", segmentID))
+	}
+
+	// 3. do delete
+	for segmentID := range delData.deleteIDs {
+		wg.Add(1)
+		go iNode.delete(delData, segmentID, &wg)
+	}
+	wg.Wait()
+
 	var res Msg = &serviceTimeMsg{
 		timeRange: iMsg.timeRange,
 	}
@@ -138,6 +207,26 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
 	return []Msg{res}
 }
 
+// filterSegmentsByPKs reports whether the segment's bloom filter may contain
+// any of the given primary keys. A true result can be a false positive; a
+// false result is definitive.
+func filterSegmentsByPKs(pks []int64, segment *Segment) (bool, error) {
+	if pks == nil {
+		return false, fmt.Errorf("pks is nil when filterSegmentsByPKs")
+	}
+	if segment == nil {
+		return false, fmt.Errorf("segment is nil when filterSegmentsByPKs")
+	}
+	buf := make([]byte, 8)
+	for _, pk := range pks {
+		binary.BigEndian.PutUint64(buf, uint64(pk))
+		if segment.pkFilter.Test(buf) {
+			return true, nil
+		}
+	}
+	return false, nil
+}
+
 func (iNode *insertNode) insert(iData *insertData, segmentID UniqueID, wg *sync.WaitGroup) {
 	log.Debug("QueryNode::iNode::insert", zap.Any("SegmentID", segmentID))
 	var targetSegment, err = iNode.replica.getSegmentByID(segmentID)
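Step 1 above leans on the defining bloom-filter guarantee: Test never produces a false negative, so a false from filterSegmentsByPKs can safely skip a segment, while a true only means the segment may contain one of the keys; the delete in step 3 remains the source of truth. Below is a minimal standalone sketch of that behavior, using the same bits-and-blooms library and the same big-endian key encoding as the patch (the filter sizing is illustrative, not the production constants):

package main

import (
	"encoding/binary"
	"fmt"

	"github.com/bits-and-blooms/bloom/v3"
)

func main() {
	filter := bloom.NewWithEstimates(100000, 0.005)
	buf := make([]byte, 8)

	// Insert path: every primary key is added to the filter.
	for pk := int64(0); pk < 3; pk++ {
		binary.BigEndian.PutUint64(buf, uint64(pk))
		filter.Add(buf)
	}

	// Delete path: an inserted key always tests positive.
	binary.BigEndian.PutUint64(buf, uint64(2))
	fmt.Println(filter.Test(buf)) // always true

	// An absent key usually tests negative, but may rarely test positive
	// (a false positive), which costs only a wasted preDelete/delete pass.
	binary.BigEndian.PutUint64(buf, uint64(99))
	fmt.Println(filter.Test(buf)) // almost always false at this fill level
}

Because a false positive only wastes work on a segment that holds none of the keys, the filter is purely an optimization, never a correctness mechanism.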
diff --git a/internal/querynode/flow_graph_insert_node_test.go b/internal/querynode/flow_graph_insert_node_test.go
index 1dffa27ae3..5440468b79 100644
--- a/internal/querynode/flow_graph_insert_node_test.go
+++ b/internal/querynode/flow_graph_insert_node_test.go
@@ -12,11 +12,14 @@ package querynode
 
 import (
+	"encoding/binary"
 	"sync"
 	"testing"
 
+	"github.com/bits-and-blooms/bloom/v3"
 	"github.com/stretchr/testify/assert"
 
+	"github.com/milvus-io/milvus/internal/common"
 	"github.com/milvus-io/milvus/internal/msgstream"
 	"github.com/milvus-io/milvus/internal/proto/commonpb"
 	"github.com/milvus-io/milvus/internal/util/flowgraph"
@@ -230,10 +233,91 @@ func TestFlowGraphInsertNode_operate(t *testing.T) {
 		msgInsertMsg, err := genSimpleInsertMsg()
 		assert.NoError(t, err)
+		msgDeleteMsg, err := genSimpleDeleteMsg()
+		assert.NoError(t, err)
 		iMsg := insertMsg{
 			insertMessages: []*msgstream.InsertMsg{
 				msgInsertMsg,
 			},
+			deleteMessages: []*msgstream.DeleteMsg{
+				msgDeleteMsg,
+			},
 		}
 		msg := []flowgraph.Msg{&iMsg}
 		insertNode.Operate(msg)
 	})
+
+	t.Run("test invalid partitionID", func(t *testing.T) {
+		replica, err := genSimpleReplica()
+		assert.NoError(t, err)
+		insertNode := newInsertNode(replica)
+
+		err = replica.addSegment(defaultSegmentID,
+			defaultPartitionID,
+			defaultCollectionID,
+			defaultVChannel,
+			segmentTypeGrowing,
+			true)
+		assert.NoError(t, err)
+
+		msgDeleteMsg, err := genSimpleDeleteMsg()
+		assert.NoError(t, err)
+		msgDeleteMsg.PartitionID = common.InvalidPartitionID
+		iMsg := insertMsg{
+			deleteMessages: []*msgstream.DeleteMsg{
+				msgDeleteMsg,
+			},
+		}
+		msg := []flowgraph.Msg{&iMsg}
+		insertNode.Operate(msg)
+	})
+
+	t.Run("test collection partition not exist", func(t *testing.T) {
+		replica, err := genSimpleReplica()
+		assert.NoError(t, err)
+		insertNode := newInsertNode(replica)
+
+		err = replica.addSegment(defaultSegmentID,
+			defaultPartitionID,
+			defaultCollectionID,
+			defaultVChannel,
+			segmentTypeGrowing,
+			true)
+		assert.NoError(t, err)
+
+		msgDeleteMsg, err := genSimpleDeleteMsg()
+		assert.NoError(t, err)
+		msgDeleteMsg.CollectionID = 9999
+		msgDeleteMsg.PartitionID = common.InvalidPartitionID
+		iMsg := insertMsg{
+			deleteMessages: []*msgstream.DeleteMsg{
+				msgDeleteMsg,
+			},
+		}
+		msg := []flowgraph.Msg{&iMsg}
+		insertNode.Operate(msg)
+	})
+
+	t.Run("test partition not exist", func(t *testing.T) {
+		replica, err := genSimpleReplica()
+		assert.NoError(t, err)
+		insertNode := newInsertNode(replica)
+
+		err = replica.addSegment(defaultSegmentID,
+			defaultPartitionID,
+			defaultCollectionID,
+			defaultVChannel,
+			segmentTypeGrowing,
+			true)
+		assert.NoError(t, err)
+
+		msgDeleteMsg, err := genSimpleDeleteMsg()
+		assert.NoError(t, err)
+		msgDeleteMsg.PartitionID = 9999
+		iMsg := insertMsg{
+			deleteMessages: []*msgstream.DeleteMsg{
+				msgDeleteMsg,
+			},
+		}
+		msg := []flowgraph.Msg{&iMsg}
+		insertNode.Operate(msg)
+	})
@@ -254,12 +338,42 @@ func TestFlowGraphInsertNode_operate(t *testing.T) {
 
 		msgInsertMsg, err := genSimpleInsertMsg()
 		assert.NoError(t, err)
+		msgDeleteMsg, err := genSimpleDeleteMsg()
+		assert.NoError(t, err)
 		iMsg := insertMsg{
 			insertMessages: []*msgstream.InsertMsg{
 				msgInsertMsg,
 			},
+			deleteMessages: []*msgstream.DeleteMsg{
+				msgDeleteMsg,
+			},
 		}
 		msg := []flowgraph.Msg{&iMsg, &iMsg}
 		insertNode.Operate(msg)
 	})
 }
+
+func TestFilterSegmentsByPKs(t *testing.T) {
+	buf := make([]byte, 8)
+	filter := bloom.NewWithEstimates(1000000, 0.01)
+	for i := 0; i < 3; i++ {
+		binary.BigEndian.PutUint64(buf, uint64(i))
+		filter.Add(buf)
+	}
+	segment := &Segment{
+		segmentID: 1,
+		pkFilter:  filter,
+	}
+	exist, err := filterSegmentsByPKs([]int64{0, 1, 2, 3, 4}, segment)
+	assert.Nil(t, err)
+	assert.True(t, exist)
+
+	exist, err = filterSegmentsByPKs([]int64{}, segment)
+	assert.Nil(t, err)
+	assert.False(t, exist)
+
+	_, err = filterSegmentsByPKs(nil, segment)
+	assert.NotNil(t, err)
+	_, err = filterSegmentsByPKs([]int64{0, 1, 2, 3, 4}, nil)
+	assert.NotNil(t, err)
+}
diff --git a/internal/querynode/query_collection.go b/internal/querynode/query_collection.go
index 455b3fda24..9151dbf501 100644
--- a/internal/querynode/query_collection.go
+++ b/internal/querynode/query_collection.go
@@ -1223,27 +1223,6 @@ func (q *queryCollection) retrieve(msg queryMsg) error {
 	return nil
 }
 
-func getSegmentsByPKs(pks []int64, segments []*Segment) (map[int64][]int64, error) {
-	if pks == nil {
-		return nil, fmt.Errorf("pks is nil when getSegmentsByPKs")
-	}
-	if segments == nil {
-		return nil, fmt.Errorf("segments is nil when getSegmentsByPKs")
-	}
-	results := make(map[int64][]int64)
-	buf := make([]byte, 8)
-	for _, segment := range segments {
-		for _, pk := range pks {
-			binary.BigEndian.PutUint64(buf, uint64(pk))
-			exist := segment.pkFilter.Test(buf)
-			if exist {
-				results[segment.segmentID] = append(results[segment.segmentID], pk)
-			}
-		}
-	}
-	return results, nil
-}
-
 func mergeRetrieveResults(dataArr []*segcorepb.RetrieveResults) (*segcorepb.RetrieveResults, error) {
 	var final *segcorepb.RetrieveResults
 	for _, data := range dataArr {
diff --git a/internal/querynode/query_collection_test.go b/internal/querynode/query_collection_test.go
index 62e0f53523..034bfc8fc2 100644
--- a/internal/querynode/query_collection_test.go
+++ b/internal/querynode/query_collection_test.go
@@ -9,7 +9,6 @@ import (
 	"testing"
 	"time"
 
-	"github.com/bits-and-blooms/bloom/v3"
 	"github.com/golang/protobuf/proto"
 	"github.com/stretchr/testify/assert"
 
@@ -210,56 +209,6 @@ func TestQueryCollection_withoutVChannel(t *testing.T) {
 	streaming.close()
 }
 
-func TestGetSegmentsByPKs(t *testing.T) {
-	buf := make([]byte, 8)
-	filter1 := bloom.NewWithEstimates(1000000, 0.01)
-	for i := 0; i < 3; i++ {
-		binary.BigEndian.PutUint64(buf, uint64(i))
-		filter1.Add(buf)
-	}
-	filter2 := bloom.NewWithEstimates(1000000, 0.01)
-	for i := 3; i < 5; i++ {
-		binary.BigEndian.PutUint64(buf, uint64(i))
-		filter2.Add(buf)
-	}
-	segment1 := &Segment{
-		segmentID: 1,
-		pkFilter:  filter1,
-	}
-	segment2 := &Segment{
-		segmentID: 2,
-		pkFilter:  filter1,
-	}
-	segment3 := &Segment{
-		segmentID: 3,
-		pkFilter:  filter1,
-	}
-	segment4 := &Segment{
-		segmentID: 4,
-		pkFilter:  filter2,
-	}
-	segment5 := &Segment{
-		segmentID: 5,
-		pkFilter:  filter2,
-	}
-	segments := []*Segment{segment1, segment2, segment3, segment4, segment5}
-	results, err := getSegmentsByPKs([]int64{0, 1, 2, 3, 4}, segments)
-	assert.Nil(t, err)
-	expected := map[int64][]int64{
-		1: {0, 1, 2},
-		2: {0, 1, 2},
-		3: {0, 1, 2},
-		4: {3, 4},
-		5: {3, 4},
-	}
-	assert.Equal(t, expected, results)
-
-	_, err = getSegmentsByPKs(nil, segments)
-	assert.NotNil(t, err)
-	_, err = getSegmentsByPKs([]int64{0, 1, 2, 3, 4}, nil)
-	assert.NotNil(t, err)
-}
-
 func TestQueryCollection_unsolvedMsg(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
diff --git a/internal/querynode/segment.go b/internal/querynode/segment.go
index 578b8427cd..91d5625e4d 100644
--- a/internal/querynode/segment.go
+++ b/internal/querynode/segment.go
@@ -50,6 +50,11 @@ const (
 	segmentTypeIndexing
 )
 
+const (
+	bloomFilterSize       uint    = 100000
+	maxBloomFalsePositive float64 = 0.005
+)
+
 type VectorFieldInfo struct {
 	fieldBinlog *datapb.FieldBinlog
 }
@@ -198,6 +203,8 @@ func newSegment(collection *Collection, segmentID UniqueID, partitionID UniqueID
 		onService:        onService,
 		indexInfos:       make(map[int64]*indexInfo),
 		vectorFieldInfos: make(map[UniqueID]*VectorFieldInfo),
+
+		pkFilter: bloom.NewWithEstimates(bloomFilterSize, maxBloomFalsePositive),
 	}
 
 	return segment
@@ -593,6 +600,15 @@ func (s *Segment) segmentInsert(offset int64, entityIDs *[]UniqueID, timestamps
 	if s.segmentPtr == nil {
 		return errors.New("null seg core pointer")
 	}
+
+	// Record every primary key in the segment's bloom filter so deletes can
+	// skip segments that cannot contain the key.
+	buf := make([]byte, 8)
+	for _, id := range *entityIDs {
+		binary.BigEndian.PutUint64(buf, uint64(id))
+		s.pkFilter.Add(buf)
+	}
+
 	// Blobs to one big blob
 	var numOfRow = len(*entityIDs)
 	var sizeofPerRow = len((*records)[0].Value)
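newSegment now gives every segment a pkFilter sized for bloomFilterSize = 100000 keys at maxBloomFalsePositive = 0.005. To see what that sizing costs, the library's own estimator can be queried with the same constants; NewWithEstimates derives its parameters from EstimateParameters, so this small sketch prints the dimensions of the filter each segment carries (about 1.1 million bits, roughly 135 KiB, with 8 hash functions):

package main

import (
	"fmt"

	"github.com/bits-and-blooms/bloom/v3"
)

func main() {
	// The same sizing newSegment passes to bloom.NewWithEstimates:
	// 100000 expected primary keys at a 0.5% false-positive rate.
	m, k := bloom.EstimateParameters(100000, 0.005)
	fmt.Printf("filter bits: %d (~%d KiB), hash functions: %d\n", m, m/8/1024, k)
}

That fixed per-segment overhead is the price of letting deletes skip segments that cannot contain the targeted keys.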
diff --git a/internal/querynode/segment_loader.go b/internal/querynode/segment_loader.go
index f91accca2a..03970d61cc 100644
--- a/internal/querynode/segment_loader.go
+++ b/internal/querynode/segment_loader.go
@@ -15,17 +15,19 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"path"
+	"strconv"
 
 	"github.com/golang/protobuf/proto"
 	"go.uber.org/zap"
 
+	"github.com/milvus-io/milvus/internal/common"
 	"github.com/milvus-io/milvus/internal/kv"
 	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
 	minioKV "github.com/milvus-io/milvus/internal/kv/minio"
 	"github.com/milvus-io/milvus/internal/log"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/internal/proto/querypb"
-	"github.com/milvus-io/milvus/internal/rootcoord"
 	"github.com/milvus-io/milvus/internal/storage"
 	"github.com/milvus-io/milvus/internal/types"
 	"github.com/milvus-io/milvus/internal/util/funcutil"
@@ -182,6 +184,12 @@ func (loader *segmentLoader) loadSegmentInternal(collectionID UniqueID, segment
 	if err != nil {
 		return err
 	}
+
+	log.Debug("loading bloom filter...")
+	err = loader.loadSegmentBloomFilter(segment)
+	if err != nil {
+		return err
+	}
 	for _, id := range indexedFieldIDs {
 		log.Debug("loading index...")
 		err = loader.indexLoader.loadIndex(segment, id)
@@ -296,7 +304,7 @@ func (loader *segmentLoader) loadSegmentFieldsData(segment *Segment, fieldBinlog
 	default:
 		return errors.New("unexpected field data type")
 	}
-	if fieldID == rootcoord.TimeStampField {
+	if fieldID == common.TimeStampField {
 		segment.setIDBinlogRowSizes(numRows)
 	}
 	totalNumRows := int64(0)
@@ -312,6 +320,42 @@ func (loader *segmentLoader) loadSegmentFieldsData(segment *Segment, fieldBinlog
 	return nil
 }
 
+func (loader *segmentLoader) loadSegmentBloomFilter(segment *Segment) error {
+	// TODO: get the stats log path from etcd instead of hard-coding it
+	p := path.Join("files/stats_log", JoinIDPath(segment.collectionID, segment.partitionID, segment.segmentID, common.RowIDField))
+	keys, values, err := loader.minioKV.LoadWithPrefix(p)
+	if err != nil {
+		return err
+	}
+	blobs := make([]*storage.Blob, 0, len(keys))
+	for i := 0; i < len(keys); i++ {
+		blobs = append(blobs, &storage.Blob{Key: keys[i], Value: []byte(values[i])})
+	}
+
+	stats, err := storage.DeserializeStats(blobs)
+	if err != nil {
+		return err
+	}
+	// Each stats blob carries the bloom filter written alongside the binlogs;
+	// merge them all into the segment's filter.
+	for _, stat := range stats {
+		err = segment.pkFilter.Merge(stat.BF)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// JoinIDPath joins IDs into a slash-separated path.
+func JoinIDPath(ids ...UniqueID) string {
+	idStr := make([]string, 0, len(ids))
+	for _, id := range ids {
+		idStr = append(idStr, strconv.FormatInt(id, 10))
+	}
+	return path.Join(idStr...)
+}
+
 func newSegmentLoader(ctx context.Context, rootCoord types.RootCoord, indexCoord types.IndexCoord, replica ReplicaInterface, etcdKV *etcdkv.EtcdKV) *segmentLoader {
 	option := &minioKV.Option{
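The prefix scanned by loadSegmentBloomFilter is just the segment's IDs joined under a fixed stats-log root. A standalone sketch of that path construction follows; the IDs are made up, common.RowIDField is assumed here to be field ID 0, and JoinIDPath is restated with UniqueID spelled as int64 so the snippet compiles on its own:

package main

import (
	"fmt"
	"path"
	"strconv"
)

// JoinIDPath mirrors the helper added in segment_loader.go.
func JoinIDPath(ids ...int64) string {
	idStr := make([]string, 0, len(ids))
	for _, id := range ids {
		idStr = append(idStr, strconv.FormatInt(id, 10))
	}
	return path.Join(idStr...)
}

func main() {
	// collectionID=100, partitionID=101, segmentID=102, RowIDField=0 (illustrative)
	p := path.Join("files/stats_log", JoinIDPath(100, 101, 102, 0))
	fmt.Println(p) // files/stats_log/100/101/102/0
}

On the design choice: path.Join ignores empty elements, so even a slice preallocated with non-zero length would join correctly, but building it with make([]string, 0, len(ids)) avoids relying on that behavior.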