Change segment replica struct (#8608)

Signed-off-by: godchen <qingxiang.chen@zilliz.com>
This commit is contained in:
godchen 2021-09-27 14:38:00 +08:00 committed by GitHub
parent 5ff2c41c97
commit 77a773dff0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 144 additions and 33 deletions

View File

@ -190,7 +190,7 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro
return err
}
dn := newDeleteDNode(dsService.replica)
dn := newDeleteNode(dsService.replica, vchanInfo.GetChannelName())
var deleteNode Node = dn

View File

@ -24,7 +24,8 @@ import (
type deleteNode struct {
BaseNode
replica Replica
channelName string
replica Replica
}
func (dn *deleteNode) Name() string {
@ -52,36 +53,36 @@ func (dn *deleteNode) Operate(in []Msg) []Msg {
return []Msg{}
}
// getSegmentsByPKs returns the bloom filter check result.
// filterSegmentByPK returns the bloom filter check result.
// If the key may exist in the segment, the segment is included in the returned map.
// If the key does not exist in the segment, the segment is filtered out.
func getSegmentsByPKs(pks []int64, segments []*Segment) (map[int64][]int64, error) {
func (dn *deleteNode) filterSegmentByPK(pks []int64) (map[int64][]int64, error) {
if pks == nil {
return nil, errors.New("pks is nil")
}
if segments == nil {
return nil, errors.New("segments is nil")
}
results := make(map[int64][]int64)
buf := make([]byte, 8)
segments := dn.replica.getSegments(dn.channelName)
for _, segment := range segments {
for _, pk := range pks {
binary.BigEndian.PutUint64(buf, uint64(pk))
exist := segment.pkFilter.Test(buf)
if exist {
results[segment.segmentID] = append(results[segment.segmentID], pk)
results[pk] = append(results[pk], segment.segmentID)
}
}
}
return results, nil
}
func newDeleteDNode(replica Replica) *deleteNode {
func newDeleteNode(replica Replica, channelName string) *deleteNode {
baseNode := BaseNode{}
baseNode.SetMaxParallelism(Params.FlowGraphMaxQueueLength)
return &deleteNode{
BaseNode: baseNode,
replica: replica,
channelName: channelName,
replica: replica,
}
}

View File

@ -19,6 +19,28 @@ import (
"github.com/stretchr/testify/assert"
)
// mockReplica is a test double for Replica. It embeds the Replica interface and
// keeps three segment maps keyed by segment ID, which getSegments reads.
type mockReplica struct {
	Replica

	newSegments     map[UniqueID]*Segment
	normalSegments  map[UniqueID]*Segment
	flushedSegments map[UniqueID]*Segment
}

// getSegments returns every segment stored in the mock: new segments first,
// then normal ones, then flushed ones. The channelName argument is not
// consulted, so segments of all channels are returned. The result is never
// nil; it is an empty slice when the mock holds no segments.
func (replica *mockReplica) getSegments(channelName string) []*Segment {
	all := make([]*Segment, 0)
	groups := []map[UniqueID]*Segment{
		replica.newSegments,
		replica.normalSegments,
		replica.flushedSegments,
	}
	for _, group := range groups {
		for _, seg := range group {
			all = append(all, seg)
		}
	}
	return all
}
func TestFlowGraphDeleteNode_newDeleteNode(te *testing.T) {
tests := []struct {
replica Replica
@ -30,7 +52,7 @@ func TestFlowGraphDeleteNode_newDeleteNode(te *testing.T) {
for _, test := range tests {
te.Run(test.description, func(t *testing.T) {
dn := newDeleteDNode(test.replica)
dn := newDeleteNode(test.replica, "")
assert.NotNil(t, dn)
assert.Equal(t, "deleteNode", dn.Name())
@ -84,39 +106,57 @@ func Test_GetSegmentsByPKs(t *testing.T) {
filter2.Add(buf)
}
segment1 := &Segment{
segmentID: 1,
pkFilter: filter1,
segmentID: 1,
channelName: "test",
pkFilter: filter1,
}
segment2 := &Segment{
segmentID: 2,
pkFilter: filter1,
segmentID: 2,
channelName: "test",
pkFilter: filter1,
}
segment3 := &Segment{
segmentID: 3,
pkFilter: filter1,
segmentID: 3,
channelName: "test",
pkFilter: filter1,
}
segment4 := &Segment{
segmentID: 4,
pkFilter: filter2,
segmentID: 4,
channelName: "test",
pkFilter: filter2,
}
segment5 := &Segment{
segmentID: 5,
pkFilter: filter2,
segmentID: 5,
channelName: "test",
pkFilter: filter2,
}
segments := []*Segment{segment1, segment2, segment3, segment4, segment5}
results, err := getSegmentsByPKs([]int64{0, 1, 2, 3, 4}, segments)
segment6 := &Segment{
segmentID: 5,
channelName: "test_error",
pkFilter: filter2,
}
mockReplica := &mockReplica{}
mockReplica.newSegments = make(map[int64]*Segment)
mockReplica.normalSegments = make(map[int64]*Segment)
mockReplica.flushedSegments = make(map[int64]*Segment)
mockReplica.newSegments[segment1.segmentID] = segment1
mockReplica.newSegments[segment2.segmentID] = segment2
mockReplica.normalSegments[segment3.segmentID] = segment3
mockReplica.normalSegments[segment4.segmentID] = segment4
mockReplica.flushedSegments[segment5.segmentID] = segment5
mockReplica.flushedSegments[segment6.segmentID] = segment6
dn := newDeleteNode(mockReplica, "test")
results, err := dn.filterSegmentByPK([]int64{0, 1, 2, 3, 4})
assert.Nil(t, err)
expected := map[int64][]int64{
1: {0, 1, 2},
2: {0, 1, 2},
3: {0, 1, 2},
4: {3, 4},
5: {3, 4},
0: {1, 2, 3},
1: {1, 2, 3},
2: {1, 2, 3},
3: {4, 5},
4: {4, 5},
}
for key, value := range expected {
assert.EqualValues(t, value, results[key])
}
assert.Equal(t, expected, results)
_, err = getSegmentsByPKs(nil, segments)
assert.NotNil(t, err)
_, err = getSegmentsByPKs([]int64{0, 1, 2, 3, 4}, nil)
assert.NotNil(t, err)
}

View File

@ -43,6 +43,7 @@ type Replica interface {
addNewSegment(segID, collID, partitionID UniqueID, channelName string, startPos, endPos *internalpb.MsgPosition) error
addNormalSegment(segID, collID, partitionID UniqueID, channelName string, numOfRows int64, cp *segmentCheckPoint) error
getSegments(channelName string) []*Segment
listNewSegmentsStartPositions() []*datapb.SegmentStartPosition
listSegmentsCheckPoints() map[UniqueID]segmentCheckPoint
updateSegmentEndPosition(segID UniqueID, endPos *internalpb.MsgPosition)
@ -222,6 +223,29 @@ func (replica *SegmentReplica) addNewSegment(segID, collID, partitionID UniqueID
return nil
}
// getSegments returns the segments held by the replica whose channelName equals
// the given channelName. New, normal and flushed segments are scanned in that
// order while segMu is held. The result is never nil; it is an empty slice when
// nothing matches.
func (replica *SegmentReplica) getSegments(channelName string) []*Segment {
	replica.segMu.Lock()
	defer replica.segMu.Unlock()

	matched := make([]*Segment, 0)

	for _, seg := range replica.newSegments {
		if seg.channelName != channelName {
			continue
		}
		matched = append(matched, seg)
	}

	for _, seg := range replica.normalSegments {
		if seg.channelName != channelName {
			continue
		}
		matched = append(matched, seg)
	}

	for _, seg := range replica.flushedSegments {
		if seg.channelName != channelName {
			continue
		}
		matched = append(matched, seg)
	}

	return matched
}
// addNormalSegment adds a *NotNew* and *NotFlushed* segment. Before add, please make sure there's no
// such segment by `hasSegment`
func (replica *SegmentReplica) addNormalSegment(segID, collID, partitionID UniqueID, channelName string, numOfRows int64, cp *segmentCheckPoint) error {
@ -265,6 +289,46 @@ func (replica *SegmentReplica) addNormalSegment(segID, collID, partitionID Uniqu
return nil
}
// addFlushedSegment adds a *Flushed* segment. Before adding, please make sure there's no
// such segment by `hasSegment`.
//
// It returns an error when collID differs from the replica's collection ID. Otherwise a
// Segment marked as not-new and flushed is stored in flushedSegments under segID (replacing
// any entry already stored under that ID) and nil is returned.
func (replica *SegmentReplica) addFlushedSegment(segID, collID, partitionID UniqueID, channelName string, numOfRows int64) error {
	replica.segMu.Lock()
	defer replica.segMu.Unlock()

	// Refuse segments that belong to a different collection than this replica.
	if collID != replica.collectionID {
		log.Warn("Mismatch collection",
			zap.Int64("input ID", collID),
			zap.Int64("expected ID", replica.collectionID))
		return fmt.Errorf("Mismatch collection, ID=%d", collID)
	}

	log.Debug("Add Flushed segment",
		zap.Int64("segment ID", segID),
		zap.Int64("collection ID", collID),
		zap.Int64("partition ID", partitionID),
		zap.String("channel name", channelName),
	)

	seg := &Segment{
		collectionID: collID,
		partitionID:  partitionID,
		segmentID:    segID,
		channelName:  channelName,
		numRows:      numOfRows,

		//TODO silverxia, flushed segments bloom filter and pk range should be loaded from serialized files
		pkFilter: bloom.NewWithEstimates(bloomFilterSize, maxBloomFalsePositive),
		minPK:    math.MaxInt64, // use max value, represents no value
		maxPK:    math.MinInt64, // use min value represents no value
	}

	seg.isNew.Store(false)
	seg.isFlushed.Store(true)

	replica.flushedSegments[segID] = seg
	return nil
}
// listNewSegmentsStartPositions gets all *New Segments* start positions and
// transfer segments states from *New* to *Normal*.
func (replica *SegmentReplica) listNewSegmentsStartPositions() []*datapb.SegmentStartPosition {

View File

@ -555,6 +555,12 @@ func TestSegmentReplica_InterfaceMethod(te *testing.T) {
assert.Equal(t, int64(10), replica.normalSegments[UniqueID(0)].checkPoint.numRows)
replica.updateSegmentCheckPoint(1)
assert.Equal(t, int64(20), replica.normalSegments[UniqueID(1)].checkPoint.numRows)
err = replica.addFlushedSegment(1, 1, 2, "insert-01", int64(0))
assert.Nil(t, err)
totalSegments := replica.getSegments("insert-01")
assert.Equal(t, len(totalSegments), 3)
})
}