mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-11-30 02:48:45 +08:00
Refine datanode metacache and implement CoW (#27985)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
ccaec44930
commit
98e2aad752
83
internal/datanode/metacache/actions.go
Normal file
83
internal/datanode/metacache/actions.go
Normal file
@ -0,0 +1,83 @@
|
||||
// Licensed to the LF AI & Data foundation under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package metacache
|
||||
|
||||
import (
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
|
||||
)
|
||||
|
||||
type SegmentFilter func(info *SegmentInfo) bool
|
||||
|
||||
func WithPartitionID(partitionID int64) SegmentFilter {
|
||||
return func(info *SegmentInfo) bool {
|
||||
return info.partitionID == partitionID
|
||||
}
|
||||
}
|
||||
|
||||
func WithSegmentID(segmentID int64) SegmentFilter {
|
||||
return func(info *SegmentInfo) bool {
|
||||
return info.segmentID == segmentID
|
||||
}
|
||||
}
|
||||
|
||||
func WithSegmentState(state commonpb.SegmentState) SegmentFilter {
|
||||
return func(info *SegmentInfo) bool {
|
||||
return info.state == state
|
||||
}
|
||||
}
|
||||
|
||||
func WithStartPosNotRecorded() SegmentFilter {
|
||||
return func(info *SegmentInfo) bool {
|
||||
return !info.startPosRecorded
|
||||
}
|
||||
}
|
||||
|
||||
type SegmentAction func(info *SegmentInfo)
|
||||
|
||||
func UpdateState(state commonpb.SegmentState) SegmentAction {
|
||||
return func(info *SegmentInfo) {
|
||||
info.state = state
|
||||
}
|
||||
}
|
||||
|
||||
func UpdateCheckpoint(checkpoint *msgpb.MsgPosition) SegmentAction {
|
||||
return func(info *SegmentInfo) {
|
||||
info.checkpoint = checkpoint
|
||||
}
|
||||
}
|
||||
|
||||
func UpdateNumOfRows(numOfRows int64) SegmentAction {
|
||||
return func(info *SegmentInfo) {
|
||||
info.numOfRows = numOfRows
|
||||
}
|
||||
}
|
||||
|
||||
func RollStats() SegmentAction {
|
||||
return func(info *SegmentInfo) {
|
||||
info.bfs.Roll()
|
||||
}
|
||||
}
|
||||
|
||||
// MergeSegmentAction is the util function to merge multiple SegmentActions into one.
|
||||
func MergeSegmentAction(actions ...SegmentAction) SegmentAction {
|
||||
return func(info *SegmentInfo) {
|
||||
for _, action := range actions {
|
||||
action(info)
|
||||
}
|
||||
}
|
||||
}
|
121
internal/datanode/metacache/actions_test.go
Normal file
121
internal/datanode/metacache/actions_test.go
Normal file
@ -0,0 +1,121 @@
|
||||
// Licensed to the LF AI & Data foundation under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package metacache
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/suite"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
|
||||
)
|
||||
|
||||
type SegmentFilterSuite struct {
|
||||
suite.Suite
|
||||
}
|
||||
|
||||
func (s *SegmentFilterSuite) TestFilters() {
|
||||
info := &SegmentInfo{}
|
||||
|
||||
partitionID := int64(1001)
|
||||
filter := WithPartitionID(partitionID)
|
||||
info.partitionID = partitionID + 1
|
||||
s.False(filter(info))
|
||||
info.partitionID = partitionID
|
||||
s.True(filter(info))
|
||||
|
||||
segmentID := int64(10001)
|
||||
filter = WithSegmentID(segmentID)
|
||||
info.segmentID = segmentID + 1
|
||||
s.False(filter(info))
|
||||
info.segmentID = segmentID
|
||||
s.True(filter(info))
|
||||
|
||||
state := commonpb.SegmentState_Growing
|
||||
filter = WithSegmentState(state)
|
||||
info.state = commonpb.SegmentState_Flushed
|
||||
s.False(filter(info))
|
||||
info.state = state
|
||||
s.True(filter(info))
|
||||
|
||||
filter = WithStartPosNotRecorded()
|
||||
info.startPosRecorded = true
|
||||
s.False(filter(info))
|
||||
info.startPosRecorded = false
|
||||
s.True(filter(info))
|
||||
}
|
||||
|
||||
func TestFilters(t *testing.T) {
|
||||
suite.Run(t, new(SegmentFilterSuite))
|
||||
}
|
||||
|
||||
type SegmentActionSuite struct {
|
||||
suite.Suite
|
||||
}
|
||||
|
||||
func (s *SegmentActionSuite) TestActions() {
|
||||
info := &SegmentInfo{}
|
||||
|
||||
state := commonpb.SegmentState_Flushed
|
||||
action := UpdateState(state)
|
||||
action(info)
|
||||
s.Equal(state, info.State())
|
||||
|
||||
cp := &msgpb.MsgPosition{
|
||||
MsgID: []byte{1, 2, 3, 4},
|
||||
ChannelName: "channel_1",
|
||||
Timestamp: 20000,
|
||||
}
|
||||
action = UpdateCheckpoint(cp)
|
||||
action(info)
|
||||
s.Equal(cp, info.Checkpoint())
|
||||
|
||||
numOfRows := int64(2048)
|
||||
action = UpdateNumOfRows(numOfRows)
|
||||
action(info)
|
||||
s.Equal(numOfRows, info.NumOfRows())
|
||||
}
|
||||
|
||||
func (s *SegmentActionSuite) TestMergeActions() {
|
||||
info := &SegmentInfo{}
|
||||
|
||||
var actions []SegmentAction
|
||||
state := commonpb.SegmentState_Flushed
|
||||
actions = append(actions, UpdateState(state))
|
||||
|
||||
cp := &msgpb.MsgPosition{
|
||||
MsgID: []byte{1, 2, 3, 4},
|
||||
ChannelName: "channel_1",
|
||||
Timestamp: 20000,
|
||||
}
|
||||
actions = append(actions, UpdateCheckpoint(cp))
|
||||
|
||||
numOfRows := int64(2048)
|
||||
actions = append(actions, UpdateNumOfRows(numOfRows))
|
||||
|
||||
action := MergeSegmentAction(actions...)
|
||||
action(info)
|
||||
|
||||
s.Equal(state, info.State())
|
||||
s.Equal(numOfRows, info.NumOfRows())
|
||||
s.Equal(cp, info.Checkpoint())
|
||||
}
|
||||
|
||||
func TestActions(t *testing.T) {
|
||||
suite.Run(t, new(SegmentActionSuite))
|
||||
}
|
80
internal/datanode/metacache/bloom_filter_set.go
Normal file
80
internal/datanode/metacache/bloom_filter_set.go
Normal file
@ -0,0 +1,80 @@
|
||||
// Licensed to the LF AI & Data foundation under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package metacache
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/bits-and-blooms/bloom/v3"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/storage"
|
||||
)
|
||||
|
||||
type BloomFilterSet struct {
|
||||
mut sync.Mutex
|
||||
current *storage.PkStatistics
|
||||
history []*storage.PkStatistics
|
||||
}
|
||||
|
||||
func newBloomFilterSet() *BloomFilterSet {
|
||||
return &BloomFilterSet{}
|
||||
}
|
||||
|
||||
func (bfs *BloomFilterSet) PkExists(pk storage.PrimaryKey) bool {
|
||||
bfs.mut.Lock()
|
||||
defer bfs.mut.Unlock()
|
||||
if bfs.current != nil && bfs.current.PkExist(pk) {
|
||||
return true
|
||||
}
|
||||
|
||||
for _, bf := range bfs.history {
|
||||
if bf.PkExist(pk) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (bfs *BloomFilterSet) UpdatePKRange(ids storage.FieldData) error {
|
||||
bfs.mut.Lock()
|
||||
defer bfs.mut.Unlock()
|
||||
|
||||
if bfs.current == nil {
|
||||
bfs.current = &storage.PkStatistics{
|
||||
PkFilter: bloom.NewWithEstimates(storage.BloomFilterSize, storage.MaxBloomFalsePositive),
|
||||
}
|
||||
}
|
||||
|
||||
return bfs.current.UpdatePKRange(ids)
|
||||
}
|
||||
|
||||
func (bfs *BloomFilterSet) Roll() {
|
||||
bfs.mut.Lock()
|
||||
defer bfs.mut.Unlock()
|
||||
|
||||
if bfs.current != nil {
|
||||
bfs.history = append(bfs.history, bfs.current)
|
||||
bfs.current = nil
|
||||
}
|
||||
}
|
||||
|
||||
func (bfs *BloomFilterSet) GetHistory() []*storage.PkStatistics {
|
||||
bfs.mut.Lock()
|
||||
defer bfs.mut.Unlock()
|
||||
|
||||
return bfs.history
|
||||
}
|
93
internal/datanode/metacache/bloom_filter_set_test.go
Normal file
93
internal/datanode/metacache/bloom_filter_set_test.go
Normal file
@ -0,0 +1,93 @@
|
||||
// Licensed to the LF AI & Data foundation under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package metacache
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/suite"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
||||
"github.com/milvus-io/milvus/internal/storage"
|
||||
)
|
||||
|
||||
type BloomFilterSetSuite struct {
|
||||
suite.Suite
|
||||
bfs *BloomFilterSet
|
||||
}
|
||||
|
||||
func (s *BloomFilterSetSuite) SetupTest() {
|
||||
s.bfs = newBloomFilterSet()
|
||||
}
|
||||
|
||||
func (s *BloomFilterSetSuite) TearDownSuite() {
|
||||
s.bfs = nil
|
||||
}
|
||||
|
||||
func (s *BloomFilterSetSuite) GetFieldData(ids []int64) storage.FieldData {
|
||||
fd, err := storage.NewFieldData(schemapb.DataType_Int64, &schemapb.FieldSchema{
|
||||
FieldID: 101,
|
||||
Name: "ID",
|
||||
IsPrimaryKey: true,
|
||||
DataType: schemapb.DataType_Int64,
|
||||
})
|
||||
s.Require().NoError(err)
|
||||
|
||||
for _, id := range ids {
|
||||
err = fd.AppendRow(id)
|
||||
s.Require().NoError(err)
|
||||
}
|
||||
return fd
|
||||
}
|
||||
|
||||
func (s *BloomFilterSetSuite) TestWriteRead() {
|
||||
ids := []int64{1, 2, 3, 4, 5}
|
||||
|
||||
for _, id := range ids {
|
||||
s.False(s.bfs.PkExists(storage.NewInt64PrimaryKey(id)), "pk shall not exist before update")
|
||||
}
|
||||
|
||||
err := s.bfs.UpdatePKRange(s.GetFieldData(ids))
|
||||
s.NoError(err)
|
||||
|
||||
for _, id := range ids {
|
||||
s.True(s.bfs.PkExists(storage.NewInt64PrimaryKey(id)), "pk shall return exist after update")
|
||||
}
|
||||
}
|
||||
|
||||
func (s *BloomFilterSetSuite) TestRoll() {
|
||||
history := s.bfs.GetHistory()
|
||||
|
||||
s.Equal(0, len(history), "history empty for new bfs")
|
||||
|
||||
ids := []int64{1, 2, 3, 4, 5}
|
||||
err := s.bfs.UpdatePKRange(s.GetFieldData(ids))
|
||||
s.NoError(err)
|
||||
|
||||
s.bfs.Roll()
|
||||
|
||||
history = s.bfs.GetHistory()
|
||||
s.Equal(1, len(history), "history shall have one entry after roll with current data")
|
||||
|
||||
s.bfs.Roll()
|
||||
history = s.bfs.GetHistory()
|
||||
s.Equal(1, len(history), "history shall have one entry after empty roll")
|
||||
}
|
||||
|
||||
func TestBloomFilterSet(t *testing.T) {
|
||||
suite.Run(t, new(BloomFilterSetSuite))
|
||||
}
|
@ -19,78 +19,69 @@ package metacache
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/pingcap/log"
|
||||
"github.com/samber/lo"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
)
|
||||
|
||||
type MetaCache interface {
|
||||
NewSegment(segmentID, partitionID int64)
|
||||
UpdateSegment(newSegmentID, partitionID int64, dropSegmentIDs ...int64)
|
||||
UpdateSegments(action SegmentAction, filters ...SegmentFilter)
|
||||
CompactSegments(newSegmentID, partitionID int64, oldSegmentIDs ...int64)
|
||||
GetSegmentsBy(filters ...SegmentFilter) []*SegmentInfo
|
||||
GetSegmentIDsBy(filters ...SegmentFilter) []int64
|
||||
}
|
||||
|
||||
type SegmentFilter func(info *SegmentInfo) bool
|
||||
var _ MetaCache = (*metaCacheImpl)(nil)
|
||||
|
||||
type SegmentInfo struct {
|
||||
segmentID int64
|
||||
partitionID int64
|
||||
}
|
||||
type PkStatsFactory func(vchannel *datapb.SegmentInfo) *BloomFilterSet
|
||||
|
||||
func newSegmentInfo(segmentID, partitionID int64) *SegmentInfo {
|
||||
return &SegmentInfo{
|
||||
segmentID: segmentID,
|
||||
partitionID: partitionID,
|
||||
}
|
||||
}
|
||||
|
||||
func WithPartitionID(partitionID int64) func(info *SegmentInfo) bool {
|
||||
return func(info *SegmentInfo) bool {
|
||||
return info.partitionID == partitionID
|
||||
}
|
||||
}
|
||||
|
||||
var _ MetaCache = (*MetaCacheImpl)(nil)
|
||||
|
||||
type MetaCacheImpl struct {
|
||||
type metaCacheImpl struct {
|
||||
collectionID int64
|
||||
vChannelName string
|
||||
segmentInfos map[int64]*SegmentInfo
|
||||
mu sync.Mutex
|
||||
mu sync.RWMutex
|
||||
}
|
||||
|
||||
func NewMetaCache(vchannel *datapb.VchannelInfo) MetaCache {
|
||||
cache := &MetaCacheImpl{
|
||||
func NewMetaCache(vchannel *datapb.VchannelInfo, factory PkStatsFactory) MetaCache {
|
||||
cache := &metaCacheImpl{
|
||||
collectionID: vchannel.GetCollectionID(),
|
||||
vChannelName: vchannel.GetChannelName(),
|
||||
segmentInfos: make(map[int64]*SegmentInfo),
|
||||
}
|
||||
|
||||
cache.init(vchannel)
|
||||
cache.init(vchannel, factory)
|
||||
return cache
|
||||
}
|
||||
|
||||
func (c *MetaCacheImpl) init(vchannel *datapb.VchannelInfo) {
|
||||
func (c *metaCacheImpl) init(vchannel *datapb.VchannelInfo, factory PkStatsFactory) {
|
||||
for _, seg := range vchannel.FlushedSegments {
|
||||
c.segmentInfos[seg.GetID()] = newSegmentInfo(seg.GetID(), seg.GetPartitionID())
|
||||
c.segmentInfos[seg.GetID()] = newSegmentInfo(seg, factory(seg))
|
||||
}
|
||||
|
||||
for _, seg := range vchannel.UnflushedSegments {
|
||||
c.segmentInfos[seg.GetID()] = newSegmentInfo(seg.GetID(), seg.GetPartitionID())
|
||||
c.segmentInfos[seg.GetID()] = newSegmentInfo(seg, factory(seg))
|
||||
}
|
||||
}
|
||||
|
||||
func (c *MetaCacheImpl) NewSegment(segmentID, partitionID int64) {
|
||||
func (c *metaCacheImpl) NewSegment(segmentID, partitionID int64) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
if _, ok := c.segmentInfos[segmentID]; !ok {
|
||||
c.segmentInfos[segmentID] = newSegmentInfo(segmentID, partitionID)
|
||||
c.segmentInfos[segmentID] = &SegmentInfo{
|
||||
segmentID: segmentID,
|
||||
partitionID: partitionID,
|
||||
state: commonpb.SegmentState_Growing,
|
||||
startPosRecorded: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *MetaCacheImpl) UpdateSegment(newSegmentID, partitionID int64, dropSegmentIDs ...int64) {
|
||||
func (c *metaCacheImpl) CompactSegments(newSegmentID, partitionID int64, dropSegmentIDs ...int64) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
@ -106,15 +97,53 @@ func (c *MetaCacheImpl) UpdateSegment(newSegmentID, partitionID int64, dropSegme
|
||||
}
|
||||
|
||||
if _, ok := c.segmentInfos[newSegmentID]; !ok {
|
||||
c.segmentInfos[newSegmentID] = newSegmentInfo(newSegmentID, partitionID)
|
||||
c.segmentInfos[newSegmentID] = &SegmentInfo{
|
||||
segmentID: newSegmentID,
|
||||
partitionID: partitionID,
|
||||
state: commonpb.SegmentState_Flushed,
|
||||
startPosRecorded: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *MetaCacheImpl) GetSegmentIDsBy(filters ...SegmentFilter) []int64 {
|
||||
func (c *metaCacheImpl) GetSegmentsBy(filters ...SegmentFilter) []*SegmentInfo {
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
|
||||
filter := c.mergeFilters(filters...)
|
||||
|
||||
var segments []*SegmentInfo
|
||||
for _, info := range c.segmentInfos {
|
||||
if filter(info) {
|
||||
segments = append(segments, info)
|
||||
}
|
||||
}
|
||||
return segments
|
||||
}
|
||||
|
||||
func (c *metaCacheImpl) GetSegmentIDsBy(filters ...SegmentFilter) []int64 {
|
||||
segments := c.GetSegmentsBy(filters...)
|
||||
return lo.Map(segments, func(info *SegmentInfo, _ int) int64 { return info.SegmentID() })
|
||||
}
|
||||
|
||||
func (c *metaCacheImpl) UpdateSegments(action SegmentAction, filters ...SegmentFilter) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
filter := func(info *SegmentInfo) bool {
|
||||
filter := c.mergeFilters(filters...)
|
||||
|
||||
for id, info := range c.segmentInfos {
|
||||
if !filter(info) {
|
||||
continue
|
||||
}
|
||||
nInfo := info.Clone()
|
||||
action(nInfo)
|
||||
c.segmentInfos[id] = nInfo
|
||||
}
|
||||
}
|
||||
|
||||
func (c *metaCacheImpl) mergeFilters(filters ...SegmentFilter) SegmentFilter {
|
||||
return func(info *SegmentInfo) bool {
|
||||
for _, filter := range filters {
|
||||
if !filter(info) {
|
||||
return false
|
||||
@ -122,12 +151,4 @@ func (c *MetaCacheImpl) GetSegmentIDsBy(filters ...SegmentFilter) []int64 {
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
segments := []int64{}
|
||||
for _, info := range c.segmentInfos {
|
||||
if filter(info) {
|
||||
segments = append(segments, info.segmentID)
|
||||
}
|
||||
}
|
||||
return segments
|
||||
}
|
||||
|
@ -22,6 +22,7 @@ import (
|
||||
"github.com/samber/lo"
|
||||
"github.com/stretchr/testify/suite"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
)
|
||||
|
||||
@ -36,6 +37,8 @@ type MetaCacheSuite struct {
|
||||
growingSegments []int64
|
||||
newSegments []int64
|
||||
cache MetaCache
|
||||
|
||||
bfsFactory PkStatsFactory
|
||||
}
|
||||
|
||||
func (s *MetaCacheSuite) SetupSuite() {
|
||||
@ -46,6 +49,9 @@ func (s *MetaCacheSuite) SetupSuite() {
|
||||
s.growingSegments = []int64{5, 6, 7, 8}
|
||||
s.newSegments = []int64{9, 10, 11, 12}
|
||||
s.invaliedSeg = 111
|
||||
s.bfsFactory = func(*datapb.SegmentInfo) *BloomFilterSet {
|
||||
return newBloomFilterSet()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *MetaCacheSuite) SetupTest() {
|
||||
@ -53,6 +59,7 @@ func (s *MetaCacheSuite) SetupTest() {
|
||||
return &datapb.SegmentInfo{
|
||||
ID: s.flushedSegments[i],
|
||||
PartitionID: s.partitionIDs[i],
|
||||
State: commonpb.SegmentState_Flushed,
|
||||
}
|
||||
})
|
||||
|
||||
@ -60,6 +67,7 @@ func (s *MetaCacheSuite) SetupTest() {
|
||||
return &datapb.SegmentInfo{
|
||||
ID: s.growingSegments[i],
|
||||
PartitionID: s.partitionIDs[i],
|
||||
State: commonpb.SegmentState_Growing,
|
||||
}
|
||||
})
|
||||
|
||||
@ -68,7 +76,7 @@ func (s *MetaCacheSuite) SetupTest() {
|
||||
ChannelName: s.vchannel,
|
||||
FlushedSegments: flushSegmentInfos,
|
||||
UnflushedSegments: growingSegmentInfos,
|
||||
})
|
||||
}, s.bfsFactory)
|
||||
}
|
||||
|
||||
func (s *MetaCacheSuite) TestNewSegment() {
|
||||
@ -86,10 +94,10 @@ func (s *MetaCacheSuite) TestNewSegment() {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *MetaCacheSuite) TestUpdateSegment() {
|
||||
func (s *MetaCacheSuite) TestCompactSegments() {
|
||||
for i, seg := range s.newSegments {
|
||||
// compaction from flushed[i], unflushed[i] and invalidSeg to new[i]
|
||||
s.cache.UpdateSegment(seg, s.partitionIDs[i], s.flushedSegments[i], s.growingSegments[i], s.invaliedSeg)
|
||||
s.cache.CompactSegments(seg, s.partitionIDs[i], s.flushedSegments[i], s.growingSegments[i], s.invaliedSeg)
|
||||
}
|
||||
|
||||
for i, partitionID := range s.partitionIDs {
|
||||
@ -101,6 +109,14 @@ func (s *MetaCacheSuite) TestUpdateSegment() {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *MetaCacheSuite) TestUpdateSegments() {
|
||||
s.cache.UpdateSegments(UpdateState(commonpb.SegmentState_Flushed), WithSegmentID(5))
|
||||
segments := s.cache.GetSegmentsBy(WithSegmentID(5))
|
||||
s.Require().Equal(1, len(segments))
|
||||
segment := segments[0]
|
||||
s.Equal(commonpb.SegmentState_Flushed, segment.State())
|
||||
}
|
||||
|
||||
func TestMetaCacheSuite(t *testing.T) {
|
||||
suite.Run(t, new(MetaCacheSuite))
|
||||
}
|
||||
|
89
internal/datanode/metacache/segment.go
Normal file
89
internal/datanode/metacache/segment.go
Normal file
@ -0,0 +1,89 @@
|
||||
// Licensed to the LF AI & Data foundation under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package metacache
|
||||
|
||||
import (
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/internal/storage"
|
||||
)
|
||||
|
||||
type SegmentInfo struct {
|
||||
segmentID int64
|
||||
partitionID int64
|
||||
state commonpb.SegmentState
|
||||
startPosition *msgpb.MsgPosition
|
||||
checkpoint *msgpb.MsgPosition
|
||||
startPosRecorded bool
|
||||
numOfRows int64
|
||||
bfs *BloomFilterSet
|
||||
}
|
||||
|
||||
func (s *SegmentInfo) SegmentID() int64 {
|
||||
return s.segmentID
|
||||
}
|
||||
|
||||
func (s *SegmentInfo) PartitionID() int64 {
|
||||
return s.partitionID
|
||||
}
|
||||
|
||||
func (s *SegmentInfo) State() commonpb.SegmentState {
|
||||
return s.state
|
||||
}
|
||||
|
||||
func (s *SegmentInfo) NumOfRows() int64 {
|
||||
return s.numOfRows
|
||||
}
|
||||
|
||||
func (s *SegmentInfo) StartPosition() *msgpb.MsgPosition {
|
||||
return s.startPosition
|
||||
}
|
||||
|
||||
func (s *SegmentInfo) Checkpoint() *msgpb.MsgPosition {
|
||||
return s.checkpoint
|
||||
}
|
||||
|
||||
func (s *SegmentInfo) GetHistory() []*storage.PkStatistics {
|
||||
return s.bfs.GetHistory()
|
||||
}
|
||||
|
||||
func (s *SegmentInfo) Clone() *SegmentInfo {
|
||||
return &SegmentInfo{
|
||||
segmentID: s.segmentID,
|
||||
partitionID: s.partitionID,
|
||||
state: s.state,
|
||||
startPosition: s.startPosition,
|
||||
checkpoint: s.checkpoint,
|
||||
startPosRecorded: s.startPosRecorded,
|
||||
numOfRows: s.numOfRows,
|
||||
bfs: s.bfs,
|
||||
}
|
||||
}
|
||||
|
||||
func newSegmentInfo(info *datapb.SegmentInfo, bfs *BloomFilterSet) *SegmentInfo {
|
||||
return &SegmentInfo{
|
||||
segmentID: info.GetID(),
|
||||
partitionID: info.GetPartitionID(),
|
||||
state: info.GetState(),
|
||||
numOfRows: info.GetNumOfRows(),
|
||||
startPosition: info.GetStartPosition(),
|
||||
checkpoint: info.GetDmlPosition(),
|
||||
startPosRecorded: true,
|
||||
bfs: bfs,
|
||||
}
|
||||
}
|
60
internal/datanode/metacache/segment_test.go
Normal file
60
internal/datanode/metacache/segment_test.go
Normal file
@ -0,0 +1,60 @@
|
||||
// Licensed to the LF AI & Data foundation under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package metacache
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/suite"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
)
|
||||
|
||||
type SegmentSuite struct {
|
||||
suite.Suite
|
||||
|
||||
info *datapb.SegmentInfo
|
||||
}
|
||||
|
||||
func (s *SegmentSuite) TestBasic() {
|
||||
bfs := newBloomFilterSet()
|
||||
segment := newSegmentInfo(s.info, bfs)
|
||||
s.Equal(s.info.GetID(), segment.SegmentID())
|
||||
s.Equal(s.info.GetPartitionID(), segment.PartitionID())
|
||||
s.Equal(s.info.GetNumOfRows(), segment.NumOfRows())
|
||||
s.Equal(s.info.GetStartPosition(), segment.StartPosition())
|
||||
s.Equal(s.info.GetDmlPosition(), segment.Checkpoint())
|
||||
s.Equal(bfs.GetHistory(), segment.GetHistory())
|
||||
s.True(segment.startPosRecorded)
|
||||
}
|
||||
|
||||
func (s *SegmentSuite) TestClone() {
|
||||
bfs := newBloomFilterSet()
|
||||
segment := newSegmentInfo(s.info, bfs)
|
||||
cloned := segment.Clone()
|
||||
s.Equal(segment.SegmentID(), cloned.SegmentID())
|
||||
s.Equal(segment.PartitionID(), cloned.PartitionID())
|
||||
s.Equal(segment.NumOfRows(), cloned.NumOfRows())
|
||||
s.Equal(segment.StartPosition(), cloned.StartPosition())
|
||||
s.Equal(segment.Checkpoint(), cloned.Checkpoint())
|
||||
s.Equal(segment.GetHistory(), cloned.GetHistory())
|
||||
s.Equal(segment.startPosRecorded, cloned.startPosRecorded)
|
||||
}
|
||||
|
||||
func TestSegment(t *testing.T) {
|
||||
suite.Run(t, new(SegmentSuite))
|
||||
}
|
Loading…
Reference in New Issue
Block a user