diff --git a/internal/datacoord/channel_manager_v2.go b/internal/datacoord/channel_manager.go similarity index 100% rename from internal/datacoord/channel_manager_v2.go rename to internal/datacoord/channel_manager.go diff --git a/internal/datacoord/channel_manager_v2_test.go b/internal/datacoord/channel_manager_test.go similarity index 100% rename from internal/datacoord/channel_manager_v2_test.go rename to internal/datacoord/channel_manager_test.go diff --git a/internal/datacoord/channel_store.go b/internal/datacoord/channel_store.go index 0ea4e5daae..3de73b8a03 100644 --- a/internal/datacoord/channel_store.go +++ b/internal/datacoord/channel_store.go @@ -269,15 +269,6 @@ func (c *ChannelOpSet) MarshalLogArray(enc zapcore.ArrayEncoder) error { return nil } -// ChannelStore must satisfy RWChannelStore. -var _ RWChannelStore = (*ChannelStore)(nil) - -// ChannelStore maintains a mapping between channels and data nodes. -type ChannelStore struct { - store kv.TxnKV // A kv store with (NodeChannelKey) -> (ChannelWatchInfos) information. - channelsInfo map[int64]*NodeChannelInfo // A map of (nodeID) -> (NodeChannelInfo). -} - // NodeChannelInfo stores the nodeID and its channels. type NodeChannelInfo struct { NodeID int64 @@ -315,261 +306,6 @@ func (info *NodeChannelInfo) GetChannels() []RWChannel { return lo.Values(info.Channels) } -// NewChannelStore creates and returns a new ChannelStore. -func NewChannelStore(kv kv.TxnKV) *ChannelStore { - c := &ChannelStore{ - store: kv, - channelsInfo: make(map[int64]*NodeChannelInfo), - } - c.channelsInfo[bufferID] = &NodeChannelInfo{ - NodeID: bufferID, - Channels: make(map[string]RWChannel), - } - return c -} - -// Reload restores the buffer channels and node-channels mapping from kv. -func (c *ChannelStore) Reload() error { - record := timerecord.NewTimeRecorder("datacoord") - keys, values, err := c.store.LoadWithPrefix(Params.CommonCfg.DataCoordWatchSubPath.GetValue()) - if err != nil { - return err - } - for i := 0; i < len(keys); i++ { - k := keys[i] - v := values[i] - nodeID, err := parseNodeKey(k) - if err != nil { - return err - } - - cw := &datapb.ChannelWatchInfo{} - if err := proto.Unmarshal([]byte(v), cw); err != nil { - return err - } - reviseVChannelInfo(cw.GetVchan()) - - c.AddNode(nodeID) - channel := &channelMeta{ - Name: cw.GetVchan().GetChannelName(), - CollectionID: cw.GetVchan().GetCollectionID(), - Schema: cw.GetSchema(), - WatchInfo: cw, - } - c.channelsInfo[nodeID].AddChannel(channel) - - log.Info("channel store reload channel", - zap.Int64("nodeID", nodeID), zap.String("channel", channel.Name)) - metrics.DataCoordDmlChannelNum.WithLabelValues(strconv.FormatInt(nodeID, 10)).Set(float64(len(c.channelsInfo[nodeID].Channels))) - } - log.Info("channel store reload done", zap.Duration("duration", record.ElapseSpan())) - return nil -} - -// AddNode creates a new node-channels mapping for the given node, and assigns no channels to it. -// Returns immediately if the node's already in the channel. -func (c *ChannelStore) AddNode(nodeID int64) { - if _, ok := c.channelsInfo[nodeID]; ok { - return - } - - c.channelsInfo[nodeID] = NewNodeChannelInfo(nodeID) -} - -// Update applies the channel operations in opSet. -func (c *ChannelStore) Update(opSet *ChannelOpSet) error { - totalChannelNum := opSet.GetChannelNumber() - if totalChannelNum <= maxOperationsPerTxn { - return c.update(opSet) - } - - // Split opset into multiple txn. Operations on the same channel must be executed in one txn. - perChOps := opSet.SplitByChannel() - - // Execute a txn for every 64 operations. - count := 0 - operations := make([]*ChannelOp, 0, maxOperationsPerTxn) - for _, opset := range perChOps { - if count+opset.Len() > maxOperationsPerTxn { - if err := c.update(NewChannelOpSet(operations...)); err != nil { - return err - } - count = 0 - operations = make([]*ChannelOp, 0, maxOperationsPerTxn) - } - count += opset.Len() - operations = append(operations, opset.Collect()...) - } - if count == 0 { - return nil - } - return c.update(NewChannelOpSet(operations...)) -} - -func (c *ChannelStore) checkIfExist(nodeID int64, channel RWChannel) bool { - if info, ok := c.channelsInfo[nodeID]; ok { - if ch, ok := info.Channels[channel.GetName()]; ok { - return ch.GetCollectionID() == channel.GetCollectionID() - } - } - return false -} - -// update applies the ADD/DELETE operations to the current channel store. -func (c *ChannelStore) update(opSet *ChannelOpSet) error { - // Update ChannelStore's kv store. - if err := c.txn(opSet); err != nil { - return err - } - - // Update node id -> channel mapping. - for _, op := range opSet.Collect() { - switch op.Type { - case Add, Watch, Release: - for _, ch := range op.Channels { - if c.checkIfExist(op.NodeID, ch) { - continue // prevent adding duplicated channel info - } - // Append target channels to channel store. - c.channelsInfo[op.NodeID].AddChannel(ch) - } - case Delete: - info := c.channelsInfo[op.NodeID] - for _, channelName := range op.GetChannelNames() { - info.RemoveChannel(channelName) - } - default: - return errUnknownOpType - } - metrics.DataCoordDmlChannelNum.WithLabelValues(strconv.FormatInt(op.NodeID, 10)).Set(float64(len(c.channelsInfo[op.NodeID].Channels))) - } - return nil -} - -// GetChannels returns information of all channels. -func (c *ChannelStore) GetChannels() []*NodeChannelInfo { - ret := make([]*NodeChannelInfo, 0, len(c.channelsInfo)) - for _, info := range c.channelsInfo { - ret = append(ret, info) - } - return ret -} - -// GetNodesChannels returns the channels assigned to real nodes. -func (c *ChannelStore) GetNodesChannels() []*NodeChannelInfo { - ret := make([]*NodeChannelInfo, 0, len(c.channelsInfo)) - for id, info := range c.channelsInfo { - if id != bufferID { - ret = append(ret, info) - } - } - return ret -} - -func (c *ChannelStore) GetNodeChannelsByCollectionID(collectionID UniqueID) map[UniqueID][]string { - nodeChs := make(map[UniqueID][]string) - for id, info := range c.channelsInfo { - if id == bufferID { - continue - } - var channelNames []string - for name, ch := range info.Channels { - if ch.GetCollectionID() == collectionID { - channelNames = append(channelNames, name) - } - } - nodeChs[id] = channelNames - } - return nodeChs -} - -// GetBufferChannelInfo returns all unassigned channels. -func (c *ChannelStore) GetBufferChannelInfo() *NodeChannelInfo { - if info, ok := c.channelsInfo[bufferID]; ok { - return info - } - return nil -} - -// GetNode returns the channel info of a given node. -func (c *ChannelStore) GetNode(nodeID int64) *NodeChannelInfo { - if info, ok := c.channelsInfo[nodeID]; ok { - return info - } - return nil -} - -func (c *ChannelStore) GetNodeChannelCount(nodeID int64) int { - if info, ok := c.channelsInfo[nodeID]; ok { - return len(info.Channels) - } - return 0 -} - -// RemoveNode removes the given node from the channel store and returns its channels. -func (c *ChannelStore) RemoveNode(nodeID int64) { - delete(c.channelsInfo, nodeID) -} - -// GetNodes returns a slice of all nodes ids in the current channel store. -func (c *ChannelStore) GetNodes() []int64 { - ids := make([]int64, 0, len(c.channelsInfo)) - for id := range c.channelsInfo { - if id != bufferID { - ids = append(ids, id) - } - } - return ids -} - -// remove deletes kv pairs from the kv store where keys have given nodeID as prefix. -func (c *ChannelStore) remove(nodeID int64) error { - k := buildKeyPrefix(nodeID) - return c.store.RemoveWithPrefix(k) -} - -// txn updates the channelStore's kv store with the given channel ops. -func (c *ChannelStore) txn(opSet *ChannelOpSet) error { - var ( - saves = make(map[string]string) - removals []string - ) - for _, op := range opSet.Collect() { - opSaves, opRemovals, err := op.BuildKV() - if err != nil { - return err - } - - saves = lo.Assign(opSaves, saves) - removals = append(removals, opRemovals...) - } - return c.store.MultiSaveAndRemove(saves, removals) -} - -func (c *ChannelStore) HasChannel(channel string) bool { - for _, info := range c.channelsInfo { - for _, ch := range info.Channels { - if ch.GetName() == channel { - return true - } - } - } - return false -} - -func (c *ChannelStore) GetNodeChannelsBy(nodeSelector NodeSelector, channelSelectors ...ChannelSelector) []*NodeChannelInfo { - log.Error("ChannelStore doesn't implement GetNodeChannelsBy") - return nil -} - -func (c *ChannelStore) UpdateState(isSuccessful bool, channels ...RWChannel) { - log.Error("ChannelStore doesn't implement UpdateState") -} - -func (c *ChannelStore) SetLegacyChannelByNode(nodeIDs ...int64) { - log.Error("ChannelStore doesn't implement SetLegacyChannelByNode") -} - // buildNodeChannelKey generates a key for kv store, where the key is a concatenation of ChannelWatchSubPath, nodeID and channel name. // ${WatchSubPath}/${nodeID}/${channelName} func buildNodeChannelKey(nodeID int64, chName string) string { @@ -589,3 +325,437 @@ func parseNodeKey(key string) (int64, error) { } return strconv.ParseInt(s[len(s)-2], 10, 64) } + +type StateChannelStore struct { + store kv.TxnKV + channelsInfo map[int64]*NodeChannelInfo // A map of (nodeID) -> (NodeChannelInfo). +} + +var _ RWChannelStore = (*StateChannelStore)(nil) + +var errChannelNotExistInNode = errors.New("channel doesn't exist in given node") + +func NewChannelStoreV2(kv kv.TxnKV) RWChannelStore { + return NewStateChannelStore(kv) +} + +func NewStateChannelStore(kv kv.TxnKV) *StateChannelStore { + c := StateChannelStore{ + store: kv, + channelsInfo: make(map[int64]*NodeChannelInfo), + } + c.channelsInfo[bufferID] = &NodeChannelInfo{ + NodeID: bufferID, + Channels: make(map[string]RWChannel), + } + return &c +} + +func (c *StateChannelStore) Reload() error { + record := timerecord.NewTimeRecorder("datacoord") + keys, values, err := c.store.LoadWithPrefix(Params.CommonCfg.DataCoordWatchSubPath.GetValue()) + if err != nil { + return err + } + for i := 0; i < len(keys); i++ { + k := keys[i] + v := values[i] + nodeID, err := parseNodeKey(k) + if err != nil { + return err + } + + info := &datapb.ChannelWatchInfo{} + if err := proto.Unmarshal([]byte(v), info); err != nil { + return err + } + reviseVChannelInfo(info.GetVchan()) + + c.AddNode(nodeID) + + channel := NewStateChannelByWatchInfo(nodeID, info) + c.channelsInfo[nodeID].AddChannel(channel) + log.Info("channel store reload channel", + zap.Int64("nodeID", nodeID), zap.String("channel", channel.Name)) + metrics.DataCoordDmlChannelNum.WithLabelValues(strconv.FormatInt(nodeID, 10)).Set(float64(len(c.channelsInfo[nodeID].Channels))) + } + log.Info("channel store reload done", zap.Duration("duration", record.ElapseSpan())) + return nil +} + +func (c *StateChannelStore) AddNode(nodeID int64) { + if _, ok := c.channelsInfo[nodeID]; ok { + return + } + c.channelsInfo[nodeID] = &NodeChannelInfo{ + NodeID: nodeID, + Channels: make(map[string]RWChannel), + } +} + +func (c *StateChannelStore) UpdateState(isSuccessful bool, channels ...RWChannel) { + lo.ForEach(channels, func(ch RWChannel, _ int) { + for _, cInfo := range c.channelsInfo { + if stateChannel, ok := cInfo.Channels[ch.GetName()]; ok { + if isSuccessful { + stateChannel.(*StateChannel).TransitionOnSuccess() + } else { + stateChannel.(*StateChannel).TransitionOnFailure() + } + } + } + }) +} + +func (c *StateChannelStore) SetLegacyChannelByNode(nodeIDs ...int64) { + lo.ForEach(nodeIDs, func(nodeID int64, _ int) { + if cInfo, ok := c.channelsInfo[nodeID]; ok { + for _, ch := range cInfo.Channels { + ch.(*StateChannel).setState(Legacy) + } + } + }) +} + +func (c *StateChannelStore) Update(opSet *ChannelOpSet) error { + // Split opset into multiple txn. Operations on the same channel must be executed in one txn. + perChOps := opSet.SplitByChannel() + + // Execute a txn for every 64 operations. + count := 0 + operations := make([]*ChannelOp, 0, maxOperationsPerTxn) + for _, opset := range perChOps { + if !c.sanityCheckPerChannelOpSet(opset) { + log.Error("unsupported ChannelOpSet", zap.Any("OpSet", opset)) + continue + } + if opset.Len() > maxOperationsPerTxn { + log.Error("Operations for one channel exceeds maxOperationsPerTxn", + zap.Any("opset size", opset.Len()), + zap.Int("limit", maxOperationsPerTxn)) + } + if count+opset.Len() > maxOperationsPerTxn { + if err := c.updateMeta(NewChannelOpSet(operations...)); err != nil { + return err + } + count = 0 + operations = make([]*ChannelOp, 0, maxOperationsPerTxn) + } + count += opset.Len() + operations = append(operations, opset.Collect()...) + } + if count == 0 { + return nil + } + + return c.updateMeta(NewChannelOpSet(operations...)) +} + +// remove from the assignments +func (c *StateChannelStore) removeAssignment(nodeID int64, channelName string) { + if cInfo, ok := c.channelsInfo[nodeID]; ok { + delete(cInfo.Channels, channelName) + } +} + +func (c *StateChannelStore) addAssignment(nodeID int64, channel RWChannel) { + if cInfo, ok := c.channelsInfo[nodeID]; ok { + cInfo.Channels[channel.GetName()] = channel + } else { + c.channelsInfo[nodeID] = &NodeChannelInfo{ + NodeID: nodeID, + Channels: map[string]RWChannel{ + channel.GetName(): channel, + }, + } + } +} + +// updateMeta applies the WATCH/RELEASE/DELETE operations to the current channel store. +// DELETE + WATCH ---> from bufferID to nodeID +// DELETE + WATCH ---> from lagecyID to nodeID +// DELETE + WATCH ---> from deletedNode to nodeID/bufferID +// DELETE + WATCH ---> from releasedNode to nodeID/bufferID +// RELEASE ---> release from nodeID +// WATCH ---> watch to a new channel +// DELETE ---> remove the channel +func (c *StateChannelStore) sanityCheckPerChannelOpSet(opSet *ChannelOpSet) bool { + if opSet.Len() == 2 { + ops := opSet.Collect() + return (ops[0].Type == Delete && ops[1].Type == Watch) || (ops[1].Type == Delete && ops[0].Type == Watch) + } else if opSet.Len() == 1 { + t := opSet.Collect()[0].Type + return t == Delete || t == Watch || t == Release + } + return false +} + +// DELETE + WATCH +func (c *StateChannelStore) updateMetaMemoryForPairOp(chName string, opSet *ChannelOpSet) error { + if !c.sanityCheckPerChannelOpSet(opSet) { + return errUnknownOpType + } + ops := opSet.Collect() + op1 := ops[1] + op2 := ops[0] + if ops[0].Type == Delete { + op1 = ops[0] + op2 = ops[1] + } + cInfo, ok := c.channelsInfo[op1.NodeID] + if !ok { + return errChannelNotExistInNode + } + var ch *StateChannel + if channel, ok := cInfo.Channels[chName]; ok { + ch = channel.(*StateChannel) + c.addAssignment(op2.NodeID, ch) + c.removeAssignment(op1.NodeID, chName) + } else { + if cInfo, ok = c.channelsInfo[op2.NodeID]; ok { + if channel2, ok := cInfo.Channels[chName]; ok { + ch = channel2.(*StateChannel) + } + } + } + // update channel + if ch != nil { + ch.Assign(op2.NodeID) + if op2.NodeID == bufferID { + ch.setState(Standby) + } else { + ch.setState(ToWatch) + } + } + return nil +} + +func (c *StateChannelStore) getChannel(nodeID int64, channelName string) *StateChannel { + if cInfo, ok := c.channelsInfo[nodeID]; ok { + if storedChannel, ok := cInfo.Channels[channelName]; ok { + return storedChannel.(*StateChannel) + } + log.Debug("Channel doesn't exist in Node", zap.String("channel", channelName), zap.Int64("nodeID", nodeID)) + } else { + log.Error("Node doesn't exist", zap.Int64("NodeID", nodeID)) + } + return nil +} + +func (c *StateChannelStore) updateMetaMemoryForSingleOp(op *ChannelOp) error { + lo.ForEach(op.Channels, func(ch RWChannel, _ int) { + switch op.Type { + case Release: // release an already exsits storedChannel-node pair + if channel := c.getChannel(op.NodeID, ch.GetName()); channel != nil { + channel.setState(ToRelease) + } + case Watch: + storedChannel := c.getChannel(op.NodeID, ch.GetName()) + if storedChannel == nil { // New Channel + // set the correct assigment and state for NEW stateChannel + newChannel := NewStateChannel(ch) + newChannel.Assign(op.NodeID) + + if op.NodeID != bufferID { + newChannel.setState(ToWatch) + } + + // add channel to memory + c.addAssignment(op.NodeID, newChannel) + } else { // assign to the original nodes + storedChannel.setState(ToWatch) + } + case Delete: // Remove Channel + // if not Delete from bufferID, remove from channel + if op.NodeID != bufferID { + c.removeAssignment(op.NodeID, ch.GetName()) + } + default: + log.Error("unknown opType in updateMetaMemoryForSingleOp", zap.Any("type", op.Type)) + } + }) + return nil +} + +func (c *StateChannelStore) updateMeta(opSet *ChannelOpSet) error { + // Update ChannelStore's kv store. + if err := c.txn(opSet); err != nil { + return err + } + + // Update memory + chOpSet := opSet.SplitByChannel() + for chName, ops := range chOpSet { + // DELETE + WATCH + if ops.Len() == 2 { + c.updateMetaMemoryForPairOp(chName, ops) + // RELEASE, DELETE, WATCH + } else if ops.Len() == 1 { + c.updateMetaMemoryForSingleOp(ops.Collect()[0]) + } else { + log.Error("unsupported ChannelOpSet", zap.Any("OpSet", ops)) + } + } + return nil +} + +// txn updates the channelStore's kv store with the given channel ops. +func (c *StateChannelStore) txn(opSet *ChannelOpSet) error { + var ( + saves = make(map[string]string) + removals []string + ) + for _, op := range opSet.Collect() { + opSaves, opRemovals, err := op.BuildKV() + if err != nil { + return err + } + + saves = lo.Assign(opSaves, saves) + removals = append(removals, opRemovals...) + } + return c.store.MultiSaveAndRemove(saves, removals) +} + +func (c *StateChannelStore) RemoveNode(nodeID int64) { + delete(c.channelsInfo, nodeID) +} + +func (c *StateChannelStore) HasChannel(channel string) bool { + for _, info := range c.channelsInfo { + if _, ok := info.Channels[channel]; ok { + return true + } + } + return false +} + +type ( + ChannelSelector func(ch *StateChannel) bool + NodeSelector func(ID int64) bool +) + +func WithAllNodes() NodeSelector { + return func(ID int64) bool { + return true + } +} + +func WithoutBufferNode() NodeSelector { + return func(ID int64) bool { + return ID != int64(bufferID) + } +} + +func WithNodeIDs(IDs ...int64) NodeSelector { + return func(ID int64) bool { + return lo.Contains(IDs, ID) + } +} + +func WithoutNodeIDs(IDs ...int64) NodeSelector { + return func(ID int64) bool { + return !lo.Contains(IDs, ID) + } +} + +func WithChannelName(channel string) ChannelSelector { + return func(ch *StateChannel) bool { + return ch.GetName() == channel + } +} + +func WithCollectionIDV2(collectionID int64) ChannelSelector { + return func(ch *StateChannel) bool { + return ch.GetCollectionID() == collectionID + } +} + +func WithChannelStates(states ...ChannelState) ChannelSelector { + return func(ch *StateChannel) bool { + return lo.Contains(states, ch.currentState) + } +} + +func (c *StateChannelStore) GetNodeChannelsBy(nodeSelector NodeSelector, channelSelectors ...ChannelSelector) []*NodeChannelInfo { + var nodeChannels []*NodeChannelInfo + for nodeID, cInfo := range c.channelsInfo { + if nodeSelector(nodeID) { + selected := make(map[string]RWChannel) + for chName, channel := range cInfo.Channels { + var sel bool = true + for _, selector := range channelSelectors { + if !selector(channel.(*StateChannel)) { + sel = false + break + } + } + if sel { + selected[chName] = channel + } + } + nodeChannels = append(nodeChannels, &NodeChannelInfo{ + NodeID: nodeID, + Channels: selected, + }) + } + } + return nodeChannels +} + +func (c *StateChannelStore) GetNodesChannels() []*NodeChannelInfo { + ret := make([]*NodeChannelInfo, 0, len(c.channelsInfo)) + for id, info := range c.channelsInfo { + if id != bufferID { + ret = append(ret, info) + } + } + return ret +} + +func (c *StateChannelStore) GetNodeChannelsByCollectionID(collectionID UniqueID) map[UniqueID][]string { + nodeChs := make(map[UniqueID][]string) + for id, info := range c.channelsInfo { + if id == bufferID { + continue + } + var channelNames []string + for name, ch := range info.Channels { + if ch.GetCollectionID() == collectionID { + channelNames = append(channelNames, name) + } + } + nodeChs[id] = channelNames + } + return nodeChs +} + +func (c *StateChannelStore) GetBufferChannelInfo() *NodeChannelInfo { + return c.GetNode(bufferID) +} + +func (c *StateChannelStore) GetNode(nodeID int64) *NodeChannelInfo { + if info, ok := c.channelsInfo[nodeID]; ok { + return info + } + return nil +} + +func (c *StateChannelStore) GetNodeChannelCount(nodeID int64) int { + if cInfo, ok := c.channelsInfo[nodeID]; ok { + return len(cInfo.Channels) + } + return 0 +} + +func (c *StateChannelStore) GetNodes() []int64 { + return lo.Filter(lo.Keys(c.channelsInfo), func(ID int64, _ int) bool { + return ID != bufferID + }) +} + +// remove deletes kv pairs from the kv store where keys have given nodeID as prefix. +func (c *StateChannelStore) remove(nodeID int64) error { + k := buildKeyPrefix(nodeID) + return c.store.RemoveWithPrefix(k) +} diff --git a/internal/datacoord/channel_store_test.go b/internal/datacoord/channel_store_test.go index 22a93d68a3..501d4a9d74 100644 --- a/internal/datacoord/channel_store_test.go +++ b/internal/datacoord/channel_store_test.go @@ -1,19 +1,3 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - package datacoord import ( @@ -22,26 +6,34 @@ import ( "testing" "github.com/golang/protobuf/proto" + "github.com/samber/lo" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" + "go.uber.org/zap" "github.com/milvus-io/milvus/internal/kv/mocks" "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/pkg/kv/predicates" + "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/testutils" ) -type ChannelStoreReloadSuite struct { +func TestStateChannelStore(t *testing.T) { + suite.Run(t, new(StateChannelStoreSuite)) +} + +type StateChannelStoreSuite struct { testutils.PromMetricsSuite mockTxn *mocks.TxnKV } -func (suite *ChannelStoreReloadSuite) SetupTest() { - suite.mockTxn = mocks.NewTxnKV(suite.T()) +func (s *StateChannelStoreSuite) SetupTest() { + s.mockTxn = mocks.NewTxnKV(s.T()) } -func (suite *ChannelStoreReloadSuite) generateWatchInfo(name string, state datapb.ChannelWatchState) *datapb.ChannelWatchInfo { +func generateWatchInfo(name string, state datapb.ChannelWatchState) *datapb.ChannelWatchInfo { return &datapb.ChannelWatchInfo{ Vchan: &datapb.VchannelInfo{ ChannelName: name, @@ -50,7 +42,410 @@ func (suite *ChannelStoreReloadSuite) generateWatchInfo(name string, state datap } } -func (suite *ChannelStoreReloadSuite) TestReload() { +func (s *StateChannelStoreSuite) createChannelInfo(nodeID int64, channels ...RWChannel) *NodeChannelInfo { + cInfo := &NodeChannelInfo{ + NodeID: nodeID, + Channels: make(map[string]RWChannel), + } + for _, channel := range channels { + cInfo.Channels[channel.GetName()] = channel + } + return cInfo +} + +func (s *StateChannelStoreSuite) TestGetNodeChannelsBy() { + nodes := []int64{bufferID, 100, 101, 102} + nodesExcludeBufferID := []int64{100, 101, 102} + channels := []*StateChannel{ + getChannel("ch1", 1), + getChannel("ch2", 1), + getChannel("ch3", 1), + getChannel("ch4", 1), + getChannel("ch5", 1), + getChannel("ch6", 1), + getChannel("ch7", 1), + } + + channelsInfo := map[int64]*NodeChannelInfo{ + bufferID: s.createChannelInfo(bufferID, channels[0]), + 100: s.createChannelInfo(100, channels[1], channels[2]), + 101: s.createChannelInfo(101, channels[3], channels[4]), + 102: s.createChannelInfo(102, channels[5], channels[6]), // legacy nodes + } + + store := NewStateChannelStore(s.mockTxn) + lo.ForEach(nodes, func(nodeID int64, _ int) { store.AddNode(nodeID) }) + store.channelsInfo = channelsInfo + lo.ForEach(channels, func(ch *StateChannel, _ int) { + if ch.GetName() == "ch6" || ch.GetName() == "ch7" { + ch.setState(Legacy) + } + s.Require().True(store.HasChannel(ch.GetName())) + }) + s.Require().ElementsMatch(nodesExcludeBufferID, store.GetNodes()) + store.SetLegacyChannelByNode(102) + + s.Run("test AddNode RemoveNode", func() { + var nodeID int64 = 19530 + _, ok := store.channelsInfo[nodeID] + s.Require().False(ok) + store.AddNode(nodeID) + _, ok = store.channelsInfo[nodeID] + s.True(ok) + + store.RemoveNode(nodeID) + _, ok = store.channelsInfo[nodeID] + s.False(ok) + }) + + s.Run("test GetNodeChannels", func() { + infos := store.GetNodesChannels() + expectedResults := map[int64][]string{ + 100: {"ch2", "ch3"}, + 101: {"ch4", "ch5"}, + 102: {"ch6", "ch7"}, + } + + s.Equal(3, len(infos)) + + lo.ForEach(infos, func(info *NodeChannelInfo, _ int) { + expectedChannels, ok := expectedResults[info.NodeID] + s.True(ok) + + gotChannels := lo.Keys(info.Channels) + s.ElementsMatch(expectedChannels, gotChannels) + }) + }) + + s.Run("test GetBufferChannelInfo", func() { + info := store.GetBufferChannelInfo() + s.NotNil(info) + + gotChannels := lo.Keys(info.Channels) + s.ElementsMatch([]string{"ch1"}, gotChannels) + }) + + s.Run("test GetNode", func() { + info := store.GetNode(19530) + s.Nil(info) + + info = store.GetNode(bufferID) + s.NotNil(info) + + gotChannels := lo.Keys(info.Channels) + s.ElementsMatch([]string{"ch1"}, gotChannels) + }) + + tests := []struct { + description string + nodeSelector NodeSelector + channelSelectors []ChannelSelector + + expectedResult map[int64][]string + }{ + {"test withnodeIDs bufferID", WithNodeIDs(bufferID), nil, map[int64][]string{bufferID: {"ch1"}}}, + {"test withnodeIDs 100", WithNodeIDs(100), nil, map[int64][]string{100: {"ch2", "ch3"}}}, + {"test withnodeIDs 101 102", WithNodeIDs(101, 102), nil, map[int64][]string{ + 101: {"ch4", "ch5"}, + 102: {"ch6", "ch7"}, + }}, + {"test withAllNodes", WithAllNodes(), nil, map[int64][]string{ + bufferID: {"ch1"}, + 100: {"ch2", "ch3"}, + 101: {"ch4", "ch5"}, + 102: {"ch6", "ch7"}, + }}, + {"test WithoutBufferNode", WithoutBufferNode(), nil, map[int64][]string{ + 100: {"ch2", "ch3"}, + 101: {"ch4", "ch5"}, + 102: {"ch6", "ch7"}, + }}, + {"test WithoutNodeIDs 100, 101", WithoutNodeIDs(100, 101), nil, map[int64][]string{ + bufferID: {"ch1"}, + 102: {"ch6", "ch7"}, + }}, + { + "test WithChannelName ch1", WithNodeIDs(bufferID), + []ChannelSelector{WithChannelName("ch1")}, + map[int64][]string{ + bufferID: {"ch1"}, + }, + }, + { + "test WithChannelName ch1, collectionID 1", WithNodeIDs(100), + []ChannelSelector{ + WithChannelName("ch2"), + WithCollectionIDV2(1), + }, + map[int64][]string{100: {"ch2"}}, + }, + { + "test WithCollectionID 1", WithAllNodes(), + []ChannelSelector{ + WithCollectionIDV2(1), + }, + map[int64][]string{ + bufferID: {"ch1"}, + 100: {"ch2", "ch3"}, + 101: {"ch4", "ch5"}, + 102: {"ch6", "ch7"}, + }, + }, + { + "test WithChannelState", WithNodeIDs(102), + []ChannelSelector{ + WithChannelStates(Legacy), + }, + map[int64][]string{ + 102: {"ch6", "ch7"}, + }, + }, + } + + for _, test := range tests { + s.Run(test.description, func() { + if test.channelSelectors == nil { + test.channelSelectors = []ChannelSelector{} + } + + infos := store.GetNodeChannelsBy(test.nodeSelector, test.channelSelectors...) + log.Info("got test infos", zap.Any("infos", infos)) + s.Equal(len(test.expectedResult), len(infos)) + + lo.ForEach(infos, func(info *NodeChannelInfo, _ int) { + expectedChannels, ok := test.expectedResult[info.NodeID] + s.True(ok) + + gotChannels := lo.Keys(info.Channels) + s.ElementsMatch(expectedChannels, gotChannels) + }) + }) + } +} + +func (s *StateChannelStoreSuite) TestUpdateWithTxnLimit() { + tests := []struct { + description string + inOpCount int + outTxnCount int + }{ + {"operations count < maxPerTxn", maxOperationsPerTxn - 1, 1}, + {"operations count = maxPerTxn", maxOperationsPerTxn, 1}, + {"operations count > maxPerTxn", maxOperationsPerTxn + 1, 2}, + {"operations count = 2*maxPerTxn", maxOperationsPerTxn * 2, 2}, + {"operations count = 2*maxPerTxn+1", maxOperationsPerTxn*2 + 1, 3}, + } + + for _, test := range tests { + s.SetupTest() + s.Run(test.description, func() { + s.mockTxn.EXPECT().MultiSaveAndRemove(mock.Anything, mock.Anything). + Run(func(saves map[string]string, removals []string, preds ...predicates.Predicate) { + log.Info("test save and remove", zap.Any("saves", saves), zap.Any("removals", removals)) + }).Return(nil).Times(test.outTxnCount) + + store := NewStateChannelStore(s.mockTxn) + store.AddNode(1) + s.Require().ElementsMatch([]int64{1}, store.GetNodes()) + s.Require().Equal(0, store.GetNodeChannelCount(1)) + + // Get operations + ops := genChannelOperations(1, Watch, test.inOpCount) + err := store.Update(ops) + s.NoError(err) + }) + } +} + +func (s *StateChannelStoreSuite) TestUpdateMeta2000kSegs() { + ch := getChannel("ch1", 1) + info := ch.GetWatchInfo() + // way larger than limit=2097152 + seg2000k := make([]int64, 2000000) + for i := range seg2000k { + seg2000k[i] = int64(i) + } + info.Vchan.FlushedSegmentIds = seg2000k + ch.UpdateWatchInfo(info) + + opSet := NewChannelOpSet( + NewChannelOp(bufferID, Delete, ch), + NewChannelOp(100, Watch, ch), + ) + s.SetupTest() + s.mockTxn.EXPECT().MultiSaveAndRemove(mock.Anything, mock.Anything). + Run(func(saves map[string]string, removals []string, preds ...predicates.Predicate) { + }).Return(nil).Once() + + store := NewStateChannelStore(s.mockTxn) + store.AddNode(100) + s.Require().Equal(0, store.GetNodeChannelCount(100)) + store.addAssignment(bufferID, ch) + s.Require().Equal(1, store.GetNodeChannelCount(bufferID)) + + err := store.updateMeta(opSet) + s.NoError(err) + + got := store.GetNodeChannelsBy(WithNodeIDs(100)) + s.NotNil(got) + s.Require().Equal(1, len(got)) + gotInfo := got[0] + s.ElementsMatch([]string{"ch1"}, lo.Keys(gotInfo.Channels)) +} + +func (s *StateChannelStoreSuite) TestUpdateMeta() { + tests := []struct { + description string + + opSet *ChannelOpSet + nodeIDs []int64 + channels []*StateChannel + assignments map[int64][]string + + outAssignments map[int64][]string + }{ + { + "delete_watch_ch1 from bufferID to nodeID=100", + NewChannelOpSet( + NewChannelOp(bufferID, Delete, getChannel("ch1", 1)), + NewChannelOp(100, Watch, getChannel("ch1", 1)), + ), + []int64{bufferID, 100}, + []*StateChannel{getChannel("ch1", 1)}, + map[int64][]string{ + bufferID: {"ch1"}, + }, + map[int64][]string{ + 100: {"ch1"}, + }, + }, + { + "delete_watch_ch1 from lagecyID=99 to nodeID=100", + NewChannelOpSet( + NewChannelOp(99, Delete, getChannel("ch1", 1)), + NewChannelOp(100, Watch, getChannel("ch1", 1)), + ), + []int64{bufferID, 99, 100}, + []*StateChannel{getChannel("ch1", 1)}, + map[int64][]string{ + 99: {"ch1"}, + }, + map[int64][]string{ + 100: {"ch1"}, + }, + }, + { + "release from nodeID=100", + NewChannelOpSet( + NewChannelOp(100, Release, getChannel("ch1", 1)), + ), + []int64{bufferID, 100}, + []*StateChannel{getChannel("ch1", 1)}, + map[int64][]string{ + 100: {"ch1"}, + }, + map[int64][]string{ + 100: {"ch1"}, + }, + }, + { + "watch a new channel from nodeID=100", + NewChannelOpSet( + NewChannelOp(100, Watch, getChannel("ch1", 1)), + ), + []int64{bufferID, 100}, + []*StateChannel{getChannel("ch1", 1)}, + map[int64][]string{ + 100: {"ch1"}, + }, + map[int64][]string{ + 100: {"ch1"}, + }, + }, + { + "Delete remove a channelfrom nodeID=100", + NewChannelOpSet( + NewChannelOp(100, Delete, getChannel("ch1", 1)), + ), + []int64{bufferID, 100}, + []*StateChannel{getChannel("ch1", 1)}, + map[int64][]string{ + 100: {"ch1"}, + }, + map[int64][]string{ + 100: {}, + }, + }, + } + s.SetupTest() + s.mockTxn.EXPECT().MultiSaveAndRemove(mock.Anything, mock.Anything). + Run(func(saves map[string]string, removals []string, preds ...predicates.Predicate) { + }).Return(nil).Times(len(tests)) + + for _, test := range tests { + s.Run(test.description, func() { + store := NewStateChannelStore(s.mockTxn) + + lo.ForEach(test.nodeIDs, func(nodeID int64, _ int) { + store.AddNode(nodeID) + s.Require().Equal(0, store.GetNodeChannelCount(nodeID)) + }) + c := make(map[string]*StateChannel) + lo.ForEach(test.channels, func(ch *StateChannel, _ int) { c[ch.GetName()] = ch }) + for nodeID, channels := range test.assignments { + lo.ForEach(channels, func(ch string, _ int) { + store.addAssignment(nodeID, c[ch]) + }) + s.Require().Equal(1, store.GetNodeChannelCount(nodeID)) + } + + err := store.updateMeta(test.opSet) + s.NoError(err) + + for nodeID, channels := range test.outAssignments { + got := store.GetNodeChannelsBy(WithNodeIDs(nodeID)) + s.NotNil(got) + s.Require().Equal(1, len(got)) + info := got[0] + s.ElementsMatch(channels, lo.Keys(info.Channels)) + } + }) + } +} + +func (s *StateChannelStoreSuite) TestUpdateState() { + tests := []struct { + description string + + inSuccess bool + inChannelState ChannelState + outChannelState ChannelState + }{ + {"input standby, fail", false, Standby, Standby}, + {"input standby, success", true, Standby, ToWatch}, + } + + for _, test := range tests { + s.Run(test.description, func() { + store := NewStateChannelStore(s.mockTxn) + + ch := "ch-1" + channel := NewStateChannel(getChannel(ch, 1)) + channel.setState(test.inChannelState) + store.channelsInfo[1] = &NodeChannelInfo{ + NodeID: bufferID, + Channels: map[string]RWChannel{ + ch: channel, + }, + } + + store.UpdateState(test.inSuccess, channel) + s.Equal(test.outChannelState, channel.currentState) + }) + } +} + +func (s *StateChannelStoreSuite) TestReload() { type item struct { nodeID int64 channelName string @@ -86,30 +481,39 @@ func (suite *ChannelStoreReloadSuite) TestReload() { } for _, tc := range cases { - suite.Run(tc.tag, func() { - suite.mockTxn.ExpectedCalls = nil + s.Run(tc.tag, func() { + s.mockTxn.ExpectedCalls = nil var keys, values []string for _, item := range tc.items { keys = append(keys, fmt.Sprintf("channel_store/%d/%s", item.nodeID, item.channelName)) - info := suite.generateWatchInfo(item.channelName, datapb.ChannelWatchState_WatchSuccess) + info := generateWatchInfo(item.channelName, datapb.ChannelWatchState_WatchSuccess) bs, err := proto.Marshal(info) - suite.Require().NoError(err) + s.Require().NoError(err) values = append(values, string(bs)) } - suite.mockTxn.EXPECT().LoadWithPrefix(mock.AnythingOfType("string")).Return(keys, values, nil) + s.mockTxn.EXPECT().LoadWithPrefix(mock.AnythingOfType("string")).Return(keys, values, nil) - store := NewChannelStore(suite.mockTxn) + store := NewStateChannelStore(s.mockTxn) err := store.Reload() - suite.Require().NoError(err) + s.Require().NoError(err) for nodeID, expect := range tc.expect { - suite.MetricsEqual(metrics.DataCoordDmlChannelNum.WithLabelValues(strconv.FormatInt(nodeID, 10)), float64(expect)) + s.MetricsEqual(metrics.DataCoordDmlChannelNum.WithLabelValues(strconv.FormatInt(nodeID, 10)), float64(expect)) } }) } } -func TestChannelStore(t *testing.T) { - suite.Run(t, new(ChannelStoreReloadSuite)) +func genChannelOperations(nodeID int64, opType ChannelOpType, num int) *ChannelOpSet { + channels := make([]RWChannel, 0, num) + for i := 0; i < num; i++ { + name := fmt.Sprintf("ch%d", i) + channel := NewStateChannel(getChannel(name, 1)) + channel.Info = generateWatchInfo(name, datapb.ChannelWatchState_ToWatch) + channels = append(channels, channel) + } + + ops := NewChannelOpSet(NewChannelOp(nodeID, opType, channels...)) + return ops } diff --git a/internal/datacoord/channel_store_v2.go b/internal/datacoord/channel_store_v2.go deleted file mode 100644 index b91d114d4d..0000000000 --- a/internal/datacoord/channel_store_v2.go +++ /dev/null @@ -1,450 +0,0 @@ -package datacoord - -import ( - "strconv" - - "github.com/cockroachdb/errors" - "github.com/golang/protobuf/proto" - "github.com/samber/lo" - "go.uber.org/zap" - - "github.com/milvus-io/milvus/internal/proto/datapb" - "github.com/milvus-io/milvus/pkg/kv" - "github.com/milvus-io/milvus/pkg/log" - "github.com/milvus-io/milvus/pkg/metrics" - "github.com/milvus-io/milvus/pkg/util/timerecord" -) - -type StateChannelStore struct { - store kv.TxnKV - channelsInfo map[int64]*NodeChannelInfo // A map of (nodeID) -> (NodeChannelInfo). -} - -var _ RWChannelStore = (*StateChannelStore)(nil) - -var errChannelNotExistInNode = errors.New("channel doesn't exist in given node") - -func NewChannelStoreV2(kv kv.TxnKV) RWChannelStore { - return NewStateChannelStore(kv) -} - -func NewStateChannelStore(kv kv.TxnKV) *StateChannelStore { - c := StateChannelStore{ - store: kv, - channelsInfo: make(map[int64]*NodeChannelInfo), - } - c.channelsInfo[bufferID] = &NodeChannelInfo{ - NodeID: bufferID, - Channels: make(map[string]RWChannel), - } - return &c -} - -func (c *StateChannelStore) Reload() error { - record := timerecord.NewTimeRecorder("datacoord") - keys, values, err := c.store.LoadWithPrefix(Params.CommonCfg.DataCoordWatchSubPath.GetValue()) - if err != nil { - return err - } - for i := 0; i < len(keys); i++ { - k := keys[i] - v := values[i] - nodeID, err := parseNodeKey(k) - if err != nil { - return err - } - - info := &datapb.ChannelWatchInfo{} - if err := proto.Unmarshal([]byte(v), info); err != nil { - return err - } - reviseVChannelInfo(info.GetVchan()) - - c.AddNode(nodeID) - - channel := NewStateChannelByWatchInfo(nodeID, info) - c.channelsInfo[nodeID].AddChannel(channel) - log.Info("channel store reload channel", - zap.Int64("nodeID", nodeID), zap.String("channel", channel.Name)) - metrics.DataCoordDmlChannelNum.WithLabelValues(strconv.FormatInt(nodeID, 10)).Set(float64(len(c.channelsInfo[nodeID].Channels))) - } - log.Info("channel store reload done", zap.Duration("duration", record.ElapseSpan())) - return nil -} - -func (c *StateChannelStore) AddNode(nodeID int64) { - if _, ok := c.channelsInfo[nodeID]; ok { - return - } - c.channelsInfo[nodeID] = &NodeChannelInfo{ - NodeID: nodeID, - Channels: make(map[string]RWChannel), - } -} - -func (c *StateChannelStore) UpdateState(isSuccessful bool, channels ...RWChannel) { - lo.ForEach(channels, func(ch RWChannel, _ int) { - for _, cInfo := range c.channelsInfo { - if stateChannel, ok := cInfo.Channels[ch.GetName()]; ok { - if isSuccessful { - stateChannel.(*StateChannel).TransitionOnSuccess() - } else { - stateChannel.(*StateChannel).TransitionOnFailure() - } - } - } - }) -} - -func (c *StateChannelStore) SetLegacyChannelByNode(nodeIDs ...int64) { - lo.ForEach(nodeIDs, func(nodeID int64, _ int) { - if cInfo, ok := c.channelsInfo[nodeID]; ok { - for _, ch := range cInfo.Channels { - ch.(*StateChannel).setState(Legacy) - } - } - }) -} - -func (c *StateChannelStore) Update(opSet *ChannelOpSet) error { - // Split opset into multiple txn. Operations on the same channel must be executed in one txn. - perChOps := opSet.SplitByChannel() - - // Execute a txn for every 64 operations. - count := 0 - operations := make([]*ChannelOp, 0, maxOperationsPerTxn) - for _, opset := range perChOps { - if !c.sanityCheckPerChannelOpSet(opset) { - log.Error("unsupported ChannelOpSet", zap.Any("OpSet", opset)) - continue - } - if opset.Len() > maxOperationsPerTxn { - log.Error("Operations for one channel exceeds maxOperationsPerTxn", - zap.Any("opset size", opset.Len()), - zap.Int("limit", maxOperationsPerTxn)) - } - if count+opset.Len() > maxOperationsPerTxn { - if err := c.updateMeta(NewChannelOpSet(operations...)); err != nil { - return err - } - count = 0 - operations = make([]*ChannelOp, 0, maxOperationsPerTxn) - } - count += opset.Len() - operations = append(operations, opset.Collect()...) - } - if count == 0 { - return nil - } - - return c.updateMeta(NewChannelOpSet(operations...)) -} - -// remove from the assignments -func (c *StateChannelStore) removeAssignment(nodeID int64, channelName string) { - if cInfo, ok := c.channelsInfo[nodeID]; ok { - delete(cInfo.Channels, channelName) - } -} - -func (c *StateChannelStore) addAssignment(nodeID int64, channel RWChannel) { - if cInfo, ok := c.channelsInfo[nodeID]; ok { - cInfo.Channels[channel.GetName()] = channel - } else { - c.channelsInfo[nodeID] = &NodeChannelInfo{ - NodeID: nodeID, - Channels: map[string]RWChannel{ - channel.GetName(): channel, - }, - } - } -} - -// updateMeta applies the WATCH/RELEASE/DELETE operations to the current channel store. -// DELETE + WATCH ---> from bufferID to nodeID -// DELETE + WATCH ---> from lagecyID to nodeID -// DELETE + WATCH ---> from deletedNode to nodeID/bufferID -// DELETE + WATCH ---> from releasedNode to nodeID/bufferID -// RELEASE ---> release from nodeID -// WATCH ---> watch to a new channel -// DELETE ---> remove the channel -func (c *StateChannelStore) sanityCheckPerChannelOpSet(opSet *ChannelOpSet) bool { - if opSet.Len() == 2 { - ops := opSet.Collect() - return (ops[0].Type == Delete && ops[1].Type == Watch) || (ops[1].Type == Delete && ops[0].Type == Watch) - } else if opSet.Len() == 1 { - t := opSet.Collect()[0].Type - return t == Delete || t == Watch || t == Release - } - return false -} - -// DELETE + WATCH -func (c *StateChannelStore) updateMetaMemoryForPairOp(chName string, opSet *ChannelOpSet) error { - if !c.sanityCheckPerChannelOpSet(opSet) { - return errUnknownOpType - } - ops := opSet.Collect() - op1 := ops[1] - op2 := ops[0] - if ops[0].Type == Delete { - op1 = ops[0] - op2 = ops[1] - } - cInfo, ok := c.channelsInfo[op1.NodeID] - if !ok { - return errChannelNotExistInNode - } - var ch *StateChannel - if channel, ok := cInfo.Channels[chName]; ok { - ch = channel.(*StateChannel) - c.addAssignment(op2.NodeID, ch) - c.removeAssignment(op1.NodeID, chName) - } else { - if cInfo, ok = c.channelsInfo[op2.NodeID]; ok { - if channel2, ok := cInfo.Channels[chName]; ok { - ch = channel2.(*StateChannel) - } - } - } - // update channel - if ch != nil { - ch.Assign(op2.NodeID) - if op2.NodeID == bufferID { - ch.setState(Standby) - } else { - ch.setState(ToWatch) - } - } - return nil -} - -func (c *StateChannelStore) getChannel(nodeID int64, channelName string) *StateChannel { - if cInfo, ok := c.channelsInfo[nodeID]; ok { - if storedChannel, ok := cInfo.Channels[channelName]; ok { - return storedChannel.(*StateChannel) - } - log.Debug("Channel doesn't exist in Node", zap.String("channel", channelName), zap.Int64("nodeID", nodeID)) - } else { - log.Error("Node doesn't exist", zap.Int64("NodeID", nodeID)) - } - return nil -} - -func (c *StateChannelStore) updateMetaMemoryForSingleOp(op *ChannelOp) error { - lo.ForEach(op.Channels, func(ch RWChannel, _ int) { - switch op.Type { - case Release: // release an already exsits storedChannel-node pair - if channel := c.getChannel(op.NodeID, ch.GetName()); channel != nil { - channel.setState(ToRelease) - } - case Watch: - storedChannel := c.getChannel(op.NodeID, ch.GetName()) - if storedChannel == nil { // New Channel - // set the correct assigment and state for NEW stateChannel - newChannel := NewStateChannel(ch) - newChannel.Assign(op.NodeID) - - if op.NodeID != bufferID { - newChannel.setState(ToWatch) - } - - // add channel to memory - c.addAssignment(op.NodeID, newChannel) - } else { // assign to the original nodes - storedChannel.setState(ToWatch) - } - case Delete: // Remove Channel - // if not Delete from bufferID, remove from channel - if op.NodeID != bufferID { - c.removeAssignment(op.NodeID, ch.GetName()) - } - default: - log.Error("unknown opType in updateMetaMemoryForSingleOp", zap.Any("type", op.Type)) - } - }) - return nil -} - -func (c *StateChannelStore) updateMeta(opSet *ChannelOpSet) error { - // Update ChannelStore's kv store. - if err := c.txn(opSet); err != nil { - return err - } - - // Update memory - chOpSet := opSet.SplitByChannel() - for chName, ops := range chOpSet { - // DELETE + WATCH - if ops.Len() == 2 { - c.updateMetaMemoryForPairOp(chName, ops) - // RELEASE, DELETE, WATCH - } else if ops.Len() == 1 { - c.updateMetaMemoryForSingleOp(ops.Collect()[0]) - } else { - log.Error("unsupported ChannelOpSet", zap.Any("OpSet", ops)) - } - } - return nil -} - -// txn updates the channelStore's kv store with the given channel ops. -func (c *StateChannelStore) txn(opSet *ChannelOpSet) error { - var ( - saves = make(map[string]string) - removals []string - ) - for _, op := range opSet.Collect() { - opSaves, opRemovals, err := op.BuildKV() - if err != nil { - return err - } - - saves = lo.Assign(opSaves, saves) - removals = append(removals, opRemovals...) - } - return c.store.MultiSaveAndRemove(saves, removals) -} - -func (c *StateChannelStore) RemoveNode(nodeID int64) { - delete(c.channelsInfo, nodeID) -} - -func (c *StateChannelStore) HasChannel(channel string) bool { - for _, info := range c.channelsInfo { - if _, ok := info.Channels[channel]; ok { - return true - } - } - return false -} - -type ( - ChannelSelector func(ch *StateChannel) bool - NodeSelector func(ID int64) bool -) - -func WithAllNodes() NodeSelector { - return func(ID int64) bool { - return true - } -} - -func WithoutBufferNode() NodeSelector { - return func(ID int64) bool { - return ID != int64(bufferID) - } -} - -func WithNodeIDs(IDs ...int64) NodeSelector { - return func(ID int64) bool { - return lo.Contains(IDs, ID) - } -} - -func WithoutNodeIDs(IDs ...int64) NodeSelector { - return func(ID int64) bool { - return !lo.Contains(IDs, ID) - } -} - -func WithChannelName(channel string) ChannelSelector { - return func(ch *StateChannel) bool { - return ch.GetName() == channel - } -} - -func WithCollectionIDV2(collectionID int64) ChannelSelector { - return func(ch *StateChannel) bool { - return ch.GetCollectionID() == collectionID - } -} - -func WithChannelStates(states ...ChannelState) ChannelSelector { - return func(ch *StateChannel) bool { - return lo.Contains(states, ch.currentState) - } -} - -func (c *StateChannelStore) GetNodeChannelsBy(nodeSelector NodeSelector, channelSelectors ...ChannelSelector) []*NodeChannelInfo { - var nodeChannels []*NodeChannelInfo - for nodeID, cInfo := range c.channelsInfo { - if nodeSelector(nodeID) { - selected := make(map[string]RWChannel) - for chName, channel := range cInfo.Channels { - var sel bool = true - for _, selector := range channelSelectors { - if !selector(channel.(*StateChannel)) { - sel = false - break - } - } - if sel { - selected[chName] = channel - } - } - nodeChannels = append(nodeChannels, &NodeChannelInfo{ - NodeID: nodeID, - Channels: selected, - }) - } - } - return nodeChannels -} - -func (c *StateChannelStore) GetNodesChannels() []*NodeChannelInfo { - ret := make([]*NodeChannelInfo, 0, len(c.channelsInfo)) - for id, info := range c.channelsInfo { - if id != bufferID { - ret = append(ret, info) - } - } - return ret -} - -func (c *StateChannelStore) GetNodeChannelsByCollectionID(collectionID UniqueID) map[UniqueID][]string { - nodeChs := make(map[UniqueID][]string) - for id, info := range c.channelsInfo { - if id == bufferID { - continue - } - var channelNames []string - for name, ch := range info.Channels { - if ch.GetCollectionID() == collectionID { - channelNames = append(channelNames, name) - } - } - nodeChs[id] = channelNames - } - return nodeChs -} - -func (c *StateChannelStore) GetBufferChannelInfo() *NodeChannelInfo { - return c.GetNode(bufferID) -} - -func (c *StateChannelStore) GetNode(nodeID int64) *NodeChannelInfo { - if info, ok := c.channelsInfo[nodeID]; ok { - return info - } - return nil -} - -func (c *StateChannelStore) GetNodeChannelCount(nodeID int64) int { - if cInfo, ok := c.channelsInfo[nodeID]; ok { - return len(cInfo.Channels) - } - return 0 -} - -func (c *StateChannelStore) GetNodes() []int64 { - return lo.Filter(lo.Keys(c.channelsInfo), func(ID int64, _ int) bool { - return ID != bufferID - }) -} - -// remove deletes kv pairs from the kv store where keys have given nodeID as prefix. -func (c *StateChannelStore) remove(nodeID int64) error { - k := buildKeyPrefix(nodeID) - return c.store.RemoveWithPrefix(k) -} diff --git a/internal/datacoord/channel_store_v2_test.go b/internal/datacoord/channel_store_v2_test.go deleted file mode 100644 index 501d4a9d74..0000000000 --- a/internal/datacoord/channel_store_v2_test.go +++ /dev/null @@ -1,519 +0,0 @@ -package datacoord - -import ( - "fmt" - "strconv" - "testing" - - "github.com/golang/protobuf/proto" - "github.com/samber/lo" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/suite" - "go.uber.org/zap" - - "github.com/milvus-io/milvus/internal/kv/mocks" - "github.com/milvus-io/milvus/internal/proto/datapb" - "github.com/milvus-io/milvus/pkg/kv/predicates" - "github.com/milvus-io/milvus/pkg/log" - "github.com/milvus-io/milvus/pkg/metrics" - "github.com/milvus-io/milvus/pkg/util/testutils" -) - -func TestStateChannelStore(t *testing.T) { - suite.Run(t, new(StateChannelStoreSuite)) -} - -type StateChannelStoreSuite struct { - testutils.PromMetricsSuite - - mockTxn *mocks.TxnKV -} - -func (s *StateChannelStoreSuite) SetupTest() { - s.mockTxn = mocks.NewTxnKV(s.T()) -} - -func generateWatchInfo(name string, state datapb.ChannelWatchState) *datapb.ChannelWatchInfo { - return &datapb.ChannelWatchInfo{ - Vchan: &datapb.VchannelInfo{ - ChannelName: name, - }, - State: state, - } -} - -func (s *StateChannelStoreSuite) createChannelInfo(nodeID int64, channels ...RWChannel) *NodeChannelInfo { - cInfo := &NodeChannelInfo{ - NodeID: nodeID, - Channels: make(map[string]RWChannel), - } - for _, channel := range channels { - cInfo.Channels[channel.GetName()] = channel - } - return cInfo -} - -func (s *StateChannelStoreSuite) TestGetNodeChannelsBy() { - nodes := []int64{bufferID, 100, 101, 102} - nodesExcludeBufferID := []int64{100, 101, 102} - channels := []*StateChannel{ - getChannel("ch1", 1), - getChannel("ch2", 1), - getChannel("ch3", 1), - getChannel("ch4", 1), - getChannel("ch5", 1), - getChannel("ch6", 1), - getChannel("ch7", 1), - } - - channelsInfo := map[int64]*NodeChannelInfo{ - bufferID: s.createChannelInfo(bufferID, channels[0]), - 100: s.createChannelInfo(100, channels[1], channels[2]), - 101: s.createChannelInfo(101, channels[3], channels[4]), - 102: s.createChannelInfo(102, channels[5], channels[6]), // legacy nodes - } - - store := NewStateChannelStore(s.mockTxn) - lo.ForEach(nodes, func(nodeID int64, _ int) { store.AddNode(nodeID) }) - store.channelsInfo = channelsInfo - lo.ForEach(channels, func(ch *StateChannel, _ int) { - if ch.GetName() == "ch6" || ch.GetName() == "ch7" { - ch.setState(Legacy) - } - s.Require().True(store.HasChannel(ch.GetName())) - }) - s.Require().ElementsMatch(nodesExcludeBufferID, store.GetNodes()) - store.SetLegacyChannelByNode(102) - - s.Run("test AddNode RemoveNode", func() { - var nodeID int64 = 19530 - _, ok := store.channelsInfo[nodeID] - s.Require().False(ok) - store.AddNode(nodeID) - _, ok = store.channelsInfo[nodeID] - s.True(ok) - - store.RemoveNode(nodeID) - _, ok = store.channelsInfo[nodeID] - s.False(ok) - }) - - s.Run("test GetNodeChannels", func() { - infos := store.GetNodesChannels() - expectedResults := map[int64][]string{ - 100: {"ch2", "ch3"}, - 101: {"ch4", "ch5"}, - 102: {"ch6", "ch7"}, - } - - s.Equal(3, len(infos)) - - lo.ForEach(infos, func(info *NodeChannelInfo, _ int) { - expectedChannels, ok := expectedResults[info.NodeID] - s.True(ok) - - gotChannels := lo.Keys(info.Channels) - s.ElementsMatch(expectedChannels, gotChannels) - }) - }) - - s.Run("test GetBufferChannelInfo", func() { - info := store.GetBufferChannelInfo() - s.NotNil(info) - - gotChannels := lo.Keys(info.Channels) - s.ElementsMatch([]string{"ch1"}, gotChannels) - }) - - s.Run("test GetNode", func() { - info := store.GetNode(19530) - s.Nil(info) - - info = store.GetNode(bufferID) - s.NotNil(info) - - gotChannels := lo.Keys(info.Channels) - s.ElementsMatch([]string{"ch1"}, gotChannels) - }) - - tests := []struct { - description string - nodeSelector NodeSelector - channelSelectors []ChannelSelector - - expectedResult map[int64][]string - }{ - {"test withnodeIDs bufferID", WithNodeIDs(bufferID), nil, map[int64][]string{bufferID: {"ch1"}}}, - {"test withnodeIDs 100", WithNodeIDs(100), nil, map[int64][]string{100: {"ch2", "ch3"}}}, - {"test withnodeIDs 101 102", WithNodeIDs(101, 102), nil, map[int64][]string{ - 101: {"ch4", "ch5"}, - 102: {"ch6", "ch7"}, - }}, - {"test withAllNodes", WithAllNodes(), nil, map[int64][]string{ - bufferID: {"ch1"}, - 100: {"ch2", "ch3"}, - 101: {"ch4", "ch5"}, - 102: {"ch6", "ch7"}, - }}, - {"test WithoutBufferNode", WithoutBufferNode(), nil, map[int64][]string{ - 100: {"ch2", "ch3"}, - 101: {"ch4", "ch5"}, - 102: {"ch6", "ch7"}, - }}, - {"test WithoutNodeIDs 100, 101", WithoutNodeIDs(100, 101), nil, map[int64][]string{ - bufferID: {"ch1"}, - 102: {"ch6", "ch7"}, - }}, - { - "test WithChannelName ch1", WithNodeIDs(bufferID), - []ChannelSelector{WithChannelName("ch1")}, - map[int64][]string{ - bufferID: {"ch1"}, - }, - }, - { - "test WithChannelName ch1, collectionID 1", WithNodeIDs(100), - []ChannelSelector{ - WithChannelName("ch2"), - WithCollectionIDV2(1), - }, - map[int64][]string{100: {"ch2"}}, - }, - { - "test WithCollectionID 1", WithAllNodes(), - []ChannelSelector{ - WithCollectionIDV2(1), - }, - map[int64][]string{ - bufferID: {"ch1"}, - 100: {"ch2", "ch3"}, - 101: {"ch4", "ch5"}, - 102: {"ch6", "ch7"}, - }, - }, - { - "test WithChannelState", WithNodeIDs(102), - []ChannelSelector{ - WithChannelStates(Legacy), - }, - map[int64][]string{ - 102: {"ch6", "ch7"}, - }, - }, - } - - for _, test := range tests { - s.Run(test.description, func() { - if test.channelSelectors == nil { - test.channelSelectors = []ChannelSelector{} - } - - infos := store.GetNodeChannelsBy(test.nodeSelector, test.channelSelectors...) - log.Info("got test infos", zap.Any("infos", infos)) - s.Equal(len(test.expectedResult), len(infos)) - - lo.ForEach(infos, func(info *NodeChannelInfo, _ int) { - expectedChannels, ok := test.expectedResult[info.NodeID] - s.True(ok) - - gotChannels := lo.Keys(info.Channels) - s.ElementsMatch(expectedChannels, gotChannels) - }) - }) - } -} - -func (s *StateChannelStoreSuite) TestUpdateWithTxnLimit() { - tests := []struct { - description string - inOpCount int - outTxnCount int - }{ - {"operations count < maxPerTxn", maxOperationsPerTxn - 1, 1}, - {"operations count = maxPerTxn", maxOperationsPerTxn, 1}, - {"operations count > maxPerTxn", maxOperationsPerTxn + 1, 2}, - {"operations count = 2*maxPerTxn", maxOperationsPerTxn * 2, 2}, - {"operations count = 2*maxPerTxn+1", maxOperationsPerTxn*2 + 1, 3}, - } - - for _, test := range tests { - s.SetupTest() - s.Run(test.description, func() { - s.mockTxn.EXPECT().MultiSaveAndRemove(mock.Anything, mock.Anything). - Run(func(saves map[string]string, removals []string, preds ...predicates.Predicate) { - log.Info("test save and remove", zap.Any("saves", saves), zap.Any("removals", removals)) - }).Return(nil).Times(test.outTxnCount) - - store := NewStateChannelStore(s.mockTxn) - store.AddNode(1) - s.Require().ElementsMatch([]int64{1}, store.GetNodes()) - s.Require().Equal(0, store.GetNodeChannelCount(1)) - - // Get operations - ops := genChannelOperations(1, Watch, test.inOpCount) - err := store.Update(ops) - s.NoError(err) - }) - } -} - -func (s *StateChannelStoreSuite) TestUpdateMeta2000kSegs() { - ch := getChannel("ch1", 1) - info := ch.GetWatchInfo() - // way larger than limit=2097152 - seg2000k := make([]int64, 2000000) - for i := range seg2000k { - seg2000k[i] = int64(i) - } - info.Vchan.FlushedSegmentIds = seg2000k - ch.UpdateWatchInfo(info) - - opSet := NewChannelOpSet( - NewChannelOp(bufferID, Delete, ch), - NewChannelOp(100, Watch, ch), - ) - s.SetupTest() - s.mockTxn.EXPECT().MultiSaveAndRemove(mock.Anything, mock.Anything). - Run(func(saves map[string]string, removals []string, preds ...predicates.Predicate) { - }).Return(nil).Once() - - store := NewStateChannelStore(s.mockTxn) - store.AddNode(100) - s.Require().Equal(0, store.GetNodeChannelCount(100)) - store.addAssignment(bufferID, ch) - s.Require().Equal(1, store.GetNodeChannelCount(bufferID)) - - err := store.updateMeta(opSet) - s.NoError(err) - - got := store.GetNodeChannelsBy(WithNodeIDs(100)) - s.NotNil(got) - s.Require().Equal(1, len(got)) - gotInfo := got[0] - s.ElementsMatch([]string{"ch1"}, lo.Keys(gotInfo.Channels)) -} - -func (s *StateChannelStoreSuite) TestUpdateMeta() { - tests := []struct { - description string - - opSet *ChannelOpSet - nodeIDs []int64 - channels []*StateChannel - assignments map[int64][]string - - outAssignments map[int64][]string - }{ - { - "delete_watch_ch1 from bufferID to nodeID=100", - NewChannelOpSet( - NewChannelOp(bufferID, Delete, getChannel("ch1", 1)), - NewChannelOp(100, Watch, getChannel("ch1", 1)), - ), - []int64{bufferID, 100}, - []*StateChannel{getChannel("ch1", 1)}, - map[int64][]string{ - bufferID: {"ch1"}, - }, - map[int64][]string{ - 100: {"ch1"}, - }, - }, - { - "delete_watch_ch1 from lagecyID=99 to nodeID=100", - NewChannelOpSet( - NewChannelOp(99, Delete, getChannel("ch1", 1)), - NewChannelOp(100, Watch, getChannel("ch1", 1)), - ), - []int64{bufferID, 99, 100}, - []*StateChannel{getChannel("ch1", 1)}, - map[int64][]string{ - 99: {"ch1"}, - }, - map[int64][]string{ - 100: {"ch1"}, - }, - }, - { - "release from nodeID=100", - NewChannelOpSet( - NewChannelOp(100, Release, getChannel("ch1", 1)), - ), - []int64{bufferID, 100}, - []*StateChannel{getChannel("ch1", 1)}, - map[int64][]string{ - 100: {"ch1"}, - }, - map[int64][]string{ - 100: {"ch1"}, - }, - }, - { - "watch a new channel from nodeID=100", - NewChannelOpSet( - NewChannelOp(100, Watch, getChannel("ch1", 1)), - ), - []int64{bufferID, 100}, - []*StateChannel{getChannel("ch1", 1)}, - map[int64][]string{ - 100: {"ch1"}, - }, - map[int64][]string{ - 100: {"ch1"}, - }, - }, - { - "Delete remove a channelfrom nodeID=100", - NewChannelOpSet( - NewChannelOp(100, Delete, getChannel("ch1", 1)), - ), - []int64{bufferID, 100}, - []*StateChannel{getChannel("ch1", 1)}, - map[int64][]string{ - 100: {"ch1"}, - }, - map[int64][]string{ - 100: {}, - }, - }, - } - s.SetupTest() - s.mockTxn.EXPECT().MultiSaveAndRemove(mock.Anything, mock.Anything). - Run(func(saves map[string]string, removals []string, preds ...predicates.Predicate) { - }).Return(nil).Times(len(tests)) - - for _, test := range tests { - s.Run(test.description, func() { - store := NewStateChannelStore(s.mockTxn) - - lo.ForEach(test.nodeIDs, func(nodeID int64, _ int) { - store.AddNode(nodeID) - s.Require().Equal(0, store.GetNodeChannelCount(nodeID)) - }) - c := make(map[string]*StateChannel) - lo.ForEach(test.channels, func(ch *StateChannel, _ int) { c[ch.GetName()] = ch }) - for nodeID, channels := range test.assignments { - lo.ForEach(channels, func(ch string, _ int) { - store.addAssignment(nodeID, c[ch]) - }) - s.Require().Equal(1, store.GetNodeChannelCount(nodeID)) - } - - err := store.updateMeta(test.opSet) - s.NoError(err) - - for nodeID, channels := range test.outAssignments { - got := store.GetNodeChannelsBy(WithNodeIDs(nodeID)) - s.NotNil(got) - s.Require().Equal(1, len(got)) - info := got[0] - s.ElementsMatch(channels, lo.Keys(info.Channels)) - } - }) - } -} - -func (s *StateChannelStoreSuite) TestUpdateState() { - tests := []struct { - description string - - inSuccess bool - inChannelState ChannelState - outChannelState ChannelState - }{ - {"input standby, fail", false, Standby, Standby}, - {"input standby, success", true, Standby, ToWatch}, - } - - for _, test := range tests { - s.Run(test.description, func() { - store := NewStateChannelStore(s.mockTxn) - - ch := "ch-1" - channel := NewStateChannel(getChannel(ch, 1)) - channel.setState(test.inChannelState) - store.channelsInfo[1] = &NodeChannelInfo{ - NodeID: bufferID, - Channels: map[string]RWChannel{ - ch: channel, - }, - } - - store.UpdateState(test.inSuccess, channel) - s.Equal(test.outChannelState, channel.currentState) - }) - } -} - -func (s *StateChannelStoreSuite) TestReload() { - type item struct { - nodeID int64 - channelName string - } - type testCase struct { - tag string - items []item - expect map[int64]int - } - - cases := []testCase{ - { - tag: "empty", - items: []item{}, - expect: map[int64]int{}, - }, - { - tag: "normal", - items: []item{ - {nodeID: 1, channelName: "dml1_v0"}, - {nodeID: 1, channelName: "dml2_v1"}, - {nodeID: 2, channelName: "dml3_v0"}, - }, - expect: map[int64]int{1: 2, 2: 1}, - }, - { - tag: "buffer", - items: []item{ - {nodeID: bufferID, channelName: "dml1_v0"}, - }, - expect: map[int64]int{bufferID: 1}, - }, - } - - for _, tc := range cases { - s.Run(tc.tag, func() { - s.mockTxn.ExpectedCalls = nil - - var keys, values []string - for _, item := range tc.items { - keys = append(keys, fmt.Sprintf("channel_store/%d/%s", item.nodeID, item.channelName)) - info := generateWatchInfo(item.channelName, datapb.ChannelWatchState_WatchSuccess) - bs, err := proto.Marshal(info) - s.Require().NoError(err) - values = append(values, string(bs)) - } - s.mockTxn.EXPECT().LoadWithPrefix(mock.AnythingOfType("string")).Return(keys, values, nil) - - store := NewStateChannelStore(s.mockTxn) - err := store.Reload() - s.Require().NoError(err) - - for nodeID, expect := range tc.expect { - s.MetricsEqual(metrics.DataCoordDmlChannelNum.WithLabelValues(strconv.FormatInt(nodeID, 10)), float64(expect)) - } - }) - } -} - -func genChannelOperations(nodeID int64, opType ChannelOpType, num int) *ChannelOpSet { - channels := make([]RWChannel, 0, num) - for i := 0; i < num; i++ { - name := fmt.Sprintf("ch%d", i) - channel := NewStateChannel(getChannel(name, 1)) - channel.Info = generateWatchInfo(name, datapb.ChannelWatchState_ToWatch) - channels = append(channels, channel) - } - - ops := NewChannelOpSet(NewChannelOp(nodeID, opType, channels...)) - return ops -}