Fix reload delta channel error (#11783)

Signed-off-by: godchen <qingxiang.chen@zilliz.com>
This commit is contained in:
godchen 2021-11-15 15:17:09 +08:00 committed by GitHub
parent b3d85adb6e
commit 4d58ff2df7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 17 additions and 2 deletions

View File

@ -17,6 +17,7 @@ import (
"fmt"
"path/filepath"
"strconv"
"strings"
"sync"
"github.com/golang/protobuf/proto"
@ -202,8 +203,8 @@ func (m *MetaReplica) reloadFromKV() error {
return nil
}
for index, value := range deltaChannelValues {
collectionIDString, _ := filepath.Split(deltaChannelKeys[index])
collectionID, err := strconv.ParseInt(collectionIDString, 10, 64)
pathStrings := strings.Split(deltaChannelKeys[index], "/")
collectionID, err := strconv.ParseInt(pathStrings[len(pathStrings)-2], 10, 64)
if err != nil {
return err
}

View File

@ -23,6 +23,7 @@ import (
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
)
@ -311,6 +312,7 @@ func TestReloadMetaFromKV(t *testing.T) {
collectionInfos: map[UniqueID]*querypb.CollectionInfo{},
segmentInfos: map[UniqueID]*querypb.SegmentInfo{},
queryChannelInfos: map[UniqueID]*querypb.QueryChannelInfo{},
deltaChannelInfos: map[UniqueID][]*datapb.VchannelInfo{},
}
kvs := make(map[string]string)
@ -338,6 +340,18 @@ func TestReloadMetaFromKV(t *testing.T) {
queryChannelKey := fmt.Sprintf("%s/%d", queryChannelMetaPrefix, defaultCollectionID)
kvs[queryChannelKey] = string(queryChannelBlobs)
deltaChannel1 := &datapb.VchannelInfo{CollectionID: defaultCollectionID, ChannelName: "delta-channel1"}
deltaChannel2 := &datapb.VchannelInfo{CollectionID: defaultCollectionID, ChannelName: "delta-channel2"}
infos := []*datapb.VchannelInfo{deltaChannel1, deltaChannel2}
for _, info := range infos {
infoBytes, err := proto.Marshal(info)
assert.Nil(t, err)
key := fmt.Sprintf("%s/%d/%s", deltaChannelMetaPrefix, defaultCollectionID, info.ChannelName)
kvs[key] = string(infoBytes)
}
err = kv.MultiSave(kvs)
assert.Nil(t, err)