From 130a923decc5d376320a25eaf782c39b90efdfe3 Mon Sep 17 00:00:00 2001 From: SimFG Date: Wed, 9 Oct 2024 17:35:19 +0800 Subject: [PATCH] enhance: the estimate method when loading the collection (#36307) - issue: #36530 --------- Signed-off-by: SimFG Signed-off-by: xianliang.li Co-authored-by: xianliang.li --- internal/core/src/common/Types.h | 2 +- internal/core/src/common/resource_c.h | 33 +++ internal/core/src/index/IndexFactory.cpp | 204 +++++++++++++++++ internal/core/src/index/IndexFactory.h | 22 ++ internal/core/src/index/Meta.h | 1 + internal/core/src/index/StringIndexMarisa.cpp | 2 +- .../core/src/segcore/SegmentSealedImpl.cpp | 1 + internal/core/src/segcore/Types.h | 1 + internal/core/src/segcore/load_index_c.cpp | 33 +++ internal/core/src/segcore/load_index_c.h | 4 + internal/core/unittest/CMakeLists.txt | 1 + internal/core/unittest/test_loading.cpp | 208 ++++++++++++++++++ .../compaction_task_clustering_test.go | 3 +- internal/datacoord/meta.go | 1 - internal/proto/cgo_msg.proto | 1 + .../querynodev2/segments/load_index_info.go | 9 +- internal/querynodev2/segments/segment.go | 159 +++++++------ .../querynodev2/segments/segment_loader.go | 89 +++++--- internal/querynodev2/segments/utils.go | 5 - pkg/util/constant.go | 1 + 20 files changed, 677 insertions(+), 103 deletions(-) create mode 100644 internal/core/src/common/resource_c.h create mode 100644 internal/core/unittest/test_loading.cpp diff --git a/internal/core/src/common/Types.h b/internal/core/src/common/Types.h index 1ced64846c..477927567c 100644 --- a/internal/core/src/common/Types.h +++ b/internal/core/src/common/Types.h @@ -681,4 +681,4 @@ struct fmt::formatter : formatter { } return formatter::format(name, ctx); } -}; +}; \ No newline at end of file diff --git a/internal/core/src/common/resource_c.h b/internal/core/src/common/resource_c.h new file mode 100644 index 0000000000..233e13f14f --- /dev/null +++ b/internal/core/src/common/resource_c.h @@ -0,0 +1,33 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct LoadResourceRequest { + float max_memory_cost; //memory cost (GB) during loading + float max_disk_cost; // disk cost (GB) during loading + float final_memory_cost; // final memory (GB) cost after loading + float final_disk_cost; // final disk cost (GB) after loading + bool has_raw_data; // the filed contains raw data or not +} LoadResourceRequest; + +#ifdef __cplusplus +} +#endif diff --git a/internal/core/src/index/IndexFactory.cpp b/internal/core/src/index/IndexFactory.cpp index a80a643ca8..b7754c4503 100644 --- a/internal/core/src/index/IndexFactory.cpp +++ b/internal/core/src/index/IndexFactory.cpp @@ -28,6 +28,7 @@ #include "index/BoolIndex.h" #include "index/InvertedIndexTantivy.h" #include "index/HybridScalarIndex.h" +#include "knowhere/comp/knowhere_check.h" namespace milvus::index { @@ -78,6 +79,209 @@ IndexFactory::CreatePrimitiveScalarIndex( #endif } +LoadResourceRequest +IndexFactory::IndexLoadResource( + DataType field_type, + IndexVersion index_version, + float index_size, + std::map& index_params, + bool mmap_enable) { + if (milvus::IsVectorDataType(field_type)) { + return VecIndexLoadResource( + field_type, index_version, index_size, index_params, mmap_enable); + } else { + return ScalarIndexLoadResource( + field_type, index_version, index_size, index_params, mmap_enable); + } +} + +LoadResourceRequest +IndexFactory::VecIndexLoadResource( + DataType field_type, + IndexVersion index_version, + float index_size, + std::map& index_params, + bool mmap_enable) { + auto config = milvus::index::ParseConfigFromIndexParams(index_params); + + AssertInfo(index_params.find("index_type") != index_params.end(), + "index type is empty"); + std::string index_type = index_params.at("index_type"); + + bool mmaped = false; + if (mmap_enable && + knowhere::KnowhereCheck::SupportMmapIndexTypeCheck(index_type)) { + config["enable_mmap"] = true; + mmaped = true; + } + + knowhere::expected resource; + float index_size_gb = index_size * 1.0 / 1024.0 / 1024.0 / 1024.0; + float download_buffer_size_gb = + DEFAULT_FIELD_MAX_MEMORY_LIMIT * 1.0 / 1024.0 / 1024.0 / 1024.0; + + bool has_raw_data = false; + switch (field_type) { + case milvus::DataType::VECTOR_BINARY: + resource = knowhere::IndexStaticFaced< + knowhere::bin1>::EstimateLoadResource(index_type, + index_version, + index_size_gb, + config); + has_raw_data = + knowhere::IndexStaticFaced::HasRawData( + index_type, index_version, config); + break; + case milvus::DataType::VECTOR_FLOAT: + resource = knowhere::IndexStaticFaced< + knowhere::fp32>::EstimateLoadResource(index_type, + index_version, + index_size_gb, + config); + has_raw_data = + knowhere::IndexStaticFaced::HasRawData( + index_type, index_version, config); + break; + case milvus::DataType::VECTOR_FLOAT16: + resource = knowhere::IndexStaticFaced< + knowhere::fp16>::EstimateLoadResource(index_type, + index_version, + index_size_gb, + config); + has_raw_data = + knowhere::IndexStaticFaced::HasRawData( + index_type, index_version, config); + break; + case milvus::DataType::VECTOR_BFLOAT16: + resource = knowhere::IndexStaticFaced< + knowhere::bf16>::EstimateLoadResource(index_type, + index_version, + index_size_gb, + config); + has_raw_data = + knowhere::IndexStaticFaced::HasRawData( + index_type, index_version, config); + break; + case milvus::DataType::VECTOR_SPARSE_FLOAT: + resource = knowhere::IndexStaticFaced< + knowhere::fp32>::EstimateLoadResource(index_type, + index_version, + index_size_gb, + config); + has_raw_data = + knowhere::IndexStaticFaced::HasRawData( + index_type, index_version, config); + break; + default: + PanicInfo( + milvus::DataTypeInvalid, + fmt::format( + "invalid data type to estimate index load resource: {}", + field_type)); + } + + LoadResourceRequest request{}; + + request.has_raw_data = has_raw_data; + request.final_disk_cost = resource.value().diskCost; + request.final_memory_cost = resource.value().memoryCost; + if (knowhere::UseDiskLoad(index_type, index_version) || mmaped) { + request.max_disk_cost = resource.value().diskCost; + request.max_memory_cost = + std::max(resource.value().memoryCost, download_buffer_size_gb); + } else { + request.max_disk_cost = 0; + request.max_memory_cost = 2 * resource.value().memoryCost; + } + return request; +} + +LoadResourceRequest +IndexFactory::ScalarIndexLoadResource( + DataType field_type, + IndexVersion index_version, + float index_size, + std::map& index_params, + bool mmap_enable) { + auto config = milvus::index::ParseConfigFromIndexParams(index_params); + + AssertInfo(index_params.find("index_type") != index_params.end(), + "index type is empty"); + std::string index_type = index_params.at("index_type"); + + knowhere::expected resource; + float index_size_gb = index_size * 1.0 / 1024.0 / 1024.0 / 1024.0; + + LoadResourceRequest request{}; + request.has_raw_data = false; + + if (index_type == milvus::index::ASCENDING_SORT) { + request.final_memory_cost = index_size_gb; + request.final_disk_cost = 0; + request.max_memory_cost = 2 * index_size_gb; + request.max_disk_cost = 0; + request.has_raw_data = true; + } else if (index_type == milvus::index::MARISA_TRIE || + index_type == milvus::index::MARISA_TRIE_UPPER) { + if (mmap_enable) { + request.final_memory_cost = 0; + request.final_disk_cost = index_size_gb; + request.max_memory_cost = index_size_gb; + request.max_disk_cost = index_size_gb; + } else { + request.final_memory_cost = index_size_gb; + request.final_disk_cost = 0; + request.max_memory_cost = 2 * index_size_gb; + request.max_disk_cost = 0; + } + request.has_raw_data = true; + } else if (index_type == milvus::index::INVERTED_INDEX_TYPE) { + if (mmap_enable) { + request.final_memory_cost = 0; + request.final_disk_cost = index_size_gb; + request.max_memory_cost = index_size_gb; + request.max_disk_cost = index_size_gb; + } else { + request.final_memory_cost = index_size_gb; + request.final_disk_cost = 0; + request.max_memory_cost = 2 * index_size_gb; + request.max_disk_cost = 0; + } + + request.has_raw_data = false; + } else if (index_type == milvus::index::BITMAP_INDEX_TYPE) { + if (mmap_enable) { + request.final_memory_cost = 0; + request.final_disk_cost = index_size_gb; + request.max_memory_cost = index_size_gb; + request.max_disk_cost = index_size_gb; + } else { + request.final_memory_cost = index_size_gb; + request.final_disk_cost = 0; + request.max_memory_cost = 2 * index_size_gb; + request.max_disk_cost = 0; + } + + if (field_type == milvus::DataType::ARRAY) { + request.has_raw_data = false; + } else { + request.has_raw_data = true; + } + } else if (index_type == milvus::index::HYBRID_INDEX_TYPE) { + request.final_memory_cost = index_size_gb; + request.final_disk_cost = index_size_gb; + request.max_memory_cost = 2 * index_size_gb; + request.max_disk_cost = index_size_gb; + request.has_raw_data = false; + } else { + PanicInfo(milvus::UnexpectedError, + fmt::format("invalid index type to estimate scalar index " + "load resource: {}", + index_type)); + } + return request; +} + IndexBasePtr IndexFactory::CreateIndex( const CreateIndexInfo& create_index_info, diff --git a/internal/core/src/index/IndexFactory.h b/internal/core/src/index/IndexFactory.h index db46330a17..b5a6d408ba 100644 --- a/internal/core/src/index/IndexFactory.h +++ b/internal/core/src/index/IndexFactory.h @@ -32,6 +32,7 @@ #include "index/ScalarIndexSort.h" #include "index/StringIndexMarisa.h" #include "index/BoolIndex.h" +#include "segcore/load_index_c.h" namespace milvus::index { @@ -51,6 +52,27 @@ class IndexFactory { return instance; } + LoadResourceRequest + IndexLoadResource(DataType field_type, + IndexVersion index_version, + float index_size, + std::map& index_params, + bool mmap_enable); + + LoadResourceRequest + VecIndexLoadResource(DataType field_type, + IndexVersion index_version, + float index_size, + std::map& index_params, + bool mmap_enable); + + LoadResourceRequest + ScalarIndexLoadResource(DataType field_type, + IndexVersion index_version, + float index_size, + std::map& index_params, + bool mmap_enable); + IndexBasePtr CreateIndex(const CreateIndexInfo& create_index_info, const storage::FileManagerContext& file_manager_context); diff --git a/internal/core/src/index/Meta.h b/internal/core/src/index/Meta.h index c0c9ea6cd8..fd37814dbe 100644 --- a/internal/core/src/index/Meta.h +++ b/internal/core/src/index/Meta.h @@ -42,6 +42,7 @@ constexpr const char* METRIC_TYPE = "metric_type"; // scalar index type constexpr const char* ASCENDING_SORT = "STL_SORT"; constexpr const char* MARISA_TRIE = "Trie"; +constexpr const char* MARISA_TRIE_UPPER = "TRIE"; constexpr const char* INVERTED_INDEX_TYPE = "INVERTED"; constexpr const char* BITMAP_INDEX_TYPE = "BITMAP"; constexpr const char* HYBRID_INDEX_TYPE = "HYBRID"; diff --git a/internal/core/src/index/StringIndexMarisa.cpp b/internal/core/src/index/StringIndexMarisa.cpp index 7207b75694..e3c8531935 100644 --- a/internal/core/src/index/StringIndexMarisa.cpp +++ b/internal/core/src/index/StringIndexMarisa.cpp @@ -203,7 +203,7 @@ StringIndexMarisa::LoadWithoutAssemble(const BinarySet& set, } file.Seek(0, SEEK_SET); - if (config.contains(ENABLE_MMAP)) { + if (config.contains(MMAP_FILE_PATH)) { trie_.mmap(file_name.c_str()); } else { trie_.read(file.Descriptor()); diff --git a/internal/core/src/segcore/SegmentSealedImpl.cpp b/internal/core/src/segcore/SegmentSealedImpl.cpp index bf58838a16..de7643751f 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.cpp +++ b/internal/core/src/segcore/SegmentSealedImpl.cpp @@ -1927,6 +1927,7 @@ SegmentSealedImpl::generate_interim_index(const FieldId field_id) { return false; } // check data type + // TODO: QianYa when add other data type, please check the SupportInterimIndexDataType method in the go code if (field_meta.get_data_type() != DataType::VECTOR_FLOAT && !is_sparse) { return false; diff --git a/internal/core/src/segcore/Types.h b/internal/core/src/segcore/Types.h index 106799ce26..4f006f4a0d 100644 --- a/internal/core/src/segcore/Types.h +++ b/internal/core/src/segcore/Types.h @@ -47,6 +47,7 @@ struct LoadIndexInfo { int64_t index_store_version; IndexVersion index_engine_version; proto::schema::FieldSchema schema; + int64_t index_size; }; } // namespace milvus::segcore diff --git a/internal/core/src/segcore/load_index_c.cpp b/internal/core/src/segcore/load_index_c.cpp index a4be799165..b63771392f 100644 --- a/internal/core/src/segcore/load_index_c.cpp +++ b/internal/core/src/segcore/load_index_c.cpp @@ -26,6 +26,8 @@ #include "storage/RemoteChunkManagerSingleton.h" #include "storage/LocalChunkManagerSingleton.h" #include "pb/cgo_msg.pb.h" +#include "knowhere/index/index_static.h" +#include "knowhere/comp/knowhere_check.h" bool IsLoadWithDisk(const char* index_type, int index_engine_version) { @@ -204,6 +206,35 @@ appendScalarIndex(CLoadIndexInfo c_load_index_info, CBinarySet c_binary_set) { } } +LoadResourceRequest +EstimateLoadIndexResource(CLoadIndexInfo c_load_index_info) { + try { + auto load_index_info = + (milvus::segcore::LoadIndexInfo*)c_load_index_info; + auto field_type = load_index_info->field_type; + auto& index_params = load_index_info->index_params; + bool find_index_type = + index_params.count("index_type") > 0 ? true : false; + AssertInfo(find_index_type == true, + "Can't find index type in index_params"); + + LoadResourceRequest request = + milvus::index::IndexFactory::GetInstance().IndexLoadResource( + field_type, + load_index_info->index_engine_version, + load_index_info->index_size, + index_params, + load_index_info->enable_mmap); + return request; + } catch (std::exception& e) { + PanicInfo(milvus::UnexpectedError, + fmt::format("failed to estimate index load resource, " + "encounter exception : {}", + e.what())); + return LoadResourceRequest{0, 0, 0, 0, false}; + } +} + CStatus AppendIndex(CLoadIndexInfo c_load_index_info, CBinarySet c_binary_set) { auto load_index_info = (milvus::segcore::LoadIndexInfo*)c_load_index_info; @@ -288,6 +319,7 @@ AppendIndexV2(CTraceContext c_trace, CLoadIndexInfo c_load_index_info) { std::to_string(load_index_info->segment_id) / std::to_string(load_index_info->field_id); + config[milvus::index::ENABLE_MMAP] = "true"; config[milvus::index::MMAP_FILE_PATH] = filepath.string(); } @@ -450,6 +482,7 @@ FinishLoadIndexInfo(CLoadIndexInfo c_load_index_info, load_index_info->index_engine_version = info_proto->index_engine_version(); load_index_info->schema = info_proto->field(); + load_index_info->index_size = info_proto->index_file_size(); } auto status = CStatus(); status.error_code = milvus::Success; diff --git a/internal/core/src/segcore/load_index_c.h b/internal/core/src/segcore/load_index_c.h index db0108dcd7..1b92ea778e 100644 --- a/internal/core/src/segcore/load_index_c.h +++ b/internal/core/src/segcore/load_index_c.h @@ -17,6 +17,7 @@ extern "C" { #include #include +#include "common/resource_c.h" #include "common/binary_set_c.h" #include "common/type_c.h" #include "segcore/collection_c.h" @@ -47,6 +48,9 @@ AppendFieldInfo(CLoadIndexInfo c_load_index_info, bool enable_mmap, const char* mmap_dir_path); +LoadResourceRequest +EstimateLoadIndexResource(CLoadIndexInfo c_load_index_info); + CStatus AppendIndexInfo(CLoadIndexInfo c_load_index_info, int64_t index_id, diff --git a/internal/core/unittest/CMakeLists.txt b/internal/core/unittest/CMakeLists.txt index 11799675fa..3fdd8cbf89 100644 --- a/internal/core/unittest/CMakeLists.txt +++ b/internal/core/unittest/CMakeLists.txt @@ -41,6 +41,7 @@ set(MILVUS_TEST_FILES test_concurrent_vector.cpp test_c_stream_reduce.cpp test_c_tokenizer.cpp + test_loading.cpp test_data_codec.cpp test_disk_file_manager_test.cpp test_exec.cpp diff --git a/internal/core/unittest/test_loading.cpp b/internal/core/unittest/test_loading.cpp new file mode 100644 index 0000000000..851663a3d5 --- /dev/null +++ b/internal/core/unittest/test_loading.cpp @@ -0,0 +1,208 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License + +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "segcore/Types.h" +#include "knowhere/version.h" +#include "knowhere/comp/index_param.h" +#include "segcore/load_index_c.h" + +using Param = + std::pair, LoadResourceRequest>; + +class IndexLoadTest : public ::testing::TestWithParam { + protected: + void + SetUp() override { + auto param = GetParam(); + index_params = param.first; + ASSERT_TRUE(index_params.find("index_type") != index_params.end()); + index_type = index_params["index_type"]; + enable_mmap = index_params.find("mmap") != index_params.end() && + index_params["mmap"] == "true"; + std::string field_type = index_params["field_type"]; + ASSERT_TRUE(field_type.size() > 0); + if (field_type == "vector_float") { + data_type = milvus::DataType::VECTOR_FLOAT; + } else if (field_type == "vector_bf16") { + data_type = milvus::DataType::VECTOR_BFLOAT16; + } else if (field_type == "vector_fp16") { + data_type = milvus::DataType::VECTOR_FLOAT16; + } else if (field_type == "vector_binary") { + data_type = milvus::DataType::VECTOR_BINARY; + } else if (field_type == "vector_sparse_float") { + data_type = milvus::DataType::VECTOR_SPARSE_FLOAT; + } else if (field_type == "array") { + data_type = milvus::DataType::ARRAY; + } else { + data_type = milvus::DataType::STRING; + } + + expected = param.second; + } + + void + TearDown() override { + } + + protected: + std::string index_type; + std::map index_params; + bool enable_mmap; + milvus::DataType data_type; + LoadResourceRequest expected; +}; + +INSTANTIATE_TEST_SUITE_P( + IndexTypeLoadInfo, + IndexLoadTest, + ::testing::Values( + std::pair, LoadResourceRequest>( + {{"index_type", "HNSW"}, + {"metric_type", "L2"}, + {"efConstrcution", "300"}, + {"M", "30"}, + {"mmap", "false"}, + {"field_type", "vector_float"}}, + {2.0f, 0.0f, 1.0f, 0.0f, true}), + std::pair, LoadResourceRequest>( + {{"index_type", "HNSW"}, + {"metric_type", "L2"}, + {"efConstrcution", "300"}, + {"M", "30"}, + {"mmap", "true"}, + {"field_type", "vector_float"}}, + {0.125f, 1.0f, 0.0f, 1.0f, true}), + std::pair, LoadResourceRequest>( + {{"index_type", "HNSW"}, + {"metric_type", "L2"}, + {"efConstrcution", "300"}, + {"M", "30"}, + {"mmap", "false"}, + {"field_type", "vector_bf16"}}, + {2.0f, 0.0f, 1.0f, 0.0f, true}), + std::pair, LoadResourceRequest>( + {{"index_type", "HNSW"}, + {"metric_type", "L2"}, + {"efConstrcution", "300"}, + {"M", "30"}, + {"mmap", "true"}, + {"field_type", "vector_fp16"}}, + {0.125f, 1.0f, 0.0f, 1.0f, true}), + std::pair, LoadResourceRequest>( + {{"index_type", "IVFFLAT"}, + {"metric_type", "L2"}, + {"nlist", "1024"}, + {"mmap", "false"}, + {"field_type", "vector_float"}}, + {2.0f, 0.0f, 1.0f, 0.0f, true}), + std::pair, LoadResourceRequest>( + {{"index_type", "IVFSQ"}, + {"metric_type", "L2"}, + {"nlist", "1024"}, + {"mmap", "false"}, + {"field_type", "vector_float"}}, + {2.0f, 0.0f, 1.0f, 0.0f, false}), +#ifdef BUILD_DISK_ANN + std::pair, LoadResourceRequest>( + {{"index_type", "DISKANN"}, + {"metric_type", "L2"}, + {"nlist", "1024"}, + {"mmap", "false"}, + {"field_type", "vector_float"}}, + {0.25f, 1.0f, 0.25f, 1.0f, true}), + std::pair, LoadResourceRequest>( + {{"index_type", "DISKANN"}, + {"metric_type", "IP"}, + {"nlist", "1024"}, + {"mmap", "false"}, + {"field_type", "vector_float"}}, + {0.25f, 1.0f, 0.25f, 1.0f, false}), +#endif + std::pair, LoadResourceRequest>( + {{"index_type", "STL_SORT"}, + {"mmap", "false"}, + {"field_type", "string"}}, + {2.0f, 0.0f, 1.0f, 0.0f, true}), + std::pair, LoadResourceRequest>( + {{"index_type", "TRIE"}, + {"mmap", "false"}, + {"field_type", "string"}}, + {2.0f, 0.0f, 1.0f, 0.0f, true}), + std::pair, LoadResourceRequest>( + {{"index_type", "TRIE"}, + {"mmap", "true"}, + {"field_type", "string"}}, + {1.0f, 1.0f, 0.0f, 1.0f, true}), + std::pair, LoadResourceRequest>( + {{"index_type", "INVERTED"}, + {"mmap", "false"}, + {"field_type", "string"}}, + {2.0f, 0.0f, 1.0f, 0.0f, false}), + std::pair, LoadResourceRequest>( + {{"index_type", "INVERTED"}, + {"mmap", "true"}, + {"field_type", "string"}}, + {1.0f, 1.0f, 0.0f, 1.0f, false}), + std::pair, LoadResourceRequest>( + {{"index_type", "BITMAP"}, + {"mmap", "false"}, + {"field_type", "string"}}, + {2.0f, 0.0f, 1.0f, 0.0f, true}), + std::pair, LoadResourceRequest>( + {{"index_type", "BITMAP"}, + {"mmap", "true"}, + {"field_type", "array"}}, + {1.0f, 1.0f, 0.0f, 1.0f, false}), + std::pair, LoadResourceRequest>( + {{"index_type", "HYBRID"}, + {"mmap", "true"}, + {"field_type", "string"}}, + {2.0f, 1.0f, 1.0f, 1.0f, false}))); + +TEST_P(IndexLoadTest, ResourceEstimate) { + milvus::segcore::LoadIndexInfo loadIndexInfo; + + loadIndexInfo.collection_id = 1; + loadIndexInfo.partition_id = 2; + loadIndexInfo.segment_id = 3; + loadIndexInfo.field_id = 4; + loadIndexInfo.field_type = data_type; + loadIndexInfo.enable_mmap = enable_mmap; + loadIndexInfo.mmap_dir_path = "/tmp/mmap"; + loadIndexInfo.index_id = 5; + loadIndexInfo.index_build_id = 6; + loadIndexInfo.index_version = 1; + loadIndexInfo.index_params = index_params; + loadIndexInfo.index_files = {"/tmp/index/1"}; + loadIndexInfo.index = nullptr; + loadIndexInfo.uri = ""; + loadIndexInfo.index_store_version = 1; + loadIndexInfo.index_engine_version = + knowhere::Version::GetCurrentVersion().VersionNumber(); + loadIndexInfo.index_size = 1024 * 1024 * 1024; // 1G index size + + LoadResourceRequest request = EstimateLoadIndexResource(&loadIndexInfo); + ASSERT_EQ(request.has_raw_data, expected.has_raw_data); + ASSERT_EQ(request.final_memory_cost, expected.final_memory_cost); + ASSERT_EQ(request.final_disk_cost, expected.final_disk_cost); + ASSERT_EQ(request.max_memory_cost, expected.max_memory_cost); + ASSERT_EQ(request.max_disk_cost, expected.max_disk_cost); +} diff --git a/internal/datacoord/compaction_task_clustering_test.go b/internal/datacoord/compaction_task_clustering_test.go index 6a11089741..f3cfdfaa57 100644 --- a/internal/datacoord/compaction_task_clustering_test.go +++ b/internal/datacoord/compaction_task_clustering_test.go @@ -24,8 +24,6 @@ import ( "testing" "time" - "github.com/milvus-io/milvus/pkg/util/metautil" - "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" "go.uber.org/atomic" @@ -41,6 +39,7 @@ import ( "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/metautil" ) func TestClusteringCompactionTaskSuite(t *testing.T) { diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index 0459e12066..a78534fa5e 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -1807,7 +1807,6 @@ func (s *segMetricMutation) addNewSeg(state commonpb.SegmentState, level datapb. } if _, ok := s.stateChange[level.String()][state.String()]; !ok { s.stateChange[level.String()][state.String()] = make(map[string]int) - } s.stateChange[level.String()][state.String()][getSortStatus(isSorted)] += 1 diff --git a/internal/proto/cgo_msg.proto b/internal/proto/cgo_msg.proto index 6d851e95e0..6ccfca9e96 100644 --- a/internal/proto/cgo_msg.proto +++ b/internal/proto/cgo_msg.proto @@ -20,4 +20,5 @@ message LoadIndexInfo { string uri = 13; int64 index_store_version = 14; int32 index_engine_version = 15; + int64 index_file_size = 16; } diff --git a/internal/querynodev2/segments/load_index_info.go b/internal/querynodev2/segments/load_index_info.go index 554b3cc41d..ae05130211 100644 --- a/internal/querynodev2/segments/load_index_info.go +++ b/internal/querynodev2/segments/load_index_info.go @@ -173,7 +173,7 @@ func (li *LoadIndexInfo) appendIndexEngineVersion(ctx context.Context, indexEngi return HandleCStatus(ctx, &status, "AppendIndexEngineVersion failed") } -func (li *LoadIndexInfo) finish(ctx context.Context, info *cgopb.LoadIndexInfo) error { +func (li *LoadIndexInfo) appendLoadIndexInfo(ctx context.Context, info *cgopb.LoadIndexInfo) error { marshaled, err := proto.Marshal(info) if err != nil { return err @@ -185,10 +185,11 @@ func (li *LoadIndexInfo) finish(ctx context.Context, info *cgopb.LoadIndexInfo) return nil, nil }).Await() - if err := HandleCStatus(ctx, &status, "FinishLoadIndexInfo failed"); err != nil { - return err - } + return HandleCStatus(ctx, &status, "FinishLoadIndexInfo failed") +} +func (li *LoadIndexInfo) loadIndex(ctx context.Context) error { + var status C.CStatus _, _ = GetLoadPool().Submit(func() (any, error) { traceCtx := ParseCTraceContext(ctx) status = C.AppendIndexV2(traceCtx.ctx, li.cLoadIndexInfo) diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go index d5cedc7fa4..cce217f644 100644 --- a/internal/querynodev2/segments/segment.go +++ b/internal/querynodev2/segments/segment.go @@ -897,10 +897,10 @@ func (s *LocalSegment) LoadFieldData(ctx context.Context, fieldID int64, rowCoun log.Info("start loading field data for field") loadFieldDataInfo, err := newLoadFieldDataInfo(ctx) - defer deleteFieldDataInfo(loadFieldDataInfo) if err != nil { return err } + defer deleteFieldDataInfo(loadFieldDataInfo) err = loadFieldDataInfo.appendLoadFieldInfo(ctx, fieldID, rowCount) if err != nil { @@ -1074,26 +1074,12 @@ func (s *LocalSegment) LoadDeltaData(ctx context.Context, deltaData *storage.Del return nil } -func (s *LocalSegment) LoadIndex(ctx context.Context, indexInfo *querypb.FieldIndexInfo, fieldType schemapb.DataType) error { - log := log.Ctx(ctx).With( - zap.Int64("collectionID", s.Collection()), - zap.Int64("partitionID", s.Partition()), - zap.Int64("segmentID", s.ID()), - zap.Int64("fieldID", indexInfo.GetFieldID()), - zap.Int64("indexID", indexInfo.GetIndexID()), - ) - - old := s.GetIndex(indexInfo.GetFieldID()) - // the index loaded - if old != nil && old.IndexInfo.GetIndexID() == indexInfo.GetIndexID() && old.IsLoaded { - log.Warn("index already loaded") - return nil - } - - ctx, sp := otel.Tracer(typeutil.QueryNodeRole).Start(ctx, fmt.Sprintf("LoadIndex-%d-%d", s.ID(), indexInfo.GetFieldID())) - defer sp.End() - - tr := timerecord.NewTimeRecorder("loadIndex") +func GetCLoadInfoWithFunc(ctx context.Context, + fieldSchema *schemapb.FieldSchema, + s *querypb.SegmentLoadInfo, + indexInfo *querypb.FieldIndexInfo, + f func(c *LoadIndexInfo) error, +) error { // 1. loadIndexInfo, err := newLoadIndexInfo(ctx) if err != nil { @@ -1101,15 +1087,6 @@ func (s *LocalSegment) LoadIndex(ctx context.Context, indexInfo *querypb.FieldIn } defer deleteLoadIndexInfo(loadIndexInfo) - schema, err := typeutil.CreateSchemaHelper(s.GetCollection().Schema()) - if err != nil { - return err - } - fieldSchema, err := schema.GetFieldFromID(indexInfo.GetFieldID()) - if err != nil { - return err - } - indexParams := funcutil.KeyValuePair2Map(indexInfo.IndexParams) // as Knowhere reports error if encounter an unknown param, we need to delete it delete(indexParams, common.MmapEnabledKey) @@ -1133,9 +1110,9 @@ func (s *LocalSegment) LoadIndex(ctx context.Context, indexInfo *querypb.FieldIn enableMmap := isIndexMmapEnable(fieldSchema, indexInfo) indexInfoProto := &cgopb.LoadIndexInfo{ - CollectionID: s.Collection(), - PartitionID: s.Partition(), - SegmentID: s.ID(), + CollectionID: s.GetCollectionID(), + PartitionID: s.GetPartitionID(), + SegmentID: s.GetSegmentID(), Field: fieldSchema, EnableMmap: enableMmap, MmapDirPath: paramtable.Get().QueryNodeCfg.MmapDirPath.GetValue(), @@ -1146,46 +1123,100 @@ func (s *LocalSegment) LoadIndex(ctx context.Context, indexInfo *querypb.FieldIn IndexFiles: indexInfo.GetIndexFilePaths(), IndexEngineVersion: indexInfo.GetCurrentIndexVersion(), IndexStoreVersion: indexInfo.GetIndexStoreVersion(), + IndexFileSize: indexInfo.GetIndexSize(), } - newLoadIndexInfoSpan := tr.RecordSpan() - // 2. - if err := loadIndexInfo.finish(ctx, indexInfoProto); err != nil { - if loadIndexInfo.cleanLocalData(ctx) != nil { - log.Warn("failed to clean cached data on disk after append index failed", - zap.Int64("buildID", indexInfo.BuildID), - zap.Int64("index version", indexInfo.IndexVersion)) - } + if err := loadIndexInfo.appendLoadIndexInfo(ctx, indexInfoProto); err != nil { + log.Warn("fail to append load index info", zap.Error(err)) return err } - if s.Type() != SegmentTypeSealed { - errMsg := fmt.Sprintln("updateSegmentIndex failed, illegal segment type ", s.segmentType, "segmentID = ", s.ID()) - return errors.New(errMsg) - } - appendLoadIndexInfoSpan := tr.RecordSpan() + return f(loadIndexInfo) +} - // 3. - err = s.UpdateIndexInfo(ctx, indexInfo, loadIndexInfo) - if err != nil { - return err - } - updateIndexInfoSpan := tr.RecordSpan() - if !typeutil.IsVectorType(fieldType) || s.HasRawData(indexInfo.GetFieldID()) { +func (s *LocalSegment) LoadIndex(ctx context.Context, indexInfo *querypb.FieldIndexInfo, fieldType schemapb.DataType) error { + log := log.Ctx(ctx).With( + zap.Int64("collectionID", s.Collection()), + zap.Int64("partitionID", s.Partition()), + zap.Int64("segmentID", s.ID()), + zap.Int64("fieldID", indexInfo.GetFieldID()), + zap.Int64("indexID", indexInfo.GetIndexID()), + ) + + old := s.GetIndex(indexInfo.GetFieldID()) + // the index loaded + if old != nil && old.IndexInfo.GetIndexID() == indexInfo.GetIndexID() && old.IsLoaded { + log.Warn("index already loaded") return nil } - // 4. - mmapChunkCache := paramtable.Get().QueryNodeCfg.MmapChunkCache.GetAsBool() - s.WarmupChunkCache(ctx, indexInfo.GetFieldID(), mmapChunkCache) - warmupChunkCacheSpan := tr.RecordSpan() - log.Info("Finish loading index", - zap.Duration("newLoadIndexInfoSpan", newLoadIndexInfoSpan), - zap.Duration("appendLoadIndexInfoSpan", appendLoadIndexInfoSpan), - zap.Duration("updateIndexInfoSpan", updateIndexInfoSpan), - zap.Duration("warmupChunkCacheSpan", warmupChunkCacheSpan), - ) - return nil + ctx, sp := otel.Tracer(typeutil.QueryNodeRole).Start(ctx, fmt.Sprintf("LoadIndex-%d-%d", s.ID(), indexInfo.GetFieldID())) + defer sp.End() + + tr := timerecord.NewTimeRecorder("loadIndex") + + schemaHelper, err := typeutil.CreateSchemaHelper(s.GetCollection().Schema()) + if err != nil { + return err + } + fieldSchema, err := schemaHelper.GetFieldFromID(indexInfo.GetFieldID()) + if err != nil { + return err + } + + return s.innerLoadIndex(ctx, fieldSchema, indexInfo, tr, fieldType) +} + +func (s *LocalSegment) innerLoadIndex(ctx context.Context, + fieldSchema *schemapb.FieldSchema, + indexInfo *querypb.FieldIndexInfo, + tr *timerecord.TimeRecorder, + fieldType schemapb.DataType, +) error { + err := GetCLoadInfoWithFunc(ctx, fieldSchema, + s.LoadInfo(), indexInfo, func(loadIndexInfo *LoadIndexInfo) error { + newLoadIndexInfoSpan := tr.RecordSpan() + + if err := loadIndexInfo.loadIndex(ctx); err != nil { + if loadIndexInfo.cleanLocalData(ctx) != nil { + log.Warn("failed to clean cached data on disk after append index failed", + zap.Int64("buildID", indexInfo.BuildID), + zap.Int64("index version", indexInfo.IndexVersion)) + } + return err + } + if s.Type() != SegmentTypeSealed { + errMsg := fmt.Sprintln("updateSegmentIndex failed, illegal segment type ", s.segmentType, "segmentID = ", s.ID()) + return errors.New(errMsg) + } + appendLoadIndexInfoSpan := tr.RecordSpan() + + // 3. + err := s.UpdateIndexInfo(ctx, indexInfo, loadIndexInfo) + if err != nil { + return err + } + updateIndexInfoSpan := tr.RecordSpan() + if !typeutil.IsVectorType(fieldType) || s.HasRawData(indexInfo.GetFieldID()) { + return nil + } + + // 4. + mmapChunkCache := paramtable.Get().QueryNodeCfg.MmapChunkCache.GetAsBool() + s.WarmupChunkCache(ctx, indexInfo.GetFieldID(), mmapChunkCache) + warmupChunkCacheSpan := tr.RecordSpan() + log.Info("Finish loading index", + zap.Duration("newLoadIndexInfoSpan", newLoadIndexInfoSpan), + zap.Duration("appendLoadIndexInfoSpan", appendLoadIndexInfoSpan), + zap.Duration("updateIndexInfoSpan", updateIndexInfoSpan), + zap.Duration("warmupChunkCacheSpan", warmupChunkCacheSpan), + ) + return nil + }) + if err != nil { + log.Warn("load index failed", zap.Error(err)) + } + return err } func (s *LocalSegment) LoadTextIndex(ctx context.Context, textLogs *datapb.TextIndexStats, schemaHelper *typeutil.SchemaHelper) error { diff --git a/internal/querynodev2/segments/segment_loader.go b/internal/querynodev2/segments/segment_loader.go index 4d8ce37453..16b7f7548a 100644 --- a/internal/querynodev2/segments/segment_loader.go +++ b/internal/querynodev2/segments/segment_loader.go @@ -49,6 +49,7 @@ import ( "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" + "github.com/milvus-io/milvus/pkg/util" "github.com/milvus-io/milvus/pkg/util/conc" "github.com/milvus-io/milvus/pkg/util/contextutil" "github.com/milvus-io/milvus/pkg/util/funcutil" @@ -88,6 +89,24 @@ type Loader interface { ) error } +type ResourceEstimate struct { + MaxMemoryCost uint64 + MaxDiskCost uint64 + FinalMemoryCost uint64 + FinalDiskCost uint64 + HasRawData bool +} + +func GetResourceEstimate(estimate *C.LoadResourceRequest) ResourceEstimate { + return ResourceEstimate{ + MaxMemoryCost: uint64(float64(estimate.max_memory_cost) * util.GB), + MaxDiskCost: uint64(float64(estimate.max_disk_cost) * util.GB), + FinalMemoryCost: uint64(float64(estimate.final_memory_cost) * util.GB), + FinalDiskCost: uint64(float64(estimate.final_disk_cost) * util.GB), + HasRawData: bool(estimate.has_raw_data), + } +} + type requestResourceResult struct { Resource LoadResource CommittedResource LoadResource @@ -1283,6 +1302,9 @@ func getResourceUsageEstimateOfSegment(schema *schemapb.CollectionSchema, loadIn log.Warn("failed to create schema helper", zap.String("name", schema.GetName()), zap.Error(err)) return nil, err } + calculateDataSizeCount := 0 + ctx := context.Background() + for _, fieldBinlog := range loadInfo.BinlogPaths { fieldID := fieldBinlog.FieldID var mmapEnabled bool @@ -1293,43 +1315,48 @@ func getResourceUsageEstimateOfSegment(schema *schemapb.CollectionSchema, loadIn return nil, err } binlogSize := uint64(getBinlogDataMemorySize(fieldBinlog)) + shouldCalculateDataSize := false if fieldIndexInfo, ok := fieldID2IndexInfo[fieldID]; ok { - mmapEnabled = isIndexMmapEnable(fieldSchema, fieldIndexInfo) - neededMemSize, neededDiskSize, err := getIndexAttrCache().GetIndexResourceUsage(fieldIndexInfo, multiplyFactor.memoryIndexUsageFactor, fieldBinlog) + var estimateResult ResourceEstimate + err := GetCLoadInfoWithFunc(ctx, fieldSchema, loadInfo, fieldIndexInfo, func(c *LoadIndexInfo) error { + loadResourceRequest := C.EstimateLoadIndexResource(c.cLoadIndexInfo) + estimateResult = GetResourceEstimate(&loadResourceRequest) + return nil + }) if err != nil { - return nil, errors.Wrapf(err, "failed to get index size collection %d, segment %d, indexBuildID %d", + return nil, errors.Wrapf(err, "failed to estimate resource usage of index, collection %d, segment %d, indexBuildID %d", loadInfo.GetCollectionID(), loadInfo.GetSegmentID(), fieldIndexInfo.GetBuildID()) } - indexMemorySize += neededMemSize - if mmapEnabled { - segmentDiskSize += neededMemSize + neededDiskSize - } else { - segmentDiskSize += neededDiskSize - } - if !hasRawData(fieldIndexInfo) { - dataMmapEnable := isDataMmapEnable(fieldSchema) - segmentMemorySize += binlogSize - if dataMmapEnable { - segmentDiskSize += uint64(getBinlogDataDiskSize(fieldBinlog)) - } else { - segmentMemorySize += binlogSize - } + + indexMemorySize += estimateResult.MaxMemoryCost + segmentDiskSize += estimateResult.MaxDiskCost + if !estimateResult.HasRawData { + shouldCalculateDataSize = true } } else { + shouldCalculateDataSize = true + } + + if shouldCalculateDataSize { + calculateDataSizeCount += 1 mmapEnabled = isDataMmapEnable(fieldSchema) - segmentMemorySize += binlogSize - if mmapEnabled { - segmentDiskSize += uint64(getBinlogDataDiskSize(fieldBinlog)) - } else { - if multiplyFactor.enableTempSegmentIndex { + if !mmapEnabled || common.IsSystemField(fieldSchema.GetFieldID()) { + segmentMemorySize += binlogSize + if multiplyFactor.enableTempSegmentIndex && SupportInterimIndexDataType(fieldSchema.GetDataType()) { segmentMemorySize += uint64(float64(binlogSize) * multiplyFactor.tempSegmentIndexFactor) } + if DoubleMemorySystemField(fieldSchema.GetFieldID()) || DoubleMemoryDataType(fieldSchema.GetDataType()) { + segmentMemorySize += binlogSize + } + } else { + segmentDiskSize += uint64(getBinlogDataDiskSize(fieldBinlog)) } } + if mmapEnabled { mmapFieldCount++ } @@ -1340,9 +1367,6 @@ func getResourceUsageEstimateOfSegment(schema *schemapb.CollectionSchema, loadIn segmentMemorySize += uint64(getBinlogDataMemorySize(fieldBinlog)) } - // binlog & statslog use general load factor - segmentMemorySize = uint64(float64(segmentMemorySize) * multiplyFactor.memoryUsageFactor) - // get size of delete data for _, fieldBinlog := range loadInfo.Deltalogs { // MemorySize of filedBinlog is the actual size in memory, so the expansionFactor @@ -1365,6 +1389,21 @@ func getResourceUsageEstimateOfSegment(schema *schemapb.CollectionSchema, loadIn }, nil } +func DoubleMemoryDataType(dataType schemapb.DataType) bool { + return dataType == schemapb.DataType_String || + dataType == schemapb.DataType_VarChar || + dataType == schemapb.DataType_JSON +} + +func DoubleMemorySystemField(fieldID int64) bool { + return fieldID == common.TimeStampField +} + +func SupportInterimIndexDataType(dataType schemapb.DataType) bool { + return dataType == schemapb.DataType_FloatVector || + dataType == schemapb.DataType_SparseFloatVector +} + func (loader *segmentLoader) getFieldType(collectionID, fieldID int64) (schemapb.DataType, error) { collection := loader.manager.Collection.Get(collectionID) if collection == nil { diff --git a/internal/querynodev2/segments/utils.go b/internal/querynodev2/segments/utils.go index 14ddedd2d8..aa466578cb 100644 --- a/internal/querynodev2/segments/utils.go +++ b/internal/querynodev2/segments/utils.go @@ -284,8 +284,3 @@ func isDataMmapEnable(fieldSchema *schemapb.FieldSchema) bool { } return params.Params.QueryNodeCfg.MmapScalarField.GetAsBool() } - -func hasRawData(indexInfo *querypb.FieldIndexInfo) bool { - log.Warn("hasRawData is not implemented, please check it", zap.Int64("field_id", indexInfo.FieldID)) - return true -} diff --git a/pkg/util/constant.go b/pkg/util/constant.go index 4fae9f09a3..e51206875a 100644 --- a/pkg/util/constant.go +++ b/pkg/util/constant.go @@ -73,6 +73,7 @@ const ( RoleConfigPrivilege = "privilege" MaxEtcdTxnNum = 128 + GB = 1024 * 1024 * 1024 ) const (