diff --git a/internal/querynode/index_info_test.go b/internal/querynode/index_info_test.go new file mode 100644 index 0000000000..6a597135c8 --- /dev/null +++ b/internal/querynode/index_info_test.go @@ -0,0 +1,50 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package querynode + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestIndexInfo(t *testing.T) { + indexInfo := newIndexInfo() + + buildID := UniqueID(0) + indexID := UniqueID(0) + indexPaths := []string{"test-index-paths"} + indexName := "test-index-name" + indexParams := make(map[string]string) + + indexInfo.setBuildID(buildID) + indexInfo.setIndexID(indexID) + indexInfo.setReadyLoad(true) + indexInfo.setIndexName(indexName) + indexInfo.setIndexPaths(indexPaths) + indexInfo.setIndexParams(indexParams) + + resBuildID := indexInfo.getBuildID() + assert.Equal(t, buildID, resBuildID) + resIndexID := indexInfo.getIndexID() + assert.Equal(t, indexID, resIndexID) + resLoad := indexInfo.getReadyLoad() + assert.True(t, resLoad) + resName := indexInfo.getIndexName() + assert.Equal(t, indexName, resName) + resPaths := indexInfo.getIndexPaths() + assert.Equal(t, len(indexPaths), len(resPaths)) + assert.Len(t, resPaths, 1) + assert.Equal(t, indexPaths[0], resPaths[0]) + resParams := indexInfo.getIndexParams() + assert.Equal(t, len(indexParams), len(resParams)) +} diff --git a/internal/querynode/index_loader.go b/internal/querynode/index_loader.go index f238340444..fd0876ab5a 100644 --- a/internal/querynode/index_loader.go +++ b/internal/querynode/index_loader.go @@ -16,9 +16,6 @@ import ( "errors" "fmt" "path" - "sort" - "strconv" - "strings" "time" "go.uber.org/zap" @@ -49,50 +46,6 @@ type indexLoader struct { kv kv.BaseKV // minio kv } -//func (loader *indexLoader) doLoadIndex(wg *sync.WaitGroup) { -// collectionIDs, _, segmentIDs := loader.replica.getSegmentsBySegmentType(segmentTypeSealed) -// if len(collectionIDs) <= 0 { -// wg.Done() -// return -// } -// log.Debug("do load index for sealed segments:", zap.String("segmentIDs", fmt.Sprintln(segmentIDs))) -// for i := range collectionIDs { -// // we don't need index id yet -// segment, err := loader.replica.getSegmentByID(segmentIDs[i]) -// if err != nil { -// log.Warn(err.Error()) -// continue -// } -// vecFieldIDs, err := loader.replica.getVecFieldIDsByCollectionID(collectionIDs[i]) -// if err != nil { -// log.Warn(err.Error()) -// continue -// } -// for _, fieldID := range vecFieldIDs { -// err = loader.setIndexInfo(collectionIDs[i], segment, fieldID) -// if err != nil { -// log.Warn(err.Error()) -// continue -// } -// -// err = loader.loadIndex(segment, fieldID) -// if err != nil { -// log.Warn(err.Error()) -// continue -// } -// } -// } -// // sendQueryNodeStats -// err := loader.sendQueryNodeStats() -// if err != nil { -// log.Warn(err.Error()) -// wg.Done() -// return -// } -// -// wg.Done() -//} - func (loader *indexLoader) loadIndex(segment *Segment, fieldID int64) error { // 1. use msg's index paths to get index bytes var err error @@ -137,11 +90,6 @@ func (loader *indexLoader) loadIndex(segment *Segment, fieldID int64) error { if err != nil { return err } - // 4. update segment index stats - err = loader.updateSegmentIndexStats(segment) - if err != nil { - return err - } log.Debug("load index done") return nil } @@ -153,85 +101,6 @@ func (loader *indexLoader) printIndexParams(index []*commonpb.KeyValuePair) { } } -func (loader *indexLoader) indexParamsEqual(index1 []*commonpb.KeyValuePair, index2 []*commonpb.KeyValuePair) bool { - if len(index1) != len(index2) { - return false - } - - for i := 0; i < len(index1); i++ { - kv1 := *index1[i] - kv2 := *index2[i] - if kv1.Key != kv2.Key || kv1.Value != kv2.Value { - return false - } - } - - return true -} - -func (loader *indexLoader) fieldsStatsIDs2Key(collectionID UniqueID, fieldID UniqueID) string { - return strconv.FormatInt(collectionID, 10) + "/" + strconv.FormatInt(fieldID, 10) -} - -func (loader *indexLoader) fieldsStatsKey2IDs(key string) (UniqueID, UniqueID, error) { - ids := strings.Split(key, "/") - if len(ids) != 2 { - return 0, 0, errors.New("illegal fieldsStatsKey") - } - collectionID, err := strconv.ParseInt(ids[0], 10, 64) - if err != nil { - return 0, 0, err - } - fieldID, err := strconv.ParseInt(ids[1], 10, 64) - if err != nil { - return 0, 0, err - } - return collectionID, fieldID, nil -} - -func (loader *indexLoader) updateSegmentIndexStats(segment *Segment) error { - for fieldID := range segment.indexInfos { - fieldStatsKey := loader.fieldsStatsIDs2Key(segment.collectionID, fieldID) - _, ok := loader.fieldIndexes[fieldStatsKey] - newIndexParams := make([]*commonpb.KeyValuePair, 0) - indexParams := segment.getIndexParams(fieldID) - for k, v := range indexParams { - newIndexParams = append(newIndexParams, &commonpb.KeyValuePair{ - Key: k, - Value: v, - }) - } - - // sort index params by key - sort.Slice(newIndexParams, func(i, j int) bool { return newIndexParams[i].Key < newIndexParams[j].Key }) - if !ok { - loader.fieldIndexes[fieldStatsKey] = make([]*internalpb.IndexStats, 0) - loader.fieldIndexes[fieldStatsKey] = append(loader.fieldIndexes[fieldStatsKey], - &internalpb.IndexStats{ - IndexParams: newIndexParams, - NumRelatedSegments: 1, - }) - } else { - isNewIndex := true - for _, index := range loader.fieldIndexes[fieldStatsKey] { - if loader.indexParamsEqual(newIndexParams, index.IndexParams) { - index.NumRelatedSegments++ - isNewIndex = false - } - } - if isNewIndex { - loader.fieldIndexes[fieldStatsKey] = append(loader.fieldIndexes[fieldStatsKey], - &internalpb.IndexStats{ - IndexParams: newIndexParams, - NumRelatedSegments: 1, - }) - } - } - } - - return nil -} - func (loader *indexLoader) getIndexBinlog(indexPath []string) ([][]byte, indexParam, string, error) { index := make([][]byte, 0) @@ -266,26 +135,6 @@ func (loader *indexLoader) getIndexBinlog(indexPath []string) ([][]byte, indexPa return index, indexParams, indexName, nil } -func (loader *indexLoader) sendQueryNodeStats() error { - resultFieldsStats := make([]*internalpb.FieldStats, 0) - for fieldStatsKey, indexStats := range loader.fieldIndexes { - colID, fieldID, err := loader.fieldsStatsKey2IDs(fieldStatsKey) - if err != nil { - return err - } - fieldStats := internalpb.FieldStats{ - CollectionID: colID, - FieldID: fieldID, - IndexStats: indexStats, - } - resultFieldsStats = append(resultFieldsStats, &fieldStats) - } - - loader.fieldStatsChan <- resultFieldsStats - log.Debug("sent field stats") - return nil -} - func (loader *indexLoader) setIndexInfo(collectionID UniqueID, segment *Segment, fieldID UniqueID) error { if loader.indexCoord == nil || loader.rootCoord == nil { return errors.New("null index coordinator client or root coordinator client, collectionID = " + @@ -339,27 +188,6 @@ func (loader *indexLoader) setIndexInfo(collectionID UniqueID, segment *Segment, return nil } -//func (loader *indexLoader) getIndexPaths(indexBuildID UniqueID) ([]string, error) { -// ctx := context.TODO() -// if loader.indexCoord == nil { -// return nil, errors.New("null index coordinator client") -// } -// -// indexFilePathRequest := &indexpb.GetIndexFilePathsRequest{ -// IndexBuildIDs: []UniqueID{indexBuildID}, -// } -// pathResponse, err := loader.indexCoord.GetIndexFilePaths(ctx, indexFilePathRequest) -// if err != nil || pathResponse.Status.ErrorCode != commonpb.ErrorCode_Success { -// return nil, err -// } -// -// if len(pathResponse.FilePaths) <= 0 { -// return nil, errors.New("illegal index file paths") -// } -// -// return pathResponse.FilePaths[0].IndexFilePaths, nil -//} - func newIndexLoader(ctx context.Context, rootCoord types.RootCoord, indexCoord types.IndexCoord, replica ReplicaInterface) *indexLoader { option := &minioKV.Option{ Address: Params.MinioEndPoint, @@ -387,3 +215,168 @@ func newIndexLoader(ctx context.Context, rootCoord types.RootCoord, indexCoord t kv: client, } } + +//// deprecated +//func (loader *indexLoader) doLoadIndex(wg *sync.WaitGroup) { +// collectionIDs, _, segmentIDs := loader.replica.getSegmentsBySegmentType(segmentTypeSealed) +// if len(collectionIDs) <= 0 { +// wg.Done() +// return +// } +// log.Debug("do load index for sealed segments:", zap.String("segmentIDs", fmt.Sprintln(segmentIDs))) +// for i := range collectionIDs { +// // we don't need index id yet +// segment, err := loader.replica.getSegmentByID(segmentIDs[i]) +// if err != nil { +// log.Warn(err.Error()) +// continue +// } +// vecFieldIDs, err := loader.replica.getVecFieldIDsByCollectionID(collectionIDs[i]) +// if err != nil { +// log.Warn(err.Error()) +// continue +// } +// for _, fieldID := range vecFieldIDs { +// err = loader.setIndexInfo(collectionIDs[i], segment, fieldID) +// if err != nil { +// log.Warn(err.Error()) +// continue +// } +// +// err = loader.loadIndex(segment, fieldID) +// if err != nil { +// log.Warn(err.Error()) +// continue +// } +// } +// } +// // sendQueryNodeStats +// err := loader.sendQueryNodeStats() +// if err != nil { +// log.Warn(err.Error()) +// wg.Done() +// return +// } +// +// wg.Done() +//} +// +//func (loader *indexLoader) getIndexPaths(indexBuildID UniqueID) ([]string, error) { +// ctx := context.TODO() +// if loader.indexCoord == nil { +// return nil, errors.New("null index coordinator client") +// } +// +// indexFilePathRequest := &indexpb.GetIndexFilePathsRequest{ +// IndexBuildIDs: []UniqueID{indexBuildID}, +// } +// pathResponse, err := loader.indexCoord.GetIndexFilePaths(ctx, indexFilePathRequest) +// if err != nil || pathResponse.Status.ErrorCode != commonpb.ErrorCode_Success { +// return nil, err +// } +// +// if len(pathResponse.FilePaths) <= 0 { +// return nil, errors.New("illegal index file paths") +// } +// +// return pathResponse.FilePaths[0].IndexFilePaths, nil +//} +// +//func (loader *indexLoader) indexParamsEqual(index1 []*commonpb.KeyValuePair, index2 []*commonpb.KeyValuePair) bool { +// if len(index1) != len(index2) { +// return false +// } +// +// for i := 0; i < len(index1); i++ { +// kv1 := *index1[i] +// kv2 := *index2[i] +// if kv1.Key != kv2.Key || kv1.Value != kv2.Value { +// return false +// } +// } +// +// return true +//} +// +//func (loader *indexLoader) fieldsStatsIDs2Key(collectionID UniqueID, fieldID UniqueID) string { +// return strconv.FormatInt(collectionID, 10) + "/" + strconv.FormatInt(fieldID, 10) +//} +// +//func (loader *indexLoader) fieldsStatsKey2IDs(key string) (UniqueID, UniqueID, error) { +// ids := strings.Split(key, "/") +// if len(ids) != 2 { +// return 0, 0, errors.New("illegal fieldsStatsKey") +// } +// collectionID, err := strconv.ParseInt(ids[0], 10, 64) +// if err != nil { +// return 0, 0, err +// } +// fieldID, err := strconv.ParseInt(ids[1], 10, 64) +// if err != nil { +// return 0, 0, err +// } +// return collectionID, fieldID, nil +//} +// +//func (loader *indexLoader) updateSegmentIndexStats(segment *Segment) error { +// for fieldID := range segment.indexInfos { +// fieldStatsKey := loader.fieldsStatsIDs2Key(segment.collectionID, fieldID) +// _, ok := loader.fieldIndexes[fieldStatsKey] +// newIndexParams := make([]*commonpb.KeyValuePair, 0) +// indexParams := segment.getIndexParams(fieldID) +// for k, v := range indexParams { +// newIndexParams = append(newIndexParams, &commonpb.KeyValuePair{ +// Key: k, +// Value: v, +// }) +// } +// +// // sort index params by key +// sort.Slice(newIndexParams, func(i, j int) bool { return newIndexParams[i].Key < newIndexParams[j].Key }) +// if !ok { +// loader.fieldIndexes[fieldStatsKey] = make([]*internalpb.IndexStats, 0) +// loader.fieldIndexes[fieldStatsKey] = append(loader.fieldIndexes[fieldStatsKey], +// &internalpb.IndexStats{ +// IndexParams: newIndexParams, +// NumRelatedSegments: 1, +// }) +// } else { +// isNewIndex := true +// for _, index := range loader.fieldIndexes[fieldStatsKey] { +// if loader.indexParamsEqual(newIndexParams, index.IndexParams) { +// index.NumRelatedSegments++ +// isNewIndex = false +// } +// } +// if isNewIndex { +// loader.fieldIndexes[fieldStatsKey] = append(loader.fieldIndexes[fieldStatsKey], +// &internalpb.IndexStats{ +// IndexParams: newIndexParams, +// NumRelatedSegments: 1, +// }) +// } +// } +// } +// +// return nil +//} +// +//func (loader *indexLoader) sendQueryNodeStats() error { +// resultFieldsStats := make([]*internalpb.FieldStats, 0) +// for fieldStatsKey, indexStats := range loader.fieldIndexes { +// colID, fieldID, err := loader.fieldsStatsKey2IDs(fieldStatsKey) +// if err != nil { +// return err +// } +// fieldStats := internalpb.FieldStats{ +// CollectionID: colID, +// FieldID: fieldID, +// IndexStats: indexStats, +// } +// resultFieldsStats = append(resultFieldsStats, &fieldStats) +// } +// +// loader.fieldStatsChan <- resultFieldsStats +// log.Debug("sent field stats") +// return nil +//} diff --git a/internal/querynode/index_loader_test.go b/internal/querynode/index_loader_test.go new file mode 100644 index 0000000000..5968814171 --- /dev/null +++ b/internal/querynode/index_loader_test.go @@ -0,0 +1,148 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package querynode + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus/internal/proto/commonpb" +) + +func TestIndexLoader_setIndexInfo(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + t.Run("test setIndexInfo", func(t *testing.T) { + historical, err := genSimpleHistorical(ctx) + assert.NoError(t, err) + + segment, err := genSimpleSealedSegment() + assert.NoError(t, err) + + historical.loader.indexLoader.rootCoord = newMockRootCoord() + historical.loader.indexLoader.indexCoord = newMockIndexCoord() + + err = historical.loader.indexLoader.setIndexInfo(defaultCollectionID, segment, rowIDFieldID) + assert.NoError(t, err) + }) + + t.Run("test nil root and index", func(t *testing.T) { + historical, err := genSimpleHistorical(ctx) + assert.NoError(t, err) + + segment, err := genSimpleSealedSegment() + assert.NoError(t, err) + + err = historical.loader.indexLoader.setIndexInfo(defaultCollectionID, segment, rowIDFieldID) + assert.Error(t, err) + }) +} + +func TestIndexLoader_getIndexBinlog(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + t.Run("test getIndexBinlog", func(t *testing.T) { + historical, err := genSimpleHistorical(ctx) + assert.NoError(t, err) + + paths, err := generateIndex(defaultSegmentID) + assert.NoError(t, err) + + _, _, _, err = historical.loader.indexLoader.getIndexBinlog(paths) + assert.NoError(t, err) + }) + + t.Run("test invalid path", func(t *testing.T) { + historical, err := genSimpleHistorical(ctx) + assert.NoError(t, err) + + _, _, _, err = historical.loader.indexLoader.getIndexBinlog([]string{""}) + assert.Error(t, err) + }) +} + +func TestIndexLoader_printIndexParams(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + historical, err := genSimpleHistorical(ctx) + assert.NoError(t, err) + + indexKV := []*commonpb.KeyValuePair{ + { + Key: "test-key-0", + Value: "test-value-0", + }, + } + historical.loader.indexLoader.printIndexParams(indexKV) +} + +func TestIndexLoader_loadIndex(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + t.Run("test loadIndex", func(t *testing.T) { + historical, err := genSimpleHistorical(ctx) + assert.NoError(t, err) + + segment, err := genSimpleSealedSegment() + assert.NoError(t, err) + + historical.loader.indexLoader.rootCoord = newMockRootCoord() + historical.loader.indexLoader.indexCoord = newMockIndexCoord() + + err = historical.loader.indexLoader.setIndexInfo(defaultCollectionID, segment, rowIDFieldID) + assert.NoError(t, err) + + err = historical.loader.indexLoader.loadIndex(segment, rowIDFieldID) + assert.Error(t, err) + }) + + if runTimeConsumingTest { + t.Run("test get index failed", func(t *testing.T) { + historical, err := genSimpleHistorical(ctx) + assert.NoError(t, err) + + segment, err := genSimpleSealedSegment() + assert.NoError(t, err) + + historical.loader.indexLoader.rootCoord = newMockRootCoord() + historical.loader.indexLoader.indexCoord = newMockIndexCoord() + + err = historical.loader.indexLoader.loadIndex(segment, rowIDFieldID) + assert.Error(t, err) + }) + } + + t.Run("test checkIndexReady failed", func(t *testing.T) { + historical, err := genSimpleHistorical(ctx) + assert.NoError(t, err) + + segment, err := genSimpleSealedSegment() + assert.NoError(t, err) + + historical.loader.indexLoader.rootCoord = newMockRootCoord() + historical.loader.indexLoader.indexCoord = newMockIndexCoord() + + err = historical.loader.indexLoader.setIndexInfo(defaultCollectionID, segment, rowIDFieldID) + assert.NoError(t, err) + + segment.indexInfos[rowIDFieldID].setReadyLoad(false) + + err = historical.loader.indexLoader.loadIndex(segment, rowIDFieldID) + assert.Error(t, err) + }) +} diff --git a/internal/querynode/load_service_test.go b/internal/querynode/load_service_test.go index 806efbc6e8..3fd948b3ee 100644 --- a/internal/querynode/load_service_test.go +++ b/internal/querynode/load_service_test.go @@ -20,7 +20,6 @@ import ( "path" "strconv" - "github.com/milvus-io/milvus/internal/indexnode" minioKV "github.com/milvus-io/milvus/internal/kv/minio" "github.com/milvus-io/milvus/internal/msgstream" "github.com/milvus-io/milvus/internal/proto/commonpb" @@ -852,91 +851,6 @@ func generateInsertBinLog(collectionID UniqueID, partitionID UniqueID, segmentID return paths, fieldIDs, nil } -func generateIndex(segmentID UniqueID) ([]string, error) { - const ( - msgLength = 1000 - DIM = 16 - ) - - indexParams := make(map[string]string) - indexParams["index_type"] = "IVF_PQ" - indexParams["index_mode"] = "cpu" - indexParams["dim"] = "16" - indexParams["k"] = "10" - indexParams["nlist"] = "100" - indexParams["nprobe"] = "10" - indexParams["m"] = "4" - indexParams["nbits"] = "8" - indexParams["metric_type"] = "L2" - indexParams["SLICE_SIZE"] = "4" - - var indexParamsKV []*commonpb.KeyValuePair - for key, value := range indexParams { - indexParamsKV = append(indexParamsKV, &commonpb.KeyValuePair{ - Key: key, - Value: value, - }) - } - - typeParams := make(map[string]string) - typeParams["dim"] = strconv.Itoa(DIM) - var indexRowData []float32 - for n := 0; n < msgLength; n++ { - for i := 0; i < DIM; i++ { - indexRowData = append(indexRowData, float32(n*i)) - } - } - - index, err := indexnode.NewCIndex(typeParams, indexParams) - if err != nil { - return nil, err - } - - err = index.BuildFloatVecIndexWithoutIds(indexRowData) - if err != nil { - return nil, err - } - - option := &minioKV.Option{ - Address: Params.MinioEndPoint, - AccessKeyID: Params.MinioAccessKeyID, - SecretAccessKeyID: Params.MinioSecretAccessKey, - UseSSL: Params.MinioUseSSLStr, - BucketName: Params.MinioBucketName, - CreateBucket: true, - } - - kv, err := minioKV.NewMinIOKV(context.Background(), option) - if err != nil { - return nil, err - } - - // save index to minio - binarySet, err := index.Serialize() - if err != nil { - return nil, err - } - - // serialize index params - var indexCodec storage.IndexCodec - serializedIndexBlobs, err := indexCodec.Serialize(binarySet, indexParams, "index_test_name", 1234) - if err != nil { - return nil, err - } - - indexPaths := make([]string, 0) - for _, index := range serializedIndexBlobs { - p := strconv.Itoa(int(segmentID)) + "/" + index.Key - indexPaths = append(indexPaths, p) - err := kv.Save(p, string(index.Value)) - if err != nil { - return nil, err - } - } - - return indexPaths, nil -} - func doInsert(ctx context.Context, collectionID UniqueID, partitionID UniqueID, segmentID UniqueID) error { const msgLength = 1000 const DIM = 16 diff --git a/internal/querynode/metrics_info_test.go b/internal/querynode/metrics_info_test.go index 9f0497b116..1bc24a0178 100644 --- a/internal/querynode/metrics_info_test.go +++ b/internal/querynode/metrics_info_test.go @@ -12,11 +12,29 @@ package querynode import ( + "context" "testing" - "github.com/milvus-io/milvus/internal/log" + "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus/internal/proto/commonpb" + "github.com/milvus-io/milvus/internal/proto/milvuspb" + "github.com/milvus-io/milvus/internal/util/sessionutil" ) func TestGetSystemInfoMetrics(t *testing.T) { - log.Info("TestGetSystemInfoMetrics, todo") + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + node, err := genSimpleQueryNode(ctx) + assert.NoError(t, err) + + node.session = sessionutil.NewSession(node.queryNodeLoopCtx, Params.MetaRootPath, Params.EtcdEndpoints) + + req := &milvuspb.GetMetricsRequest{ + Base: genCommonMsgBase(commonpb.MsgType_WatchQueryChannels), + } + resp, err := getSystemInfoMetrics(ctx, req, node) + assert.NoError(t, err) + resp.Status.ErrorCode = commonpb.ErrorCode_Success } diff --git a/internal/querynode/mock_components_test.go b/internal/querynode/mock_components_test.go new file mode 100644 index 0000000000..e6c616002a --- /dev/null +++ b/internal/querynode/mock_components_test.go @@ -0,0 +1,213 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package querynode + +import ( + "context" + + "github.com/milvus-io/milvus/internal/proto/commonpb" + "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/proto/indexpb" + "github.com/milvus-io/milvus/internal/proto/internalpb" + "github.com/milvus-io/milvus/internal/proto/milvuspb" + "github.com/milvus-io/milvus/internal/proto/proxypb" + "github.com/milvus-io/milvus/internal/proto/rootcoordpb" + "github.com/milvus-io/milvus/internal/types" +) + +// TODO: move to mock_test +// TODO: getMockFrom common package +type mockRootCoord struct { + state internalpb.StateCode + returnError bool // TODO: add error tests +} + +func newMockRootCoord() *mockRootCoord { + return &mockRootCoord{ + state: internalpb.StateCode_Healthy, + } +} + +func (m *mockRootCoord) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) { + return nil, nil +} + +func (m *mockRootCoord) Init() error { + return nil +} + +func (m *mockRootCoord) Start() error { + return nil +} + +func (m *mockRootCoord) Stop() error { + m.state = internalpb.StateCode_Abnormal + return nil +} + +func (m *mockRootCoord) Register() error { + return nil +} + +func (m *mockRootCoord) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) { + panic("not implemented") // TODO: Implement +} + +func (m *mockRootCoord) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) { + panic("not implemented") // TODO: Implement +} + +func (m *mockRootCoord) CreateCollection(ctx context.Context, req *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) { + panic("not implemented") // TODO: Implement +} + +func (m *mockRootCoord) DropCollection(ctx context.Context, req *milvuspb.DropCollectionRequest) (*commonpb.Status, error) { + panic("not implemented") // TODO: Implement +} + +func (m *mockRootCoord) HasCollection(ctx context.Context, req *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) { + panic("not implemented") // TODO: Implement +} + +func (m *mockRootCoord) DescribeCollection(ctx context.Context, req *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) { + panic("not implemented") // TODO: Implement +} + +func (m *mockRootCoord) ShowCollections(ctx context.Context, req *milvuspb.ShowCollectionsRequest) (*milvuspb.ShowCollectionsResponse, error) { + panic("not implemented") // TODO: Implement +} + +func (m *mockRootCoord) CreatePartition(ctx context.Context, req *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) { + panic("not implemented") // TODO: Implement +} + +func (m *mockRootCoord) DropPartition(ctx context.Context, req *milvuspb.DropPartitionRequest) (*commonpb.Status, error) { + panic("not implemented") // TODO: Implement +} + +func (m *mockRootCoord) HasPartition(ctx context.Context, req *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) { + panic("not implemented") // TODO: Implement +} + +func (m *mockRootCoord) ShowPartitions(ctx context.Context, req *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) { + panic("not implemented") // TODO: Implement +} + +func (m *mockRootCoord) CreateIndex(ctx context.Context, req *milvuspb.CreateIndexRequest) (*commonpb.Status, error) { + panic("not implemented") // TODO: Implement +} + +func (m *mockRootCoord) DescribeIndex(ctx context.Context, req *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) { + panic("not implemented") // TODO: Implement +} + +func (m *mockRootCoord) DropIndex(ctx context.Context, req *milvuspb.DropIndexRequest) (*commonpb.Status, error) { + panic("not implemented") // TODO: Implement +} + +func (m *mockRootCoord) AllocTimestamp(ctx context.Context, req *rootcoordpb.AllocTimestampRequest) (*rootcoordpb.AllocTimestampResponse, error) { + panic("not implemented") // TODO: Implement +} + +func (m *mockRootCoord) AllocID(ctx context.Context, req *rootcoordpb.AllocIDRequest) (*rootcoordpb.AllocIDResponse, error) { + panic("not implemented") // TODO: Implement +} + +func (m *mockRootCoord) DescribeSegment(ctx context.Context, req *milvuspb.DescribeSegmentRequest) (*milvuspb.DescribeSegmentResponse, error) { + return &milvuspb.DescribeSegmentResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, + IndexID: int64(0), + BuildID: int64(0), + EnableIndex: true, + }, nil +} + +func (m *mockRootCoord) ShowSegments(ctx context.Context, req *milvuspb.ShowSegmentsRequest) (*milvuspb.ShowSegmentsResponse, error) { + panic("not implemented") // TODO: Implement +} + +func (m *mockRootCoord) GetDdChannel(ctx context.Context) (*milvuspb.StringResponse, error) { + panic("not implemented") // TODO: Implement +} + +func (m *mockRootCoord) UpdateChannelTimeTick(ctx context.Context, req *internalpb.ChannelTimeTickMsg) (*commonpb.Status, error) { + panic("not implemented") // TODO: Implement +} + +func (m *mockRootCoord) ReleaseDQLMessageStream(ctx context.Context, req *proxypb.ReleaseDQLMessageStreamRequest) (*commonpb.Status, error) { + panic("not implemented") // TODO: Implement +} +func (m *mockRootCoord) SegmentFlushCompleted(ctx context.Context, in *datapb.SegmentFlushCompletedMsg) (*commonpb.Status, error) { + panic("not implemented") // TODO: Implement +} +func (m *mockRootCoord) AddNewSegment(ctx context.Context, in *datapb.SegmentMsg) (*commonpb.Status, error) { + panic("not implemented") // TODO: Implement +} + +func (m *mockRootCoord) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) { + panic("not implemented") // TODO: Implement +} + +//////////////////////////////////////////////////////////////////////////////////////////// +// TODO: move to mock_test +// TODO: getMockFrom common package +type mockIndexCoord struct { + types.Component + types.TimeTickProvider +} + +func newMockIndexCoord() *mockIndexCoord { + return &mockIndexCoord{} +} + +func (m *mockIndexCoord) BuildIndex(ctx context.Context, req *indexpb.BuildIndexRequest) (*indexpb.BuildIndexResponse, error) { + panic("not implemented") // TODO: Implement +} + +func (m *mockIndexCoord) DropIndex(ctx context.Context, req *indexpb.DropIndexRequest) (*commonpb.Status, error) { + panic("not implemented") // TODO: Implement +} + +func (m *mockIndexCoord) GetIndexStates(ctx context.Context, req *indexpb.GetIndexStatesRequest) (*indexpb.GetIndexStatesResponse, error) { + panic("not implemented") // TODO: Implement +} + +func (m *mockIndexCoord) GetIndexFilePaths(ctx context.Context, req *indexpb.GetIndexFilePathsRequest) (*indexpb.GetIndexFilePathsResponse, error) { + paths, err := generateIndex(defaultSegmentID) + if err != nil { + return &indexpb.GetIndexFilePathsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + }, + }, nil + } + return &indexpb.GetIndexFilePathsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, + FilePaths: []*indexpb.IndexFilePathInfo{ + { + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, + IndexBuildID: int64(0), + IndexFilePaths: paths, + }, + }, + }, nil +} + +func (m *mockIndexCoord) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) { + panic("not implemented") // TODO: Implement +} diff --git a/internal/querynode/mock_test.go b/internal/querynode/mock_test.go index 624560b504..34249f84a7 100644 --- a/internal/querynode/mock_test.go +++ b/internal/querynode/mock_test.go @@ -42,6 +42,7 @@ import ( // common definitions const ctxTimeInMillisecond = 5000 const debug = false +const runTimeConsumingTest = true const ( dimKey = "dim" @@ -190,6 +191,76 @@ func genIndexBinarySet() ([][]byte, error) { return bytesSet, nil } +func generateIndex(segmentID UniqueID) ([]string, error) { + indexParams := genSimpleIndexParams() + + var indexParamsKV []*commonpb.KeyValuePair + for key, value := range indexParams { + indexParamsKV = append(indexParamsKV, &commonpb.KeyValuePair{ + Key: key, + Value: value, + }) + } + + typeParams := make(map[string]string) + typeParams["dim"] = strconv.Itoa(defaultDim) + var indexRowData []float32 + for n := 0; n < defaultMsgLength; n++ { + for i := 0; i < defaultDim; i++ { + indexRowData = append(indexRowData, float32(n*i)) + } + } + + index, err := indexnode.NewCIndex(typeParams, indexParams) + if err != nil { + return nil, err + } + + err = index.BuildFloatVecIndexWithoutIds(indexRowData) + if err != nil { + return nil, err + } + + option := &minioKV.Option{ + Address: Params.MinioEndPoint, + AccessKeyID: Params.MinioAccessKeyID, + SecretAccessKeyID: Params.MinioSecretAccessKey, + UseSSL: Params.MinioUseSSLStr, + BucketName: Params.MinioBucketName, + CreateBucket: true, + } + + kv, err := minioKV.NewMinIOKV(context.Background(), option) + if err != nil { + return nil, err + } + + // save index to minio + binarySet, err := index.Serialize() + if err != nil { + return nil, err + } + + // serialize index params + var indexCodec storage.IndexCodec + serializedIndexBlobs, err := indexCodec.Serialize(binarySet, indexParams, "index_test_name", 1234) + if err != nil { + return nil, err + } + + indexPaths := make([]string, 0) + for _, index := range serializedIndexBlobs { + p := strconv.Itoa(int(segmentID)) + "/" + index.Key + indexPaths = append(indexPaths, p) + err := kv.Save(p, string(index.Value)) + if err != nil { + return nil, err + } + } + + return indexPaths, nil +} + func genSimpleSchema() (*schemapb.CollectionSchema, *schemapb.CollectionSchema) { fieldUID := genConstantField(uidField) fieldTimestamp := genConstantField(timestampField) diff --git a/internal/querynode/segment_loader.go b/internal/querynode/segment_loader.go index f4fd6e7128..9267a1ca2b 100644 --- a/internal/querynode/segment_loader.go +++ b/internal/querynode/segment_loader.go @@ -144,8 +144,7 @@ func (loader *segmentLoader) loadSegment(req *querypb.LoadSegmentsRequest, onSer } setSegments() - // sendQueryNodeStats - return loader.indexLoader.sendQueryNodeStats() + return nil } func (loader *segmentLoader) loadSegmentInternal(collectionID UniqueID, segment *Segment, segmentLoadInfo *querypb.SegmentLoadInfo) error { diff --git a/internal/querynode/segment_loader_test.go b/internal/querynode/segment_loader_test.go index b8e7ae0747..67f8e83384 100644 --- a/internal/querynode/segment_loader_test.go +++ b/internal/querynode/segment_loader_test.go @@ -66,6 +66,48 @@ func TestSegmentLoader_loadSegment(t *testing.T) { assert.Error(t, err) } +func TestSegmentLoader_notOnService(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + historical, err := genSimpleHistorical(ctx) + assert.NoError(t, err) + + err = historical.replica.removeSegment(defaultSegmentID) + assert.NoError(t, err) + + kv, err := genEtcdKV() + assert.NoError(t, err) + + loader := newSegmentLoader(ctx, nil, nil, historical.replica, kv) + assert.NotNil(t, loader) + + schema, _ := genSimpleSchema() + + fieldBinlog, err := saveSimpleBinLog(ctx) + assert.NoError(t, err) + + req := &querypb.LoadSegmentsRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_WatchQueryChannels, + MsgID: rand.Int63(), + }, + NodeID: 0, + Schema: schema, + LoadCondition: querypb.TriggerCondition_grpcRequest, + Infos: []*querypb.SegmentLoadInfo{ + { + SegmentID: defaultSegmentID, + PartitionID: defaultPartitionID, + CollectionID: defaultCollectionID, + BinlogPaths: fieldBinlog, + }, + }, + } + err = loader.loadSegment(req, false) + assert.NoError(t, err) +} + func TestSegmentLoader_CheckSegmentMemory(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/internal/querynode/task_test.go b/internal/querynode/task_test.go index 6960943ef4..df9794ede8 100644 --- a/internal/querynode/task_test.go +++ b/internal/querynode/task_test.go @@ -99,30 +99,32 @@ func TestTask_watchDmChannelsTask(t *testing.T) { assert.NoError(t, err) }) - // TODO: time consuming, reduce seek error time - t.Run("test execute seek error", func(t *testing.T) { - node, err := genSimpleQueryNode(ctx) - assert.NoError(t, err) + if runTimeConsumingTest { + t.Run("test execute seek error", func(t *testing.T) { - task := watchDmChannelsTask{ - req: genWatchDMChannelsRequest(), - node: node, - } - task.req.Infos = []*datapb.VchannelInfo{ - { - CollectionID: defaultCollectionID, - ChannelName: defaultVChannel, - SeekPosition: &msgstream.MsgPosition{ - ChannelName: defaultVChannel, - MsgID: []byte{1, 2, 3}, - MsgGroup: defaultSubName, - Timestamp: 0, + node, err := genSimpleQueryNode(ctx) + assert.NoError(t, err) + + task := watchDmChannelsTask{ + req: genWatchDMChannelsRequest(), + node: node, + } + task.req.Infos = []*datapb.VchannelInfo{ + { + CollectionID: defaultCollectionID, + ChannelName: defaultVChannel, + SeekPosition: &msgstream.MsgPosition{ + ChannelName: defaultVChannel, + MsgID: []byte{1, 2, 3}, + MsgGroup: defaultSubName, + Timestamp: 0, + }, }, - }, - } - err = task.Execute(ctx) - assert.Error(t, err) - }) + } + err = task.Execute(ctx) + assert.Error(t, err) + }) + } } func TestTask_loadSegmentsTask(t *testing.T) {