From a2dd16472de4b59bbaca8bf0301d403d90d265ac Mon Sep 17 00:00:00 2001 From: congqixia Date: Fri, 28 May 2021 16:48:31 +0800 Subject: [PATCH] Implement random reassign and add test case (#5474) Signed-off-by: Congqi Xia --- internal/dataservice/policy.go | 44 ++++++++++-------- internal/dataservice/policy_test.go | 70 +++++++++++++++++++++++++++++ 2 files changed, 96 insertions(+), 18 deletions(-) diff --git a/internal/dataservice/policy.go b/internal/dataservice/policy.go index 6bcb35ff09..ca17b5b5b9 100644 --- a/internal/dataservice/policy.go +++ b/internal/dataservice/policy.go @@ -84,35 +84,43 @@ type randomAssignUnregisterPolicy struct{} func (p *randomAssignUnregisterPolicy) apply(cluster map[string]*datapb.DataNodeInfo, session *datapb.DataNodeInfo) []*datapb.DataNodeInfo { if len(cluster) == 0 || // no available node + session == nil || len(session.Channels) == 0 { // lost node not watching any channels return []*datapb.DataNodeInfo{} } - mChan := make(map[string]struct{}, len(session.Channels)) + appliedNodes := make([]*datapb.DataNodeInfo, 0, len(session.Channels)) + raResult := make(map[int][]*datapb.ChannelStatus) for _, chanSt := range session.Channels { - mChan[chanSt.Name] = struct{}{} + bIdx, err := rand.Int(rand.Reader, big.NewInt(int64(len(cluster)))) + if err != nil { + log.Error("error generated rand idx", zap.Error(err)) + return []*datapb.DataNodeInfo{} + } + idx := bIdx.Int64() + if int(idx) >= len(cluster) { + continue + } + cs, ok := raResult[int(idx)] + if !ok { + cs = make([]*datapb.ChannelStatus, 0, 10) + } + chanSt.State = datapb.ChannelWatchState_Uncomplete + cs = append(cs, chanSt) + raResult[int(idx)] = cs } - bIdx, err := rand.Int(rand.Reader, big.NewInt(int64(len(cluster)))) - if err != nil { - log.Error("error generated rand idx", zap.Error(err)) - return []*datapb.DataNodeInfo{} - } - idx := bIdx.Int64() - if int(idx) >= len(cluster) { - return []*datapb.DataNodeInfo{} - } i := 0 for _, node := range cluster { - if i == int(idx) { - //TODO add channel to node - return []*datapb.DataNodeInfo{ - node, - } - } + cs, ok := raResult[i] i++ + if ok { + node.Channels = append(node.Channels, cs...) + appliedNodes = append(appliedNodes, node) + } } - return []*datapb.DataNodeInfo{} + + return appliedNodes } type channelAssignPolicy interface { diff --git a/internal/dataservice/policy_test.go b/internal/dataservice/policy_test.go index fa161c4cec..61dfbc586d 100644 --- a/internal/dataservice/policy_test.go +++ b/internal/dataservice/policy_test.go @@ -1,3 +1,14 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + package dataservice import ( @@ -44,3 +55,62 @@ func TestWatchRestartsPolicy(t *testing.T) { assert.EqualValues(t, 1, len(nodes)) assert.EqualValues(t, datapb.ChannelWatchState_Uncomplete, nodes[0].Channels[0].State) } + +func TestRandomReassign(t *testing.T) { + p := randomAssignUnregisterPolicy{} + + clusters := make(map[string]*datapb.DataNodeInfo) + clusters["addr1"] = &datapb.DataNodeInfo{ + Address: "addr1", + Channels: make([]*datapb.ChannelStatus, 0, 10), + } + clusters["addr2"] = &datapb.DataNodeInfo{ + Address: "addr2", + Channels: make([]*datapb.ChannelStatus, 0, 10), + } + clusters["addr3"] = &datapb.DataNodeInfo{ + Address: "addr3", + Channels: make([]*datapb.ChannelStatus, 0, 10), + } + + cases := []*datapb.DataNodeInfo{ + { + Channels: []*datapb.ChannelStatus{}, + }, + { + Channels: []*datapb.ChannelStatus{ + {Name: "VChan1", CollectionID: 1}, + {Name: "VChan2", CollectionID: 2}, + }, + }, + { + Channels: []*datapb.ChannelStatus{ + {Name: "VChan3", CollectionID: 1}, + {Name: "VChan4", CollectionID: 2}, + }, + }, + nil, + } + + for _, ca := range cases { + nodes := p.apply(clusters, ca) + if ca == nil || len(ca.Channels) == 0 { + assert.Equal(t, 0, len(nodes)) + } else { + for _, ch := range ca.Channels { + found := false + loop: + for _, node := range nodes { + for _, nch := range node.Channels { + if nch.Name == ch.Name { + found = true + assert.EqualValues(t, datapb.ChannelWatchState_Uncomplete, nch.State) + break loop + } + } + } + assert.Equal(t, true, found) + } + } + } +}