mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-02 20:09:57 +08:00
a52a52064d
See also: #31209 --------- Signed-off-by: yangxuan <xuan.yang@zilliz.com>
445 lines
14 KiB
Go
445 lines
14 KiB
Go
// Licensed to the LF AI & Data foundation under one
|
|
// or more contributor license agreements. See the NOTICE file
|
|
// distributed with this work for additional information
|
|
// regarding copyright ownership. The ASF licenses this file
|
|
// to you under the Apache License, Version 2.0 (the
|
|
// "License"); you may not use this file except in compliance
|
|
// with the License. You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package datanode
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"path"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/golang/protobuf/proto"
|
|
v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
|
|
"go.uber.org/atomic"
|
|
"go.uber.org/zap"
|
|
|
|
"github.com/milvus-io/milvus/internal/kv"
|
|
"github.com/milvus-io/milvus/internal/proto/datapb"
|
|
"github.com/milvus-io/milvus/pkg/log"
|
|
"github.com/milvus-io/milvus/pkg/util/logutil"
|
|
)
|
|
|
|
// retryWatchInterval is the retry interval handed to every channelEventManager
// created by handleWatchInfo.
const retryWatchInterval = 20 * time.Second
|
|
|
|
func (node *DataNode) startWatchChannelsAtBackground(ctx context.Context) {
|
|
node.stopWaiter.Add(1)
|
|
go node.StartWatchChannels(ctx)
|
|
}
|
|
|
|
// StartWatchChannels start loop to watch channel allocation status via kv(etcd for now)
|
|
func (node *DataNode) StartWatchChannels(ctx context.Context) {
|
|
defer node.stopWaiter.Done()
|
|
defer logutil.LogPanic()
|
|
// REF MEP#7 watch path should be [prefix]/channel/{node_id}/{channel_name}
|
|
// TODO, this is risky, we'd better watch etcd with revision rather simply a path
|
|
watchPrefix := path.Join(Params.CommonCfg.DataCoordWatchSubPath.GetValue(), fmt.Sprintf("%d", node.GetSession().ServerID))
|
|
log.Info("Start watch channel", zap.String("prefix", watchPrefix))
|
|
evtChan := node.watchKv.WatchWithPrefix(watchPrefix)
|
|
// after watch, first check all exists nodes first
|
|
err := node.checkWatchedList()
|
|
if err != nil {
|
|
log.Warn("StartWatchChannels failed", zap.Error(err))
|
|
return
|
|
}
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
log.Info("watch etcd loop quit")
|
|
return
|
|
case event, ok := <-evtChan:
|
|
if !ok {
|
|
log.Warn("datanode failed to watch channel, return")
|
|
node.startWatchChannelsAtBackground(ctx)
|
|
return
|
|
}
|
|
|
|
if err := event.Err(); err != nil {
|
|
log.Warn("datanode watch channel canceled", zap.Error(event.Err()))
|
|
// https://github.com/etcd-io/etcd/issues/8980
|
|
if event.Err() == v3rpc.ErrCompacted {
|
|
node.startWatchChannelsAtBackground(ctx)
|
|
return
|
|
}
|
|
// if watch loop return due to event canceled, the datanode is not functional anymore
|
|
log.Panic("datanode is not functional for event canceled", zap.Error(err))
|
|
return
|
|
}
|
|
for _, evt := range event.Events {
|
|
// We need to stay in order until events enqueued
|
|
node.handleChannelEvt(evt)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// checkWatchedList list all nodes under [prefix]/channel/{node_id} and make sure all nodes are watched
|
|
// serves the corner case for etcd connection lost and missing some events
|
|
func (node *DataNode) checkWatchedList() error {
|
|
// REF MEP#7 watch path should be [prefix]/channel/{node_id}/{channel_name}
|
|
prefix := path.Join(Params.CommonCfg.DataCoordWatchSubPath.GetValue(), fmt.Sprintf("%d", node.serverID))
|
|
keys, values, err := node.watchKv.LoadWithPrefix(prefix)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for i, val := range values {
|
|
node.handleWatchInfo(&event{eventType: putEventType}, keys[i], []byte(val))
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (node *DataNode) handleWatchInfo(e *event, key string, data []byte) {
|
|
switch e.eventType {
|
|
case putEventType:
|
|
watchInfo, err := parsePutEventData(data)
|
|
if err != nil {
|
|
log.Warn("fail to handle watchInfo", zap.Int("event type", e.eventType), zap.String("key", key), zap.Error(err))
|
|
return
|
|
}
|
|
|
|
if isEndWatchState(watchInfo.State) {
|
|
log.Info("DataNode received a PUT event with an end State", zap.String("state", watchInfo.State.String()))
|
|
return
|
|
}
|
|
|
|
if watchInfo.Progress != 0 {
|
|
log.Info("DataNode received a PUT event with tickler update progress", zap.String("channel", watchInfo.Vchan.ChannelName), zap.Int64("version", e.version))
|
|
return
|
|
}
|
|
|
|
e.info = watchInfo
|
|
e.vChanName = watchInfo.GetVchan().GetChannelName()
|
|
log.Info("DataNode is handling watchInfo PUT event", zap.String("key", key), zap.String("watch state", watchInfo.GetState().String()))
|
|
case deleteEventType:
|
|
e.vChanName = parseDeleteEventKey(key)
|
|
log.Info("DataNode is handling watchInfo DELETE event", zap.String("key", key))
|
|
}
|
|
|
|
actualManager := node.eventManager.GetOrInsert(e.vChanName, newChannelEventManager(
|
|
node.handlePutEvent, node.handleDeleteEvent, retryWatchInterval,
|
|
))
|
|
|
|
actualManager.handleEvent(*e)
|
|
|
|
// Whenever a delete event comes, this eventManager will be removed from map
|
|
if e.eventType == deleteEventType {
|
|
node.eventManager.Remove(e.vChanName)
|
|
}
|
|
}
|
|
|
|
func parsePutEventData(data []byte) (*datapb.ChannelWatchInfo, error) {
|
|
watchInfo := datapb.ChannelWatchInfo{}
|
|
err := proto.Unmarshal(data, &watchInfo)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("invalid event data: fail to parse ChannelWatchInfo, err: %v", err)
|
|
}
|
|
|
|
if watchInfo.Vchan == nil {
|
|
return nil, fmt.Errorf("invalid event: ChannelWatchInfo with nil VChannelInfo")
|
|
}
|
|
reviseVChannelInfo(watchInfo.GetVchan())
|
|
return &watchInfo, nil
|
|
}
|
|
|
|
// parseDeleteEventKey returns the last "/"-separated segment of key, which is
// used as the vchannel name of a DELETE event. A key without any "/" is
// returned unchanged, and a key ending in "/" yields the empty string.
func parseDeleteEventKey(key string) string {
	// LastIndex returns -1 when there is no separator, so the slice then
	// starts at 0 and covers the whole key.
	return key[strings.LastIndex(key, "/")+1:]
}
|
|
|
|
func (node *DataNode) handlePutEvent(watchInfo *datapb.ChannelWatchInfo, version int64) (err error) {
|
|
vChanName := watchInfo.GetVchan().GetChannelName()
|
|
key := path.Join(Params.CommonCfg.DataCoordWatchSubPath.GetValue(), fmt.Sprintf("%d", node.GetSession().ServerID), vChanName)
|
|
tickler := newEtcdTickler(version, key, watchInfo, node.watchKv, Params.DataNodeCfg.WatchEventTicklerInterval.GetAsDuration(time.Second))
|
|
|
|
switch watchInfo.State {
|
|
case datapb.ChannelWatchState_Uncomplete, datapb.ChannelWatchState_ToWatch:
|
|
if err := node.flowgraphManager.AddandStartWithEtcdTickler(node, watchInfo.GetVchan(), watchInfo.GetSchema(), tickler); err != nil {
|
|
log.Warn("handle put event: new data sync service failed", zap.String("vChanName", vChanName), zap.Error(err))
|
|
watchInfo.State = datapb.ChannelWatchState_WatchFailure
|
|
} else {
|
|
log.Info("handle put event: new data sync service success", zap.String("vChanName", vChanName))
|
|
watchInfo.State = datapb.ChannelWatchState_WatchSuccess
|
|
}
|
|
case datapb.ChannelWatchState_ToRelease:
|
|
// there is no reason why we release fail
|
|
node.tryToReleaseFlowgraph(vChanName)
|
|
watchInfo.State = datapb.ChannelWatchState_ReleaseSuccess
|
|
}
|
|
|
|
liteInfo := GetLiteChannelWatchInfo(watchInfo)
|
|
v, err := proto.Marshal(liteInfo)
|
|
if err != nil {
|
|
return fmt.Errorf("fail to marshal watchInfo with state, vChanName: %s, state: %s ,err: %w", vChanName, liteInfo.State.String(), err)
|
|
}
|
|
|
|
success, err := node.watchKv.CompareVersionAndSwap(key, tickler.version, string(v))
|
|
// etcd error
|
|
if err != nil {
|
|
// flow graph will leak if not release, causing new datanode failed to subscribe
|
|
node.tryToReleaseFlowgraph(vChanName)
|
|
log.Warn("fail to update watch state to etcd", zap.String("vChanName", vChanName),
|
|
zap.String("state", watchInfo.State.String()), zap.Error(err))
|
|
return err
|
|
}
|
|
// etcd valid but the states updated.
|
|
if !success {
|
|
log.Info("handle put event: failed to compare version and swap, release flowgraph",
|
|
zap.String("key", key), zap.String("state", watchInfo.State.String()),
|
|
zap.String("vChanName", vChanName))
|
|
// flow graph will leak if not release, causing new datanode failed to subscribe
|
|
node.tryToReleaseFlowgraph(vChanName)
|
|
return nil
|
|
}
|
|
log.Info("handle put event success", zap.String("key", key),
|
|
zap.String("state", watchInfo.State.String()), zap.String("vChanName", vChanName))
|
|
return nil
|
|
}
|
|
|
|
func (node *DataNode) handleDeleteEvent(vChanName string) {
|
|
node.tryToReleaseFlowgraph(vChanName)
|
|
}
|
|
|
|
type event struct {
|
|
eventType int
|
|
vChanName string
|
|
version int64
|
|
info *datapb.ChannelWatchInfo
|
|
}
|
|
|
|
type channelEventManager struct {
|
|
sync.Once
|
|
wg sync.WaitGroup
|
|
eventChan chan event
|
|
closeChan chan struct{}
|
|
handlePutEvent func(watchInfo *datapb.ChannelWatchInfo, version int64) error // node.handlePutEvent
|
|
handleDeleteEvent func(vChanName string) // node.handleDeleteEvent
|
|
retryInterval time.Duration
|
|
}
|
|
|
|
// Values of event.eventType.
const (
	// putEventType marks an event created from a kv PUT.
	putEventType = 1
	// deleteEventType marks an event created from a kv DELETE.
	deleteEventType = 2
)
|
|
|
|
func newChannelEventManager(handlePut func(*datapb.ChannelWatchInfo, int64) error,
|
|
handleDel func(string), retryInterval time.Duration,
|
|
) *channelEventManager {
|
|
return &channelEventManager{
|
|
eventChan: make(chan event, 10),
|
|
closeChan: make(chan struct{}),
|
|
handlePutEvent: handlePut,
|
|
handleDeleteEvent: handleDel,
|
|
retryInterval: retryInterval,
|
|
}
|
|
}
|
|
|
|
func (e *channelEventManager) Run() {
|
|
e.wg.Add(1)
|
|
go func() {
|
|
defer e.wg.Done()
|
|
for {
|
|
select {
|
|
case event := <-e.eventChan:
|
|
switch event.eventType {
|
|
case putEventType:
|
|
err := e.handlePutEvent(event.info, event.version)
|
|
if err != nil {
|
|
// logging the error is convenient for follow-up investigation of problems
|
|
log.Warn("handle put event failed", zap.String("vChanName", event.vChanName), zap.Error(err))
|
|
}
|
|
case deleteEventType:
|
|
e.handleDeleteEvent(event.vChanName)
|
|
}
|
|
case <-e.closeChan:
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
func (e *channelEventManager) handleEvent(event event) {
|
|
e.eventChan <- event
|
|
}
|
|
|
|
func (e *channelEventManager) Close() {
|
|
e.Do(func() {
|
|
close(e.closeChan)
|
|
e.wg.Wait()
|
|
})
|
|
}
|
|
|
|
func isEndWatchState(state datapb.ChannelWatchState) bool {
|
|
return state != datapb.ChannelWatchState_ToWatch && // start watch
|
|
state != datapb.ChannelWatchState_ToRelease && // start release
|
|
state != datapb.ChannelWatchState_Uncomplete // legacy state, equal to ToWatch
|
|
}
|
|
|
|
type etcdTickler struct {
|
|
progress *atomic.Int32
|
|
version int64
|
|
|
|
kv kv.WatchKV
|
|
path string
|
|
watchInfo *datapb.ChannelWatchInfo
|
|
|
|
interval time.Duration
|
|
closeCh chan struct{}
|
|
closeWg sync.WaitGroup
|
|
isWatchFailed *atomic.Bool
|
|
}
|
|
|
|
func (t *etcdTickler) inc() {
|
|
t.progress.Inc()
|
|
}
|
|
|
|
func (t *etcdTickler) watch() {
|
|
if t.interval == 0 {
|
|
log.Info("zero interval, close ticler watch",
|
|
zap.String("channelName", t.watchInfo.GetVchan().GetChannelName()),
|
|
)
|
|
return
|
|
}
|
|
|
|
t.closeWg.Add(1)
|
|
go func() {
|
|
defer t.closeWg.Done()
|
|
ticker := time.NewTicker(t.interval)
|
|
defer ticker.Stop()
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
nowProgress := t.progress.Load()
|
|
if t.watchInfo.Progress == nowProgress {
|
|
continue
|
|
}
|
|
|
|
t.watchInfo.Progress = nowProgress
|
|
v, err := proto.Marshal(t.watchInfo)
|
|
if err != nil {
|
|
log.Error("fail to marshal watchInfo with progress at tickler",
|
|
zap.String("vChanName", t.watchInfo.Vchan.ChannelName),
|
|
zap.Int32("progree", nowProgress),
|
|
zap.Error(err))
|
|
t.isWatchFailed.Store(true)
|
|
return
|
|
}
|
|
success, err := t.kv.CompareVersionAndSwap(t.path, t.version, string(v))
|
|
if err != nil {
|
|
log.Error("tickler update failed", zap.Error(err))
|
|
continue
|
|
}
|
|
|
|
if !success {
|
|
log.Error("tickler update failed: failed to compare version and swap",
|
|
zap.String("key", t.path), zap.Int32("progress", nowProgress), zap.Int64("version", t.version),
|
|
zap.String("vChanName", t.watchInfo.GetVchan().ChannelName))
|
|
t.isWatchFailed.Store(true)
|
|
return
|
|
}
|
|
log.Debug("tickler update success", zap.Int32("progress", nowProgress), zap.Int64("version", t.version),
|
|
zap.String("vChanName", t.watchInfo.GetVchan().ChannelName))
|
|
t.version++
|
|
case <-t.closeCh:
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
func (t *etcdTickler) stop() {
|
|
close(t.closeCh)
|
|
t.closeWg.Wait()
|
|
}
|
|
|
|
func newEtcdTickler(version int64, path string, watchInfo *datapb.ChannelWatchInfo, kv kv.WatchKV, interval time.Duration) *etcdTickler {
|
|
liteWatchInfo := GetLiteChannelWatchInfo(watchInfo)
|
|
return &etcdTickler{
|
|
progress: atomic.NewInt32(0),
|
|
path: path,
|
|
kv: kv,
|
|
watchInfo: liteWatchInfo,
|
|
version: version,
|
|
interval: interval,
|
|
closeCh: make(chan struct{}),
|
|
isWatchFailed: atomic.NewBool(false),
|
|
}
|
|
}
|
|
|
|
// GetLiteChannelWatchInfo clones watchInfo without segmentIDs to reduce the size of the message
|
|
func GetLiteChannelWatchInfo(watchInfo *datapb.ChannelWatchInfo) *datapb.ChannelWatchInfo {
|
|
return &datapb.ChannelWatchInfo{
|
|
Vchan: &datapb.VchannelInfo{
|
|
CollectionID: watchInfo.GetVchan().GetCollectionID(),
|
|
ChannelName: watchInfo.GetVchan().GetChannelName(),
|
|
SeekPosition: watchInfo.GetVchan().GetSeekPosition(),
|
|
},
|
|
StartTs: watchInfo.GetStartTs(),
|
|
State: watchInfo.GetState(),
|
|
TimeoutTs: watchInfo.GetTimeoutTs(),
|
|
Schema: watchInfo.GetSchema(),
|
|
Progress: watchInfo.GetProgress(),
|
|
}
|
|
}
|
|
|
|
type EventManager struct {
|
|
channelGuard sync.Mutex
|
|
channelManagers map[string]*channelEventManager
|
|
}
|
|
|
|
func NewEventManager() *EventManager {
|
|
return &EventManager{
|
|
channelManagers: make(map[string]*channelEventManager),
|
|
}
|
|
}
|
|
|
|
func (m *EventManager) GetOrInsert(channel string, newManager *channelEventManager) *channelEventManager {
|
|
m.channelGuard.Lock()
|
|
defer m.channelGuard.Unlock()
|
|
|
|
eManager, got := m.channelManagers[channel]
|
|
if !got {
|
|
newManager.Run()
|
|
m.channelManagers[channel] = newManager
|
|
return newManager
|
|
}
|
|
|
|
return eManager
|
|
}
|
|
|
|
func (m *EventManager) Remove(channel string) {
|
|
m.channelGuard.Lock()
|
|
eManager, got := m.channelManagers[channel]
|
|
delete(m.channelManagers, channel)
|
|
m.channelGuard.Unlock()
|
|
|
|
if got {
|
|
eManager.Close()
|
|
}
|
|
}
|
|
|
|
func (m *EventManager) CloseAll() {
|
|
m.channelGuard.Lock()
|
|
defer m.channelGuard.Unlock()
|
|
|
|
for channel, eManager := range m.channelManagers {
|
|
delete(m.channelManagers, channel)
|
|
eManager.Close()
|
|
}
|
|
}
|