mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-11-30 02:48:45 +08:00
Change channel's field to upper case to log details (#10567)
Signed-off-by: sunby <bingyi.sun@zilliz.com>
This commit is contained in:
parent
94fd4ec59b
commit
7fbb469081
@ -47,8 +47,8 @@ type ChannelManager struct {
|
||||
}
|
||||
|
||||
type channel struct {
|
||||
name string
|
||||
collectionID UniqueID
|
||||
Name string
|
||||
CollectionID UniqueID
|
||||
}
|
||||
|
||||
// ChannelManagerOpt is to set optional parameters in channel manager
|
||||
@ -250,7 +250,7 @@ func (c *ChannelManager) Watch(ch *channel) error {
|
||||
|
||||
func (c *ChannelManager) fillChannelPosition(update *ChannelOp) {
|
||||
for _, ch := range update.Channels {
|
||||
vchan := c.posProvider.GetVChanPositions(ch.name, ch.collectionID, true)
|
||||
vchan := c.posProvider.GetVChanPositions(ch.Name, ch.CollectionID, true)
|
||||
info := &datapb.ChannelWatchInfo{
|
||||
Vchan: vchan,
|
||||
StartTs: time.Now().Unix(),
|
||||
@ -287,7 +287,7 @@ func (c *ChannelManager) Match(nodeID int64, channel string) bool {
|
||||
}
|
||||
|
||||
for _, ch := range info.Channels {
|
||||
if ch.name == channel {
|
||||
if ch.Name == channel {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
@ -151,8 +151,8 @@ func (c *ChannelStore) Reload() error {
|
||||
|
||||
c.Add(nodeID)
|
||||
channel := &channel{
|
||||
name: temp.GetVchan().GetChannelName(),
|
||||
collectionID: temp.GetVchan().GetCollectionID(),
|
||||
Name: temp.GetVchan().GetChannelName(),
|
||||
CollectionID: temp.GetVchan().GetCollectionID(),
|
||||
}
|
||||
c.channelsInfo[nodeID].Channels = append(c.channelsInfo[nodeID].Channels, channel)
|
||||
}
|
||||
@ -228,12 +228,12 @@ func (c *ChannelStore) update(opSet ChannelOpSet) error {
|
||||
case Delete:
|
||||
filter := make(map[string]struct{})
|
||||
for _, ch := range v.Channels {
|
||||
filter[ch.name] = struct{}{}
|
||||
filter[ch.Name] = struct{}{}
|
||||
}
|
||||
origin := c.channelsInfo[v.NodeID].Channels
|
||||
res := make([]*channel, 0, len(origin))
|
||||
for _, ch := range origin {
|
||||
if _, ok := filter[ch.name]; !ok {
|
||||
if _, ok := filter[ch.Name]; !ok {
|
||||
res = append(res, ch)
|
||||
}
|
||||
}
|
||||
@ -322,7 +322,7 @@ func (c *ChannelStore) txn(opSet ChannelOpSet) error {
|
||||
removals := make([]string, 0)
|
||||
for _, update := range opSet {
|
||||
for i, c := range update.Channels {
|
||||
k := buildChannelKey(update.NodeID, c.name)
|
||||
k := buildChannelKey(update.NodeID, c.Name)
|
||||
switch update.Type {
|
||||
case Add:
|
||||
val, err := proto.Marshal(update.ChannelWatchInfos[i])
|
||||
@ -366,7 +366,7 @@ func (cu *ChannelOp) MarshalLogObject(enc zapcore.ObjectEncoder) error {
|
||||
cstr := "["
|
||||
if len(cu.Channels) > 0 {
|
||||
for _, s := range cu.Channels {
|
||||
cstr += s.name
|
||||
cstr += s.Name
|
||||
cstr += ", "
|
||||
}
|
||||
cstr = cstr[:len(cstr)-2]
|
||||
|
@ -67,7 +67,7 @@ func (c *Cluster) UnRegister(node *NodeInfo) error {
|
||||
|
||||
// Watch try to add a channel in datanode cluster
|
||||
func (c *Cluster) Watch(ch string, collectionID UniqueID) error {
|
||||
return c.channelManager.Watch(&channel{name: ch, collectionID: collectionID})
|
||||
return c.channelManager.Watch(&channel{Name: ch, CollectionID: collectionID})
|
||||
}
|
||||
|
||||
// Flush sends flush requests to corresponding datanodes according to channels that segments belong to
|
||||
@ -80,7 +80,7 @@ func (c *Cluster) Flush(ctx context.Context, segments []*datapb.SegmentInfo, mar
|
||||
// channel -> node
|
||||
for _, c := range channels {
|
||||
for _, ch := range c.Channels {
|
||||
channelNodes[ch.name] = c.NodeID
|
||||
channelNodes[ch.Name] = c.NodeID
|
||||
}
|
||||
}
|
||||
// find node on which segment exists
|
||||
|
@ -171,8 +171,8 @@ func TestRegister(t *testing.T) {
|
||||
channelManager, err := NewChannelManager(kv, dummyPosProvider{})
|
||||
assert.Nil(t, err)
|
||||
err = channelManager.Watch(&channel{
|
||||
name: "ch1",
|
||||
collectionID: 0,
|
||||
Name: "ch1",
|
||||
CollectionID: 0,
|
||||
})
|
||||
assert.Nil(t, err)
|
||||
cluster := NewCluster(sessionManager, channelManager)
|
||||
@ -191,7 +191,7 @@ func TestRegister(t *testing.T) {
|
||||
nodeChannels := channelManager.GetChannels()
|
||||
assert.EqualValues(t, 1, len(nodeChannels))
|
||||
assert.EqualValues(t, 1, nodeChannels[0].NodeID)
|
||||
assert.EqualValues(t, "ch1", nodeChannels[0].Channels[0].name)
|
||||
assert.EqualValues(t, "ch1", nodeChannels[0].Channels[0].Name)
|
||||
})
|
||||
|
||||
t.Run("register and restart with no channel", func(t *testing.T) {
|
||||
@ -271,7 +271,7 @@ func TestUnregister(t *testing.T) {
|
||||
assert.EqualValues(t, 1, len(channels))
|
||||
assert.EqualValues(t, 2, channels[0].NodeID)
|
||||
assert.EqualValues(t, 1, len(channels[0].Channels))
|
||||
assert.EqualValues(t, "ch1", channels[0].Channels[0].name)
|
||||
assert.EqualValues(t, "ch1", channels[0].Channels[0].Name)
|
||||
})
|
||||
|
||||
t.Run("remove all channels after unregsiter", func(t *testing.T) {
|
||||
@ -300,7 +300,7 @@ func TestUnregister(t *testing.T) {
|
||||
channel := channelManager.GetBuffer()
|
||||
assert.NotNil(t, channel)
|
||||
assert.EqualValues(t, 1, len(channel.Channels))
|
||||
assert.EqualValues(t, "ch_1", channel.Channels[0].name)
|
||||
assert.EqualValues(t, "ch_1", channel.Channels[0].Name)
|
||||
})
|
||||
}
|
||||
|
||||
@ -328,7 +328,7 @@ func TestWatchIfNeeded(t *testing.T) {
|
||||
assert.Nil(t, err)
|
||||
channels := channelManager.GetChannels()
|
||||
assert.EqualValues(t, 1, len(channels))
|
||||
assert.EqualValues(t, "ch1", channels[0].Channels[0].name)
|
||||
assert.EqualValues(t, "ch1", channels[0].Channels[0].Name)
|
||||
})
|
||||
|
||||
t.Run("watch channel to empty cluster", func(t *testing.T) {
|
||||
@ -346,7 +346,7 @@ func TestWatchIfNeeded(t *testing.T) {
|
||||
assert.Empty(t, channels)
|
||||
channel := channelManager.GetBuffer()
|
||||
assert.NotNil(t, channel)
|
||||
assert.EqualValues(t, "ch1", channel.Channels[0].name)
|
||||
assert.EqualValues(t, "ch1", channel.Channels[0].Name)
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -128,10 +128,10 @@ func ConsistentHashRegisterPolicy(hashring *consistent.Consistent) RegisterPolic
|
||||
channelsInfo := store.GetNodesChannels()
|
||||
for _, c := range channelsInfo {
|
||||
for _, ch := range c.Channels {
|
||||
idstr, err := hashring.Get(ch.name)
|
||||
idstr, err := hashring.Get(ch.Name)
|
||||
if err != nil {
|
||||
log.Warn("receive error when getting from hashring",
|
||||
zap.String("channel", ch.name), zap.Error(err))
|
||||
zap.String("channel", ch.Name), zap.Error(err))
|
||||
return nil
|
||||
}
|
||||
did, err := deformatNodeID(idstr)
|
||||
@ -218,10 +218,10 @@ func ConsistentHashChannelAssignPolicy(hashring *consistent.Consistent) ChannelA
|
||||
|
||||
adds := make(map[int64][]*channel)
|
||||
for _, c := range filteredChannels {
|
||||
idstr, err := hashring.Get(c.name)
|
||||
idstr, err := hashring.Get(c.Name)
|
||||
if err != nil {
|
||||
log.Warn("receive error when getting from hashring",
|
||||
zap.String("channel", c.name), zap.Error(err))
|
||||
zap.String("channel", c.Name), zap.Error(err))
|
||||
return nil
|
||||
}
|
||||
did, err := deformatNodeID(idstr)
|
||||
@ -247,13 +247,13 @@ func ConsistentHashChannelAssignPolicy(hashring *consistent.Consistent) ChannelA
|
||||
func filterChannels(store ROChannelStore, channels []*channel) []*channel {
|
||||
channelsMap := make(map[string]*channel)
|
||||
for _, c := range channels {
|
||||
channelsMap[c.name] = c
|
||||
channelsMap[c.Name] = c
|
||||
}
|
||||
|
||||
allChannelsInfo := store.GetChannels()
|
||||
for _, info := range allChannelsInfo {
|
||||
for _, c := range info.Channels {
|
||||
delete(channelsMap, c.name)
|
||||
delete(channelsMap, c.Name)
|
||||
}
|
||||
}
|
||||
|
||||
@ -343,9 +343,9 @@ func ConsistentHashDeregisterPolicy(hashring *consistent.Consistent) DeregisterP
|
||||
// reassign channels of deleted node
|
||||
updates := make(map[int64][]*channel)
|
||||
for _, c := range deletedInfo.Channels {
|
||||
idstr, err := hashring.Get(c.name)
|
||||
idstr, err := hashring.Get(c.Name)
|
||||
if err != nil {
|
||||
log.Warn("failed to get channel in hash ring", zap.String("channel", c.name))
|
||||
log.Warn("failed to get channel in hash ring", zap.String("channel", c.Name))
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -437,7 +437,7 @@ func BgCheckWithMaxWatchDuration(kv kv.TxnKV) ChannelBGChecker {
|
||||
Channels: make([]*channel, 0),
|
||||
}
|
||||
for _, c := range ch.Channels {
|
||||
k := buildChannelKey(ch.NodeID, c.name)
|
||||
k := buildChannelKey(ch.NodeID, c.Name)
|
||||
v, err := kv.Load(k)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
Loading…
Reference in New Issue
Block a user