#1705 Limit the insert data batch size (#1936)

* #1705 Limit the insert data batch size

Signed-off-by: groot <yihua.mo@zilliz.com>

* typo

Signed-off-by: groot <yihua.mo@zilliz.com>

* fix unittest

Signed-off-by: groot <yihua.mo@zilliz.com>
This commit is contained in:
groot 2020-04-16 00:18:23 +08:00 committed by GitHub
parent 0323aa1aad
commit 93244a2a79
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 116 additions and 86 deletions

View File

@ -5,6 +5,7 @@ Please mark all change in change log and use the issue from GitHub
# Milvus 0.9.0 (TBD)
## Bug
- \#1705 Limit the insert data batch size
## Feature

View File

@ -118,6 +118,7 @@ InsertRequest::OnExecute() {
if ((collection_schema.flag_ & engine::meta::FLAG_MASK_NO_USERID) != 0 && user_provide_ids) {
std::string msg =
"Entities IDs are auto-generated. All vectors of this collection must use auto-generated IDs.";
LOG_SERVER_ERROR_ << LogOut("[%s][%ld] %s", "insert", 0, msg.c_str());
return Status(SERVER_ILLEGAL_VECTOR_ID, msg);
}
@ -128,48 +129,20 @@ InsertRequest::OnExecute() {
ProfilerStart(fname.c_str());
#endif
// step 4: some metric type doesn't support float vectors
if (!vectors_data_.float_data_.empty()) { // insert float vectors
if (engine::utils::IsBinaryMetricType(collection_schema.metric_type_)) {
std::string msg = "Collection metric type doesn't support float vectors.";
LOG_SERVER_ERROR_ << LogOut("[%s][%ld] %s", "insert", 0, msg.c_str());
return Status(SERVER_INVALID_ROWRECORD_ARRAY, msg);
status = ValidationUtil::ValidateVectorData(vectors_data_, collection_schema);
if (!status.ok()) {
LOG_SERVER_ERROR_ << LogOut("[%s][%d] Invalid vector data: %s", "insert", 0, status.message().c_str());
return status;
}
// check prepared float data
if (vectors_data_.float_data_.size() % vector_count != 0) {
std::string msg = "The vector dimension must be equal to the collection dimension.";
LOG_SERVER_ERROR_ << LogOut("[%s][%ld] %s", "insert", 0, msg.c_str());
return Status(SERVER_INVALID_ROWRECORD_ARRAY, msg);
// step 5: check insert data limitation
status = ValidationUtil::ValidateVectorDataSize(vectors_data_, collection_schema);
if (!status.ok()) {
LOG_SERVER_ERROR_ << LogOut("[%s][%d] Invalid vector data: %s", "insert", 0, status.message().c_str());
return status;
}
fiu_do_on("InsertRequest.OnExecute.invalid_dim", collection_schema.dimension_ = -1);
if (vectors_data_.float_data_.size() / vector_count != collection_schema.dimension_) {
std::string msg = "The vector dimension must be equal to the collection dimension.";
LOG_SERVER_ERROR_ << LogOut("[%s][%ld] %s", "insert", 0, msg.c_str());
return Status(SERVER_INVALID_VECTOR_DIMENSION, msg);
}
} else if (!vectors_data_.binary_data_.empty()) { // insert binary vectors
if (!engine::utils::IsBinaryMetricType(collection_schema.metric_type_)) {
std::string msg = "Collection metric type doesn't support binary vectors.";
LOG_SERVER_ERROR_ << LogOut("[%s][%ld] %s", "insert", 0, msg.c_str());
return Status(SERVER_INVALID_ROWRECORD_ARRAY, msg);
}
// check prepared binary data
if (vectors_data_.binary_data_.size() % vector_count != 0) {
std::string msg = "The vector dimension must be equal to the collection dimension.";
LOG_SERVER_ERROR_ << LogOut("[%s][%ld] %s", "insert", 0, msg.c_str());
return Status(SERVER_INVALID_ROWRECORD_ARRAY, msg);
}
if (vectors_data_.binary_data_.size() * 8 / vector_count != collection_schema.dimension_) {
std::string msg = "The vector dimension must be equal to the collection dimension.";
LOG_SERVER_ERROR_ << LogOut("[%s][%ld] %s", "insert", 0, msg.c_str());
return Status(SERVER_INVALID_VECTOR_DIMENSION, msg);
}
}
// step 5: insert vectors
// step 6: insert vectors
auto vec_count = static_cast<uint64_t>(vector_count);
rc.RecordSection("prepare vectors data");
@ -189,7 +162,7 @@ InsertRequest::OnExecute() {
return Status(SERVER_ILLEGAL_VECTOR_ID, msg);
}
// step 6: update collection flag
// step 7: update collection flag
user_provide_ids ? collection_schema.flag_ |= engine::meta::FLAG_MASK_HAS_USERID
: collection_schema.flag_ |= engine::meta::FLAG_MASK_NO_USERID;
status = DBWrapper::DB()->UpdateCollectionFlag(collection_name_, collection_schema.flag_);

View File

@ -21,7 +21,9 @@
#include <arpa/inet.h>
#ifdef MILVUS_GPU_VERSION
#include <cuda_runtime.h>
#endif
#include <fiu-local.h>
@ -38,6 +40,8 @@ namespace {
constexpr size_t COLLECTION_NAME_SIZE_LIMIT = 255;
constexpr int64_t COLLECTION_DIMENSION_LIMIT = 32768;
constexpr int32_t INDEX_FILE_SIZE_LIMIT = 4096; // index trigger size max = 4096 MB
constexpr int64_t M_BYTE = 1024 * 1024;
constexpr int64_t MAX_INSERT_DATA_SIZE = 256 * M_BYTE;
Status
CheckParameterRange(const milvus::json& json_params, const std::string& param_name, int64_t min, int64_t max,
@ -358,6 +362,25 @@ ValidationUtil::ValidateVectorData(const engine::VectorsData& vectors,
return Status::OK();
}
// Rejects an insert batch whose raw vector payload exceeds MAX_INSERT_DATA_SIZE.
//
// Which buffer is measured depends on the collection's metric type:
//   - binary metric types: the element count of vectors.binary_data_
//     (presumably one byte per element -- confirm against engine::VectorsData);
//   - all other metric types: vectors.float_data_.size() * sizeof(float).
// Only the buffer matching the metric type is inspected; the other one is ignored.
//
// @param vectors       batch about to be inserted.
// @param table_schema  schema of the target collection; only metric_type_ is read.
// @return Status::OK() when the payload is within the limit, otherwise a
//         SERVER_INVALID_ROWRECORD_ARRAY status whose message states the limit in MB.
Status
ValidationUtil::ValidateVectorDataSize(const engine::VectorsData& vectors,
                                       const engine::meta::CollectionSchema& table_schema) {
    // Compare in the unsigned domain so size() is never mixed with a signed limit.
    constexpr auto max_payload_bytes = static_cast<size_t>(MAX_INSERT_DATA_SIZE);

    const size_t payload_bytes = engine::utils::IsBinaryMetricType(table_schema.metric_type_)
                                     ? vectors.binary_data_.size()
                                     : vectors.float_data_.size() * sizeof(float);

    if (payload_bytes > max_payload_bytes) {
        // The message is built only on rejection, so accepted inserts pay no string allocation.
        return Status(SERVER_INVALID_ROWRECORD_ARRAY, "The amount of data inserted each time cannot exceed " +
                                                          std::to_string(MAX_INSERT_DATA_SIZE / M_BYTE) + " MB");
    }

    return Status::OK();
}
Status
ValidationUtil::ValidateCollectionIndexFileSize(int64_t index_file_size) {
if (index_file_size <= 0 || index_file_size > INDEX_FILE_SIZE_LIMIT) {

View File

@ -49,6 +49,9 @@ class ValidationUtil {
static Status
ValidateVectorData(const engine::VectorsData& vectors, const engine::meta::CollectionSchema& table_schema);
static Status
ValidateVectorDataSize(const engine::VectorsData& vectors, const engine::meta::CollectionSchema& table_schema);
static Status
ValidateCollectionIndexFileSize(int64_t index_file_size);

View File

@ -346,11 +346,6 @@ TEST_F(RpcHandlerTest, INSERT_TEST) {
ASSERT_NE(vector_ids.vector_id_array_size(), VECTOR_COUNT);
fiu_disable("InsertRequest.OnExecute.throw_std_exception");
fiu_enable("InsertRequest.OnExecute.invalid_dim", 1, NULL, 0);
handler->Insert(&context, &request, &vector_ids);
ASSERT_NE(vector_ids.vector_id_array_size(), VECTOR_COUNT);
fiu_disable("InsertRequest.OnExecute.invalid_dim");
fiu_enable("InsertRequest.OnExecute.insert_fail", 1, NULL, 0);
handler->Insert(&context, &request, &vector_ids);
fiu_disable("InsertRequest.OnExecute.insert_fail");

View File

@ -437,33 +437,33 @@ TEST(ValidationUtilTest, VALIDATE_INDEX_TEST) {
}
TEST(ValidationUtilTest, VALIDATE_INDEX_PARAMS_TEST) {
milvus::engine::meta::CollectionSchema table_schema;
table_schema.dimension_ = 64;
milvus::engine::meta::CollectionSchema collection_schema;
collection_schema.dimension_ = 64;
milvus::json json_params = {};
auto status =
milvus::server::ValidationUtil::ValidateIndexParams(json_params,
table_schema,
collection_schema,
(int32_t)milvus::engine::EngineType::FAISS_IDMAP);
ASSERT_TRUE(status.ok());
status =
milvus::server::ValidationUtil::ValidateIndexParams(json_params,
table_schema,
collection_schema,
(int32_t)milvus::engine::EngineType::FAISS_IVFFLAT);
ASSERT_FALSE(status.ok());
json_params = {{"nlist", "\t"}};
status =
milvus::server::ValidationUtil::ValidateIndexParams(json_params,
table_schema,
collection_schema,
(int32_t)milvus::engine::EngineType::FAISS_IVFSQ8H);
ASSERT_FALSE(status.ok());
json_params = {{"nlist", -1}};
status =
milvus::server::ValidationUtil::ValidateIndexParams(json_params,
table_schema,
collection_schema,
(int32_t)milvus::engine::EngineType::FAISS_IVFSQ8);
ASSERT_FALSE(status.ok());
@ -471,84 +471,84 @@ TEST(ValidationUtilTest, VALIDATE_INDEX_PARAMS_TEST) {
status =
milvus::server::ValidationUtil::ValidateIndexParams(json_params,
table_schema,
collection_schema,
(int32_t)milvus::engine::EngineType::FAISS_IVFFLAT);
ASSERT_TRUE(status.ok());
json_params = {{"nlist", -1}};
status =
milvus::server::ValidationUtil::ValidateIndexParams(json_params,
table_schema,
collection_schema,
(int32_t)milvus::engine::EngineType::FAISS_PQ);
ASSERT_FALSE(status.ok());
json_params = {{"nlist", 32}};
status =
milvus::server::ValidationUtil::ValidateIndexParams(json_params,
table_schema,
collection_schema,
(int32_t)milvus::engine::EngineType::FAISS_PQ);
ASSERT_FALSE(status.ok());
json_params = {{"nlist", 32}, {"m", 4}};
status =
milvus::server::ValidationUtil::ValidateIndexParams(json_params,
table_schema,
collection_schema,
(int32_t)milvus::engine::EngineType::FAISS_PQ);
ASSERT_TRUE(status.ok());
json_params = {{"search_length", -1}};
status =
milvus::server::ValidationUtil::ValidateIndexParams(json_params,
table_schema,
collection_schema,
(int32_t)milvus::engine::EngineType::NSG_MIX);
ASSERT_FALSE(status.ok());
json_params = {{"search_length", 50}};
status =
milvus::server::ValidationUtil::ValidateIndexParams(json_params,
table_schema,
collection_schema,
(int32_t)milvus::engine::EngineType::NSG_MIX);
ASSERT_FALSE(status.ok());
json_params = {{"search_length", 50}, {"out_degree", -1}};
status =
milvus::server::ValidationUtil::ValidateIndexParams(json_params,
table_schema,
collection_schema,
(int32_t)milvus::engine::EngineType::NSG_MIX);
ASSERT_FALSE(status.ok());
json_params = {{"search_length", 50}, {"out_degree", 50}};
status =
milvus::server::ValidationUtil::ValidateIndexParams(json_params,
table_schema,
collection_schema,
(int32_t)milvus::engine::EngineType::NSG_MIX);
ASSERT_FALSE(status.ok());
json_params = {{"search_length", 50}, {"out_degree", 50}, {"candidate_pool_size", -1}};
status =
milvus::server::ValidationUtil::ValidateIndexParams(json_params,
table_schema,
collection_schema,
(int32_t)milvus::engine::EngineType::NSG_MIX);
ASSERT_FALSE(status.ok());
json_params = {{"search_length", 50}, {"out_degree", 50}, {"candidate_pool_size", 100}};
status =
milvus::server::ValidationUtil::ValidateIndexParams(json_params,
table_schema,
collection_schema,
(int32_t)milvus::engine::EngineType::NSG_MIX);
ASSERT_FALSE(status.ok());
json_params = {{"search_length", 50}, {"out_degree", 50}, {"candidate_pool_size", 100}, {"knng", -1}};
status =
milvus::server::ValidationUtil::ValidateIndexParams(json_params,
table_schema,
collection_schema,
(int32_t)milvus::engine::EngineType::NSG_MIX);
ASSERT_FALSE(status.ok());
json_params = {{"search_length", 50}, {"out_degree", 50}, {"candidate_pool_size", 100}, {"knng", 100}};
status =
milvus::server::ValidationUtil::ValidateIndexParams(json_params,
table_schema,
collection_schema,
(int32_t)milvus::engine::EngineType::NSG_MIX);
ASSERT_TRUE(status.ok());
@ -556,72 +556,107 @@ TEST(ValidationUtilTest, VALIDATE_INDEX_PARAMS_TEST) {
json_params = {{"nlist", 32}, {"m", 4}};
status =
milvus::server::ValidationUtil::ValidateIndexParams(json_params,
table_schema,
collection_schema,
(int32_t)milvus::engine::EngineType::FAISS_PQ);
ASSERT_TRUE(status.ok());
json_params = {{"nlist", 32}, {"m", 3}};
status =
milvus::server::ValidationUtil::ValidateIndexParams(json_params,
table_schema,
collection_schema,
(int32_t)milvus::engine::EngineType::FAISS_PQ);
ASSERT_FALSE(status.ok());
table_schema.dimension_ = 99;
collection_schema.dimension_ = 99;
json_params = {{"nlist", 32}, {"m", 4}};
status =
milvus::server::ValidationUtil::ValidateIndexParams(json_params,
table_schema,
collection_schema,
(int32_t)milvus::engine::EngineType::FAISS_PQ);
ASSERT_FALSE(status.ok());
}
TEST(ValidationUtilTest, VALIDATE_SEARCH_PARAMS_TEST) {
int64_t topk = 10;
milvus::engine::meta::CollectionSchema table_schema;
table_schema.dimension_ = 64;
milvus::engine::meta::CollectionSchema collection_schema;
collection_schema.dimension_ = 64;
milvus::json json_params = {};
table_schema.engine_type_ = (int32_t)milvus::engine::EngineType::FAISS_IDMAP;
auto status = milvus::server::ValidationUtil::ValidateSearchParams(json_params, table_schema, topk);
collection_schema.engine_type_ = (int32_t)milvus::engine::EngineType::FAISS_IDMAP;
auto status = milvus::server::ValidationUtil::ValidateSearchParams(json_params, collection_schema, topk);
ASSERT_TRUE(status.ok());
table_schema.engine_type_ = (int32_t)milvus::engine::EngineType::FAISS_IVFFLAT;
status = milvus::server::ValidationUtil::ValidateSearchParams(json_params, table_schema, topk);
collection_schema.engine_type_ = (int32_t)milvus::engine::EngineType::FAISS_IVFFLAT;
status = milvus::server::ValidationUtil::ValidateSearchParams(json_params, collection_schema, topk);
ASSERT_FALSE(status.ok());
json_params = {{"nprobe", "\t"}};
status = milvus::server::ValidationUtil::ValidateSearchParams(json_params, table_schema, topk);
status = milvus::server::ValidationUtil::ValidateSearchParams(json_params, collection_schema, topk);
ASSERT_FALSE(status.ok());
table_schema.engine_type_ = (int32_t)milvus::engine::EngineType::FAISS_BIN_IDMAP;
collection_schema.engine_type_ = (int32_t)milvus::engine::EngineType::FAISS_BIN_IDMAP;
json_params = {{"nprobe", 32}};
status = milvus::server::ValidationUtil::ValidateSearchParams(json_params, table_schema, topk);
status = milvus::server::ValidationUtil::ValidateSearchParams(json_params, collection_schema, topk);
ASSERT_TRUE(status.ok());
table_schema.engine_type_ = (int32_t)milvus::engine::EngineType::NSG_MIX;
collection_schema.engine_type_ = (int32_t)milvus::engine::EngineType::NSG_MIX;
json_params = {};
status = milvus::server::ValidationUtil::ValidateSearchParams(json_params, table_schema, topk);
status = milvus::server::ValidationUtil::ValidateSearchParams(json_params, collection_schema, topk);
ASSERT_FALSE(status.ok());
json_params = {{"search_length", 100}};
status = milvus::server::ValidationUtil::ValidateSearchParams(json_params, table_schema, topk);
status = milvus::server::ValidationUtil::ValidateSearchParams(json_params, collection_schema, topk);
ASSERT_TRUE(status.ok());
table_schema.engine_type_ = (int32_t)milvus::engine::EngineType::HNSW;
collection_schema.engine_type_ = (int32_t)milvus::engine::EngineType::HNSW;
json_params = {};
status = milvus::server::ValidationUtil::ValidateSearchParams(json_params, table_schema, topk);
status = milvus::server::ValidationUtil::ValidateSearchParams(json_params, collection_schema, topk);
ASSERT_FALSE(status.ok());
json_params = {{"ef", 5}};
status = milvus::server::ValidationUtil::ValidateSearchParams(json_params, table_schema, topk);
status = milvus::server::ValidationUtil::ValidateSearchParams(json_params, collection_schema, topk);
ASSERT_FALSE(status.ok());
json_params = {{"ef", 100}};
status = milvus::server::ValidationUtil::ValidateSearchParams(json_params, table_schema, topk);
status = milvus::server::ValidationUtil::ValidateSearchParams(json_params, collection_schema, topk);
ASSERT_TRUE(status.ok());
}
// Exercises ValidateVectorData and ValidateVectorDataSize, first with a float
// payload under an L2 metric, then with a binary payload under a HAMMING metric.
TEST(ValidationUtilTest, VALIDATE_VECTOR_DATA_TEST) {
    using milvus::server::ValidationUtil;

    milvus::engine::meta::CollectionSchema schema;
    schema.dimension_ = 64;
    schema.metric_type_ = static_cast<int32_t>(milvus::engine::MetricType::L2);

    milvus::engine::VectorsData batch;
    batch.vector_count_ = 10;

    // Float payload whose length does not equal vector_count_ * dimension_.
    batch.float_data_.resize(32);
    ASSERT_FALSE(ValidationUtil::ValidateVectorData(batch, schema).ok());

    // Float payload sized exactly vector_count_ * dimension_.
    batch.float_data_.resize(batch.vector_count_ * schema.dimension_);
    ASSERT_TRUE(ValidationUtil::ValidateVectorData(batch, schema).ok());

    // Oversized float payload must fail the size check.
    batch.float_data_.resize(150 * 1024 * 1024);  // 600MB
    ASSERT_FALSE(ValidationUtil::ValidateVectorDataSize(batch, schema).ok());

    // Switch to a binary metric and drop the float payload.
    schema.metric_type_ = static_cast<int32_t>(milvus::engine::MetricType::HAMMING);
    batch.float_data_.clear();

    // Binary payload whose length does not equal vector_count_ * dimension_ / 8.
    batch.binary_data_.resize(50);
    ASSERT_FALSE(ValidationUtil::ValidateVectorData(batch, schema).ok());

    // Binary payload sized exactly vector_count_ * dimension_ / 8.
    batch.binary_data_.resize(batch.vector_count_ * schema.dimension_ / 8);
    ASSERT_TRUE(ValidationUtil::ValidateVectorData(batch, schema).ok());

    // Oversized binary payload must fail the size check.
    batch.binary_data_.resize(600 * 1024 * 1024);  // 600MB
    ASSERT_FALSE(ValidationUtil::ValidateVectorDataSize(batch, schema).ok());
}
TEST(ValidationUtilTest, VALIDATE_TOPK_TEST) {
ASSERT_EQ(milvus::server::ValidationUtil::ValidateSearchTopk(10).code(), milvus::SERVER_SUCCESS);
ASSERT_NE(milvus::server::ValidationUtil::ValidateSearchTopk(65536).code(), milvus::SERVER_SUCCESS);