Former-commit-id: eafa332562298fe5666632c017c7584c45272848
This commit is contained in:
xiaojun.lin 2019-10-14 19:27:46 +08:00
commit 5f39d252d1
13 changed files with 183 additions and 101 deletions

View File

@ -1,7 +1,7 @@
container('milvus-build-env') {
timeout(time: 5, unit: 'MINUTES') {
dir ("milvus_engine") {
dir ("cpp") {
dir ("core") {
gitlabCommitStatus(name: 'Packaged Engine') {
if (fileExists('milvus')) {
try {

View File

@ -1,7 +1,7 @@
container('milvus-build-env') {
timeout(time: 5, unit: 'MINUTES') {
dir ("milvus_engine") {
dir ("cpp") {
dir ("core") {
gitlabCommitStatus(name: 'Packaged Engine') {
if (fileExists('milvus')) {
try {

View File

@ -9,14 +9,17 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-572 - Milvus crash when get SIGINT
- MS-577 - Unittest Query randomly hung
- MS-587 - Count get wrong result after adding vectors and index built immediately
- MS-599 - search wrong result when table created with metric_type: IP
- MS-599 - Search wrong result when table created with metric_type: IP
- MS-601 - Docker logs error caused by get CPUTemperature error
- MS-622 - Delete vectors should be failed if date range is invalid
- MS-620 - Get table row counts display wrong error code
- MS-637 - out of memory when load too many tasks
- MS-637 - Out of memory when load too many tasks
- MS-640 - Cache object size calculate incorrect
- MS-641 - Segment fault(signal 11) in PickToLoad
- MS-639 - SQ8H index created failed and server hang
- MS-647 - [monitor] grafana display average cpu-temp
- MS-644 - Search crashed with index-type: flat
- MS-624 - Search vectors failed if time ranges long enough
- MS-652 - IVFSQH quantization double free
## Improvement

View File

@ -370,9 +370,9 @@ if(DEFINED ENV{MILVUS_GRPC_URL})
set(GRPC_SOURCE_URL "$ENV{MILVUS_GRPC_URL}")
else()
set(GRPC_SOURCE_URL
"http://git.zilliz.tech/kun.yu/grpc/-/archive/master/grpc-master.tar.gz")
"https://github.com/youny626/grpc-milvus/archive/${GRPC_VERSION}.zip")
endif()
set(GRPC_MD5 "7ec59ad54c85a12dcbbfede09bf413a9")
set(GRPC_MD5 "fdd2656424c0e0e046b21354513fc70f")
# ----------------------------------------------------------------------

View File

@ -81,7 +81,7 @@ class Meta {
UpdateTableFiles(TableFilesSchema& files) = 0;
virtual Status
FilesToSearch(const std::string& table_id, const std::vector<size_t>& ids, const DatesT& partition,
FilesToSearch(const std::string& table_id, const std::vector<size_t>& ids, const DatesT& dates,
DatePartionedTableFilesSchema& files) = 0;
virtual Status

View File

@ -1088,7 +1088,7 @@ MySQLMetaImpl::FilesToIndex(TableFilesSchema& files) {
}
Status
MySQLMetaImpl::FilesToSearch(const std::string& table_id, const std::vector<size_t>& ids, const DatesT& partition,
MySQLMetaImpl::FilesToSearch(const std::string& table_id, const std::vector<size_t>& ids, const DatesT& dates,
DatePartionedTableFilesSchema& files) {
files.clear();
@ -1108,9 +1108,9 @@ MySQLMetaImpl::FilesToSearch(const std::string& table_id, const std::vector<size
<< META_TABLEFILES << " "
<< "WHERE table_id = " << mysqlpp::quote << table_id;
if (!partition.empty()) {
if (!dates.empty()) {
std::stringstream partitionListSS;
for (auto& date : partition) {
for (auto& date : dates) {
partitionListSS << std::to_string(date) << ", ";
}
std::string partitionListStr = partitionListSS.str();

View File

@ -89,7 +89,7 @@ class MySQLMetaImpl : public Meta {
UpdateTableFiles(TableFilesSchema& files) override;
Status
FilesToSearch(const std::string& table_id, const std::vector<size_t>& ids, const DatesT& partition,
FilesToSearch(const std::string& table_id, const std::vector<size_t>& ids, const DatesT& dates,
DatePartionedTableFilesSchema& files) override;
Status

View File

@ -159,7 +159,7 @@ SqliteMetaImpl::Initialize() {
Status
SqliteMetaImpl::DropPartitionsByDates(const std::string &table_id,
const DatesT &dates) {
if (dates.size() == 0) {
if (dates.empty()) {
return Status::OK();
}
@ -171,16 +171,35 @@ SqliteMetaImpl::DropPartitionsByDates(const std::string &table_id,
}
try {
//sqlite_orm has a bug, 'in' statement cannot handle too many elements
//so we split one query into multi-queries, this is a work-around!!
std::vector<DatesT> split_dates;
split_dates.push_back(DatesT());
const size_t batch_size = 30;
for(DateT date : dates) {
DatesT& last_batch = *split_dates.rbegin();
last_batch.push_back(date);
if(last_batch.size() > batch_size) {
split_dates.push_back(DatesT());
}
}
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
ConnectorPtr->update_all(
set(
c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE,
c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
where(
c(&TableFileSchema::table_id_) == table_id and
in(&TableFileSchema::date_, dates)));
for(auto& batch_dates : split_dates) {
if(batch_dates.empty()) {
continue;
}
ConnectorPtr->update_all(
set(
c(&TableFileSchema::file_type_) = (int)TableFileSchema::TO_DELETE,
c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
where(
c(&TableFileSchema::table_id_) == table_id and
in(&TableFileSchema::date_, batch_dates)));
}
ENGINE_LOG_DEBUG << "Successfully drop partitions, table id = " << table_schema.table_id_;
} catch (std::exception &e) {
@ -673,7 +692,7 @@ SqliteMetaImpl::FilesToIndex(TableFilesSchema &files) {
Status
SqliteMetaImpl::FilesToSearch(const std::string &table_id,
const std::vector<size_t> &ids,
const DatesT &partition,
const DatesT &dates,
DatePartionedTableFilesSchema &files) {
files.clear();
server::MetricCollector metric;
@ -702,23 +721,54 @@ SqliteMetaImpl::FilesToSearch(const std::string &table_id,
auto status = DescribeTable(table_schema);
if (!status.ok()) { return status; }
//sqlite_orm has a bug, 'in' statement cannot handle too many elements
//so we split one query into multi-queries, this is a work-around!!
std::vector<DatesT> split_dates;
split_dates.push_back(DatesT());
const size_t batch_size = 30;
for(DateT date : dates) {
DatesT& last_batch = *split_dates.rbegin();
last_batch.push_back(date);
if(last_batch.size() > batch_size) {
split_dates.push_back(DatesT());
}
}
//perform query
decltype(ConnectorPtr->select(select_columns)) selected;
if (partition.empty() && ids.empty()) {
if (dates.empty() && ids.empty()) {
auto filter = where(match_tableid and match_type);
selected = ConnectorPtr->select(select_columns, filter);
} else if (partition.empty() && !ids.empty()) {
} else if (dates.empty() && !ids.empty()) {
auto match_fileid = in(&TableFileSchema::id_, ids);
auto filter = where(match_tableid and match_fileid and match_type);
selected = ConnectorPtr->select(select_columns, filter);
} else if (!partition.empty() && ids.empty()) {
auto match_date = in(&TableFileSchema::date_, partition);
auto filter = where(match_tableid and match_date and match_type);
selected = ConnectorPtr->select(select_columns, filter);
} else if (!partition.empty() && !ids.empty()) {
auto match_fileid = in(&TableFileSchema::id_, ids);
auto match_date = in(&TableFileSchema::date_, partition);
auto filter = where(match_tableid and match_fileid and match_date and match_type);
selected = ConnectorPtr->select(select_columns, filter);
} else if (!dates.empty() && ids.empty()) {
for(auto& batch_dates : split_dates) {
if(batch_dates.empty()) {
continue;
}
auto match_date = in(&TableFileSchema::date_, batch_dates);
auto filter = where(match_tableid and match_date and match_type);
auto batch_selected = ConnectorPtr->select(select_columns, filter);
for (auto &file : batch_selected) {
selected.push_back(file);
}
}
} else if (!dates.empty() && !ids.empty()) {
for(auto& batch_dates : split_dates) {
if(batch_dates.empty()) {
continue;
}
auto match_fileid = in(&TableFileSchema::id_, ids);
auto match_date = in(&TableFileSchema::date_, batch_dates);
auto filter = where(match_tableid and match_fileid and match_date and match_type);
auto batch_selected = ConnectorPtr->select(select_columns, filter);
for (auto &file : batch_selected) {
selected.push_back(file);
}
}
}
Status ret;

View File

@ -89,7 +89,7 @@ class SqliteMetaImpl : public Meta {
UpdateTableFiles(TableFilesSchema& files) override;
Status
FilesToSearch(const std::string& table_id, const std::vector<size_t>& ids, const DatesT& partition,
FilesToSearch(const std::string& table_id, const std::vector<size_t>& ids, const DatesT& dates,
DatePartionedTableFilesSchema& files) override;
Status

View File

@ -222,10 +222,19 @@ PrometheusMetrics::CPUTemperature() {
std::vector<float> CPU_temperatures = server::SystemInfo::GetInstance().CPUTemperature();
float avg_cpu_temp = 0;
for (int i = 0; i < CPU_temperatures.size(); ++i) {
prometheus::Gauge& cpu_temp = CPU_temperature_.Add({{"CPU", std::to_string(i)}});
cpu_temp.Set(CPU_temperatures[i]);
avg_cpu_temp += CPU_temperatures[i];
}
avg_cpu_temp /= CPU_temperatures.size();
prometheus::Gauge& cpu_temp = CPU_temperature_.Add({{"CPU", std::to_string(0)}});
cpu_temp.Set(avg_cpu_temp);
// for (int i = 0; i < CPU_temperatures.size(); ++i) {
// prometheus::Gauge& cpu_temp = CPU_temperature_.Add({{"CPU", std::to_string(i)}});
// cpu_temp.Set(CPU_temperatures[i]);
// }
}
void

View File

@ -110,16 +110,18 @@ Action::DefaultLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr resource,
bool moved = false;
// to support test task, REFACTOR
if (auto index_engine = search_task->index_engine_) {
auto location = index_engine->GetLocation();
if (resource->type() == ResourceType::CPU) {
if (auto index_engine = search_task->index_engine_) {
auto location = index_engine->GetLocation();
for (auto i = 0; i < res_mgr.lock()->GetNumGpuResource(); ++i) {
auto index = milvus::cache::GpuCacheMgr::GetInstance(i)->GetIndex(location);
if (index != nullptr) {
moved = true;
auto dest_resource = res_mgr.lock()->GetResource(ResourceType::GPU, i);
PushTaskToResource(event->task_table_item_->task, dest_resource);
break;
for (auto i = 0; i < res_mgr.lock()->GetNumGpuResource(); ++i) {
auto index = milvus::cache::GpuCacheMgr::GetInstance(i)->GetIndex(location);
if (index != nullptr) {
moved = true;
auto dest_resource = res_mgr.lock()->GetResource(ResourceType::GPU, i);
PushTaskToResource(event->task_table_item_->task, dest_resource);
break;
}
}
}
}

View File

@ -315,10 +315,10 @@ XSearchTask::MergeTopkArray(std::vector<int64_t>& tar_ids, std::vector<float>& t
return;
}
std::vector<int64_t> id_buf(nq * topk, -1);
std::vector<float> dist_buf(nq * topk, 0.0);
uint64_t output_k = std::min(topk, tar_input_k + src_input_k);
std::vector<int64_t> id_buf(nq * output_k, -1);
std::vector<float> dist_buf(nq * output_k, 0.0);
uint64_t buf_k, src_k, tar_k;
uint64_t src_idx, tar_idx, buf_idx;
uint64_t src_input_k_multi_i, tar_input_k_multi_i, buf_k_multi_i;
@ -349,6 +349,7 @@ XSearchTask::MergeTopkArray(std::vector<int64_t>& tar_ids, std::vector<float>& t
if (src_k < src_input_k) {
while (buf_k < output_k && src_k < src_input_k) {
src_idx = src_input_k_multi_i + src_k;
buf_idx = buf_k_multi_i + buf_k;
id_buf[buf_idx] = src_ids[src_idx];
dist_buf[buf_idx] = src_distance[src_idx];
src_k++;
@ -356,6 +357,8 @@ XSearchTask::MergeTopkArray(std::vector<int64_t>& tar_ids, std::vector<float>& t
}
} else {
while (buf_k < output_k && tar_k < tar_input_k) {
tar_idx = tar_input_k_multi_i + tar_k;
buf_idx = buf_k_multi_i + buf_k;
id_buf[buf_idx] = tar_ids[tar_idx];
dist_buf[buf_idx] = tar_distance[tar_idx];
tar_k++;

View File

@ -110,87 +110,102 @@ CheckTopkResult(const std::vector<int64_t>& input_ids_1,
} // namespace
TEST(DBSearchTest, TOPK_TEST) {
uint64_t NQ = 15;
uint64_t TOP_K = 64;
bool ascending;
void MergeTopkToResultSetTest(uint64_t topk_1, uint64_t topk_2, uint64_t nq, uint64_t topk, bool ascending) {
std::vector<int64_t> ids1, ids2;
std::vector<float> dist1, dist2;
ms::ResultSet result;
BuildResult(ids1, dist1, topk_1, nq, ascending);
BuildResult(ids2, dist2, topk_2, nq, ascending);
ms::XSearchTask::MergeTopkToResultSet(ids1, dist1, topk_1, nq, topk, ascending, result);
ms::XSearchTask::MergeTopkToResultSet(ids2, dist2, topk_2, nq, topk, ascending, result);
CheckTopkResult(ids1, dist1, ids2, dist2, topk, nq, ascending, result);
}
TEST(DBSearchTest, MERGE_RESULT_SET_TEST) {
uint64_t NQ = 15;
uint64_t TOP_K = 64;
/* test1, id1/dist1 valid, id2/dist2 empty */
ascending = true;
BuildResult(ids1, dist1, TOP_K, NQ, ascending);
ms::XSearchTask::MergeTopkToResultSet(ids1, dist1, TOP_K, NQ, TOP_K, ascending, result);
CheckTopkResult(ids1, dist1, ids2, dist2, TOP_K, NQ, ascending, result);
MergeTopkToResultSetTest(TOP_K, 0, NQ, TOP_K, true);
MergeTopkToResultSetTest(TOP_K, 0, NQ, TOP_K, false);
/* test2, id1/dist1 valid, id2/dist2 valid */
BuildResult(ids2, dist2, TOP_K, NQ, ascending);
ms::XSearchTask::MergeTopkToResultSet(ids2, dist2, TOP_K, NQ, TOP_K, ascending, result);
CheckTopkResult(ids1, dist1, ids2, dist2, TOP_K, NQ, ascending, result);
MergeTopkToResultSetTest(TOP_K, TOP_K, NQ, TOP_K, true);
MergeTopkToResultSetTest(TOP_K, TOP_K, NQ, TOP_K, false);
/* test3, id1/dist1 small topk */
ids1.clear();
dist1.clear();
result.clear();
BuildResult(ids1, dist1, TOP_K/2, NQ, ascending);
ms::XSearchTask::MergeTopkToResultSet(ids1, dist1, TOP_K/2, NQ, TOP_K, ascending, result);
ms::XSearchTask::MergeTopkToResultSet(ids2, dist2, TOP_K, NQ, TOP_K, ascending, result);
CheckTopkResult(ids1, dist1, ids2, dist2, TOP_K, NQ, ascending, result);
MergeTopkToResultSetTest(TOP_K/2, TOP_K, NQ, TOP_K, true);
MergeTopkToResultSetTest(TOP_K/2, TOP_K, NQ, TOP_K, false);
/* test4, id1/dist1 small topk, id2/dist2 small topk */
ids2.clear();
dist2.clear();
result.clear();
BuildResult(ids2, dist2, TOP_K/3, NQ, ascending);
ms::XSearchTask::MergeTopkToResultSet(ids1, dist1, TOP_K/2, NQ, TOP_K, ascending, result);
ms::XSearchTask::MergeTopkToResultSet(ids2, dist2, TOP_K/3, NQ, TOP_K, ascending, result);
CheckTopkResult(ids1, dist1, ids2, dist2, TOP_K, NQ, ascending, result);
MergeTopkToResultSetTest(TOP_K/2, TOP_K/3, NQ, TOP_K, true);
MergeTopkToResultSetTest(TOP_K/2, TOP_K/3, NQ, TOP_K, false);
}
/////////////////////////////////////////////////////////////////////////////////////////
ascending = false;
ids1.clear();
dist1.clear();
ids2.clear();
dist2.clear();
result.clear();
void MergeTopkArrayTest(uint64_t topk_1, uint64_t topk_2, uint64_t nq, uint64_t topk, bool ascending) {
std::vector<int64_t> ids1, ids2;
std::vector<float> dist1, dist2;
ms::ResultSet result;
BuildResult(ids1, dist1, topk_1, nq, ascending);
BuildResult(ids2, dist2, topk_2, nq, ascending);
uint64_t result_topk = std::min(topk, topk_1 + topk_2);
ms::XSearchTask::MergeTopkArray(ids1, dist1, topk_1, ids2, dist2, topk_2, nq, topk, ascending);
if (ids1.size() != result_topk * nq) {
std::cout << ids1.size() << " " << result_topk * nq << std::endl;
}
ASSERT_TRUE(ids1.size() == result_topk * nq);
ASSERT_TRUE(dist1.size() == result_topk * nq);
for (uint64_t i = 0; i < nq; i++) {
for (uint64_t k = 1; k < result_topk; k++) {
if (ascending) {
if (dist1[i * result_topk + k] < dist1[i * result_topk + k - 1]) {
std::cout << dist1[i * result_topk + k - 1] << " " << dist1[i * result_topk + k] << std::endl;
}
ASSERT_TRUE(dist1[i * result_topk + k] >= dist1[i * result_topk + k - 1]);
} else {
if (dist1[i * result_topk + k] > dist1[i * result_topk + k - 1]) {
std::cout << dist1[i * result_topk + k - 1] << " " << dist1[i * result_topk + k] << std::endl;
}
ASSERT_TRUE(dist1[i * result_topk + k] <= dist1[i * result_topk + k - 1]);
}
}
}
}
TEST(DBSearchTest, MERGE_ARRAY_TEST) {
uint64_t NQ = 15;
uint64_t TOP_K = 64;
/* test1, id1/dist1 valid, id2/dist2 empty */
BuildResult(ids1, dist1, TOP_K, NQ, ascending);
ms::XSearchTask::MergeTopkToResultSet(ids1, dist1, TOP_K, NQ, TOP_K, ascending, result);
CheckTopkResult(ids1, dist1, ids2, dist2, TOP_K, NQ, ascending, result);
MergeTopkArrayTest(TOP_K, 0, NQ, TOP_K, true);
MergeTopkArrayTest(TOP_K, 0, NQ, TOP_K, false);
MergeTopkArrayTest(0, TOP_K, NQ, TOP_K, true);
MergeTopkArrayTest(0, TOP_K, NQ, TOP_K, false);
/* test2, id1/dist1 valid, id2/dist2 valid */
BuildResult(ids2, dist2, TOP_K, NQ, ascending);
ms::XSearchTask::MergeTopkToResultSet(ids2, dist2, TOP_K, NQ, TOP_K, ascending, result);
CheckTopkResult(ids1, dist1, ids2, dist2, TOP_K, NQ, ascending, result);
MergeTopkArrayTest(TOP_K, TOP_K, NQ, TOP_K, true);
MergeTopkArrayTest(TOP_K, TOP_K, NQ, TOP_K, false);
/* test3, id1/dist1 small topk */
ids1.clear();
dist1.clear();
result.clear();
BuildResult(ids1, dist1, TOP_K/2, NQ, ascending);
ms::XSearchTask::MergeTopkToResultSet(ids1, dist1, TOP_K/2, NQ, TOP_K, ascending, result);
ms::XSearchTask::MergeTopkToResultSet(ids2, dist2, TOP_K, NQ, TOP_K, ascending, result);
CheckTopkResult(ids1, dist1, ids2, dist2, TOP_K, NQ, ascending, result);
MergeTopkArrayTest(TOP_K/2, TOP_K, NQ, TOP_K, true);
MergeTopkArrayTest(TOP_K/2, TOP_K, NQ, TOP_K, false);
MergeTopkArrayTest(TOP_K, TOP_K/2, NQ, TOP_K, true);
MergeTopkArrayTest(TOP_K, TOP_K/2, NQ, TOP_K, false);
/* test4, id1/dist1 small topk, id2/dist2 small topk */
ids2.clear();
dist2.clear();
result.clear();
BuildResult(ids2, dist2, TOP_K/3, NQ, ascending);
ms::XSearchTask::MergeTopkToResultSet(ids1, dist1, TOP_K/2, NQ, TOP_K, ascending, result);
ms::XSearchTask::MergeTopkToResultSet(ids2, dist2, TOP_K/3, NQ, TOP_K, ascending, result);
CheckTopkResult(ids1, dist1, ids2, dist2, TOP_K, NQ, ascending, result);
MergeTopkArrayTest(TOP_K/2, TOP_K/3, NQ, TOP_K, true);
MergeTopkArrayTest(TOP_K/2, TOP_K/3, NQ, TOP_K, false);
MergeTopkArrayTest(TOP_K/3, TOP_K/2, NQ, TOP_K, true);
MergeTopkArrayTest(TOP_K/3, TOP_K/2, NQ, TOP_K, false);
}
TEST(DBSearchTest, REDUCE_PERF_TEST) {
int32_t index_file_num = 478; /* sift1B dataset, index files num */
bool ascending = true;
std::vector<int32_t> thread_vec = {4, 8, 11};
std::vector<int32_t> nq_vec = {1, 10, 100, 1000};
std::vector<int32_t> topk_vec = {1, 4, 16, 64, 256, 1024};
std::vector<int32_t> thread_vec = {4, 8};
std::vector<int32_t> nq_vec = {1, 10, 100};
std::vector<int32_t> topk_vec = {1, 4, 16, 64};
int32_t NQ = nq_vec[nq_vec.size()-1];
int32_t TOPK = topk_vec[topk_vec.size()-1];