Add index unittests for query node (#7674)

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
bigsheeper 2021-09-10 17:11:09 +08:00 committed by GitHub
parent 020f109dd8
commit cb5cd1cf12
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 734 additions and 284 deletions

View File

@ -0,0 +1,50 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package querynode
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestIndexInfo(t *testing.T) {
indexInfo := newIndexInfo()
buildID := UniqueID(0)
indexID := UniqueID(0)
indexPaths := []string{"test-index-paths"}
indexName := "test-index-name"
indexParams := make(map[string]string)
indexInfo.setBuildID(buildID)
indexInfo.setIndexID(indexID)
indexInfo.setReadyLoad(true)
indexInfo.setIndexName(indexName)
indexInfo.setIndexPaths(indexPaths)
indexInfo.setIndexParams(indexParams)
resBuildID := indexInfo.getBuildID()
assert.Equal(t, buildID, resBuildID)
resIndexID := indexInfo.getIndexID()
assert.Equal(t, indexID, resIndexID)
resLoad := indexInfo.getReadyLoad()
assert.True(t, resLoad)
resName := indexInfo.getIndexName()
assert.Equal(t, indexName, resName)
resPaths := indexInfo.getIndexPaths()
assert.Equal(t, len(indexPaths), len(resPaths))
assert.Len(t, resPaths, 1)
assert.Equal(t, indexPaths[0], resPaths[0])
resParams := indexInfo.getIndexParams()
assert.Equal(t, len(indexParams), len(resParams))
}

View File

@ -16,9 +16,6 @@ import (
"errors"
"fmt"
"path"
"sort"
"strconv"
"strings"
"time"
"go.uber.org/zap"
@ -49,50 +46,6 @@ type indexLoader struct {
kv kv.BaseKV // minio kv
}
//func (loader *indexLoader) doLoadIndex(wg *sync.WaitGroup) {
// collectionIDs, _, segmentIDs := loader.replica.getSegmentsBySegmentType(segmentTypeSealed)
// if len(collectionIDs) <= 0 {
// wg.Done()
// return
// }
// log.Debug("do load index for sealed segments:", zap.String("segmentIDs", fmt.Sprintln(segmentIDs)))
// for i := range collectionIDs {
// // we don't need index id yet
// segment, err := loader.replica.getSegmentByID(segmentIDs[i])
// if err != nil {
// log.Warn(err.Error())
// continue
// }
// vecFieldIDs, err := loader.replica.getVecFieldIDsByCollectionID(collectionIDs[i])
// if err != nil {
// log.Warn(err.Error())
// continue
// }
// for _, fieldID := range vecFieldIDs {
// err = loader.setIndexInfo(collectionIDs[i], segment, fieldID)
// if err != nil {
// log.Warn(err.Error())
// continue
// }
//
// err = loader.loadIndex(segment, fieldID)
// if err != nil {
// log.Warn(err.Error())
// continue
// }
// }
// }
// // sendQueryNodeStats
// err := loader.sendQueryNodeStats()
// if err != nil {
// log.Warn(err.Error())
// wg.Done()
// return
// }
//
// wg.Done()
//}
func (loader *indexLoader) loadIndex(segment *Segment, fieldID int64) error {
// 1. use msg's index paths to get index bytes
var err error
@ -137,11 +90,6 @@ func (loader *indexLoader) loadIndex(segment *Segment, fieldID int64) error {
if err != nil {
return err
}
// 4. update segment index stats
err = loader.updateSegmentIndexStats(segment)
if err != nil {
return err
}
log.Debug("load index done")
return nil
}
@ -153,85 +101,6 @@ func (loader *indexLoader) printIndexParams(index []*commonpb.KeyValuePair) {
}
}
func (loader *indexLoader) indexParamsEqual(index1 []*commonpb.KeyValuePair, index2 []*commonpb.KeyValuePair) bool {
if len(index1) != len(index2) {
return false
}
for i := 0; i < len(index1); i++ {
kv1 := *index1[i]
kv2 := *index2[i]
if kv1.Key != kv2.Key || kv1.Value != kv2.Value {
return false
}
}
return true
}
func (loader *indexLoader) fieldsStatsIDs2Key(collectionID UniqueID, fieldID UniqueID) string {
return strconv.FormatInt(collectionID, 10) + "/" + strconv.FormatInt(fieldID, 10)
}
func (loader *indexLoader) fieldsStatsKey2IDs(key string) (UniqueID, UniqueID, error) {
ids := strings.Split(key, "/")
if len(ids) != 2 {
return 0, 0, errors.New("illegal fieldsStatsKey")
}
collectionID, err := strconv.ParseInt(ids[0], 10, 64)
if err != nil {
return 0, 0, err
}
fieldID, err := strconv.ParseInt(ids[1], 10, 64)
if err != nil {
return 0, 0, err
}
return collectionID, fieldID, nil
}
func (loader *indexLoader) updateSegmentIndexStats(segment *Segment) error {
for fieldID := range segment.indexInfos {
fieldStatsKey := loader.fieldsStatsIDs2Key(segment.collectionID, fieldID)
_, ok := loader.fieldIndexes[fieldStatsKey]
newIndexParams := make([]*commonpb.KeyValuePair, 0)
indexParams := segment.getIndexParams(fieldID)
for k, v := range indexParams {
newIndexParams = append(newIndexParams, &commonpb.KeyValuePair{
Key: k,
Value: v,
})
}
// sort index params by key
sort.Slice(newIndexParams, func(i, j int) bool { return newIndexParams[i].Key < newIndexParams[j].Key })
if !ok {
loader.fieldIndexes[fieldStatsKey] = make([]*internalpb.IndexStats, 0)
loader.fieldIndexes[fieldStatsKey] = append(loader.fieldIndexes[fieldStatsKey],
&internalpb.IndexStats{
IndexParams: newIndexParams,
NumRelatedSegments: 1,
})
} else {
isNewIndex := true
for _, index := range loader.fieldIndexes[fieldStatsKey] {
if loader.indexParamsEqual(newIndexParams, index.IndexParams) {
index.NumRelatedSegments++
isNewIndex = false
}
}
if isNewIndex {
loader.fieldIndexes[fieldStatsKey] = append(loader.fieldIndexes[fieldStatsKey],
&internalpb.IndexStats{
IndexParams: newIndexParams,
NumRelatedSegments: 1,
})
}
}
}
return nil
}
func (loader *indexLoader) getIndexBinlog(indexPath []string) ([][]byte, indexParam, string, error) {
index := make([][]byte, 0)
@ -266,26 +135,6 @@ func (loader *indexLoader) getIndexBinlog(indexPath []string) ([][]byte, indexPa
return index, indexParams, indexName, nil
}
func (loader *indexLoader) sendQueryNodeStats() error {
resultFieldsStats := make([]*internalpb.FieldStats, 0)
for fieldStatsKey, indexStats := range loader.fieldIndexes {
colID, fieldID, err := loader.fieldsStatsKey2IDs(fieldStatsKey)
if err != nil {
return err
}
fieldStats := internalpb.FieldStats{
CollectionID: colID,
FieldID: fieldID,
IndexStats: indexStats,
}
resultFieldsStats = append(resultFieldsStats, &fieldStats)
}
loader.fieldStatsChan <- resultFieldsStats
log.Debug("sent field stats")
return nil
}
func (loader *indexLoader) setIndexInfo(collectionID UniqueID, segment *Segment, fieldID UniqueID) error {
if loader.indexCoord == nil || loader.rootCoord == nil {
return errors.New("null index coordinator client or root coordinator client, collectionID = " +
@ -339,27 +188,6 @@ func (loader *indexLoader) setIndexInfo(collectionID UniqueID, segment *Segment,
return nil
}
//func (loader *indexLoader) getIndexPaths(indexBuildID UniqueID) ([]string, error) {
// ctx := context.TODO()
// if loader.indexCoord == nil {
// return nil, errors.New("null index coordinator client")
// }
//
// indexFilePathRequest := &indexpb.GetIndexFilePathsRequest{
// IndexBuildIDs: []UniqueID{indexBuildID},
// }
// pathResponse, err := loader.indexCoord.GetIndexFilePaths(ctx, indexFilePathRequest)
// if err != nil || pathResponse.Status.ErrorCode != commonpb.ErrorCode_Success {
// return nil, err
// }
//
// if len(pathResponse.FilePaths) <= 0 {
// return nil, errors.New("illegal index file paths")
// }
//
// return pathResponse.FilePaths[0].IndexFilePaths, nil
//}
func newIndexLoader(ctx context.Context, rootCoord types.RootCoord, indexCoord types.IndexCoord, replica ReplicaInterface) *indexLoader {
option := &minioKV.Option{
Address: Params.MinioEndPoint,
@ -387,3 +215,168 @@ func newIndexLoader(ctx context.Context, rootCoord types.RootCoord, indexCoord t
kv: client,
}
}
//// deprecated
//func (loader *indexLoader) doLoadIndex(wg *sync.WaitGroup) {
// collectionIDs, _, segmentIDs := loader.replica.getSegmentsBySegmentType(segmentTypeSealed)
// if len(collectionIDs) <= 0 {
// wg.Done()
// return
// }
// log.Debug("do load index for sealed segments:", zap.String("segmentIDs", fmt.Sprintln(segmentIDs)))
// for i := range collectionIDs {
// // we don't need index id yet
// segment, err := loader.replica.getSegmentByID(segmentIDs[i])
// if err != nil {
// log.Warn(err.Error())
// continue
// }
// vecFieldIDs, err := loader.replica.getVecFieldIDsByCollectionID(collectionIDs[i])
// if err != nil {
// log.Warn(err.Error())
// continue
// }
// for _, fieldID := range vecFieldIDs {
// err = loader.setIndexInfo(collectionIDs[i], segment, fieldID)
// if err != nil {
// log.Warn(err.Error())
// continue
// }
//
// err = loader.loadIndex(segment, fieldID)
// if err != nil {
// log.Warn(err.Error())
// continue
// }
// }
// }
// // sendQueryNodeStats
// err := loader.sendQueryNodeStats()
// if err != nil {
// log.Warn(err.Error())
// wg.Done()
// return
// }
//
// wg.Done()
//}
//
//func (loader *indexLoader) getIndexPaths(indexBuildID UniqueID) ([]string, error) {
// ctx := context.TODO()
// if loader.indexCoord == nil {
// return nil, errors.New("null index coordinator client")
// }
//
// indexFilePathRequest := &indexpb.GetIndexFilePathsRequest{
// IndexBuildIDs: []UniqueID{indexBuildID},
// }
// pathResponse, err := loader.indexCoord.GetIndexFilePaths(ctx, indexFilePathRequest)
// if err != nil || pathResponse.Status.ErrorCode != commonpb.ErrorCode_Success {
// return nil, err
// }
//
// if len(pathResponse.FilePaths) <= 0 {
// return nil, errors.New("illegal index file paths")
// }
//
// return pathResponse.FilePaths[0].IndexFilePaths, nil
//}
//
//func (loader *indexLoader) indexParamsEqual(index1 []*commonpb.KeyValuePair, index2 []*commonpb.KeyValuePair) bool {
// if len(index1) != len(index2) {
// return false
// }
//
// for i := 0; i < len(index1); i++ {
// kv1 := *index1[i]
// kv2 := *index2[i]
// if kv1.Key != kv2.Key || kv1.Value != kv2.Value {
// return false
// }
// }
//
// return true
//}
//
//func (loader *indexLoader) fieldsStatsIDs2Key(collectionID UniqueID, fieldID UniqueID) string {
// return strconv.FormatInt(collectionID, 10) + "/" + strconv.FormatInt(fieldID, 10)
//}
//
//func (loader *indexLoader) fieldsStatsKey2IDs(key string) (UniqueID, UniqueID, error) {
// ids := strings.Split(key, "/")
// if len(ids) != 2 {
// return 0, 0, errors.New("illegal fieldsStatsKey")
// }
// collectionID, err := strconv.ParseInt(ids[0], 10, 64)
// if err != nil {
// return 0, 0, err
// }
// fieldID, err := strconv.ParseInt(ids[1], 10, 64)
// if err != nil {
// return 0, 0, err
// }
// return collectionID, fieldID, nil
//}
//
//func (loader *indexLoader) updateSegmentIndexStats(segment *Segment) error {
// for fieldID := range segment.indexInfos {
// fieldStatsKey := loader.fieldsStatsIDs2Key(segment.collectionID, fieldID)
// _, ok := loader.fieldIndexes[fieldStatsKey]
// newIndexParams := make([]*commonpb.KeyValuePair, 0)
// indexParams := segment.getIndexParams(fieldID)
// for k, v := range indexParams {
// newIndexParams = append(newIndexParams, &commonpb.KeyValuePair{
// Key: k,
// Value: v,
// })
// }
//
// // sort index params by key
// sort.Slice(newIndexParams, func(i, j int) bool { return newIndexParams[i].Key < newIndexParams[j].Key })
// if !ok {
// loader.fieldIndexes[fieldStatsKey] = make([]*internalpb.IndexStats, 0)
// loader.fieldIndexes[fieldStatsKey] = append(loader.fieldIndexes[fieldStatsKey],
// &internalpb.IndexStats{
// IndexParams: newIndexParams,
// NumRelatedSegments: 1,
// })
// } else {
// isNewIndex := true
// for _, index := range loader.fieldIndexes[fieldStatsKey] {
// if loader.indexParamsEqual(newIndexParams, index.IndexParams) {
// index.NumRelatedSegments++
// isNewIndex = false
// }
// }
// if isNewIndex {
// loader.fieldIndexes[fieldStatsKey] = append(loader.fieldIndexes[fieldStatsKey],
// &internalpb.IndexStats{
// IndexParams: newIndexParams,
// NumRelatedSegments: 1,
// })
// }
// }
// }
//
// return nil
//}
//
//func (loader *indexLoader) sendQueryNodeStats() error {
// resultFieldsStats := make([]*internalpb.FieldStats, 0)
// for fieldStatsKey, indexStats := range loader.fieldIndexes {
// colID, fieldID, err := loader.fieldsStatsKey2IDs(fieldStatsKey)
// if err != nil {
// return err
// }
// fieldStats := internalpb.FieldStats{
// CollectionID: colID,
// FieldID: fieldID,
// IndexStats: indexStats,
// }
// resultFieldsStats = append(resultFieldsStats, &fieldStats)
// }
//
// loader.fieldStatsChan <- resultFieldsStats
// log.Debug("sent field stats")
// return nil
//}

View File

@ -0,0 +1,148 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package querynode
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/internal/proto/commonpb"
)
func TestIndexLoader_setIndexInfo(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
t.Run("test setIndexInfo", func(t *testing.T) {
historical, err := genSimpleHistorical(ctx)
assert.NoError(t, err)
segment, err := genSimpleSealedSegment()
assert.NoError(t, err)
historical.loader.indexLoader.rootCoord = newMockRootCoord()
historical.loader.indexLoader.indexCoord = newMockIndexCoord()
err = historical.loader.indexLoader.setIndexInfo(defaultCollectionID, segment, rowIDFieldID)
assert.NoError(t, err)
})
t.Run("test nil root and index", func(t *testing.T) {
historical, err := genSimpleHistorical(ctx)
assert.NoError(t, err)
segment, err := genSimpleSealedSegment()
assert.NoError(t, err)
err = historical.loader.indexLoader.setIndexInfo(defaultCollectionID, segment, rowIDFieldID)
assert.Error(t, err)
})
}
func TestIndexLoader_getIndexBinlog(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
t.Run("test getIndexBinlog", func(t *testing.T) {
historical, err := genSimpleHistorical(ctx)
assert.NoError(t, err)
paths, err := generateIndex(defaultSegmentID)
assert.NoError(t, err)
_, _, _, err = historical.loader.indexLoader.getIndexBinlog(paths)
assert.NoError(t, err)
})
t.Run("test invalid path", func(t *testing.T) {
historical, err := genSimpleHistorical(ctx)
assert.NoError(t, err)
_, _, _, err = historical.loader.indexLoader.getIndexBinlog([]string{""})
assert.Error(t, err)
})
}
func TestIndexLoader_printIndexParams(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
historical, err := genSimpleHistorical(ctx)
assert.NoError(t, err)
indexKV := []*commonpb.KeyValuePair{
{
Key: "test-key-0",
Value: "test-value-0",
},
}
historical.loader.indexLoader.printIndexParams(indexKV)
}
func TestIndexLoader_loadIndex(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
t.Run("test loadIndex", func(t *testing.T) {
historical, err := genSimpleHistorical(ctx)
assert.NoError(t, err)
segment, err := genSimpleSealedSegment()
assert.NoError(t, err)
historical.loader.indexLoader.rootCoord = newMockRootCoord()
historical.loader.indexLoader.indexCoord = newMockIndexCoord()
err = historical.loader.indexLoader.setIndexInfo(defaultCollectionID, segment, rowIDFieldID)
assert.NoError(t, err)
err = historical.loader.indexLoader.loadIndex(segment, rowIDFieldID)
assert.Error(t, err)
})
if runTimeConsumingTest {
t.Run("test get index failed", func(t *testing.T) {
historical, err := genSimpleHistorical(ctx)
assert.NoError(t, err)
segment, err := genSimpleSealedSegment()
assert.NoError(t, err)
historical.loader.indexLoader.rootCoord = newMockRootCoord()
historical.loader.indexLoader.indexCoord = newMockIndexCoord()
err = historical.loader.indexLoader.loadIndex(segment, rowIDFieldID)
assert.Error(t, err)
})
}
t.Run("test checkIndexReady failed", func(t *testing.T) {
historical, err := genSimpleHistorical(ctx)
assert.NoError(t, err)
segment, err := genSimpleSealedSegment()
assert.NoError(t, err)
historical.loader.indexLoader.rootCoord = newMockRootCoord()
historical.loader.indexLoader.indexCoord = newMockIndexCoord()
err = historical.loader.indexLoader.setIndexInfo(defaultCollectionID, segment, rowIDFieldID)
assert.NoError(t, err)
segment.indexInfos[rowIDFieldID].setReadyLoad(false)
err = historical.loader.indexLoader.loadIndex(segment, rowIDFieldID)
assert.Error(t, err)
})
}

View File

@ -20,7 +20,6 @@ import (
"path"
"strconv"
"github.com/milvus-io/milvus/internal/indexnode"
minioKV "github.com/milvus-io/milvus/internal/kv/minio"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
@ -852,91 +851,6 @@ func generateInsertBinLog(collectionID UniqueID, partitionID UniqueID, segmentID
return paths, fieldIDs, nil
}
func generateIndex(segmentID UniqueID) ([]string, error) {
const (
msgLength = 1000
DIM = 16
)
indexParams := make(map[string]string)
indexParams["index_type"] = "IVF_PQ"
indexParams["index_mode"] = "cpu"
indexParams["dim"] = "16"
indexParams["k"] = "10"
indexParams["nlist"] = "100"
indexParams["nprobe"] = "10"
indexParams["m"] = "4"
indexParams["nbits"] = "8"
indexParams["metric_type"] = "L2"
indexParams["SLICE_SIZE"] = "4"
var indexParamsKV []*commonpb.KeyValuePair
for key, value := range indexParams {
indexParamsKV = append(indexParamsKV, &commonpb.KeyValuePair{
Key: key,
Value: value,
})
}
typeParams := make(map[string]string)
typeParams["dim"] = strconv.Itoa(DIM)
var indexRowData []float32
for n := 0; n < msgLength; n++ {
for i := 0; i < DIM; i++ {
indexRowData = append(indexRowData, float32(n*i))
}
}
index, err := indexnode.NewCIndex(typeParams, indexParams)
if err != nil {
return nil, err
}
err = index.BuildFloatVecIndexWithoutIds(indexRowData)
if err != nil {
return nil, err
}
option := &minioKV.Option{
Address: Params.MinioEndPoint,
AccessKeyID: Params.MinioAccessKeyID,
SecretAccessKeyID: Params.MinioSecretAccessKey,
UseSSL: Params.MinioUseSSLStr,
BucketName: Params.MinioBucketName,
CreateBucket: true,
}
kv, err := minioKV.NewMinIOKV(context.Background(), option)
if err != nil {
return nil, err
}
// save index to minio
binarySet, err := index.Serialize()
if err != nil {
return nil, err
}
// serialize index params
var indexCodec storage.IndexCodec
serializedIndexBlobs, err := indexCodec.Serialize(binarySet, indexParams, "index_test_name", 1234)
if err != nil {
return nil, err
}
indexPaths := make([]string, 0)
for _, index := range serializedIndexBlobs {
p := strconv.Itoa(int(segmentID)) + "/" + index.Key
indexPaths = append(indexPaths, p)
err := kv.Save(p, string(index.Value))
if err != nil {
return nil, err
}
}
return indexPaths, nil
}
func doInsert(ctx context.Context, collectionID UniqueID, partitionID UniqueID, segmentID UniqueID) error {
const msgLength = 1000
const DIM = 16

View File

@ -12,11 +12,29 @@
package querynode
import (
"context"
"testing"
"github.com/milvus-io/milvus/internal/log"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/util/sessionutil"
)
func TestGetSystemInfoMetrics(t *testing.T) {
log.Info("TestGetSystemInfoMetrics, todo")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)
node.session = sessionutil.NewSession(node.queryNodeLoopCtx, Params.MetaRootPath, Params.EtcdEndpoints)
req := &milvuspb.GetMetricsRequest{
Base: genCommonMsgBase(commonpb.MsgType_WatchQueryChannels),
}
resp, err := getSystemInfoMetrics(ctx, req, node)
assert.NoError(t, err)
resp.Status.ErrorCode = commonpb.ErrorCode_Success
}

View File

@ -0,0 +1,213 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package querynode
import (
"context"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/proto/proxypb"
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
"github.com/milvus-io/milvus/internal/types"
)
// TODO: move to mock_test
// TODO: getMockFrom common package
type mockRootCoord struct {
state internalpb.StateCode
returnError bool // TODO: add error tests
}
func newMockRootCoord() *mockRootCoord {
return &mockRootCoord{
state: internalpb.StateCode_Healthy,
}
}
func (m *mockRootCoord) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
return nil, nil
}
func (m *mockRootCoord) Init() error {
return nil
}
func (m *mockRootCoord) Start() error {
return nil
}
func (m *mockRootCoord) Stop() error {
m.state = internalpb.StateCode_Abnormal
return nil
}
func (m *mockRootCoord) Register() error {
return nil
}
func (m *mockRootCoord) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
panic("not implemented") // TODO: Implement
}
func (m *mockRootCoord) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
panic("not implemented") // TODO: Implement
}
func (m *mockRootCoord) CreateCollection(ctx context.Context, req *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
panic("not implemented") // TODO: Implement
}
func (m *mockRootCoord) DropCollection(ctx context.Context, req *milvuspb.DropCollectionRequest) (*commonpb.Status, error) {
panic("not implemented") // TODO: Implement
}
func (m *mockRootCoord) HasCollection(ctx context.Context, req *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) {
panic("not implemented") // TODO: Implement
}
func (m *mockRootCoord) DescribeCollection(ctx context.Context, req *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
panic("not implemented") // TODO: Implement
}
func (m *mockRootCoord) ShowCollections(ctx context.Context, req *milvuspb.ShowCollectionsRequest) (*milvuspb.ShowCollectionsResponse, error) {
panic("not implemented") // TODO: Implement
}
func (m *mockRootCoord) CreatePartition(ctx context.Context, req *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) {
panic("not implemented") // TODO: Implement
}
func (m *mockRootCoord) DropPartition(ctx context.Context, req *milvuspb.DropPartitionRequest) (*commonpb.Status, error) {
panic("not implemented") // TODO: Implement
}
func (m *mockRootCoord) HasPartition(ctx context.Context, req *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) {
panic("not implemented") // TODO: Implement
}
func (m *mockRootCoord) ShowPartitions(ctx context.Context, req *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) {
panic("not implemented") // TODO: Implement
}
func (m *mockRootCoord) CreateIndex(ctx context.Context, req *milvuspb.CreateIndexRequest) (*commonpb.Status, error) {
panic("not implemented") // TODO: Implement
}
func (m *mockRootCoord) DescribeIndex(ctx context.Context, req *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) {
panic("not implemented") // TODO: Implement
}
func (m *mockRootCoord) DropIndex(ctx context.Context, req *milvuspb.DropIndexRequest) (*commonpb.Status, error) {
panic("not implemented") // TODO: Implement
}
func (m *mockRootCoord) AllocTimestamp(ctx context.Context, req *rootcoordpb.AllocTimestampRequest) (*rootcoordpb.AllocTimestampResponse, error) {
panic("not implemented") // TODO: Implement
}
func (m *mockRootCoord) AllocID(ctx context.Context, req *rootcoordpb.AllocIDRequest) (*rootcoordpb.AllocIDResponse, error) {
panic("not implemented") // TODO: Implement
}
func (m *mockRootCoord) DescribeSegment(ctx context.Context, req *milvuspb.DescribeSegmentRequest) (*milvuspb.DescribeSegmentResponse, error) {
return &milvuspb.DescribeSegmentResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
IndexID: int64(0),
BuildID: int64(0),
EnableIndex: true,
}, nil
}
func (m *mockRootCoord) ShowSegments(ctx context.Context, req *milvuspb.ShowSegmentsRequest) (*milvuspb.ShowSegmentsResponse, error) {
panic("not implemented") // TODO: Implement
}
func (m *mockRootCoord) GetDdChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
panic("not implemented") // TODO: Implement
}
func (m *mockRootCoord) UpdateChannelTimeTick(ctx context.Context, req *internalpb.ChannelTimeTickMsg) (*commonpb.Status, error) {
panic("not implemented") // TODO: Implement
}
func (m *mockRootCoord) ReleaseDQLMessageStream(ctx context.Context, req *proxypb.ReleaseDQLMessageStreamRequest) (*commonpb.Status, error) {
panic("not implemented") // TODO: Implement
}
func (m *mockRootCoord) SegmentFlushCompleted(ctx context.Context, in *datapb.SegmentFlushCompletedMsg) (*commonpb.Status, error) {
panic("not implemented") // TODO: Implement
}
func (m *mockRootCoord) AddNewSegment(ctx context.Context, in *datapb.SegmentMsg) (*commonpb.Status, error) {
panic("not implemented") // TODO: Implement
}
func (m *mockRootCoord) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
panic("not implemented") // TODO: Implement
}
////////////////////////////////////////////////////////////////////////////////////////////
// TODO: move to mock_test
// TODO: getMockFrom common package
type mockIndexCoord struct {
types.Component
types.TimeTickProvider
}
func newMockIndexCoord() *mockIndexCoord {
return &mockIndexCoord{}
}
func (m *mockIndexCoord) BuildIndex(ctx context.Context, req *indexpb.BuildIndexRequest) (*indexpb.BuildIndexResponse, error) {
panic("not implemented") // TODO: Implement
}
func (m *mockIndexCoord) DropIndex(ctx context.Context, req *indexpb.DropIndexRequest) (*commonpb.Status, error) {
panic("not implemented") // TODO: Implement
}
func (m *mockIndexCoord) GetIndexStates(ctx context.Context, req *indexpb.GetIndexStatesRequest) (*indexpb.GetIndexStatesResponse, error) {
panic("not implemented") // TODO: Implement
}
func (m *mockIndexCoord) GetIndexFilePaths(ctx context.Context, req *indexpb.GetIndexFilePathsRequest) (*indexpb.GetIndexFilePathsResponse, error) {
paths, err := generateIndex(defaultSegmentID)
if err != nil {
return &indexpb.GetIndexFilePathsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
},
}, nil
}
return &indexpb.GetIndexFilePathsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
FilePaths: []*indexpb.IndexFilePathInfo{
{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
IndexBuildID: int64(0),
IndexFilePaths: paths,
},
},
}, nil
}
func (m *mockIndexCoord) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
panic("not implemented") // TODO: Implement
}

View File

@ -42,6 +42,7 @@ import (
// common definitions
const ctxTimeInMillisecond = 5000
const debug = false
const runTimeConsumingTest = true
const (
dimKey = "dim"
@ -190,6 +191,76 @@ func genIndexBinarySet() ([][]byte, error) {
return bytesSet, nil
}
func generateIndex(segmentID UniqueID) ([]string, error) {
indexParams := genSimpleIndexParams()
var indexParamsKV []*commonpb.KeyValuePair
for key, value := range indexParams {
indexParamsKV = append(indexParamsKV, &commonpb.KeyValuePair{
Key: key,
Value: value,
})
}
typeParams := make(map[string]string)
typeParams["dim"] = strconv.Itoa(defaultDim)
var indexRowData []float32
for n := 0; n < defaultMsgLength; n++ {
for i := 0; i < defaultDim; i++ {
indexRowData = append(indexRowData, float32(n*i))
}
}
index, err := indexnode.NewCIndex(typeParams, indexParams)
if err != nil {
return nil, err
}
err = index.BuildFloatVecIndexWithoutIds(indexRowData)
if err != nil {
return nil, err
}
option := &minioKV.Option{
Address: Params.MinioEndPoint,
AccessKeyID: Params.MinioAccessKeyID,
SecretAccessKeyID: Params.MinioSecretAccessKey,
UseSSL: Params.MinioUseSSLStr,
BucketName: Params.MinioBucketName,
CreateBucket: true,
}
kv, err := minioKV.NewMinIOKV(context.Background(), option)
if err != nil {
return nil, err
}
// save index to minio
binarySet, err := index.Serialize()
if err != nil {
return nil, err
}
// serialize index params
var indexCodec storage.IndexCodec
serializedIndexBlobs, err := indexCodec.Serialize(binarySet, indexParams, "index_test_name", 1234)
if err != nil {
return nil, err
}
indexPaths := make([]string, 0)
for _, index := range serializedIndexBlobs {
p := strconv.Itoa(int(segmentID)) + "/" + index.Key
indexPaths = append(indexPaths, p)
err := kv.Save(p, string(index.Value))
if err != nil {
return nil, err
}
}
return indexPaths, nil
}
func genSimpleSchema() (*schemapb.CollectionSchema, *schemapb.CollectionSchema) {
fieldUID := genConstantField(uidField)
fieldTimestamp := genConstantField(timestampField)

View File

@ -144,8 +144,7 @@ func (loader *segmentLoader) loadSegment(req *querypb.LoadSegmentsRequest, onSer
}
setSegments()
// sendQueryNodeStats
return loader.indexLoader.sendQueryNodeStats()
return nil
}
func (loader *segmentLoader) loadSegmentInternal(collectionID UniqueID, segment *Segment, segmentLoadInfo *querypb.SegmentLoadInfo) error {

View File

@ -66,6 +66,48 @@ func TestSegmentLoader_loadSegment(t *testing.T) {
assert.Error(t, err)
}
func TestSegmentLoader_notOnService(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
historical, err := genSimpleHistorical(ctx)
assert.NoError(t, err)
err = historical.replica.removeSegment(defaultSegmentID)
assert.NoError(t, err)
kv, err := genEtcdKV()
assert.NoError(t, err)
loader := newSegmentLoader(ctx, nil, nil, historical.replica, kv)
assert.NotNil(t, loader)
schema, _ := genSimpleSchema()
fieldBinlog, err := saveSimpleBinLog(ctx)
assert.NoError(t, err)
req := &querypb.LoadSegmentsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_WatchQueryChannels,
MsgID: rand.Int63(),
},
NodeID: 0,
Schema: schema,
LoadCondition: querypb.TriggerCondition_grpcRequest,
Infos: []*querypb.SegmentLoadInfo{
{
SegmentID: defaultSegmentID,
PartitionID: defaultPartitionID,
CollectionID: defaultCollectionID,
BinlogPaths: fieldBinlog,
},
},
}
err = loader.loadSegment(req, false)
assert.NoError(t, err)
}
func TestSegmentLoader_CheckSegmentMemory(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

View File

@ -99,30 +99,32 @@ func TestTask_watchDmChannelsTask(t *testing.T) {
assert.NoError(t, err)
})
// TODO: time consuming, reduce seek error time
t.Run("test execute seek error", func(t *testing.T) {
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)
if runTimeConsumingTest {
t.Run("test execute seek error", func(t *testing.T) {
task := watchDmChannelsTask{
req: genWatchDMChannelsRequest(),
node: node,
}
task.req.Infos = []*datapb.VchannelInfo{
{
CollectionID: defaultCollectionID,
ChannelName: defaultVChannel,
SeekPosition: &msgstream.MsgPosition{
ChannelName: defaultVChannel,
MsgID: []byte{1, 2, 3},
MsgGroup: defaultSubName,
Timestamp: 0,
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)
task := watchDmChannelsTask{
req: genWatchDMChannelsRequest(),
node: node,
}
task.req.Infos = []*datapb.VchannelInfo{
{
CollectionID: defaultCollectionID,
ChannelName: defaultVChannel,
SeekPosition: &msgstream.MsgPosition{
ChannelName: defaultVChannel,
MsgID: []byte{1, 2, 3},
MsgGroup: defaultSubName,
Timestamp: 0,
},
},
},
}
err = task.Execute(ctx)
assert.Error(t, err)
})
}
err = task.Execute(ctx)
assert.Error(t, err)
})
}
}
func TestTask_loadSegmentsTask(t *testing.T) {