mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-02 11:59:00 +08:00
Filter duplicated sealed segments which are both online and offline (#17593)
Signed-off-by: Letian Jiang <letian.jiang@zilliz.com>
This commit is contained in:
parent
1f6fbf91b2
commit
be4077421e
@ -50,6 +50,8 @@ import (
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
"go.uber.org/zap"
|
||||
|
||||
v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
|
||||
|
||||
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
||||
@ -61,7 +63,6 @@ import (
|
||||
"github.com/milvus-io/milvus/internal/util/paramtable"
|
||||
"github.com/milvus-io/milvus/internal/util/sessionutil"
|
||||
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||
v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
|
||||
)
|
||||
|
||||
// make sure QueryNode implements types.QueryNode
|
||||
@ -376,6 +377,8 @@ func (node *QueryNode) handleSealedSegmentsChangeInfo(info *querypb.SealedSegmen
|
||||
log.Warn("failed to validate vchannel for SegmentChangeInfo", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
// ignore segments that are online and offline in the same QueryNode
|
||||
filterDuplicateChangeInfo(line)
|
||||
|
||||
node.ShardClusterService.HandoffVChannelSegments(vchannel, line)
|
||||
}
|
||||
@ -407,3 +410,41 @@ func validateChangeChannel(info *querypb.SegmentChangeInfo) (string, error) {
|
||||
|
||||
return channelName, nil
|
||||
}
|
||||
|
||||
// filterDuplicateChangeInfo filters out duplicated sealed segments which are both online and offline (Fix issue#17347)
|
||||
func filterDuplicateChangeInfo(line *querypb.SegmentChangeInfo) {
|
||||
if line.OnlineNodeID == line.OfflineNodeID {
|
||||
dupSegmentIDs := make(map[UniqueID]struct{})
|
||||
for _, onlineSegment := range line.OnlineSegments {
|
||||
for _, offlineSegment := range line.OfflineSegments {
|
||||
if onlineSegment.SegmentID == offlineSegment.SegmentID && onlineSegment.SegmentState == segmentTypeSealed && offlineSegment.SegmentState == segmentTypeSealed {
|
||||
dupSegmentIDs[onlineSegment.SegmentID] = struct{}{}
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(dupSegmentIDs) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
var dupSegmentIDsList []UniqueID
|
||||
for sid := range dupSegmentIDs {
|
||||
dupSegmentIDsList = append(dupSegmentIDsList, sid)
|
||||
}
|
||||
log.Warn("Found sealed segments are that are online and offline.", zap.Int64s("SegmentIDs", dupSegmentIDsList))
|
||||
|
||||
var filteredOnlineSegments []*querypb.SegmentInfo
|
||||
for _, onlineSegment := range line.OnlineSegments {
|
||||
if _, ok := dupSegmentIDs[onlineSegment.SegmentID]; !ok {
|
||||
filteredOnlineSegments = append(filteredOnlineSegments, onlineSegment)
|
||||
}
|
||||
}
|
||||
line.OnlineSegments = filteredOnlineSegments
|
||||
var filteredOfflineSegments []*querypb.SegmentInfo
|
||||
for _, offlineSegment := range line.OfflineSegments {
|
||||
if _, ok := dupSegmentIDs[offlineSegment.SegmentID]; !ok {
|
||||
filteredOfflineSegments = append(filteredOfflineSegments, offlineSegment)
|
||||
}
|
||||
}
|
||||
line.OfflineSegments = filteredOfflineSegments
|
||||
}
|
||||
}
|
||||
|
@ -413,6 +413,74 @@ func TestQueryNode_validateChangeChannel(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestQueryNode_filterDuplicateChangeInfo(t *testing.T) {
|
||||
t.Run("dup change info", func(t *testing.T) {
|
||||
info := &querypb.SegmentChangeInfo{
|
||||
OnlineNodeID: 233,
|
||||
OnlineSegments: []*querypb.SegmentInfo{
|
||||
{
|
||||
SegmentID: 23333,
|
||||
SegmentState: segmentTypeSealed,
|
||||
},
|
||||
},
|
||||
OfflineNodeID: 233,
|
||||
OfflineSegments: []*querypb.SegmentInfo{
|
||||
{
|
||||
SegmentID: 23333,
|
||||
SegmentState: segmentTypeSealed,
|
||||
},
|
||||
},
|
||||
}
|
||||
filterDuplicateChangeInfo(info)
|
||||
assert.Equal(t, 0, len(info.OnlineSegments))
|
||||
assert.Equal(t, 0, len(info.OfflineSegments))
|
||||
})
|
||||
|
||||
t.Run("normal change info1", func(t *testing.T) {
|
||||
info := &querypb.SegmentChangeInfo{
|
||||
OnlineNodeID: 233,
|
||||
OnlineSegments: []*querypb.SegmentInfo{
|
||||
{
|
||||
SegmentID: 23333,
|
||||
SegmentState: segmentTypeSealed,
|
||||
},
|
||||
},
|
||||
OfflineNodeID: 234,
|
||||
OfflineSegments: []*querypb.SegmentInfo{
|
||||
{
|
||||
SegmentID: 23333,
|
||||
SegmentState: segmentTypeSealed,
|
||||
},
|
||||
},
|
||||
}
|
||||
filterDuplicateChangeInfo(info)
|
||||
assert.Equal(t, 1, len(info.OnlineSegments))
|
||||
assert.Equal(t, 1, len(info.OfflineSegments))
|
||||
})
|
||||
|
||||
t.Run("normal change info2", func(t *testing.T) {
|
||||
info := &querypb.SegmentChangeInfo{
|
||||
OnlineNodeID: 233,
|
||||
OnlineSegments: []*querypb.SegmentInfo{
|
||||
{
|
||||
SegmentID: 23333,
|
||||
SegmentState: segmentTypeSealed,
|
||||
},
|
||||
},
|
||||
OfflineNodeID: 234,
|
||||
OfflineSegments: []*querypb.SegmentInfo{
|
||||
{
|
||||
SegmentID: 23333,
|
||||
SegmentState: segmentTypeSealed,
|
||||
},
|
||||
},
|
||||
}
|
||||
filterDuplicateChangeInfo(info)
|
||||
assert.Equal(t, 1, len(info.OnlineSegments))
|
||||
assert.Equal(t, 1, len(info.OfflineSegments))
|
||||
})
|
||||
}
|
||||
|
||||
func TestQueryNode_handleSealedSegmentsChangeInfo(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
Loading…
Reference in New Issue
Block a user