mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-10 16:32:06 +08:00
26f06dd732
Signed-off-by: SimFG <bang.fu@zilliz.com>
295 lines
8.0 KiB
Go
295 lines
8.0 KiB
Go
// Licensed to the LF AI & Data foundation under one
|
|
// or more contributor license agreements. See the NOTICE file
|
|
// distributed with this work for additional information
|
|
// regarding copyright ownership. The ASF licenses this file
|
|
// to you under the Apache License, Version 2.0 (the
|
|
// "License"); you may not use this file except in compliance
|
|
// with the License. You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package msgdispatcher
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"go.uber.org/atomic"
|
|
"go.uber.org/zap"
|
|
|
|
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
|
"github.com/milvus-io/milvus/pkg/log"
|
|
"github.com/milvus-io/milvus/pkg/metrics"
|
|
"github.com/milvus-io/milvus/pkg/mq/msgstream"
|
|
"github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper"
|
|
"github.com/milvus-io/milvus/pkg/util/funcutil"
|
|
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
|
"github.com/milvus-io/milvus/pkg/util/tsoutil"
|
|
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
|
)
|
|
|
|
type signal int32
|
|
|
|
const (
|
|
start signal = 0
|
|
pause signal = 1
|
|
resume signal = 2
|
|
terminate signal = 3
|
|
)
|
|
|
|
var signalString = map[int32]string{
|
|
0: "start",
|
|
1: "pause",
|
|
2: "resume",
|
|
3: "terminate",
|
|
}
|
|
|
|
func (s signal) String() string {
|
|
return signalString[int32(s)]
|
|
}
|
|
|
|
type Dispatcher struct {
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
|
|
done chan struct{}
|
|
wg sync.WaitGroup
|
|
once sync.Once
|
|
|
|
isMain bool // indicates if it's a main dispatcher
|
|
pchannel string
|
|
curTs atomic.Uint64
|
|
|
|
lagNotifyChan chan struct{}
|
|
lagTargets *typeutil.ConcurrentMap[string, *target] // vchannel -> *target
|
|
|
|
// vchannel -> *target, lock free since we guarantee that
|
|
// it's modified only after dispatcher paused or terminated
|
|
targets map[string]*target
|
|
|
|
stream msgstream.MsgStream
|
|
}
|
|
|
|
func NewDispatcher(ctx context.Context,
|
|
factory msgstream.Factory,
|
|
isMain bool,
|
|
pchannel string,
|
|
position *Pos,
|
|
subName string,
|
|
subPos SubPos,
|
|
lagNotifyChan chan struct{},
|
|
lagTargets *typeutil.ConcurrentMap[string, *target],
|
|
) (*Dispatcher, error) {
|
|
log := log.With(zap.String("pchannel", pchannel),
|
|
zap.String("subName", subName), zap.Bool("isMain", isMain))
|
|
log.Info("creating dispatcher...")
|
|
stream, err := factory.NewTtMsgStream(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if position != nil && len(position.MsgID) != 0 {
|
|
position.ChannelName = funcutil.ToPhysicalChannel(position.ChannelName)
|
|
err = stream.AsConsumer(ctx, []string{pchannel}, subName, mqwrapper.SubscriptionPositionUnknown)
|
|
if err != nil {
|
|
log.Error("asConsumer failed", zap.Error(err))
|
|
return nil, err
|
|
}
|
|
|
|
err = stream.Seek(ctx, []*Pos{position})
|
|
if err != nil {
|
|
stream.Close()
|
|
log.Error("seek failed", zap.Error(err))
|
|
return nil, err
|
|
}
|
|
posTime := tsoutil.PhysicalTime(position.GetTimestamp())
|
|
log.Info("seek successfully", zap.Time("posTime", posTime),
|
|
zap.Duration("tsLag", time.Since(posTime)))
|
|
} else {
|
|
err := stream.AsConsumer(ctx, []string{pchannel}, subName, subPos)
|
|
if err != nil {
|
|
log.Error("asConsumer failed", zap.Error(err))
|
|
return nil, err
|
|
}
|
|
log.Info("asConsumer successfully")
|
|
}
|
|
|
|
d := &Dispatcher{
|
|
done: make(chan struct{}, 1),
|
|
isMain: isMain,
|
|
pchannel: pchannel,
|
|
lagNotifyChan: lagNotifyChan,
|
|
lagTargets: lagTargets,
|
|
targets: make(map[string]*target),
|
|
stream: stream,
|
|
}
|
|
|
|
metrics.NumConsumers.WithLabelValues(paramtable.GetRole(), fmt.Sprint(paramtable.GetNodeID())).Inc()
|
|
return d, nil
|
|
}
|
|
|
|
func (d *Dispatcher) CurTs() typeutil.Timestamp {
|
|
return d.curTs.Load()
|
|
}
|
|
|
|
func (d *Dispatcher) AddTarget(t *target) {
|
|
log := log.With(zap.String("vchannel", t.vchannel), zap.Bool("isMain", d.isMain))
|
|
if _, ok := d.targets[t.vchannel]; ok {
|
|
log.Warn("target exists")
|
|
return
|
|
}
|
|
d.targets[t.vchannel] = t
|
|
log.Info("add new target")
|
|
}
|
|
|
|
func (d *Dispatcher) GetTarget(vchannel string) (*target, error) {
|
|
if t, ok := d.targets[vchannel]; ok {
|
|
return t, nil
|
|
}
|
|
return nil, fmt.Errorf("cannot find target, vchannel=%s, isMain=%t", vchannel, d.isMain)
|
|
}
|
|
|
|
func (d *Dispatcher) CloseTarget(vchannel string) {
|
|
log := log.With(zap.String("vchannel", vchannel), zap.Bool("isMain", d.isMain))
|
|
if t, ok := d.targets[vchannel]; ok {
|
|
t.close()
|
|
delete(d.targets, vchannel)
|
|
log.Info("closed target")
|
|
} else {
|
|
log.Warn("target not exist")
|
|
}
|
|
}
|
|
|
|
func (d *Dispatcher) TargetNum() int {
|
|
return len(d.targets)
|
|
}
|
|
|
|
func (d *Dispatcher) Handle(signal signal) {
|
|
log := log.With(zap.String("pchannel", d.pchannel),
|
|
zap.String("signal", signal.String()), zap.Bool("isMain", d.isMain))
|
|
log.Info("get signal")
|
|
switch signal {
|
|
case start:
|
|
d.ctx, d.cancel = context.WithCancel(context.Background())
|
|
d.wg.Add(1)
|
|
go d.work()
|
|
case pause:
|
|
d.done <- struct{}{}
|
|
d.cancel()
|
|
d.wg.Wait()
|
|
case resume:
|
|
d.ctx, d.cancel = context.WithCancel(context.Background())
|
|
d.wg.Add(1)
|
|
go d.work()
|
|
case terminate:
|
|
d.done <- struct{}{}
|
|
d.cancel()
|
|
d.wg.Wait()
|
|
d.once.Do(func() {
|
|
metrics.NumConsumers.WithLabelValues(paramtable.GetRole(), fmt.Sprint(paramtable.GetNodeID())).Dec()
|
|
d.stream.Close()
|
|
})
|
|
}
|
|
log.Info("handle signal done")
|
|
}
|
|
|
|
func (d *Dispatcher) work() {
|
|
log := log.With(zap.String("pchannel", d.pchannel), zap.Bool("isMain", d.isMain))
|
|
log.Info("begin to work")
|
|
defer d.wg.Done()
|
|
for {
|
|
select {
|
|
case <-d.done:
|
|
log.Info("stop working")
|
|
return
|
|
case pack := <-d.stream.Chan():
|
|
if pack == nil || len(pack.EndPositions) != 1 {
|
|
log.Error("consumed invalid msgPack")
|
|
continue
|
|
}
|
|
d.curTs.Store(pack.EndPositions[0].GetTimestamp())
|
|
|
|
targetPacks := d.groupingMsgs(pack)
|
|
for vchannel, p := range targetPacks {
|
|
var err error
|
|
t := d.targets[vchannel]
|
|
if d.isMain {
|
|
// for main dispatcher, split target if err occurs
|
|
err = t.send(p)
|
|
} else {
|
|
// for solo dispatcher, only 1 target exists, we should
|
|
// keep retrying if err occurs, unless it paused or terminated.
|
|
for {
|
|
err = t.send(p)
|
|
if err == nil || !funcutil.CheckCtxValid(d.ctx) {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
if err != nil {
|
|
t.pos = pack.StartPositions[0]
|
|
// replace the pChannel with vChannel
|
|
t.pos.ChannelName = t.vchannel
|
|
d.lagTargets.Insert(t.vchannel, t)
|
|
d.nonBlockingNotify()
|
|
delete(d.targets, vchannel)
|
|
log.Warn("lag target notified", zap.Error(err))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (d *Dispatcher) groupingMsgs(pack *MsgPack) map[string]*MsgPack {
|
|
// init packs for all targets, even though there's no msg in pack,
|
|
// but we still need to dispatch time ticks to the targets.
|
|
targetPacks := make(map[string]*MsgPack)
|
|
for vchannel := range d.targets {
|
|
targetPacks[vchannel] = &MsgPack{
|
|
BeginTs: pack.BeginTs,
|
|
EndTs: pack.EndTs,
|
|
Msgs: make([]msgstream.TsMsg, 0),
|
|
StartPositions: pack.StartPositions,
|
|
EndPositions: pack.EndPositions,
|
|
}
|
|
}
|
|
// group messages by vchannel
|
|
for _, msg := range pack.Msgs {
|
|
var vchannel string
|
|
switch msg.Type() {
|
|
case commonpb.MsgType_Insert:
|
|
vchannel = msg.(*msgstream.InsertMsg).GetShardName()
|
|
case commonpb.MsgType_Delete:
|
|
vchannel = msg.(*msgstream.DeleteMsg).GetShardName()
|
|
}
|
|
if vchannel == "" {
|
|
// for non-dml msg, such as CreateCollection, DropCollection, ...
|
|
// we need to dispatch it to all the vchannels.
|
|
for k := range targetPacks {
|
|
// TODO: There's data race when non-dml msg is sent to different flow graph.
|
|
// Wrong open-trancing information is generated, Fix in future.
|
|
targetPacks[k].Msgs = append(targetPacks[k].Msgs, msg)
|
|
}
|
|
continue
|
|
}
|
|
if _, ok := targetPacks[vchannel]; ok {
|
|
targetPacks[vchannel].Msgs = append(targetPacks[vchannel].Msgs, msg)
|
|
}
|
|
}
|
|
return targetPacks
|
|
}
|
|
|
|
func (d *Dispatcher) nonBlockingNotify() {
|
|
select {
|
|
case d.lagNotifyChan <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|