diff --git a/CHANGELOG.md b/CHANGELOG.md index 8cb65f086d..5832eb1a37 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ Please mark all change in change log and use the issue from GitHub # Milvus 0.9.0 (TBD) ## Bug +- \#1705 Limit the insert data batch size ## Feature diff --git a/core/src/server/delivery/request/InsertRequest.cpp b/core/src/server/delivery/request/InsertRequest.cpp index cc19e1e69e..96c822c699 100644 --- a/core/src/server/delivery/request/InsertRequest.cpp +++ b/core/src/server/delivery/request/InsertRequest.cpp @@ -118,6 +118,7 @@ InsertRequest::OnExecute() { if ((collection_schema.flag_ & engine::meta::FLAG_MASK_NO_USERID) != 0 && user_provide_ids) { std::string msg = "Entities IDs are auto-generated. All vectors of this collection must use auto-generated IDs."; + LOG_SERVER_ERROR_ << LogOut("[%s][%ld] %s", "insert", 0, msg.c_str()); return Status(SERVER_ILLEGAL_VECTOR_ID, msg); } @@ -128,48 +129,20 @@ InsertRequest::OnExecute() { ProfilerStart(fname.c_str()); #endif // step 4: some metric type doesn't support float vectors - if (!vectors_data_.float_data_.empty()) { // insert float vectors - if (engine::utils::IsBinaryMetricType(collection_schema.metric_type_)) { - std::string msg = "Collection metric type doesn't support float vectors."; - LOG_SERVER_ERROR_ << LogOut("[%s][%ld] %s", "insert", 0, msg.c_str()); - return Status(SERVER_INVALID_ROWRECORD_ARRAY, msg); - } - - // check prepared float data - if (vectors_data_.float_data_.size() % vector_count != 0) { - std::string msg = "The vector dimension must be equal to the collection dimension."; - LOG_SERVER_ERROR_ << LogOut("[%s][%ld] %s", "insert", 0, msg.c_str()); - return Status(SERVER_INVALID_ROWRECORD_ARRAY, msg); - } - - fiu_do_on("InsertRequest.OnExecute.invalid_dim", collection_schema.dimension_ = -1); - if (vectors_data_.float_data_.size() / vector_count != collection_schema.dimension_) { - std::string msg = "The vector dimension must be equal to the collection dimension."; - LOG_SERVER_ERROR_ << LogOut("[%s][%ld] %s", "insert", 0, msg.c_str()); - return Status(SERVER_INVALID_VECTOR_DIMENSION, msg); - } - } else if (!vectors_data_.binary_data_.empty()) { // insert binary vectors - if (!engine::utils::IsBinaryMetricType(collection_schema.metric_type_)) { - std::string msg = "Collection metric type doesn't support binary vectors."; - LOG_SERVER_ERROR_ << LogOut("[%s][%ld] %s", "insert", 0, msg.c_str()); - return Status(SERVER_INVALID_ROWRECORD_ARRAY, msg); - } - - // check prepared binary data - if (vectors_data_.binary_data_.size() % vector_count != 0) { - std::string msg = "The vector dimension must be equal to the collection dimension."; - LOG_SERVER_ERROR_ << LogOut("[%s][%ld] %s", "insert", 0, msg.c_str()); - return Status(SERVER_INVALID_ROWRECORD_ARRAY, msg); - } - - if (vectors_data_.binary_data_.size() * 8 / vector_count != collection_schema.dimension_) { - std::string msg = "The vector dimension must be equal to the collection dimension."; - LOG_SERVER_ERROR_ << LogOut("[%s][%ld] %s", "insert", 0, msg.c_str()); - return Status(SERVER_INVALID_VECTOR_DIMENSION, msg); - } + status = ValidationUtil::ValidateVectorData(vectors_data_, collection_schema); + if (!status.ok()) { + LOG_SERVER_ERROR_ << LogOut("[%s][%d] Invalid vector data: %s", "insert", 0, status.message().c_str()); + return status; } - // step 5: insert vectors + // step 5: check insert data limitation + status = ValidationUtil::ValidateVectorDataSize(vectors_data_, collection_schema); + if (!status.ok()) { + LOG_SERVER_ERROR_ << LogOut("[%s][%d] Invalid vector data: %s", "insert", 0, status.message().c_str()); + return status; + } + + // step 6: insert vectors auto vec_count = static_cast(vector_count); rc.RecordSection("prepare vectors data"); @@ -189,7 +162,7 @@ InsertRequest::OnExecute() { return Status(SERVER_ILLEGAL_VECTOR_ID, msg); } - // step 6: update collection flag + // step 7: update collection flag user_provide_ids ? collection_schema.flag_ |= engine::meta::FLAG_MASK_HAS_USERID : collection_schema.flag_ |= engine::meta::FLAG_MASK_NO_USERID; status = DBWrapper::DB()->UpdateCollectionFlag(collection_name_, collection_schema.flag_); diff --git a/core/src/utils/ValidationUtil.cpp b/core/src/utils/ValidationUtil.cpp index 3fab7904f3..755f6f069d 100644 --- a/core/src/utils/ValidationUtil.cpp +++ b/core/src/utils/ValidationUtil.cpp @@ -21,7 +21,9 @@ #include #ifdef MILVUS_GPU_VERSION + #include + #endif #include @@ -38,6 +40,8 @@ namespace { constexpr size_t COLLECTION_NAME_SIZE_LIMIT = 255; constexpr int64_t COLLECTION_DIMENSION_LIMIT = 32768; constexpr int32_t INDEX_FILE_SIZE_LIMIT = 4096; // index trigger size max = 4096 MB +constexpr int64_t M_BYTE = 1024 * 1024; +constexpr int64_t MAX_INSERT_DATA_SIZE = 256 * M_BYTE; Status CheckParameterRange(const milvus::json& json_params, const std::string& param_name, int64_t min, int64_t max, @@ -358,6 +362,25 @@ ValidationUtil::ValidateVectorData(const engine::VectorsData& vectors, return Status::OK(); } +Status +ValidationUtil::ValidateVectorDataSize(const engine::VectorsData& vectors, + const engine::meta::CollectionSchema& table_schema) { + std::string msg = + "The amount of data inserted each time cannot exceed " + std::to_string(MAX_INSERT_DATA_SIZE / M_BYTE) + " MB"; + if (engine::utils::IsBinaryMetricType(table_schema.metric_type_)) { + if (vectors.binary_data_.size() > MAX_INSERT_DATA_SIZE) { + return Status(SERVER_INVALID_ROWRECORD_ARRAY, msg); + } + + } else { + if (vectors.float_data_.size() * sizeof(float) > MAX_INSERT_DATA_SIZE) { + return Status(SERVER_INVALID_ROWRECORD_ARRAY, msg); + } + } + + return Status::OK(); +} + Status ValidationUtil::ValidateCollectionIndexFileSize(int64_t index_file_size) { if (index_file_size <= 0 || index_file_size > INDEX_FILE_SIZE_LIMIT) { diff --git a/core/src/utils/ValidationUtil.h b/core/src/utils/ValidationUtil.h index 4f682cb9fc..0c30dd1457 100644 --- a/core/src/utils/ValidationUtil.h +++ b/core/src/utils/ValidationUtil.h @@ -49,6 +49,9 @@ class ValidationUtil { static Status ValidateVectorData(const engine::VectorsData& vectors, const engine::meta::CollectionSchema& table_schema); + static Status + ValidateVectorDataSize(const engine::VectorsData& vectors, const engine::meta::CollectionSchema& table_schema); + static Status ValidateCollectionIndexFileSize(int64_t index_file_size); diff --git a/core/unittest/server/test_rpc.cpp b/core/unittest/server/test_rpc.cpp index b362b2eaab..726dc403ce 100644 --- a/core/unittest/server/test_rpc.cpp +++ b/core/unittest/server/test_rpc.cpp @@ -346,11 +346,6 @@ TEST_F(RpcHandlerTest, INSERT_TEST) { ASSERT_NE(vector_ids.vector_id_array_size(), VECTOR_COUNT); fiu_disable("InsertRequest.OnExecute.throw_std_exception"); - fiu_enable("InsertRequest.OnExecute.invalid_dim", 1, NULL, 0); - handler->Insert(&context, &request, &vector_ids); - ASSERT_NE(vector_ids.vector_id_array_size(), VECTOR_COUNT); - fiu_disable("InsertRequest.OnExecute.invalid_dim"); - fiu_enable("InsertRequest.OnExecute.insert_fail", 1, NULL, 0); handler->Insert(&context, &request, &vector_ids); fiu_disable("InsertRequest.OnExecute.insert_fail"); diff --git a/core/unittest/server/test_util.cpp b/core/unittest/server/test_util.cpp index 2d81c83105..2602056e74 100644 --- a/core/unittest/server/test_util.cpp +++ b/core/unittest/server/test_util.cpp @@ -411,7 +411,7 @@ TEST(ValidationUtilTest, VALIDATE_DIMENSION_TEST) { TEST(ValidationUtilTest, VALIDATE_INDEX_TEST) { ASSERT_EQ(milvus::server::ValidationUtil::ValidateCollectionIndexType( - (int)milvus::engine::EngineType::INVALID).code(), milvus::SERVER_INVALID_INDEX_TYPE); + (int)milvus::engine::EngineType::INVALID).code(), milvus::SERVER_INVALID_INDEX_TYPE); for (int i = 1; i <= (int)milvus::engine::EngineType::MAX_VALUE; i++) { #ifndef MILVUS_GPU_VERSION if (i == (int)milvus::engine::EngineType::FAISS_IVFSQ8H) { @@ -424,7 +424,7 @@ TEST(ValidationUtilTest, VALIDATE_INDEX_TEST) { ASSERT_EQ( milvus::server::ValidationUtil::ValidateCollectionIndexType( - (int)milvus::engine::EngineType::MAX_VALUE + 1).code(), milvus::SERVER_INVALID_INDEX_TYPE); + (int)milvus::engine::EngineType::MAX_VALUE + 1).code(), milvus::SERVER_INVALID_INDEX_TYPE); ASSERT_EQ(milvus::server::ValidationUtil::ValidateCollectionIndexFileSize(0).code(), milvus::SERVER_INVALID_INDEX_FILE_SIZE); @@ -437,33 +437,33 @@ TEST(ValidationUtilTest, VALIDATE_INDEX_TEST) { } TEST(ValidationUtilTest, VALIDATE_INDEX_PARAMS_TEST) { - milvus::engine::meta::CollectionSchema table_schema; - table_schema.dimension_ = 64; + milvus::engine::meta::CollectionSchema collection_schema; + collection_schema.dimension_ = 64; milvus::json json_params = {}; auto status = milvus::server::ValidationUtil::ValidateIndexParams(json_params, - table_schema, + collection_schema, (int32_t)milvus::engine::EngineType::FAISS_IDMAP); ASSERT_TRUE(status.ok()); status = milvus::server::ValidationUtil::ValidateIndexParams(json_params, - table_schema, + collection_schema, (int32_t)milvus::engine::EngineType::FAISS_IVFFLAT); ASSERT_FALSE(status.ok()); json_params = {{"nlist", "\t"}}; status = milvus::server::ValidationUtil::ValidateIndexParams(json_params, - table_schema, + collection_schema, (int32_t)milvus::engine::EngineType::FAISS_IVFSQ8H); ASSERT_FALSE(status.ok()); json_params = {{"nlist", -1}}; status = milvus::server::ValidationUtil::ValidateIndexParams(json_params, - table_schema, + collection_schema, (int32_t)milvus::engine::EngineType::FAISS_IVFSQ8); ASSERT_FALSE(status.ok()); @@ -471,84 +471,84 @@ TEST(ValidationUtilTest, VALIDATE_INDEX_PARAMS_TEST) { status = milvus::server::ValidationUtil::ValidateIndexParams(json_params, - table_schema, + collection_schema, (int32_t)milvus::engine::EngineType::FAISS_IVFFLAT); ASSERT_TRUE(status.ok()); json_params = {{"nlist", -1}}; status = milvus::server::ValidationUtil::ValidateIndexParams(json_params, - table_schema, + collection_schema, (int32_t)milvus::engine::EngineType::FAISS_PQ); ASSERT_FALSE(status.ok()); json_params = {{"nlist", 32}}; status = milvus::server::ValidationUtil::ValidateIndexParams(json_params, - table_schema, + collection_schema, (int32_t)milvus::engine::EngineType::FAISS_PQ); ASSERT_FALSE(status.ok()); json_params = {{"nlist", 32}, {"m", 4}}; status = milvus::server::ValidationUtil::ValidateIndexParams(json_params, - table_schema, + collection_schema, (int32_t)milvus::engine::EngineType::FAISS_PQ); ASSERT_TRUE(status.ok()); json_params = {{"search_length", -1}}; status = milvus::server::ValidationUtil::ValidateIndexParams(json_params, - table_schema, + collection_schema, (int32_t)milvus::engine::EngineType::NSG_MIX); ASSERT_FALSE(status.ok()); json_params = {{"search_length", 50}}; status = milvus::server::ValidationUtil::ValidateIndexParams(json_params, - table_schema, + collection_schema, (int32_t)milvus::engine::EngineType::NSG_MIX); ASSERT_FALSE(status.ok()); json_params = {{"search_length", 50}, {"out_degree", -1}}; status = milvus::server::ValidationUtil::ValidateIndexParams(json_params, - table_schema, + collection_schema, (int32_t)milvus::engine::EngineType::NSG_MIX); ASSERT_FALSE(status.ok()); json_params = {{"search_length", 50}, {"out_degree", 50}}; status = milvus::server::ValidationUtil::ValidateIndexParams(json_params, - table_schema, + collection_schema, (int32_t)milvus::engine::EngineType::NSG_MIX); ASSERT_FALSE(status.ok()); json_params = {{"search_length", 50}, {"out_degree", 50}, {"candidate_pool_size", -1}}; status = milvus::server::ValidationUtil::ValidateIndexParams(json_params, - table_schema, + collection_schema, (int32_t)milvus::engine::EngineType::NSG_MIX); ASSERT_FALSE(status.ok()); json_params = {{"search_length", 50}, {"out_degree", 50}, {"candidate_pool_size", 100}}; status = milvus::server::ValidationUtil::ValidateIndexParams(json_params, - table_schema, + collection_schema, (int32_t)milvus::engine::EngineType::NSG_MIX); ASSERT_FALSE(status.ok()); json_params = {{"search_length", 50}, {"out_degree", 50}, {"candidate_pool_size", 100}, {"knng", -1}}; status = milvus::server::ValidationUtil::ValidateIndexParams(json_params, - table_schema, + collection_schema, (int32_t)milvus::engine::EngineType::NSG_MIX); ASSERT_FALSE(status.ok()); json_params = {{"search_length", 50}, {"out_degree", 50}, {"candidate_pool_size", 100}, {"knng", 100}}; status = milvus::server::ValidationUtil::ValidateIndexParams(json_params, - table_schema, + collection_schema, (int32_t)milvus::engine::EngineType::NSG_MIX); ASSERT_TRUE(status.ok()); @@ -556,72 +556,107 @@ TEST(ValidationUtilTest, VALIDATE_INDEX_PARAMS_TEST) { json_params = {{"nlist", 32}, {"m", 4}}; status = milvus::server::ValidationUtil::ValidateIndexParams(json_params, - table_schema, + collection_schema, (int32_t)milvus::engine::EngineType::FAISS_PQ); ASSERT_TRUE(status.ok()); json_params = {{"nlist", 32}, {"m", 3}}; status = milvus::server::ValidationUtil::ValidateIndexParams(json_params, - table_schema, + collection_schema, (int32_t)milvus::engine::EngineType::FAISS_PQ); ASSERT_FALSE(status.ok()); - table_schema.dimension_ = 99; + collection_schema.dimension_ = 99; json_params = {{"nlist", 32}, {"m", 4}}; status = milvus::server::ValidationUtil::ValidateIndexParams(json_params, - table_schema, + collection_schema, (int32_t)milvus::engine::EngineType::FAISS_PQ); ASSERT_FALSE(status.ok()); } TEST(ValidationUtilTest, VALIDATE_SEARCH_PARAMS_TEST) { int64_t topk = 10; - milvus::engine::meta::CollectionSchema table_schema; - table_schema.dimension_ = 64; + milvus::engine::meta::CollectionSchema collection_schema; + collection_schema.dimension_ = 64; milvus::json json_params = {}; - table_schema.engine_type_ = (int32_t)milvus::engine::EngineType::FAISS_IDMAP; - auto status = milvus::server::ValidationUtil::ValidateSearchParams(json_params, table_schema, topk); + collection_schema.engine_type_ = (int32_t)milvus::engine::EngineType::FAISS_IDMAP; + auto status = milvus::server::ValidationUtil::ValidateSearchParams(json_params, collection_schema, topk); ASSERT_TRUE(status.ok()); - table_schema.engine_type_ = (int32_t)milvus::engine::EngineType::FAISS_IVFFLAT; - status = milvus::server::ValidationUtil::ValidateSearchParams(json_params, table_schema, topk); + collection_schema.engine_type_ = (int32_t)milvus::engine::EngineType::FAISS_IVFFLAT; + status = milvus::server::ValidationUtil::ValidateSearchParams(json_params, collection_schema, topk); ASSERT_FALSE(status.ok()); json_params = {{"nprobe", "\t"}}; - status = milvus::server::ValidationUtil::ValidateSearchParams(json_params, table_schema, topk); + status = milvus::server::ValidationUtil::ValidateSearchParams(json_params, collection_schema, topk); ASSERT_FALSE(status.ok()); - table_schema.engine_type_ = (int32_t)milvus::engine::EngineType::FAISS_BIN_IDMAP; + collection_schema.engine_type_ = (int32_t)milvus::engine::EngineType::FAISS_BIN_IDMAP; json_params = {{"nprobe", 32}}; - status = milvus::server::ValidationUtil::ValidateSearchParams(json_params, table_schema, topk); + status = milvus::server::ValidationUtil::ValidateSearchParams(json_params, collection_schema, topk); ASSERT_TRUE(status.ok()); - table_schema.engine_type_ = (int32_t)milvus::engine::EngineType::NSG_MIX; + collection_schema.engine_type_ = (int32_t)milvus::engine::EngineType::NSG_MIX; json_params = {}; - status = milvus::server::ValidationUtil::ValidateSearchParams(json_params, table_schema, topk); + status = milvus::server::ValidationUtil::ValidateSearchParams(json_params, collection_schema, topk); ASSERT_FALSE(status.ok()); json_params = {{"search_length", 100}}; - status = milvus::server::ValidationUtil::ValidateSearchParams(json_params, table_schema, topk); + status = milvus::server::ValidationUtil::ValidateSearchParams(json_params, collection_schema, topk); ASSERT_TRUE(status.ok()); - table_schema.engine_type_ = (int32_t)milvus::engine::EngineType::HNSW; + collection_schema.engine_type_ = (int32_t)milvus::engine::EngineType::HNSW; json_params = {}; - status = milvus::server::ValidationUtil::ValidateSearchParams(json_params, table_schema, topk); + status = milvus::server::ValidationUtil::ValidateSearchParams(json_params, collection_schema, topk); ASSERT_FALSE(status.ok()); json_params = {{"ef", 5}}; - status = milvus::server::ValidationUtil::ValidateSearchParams(json_params, table_schema, topk); + status = milvus::server::ValidationUtil::ValidateSearchParams(json_params, collection_schema, topk); ASSERT_FALSE(status.ok()); json_params = {{"ef", 100}}; - status = milvus::server::ValidationUtil::ValidateSearchParams(json_params, table_schema, topk); + status = milvus::server::ValidationUtil::ValidateSearchParams(json_params, collection_schema, topk); ASSERT_TRUE(status.ok()); } +TEST(ValidationUtilTest, VALIDATE_VECTOR_DATA_TEST) { + milvus::engine::meta::CollectionSchema collection_schema; + collection_schema.dimension_ = 64; + collection_schema.metric_type_ = (int32_t)milvus::engine::MetricType::L2; + + milvus::engine::VectorsData vectors; + vectors.vector_count_ = 10; + vectors.float_data_.resize(32); + + auto status = milvus::server::ValidationUtil::ValidateVectorData(vectors, collection_schema); + ASSERT_FALSE(status.ok()); + + vectors.float_data_.resize(vectors.vector_count_ * collection_schema.dimension_); + status = milvus::server::ValidationUtil::ValidateVectorData(vectors, collection_schema); + ASSERT_TRUE(status.ok()); + + vectors.float_data_.resize(150 * 1024 * 1024); // 600MB + status = milvus::server::ValidationUtil::ValidateVectorDataSize(vectors, collection_schema); + ASSERT_FALSE(status.ok()); + + collection_schema.metric_type_ = (int32_t)milvus::engine::MetricType::HAMMING; + vectors.float_data_.clear(); + vectors.binary_data_.resize(50); + status = milvus::server::ValidationUtil::ValidateVectorData(vectors, collection_schema); + ASSERT_FALSE(status.ok()); + + vectors.binary_data_.resize(vectors.vector_count_ * collection_schema.dimension_ / 8); + status = milvus::server::ValidationUtil::ValidateVectorData(vectors, collection_schema); + ASSERT_TRUE(status.ok()); + + vectors.binary_data_.resize(600 * 1024 * 1024); // 600MB + status = milvus::server::ValidationUtil::ValidateVectorDataSize(vectors, collection_schema); + ASSERT_FALSE(status.ok()); +} + TEST(ValidationUtilTest, VALIDATE_TOPK_TEST) { ASSERT_EQ(milvus::server::ValidationUtil::ValidateSearchTopk(10).code(), milvus::SERVER_SUCCESS); ASSERT_NE(milvus::server::ValidationUtil::ValidateSearchTopk(65536).code(), milvus::SERVER_SUCCESS);