mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-03 12:29:36 +08:00
enhance: remove deprecated code within channel manager (#34340)
issue: https://github.com/milvus-io/milvus/issues/33994 only remove deprecated code, no additional changes. Signed-off-by: jaime <yun.zhang@zilliz.com>
This commit is contained in:
parent
0fd0fcfe1d
commit
d1f57aa4ba
@ -269,15 +269,6 @@ func (c *ChannelOpSet) MarshalLogArray(enc zapcore.ArrayEncoder) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ChannelStore must satisfy RWChannelStore.
|
|
||||||
var _ RWChannelStore = (*ChannelStore)(nil)
|
|
||||||
|
|
||||||
// ChannelStore maintains a mapping between channels and data nodes.
|
|
||||||
type ChannelStore struct {
|
|
||||||
store kv.TxnKV // A kv store with (NodeChannelKey) -> (ChannelWatchInfos) information.
|
|
||||||
channelsInfo map[int64]*NodeChannelInfo // A map of (nodeID) -> (NodeChannelInfo).
|
|
||||||
}
|
|
||||||
|
|
||||||
// NodeChannelInfo stores the nodeID and its channels.
|
// NodeChannelInfo stores the nodeID and its channels.
|
||||||
type NodeChannelInfo struct {
|
type NodeChannelInfo struct {
|
||||||
NodeID int64
|
NodeID int64
|
||||||
@ -315,261 +306,6 @@ func (info *NodeChannelInfo) GetChannels() []RWChannel {
|
|||||||
return lo.Values(info.Channels)
|
return lo.Values(info.Channels)
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewChannelStore creates and returns a new ChannelStore.
|
|
||||||
func NewChannelStore(kv kv.TxnKV) *ChannelStore {
|
|
||||||
c := &ChannelStore{
|
|
||||||
store: kv,
|
|
||||||
channelsInfo: make(map[int64]*NodeChannelInfo),
|
|
||||||
}
|
|
||||||
c.channelsInfo[bufferID] = &NodeChannelInfo{
|
|
||||||
NodeID: bufferID,
|
|
||||||
Channels: make(map[string]RWChannel),
|
|
||||||
}
|
|
||||||
return c
|
|
||||||
}
|
|
||||||
|
|
||||||
// Reload restores the buffer channels and node-channels mapping from kv.
|
|
||||||
func (c *ChannelStore) Reload() error {
|
|
||||||
record := timerecord.NewTimeRecorder("datacoord")
|
|
||||||
keys, values, err := c.store.LoadWithPrefix(Params.CommonCfg.DataCoordWatchSubPath.GetValue())
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
for i := 0; i < len(keys); i++ {
|
|
||||||
k := keys[i]
|
|
||||||
v := values[i]
|
|
||||||
nodeID, err := parseNodeKey(k)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
cw := &datapb.ChannelWatchInfo{}
|
|
||||||
if err := proto.Unmarshal([]byte(v), cw); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
reviseVChannelInfo(cw.GetVchan())
|
|
||||||
|
|
||||||
c.AddNode(nodeID)
|
|
||||||
channel := &channelMeta{
|
|
||||||
Name: cw.GetVchan().GetChannelName(),
|
|
||||||
CollectionID: cw.GetVchan().GetCollectionID(),
|
|
||||||
Schema: cw.GetSchema(),
|
|
||||||
WatchInfo: cw,
|
|
||||||
}
|
|
||||||
c.channelsInfo[nodeID].AddChannel(channel)
|
|
||||||
|
|
||||||
log.Info("channel store reload channel",
|
|
||||||
zap.Int64("nodeID", nodeID), zap.String("channel", channel.Name))
|
|
||||||
metrics.DataCoordDmlChannelNum.WithLabelValues(strconv.FormatInt(nodeID, 10)).Set(float64(len(c.channelsInfo[nodeID].Channels)))
|
|
||||||
}
|
|
||||||
log.Info("channel store reload done", zap.Duration("duration", record.ElapseSpan()))
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// AddNode creates a new node-channels mapping for the given node, and assigns no channels to it.
|
|
||||||
// Returns immediately if the node's already in the channel.
|
|
||||||
func (c *ChannelStore) AddNode(nodeID int64) {
|
|
||||||
if _, ok := c.channelsInfo[nodeID]; ok {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
c.channelsInfo[nodeID] = NewNodeChannelInfo(nodeID)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update applies the channel operations in opSet.
|
|
||||||
func (c *ChannelStore) Update(opSet *ChannelOpSet) error {
|
|
||||||
totalChannelNum := opSet.GetChannelNumber()
|
|
||||||
if totalChannelNum <= maxOperationsPerTxn {
|
|
||||||
return c.update(opSet)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Split opset into multiple txn. Operations on the same channel must be executed in one txn.
|
|
||||||
perChOps := opSet.SplitByChannel()
|
|
||||||
|
|
||||||
// Execute a txn for every 64 operations.
|
|
||||||
count := 0
|
|
||||||
operations := make([]*ChannelOp, 0, maxOperationsPerTxn)
|
|
||||||
for _, opset := range perChOps {
|
|
||||||
if count+opset.Len() > maxOperationsPerTxn {
|
|
||||||
if err := c.update(NewChannelOpSet(operations...)); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
count = 0
|
|
||||||
operations = make([]*ChannelOp, 0, maxOperationsPerTxn)
|
|
||||||
}
|
|
||||||
count += opset.Len()
|
|
||||||
operations = append(operations, opset.Collect()...)
|
|
||||||
}
|
|
||||||
if count == 0 {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
return c.update(NewChannelOpSet(operations...))
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *ChannelStore) checkIfExist(nodeID int64, channel RWChannel) bool {
|
|
||||||
if info, ok := c.channelsInfo[nodeID]; ok {
|
|
||||||
if ch, ok := info.Channels[channel.GetName()]; ok {
|
|
||||||
return ch.GetCollectionID() == channel.GetCollectionID()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
// update applies the ADD/DELETE operations to the current channel store.
|
|
||||||
func (c *ChannelStore) update(opSet *ChannelOpSet) error {
|
|
||||||
// Update ChannelStore's kv store.
|
|
||||||
if err := c.txn(opSet); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update node id -> channel mapping.
|
|
||||||
for _, op := range opSet.Collect() {
|
|
||||||
switch op.Type {
|
|
||||||
case Add, Watch, Release:
|
|
||||||
for _, ch := range op.Channels {
|
|
||||||
if c.checkIfExist(op.NodeID, ch) {
|
|
||||||
continue // prevent adding duplicated channel info
|
|
||||||
}
|
|
||||||
// Append target channels to channel store.
|
|
||||||
c.channelsInfo[op.NodeID].AddChannel(ch)
|
|
||||||
}
|
|
||||||
case Delete:
|
|
||||||
info := c.channelsInfo[op.NodeID]
|
|
||||||
for _, channelName := range op.GetChannelNames() {
|
|
||||||
info.RemoveChannel(channelName)
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
return errUnknownOpType
|
|
||||||
}
|
|
||||||
metrics.DataCoordDmlChannelNum.WithLabelValues(strconv.FormatInt(op.NodeID, 10)).Set(float64(len(c.channelsInfo[op.NodeID].Channels)))
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetChannels returns information of all channels.
|
|
||||||
func (c *ChannelStore) GetChannels() []*NodeChannelInfo {
|
|
||||||
ret := make([]*NodeChannelInfo, 0, len(c.channelsInfo))
|
|
||||||
for _, info := range c.channelsInfo {
|
|
||||||
ret = append(ret, info)
|
|
||||||
}
|
|
||||||
return ret
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetNodesChannels returns the channels assigned to real nodes.
|
|
||||||
func (c *ChannelStore) GetNodesChannels() []*NodeChannelInfo {
|
|
||||||
ret := make([]*NodeChannelInfo, 0, len(c.channelsInfo))
|
|
||||||
for id, info := range c.channelsInfo {
|
|
||||||
if id != bufferID {
|
|
||||||
ret = append(ret, info)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return ret
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *ChannelStore) GetNodeChannelsByCollectionID(collectionID UniqueID) map[UniqueID][]string {
|
|
||||||
nodeChs := make(map[UniqueID][]string)
|
|
||||||
for id, info := range c.channelsInfo {
|
|
||||||
if id == bufferID {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
var channelNames []string
|
|
||||||
for name, ch := range info.Channels {
|
|
||||||
if ch.GetCollectionID() == collectionID {
|
|
||||||
channelNames = append(channelNames, name)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
nodeChs[id] = channelNames
|
|
||||||
}
|
|
||||||
return nodeChs
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetBufferChannelInfo returns all unassigned channels.
|
|
||||||
func (c *ChannelStore) GetBufferChannelInfo() *NodeChannelInfo {
|
|
||||||
if info, ok := c.channelsInfo[bufferID]; ok {
|
|
||||||
return info
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetNode returns the channel info of a given node.
|
|
||||||
func (c *ChannelStore) GetNode(nodeID int64) *NodeChannelInfo {
|
|
||||||
if info, ok := c.channelsInfo[nodeID]; ok {
|
|
||||||
return info
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *ChannelStore) GetNodeChannelCount(nodeID int64) int {
|
|
||||||
if info, ok := c.channelsInfo[nodeID]; ok {
|
|
||||||
return len(info.Channels)
|
|
||||||
}
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
|
|
||||||
// RemoveNode removes the given node from the channel store and returns its channels.
|
|
||||||
func (c *ChannelStore) RemoveNode(nodeID int64) {
|
|
||||||
delete(c.channelsInfo, nodeID)
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetNodes returns a slice of all nodes ids in the current channel store.
|
|
||||||
func (c *ChannelStore) GetNodes() []int64 {
|
|
||||||
ids := make([]int64, 0, len(c.channelsInfo))
|
|
||||||
for id := range c.channelsInfo {
|
|
||||||
if id != bufferID {
|
|
||||||
ids = append(ids, id)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return ids
|
|
||||||
}
|
|
||||||
|
|
||||||
// remove deletes kv pairs from the kv store where keys have given nodeID as prefix.
|
|
||||||
func (c *ChannelStore) remove(nodeID int64) error {
|
|
||||||
k := buildKeyPrefix(nodeID)
|
|
||||||
return c.store.RemoveWithPrefix(k)
|
|
||||||
}
|
|
||||||
|
|
||||||
// txn updates the channelStore's kv store with the given channel ops.
|
|
||||||
func (c *ChannelStore) txn(opSet *ChannelOpSet) error {
|
|
||||||
var (
|
|
||||||
saves = make(map[string]string)
|
|
||||||
removals []string
|
|
||||||
)
|
|
||||||
for _, op := range opSet.Collect() {
|
|
||||||
opSaves, opRemovals, err := op.BuildKV()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
saves = lo.Assign(opSaves, saves)
|
|
||||||
removals = append(removals, opRemovals...)
|
|
||||||
}
|
|
||||||
return c.store.MultiSaveAndRemove(saves, removals)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *ChannelStore) HasChannel(channel string) bool {
|
|
||||||
for _, info := range c.channelsInfo {
|
|
||||||
for _, ch := range info.Channels {
|
|
||||||
if ch.GetName() == channel {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *ChannelStore) GetNodeChannelsBy(nodeSelector NodeSelector, channelSelectors ...ChannelSelector) []*NodeChannelInfo {
|
|
||||||
log.Error("ChannelStore doesn't implement GetNodeChannelsBy")
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *ChannelStore) UpdateState(isSuccessful bool, channels ...RWChannel) {
|
|
||||||
log.Error("ChannelStore doesn't implement UpdateState")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *ChannelStore) SetLegacyChannelByNode(nodeIDs ...int64) {
|
|
||||||
log.Error("ChannelStore doesn't implement SetLegacyChannelByNode")
|
|
||||||
}
|
|
||||||
|
|
||||||
// buildNodeChannelKey generates a key for kv store, where the key is a concatenation of ChannelWatchSubPath, nodeID and channel name.
|
// buildNodeChannelKey generates a key for kv store, where the key is a concatenation of ChannelWatchSubPath, nodeID and channel name.
|
||||||
// ${WatchSubPath}/${nodeID}/${channelName}
|
// ${WatchSubPath}/${nodeID}/${channelName}
|
||||||
func buildNodeChannelKey(nodeID int64, chName string) string {
|
func buildNodeChannelKey(nodeID int64, chName string) string {
|
||||||
@ -589,3 +325,437 @@ func parseNodeKey(key string) (int64, error) {
|
|||||||
}
|
}
|
||||||
return strconv.ParseInt(s[len(s)-2], 10, 64)
|
return strconv.ParseInt(s[len(s)-2], 10, 64)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type StateChannelStore struct {
|
||||||
|
store kv.TxnKV
|
||||||
|
channelsInfo map[int64]*NodeChannelInfo // A map of (nodeID) -> (NodeChannelInfo).
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ RWChannelStore = (*StateChannelStore)(nil)
|
||||||
|
|
||||||
|
var errChannelNotExistInNode = errors.New("channel doesn't exist in given node")
|
||||||
|
|
||||||
|
func NewChannelStoreV2(kv kv.TxnKV) RWChannelStore {
|
||||||
|
return NewStateChannelStore(kv)
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewStateChannelStore(kv kv.TxnKV) *StateChannelStore {
|
||||||
|
c := StateChannelStore{
|
||||||
|
store: kv,
|
||||||
|
channelsInfo: make(map[int64]*NodeChannelInfo),
|
||||||
|
}
|
||||||
|
c.channelsInfo[bufferID] = &NodeChannelInfo{
|
||||||
|
NodeID: bufferID,
|
||||||
|
Channels: make(map[string]RWChannel),
|
||||||
|
}
|
||||||
|
return &c
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *StateChannelStore) Reload() error {
|
||||||
|
record := timerecord.NewTimeRecorder("datacoord")
|
||||||
|
keys, values, err := c.store.LoadWithPrefix(Params.CommonCfg.DataCoordWatchSubPath.GetValue())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for i := 0; i < len(keys); i++ {
|
||||||
|
k := keys[i]
|
||||||
|
v := values[i]
|
||||||
|
nodeID, err := parseNodeKey(k)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
info := &datapb.ChannelWatchInfo{}
|
||||||
|
if err := proto.Unmarshal([]byte(v), info); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
reviseVChannelInfo(info.GetVchan())
|
||||||
|
|
||||||
|
c.AddNode(nodeID)
|
||||||
|
|
||||||
|
channel := NewStateChannelByWatchInfo(nodeID, info)
|
||||||
|
c.channelsInfo[nodeID].AddChannel(channel)
|
||||||
|
log.Info("channel store reload channel",
|
||||||
|
zap.Int64("nodeID", nodeID), zap.String("channel", channel.Name))
|
||||||
|
metrics.DataCoordDmlChannelNum.WithLabelValues(strconv.FormatInt(nodeID, 10)).Set(float64(len(c.channelsInfo[nodeID].Channels)))
|
||||||
|
}
|
||||||
|
log.Info("channel store reload done", zap.Duration("duration", record.ElapseSpan()))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *StateChannelStore) AddNode(nodeID int64) {
|
||||||
|
if _, ok := c.channelsInfo[nodeID]; ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
c.channelsInfo[nodeID] = &NodeChannelInfo{
|
||||||
|
NodeID: nodeID,
|
||||||
|
Channels: make(map[string]RWChannel),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *StateChannelStore) UpdateState(isSuccessful bool, channels ...RWChannel) {
|
||||||
|
lo.ForEach(channels, func(ch RWChannel, _ int) {
|
||||||
|
for _, cInfo := range c.channelsInfo {
|
||||||
|
if stateChannel, ok := cInfo.Channels[ch.GetName()]; ok {
|
||||||
|
if isSuccessful {
|
||||||
|
stateChannel.(*StateChannel).TransitionOnSuccess()
|
||||||
|
} else {
|
||||||
|
stateChannel.(*StateChannel).TransitionOnFailure()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *StateChannelStore) SetLegacyChannelByNode(nodeIDs ...int64) {
|
||||||
|
lo.ForEach(nodeIDs, func(nodeID int64, _ int) {
|
||||||
|
if cInfo, ok := c.channelsInfo[nodeID]; ok {
|
||||||
|
for _, ch := range cInfo.Channels {
|
||||||
|
ch.(*StateChannel).setState(Legacy)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *StateChannelStore) Update(opSet *ChannelOpSet) error {
|
||||||
|
// Split opset into multiple txn. Operations on the same channel must be executed in one txn.
|
||||||
|
perChOps := opSet.SplitByChannel()
|
||||||
|
|
||||||
|
// Execute a txn for every 64 operations.
|
||||||
|
count := 0
|
||||||
|
operations := make([]*ChannelOp, 0, maxOperationsPerTxn)
|
||||||
|
for _, opset := range perChOps {
|
||||||
|
if !c.sanityCheckPerChannelOpSet(opset) {
|
||||||
|
log.Error("unsupported ChannelOpSet", zap.Any("OpSet", opset))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if opset.Len() > maxOperationsPerTxn {
|
||||||
|
log.Error("Operations for one channel exceeds maxOperationsPerTxn",
|
||||||
|
zap.Any("opset size", opset.Len()),
|
||||||
|
zap.Int("limit", maxOperationsPerTxn))
|
||||||
|
}
|
||||||
|
if count+opset.Len() > maxOperationsPerTxn {
|
||||||
|
if err := c.updateMeta(NewChannelOpSet(operations...)); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
count = 0
|
||||||
|
operations = make([]*ChannelOp, 0, maxOperationsPerTxn)
|
||||||
|
}
|
||||||
|
count += opset.Len()
|
||||||
|
operations = append(operations, opset.Collect()...)
|
||||||
|
}
|
||||||
|
if count == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return c.updateMeta(NewChannelOpSet(operations...))
|
||||||
|
}
|
||||||
|
|
||||||
|
// remove from the assignments
|
||||||
|
func (c *StateChannelStore) removeAssignment(nodeID int64, channelName string) {
|
||||||
|
if cInfo, ok := c.channelsInfo[nodeID]; ok {
|
||||||
|
delete(cInfo.Channels, channelName)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *StateChannelStore) addAssignment(nodeID int64, channel RWChannel) {
|
||||||
|
if cInfo, ok := c.channelsInfo[nodeID]; ok {
|
||||||
|
cInfo.Channels[channel.GetName()] = channel
|
||||||
|
} else {
|
||||||
|
c.channelsInfo[nodeID] = &NodeChannelInfo{
|
||||||
|
NodeID: nodeID,
|
||||||
|
Channels: map[string]RWChannel{
|
||||||
|
channel.GetName(): channel,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// updateMeta applies the WATCH/RELEASE/DELETE operations to the current channel store.
|
||||||
|
// DELETE + WATCH ---> from bufferID to nodeID
|
||||||
|
// DELETE + WATCH ---> from lagecyID to nodeID
|
||||||
|
// DELETE + WATCH ---> from deletedNode to nodeID/bufferID
|
||||||
|
// DELETE + WATCH ---> from releasedNode to nodeID/bufferID
|
||||||
|
// RELEASE ---> release from nodeID
|
||||||
|
// WATCH ---> watch to a new channel
|
||||||
|
// DELETE ---> remove the channel
|
||||||
|
func (c *StateChannelStore) sanityCheckPerChannelOpSet(opSet *ChannelOpSet) bool {
|
||||||
|
if opSet.Len() == 2 {
|
||||||
|
ops := opSet.Collect()
|
||||||
|
return (ops[0].Type == Delete && ops[1].Type == Watch) || (ops[1].Type == Delete && ops[0].Type == Watch)
|
||||||
|
} else if opSet.Len() == 1 {
|
||||||
|
t := opSet.Collect()[0].Type
|
||||||
|
return t == Delete || t == Watch || t == Release
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// DELETE + WATCH
|
||||||
|
func (c *StateChannelStore) updateMetaMemoryForPairOp(chName string, opSet *ChannelOpSet) error {
|
||||||
|
if !c.sanityCheckPerChannelOpSet(opSet) {
|
||||||
|
return errUnknownOpType
|
||||||
|
}
|
||||||
|
ops := opSet.Collect()
|
||||||
|
op1 := ops[1]
|
||||||
|
op2 := ops[0]
|
||||||
|
if ops[0].Type == Delete {
|
||||||
|
op1 = ops[0]
|
||||||
|
op2 = ops[1]
|
||||||
|
}
|
||||||
|
cInfo, ok := c.channelsInfo[op1.NodeID]
|
||||||
|
if !ok {
|
||||||
|
return errChannelNotExistInNode
|
||||||
|
}
|
||||||
|
var ch *StateChannel
|
||||||
|
if channel, ok := cInfo.Channels[chName]; ok {
|
||||||
|
ch = channel.(*StateChannel)
|
||||||
|
c.addAssignment(op2.NodeID, ch)
|
||||||
|
c.removeAssignment(op1.NodeID, chName)
|
||||||
|
} else {
|
||||||
|
if cInfo, ok = c.channelsInfo[op2.NodeID]; ok {
|
||||||
|
if channel2, ok := cInfo.Channels[chName]; ok {
|
||||||
|
ch = channel2.(*StateChannel)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// update channel
|
||||||
|
if ch != nil {
|
||||||
|
ch.Assign(op2.NodeID)
|
||||||
|
if op2.NodeID == bufferID {
|
||||||
|
ch.setState(Standby)
|
||||||
|
} else {
|
||||||
|
ch.setState(ToWatch)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *StateChannelStore) getChannel(nodeID int64, channelName string) *StateChannel {
|
||||||
|
if cInfo, ok := c.channelsInfo[nodeID]; ok {
|
||||||
|
if storedChannel, ok := cInfo.Channels[channelName]; ok {
|
||||||
|
return storedChannel.(*StateChannel)
|
||||||
|
}
|
||||||
|
log.Debug("Channel doesn't exist in Node", zap.String("channel", channelName), zap.Int64("nodeID", nodeID))
|
||||||
|
} else {
|
||||||
|
log.Error("Node doesn't exist", zap.Int64("NodeID", nodeID))
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *StateChannelStore) updateMetaMemoryForSingleOp(op *ChannelOp) error {
|
||||||
|
lo.ForEach(op.Channels, func(ch RWChannel, _ int) {
|
||||||
|
switch op.Type {
|
||||||
|
case Release: // release an already exsits storedChannel-node pair
|
||||||
|
if channel := c.getChannel(op.NodeID, ch.GetName()); channel != nil {
|
||||||
|
channel.setState(ToRelease)
|
||||||
|
}
|
||||||
|
case Watch:
|
||||||
|
storedChannel := c.getChannel(op.NodeID, ch.GetName())
|
||||||
|
if storedChannel == nil { // New Channel
|
||||||
|
// set the correct assigment and state for NEW stateChannel
|
||||||
|
newChannel := NewStateChannel(ch)
|
||||||
|
newChannel.Assign(op.NodeID)
|
||||||
|
|
||||||
|
if op.NodeID != bufferID {
|
||||||
|
newChannel.setState(ToWatch)
|
||||||
|
}
|
||||||
|
|
||||||
|
// add channel to memory
|
||||||
|
c.addAssignment(op.NodeID, newChannel)
|
||||||
|
} else { // assign to the original nodes
|
||||||
|
storedChannel.setState(ToWatch)
|
||||||
|
}
|
||||||
|
case Delete: // Remove Channel
|
||||||
|
// if not Delete from bufferID, remove from channel
|
||||||
|
if op.NodeID != bufferID {
|
||||||
|
c.removeAssignment(op.NodeID, ch.GetName())
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
log.Error("unknown opType in updateMetaMemoryForSingleOp", zap.Any("type", op.Type))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *StateChannelStore) updateMeta(opSet *ChannelOpSet) error {
|
||||||
|
// Update ChannelStore's kv store.
|
||||||
|
if err := c.txn(opSet); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update memory
|
||||||
|
chOpSet := opSet.SplitByChannel()
|
||||||
|
for chName, ops := range chOpSet {
|
||||||
|
// DELETE + WATCH
|
||||||
|
if ops.Len() == 2 {
|
||||||
|
c.updateMetaMemoryForPairOp(chName, ops)
|
||||||
|
// RELEASE, DELETE, WATCH
|
||||||
|
} else if ops.Len() == 1 {
|
||||||
|
c.updateMetaMemoryForSingleOp(ops.Collect()[0])
|
||||||
|
} else {
|
||||||
|
log.Error("unsupported ChannelOpSet", zap.Any("OpSet", ops))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// txn updates the channelStore's kv store with the given channel ops.
|
||||||
|
func (c *StateChannelStore) txn(opSet *ChannelOpSet) error {
|
||||||
|
var (
|
||||||
|
saves = make(map[string]string)
|
||||||
|
removals []string
|
||||||
|
)
|
||||||
|
for _, op := range opSet.Collect() {
|
||||||
|
opSaves, opRemovals, err := op.BuildKV()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
saves = lo.Assign(opSaves, saves)
|
||||||
|
removals = append(removals, opRemovals...)
|
||||||
|
}
|
||||||
|
return c.store.MultiSaveAndRemove(saves, removals)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *StateChannelStore) RemoveNode(nodeID int64) {
|
||||||
|
delete(c.channelsInfo, nodeID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *StateChannelStore) HasChannel(channel string) bool {
|
||||||
|
for _, info := range c.channelsInfo {
|
||||||
|
if _, ok := info.Channels[channel]; ok {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
type (
|
||||||
|
ChannelSelector func(ch *StateChannel) bool
|
||||||
|
NodeSelector func(ID int64) bool
|
||||||
|
)
|
||||||
|
|
||||||
|
func WithAllNodes() NodeSelector {
|
||||||
|
return func(ID int64) bool {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithoutBufferNode() NodeSelector {
|
||||||
|
return func(ID int64) bool {
|
||||||
|
return ID != int64(bufferID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithNodeIDs(IDs ...int64) NodeSelector {
|
||||||
|
return func(ID int64) bool {
|
||||||
|
return lo.Contains(IDs, ID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithoutNodeIDs(IDs ...int64) NodeSelector {
|
||||||
|
return func(ID int64) bool {
|
||||||
|
return !lo.Contains(IDs, ID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithChannelName(channel string) ChannelSelector {
|
||||||
|
return func(ch *StateChannel) bool {
|
||||||
|
return ch.GetName() == channel
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithCollectionIDV2(collectionID int64) ChannelSelector {
|
||||||
|
return func(ch *StateChannel) bool {
|
||||||
|
return ch.GetCollectionID() == collectionID
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithChannelStates(states ...ChannelState) ChannelSelector {
|
||||||
|
return func(ch *StateChannel) bool {
|
||||||
|
return lo.Contains(states, ch.currentState)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *StateChannelStore) GetNodeChannelsBy(nodeSelector NodeSelector, channelSelectors ...ChannelSelector) []*NodeChannelInfo {
|
||||||
|
var nodeChannels []*NodeChannelInfo
|
||||||
|
for nodeID, cInfo := range c.channelsInfo {
|
||||||
|
if nodeSelector(nodeID) {
|
||||||
|
selected := make(map[string]RWChannel)
|
||||||
|
for chName, channel := range cInfo.Channels {
|
||||||
|
var sel bool = true
|
||||||
|
for _, selector := range channelSelectors {
|
||||||
|
if !selector(channel.(*StateChannel)) {
|
||||||
|
sel = false
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if sel {
|
||||||
|
selected[chName] = channel
|
||||||
|
}
|
||||||
|
}
|
||||||
|
nodeChannels = append(nodeChannels, &NodeChannelInfo{
|
||||||
|
NodeID: nodeID,
|
||||||
|
Channels: selected,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nodeChannels
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *StateChannelStore) GetNodesChannels() []*NodeChannelInfo {
|
||||||
|
ret := make([]*NodeChannelInfo, 0, len(c.channelsInfo))
|
||||||
|
for id, info := range c.channelsInfo {
|
||||||
|
if id != bufferID {
|
||||||
|
ret = append(ret, info)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ret
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *StateChannelStore) GetNodeChannelsByCollectionID(collectionID UniqueID) map[UniqueID][]string {
|
||||||
|
nodeChs := make(map[UniqueID][]string)
|
||||||
|
for id, info := range c.channelsInfo {
|
||||||
|
if id == bufferID {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
var channelNames []string
|
||||||
|
for name, ch := range info.Channels {
|
||||||
|
if ch.GetCollectionID() == collectionID {
|
||||||
|
channelNames = append(channelNames, name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
nodeChs[id] = channelNames
|
||||||
|
}
|
||||||
|
return nodeChs
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *StateChannelStore) GetBufferChannelInfo() *NodeChannelInfo {
|
||||||
|
return c.GetNode(bufferID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *StateChannelStore) GetNode(nodeID int64) *NodeChannelInfo {
|
||||||
|
if info, ok := c.channelsInfo[nodeID]; ok {
|
||||||
|
return info
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *StateChannelStore) GetNodeChannelCount(nodeID int64) int {
|
||||||
|
if cInfo, ok := c.channelsInfo[nodeID]; ok {
|
||||||
|
return len(cInfo.Channels)
|
||||||
|
}
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *StateChannelStore) GetNodes() []int64 {
|
||||||
|
return lo.Filter(lo.Keys(c.channelsInfo), func(ID int64, _ int) bool {
|
||||||
|
return ID != bufferID
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// remove deletes kv pairs from the kv store where keys have given nodeID as prefix.
|
||||||
|
func (c *StateChannelStore) remove(nodeID int64) error {
|
||||||
|
k := buildKeyPrefix(nodeID)
|
||||||
|
return c.store.RemoveWithPrefix(k)
|
||||||
|
}
|
||||||
|
@ -1,19 +1,3 @@
|
|||||||
// Licensed to the LF AI & Data foundation under one
|
|
||||||
// or more contributor license agreements. See the NOTICE file
|
|
||||||
// distributed with this work for additional information
|
|
||||||
// regarding copyright ownership. The ASF licenses this file
|
|
||||||
// to you under the Apache License, Version 2.0 (the
|
|
||||||
// "License"); you may not use this file except in compliance
|
|
||||||
// with the License. You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package datacoord
|
package datacoord
|
||||||
|
|
||||||
import (
|
import (
|
||||||
@ -22,26 +6,34 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/golang/protobuf/proto"
|
"github.com/golang/protobuf/proto"
|
||||||
|
"github.com/samber/lo"
|
||||||
"github.com/stretchr/testify/mock"
|
"github.com/stretchr/testify/mock"
|
||||||
"github.com/stretchr/testify/suite"
|
"github.com/stretchr/testify/suite"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
|
||||||
"github.com/milvus-io/milvus/internal/kv/mocks"
|
"github.com/milvus-io/milvus/internal/kv/mocks"
|
||||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||||
|
"github.com/milvus-io/milvus/pkg/kv/predicates"
|
||||||
|
"github.com/milvus-io/milvus/pkg/log"
|
||||||
"github.com/milvus-io/milvus/pkg/metrics"
|
"github.com/milvus-io/milvus/pkg/metrics"
|
||||||
"github.com/milvus-io/milvus/pkg/util/testutils"
|
"github.com/milvus-io/milvus/pkg/util/testutils"
|
||||||
)
|
)
|
||||||
|
|
||||||
type ChannelStoreReloadSuite struct {
|
func TestStateChannelStore(t *testing.T) {
|
||||||
|
suite.Run(t, new(StateChannelStoreSuite))
|
||||||
|
}
|
||||||
|
|
||||||
|
type StateChannelStoreSuite struct {
|
||||||
testutils.PromMetricsSuite
|
testutils.PromMetricsSuite
|
||||||
|
|
||||||
mockTxn *mocks.TxnKV
|
mockTxn *mocks.TxnKV
|
||||||
}
|
}
|
||||||
|
|
||||||
func (suite *ChannelStoreReloadSuite) SetupTest() {
|
func (s *StateChannelStoreSuite) SetupTest() {
|
||||||
suite.mockTxn = mocks.NewTxnKV(suite.T())
|
s.mockTxn = mocks.NewTxnKV(s.T())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (suite *ChannelStoreReloadSuite) generateWatchInfo(name string, state datapb.ChannelWatchState) *datapb.ChannelWatchInfo {
|
func generateWatchInfo(name string, state datapb.ChannelWatchState) *datapb.ChannelWatchInfo {
|
||||||
return &datapb.ChannelWatchInfo{
|
return &datapb.ChannelWatchInfo{
|
||||||
Vchan: &datapb.VchannelInfo{
|
Vchan: &datapb.VchannelInfo{
|
||||||
ChannelName: name,
|
ChannelName: name,
|
||||||
@ -50,7 +42,410 @@ func (suite *ChannelStoreReloadSuite) generateWatchInfo(name string, state datap
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (suite *ChannelStoreReloadSuite) TestReload() {
|
func (s *StateChannelStoreSuite) createChannelInfo(nodeID int64, channels ...RWChannel) *NodeChannelInfo {
|
||||||
|
cInfo := &NodeChannelInfo{
|
||||||
|
NodeID: nodeID,
|
||||||
|
Channels: make(map[string]RWChannel),
|
||||||
|
}
|
||||||
|
for _, channel := range channels {
|
||||||
|
cInfo.Channels[channel.GetName()] = channel
|
||||||
|
}
|
||||||
|
return cInfo
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *StateChannelStoreSuite) TestGetNodeChannelsBy() {
|
||||||
|
nodes := []int64{bufferID, 100, 101, 102}
|
||||||
|
nodesExcludeBufferID := []int64{100, 101, 102}
|
||||||
|
channels := []*StateChannel{
|
||||||
|
getChannel("ch1", 1),
|
||||||
|
getChannel("ch2", 1),
|
||||||
|
getChannel("ch3", 1),
|
||||||
|
getChannel("ch4", 1),
|
||||||
|
getChannel("ch5", 1),
|
||||||
|
getChannel("ch6", 1),
|
||||||
|
getChannel("ch7", 1),
|
||||||
|
}
|
||||||
|
|
||||||
|
channelsInfo := map[int64]*NodeChannelInfo{
|
||||||
|
bufferID: s.createChannelInfo(bufferID, channels[0]),
|
||||||
|
100: s.createChannelInfo(100, channels[1], channels[2]),
|
||||||
|
101: s.createChannelInfo(101, channels[3], channels[4]),
|
||||||
|
102: s.createChannelInfo(102, channels[5], channels[6]), // legacy nodes
|
||||||
|
}
|
||||||
|
|
||||||
|
store := NewStateChannelStore(s.mockTxn)
|
||||||
|
lo.ForEach(nodes, func(nodeID int64, _ int) { store.AddNode(nodeID) })
|
||||||
|
store.channelsInfo = channelsInfo
|
||||||
|
lo.ForEach(channels, func(ch *StateChannel, _ int) {
|
||||||
|
if ch.GetName() == "ch6" || ch.GetName() == "ch7" {
|
||||||
|
ch.setState(Legacy)
|
||||||
|
}
|
||||||
|
s.Require().True(store.HasChannel(ch.GetName()))
|
||||||
|
})
|
||||||
|
s.Require().ElementsMatch(nodesExcludeBufferID, store.GetNodes())
|
||||||
|
store.SetLegacyChannelByNode(102)
|
||||||
|
|
||||||
|
s.Run("test AddNode RemoveNode", func() {
|
||||||
|
var nodeID int64 = 19530
|
||||||
|
_, ok := store.channelsInfo[nodeID]
|
||||||
|
s.Require().False(ok)
|
||||||
|
store.AddNode(nodeID)
|
||||||
|
_, ok = store.channelsInfo[nodeID]
|
||||||
|
s.True(ok)
|
||||||
|
|
||||||
|
store.RemoveNode(nodeID)
|
||||||
|
_, ok = store.channelsInfo[nodeID]
|
||||||
|
s.False(ok)
|
||||||
|
})
|
||||||
|
|
||||||
|
s.Run("test GetNodeChannels", func() {
|
||||||
|
infos := store.GetNodesChannels()
|
||||||
|
expectedResults := map[int64][]string{
|
||||||
|
100: {"ch2", "ch3"},
|
||||||
|
101: {"ch4", "ch5"},
|
||||||
|
102: {"ch6", "ch7"},
|
||||||
|
}
|
||||||
|
|
||||||
|
s.Equal(3, len(infos))
|
||||||
|
|
||||||
|
lo.ForEach(infos, func(info *NodeChannelInfo, _ int) {
|
||||||
|
expectedChannels, ok := expectedResults[info.NodeID]
|
||||||
|
s.True(ok)
|
||||||
|
|
||||||
|
gotChannels := lo.Keys(info.Channels)
|
||||||
|
s.ElementsMatch(expectedChannels, gotChannels)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
s.Run("test GetBufferChannelInfo", func() {
|
||||||
|
info := store.GetBufferChannelInfo()
|
||||||
|
s.NotNil(info)
|
||||||
|
|
||||||
|
gotChannels := lo.Keys(info.Channels)
|
||||||
|
s.ElementsMatch([]string{"ch1"}, gotChannels)
|
||||||
|
})
|
||||||
|
|
||||||
|
s.Run("test GetNode", func() {
|
||||||
|
info := store.GetNode(19530)
|
||||||
|
s.Nil(info)
|
||||||
|
|
||||||
|
info = store.GetNode(bufferID)
|
||||||
|
s.NotNil(info)
|
||||||
|
|
||||||
|
gotChannels := lo.Keys(info.Channels)
|
||||||
|
s.ElementsMatch([]string{"ch1"}, gotChannels)
|
||||||
|
})
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
description string
|
||||||
|
nodeSelector NodeSelector
|
||||||
|
channelSelectors []ChannelSelector
|
||||||
|
|
||||||
|
expectedResult map[int64][]string
|
||||||
|
}{
|
||||||
|
{"test withnodeIDs bufferID", WithNodeIDs(bufferID), nil, map[int64][]string{bufferID: {"ch1"}}},
|
||||||
|
{"test withnodeIDs 100", WithNodeIDs(100), nil, map[int64][]string{100: {"ch2", "ch3"}}},
|
||||||
|
{"test withnodeIDs 101 102", WithNodeIDs(101, 102), nil, map[int64][]string{
|
||||||
|
101: {"ch4", "ch5"},
|
||||||
|
102: {"ch6", "ch7"},
|
||||||
|
}},
|
||||||
|
{"test withAllNodes", WithAllNodes(), nil, map[int64][]string{
|
||||||
|
bufferID: {"ch1"},
|
||||||
|
100: {"ch2", "ch3"},
|
||||||
|
101: {"ch4", "ch5"},
|
||||||
|
102: {"ch6", "ch7"},
|
||||||
|
}},
|
||||||
|
{"test WithoutBufferNode", WithoutBufferNode(), nil, map[int64][]string{
|
||||||
|
100: {"ch2", "ch3"},
|
||||||
|
101: {"ch4", "ch5"},
|
||||||
|
102: {"ch6", "ch7"},
|
||||||
|
}},
|
||||||
|
{"test WithoutNodeIDs 100, 101", WithoutNodeIDs(100, 101), nil, map[int64][]string{
|
||||||
|
bufferID: {"ch1"},
|
||||||
|
102: {"ch6", "ch7"},
|
||||||
|
}},
|
||||||
|
{
|
||||||
|
"test WithChannelName ch1", WithNodeIDs(bufferID),
|
||||||
|
[]ChannelSelector{WithChannelName("ch1")},
|
||||||
|
map[int64][]string{
|
||||||
|
bufferID: {"ch1"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"test WithChannelName ch1, collectionID 1", WithNodeIDs(100),
|
||||||
|
[]ChannelSelector{
|
||||||
|
WithChannelName("ch2"),
|
||||||
|
WithCollectionIDV2(1),
|
||||||
|
},
|
||||||
|
map[int64][]string{100: {"ch2"}},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"test WithCollectionID 1", WithAllNodes(),
|
||||||
|
[]ChannelSelector{
|
||||||
|
WithCollectionIDV2(1),
|
||||||
|
},
|
||||||
|
map[int64][]string{
|
||||||
|
bufferID: {"ch1"},
|
||||||
|
100: {"ch2", "ch3"},
|
||||||
|
101: {"ch4", "ch5"},
|
||||||
|
102: {"ch6", "ch7"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"test WithChannelState", WithNodeIDs(102),
|
||||||
|
[]ChannelSelector{
|
||||||
|
WithChannelStates(Legacy),
|
||||||
|
},
|
||||||
|
map[int64][]string{
|
||||||
|
102: {"ch6", "ch7"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
s.Run(test.description, func() {
|
||||||
|
if test.channelSelectors == nil {
|
||||||
|
test.channelSelectors = []ChannelSelector{}
|
||||||
|
}
|
||||||
|
|
||||||
|
infos := store.GetNodeChannelsBy(test.nodeSelector, test.channelSelectors...)
|
||||||
|
log.Info("got test infos", zap.Any("infos", infos))
|
||||||
|
s.Equal(len(test.expectedResult), len(infos))
|
||||||
|
|
||||||
|
lo.ForEach(infos, func(info *NodeChannelInfo, _ int) {
|
||||||
|
expectedChannels, ok := test.expectedResult[info.NodeID]
|
||||||
|
s.True(ok)
|
||||||
|
|
||||||
|
gotChannels := lo.Keys(info.Channels)
|
||||||
|
s.ElementsMatch(expectedChannels, gotChannels)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *StateChannelStoreSuite) TestUpdateWithTxnLimit() {
|
||||||
|
tests := []struct {
|
||||||
|
description string
|
||||||
|
inOpCount int
|
||||||
|
outTxnCount int
|
||||||
|
}{
|
||||||
|
{"operations count < maxPerTxn", maxOperationsPerTxn - 1, 1},
|
||||||
|
{"operations count = maxPerTxn", maxOperationsPerTxn, 1},
|
||||||
|
{"operations count > maxPerTxn", maxOperationsPerTxn + 1, 2},
|
||||||
|
{"operations count = 2*maxPerTxn", maxOperationsPerTxn * 2, 2},
|
||||||
|
{"operations count = 2*maxPerTxn+1", maxOperationsPerTxn*2 + 1, 3},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
s.SetupTest()
|
||||||
|
s.Run(test.description, func() {
|
||||||
|
s.mockTxn.EXPECT().MultiSaveAndRemove(mock.Anything, mock.Anything).
|
||||||
|
Run(func(saves map[string]string, removals []string, preds ...predicates.Predicate) {
|
||||||
|
log.Info("test save and remove", zap.Any("saves", saves), zap.Any("removals", removals))
|
||||||
|
}).Return(nil).Times(test.outTxnCount)
|
||||||
|
|
||||||
|
store := NewStateChannelStore(s.mockTxn)
|
||||||
|
store.AddNode(1)
|
||||||
|
s.Require().ElementsMatch([]int64{1}, store.GetNodes())
|
||||||
|
s.Require().Equal(0, store.GetNodeChannelCount(1))
|
||||||
|
|
||||||
|
// Get operations
|
||||||
|
ops := genChannelOperations(1, Watch, test.inOpCount)
|
||||||
|
err := store.Update(ops)
|
||||||
|
s.NoError(err)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *StateChannelStoreSuite) TestUpdateMeta2000kSegs() {
|
||||||
|
ch := getChannel("ch1", 1)
|
||||||
|
info := ch.GetWatchInfo()
|
||||||
|
// way larger than limit=2097152
|
||||||
|
seg2000k := make([]int64, 2000000)
|
||||||
|
for i := range seg2000k {
|
||||||
|
seg2000k[i] = int64(i)
|
||||||
|
}
|
||||||
|
info.Vchan.FlushedSegmentIds = seg2000k
|
||||||
|
ch.UpdateWatchInfo(info)
|
||||||
|
|
||||||
|
opSet := NewChannelOpSet(
|
||||||
|
NewChannelOp(bufferID, Delete, ch),
|
||||||
|
NewChannelOp(100, Watch, ch),
|
||||||
|
)
|
||||||
|
s.SetupTest()
|
||||||
|
s.mockTxn.EXPECT().MultiSaveAndRemove(mock.Anything, mock.Anything).
|
||||||
|
Run(func(saves map[string]string, removals []string, preds ...predicates.Predicate) {
|
||||||
|
}).Return(nil).Once()
|
||||||
|
|
||||||
|
store := NewStateChannelStore(s.mockTxn)
|
||||||
|
store.AddNode(100)
|
||||||
|
s.Require().Equal(0, store.GetNodeChannelCount(100))
|
||||||
|
store.addAssignment(bufferID, ch)
|
||||||
|
s.Require().Equal(1, store.GetNodeChannelCount(bufferID))
|
||||||
|
|
||||||
|
err := store.updateMeta(opSet)
|
||||||
|
s.NoError(err)
|
||||||
|
|
||||||
|
got := store.GetNodeChannelsBy(WithNodeIDs(100))
|
||||||
|
s.NotNil(got)
|
||||||
|
s.Require().Equal(1, len(got))
|
||||||
|
gotInfo := got[0]
|
||||||
|
s.ElementsMatch([]string{"ch1"}, lo.Keys(gotInfo.Channels))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *StateChannelStoreSuite) TestUpdateMeta() {
|
||||||
|
tests := []struct {
|
||||||
|
description string
|
||||||
|
|
||||||
|
opSet *ChannelOpSet
|
||||||
|
nodeIDs []int64
|
||||||
|
channels []*StateChannel
|
||||||
|
assignments map[int64][]string
|
||||||
|
|
||||||
|
outAssignments map[int64][]string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
"delete_watch_ch1 from bufferID to nodeID=100",
|
||||||
|
NewChannelOpSet(
|
||||||
|
NewChannelOp(bufferID, Delete, getChannel("ch1", 1)),
|
||||||
|
NewChannelOp(100, Watch, getChannel("ch1", 1)),
|
||||||
|
),
|
||||||
|
[]int64{bufferID, 100},
|
||||||
|
[]*StateChannel{getChannel("ch1", 1)},
|
||||||
|
map[int64][]string{
|
||||||
|
bufferID: {"ch1"},
|
||||||
|
},
|
||||||
|
map[int64][]string{
|
||||||
|
100: {"ch1"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"delete_watch_ch1 from lagecyID=99 to nodeID=100",
|
||||||
|
NewChannelOpSet(
|
||||||
|
NewChannelOp(99, Delete, getChannel("ch1", 1)),
|
||||||
|
NewChannelOp(100, Watch, getChannel("ch1", 1)),
|
||||||
|
),
|
||||||
|
[]int64{bufferID, 99, 100},
|
||||||
|
[]*StateChannel{getChannel("ch1", 1)},
|
||||||
|
map[int64][]string{
|
||||||
|
99: {"ch1"},
|
||||||
|
},
|
||||||
|
map[int64][]string{
|
||||||
|
100: {"ch1"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"release from nodeID=100",
|
||||||
|
NewChannelOpSet(
|
||||||
|
NewChannelOp(100, Release, getChannel("ch1", 1)),
|
||||||
|
),
|
||||||
|
[]int64{bufferID, 100},
|
||||||
|
[]*StateChannel{getChannel("ch1", 1)},
|
||||||
|
map[int64][]string{
|
||||||
|
100: {"ch1"},
|
||||||
|
},
|
||||||
|
map[int64][]string{
|
||||||
|
100: {"ch1"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"watch a new channel from nodeID=100",
|
||||||
|
NewChannelOpSet(
|
||||||
|
NewChannelOp(100, Watch, getChannel("ch1", 1)),
|
||||||
|
),
|
||||||
|
[]int64{bufferID, 100},
|
||||||
|
[]*StateChannel{getChannel("ch1", 1)},
|
||||||
|
map[int64][]string{
|
||||||
|
100: {"ch1"},
|
||||||
|
},
|
||||||
|
map[int64][]string{
|
||||||
|
100: {"ch1"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"Delete remove a channelfrom nodeID=100",
|
||||||
|
NewChannelOpSet(
|
||||||
|
NewChannelOp(100, Delete, getChannel("ch1", 1)),
|
||||||
|
),
|
||||||
|
[]int64{bufferID, 100},
|
||||||
|
[]*StateChannel{getChannel("ch1", 1)},
|
||||||
|
map[int64][]string{
|
||||||
|
100: {"ch1"},
|
||||||
|
},
|
||||||
|
map[int64][]string{
|
||||||
|
100: {},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
s.SetupTest()
|
||||||
|
s.mockTxn.EXPECT().MultiSaveAndRemove(mock.Anything, mock.Anything).
|
||||||
|
Run(func(saves map[string]string, removals []string, preds ...predicates.Predicate) {
|
||||||
|
}).Return(nil).Times(len(tests))
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
s.Run(test.description, func() {
|
||||||
|
store := NewStateChannelStore(s.mockTxn)
|
||||||
|
|
||||||
|
lo.ForEach(test.nodeIDs, func(nodeID int64, _ int) {
|
||||||
|
store.AddNode(nodeID)
|
||||||
|
s.Require().Equal(0, store.GetNodeChannelCount(nodeID))
|
||||||
|
})
|
||||||
|
c := make(map[string]*StateChannel)
|
||||||
|
lo.ForEach(test.channels, func(ch *StateChannel, _ int) { c[ch.GetName()] = ch })
|
||||||
|
for nodeID, channels := range test.assignments {
|
||||||
|
lo.ForEach(channels, func(ch string, _ int) {
|
||||||
|
store.addAssignment(nodeID, c[ch])
|
||||||
|
})
|
||||||
|
s.Require().Equal(1, store.GetNodeChannelCount(nodeID))
|
||||||
|
}
|
||||||
|
|
||||||
|
err := store.updateMeta(test.opSet)
|
||||||
|
s.NoError(err)
|
||||||
|
|
||||||
|
for nodeID, channels := range test.outAssignments {
|
||||||
|
got := store.GetNodeChannelsBy(WithNodeIDs(nodeID))
|
||||||
|
s.NotNil(got)
|
||||||
|
s.Require().Equal(1, len(got))
|
||||||
|
info := got[0]
|
||||||
|
s.ElementsMatch(channels, lo.Keys(info.Channels))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *StateChannelStoreSuite) TestUpdateState() {
|
||||||
|
tests := []struct {
|
||||||
|
description string
|
||||||
|
|
||||||
|
inSuccess bool
|
||||||
|
inChannelState ChannelState
|
||||||
|
outChannelState ChannelState
|
||||||
|
}{
|
||||||
|
{"input standby, fail", false, Standby, Standby},
|
||||||
|
{"input standby, success", true, Standby, ToWatch},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
s.Run(test.description, func() {
|
||||||
|
store := NewStateChannelStore(s.mockTxn)
|
||||||
|
|
||||||
|
ch := "ch-1"
|
||||||
|
channel := NewStateChannel(getChannel(ch, 1))
|
||||||
|
channel.setState(test.inChannelState)
|
||||||
|
store.channelsInfo[1] = &NodeChannelInfo{
|
||||||
|
NodeID: bufferID,
|
||||||
|
Channels: map[string]RWChannel{
|
||||||
|
ch: channel,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
store.UpdateState(test.inSuccess, channel)
|
||||||
|
s.Equal(test.outChannelState, channel.currentState)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *StateChannelStoreSuite) TestReload() {
|
||||||
type item struct {
|
type item struct {
|
||||||
nodeID int64
|
nodeID int64
|
||||||
channelName string
|
channelName string
|
||||||
@ -86,30 +481,39 @@ func (suite *ChannelStoreReloadSuite) TestReload() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, tc := range cases {
|
for _, tc := range cases {
|
||||||
suite.Run(tc.tag, func() {
|
s.Run(tc.tag, func() {
|
||||||
suite.mockTxn.ExpectedCalls = nil
|
s.mockTxn.ExpectedCalls = nil
|
||||||
|
|
||||||
var keys, values []string
|
var keys, values []string
|
||||||
for _, item := range tc.items {
|
for _, item := range tc.items {
|
||||||
keys = append(keys, fmt.Sprintf("channel_store/%d/%s", item.nodeID, item.channelName))
|
keys = append(keys, fmt.Sprintf("channel_store/%d/%s", item.nodeID, item.channelName))
|
||||||
info := suite.generateWatchInfo(item.channelName, datapb.ChannelWatchState_WatchSuccess)
|
info := generateWatchInfo(item.channelName, datapb.ChannelWatchState_WatchSuccess)
|
||||||
bs, err := proto.Marshal(info)
|
bs, err := proto.Marshal(info)
|
||||||
suite.Require().NoError(err)
|
s.Require().NoError(err)
|
||||||
values = append(values, string(bs))
|
values = append(values, string(bs))
|
||||||
}
|
}
|
||||||
suite.mockTxn.EXPECT().LoadWithPrefix(mock.AnythingOfType("string")).Return(keys, values, nil)
|
s.mockTxn.EXPECT().LoadWithPrefix(mock.AnythingOfType("string")).Return(keys, values, nil)
|
||||||
|
|
||||||
store := NewChannelStore(suite.mockTxn)
|
store := NewStateChannelStore(s.mockTxn)
|
||||||
err := store.Reload()
|
err := store.Reload()
|
||||||
suite.Require().NoError(err)
|
s.Require().NoError(err)
|
||||||
|
|
||||||
for nodeID, expect := range tc.expect {
|
for nodeID, expect := range tc.expect {
|
||||||
suite.MetricsEqual(metrics.DataCoordDmlChannelNum.WithLabelValues(strconv.FormatInt(nodeID, 10)), float64(expect))
|
s.MetricsEqual(metrics.DataCoordDmlChannelNum.WithLabelValues(strconv.FormatInt(nodeID, 10)), float64(expect))
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestChannelStore(t *testing.T) {
|
func genChannelOperations(nodeID int64, opType ChannelOpType, num int) *ChannelOpSet {
|
||||||
suite.Run(t, new(ChannelStoreReloadSuite))
|
channels := make([]RWChannel, 0, num)
|
||||||
|
for i := 0; i < num; i++ {
|
||||||
|
name := fmt.Sprintf("ch%d", i)
|
||||||
|
channel := NewStateChannel(getChannel(name, 1))
|
||||||
|
channel.Info = generateWatchInfo(name, datapb.ChannelWatchState_ToWatch)
|
||||||
|
channels = append(channels, channel)
|
||||||
|
}
|
||||||
|
|
||||||
|
ops := NewChannelOpSet(NewChannelOp(nodeID, opType, channels...))
|
||||||
|
return ops
|
||||||
}
|
}
|
||||||
|
@ -1,450 +0,0 @@
|
|||||||
package datacoord
|
|
||||||
|
|
||||||
import (
|
|
||||||
"strconv"
|
|
||||||
|
|
||||||
"github.com/cockroachdb/errors"
|
|
||||||
"github.com/golang/protobuf/proto"
|
|
||||||
"github.com/samber/lo"
|
|
||||||
"go.uber.org/zap"
|
|
||||||
|
|
||||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
|
||||||
"github.com/milvus-io/milvus/pkg/kv"
|
|
||||||
"github.com/milvus-io/milvus/pkg/log"
|
|
||||||
"github.com/milvus-io/milvus/pkg/metrics"
|
|
||||||
"github.com/milvus-io/milvus/pkg/util/timerecord"
|
|
||||||
)
|
|
||||||
|
|
||||||
type StateChannelStore struct {
|
|
||||||
store kv.TxnKV
|
|
||||||
channelsInfo map[int64]*NodeChannelInfo // A map of (nodeID) -> (NodeChannelInfo).
|
|
||||||
}
|
|
||||||
|
|
||||||
var _ RWChannelStore = (*StateChannelStore)(nil)
|
|
||||||
|
|
||||||
var errChannelNotExistInNode = errors.New("channel doesn't exist in given node")
|
|
||||||
|
|
||||||
func NewChannelStoreV2(kv kv.TxnKV) RWChannelStore {
|
|
||||||
return NewStateChannelStore(kv)
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewStateChannelStore(kv kv.TxnKV) *StateChannelStore {
|
|
||||||
c := StateChannelStore{
|
|
||||||
store: kv,
|
|
||||||
channelsInfo: make(map[int64]*NodeChannelInfo),
|
|
||||||
}
|
|
||||||
c.channelsInfo[bufferID] = &NodeChannelInfo{
|
|
||||||
NodeID: bufferID,
|
|
||||||
Channels: make(map[string]RWChannel),
|
|
||||||
}
|
|
||||||
return &c
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *StateChannelStore) Reload() error {
|
|
||||||
record := timerecord.NewTimeRecorder("datacoord")
|
|
||||||
keys, values, err := c.store.LoadWithPrefix(Params.CommonCfg.DataCoordWatchSubPath.GetValue())
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
for i := 0; i < len(keys); i++ {
|
|
||||||
k := keys[i]
|
|
||||||
v := values[i]
|
|
||||||
nodeID, err := parseNodeKey(k)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
info := &datapb.ChannelWatchInfo{}
|
|
||||||
if err := proto.Unmarshal([]byte(v), info); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
reviseVChannelInfo(info.GetVchan())
|
|
||||||
|
|
||||||
c.AddNode(nodeID)
|
|
||||||
|
|
||||||
channel := NewStateChannelByWatchInfo(nodeID, info)
|
|
||||||
c.channelsInfo[nodeID].AddChannel(channel)
|
|
||||||
log.Info("channel store reload channel",
|
|
||||||
zap.Int64("nodeID", nodeID), zap.String("channel", channel.Name))
|
|
||||||
metrics.DataCoordDmlChannelNum.WithLabelValues(strconv.FormatInt(nodeID, 10)).Set(float64(len(c.channelsInfo[nodeID].Channels)))
|
|
||||||
}
|
|
||||||
log.Info("channel store reload done", zap.Duration("duration", record.ElapseSpan()))
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *StateChannelStore) AddNode(nodeID int64) {
|
|
||||||
if _, ok := c.channelsInfo[nodeID]; ok {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
c.channelsInfo[nodeID] = &NodeChannelInfo{
|
|
||||||
NodeID: nodeID,
|
|
||||||
Channels: make(map[string]RWChannel),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *StateChannelStore) UpdateState(isSuccessful bool, channels ...RWChannel) {
|
|
||||||
lo.ForEach(channels, func(ch RWChannel, _ int) {
|
|
||||||
for _, cInfo := range c.channelsInfo {
|
|
||||||
if stateChannel, ok := cInfo.Channels[ch.GetName()]; ok {
|
|
||||||
if isSuccessful {
|
|
||||||
stateChannel.(*StateChannel).TransitionOnSuccess()
|
|
||||||
} else {
|
|
||||||
stateChannel.(*StateChannel).TransitionOnFailure()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *StateChannelStore) SetLegacyChannelByNode(nodeIDs ...int64) {
|
|
||||||
lo.ForEach(nodeIDs, func(nodeID int64, _ int) {
|
|
||||||
if cInfo, ok := c.channelsInfo[nodeID]; ok {
|
|
||||||
for _, ch := range cInfo.Channels {
|
|
||||||
ch.(*StateChannel).setState(Legacy)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *StateChannelStore) Update(opSet *ChannelOpSet) error {
|
|
||||||
// Split opset into multiple txn. Operations on the same channel must be executed in one txn.
|
|
||||||
perChOps := opSet.SplitByChannel()
|
|
||||||
|
|
||||||
// Execute a txn for every 64 operations.
|
|
||||||
count := 0
|
|
||||||
operations := make([]*ChannelOp, 0, maxOperationsPerTxn)
|
|
||||||
for _, opset := range perChOps {
|
|
||||||
if !c.sanityCheckPerChannelOpSet(opset) {
|
|
||||||
log.Error("unsupported ChannelOpSet", zap.Any("OpSet", opset))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if opset.Len() > maxOperationsPerTxn {
|
|
||||||
log.Error("Operations for one channel exceeds maxOperationsPerTxn",
|
|
||||||
zap.Any("opset size", opset.Len()),
|
|
||||||
zap.Int("limit", maxOperationsPerTxn))
|
|
||||||
}
|
|
||||||
if count+opset.Len() > maxOperationsPerTxn {
|
|
||||||
if err := c.updateMeta(NewChannelOpSet(operations...)); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
count = 0
|
|
||||||
operations = make([]*ChannelOp, 0, maxOperationsPerTxn)
|
|
||||||
}
|
|
||||||
count += opset.Len()
|
|
||||||
operations = append(operations, opset.Collect()...)
|
|
||||||
}
|
|
||||||
if count == 0 {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return c.updateMeta(NewChannelOpSet(operations...))
|
|
||||||
}
|
|
||||||
|
|
||||||
// remove from the assignments
|
|
||||||
func (c *StateChannelStore) removeAssignment(nodeID int64, channelName string) {
|
|
||||||
if cInfo, ok := c.channelsInfo[nodeID]; ok {
|
|
||||||
delete(cInfo.Channels, channelName)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *StateChannelStore) addAssignment(nodeID int64, channel RWChannel) {
|
|
||||||
if cInfo, ok := c.channelsInfo[nodeID]; ok {
|
|
||||||
cInfo.Channels[channel.GetName()] = channel
|
|
||||||
} else {
|
|
||||||
c.channelsInfo[nodeID] = &NodeChannelInfo{
|
|
||||||
NodeID: nodeID,
|
|
||||||
Channels: map[string]RWChannel{
|
|
||||||
channel.GetName(): channel,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// updateMeta applies the WATCH/RELEASE/DELETE operations to the current channel store.
|
|
||||||
// DELETE + WATCH ---> from bufferID to nodeID
|
|
||||||
// DELETE + WATCH ---> from lagecyID to nodeID
|
|
||||||
// DELETE + WATCH ---> from deletedNode to nodeID/bufferID
|
|
||||||
// DELETE + WATCH ---> from releasedNode to nodeID/bufferID
|
|
||||||
// RELEASE ---> release from nodeID
|
|
||||||
// WATCH ---> watch to a new channel
|
|
||||||
// DELETE ---> remove the channel
|
|
||||||
func (c *StateChannelStore) sanityCheckPerChannelOpSet(opSet *ChannelOpSet) bool {
|
|
||||||
if opSet.Len() == 2 {
|
|
||||||
ops := opSet.Collect()
|
|
||||||
return (ops[0].Type == Delete && ops[1].Type == Watch) || (ops[1].Type == Delete && ops[0].Type == Watch)
|
|
||||||
} else if opSet.Len() == 1 {
|
|
||||||
t := opSet.Collect()[0].Type
|
|
||||||
return t == Delete || t == Watch || t == Release
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
// DELETE + WATCH
|
|
||||||
func (c *StateChannelStore) updateMetaMemoryForPairOp(chName string, opSet *ChannelOpSet) error {
|
|
||||||
if !c.sanityCheckPerChannelOpSet(opSet) {
|
|
||||||
return errUnknownOpType
|
|
||||||
}
|
|
||||||
ops := opSet.Collect()
|
|
||||||
op1 := ops[1]
|
|
||||||
op2 := ops[0]
|
|
||||||
if ops[0].Type == Delete {
|
|
||||||
op1 = ops[0]
|
|
||||||
op2 = ops[1]
|
|
||||||
}
|
|
||||||
cInfo, ok := c.channelsInfo[op1.NodeID]
|
|
||||||
if !ok {
|
|
||||||
return errChannelNotExistInNode
|
|
||||||
}
|
|
||||||
var ch *StateChannel
|
|
||||||
if channel, ok := cInfo.Channels[chName]; ok {
|
|
||||||
ch = channel.(*StateChannel)
|
|
||||||
c.addAssignment(op2.NodeID, ch)
|
|
||||||
c.removeAssignment(op1.NodeID, chName)
|
|
||||||
} else {
|
|
||||||
if cInfo, ok = c.channelsInfo[op2.NodeID]; ok {
|
|
||||||
if channel2, ok := cInfo.Channels[chName]; ok {
|
|
||||||
ch = channel2.(*StateChannel)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// update channel
|
|
||||||
if ch != nil {
|
|
||||||
ch.Assign(op2.NodeID)
|
|
||||||
if op2.NodeID == bufferID {
|
|
||||||
ch.setState(Standby)
|
|
||||||
} else {
|
|
||||||
ch.setState(ToWatch)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *StateChannelStore) getChannel(nodeID int64, channelName string) *StateChannel {
|
|
||||||
if cInfo, ok := c.channelsInfo[nodeID]; ok {
|
|
||||||
if storedChannel, ok := cInfo.Channels[channelName]; ok {
|
|
||||||
return storedChannel.(*StateChannel)
|
|
||||||
}
|
|
||||||
log.Debug("Channel doesn't exist in Node", zap.String("channel", channelName), zap.Int64("nodeID", nodeID))
|
|
||||||
} else {
|
|
||||||
log.Error("Node doesn't exist", zap.Int64("NodeID", nodeID))
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *StateChannelStore) updateMetaMemoryForSingleOp(op *ChannelOp) error {
|
|
||||||
lo.ForEach(op.Channels, func(ch RWChannel, _ int) {
|
|
||||||
switch op.Type {
|
|
||||||
case Release: // release an already exsits storedChannel-node pair
|
|
||||||
if channel := c.getChannel(op.NodeID, ch.GetName()); channel != nil {
|
|
||||||
channel.setState(ToRelease)
|
|
||||||
}
|
|
||||||
case Watch:
|
|
||||||
storedChannel := c.getChannel(op.NodeID, ch.GetName())
|
|
||||||
if storedChannel == nil { // New Channel
|
|
||||||
// set the correct assigment and state for NEW stateChannel
|
|
||||||
newChannel := NewStateChannel(ch)
|
|
||||||
newChannel.Assign(op.NodeID)
|
|
||||||
|
|
||||||
if op.NodeID != bufferID {
|
|
||||||
newChannel.setState(ToWatch)
|
|
||||||
}
|
|
||||||
|
|
||||||
// add channel to memory
|
|
||||||
c.addAssignment(op.NodeID, newChannel)
|
|
||||||
} else { // assign to the original nodes
|
|
||||||
storedChannel.setState(ToWatch)
|
|
||||||
}
|
|
||||||
case Delete: // Remove Channel
|
|
||||||
// if not Delete from bufferID, remove from channel
|
|
||||||
if op.NodeID != bufferID {
|
|
||||||
c.removeAssignment(op.NodeID, ch.GetName())
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
log.Error("unknown opType in updateMetaMemoryForSingleOp", zap.Any("type", op.Type))
|
|
||||||
}
|
|
||||||
})
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *StateChannelStore) updateMeta(opSet *ChannelOpSet) error {
|
|
||||||
// Update ChannelStore's kv store.
|
|
||||||
if err := c.txn(opSet); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update memory
|
|
||||||
chOpSet := opSet.SplitByChannel()
|
|
||||||
for chName, ops := range chOpSet {
|
|
||||||
// DELETE + WATCH
|
|
||||||
if ops.Len() == 2 {
|
|
||||||
c.updateMetaMemoryForPairOp(chName, ops)
|
|
||||||
// RELEASE, DELETE, WATCH
|
|
||||||
} else if ops.Len() == 1 {
|
|
||||||
c.updateMetaMemoryForSingleOp(ops.Collect()[0])
|
|
||||||
} else {
|
|
||||||
log.Error("unsupported ChannelOpSet", zap.Any("OpSet", ops))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// txn updates the channelStore's kv store with the given channel ops.
|
|
||||||
func (c *StateChannelStore) txn(opSet *ChannelOpSet) error {
|
|
||||||
var (
|
|
||||||
saves = make(map[string]string)
|
|
||||||
removals []string
|
|
||||||
)
|
|
||||||
for _, op := range opSet.Collect() {
|
|
||||||
opSaves, opRemovals, err := op.BuildKV()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
saves = lo.Assign(opSaves, saves)
|
|
||||||
removals = append(removals, opRemovals...)
|
|
||||||
}
|
|
||||||
return c.store.MultiSaveAndRemove(saves, removals)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *StateChannelStore) RemoveNode(nodeID int64) {
|
|
||||||
delete(c.channelsInfo, nodeID)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *StateChannelStore) HasChannel(channel string) bool {
|
|
||||||
for _, info := range c.channelsInfo {
|
|
||||||
if _, ok := info.Channels[channel]; ok {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
type (
|
|
||||||
ChannelSelector func(ch *StateChannel) bool
|
|
||||||
NodeSelector func(ID int64) bool
|
|
||||||
)
|
|
||||||
|
|
||||||
func WithAllNodes() NodeSelector {
|
|
||||||
return func(ID int64) bool {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithoutBufferNode() NodeSelector {
|
|
||||||
return func(ID int64) bool {
|
|
||||||
return ID != int64(bufferID)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithNodeIDs(IDs ...int64) NodeSelector {
|
|
||||||
return func(ID int64) bool {
|
|
||||||
return lo.Contains(IDs, ID)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithoutNodeIDs(IDs ...int64) NodeSelector {
|
|
||||||
return func(ID int64) bool {
|
|
||||||
return !lo.Contains(IDs, ID)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithChannelName(channel string) ChannelSelector {
|
|
||||||
return func(ch *StateChannel) bool {
|
|
||||||
return ch.GetName() == channel
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithCollectionIDV2(collectionID int64) ChannelSelector {
|
|
||||||
return func(ch *StateChannel) bool {
|
|
||||||
return ch.GetCollectionID() == collectionID
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithChannelStates(states ...ChannelState) ChannelSelector {
|
|
||||||
return func(ch *StateChannel) bool {
|
|
||||||
return lo.Contains(states, ch.currentState)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *StateChannelStore) GetNodeChannelsBy(nodeSelector NodeSelector, channelSelectors ...ChannelSelector) []*NodeChannelInfo {
|
|
||||||
var nodeChannels []*NodeChannelInfo
|
|
||||||
for nodeID, cInfo := range c.channelsInfo {
|
|
||||||
if nodeSelector(nodeID) {
|
|
||||||
selected := make(map[string]RWChannel)
|
|
||||||
for chName, channel := range cInfo.Channels {
|
|
||||||
var sel bool = true
|
|
||||||
for _, selector := range channelSelectors {
|
|
||||||
if !selector(channel.(*StateChannel)) {
|
|
||||||
sel = false
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if sel {
|
|
||||||
selected[chName] = channel
|
|
||||||
}
|
|
||||||
}
|
|
||||||
nodeChannels = append(nodeChannels, &NodeChannelInfo{
|
|
||||||
NodeID: nodeID,
|
|
||||||
Channels: selected,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nodeChannels
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *StateChannelStore) GetNodesChannels() []*NodeChannelInfo {
|
|
||||||
ret := make([]*NodeChannelInfo, 0, len(c.channelsInfo))
|
|
||||||
for id, info := range c.channelsInfo {
|
|
||||||
if id != bufferID {
|
|
||||||
ret = append(ret, info)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return ret
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *StateChannelStore) GetNodeChannelsByCollectionID(collectionID UniqueID) map[UniqueID][]string {
|
|
||||||
nodeChs := make(map[UniqueID][]string)
|
|
||||||
for id, info := range c.channelsInfo {
|
|
||||||
if id == bufferID {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
var channelNames []string
|
|
||||||
for name, ch := range info.Channels {
|
|
||||||
if ch.GetCollectionID() == collectionID {
|
|
||||||
channelNames = append(channelNames, name)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
nodeChs[id] = channelNames
|
|
||||||
}
|
|
||||||
return nodeChs
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *StateChannelStore) GetBufferChannelInfo() *NodeChannelInfo {
|
|
||||||
return c.GetNode(bufferID)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *StateChannelStore) GetNode(nodeID int64) *NodeChannelInfo {
|
|
||||||
if info, ok := c.channelsInfo[nodeID]; ok {
|
|
||||||
return info
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *StateChannelStore) GetNodeChannelCount(nodeID int64) int {
|
|
||||||
if cInfo, ok := c.channelsInfo[nodeID]; ok {
|
|
||||||
return len(cInfo.Channels)
|
|
||||||
}
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *StateChannelStore) GetNodes() []int64 {
|
|
||||||
return lo.Filter(lo.Keys(c.channelsInfo), func(ID int64, _ int) bool {
|
|
||||||
return ID != bufferID
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// remove deletes kv pairs from the kv store where keys have given nodeID as prefix.
|
|
||||||
func (c *StateChannelStore) remove(nodeID int64) error {
|
|
||||||
k := buildKeyPrefix(nodeID)
|
|
||||||
return c.store.RemoveWithPrefix(k)
|
|
||||||
}
|
|
@ -1,519 +0,0 @@
|
|||||||
package datacoord
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"strconv"
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"github.com/golang/protobuf/proto"
|
|
||||||
"github.com/samber/lo"
|
|
||||||
"github.com/stretchr/testify/mock"
|
|
||||||
"github.com/stretchr/testify/suite"
|
|
||||||
"go.uber.org/zap"
|
|
||||||
|
|
||||||
"github.com/milvus-io/milvus/internal/kv/mocks"
|
|
||||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
|
||||||
"github.com/milvus-io/milvus/pkg/kv/predicates"
|
|
||||||
"github.com/milvus-io/milvus/pkg/log"
|
|
||||||
"github.com/milvus-io/milvus/pkg/metrics"
|
|
||||||
"github.com/milvus-io/milvus/pkg/util/testutils"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestStateChannelStore(t *testing.T) {
|
|
||||||
suite.Run(t, new(StateChannelStoreSuite))
|
|
||||||
}
|
|
||||||
|
|
||||||
type StateChannelStoreSuite struct {
|
|
||||||
testutils.PromMetricsSuite
|
|
||||||
|
|
||||||
mockTxn *mocks.TxnKV
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *StateChannelStoreSuite) SetupTest() {
|
|
||||||
s.mockTxn = mocks.NewTxnKV(s.T())
|
|
||||||
}
|
|
||||||
|
|
||||||
func generateWatchInfo(name string, state datapb.ChannelWatchState) *datapb.ChannelWatchInfo {
|
|
||||||
return &datapb.ChannelWatchInfo{
|
|
||||||
Vchan: &datapb.VchannelInfo{
|
|
||||||
ChannelName: name,
|
|
||||||
},
|
|
||||||
State: state,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *StateChannelStoreSuite) createChannelInfo(nodeID int64, channels ...RWChannel) *NodeChannelInfo {
|
|
||||||
cInfo := &NodeChannelInfo{
|
|
||||||
NodeID: nodeID,
|
|
||||||
Channels: make(map[string]RWChannel),
|
|
||||||
}
|
|
||||||
for _, channel := range channels {
|
|
||||||
cInfo.Channels[channel.GetName()] = channel
|
|
||||||
}
|
|
||||||
return cInfo
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *StateChannelStoreSuite) TestGetNodeChannelsBy() {
|
|
||||||
nodes := []int64{bufferID, 100, 101, 102}
|
|
||||||
nodesExcludeBufferID := []int64{100, 101, 102}
|
|
||||||
channels := []*StateChannel{
|
|
||||||
getChannel("ch1", 1),
|
|
||||||
getChannel("ch2", 1),
|
|
||||||
getChannel("ch3", 1),
|
|
||||||
getChannel("ch4", 1),
|
|
||||||
getChannel("ch5", 1),
|
|
||||||
getChannel("ch6", 1),
|
|
||||||
getChannel("ch7", 1),
|
|
||||||
}
|
|
||||||
|
|
||||||
channelsInfo := map[int64]*NodeChannelInfo{
|
|
||||||
bufferID: s.createChannelInfo(bufferID, channels[0]),
|
|
||||||
100: s.createChannelInfo(100, channels[1], channels[2]),
|
|
||||||
101: s.createChannelInfo(101, channels[3], channels[4]),
|
|
||||||
102: s.createChannelInfo(102, channels[5], channels[6]), // legacy nodes
|
|
||||||
}
|
|
||||||
|
|
||||||
store := NewStateChannelStore(s.mockTxn)
|
|
||||||
lo.ForEach(nodes, func(nodeID int64, _ int) { store.AddNode(nodeID) })
|
|
||||||
store.channelsInfo = channelsInfo
|
|
||||||
lo.ForEach(channels, func(ch *StateChannel, _ int) {
|
|
||||||
if ch.GetName() == "ch6" || ch.GetName() == "ch7" {
|
|
||||||
ch.setState(Legacy)
|
|
||||||
}
|
|
||||||
s.Require().True(store.HasChannel(ch.GetName()))
|
|
||||||
})
|
|
||||||
s.Require().ElementsMatch(nodesExcludeBufferID, store.GetNodes())
|
|
||||||
store.SetLegacyChannelByNode(102)
|
|
||||||
|
|
||||||
s.Run("test AddNode RemoveNode", func() {
|
|
||||||
var nodeID int64 = 19530
|
|
||||||
_, ok := store.channelsInfo[nodeID]
|
|
||||||
s.Require().False(ok)
|
|
||||||
store.AddNode(nodeID)
|
|
||||||
_, ok = store.channelsInfo[nodeID]
|
|
||||||
s.True(ok)
|
|
||||||
|
|
||||||
store.RemoveNode(nodeID)
|
|
||||||
_, ok = store.channelsInfo[nodeID]
|
|
||||||
s.False(ok)
|
|
||||||
})
|
|
||||||
|
|
||||||
s.Run("test GetNodeChannels", func() {
|
|
||||||
infos := store.GetNodesChannels()
|
|
||||||
expectedResults := map[int64][]string{
|
|
||||||
100: {"ch2", "ch3"},
|
|
||||||
101: {"ch4", "ch5"},
|
|
||||||
102: {"ch6", "ch7"},
|
|
||||||
}
|
|
||||||
|
|
||||||
s.Equal(3, len(infos))
|
|
||||||
|
|
||||||
lo.ForEach(infos, func(info *NodeChannelInfo, _ int) {
|
|
||||||
expectedChannels, ok := expectedResults[info.NodeID]
|
|
||||||
s.True(ok)
|
|
||||||
|
|
||||||
gotChannels := lo.Keys(info.Channels)
|
|
||||||
s.ElementsMatch(expectedChannels, gotChannels)
|
|
||||||
})
|
|
||||||
})
|
|
||||||
|
|
||||||
s.Run("test GetBufferChannelInfo", func() {
|
|
||||||
info := store.GetBufferChannelInfo()
|
|
||||||
s.NotNil(info)
|
|
||||||
|
|
||||||
gotChannels := lo.Keys(info.Channels)
|
|
||||||
s.ElementsMatch([]string{"ch1"}, gotChannels)
|
|
||||||
})
|
|
||||||
|
|
||||||
s.Run("test GetNode", func() {
|
|
||||||
info := store.GetNode(19530)
|
|
||||||
s.Nil(info)
|
|
||||||
|
|
||||||
info = store.GetNode(bufferID)
|
|
||||||
s.NotNil(info)
|
|
||||||
|
|
||||||
gotChannels := lo.Keys(info.Channels)
|
|
||||||
s.ElementsMatch([]string{"ch1"}, gotChannels)
|
|
||||||
})
|
|
||||||
|
|
||||||
tests := []struct {
|
|
||||||
description string
|
|
||||||
nodeSelector NodeSelector
|
|
||||||
channelSelectors []ChannelSelector
|
|
||||||
|
|
||||||
expectedResult map[int64][]string
|
|
||||||
}{
|
|
||||||
{"test withnodeIDs bufferID", WithNodeIDs(bufferID), nil, map[int64][]string{bufferID: {"ch1"}}},
|
|
||||||
{"test withnodeIDs 100", WithNodeIDs(100), nil, map[int64][]string{100: {"ch2", "ch3"}}},
|
|
||||||
{"test withnodeIDs 101 102", WithNodeIDs(101, 102), nil, map[int64][]string{
|
|
||||||
101: {"ch4", "ch5"},
|
|
||||||
102: {"ch6", "ch7"},
|
|
||||||
}},
|
|
||||||
{"test withAllNodes", WithAllNodes(), nil, map[int64][]string{
|
|
||||||
bufferID: {"ch1"},
|
|
||||||
100: {"ch2", "ch3"},
|
|
||||||
101: {"ch4", "ch5"},
|
|
||||||
102: {"ch6", "ch7"},
|
|
||||||
}},
|
|
||||||
{"test WithoutBufferNode", WithoutBufferNode(), nil, map[int64][]string{
|
|
||||||
100: {"ch2", "ch3"},
|
|
||||||
101: {"ch4", "ch5"},
|
|
||||||
102: {"ch6", "ch7"},
|
|
||||||
}},
|
|
||||||
{"test WithoutNodeIDs 100, 101", WithoutNodeIDs(100, 101), nil, map[int64][]string{
|
|
||||||
bufferID: {"ch1"},
|
|
||||||
102: {"ch6", "ch7"},
|
|
||||||
}},
|
|
||||||
{
|
|
||||||
"test WithChannelName ch1", WithNodeIDs(bufferID),
|
|
||||||
[]ChannelSelector{WithChannelName("ch1")},
|
|
||||||
map[int64][]string{
|
|
||||||
bufferID: {"ch1"},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"test WithChannelName ch1, collectionID 1", WithNodeIDs(100),
|
|
||||||
[]ChannelSelector{
|
|
||||||
WithChannelName("ch2"),
|
|
||||||
WithCollectionIDV2(1),
|
|
||||||
},
|
|
||||||
map[int64][]string{100: {"ch2"}},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"test WithCollectionID 1", WithAllNodes(),
|
|
||||||
[]ChannelSelector{
|
|
||||||
WithCollectionIDV2(1),
|
|
||||||
},
|
|
||||||
map[int64][]string{
|
|
||||||
bufferID: {"ch1"},
|
|
||||||
100: {"ch2", "ch3"},
|
|
||||||
101: {"ch4", "ch5"},
|
|
||||||
102: {"ch6", "ch7"},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"test WithChannelState", WithNodeIDs(102),
|
|
||||||
[]ChannelSelector{
|
|
||||||
WithChannelStates(Legacy),
|
|
||||||
},
|
|
||||||
map[int64][]string{
|
|
||||||
102: {"ch6", "ch7"},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, test := range tests {
|
|
||||||
s.Run(test.description, func() {
|
|
||||||
if test.channelSelectors == nil {
|
|
||||||
test.channelSelectors = []ChannelSelector{}
|
|
||||||
}
|
|
||||||
|
|
||||||
infos := store.GetNodeChannelsBy(test.nodeSelector, test.channelSelectors...)
|
|
||||||
log.Info("got test infos", zap.Any("infos", infos))
|
|
||||||
s.Equal(len(test.expectedResult), len(infos))
|
|
||||||
|
|
||||||
lo.ForEach(infos, func(info *NodeChannelInfo, _ int) {
|
|
||||||
expectedChannels, ok := test.expectedResult[info.NodeID]
|
|
||||||
s.True(ok)
|
|
||||||
|
|
||||||
gotChannels := lo.Keys(info.Channels)
|
|
||||||
s.ElementsMatch(expectedChannels, gotChannels)
|
|
||||||
})
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *StateChannelStoreSuite) TestUpdateWithTxnLimit() {
|
|
||||||
tests := []struct {
|
|
||||||
description string
|
|
||||||
inOpCount int
|
|
||||||
outTxnCount int
|
|
||||||
}{
|
|
||||||
{"operations count < maxPerTxn", maxOperationsPerTxn - 1, 1},
|
|
||||||
{"operations count = maxPerTxn", maxOperationsPerTxn, 1},
|
|
||||||
{"operations count > maxPerTxn", maxOperationsPerTxn + 1, 2},
|
|
||||||
{"operations count = 2*maxPerTxn", maxOperationsPerTxn * 2, 2},
|
|
||||||
{"operations count = 2*maxPerTxn+1", maxOperationsPerTxn*2 + 1, 3},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, test := range tests {
|
|
||||||
s.SetupTest()
|
|
||||||
s.Run(test.description, func() {
|
|
||||||
s.mockTxn.EXPECT().MultiSaveAndRemove(mock.Anything, mock.Anything).
|
|
||||||
Run(func(saves map[string]string, removals []string, preds ...predicates.Predicate) {
|
|
||||||
log.Info("test save and remove", zap.Any("saves", saves), zap.Any("removals", removals))
|
|
||||||
}).Return(nil).Times(test.outTxnCount)
|
|
||||||
|
|
||||||
store := NewStateChannelStore(s.mockTxn)
|
|
||||||
store.AddNode(1)
|
|
||||||
s.Require().ElementsMatch([]int64{1}, store.GetNodes())
|
|
||||||
s.Require().Equal(0, store.GetNodeChannelCount(1))
|
|
||||||
|
|
||||||
// Get operations
|
|
||||||
ops := genChannelOperations(1, Watch, test.inOpCount)
|
|
||||||
err := store.Update(ops)
|
|
||||||
s.NoError(err)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *StateChannelStoreSuite) TestUpdateMeta2000kSegs() {
|
|
||||||
ch := getChannel("ch1", 1)
|
|
||||||
info := ch.GetWatchInfo()
|
|
||||||
// way larger than limit=2097152
|
|
||||||
seg2000k := make([]int64, 2000000)
|
|
||||||
for i := range seg2000k {
|
|
||||||
seg2000k[i] = int64(i)
|
|
||||||
}
|
|
||||||
info.Vchan.FlushedSegmentIds = seg2000k
|
|
||||||
ch.UpdateWatchInfo(info)
|
|
||||||
|
|
||||||
opSet := NewChannelOpSet(
|
|
||||||
NewChannelOp(bufferID, Delete, ch),
|
|
||||||
NewChannelOp(100, Watch, ch),
|
|
||||||
)
|
|
||||||
s.SetupTest()
|
|
||||||
s.mockTxn.EXPECT().MultiSaveAndRemove(mock.Anything, mock.Anything).
|
|
||||||
Run(func(saves map[string]string, removals []string, preds ...predicates.Predicate) {
|
|
||||||
}).Return(nil).Once()
|
|
||||||
|
|
||||||
store := NewStateChannelStore(s.mockTxn)
|
|
||||||
store.AddNode(100)
|
|
||||||
s.Require().Equal(0, store.GetNodeChannelCount(100))
|
|
||||||
store.addAssignment(bufferID, ch)
|
|
||||||
s.Require().Equal(1, store.GetNodeChannelCount(bufferID))
|
|
||||||
|
|
||||||
err := store.updateMeta(opSet)
|
|
||||||
s.NoError(err)
|
|
||||||
|
|
||||||
got := store.GetNodeChannelsBy(WithNodeIDs(100))
|
|
||||||
s.NotNil(got)
|
|
||||||
s.Require().Equal(1, len(got))
|
|
||||||
gotInfo := got[0]
|
|
||||||
s.ElementsMatch([]string{"ch1"}, lo.Keys(gotInfo.Channels))
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *StateChannelStoreSuite) TestUpdateMeta() {
|
|
||||||
tests := []struct {
|
|
||||||
description string
|
|
||||||
|
|
||||||
opSet *ChannelOpSet
|
|
||||||
nodeIDs []int64
|
|
||||||
channels []*StateChannel
|
|
||||||
assignments map[int64][]string
|
|
||||||
|
|
||||||
outAssignments map[int64][]string
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
"delete_watch_ch1 from bufferID to nodeID=100",
|
|
||||||
NewChannelOpSet(
|
|
||||||
NewChannelOp(bufferID, Delete, getChannel("ch1", 1)),
|
|
||||||
NewChannelOp(100, Watch, getChannel("ch1", 1)),
|
|
||||||
),
|
|
||||||
[]int64{bufferID, 100},
|
|
||||||
[]*StateChannel{getChannel("ch1", 1)},
|
|
||||||
map[int64][]string{
|
|
||||||
bufferID: {"ch1"},
|
|
||||||
},
|
|
||||||
map[int64][]string{
|
|
||||||
100: {"ch1"},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"delete_watch_ch1 from lagecyID=99 to nodeID=100",
|
|
||||||
NewChannelOpSet(
|
|
||||||
NewChannelOp(99, Delete, getChannel("ch1", 1)),
|
|
||||||
NewChannelOp(100, Watch, getChannel("ch1", 1)),
|
|
||||||
),
|
|
||||||
[]int64{bufferID, 99, 100},
|
|
||||||
[]*StateChannel{getChannel("ch1", 1)},
|
|
||||||
map[int64][]string{
|
|
||||||
99: {"ch1"},
|
|
||||||
},
|
|
||||||
map[int64][]string{
|
|
||||||
100: {"ch1"},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"release from nodeID=100",
|
|
||||||
NewChannelOpSet(
|
|
||||||
NewChannelOp(100, Release, getChannel("ch1", 1)),
|
|
||||||
),
|
|
||||||
[]int64{bufferID, 100},
|
|
||||||
[]*StateChannel{getChannel("ch1", 1)},
|
|
||||||
map[int64][]string{
|
|
||||||
100: {"ch1"},
|
|
||||||
},
|
|
||||||
map[int64][]string{
|
|
||||||
100: {"ch1"},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"watch a new channel from nodeID=100",
|
|
||||||
NewChannelOpSet(
|
|
||||||
NewChannelOp(100, Watch, getChannel("ch1", 1)),
|
|
||||||
),
|
|
||||||
[]int64{bufferID, 100},
|
|
||||||
[]*StateChannel{getChannel("ch1", 1)},
|
|
||||||
map[int64][]string{
|
|
||||||
100: {"ch1"},
|
|
||||||
},
|
|
||||||
map[int64][]string{
|
|
||||||
100: {"ch1"},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"Delete remove a channelfrom nodeID=100",
|
|
||||||
NewChannelOpSet(
|
|
||||||
NewChannelOp(100, Delete, getChannel("ch1", 1)),
|
|
||||||
),
|
|
||||||
[]int64{bufferID, 100},
|
|
||||||
[]*StateChannel{getChannel("ch1", 1)},
|
|
||||||
map[int64][]string{
|
|
||||||
100: {"ch1"},
|
|
||||||
},
|
|
||||||
map[int64][]string{
|
|
||||||
100: {},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
s.SetupTest()
|
|
||||||
s.mockTxn.EXPECT().MultiSaveAndRemove(mock.Anything, mock.Anything).
|
|
||||||
Run(func(saves map[string]string, removals []string, preds ...predicates.Predicate) {
|
|
||||||
}).Return(nil).Times(len(tests))
|
|
||||||
|
|
||||||
for _, test := range tests {
|
|
||||||
s.Run(test.description, func() {
|
|
||||||
store := NewStateChannelStore(s.mockTxn)
|
|
||||||
|
|
||||||
lo.ForEach(test.nodeIDs, func(nodeID int64, _ int) {
|
|
||||||
store.AddNode(nodeID)
|
|
||||||
s.Require().Equal(0, store.GetNodeChannelCount(nodeID))
|
|
||||||
})
|
|
||||||
c := make(map[string]*StateChannel)
|
|
||||||
lo.ForEach(test.channels, func(ch *StateChannel, _ int) { c[ch.GetName()] = ch })
|
|
||||||
for nodeID, channels := range test.assignments {
|
|
||||||
lo.ForEach(channels, func(ch string, _ int) {
|
|
||||||
store.addAssignment(nodeID, c[ch])
|
|
||||||
})
|
|
||||||
s.Require().Equal(1, store.GetNodeChannelCount(nodeID))
|
|
||||||
}
|
|
||||||
|
|
||||||
err := store.updateMeta(test.opSet)
|
|
||||||
s.NoError(err)
|
|
||||||
|
|
||||||
for nodeID, channels := range test.outAssignments {
|
|
||||||
got := store.GetNodeChannelsBy(WithNodeIDs(nodeID))
|
|
||||||
s.NotNil(got)
|
|
||||||
s.Require().Equal(1, len(got))
|
|
||||||
info := got[0]
|
|
||||||
s.ElementsMatch(channels, lo.Keys(info.Channels))
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *StateChannelStoreSuite) TestUpdateState() {
|
|
||||||
tests := []struct {
|
|
||||||
description string
|
|
||||||
|
|
||||||
inSuccess bool
|
|
||||||
inChannelState ChannelState
|
|
||||||
outChannelState ChannelState
|
|
||||||
}{
|
|
||||||
{"input standby, fail", false, Standby, Standby},
|
|
||||||
{"input standby, success", true, Standby, ToWatch},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, test := range tests {
|
|
||||||
s.Run(test.description, func() {
|
|
||||||
store := NewStateChannelStore(s.mockTxn)
|
|
||||||
|
|
||||||
ch := "ch-1"
|
|
||||||
channel := NewStateChannel(getChannel(ch, 1))
|
|
||||||
channel.setState(test.inChannelState)
|
|
||||||
store.channelsInfo[1] = &NodeChannelInfo{
|
|
||||||
NodeID: bufferID,
|
|
||||||
Channels: map[string]RWChannel{
|
|
||||||
ch: channel,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
store.UpdateState(test.inSuccess, channel)
|
|
||||||
s.Equal(test.outChannelState, channel.currentState)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *StateChannelStoreSuite) TestReload() {
|
|
||||||
type item struct {
|
|
||||||
nodeID int64
|
|
||||||
channelName string
|
|
||||||
}
|
|
||||||
type testCase struct {
|
|
||||||
tag string
|
|
||||||
items []item
|
|
||||||
expect map[int64]int
|
|
||||||
}
|
|
||||||
|
|
||||||
cases := []testCase{
|
|
||||||
{
|
|
||||||
tag: "empty",
|
|
||||||
items: []item{},
|
|
||||||
expect: map[int64]int{},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
tag: "normal",
|
|
||||||
items: []item{
|
|
||||||
{nodeID: 1, channelName: "dml1_v0"},
|
|
||||||
{nodeID: 1, channelName: "dml2_v1"},
|
|
||||||
{nodeID: 2, channelName: "dml3_v0"},
|
|
||||||
},
|
|
||||||
expect: map[int64]int{1: 2, 2: 1},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
tag: "buffer",
|
|
||||||
items: []item{
|
|
||||||
{nodeID: bufferID, channelName: "dml1_v0"},
|
|
||||||
},
|
|
||||||
expect: map[int64]int{bufferID: 1},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, tc := range cases {
|
|
||||||
s.Run(tc.tag, func() {
|
|
||||||
s.mockTxn.ExpectedCalls = nil
|
|
||||||
|
|
||||||
var keys, values []string
|
|
||||||
for _, item := range tc.items {
|
|
||||||
keys = append(keys, fmt.Sprintf("channel_store/%d/%s", item.nodeID, item.channelName))
|
|
||||||
info := generateWatchInfo(item.channelName, datapb.ChannelWatchState_WatchSuccess)
|
|
||||||
bs, err := proto.Marshal(info)
|
|
||||||
s.Require().NoError(err)
|
|
||||||
values = append(values, string(bs))
|
|
||||||
}
|
|
||||||
s.mockTxn.EXPECT().LoadWithPrefix(mock.AnythingOfType("string")).Return(keys, values, nil)
|
|
||||||
|
|
||||||
store := NewStateChannelStore(s.mockTxn)
|
|
||||||
err := store.Reload()
|
|
||||||
s.Require().NoError(err)
|
|
||||||
|
|
||||||
for nodeID, expect := range tc.expect {
|
|
||||||
s.MetricsEqual(metrics.DataCoordDmlChannelNum.WithLabelValues(strconv.FormatInt(nodeID, 10)), float64(expect))
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func genChannelOperations(nodeID int64, opType ChannelOpType, num int) *ChannelOpSet {
|
|
||||||
channels := make([]RWChannel, 0, num)
|
|
||||||
for i := 0; i < num; i++ {
|
|
||||||
name := fmt.Sprintf("ch%d", i)
|
|
||||||
channel := NewStateChannel(getChannel(name, 1))
|
|
||||||
channel.Info = generateWatchInfo(name, datapb.ChannelWatchState_ToWatch)
|
|
||||||
channels = append(channels, channel)
|
|
||||||
}
|
|
||||||
|
|
||||||
ops := NewChannelOpSet(NewChannelOp(nodeID, opType, channels...))
|
|
||||||
return ops
|
|
||||||
}
|
|
Loading…
Reference in New Issue
Block a user