diff --git a/internal/datacoord/channel_manager.go b/internal/datacoord/channel_manager.go index 3dca44910e..0ae1b4abc1 100644 --- a/internal/datacoord/channel_manager.go +++ b/internal/datacoord/channel_manager.go @@ -47,8 +47,8 @@ type ChannelManager struct { } type channel struct { - name string - collectionID UniqueID + Name string + CollectionID UniqueID } // ChannelManagerOpt is to set optional parameters in channel manager @@ -250,7 +250,7 @@ func (c *ChannelManager) Watch(ch *channel) error { func (c *ChannelManager) fillChannelPosition(update *ChannelOp) { for _, ch := range update.Channels { - vchan := c.posProvider.GetVChanPositions(ch.name, ch.collectionID, true) + vchan := c.posProvider.GetVChanPositions(ch.Name, ch.CollectionID, true) info := &datapb.ChannelWatchInfo{ Vchan: vchan, StartTs: time.Now().Unix(), @@ -287,7 +287,7 @@ func (c *ChannelManager) Match(nodeID int64, channel string) bool { } for _, ch := range info.Channels { - if ch.name == channel { + if ch.Name == channel { return true } } diff --git a/internal/datacoord/channel_store.go b/internal/datacoord/channel_store.go index a335a22c06..3c2e7de832 100644 --- a/internal/datacoord/channel_store.go +++ b/internal/datacoord/channel_store.go @@ -151,8 +151,8 @@ func (c *ChannelStore) Reload() error { c.Add(nodeID) channel := &channel{ - name: temp.GetVchan().GetChannelName(), - collectionID: temp.GetVchan().GetCollectionID(), + Name: temp.GetVchan().GetChannelName(), + CollectionID: temp.GetVchan().GetCollectionID(), } c.channelsInfo[nodeID].Channels = append(c.channelsInfo[nodeID].Channels, channel) } @@ -228,12 +228,12 @@ func (c *ChannelStore) update(opSet ChannelOpSet) error { case Delete: filter := make(map[string]struct{}) for _, ch := range v.Channels { - filter[ch.name] = struct{}{} + filter[ch.Name] = struct{}{} } origin := c.channelsInfo[v.NodeID].Channels res := make([]*channel, 0, len(origin)) for _, ch := range origin { - if _, ok := filter[ch.name]; !ok { + if _, ok := filter[ch.Name]; !ok { res = append(res, ch) } } @@ -322,7 +322,7 @@ func (c *ChannelStore) txn(opSet ChannelOpSet) error { removals := make([]string, 0) for _, update := range opSet { for i, c := range update.Channels { - k := buildChannelKey(update.NodeID, c.name) + k := buildChannelKey(update.NodeID, c.Name) switch update.Type { case Add: val, err := proto.Marshal(update.ChannelWatchInfos[i]) @@ -366,7 +366,7 @@ func (cu *ChannelOp) MarshalLogObject(enc zapcore.ObjectEncoder) error { cstr := "[" if len(cu.Channels) > 0 { for _, s := range cu.Channels { - cstr += s.name + cstr += s.Name cstr += ", " } cstr = cstr[:len(cstr)-2] diff --git a/internal/datacoord/cluster.go b/internal/datacoord/cluster.go index 328530cfb4..e8fca2d68f 100644 --- a/internal/datacoord/cluster.go +++ b/internal/datacoord/cluster.go @@ -67,7 +67,7 @@ func (c *Cluster) UnRegister(node *NodeInfo) error { // Watch try to add a channel in datanode cluster func (c *Cluster) Watch(ch string, collectionID UniqueID) error { - return c.channelManager.Watch(&channel{name: ch, collectionID: collectionID}) + return c.channelManager.Watch(&channel{Name: ch, CollectionID: collectionID}) } // Flush sends flush requests to corresponding datanodes according to channels that segments belong to @@ -80,7 +80,7 @@ func (c *Cluster) Flush(ctx context.Context, segments []*datapb.SegmentInfo, mar // channel -> node for _, c := range channels { for _, ch := range c.Channels { - channelNodes[ch.name] = c.NodeID + channelNodes[ch.Name] = c.NodeID } } // find node on which segment exists diff --git a/internal/datacoord/cluster_test.go b/internal/datacoord/cluster_test.go index ef2dde31de..8ce71ede93 100644 --- a/internal/datacoord/cluster_test.go +++ b/internal/datacoord/cluster_test.go @@ -171,8 +171,8 @@ func TestRegister(t *testing.T) { channelManager, err := NewChannelManager(kv, dummyPosProvider{}) assert.Nil(t, err) err = channelManager.Watch(&channel{ - name: "ch1", - collectionID: 0, + Name: "ch1", + CollectionID: 0, }) assert.Nil(t, err) cluster := NewCluster(sessionManager, channelManager) @@ -191,7 +191,7 @@ func TestRegister(t *testing.T) { nodeChannels := channelManager.GetChannels() assert.EqualValues(t, 1, len(nodeChannels)) assert.EqualValues(t, 1, nodeChannels[0].NodeID) - assert.EqualValues(t, "ch1", nodeChannels[0].Channels[0].name) + assert.EqualValues(t, "ch1", nodeChannels[0].Channels[0].Name) }) t.Run("register and restart with no channel", func(t *testing.T) { @@ -271,7 +271,7 @@ func TestUnregister(t *testing.T) { assert.EqualValues(t, 1, len(channels)) assert.EqualValues(t, 2, channels[0].NodeID) assert.EqualValues(t, 1, len(channels[0].Channels)) - assert.EqualValues(t, "ch1", channels[0].Channels[0].name) + assert.EqualValues(t, "ch1", channels[0].Channels[0].Name) }) t.Run("remove all channels after unregsiter", func(t *testing.T) { @@ -300,7 +300,7 @@ func TestUnregister(t *testing.T) { channel := channelManager.GetBuffer() assert.NotNil(t, channel) assert.EqualValues(t, 1, len(channel.Channels)) - assert.EqualValues(t, "ch_1", channel.Channels[0].name) + assert.EqualValues(t, "ch_1", channel.Channels[0].Name) }) } @@ -328,7 +328,7 @@ func TestWatchIfNeeded(t *testing.T) { assert.Nil(t, err) channels := channelManager.GetChannels() assert.EqualValues(t, 1, len(channels)) - assert.EqualValues(t, "ch1", channels[0].Channels[0].name) + assert.EqualValues(t, "ch1", channels[0].Channels[0].Name) }) t.Run("watch channel to empty cluster", func(t *testing.T) { @@ -346,7 +346,7 @@ func TestWatchIfNeeded(t *testing.T) { assert.Empty(t, channels) channel := channelManager.GetBuffer() assert.NotNil(t, channel) - assert.EqualValues(t, "ch1", channel.Channels[0].name) + assert.EqualValues(t, "ch1", channel.Channels[0].Name) }) } diff --git a/internal/datacoord/policy.go b/internal/datacoord/policy.go index 139558e87c..c3046ab002 100644 --- a/internal/datacoord/policy.go +++ b/internal/datacoord/policy.go @@ -128,10 +128,10 @@ func ConsistentHashRegisterPolicy(hashring *consistent.Consistent) RegisterPolic channelsInfo := store.GetNodesChannels() for _, c := range channelsInfo { for _, ch := range c.Channels { - idstr, err := hashring.Get(ch.name) + idstr, err := hashring.Get(ch.Name) if err != nil { log.Warn("receive error when getting from hashring", - zap.String("channel", ch.name), zap.Error(err)) + zap.String("channel", ch.Name), zap.Error(err)) return nil } did, err := deformatNodeID(idstr) @@ -218,10 +218,10 @@ func ConsistentHashChannelAssignPolicy(hashring *consistent.Consistent) ChannelA adds := make(map[int64][]*channel) for _, c := range filteredChannels { - idstr, err := hashring.Get(c.name) + idstr, err := hashring.Get(c.Name) if err != nil { log.Warn("receive error when getting from hashring", - zap.String("channel", c.name), zap.Error(err)) + zap.String("channel", c.Name), zap.Error(err)) return nil } did, err := deformatNodeID(idstr) @@ -247,13 +247,13 @@ func ConsistentHashChannelAssignPolicy(hashring *consistent.Consistent) ChannelA func filterChannels(store ROChannelStore, channels []*channel) []*channel { channelsMap := make(map[string]*channel) for _, c := range channels { - channelsMap[c.name] = c + channelsMap[c.Name] = c } allChannelsInfo := store.GetChannels() for _, info := range allChannelsInfo { for _, c := range info.Channels { - delete(channelsMap, c.name) + delete(channelsMap, c.Name) } } @@ -343,9 +343,9 @@ func ConsistentHashDeregisterPolicy(hashring *consistent.Consistent) DeregisterP // reassign channels of deleted node updates := make(map[int64][]*channel) for _, c := range deletedInfo.Channels { - idstr, err := hashring.Get(c.name) + idstr, err := hashring.Get(c.Name) if err != nil { - log.Warn("failed to get channel in hash ring", zap.String("channel", c.name)) + log.Warn("failed to get channel in hash ring", zap.String("channel", c.Name)) return nil } @@ -437,7 +437,7 @@ func BgCheckWithMaxWatchDuration(kv kv.TxnKV) ChannelBGChecker { Channels: make([]*channel, 0), } for _, c := range ch.Channels { - k := buildChannelKey(ch.NodeID, c.name) + k := buildChannelKey(ch.NodeID, c.Name) v, err := kv.Load(k) if err != nil { return nil, err