2022-10-11 11:39:22 +08:00
|
|
|
// Licensed to the LF AI & Data foundation under one
|
|
|
|
// or more contributor license agreements. See the NOTICE file
|
|
|
|
// distributed with this work for additional information
|
|
|
|
// regarding copyright ownership. The ASF licenses this file
|
|
|
|
// to you under the Apache License, Version 2.0 (the
|
|
|
|
// "License"); you may not use this file except in compliance
|
|
|
|
// with the License. You may obtain a copy of the License at
|
|
|
|
//
|
|
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
//
|
|
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
// See the License for the specific language governing permissions and
|
|
|
|
// limitations under the License.
|
|
|
|
|
2022-09-15 18:48:32 +08:00
|
|
|
package observers
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"sync"
|
|
|
|
"time"
|
|
|
|
|
2023-09-21 09:45:27 +08:00
|
|
|
"github.com/samber/lo"
|
2023-04-06 19:14:32 +08:00
|
|
|
"go.uber.org/zap"
|
|
|
|
|
2023-06-09 01:28:37 +08:00
|
|
|
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
2022-09-15 18:48:32 +08:00
|
|
|
"github.com/milvus-io/milvus/internal/proto/querypb"
|
|
|
|
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
|
|
|
|
"github.com/milvus-io/milvus/internal/querycoordv2/session"
|
|
|
|
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
|
2023-04-06 19:14:32 +08:00
|
|
|
"github.com/milvus-io/milvus/pkg/log"
|
|
|
|
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
|
2022-09-15 18:48:32 +08:00
|
|
|
)
|
|
|
|
|
2023-03-27 00:42:00 +08:00
|
|
|
const (
	// interval is the period of the ticker that drives the automatic
	// observation rounds started by LeaderObserver.Start.
	interval = time.Second
	// RPCTimeout is a three-second duration.
	// NOTE(review): presumably a budget for outgoing RPCs; confirm against its users.
	RPCTimeout = 3 * time.Second
)
|
2022-09-15 18:48:32 +08:00
|
|
|
|
|
|
|
// LeaderObserver is to sync the distribution with leader
|
|
|
|
type LeaderObserver struct {
|
2023-06-27 11:48:45 +08:00
|
|
|
wg sync.WaitGroup
|
|
|
|
closeCh chan struct{}
|
|
|
|
dist *meta.DistributionManager
|
|
|
|
meta *meta.Meta
|
|
|
|
target *meta.TargetManager
|
|
|
|
broker meta.Broker
|
|
|
|
cluster session.Cluster
|
|
|
|
manualCheck chan checkRequest
|
2022-10-19 12:13:28 +08:00
|
|
|
|
|
|
|
stopOnce sync.Once
|
2022-09-15 18:48:32 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
func (o *LeaderObserver) Start(ctx context.Context) {
|
|
|
|
o.wg.Add(1)
|
|
|
|
go func() {
|
|
|
|
defer o.wg.Done()
|
|
|
|
ticker := time.NewTicker(interval)
|
2023-02-23 18:59:45 +08:00
|
|
|
defer ticker.Stop()
|
2022-09-15 18:48:32 +08:00
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-o.closeCh:
|
|
|
|
log.Info("stop leader observer")
|
|
|
|
return
|
|
|
|
case <-ctx.Done():
|
|
|
|
log.Info("stop leader observer due to ctx done")
|
|
|
|
return
|
2023-06-27 11:48:45 +08:00
|
|
|
case req := <-o.manualCheck:
|
|
|
|
log.Info("triggering manual check")
|
|
|
|
ret := o.observeCollection(ctx, req.CollectionID)
|
|
|
|
req.Notifier <- ret
|
|
|
|
log.Info("manual check done", zap.Bool("result", ret))
|
|
|
|
|
2022-09-15 18:48:32 +08:00
|
|
|
case <-ticker.C:
|
|
|
|
o.observe(ctx)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (o *LeaderObserver) Stop() {
|
2022-10-19 12:13:28 +08:00
|
|
|
o.stopOnce.Do(func() {
|
|
|
|
close(o.closeCh)
|
|
|
|
o.wg.Wait()
|
|
|
|
})
|
2022-09-15 18:48:32 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
func (o *LeaderObserver) observe(ctx context.Context) {
|
|
|
|
o.observeSegmentsDist(ctx)
|
|
|
|
}
|
|
|
|
|
2023-09-01 11:17:01 +08:00
|
|
|
func (o *LeaderObserver) readyToObserve(collectionID int64) bool {
|
|
|
|
metaExist := (o.meta.GetCollection(collectionID) != nil)
|
|
|
|
targetExist := o.target.IsNextTargetExist(collectionID) || o.target.IsCurrentTargetExist(collectionID)
|
|
|
|
|
|
|
|
return metaExist && targetExist
|
|
|
|
}
|
|
|
|
|
2022-09-15 18:48:32 +08:00
|
|
|
func (o *LeaderObserver) observeSegmentsDist(ctx context.Context) {
|
|
|
|
collectionIDs := o.meta.CollectionManager.GetAll()
|
|
|
|
for _, cid := range collectionIDs {
|
2023-09-01 11:17:01 +08:00
|
|
|
if o.readyToObserve(cid) {
|
|
|
|
o.observeCollection(ctx, cid)
|
|
|
|
}
|
2022-09-15 18:48:32 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-06-27 11:48:45 +08:00
|
|
|
func (o *LeaderObserver) observeCollection(ctx context.Context, collection int64) bool {
|
2022-09-15 18:48:32 +08:00
|
|
|
replicas := o.meta.ReplicaManager.GetByCollection(collection)
|
2023-06-27 11:48:45 +08:00
|
|
|
result := true
|
2022-09-15 18:48:32 +08:00
|
|
|
for _, replica := range replicas {
|
|
|
|
leaders := o.dist.ChannelDistManager.GetShardLeadersByReplica(replica)
|
|
|
|
for ch, leaderID := range leaders {
|
|
|
|
leaderView := o.dist.LeaderViewManager.GetLeaderShardView(leaderID, ch)
|
|
|
|
if leaderView == nil {
|
|
|
|
continue
|
|
|
|
}
|
2022-09-23 15:16:51 +08:00
|
|
|
dists := o.dist.SegmentDistManager.GetByShardWithReplica(ch, replica)
|
2023-06-27 11:48:45 +08:00
|
|
|
|
|
|
|
actions := o.findNeedLoadedSegments(leaderView, dists)
|
|
|
|
actions = append(actions, o.findNeedRemovedSegments(leaderView, dists)...)
|
2023-08-03 15:55:09 +08:00
|
|
|
updateVersionAction := o.checkNeedUpdateTargetVersion(ctx, leaderView)
|
2023-06-27 11:48:45 +08:00
|
|
|
if updateVersionAction != nil {
|
|
|
|
actions = append(actions, updateVersionAction)
|
|
|
|
}
|
|
|
|
success := o.sync(ctx, replica.GetID(), leaderView, actions)
|
|
|
|
if !success {
|
|
|
|
result = false
|
|
|
|
}
|
2022-09-15 18:48:32 +08:00
|
|
|
}
|
|
|
|
}
|
2023-06-27 11:48:45 +08:00
|
|
|
return result
|
|
|
|
}
|
|
|
|
|
|
|
|
func (ob *LeaderObserver) CheckTargetVersion(collectionID int64) bool {
|
|
|
|
notifier := make(chan bool)
|
|
|
|
ob.manualCheck <- checkRequest{
|
|
|
|
CollectionID: collectionID,
|
|
|
|
Notifier: notifier,
|
|
|
|
}
|
|
|
|
return <-notifier
|
|
|
|
}
|
|
|
|
|
2023-08-03 15:55:09 +08:00
|
|
|
func (o *LeaderObserver) checkNeedUpdateTargetVersion(ctx context.Context, leaderView *meta.LeaderView) *querypb.SyncAction {
|
|
|
|
log.Ctx(ctx).WithRateGroup("qcv2.LeaderObserver", 1, 60)
|
2023-06-27 11:48:45 +08:00
|
|
|
targetVersion := o.target.GetCollectionTargetVersion(leaderView.CollectionID, meta.CurrentTarget)
|
|
|
|
|
|
|
|
if targetVersion <= leaderView.TargetVersion {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2023-08-03 15:55:09 +08:00
|
|
|
log.RatedInfo(10, "Update readable segment version",
|
2023-06-27 11:48:45 +08:00
|
|
|
zap.Int64("collectionID", leaderView.CollectionID),
|
|
|
|
zap.String("channelName", leaderView.Channel),
|
|
|
|
zap.Int64("nodeID", leaderView.ID),
|
|
|
|
zap.Int64("oldVersion", leaderView.TargetVersion),
|
|
|
|
zap.Int64("newVersion", targetVersion),
|
|
|
|
)
|
|
|
|
|
2023-07-06 10:30:26 +08:00
|
|
|
sealedSegments := o.target.GetHistoricalSegmentsByChannel(leaderView.CollectionID, leaderView.Channel, meta.CurrentTarget)
|
|
|
|
growingSegments := o.target.GetStreamingSegmentsByChannel(leaderView.CollectionID, leaderView.Channel, meta.CurrentTarget)
|
2023-07-24 14:09:00 +08:00
|
|
|
droppedSegments := o.target.GetDroppedSegmentsByChannel(leaderView.CollectionID, leaderView.Channel, meta.CurrentTarget)
|
|
|
|
|
2023-06-27 11:48:45 +08:00
|
|
|
return &querypb.SyncAction{
|
|
|
|
Type: querypb.SyncType_UpdateVersion,
|
|
|
|
GrowingInTarget: growingSegments.Collect(),
|
|
|
|
SealedInTarget: lo.Keys(sealedSegments),
|
2023-07-24 14:09:00 +08:00
|
|
|
DroppedInTarget: droppedSegments,
|
2023-06-27 11:48:45 +08:00
|
|
|
TargetVersion: targetVersion,
|
|
|
|
}
|
2022-09-15 18:48:32 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
func (o *LeaderObserver) findNeedLoadedSegments(leaderView *meta.LeaderView, dists []*meta.Segment) []*querypb.SyncAction {
|
|
|
|
ret := make([]*querypb.SyncAction, 0)
|
|
|
|
dists = utils.FindMaxVersionSegments(dists)
|
|
|
|
for _, s := range dists {
|
2022-09-28 12:10:54 +08:00
|
|
|
version, ok := leaderView.Segments[s.GetID()]
|
2023-03-27 00:42:00 +08:00
|
|
|
currentTarget := o.target.GetHistoricalSegment(s.CollectionID, s.GetID(), meta.CurrentTarget)
|
|
|
|
existInCurrentTarget := currentTarget != nil
|
2022-11-11 11:43:06 +08:00
|
|
|
existInNextTarget := o.target.GetHistoricalSegment(s.CollectionID, s.GetID(), meta.NextTarget) != nil
|
2023-03-27 00:42:00 +08:00
|
|
|
|
|
|
|
if !existInCurrentTarget && !existInNextTarget {
|
2022-09-15 18:48:32 +08:00
|
|
|
continue
|
|
|
|
}
|
2023-03-27 00:42:00 +08:00
|
|
|
|
|
|
|
if !ok || version.GetVersion() < s.Version { // Leader misses this segment
|
|
|
|
ctx := context.Background()
|
|
|
|
resp, err := o.broker.GetSegmentInfo(ctx, s.GetID())
|
|
|
|
if err != nil || len(resp.GetInfos()) == 0 {
|
|
|
|
log.Warn("failed to get segment info from DataCoord", zap.Error(err))
|
|
|
|
continue
|
|
|
|
}
|
2023-08-08 11:17:08 +08:00
|
|
|
loadInfo := utils.PackSegmentLoadInfo(resp, nil)
|
2023-03-27 00:42:00 +08:00
|
|
|
|
2023-08-11 11:21:32 +08:00
|
|
|
log.Debug("leader observer append a segment to set",
|
|
|
|
zap.Int64("collectionID", leaderView.CollectionID),
|
|
|
|
zap.String("channel", leaderView.Channel),
|
|
|
|
zap.Int64("leaderViewID", leaderView.ID),
|
|
|
|
zap.Int64("segmentID", s.GetID()),
|
|
|
|
zap.Int64("nodeID", s.Node))
|
2023-03-27 00:42:00 +08:00
|
|
|
ret = append(ret, &querypb.SyncAction{
|
|
|
|
Type: querypb.SyncType_Set,
|
|
|
|
PartitionID: s.GetPartitionID(),
|
|
|
|
SegmentID: s.GetID(),
|
|
|
|
NodeID: s.Node,
|
|
|
|
Version: s.Version,
|
|
|
|
Info: loadInfo,
|
|
|
|
})
|
|
|
|
}
|
2022-09-15 18:48:32 +08:00
|
|
|
}
|
|
|
|
return ret
|
|
|
|
}
|
|
|
|
|
|
|
|
func (o *LeaderObserver) findNeedRemovedSegments(leaderView *meta.LeaderView, dists []*meta.Segment) []*querypb.SyncAction {
|
|
|
|
ret := make([]*querypb.SyncAction, 0)
|
|
|
|
distMap := make(map[int64]struct{})
|
|
|
|
for _, s := range dists {
|
|
|
|
distMap[s.GetID()] = struct{}{}
|
|
|
|
}
|
2023-03-29 14:06:02 +08:00
|
|
|
for sid, s := range leaderView.Segments {
|
2022-09-15 18:48:32 +08:00
|
|
|
_, ok := distMap[sid]
|
2022-11-11 11:43:06 +08:00
|
|
|
existInCurrentTarget := o.target.GetHistoricalSegment(leaderView.CollectionID, sid, meta.CurrentTarget) != nil
|
|
|
|
existInNextTarget := o.target.GetHistoricalSegment(leaderView.CollectionID, sid, meta.NextTarget) != nil
|
|
|
|
if ok || existInCurrentTarget || existInNextTarget {
|
2022-09-15 18:48:32 +08:00
|
|
|
continue
|
|
|
|
}
|
2023-08-11 11:21:32 +08:00
|
|
|
log.Debug("leader observer append a segment to remove",
|
|
|
|
zap.Int64("collectionID", leaderView.CollectionID),
|
|
|
|
zap.String("channel", leaderView.Channel),
|
|
|
|
zap.Int64("leaderViewID", leaderView.ID),
|
|
|
|
zap.Int64("segmentID", sid),
|
|
|
|
zap.Int64("nodeID", s.NodeID))
|
2022-09-15 18:48:32 +08:00
|
|
|
ret = append(ret, &querypb.SyncAction{
|
|
|
|
Type: querypb.SyncType_Remove,
|
|
|
|
SegmentID: sid,
|
2023-03-29 14:06:02 +08:00
|
|
|
NodeID: s.NodeID,
|
2022-09-15 18:48:32 +08:00
|
|
|
})
|
|
|
|
}
|
|
|
|
return ret
|
|
|
|
}
|
|
|
|
|
2023-06-27 11:48:45 +08:00
|
|
|
func (o *LeaderObserver) sync(ctx context.Context, replicaID int64, leaderView *meta.LeaderView, diffs []*querypb.SyncAction) bool {
|
2022-09-28 12:10:54 +08:00
|
|
|
if len(diffs) == 0 {
|
2023-06-27 11:48:45 +08:00
|
|
|
return true
|
2022-09-28 12:10:54 +08:00
|
|
|
}
|
|
|
|
|
2022-09-15 18:48:32 +08:00
|
|
|
log := log.With(
|
|
|
|
zap.Int64("leaderID", leaderView.ID),
|
|
|
|
zap.Int64("collectionID", leaderView.CollectionID),
|
|
|
|
zap.String("channel", leaderView.Channel),
|
|
|
|
)
|
2023-04-07 19:32:29 +08:00
|
|
|
|
|
|
|
schema, err := o.broker.GetCollectionSchema(ctx, leaderView.CollectionID)
|
|
|
|
if err != nil {
|
2023-08-14 18:57:32 +08:00
|
|
|
log.Warn("sync distribution failed, cannot get schema of collection", zap.Error(err))
|
2023-06-27 11:48:45 +08:00
|
|
|
return false
|
2023-04-07 19:32:29 +08:00
|
|
|
}
|
|
|
|
partitions, err := utils.GetPartitions(o.meta.CollectionManager, leaderView.CollectionID)
|
|
|
|
if err != nil {
|
2023-08-14 18:57:32 +08:00
|
|
|
log.Warn("sync distribution failed, cannot get partitions of collection", zap.Error(err))
|
2023-06-27 11:48:45 +08:00
|
|
|
return false
|
2023-04-07 19:32:29 +08:00
|
|
|
}
|
|
|
|
|
2022-09-15 18:48:32 +08:00
|
|
|
req := &querypb.SyncDistributionRequest{
|
2022-10-21 15:57:28 +08:00
|
|
|
Base: commonpbutil.NewMsgBase(
|
|
|
|
commonpbutil.WithMsgType(commonpb.MsgType_SyncDistribution),
|
|
|
|
),
|
2022-09-15 18:48:32 +08:00
|
|
|
CollectionID: leaderView.CollectionID,
|
2023-04-07 19:32:29 +08:00
|
|
|
ReplicaID: replicaID,
|
2022-09-15 18:48:32 +08:00
|
|
|
Channel: leaderView.Channel,
|
|
|
|
Actions: diffs,
|
2023-04-07 19:32:29 +08:00
|
|
|
Schema: schema,
|
|
|
|
LoadMeta: &querypb.LoadMetaInfo{
|
|
|
|
LoadType: o.meta.GetLoadType(leaderView.CollectionID),
|
|
|
|
CollectionID: leaderView.CollectionID,
|
|
|
|
PartitionIDs: partitions,
|
|
|
|
},
|
|
|
|
Version: time.Now().UnixNano(),
|
2022-09-15 18:48:32 +08:00
|
|
|
}
|
|
|
|
resp, err := o.cluster.SyncDistribution(ctx, leaderView.ID, req)
|
|
|
|
if err != nil {
|
2023-08-14 18:57:32 +08:00
|
|
|
log.Warn("failed to sync distribution", zap.Error(err))
|
2023-06-27 11:48:45 +08:00
|
|
|
return false
|
2022-09-15 18:48:32 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
if resp.ErrorCode != commonpb.ErrorCode_Success {
|
2023-08-14 18:57:32 +08:00
|
|
|
log.Warn("failed to sync distribution", zap.String("reason", resp.GetReason()))
|
2023-06-27 11:48:45 +08:00
|
|
|
return false
|
2022-09-15 18:48:32 +08:00
|
|
|
}
|
2023-06-27 11:48:45 +08:00
|
|
|
|
|
|
|
return true
|
2022-09-15 18:48:32 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
func NewLeaderObserver(
|
|
|
|
dist *meta.DistributionManager,
|
|
|
|
meta *meta.Meta,
|
|
|
|
targetMgr *meta.TargetManager,
|
2023-03-27 00:42:00 +08:00
|
|
|
broker meta.Broker,
|
2022-09-15 18:48:32 +08:00
|
|
|
cluster session.Cluster,
|
|
|
|
) *LeaderObserver {
|
|
|
|
return &LeaderObserver{
|
2023-06-27 11:48:45 +08:00
|
|
|
closeCh: make(chan struct{}),
|
|
|
|
dist: dist,
|
|
|
|
meta: meta,
|
|
|
|
target: targetMgr,
|
|
|
|
broker: broker,
|
|
|
|
cluster: cluster,
|
|
|
|
manualCheck: make(chan checkRequest, 10),
|
2022-09-15 18:48:32 +08:00
|
|
|
}
|
|
|
|
}
|