#59 Topk result is incorrect for small dataset

Former-commit-id: f8c4a38365881252e66f280647aedf1e4c6395e5
This commit is contained in:
starlord 2019-10-21 19:24:18 +08:00
parent 18108119b2
commit 19dca4c969
3 changed files with 261 additions and 243 deletions

View File

@ -307,71 +307,71 @@ XSearchTask::MergeTopkToResultSet(const std::vector<int64_t>& input_ids, const s
}
}
void
XSearchTask::MergeTopkArray(std::vector<int64_t>& tar_ids, std::vector<float>& tar_distance, uint64_t& tar_input_k,
const std::vector<int64_t>& src_ids, const std::vector<float>& src_distance,
uint64_t src_input_k, uint64_t nq, uint64_t topk, bool ascending) {
if (src_ids.empty() || src_distance.empty()) {
return;
}
uint64_t output_k = std::min(topk, tar_input_k + src_input_k);
std::vector<int64_t> id_buf(nq * output_k, -1);
std::vector<float> dist_buf(nq * output_k, 0.0);
uint64_t buf_k, src_k, tar_k;
uint64_t src_idx, tar_idx, buf_idx;
uint64_t src_input_k_multi_i, tar_input_k_multi_i, buf_k_multi_i;
for (uint64_t i = 0; i < nq; i++) {
src_input_k_multi_i = src_input_k * i;
tar_input_k_multi_i = tar_input_k * i;
buf_k_multi_i = output_k * i;
buf_k = src_k = tar_k = 0;
while (buf_k < output_k && src_k < src_input_k && tar_k < tar_input_k) {
src_idx = src_input_k_multi_i + src_k;
tar_idx = tar_input_k_multi_i + tar_k;
buf_idx = buf_k_multi_i + buf_k;
if ((ascending && src_distance[src_idx] < tar_distance[tar_idx]) ||
(!ascending && src_distance[src_idx] > tar_distance[tar_idx])) {
id_buf[buf_idx] = src_ids[src_idx];
dist_buf[buf_idx] = src_distance[src_idx];
src_k++;
} else {
id_buf[buf_idx] = tar_ids[tar_idx];
dist_buf[buf_idx] = tar_distance[tar_idx];
tar_k++;
}
buf_k++;
}
if (buf_k < output_k) {
if (src_k < src_input_k) {
while (buf_k < output_k && src_k < src_input_k) {
src_idx = src_input_k_multi_i + src_k;
buf_idx = buf_k_multi_i + buf_k;
id_buf[buf_idx] = src_ids[src_idx];
dist_buf[buf_idx] = src_distance[src_idx];
src_k++;
buf_k++;
}
} else {
while (buf_k < output_k && tar_k < tar_input_k) {
tar_idx = tar_input_k_multi_i + tar_k;
buf_idx = buf_k_multi_i + buf_k;
id_buf[buf_idx] = tar_ids[tar_idx];
dist_buf[buf_idx] = tar_distance[tar_idx];
tar_k++;
buf_k++;
}
}
}
}
tar_ids.swap(id_buf);
tar_distance.swap(dist_buf);
tar_input_k = output_k;
}
//void
//XSearchTask::MergeTopkArray(std::vector<int64_t>& tar_ids, std::vector<float>& tar_distance, uint64_t& tar_input_k,
// const std::vector<int64_t>& src_ids, const std::vector<float>& src_distance,
// uint64_t src_input_k, uint64_t nq, uint64_t topk, bool ascending) {
// if (src_ids.empty() || src_distance.empty()) {
// return;
// }
//
// uint64_t output_k = std::min(topk, tar_input_k + src_input_k);
// std::vector<int64_t> id_buf(nq * output_k, -1);
// std::vector<float> dist_buf(nq * output_k, 0.0);
//
// uint64_t buf_k, src_k, tar_k;
// uint64_t src_idx, tar_idx, buf_idx;
// uint64_t src_input_k_multi_i, tar_input_k_multi_i, buf_k_multi_i;
//
// for (uint64_t i = 0; i < nq; i++) {
// src_input_k_multi_i = src_input_k * i;
// tar_input_k_multi_i = tar_input_k * i;
// buf_k_multi_i = output_k * i;
// buf_k = src_k = tar_k = 0;
// while (buf_k < output_k && src_k < src_input_k && tar_k < tar_input_k) {
// src_idx = src_input_k_multi_i + src_k;
// tar_idx = tar_input_k_multi_i + tar_k;
// buf_idx = buf_k_multi_i + buf_k;
// if ((ascending && src_distance[src_idx] < tar_distance[tar_idx]) ||
// (!ascending && src_distance[src_idx] > tar_distance[tar_idx])) {
// id_buf[buf_idx] = src_ids[src_idx];
// dist_buf[buf_idx] = src_distance[src_idx];
// src_k++;
// } else {
// id_buf[buf_idx] = tar_ids[tar_idx];
// dist_buf[buf_idx] = tar_distance[tar_idx];
// tar_k++;
// }
// buf_k++;
// }
//
// if (buf_k < output_k) {
// if (src_k < src_input_k) {
// while (buf_k < output_k && src_k < src_input_k) {
// src_idx = src_input_k_multi_i + src_k;
// buf_idx = buf_k_multi_i + buf_k;
// id_buf[buf_idx] = src_ids[src_idx];
// dist_buf[buf_idx] = src_distance[src_idx];
// src_k++;
// buf_k++;
// }
// } else {
// while (buf_k < output_k && tar_k < tar_input_k) {
// tar_idx = tar_input_k_multi_i + tar_k;
// buf_idx = buf_k_multi_i + buf_k;
// id_buf[buf_idx] = tar_ids[tar_idx];
// dist_buf[buf_idx] = tar_distance[tar_idx];
// tar_k++;
// buf_k++;
// }
// }
// }
// }
//
// tar_ids.swap(id_buf);
// tar_distance.swap(dist_buf);
// tar_input_k = output_k;
//}
} // namespace scheduler
} // namespace milvus

View File

@ -42,10 +42,10 @@ class XSearchTask : public Task {
MergeTopkToResultSet(const std::vector<int64_t>& input_ids, const std::vector<float>& input_distance,
uint64_t input_k, uint64_t nq, uint64_t topk, bool ascending, scheduler::ResultSet& result);
static void
MergeTopkArray(std::vector<int64_t>& tar_ids, std::vector<float>& tar_distance, uint64_t& tar_input_k,
const std::vector<int64_t>& src_ids, const std::vector<float>& src_distance, uint64_t src_input_k,
uint64_t nq, uint64_t topk, bool ascending);
// static void
// MergeTopkArray(std::vector<int64_t>& tar_ids, std::vector<float>& tar_distance, uint64_t& tar_input_k,
// const std::vector<int64_t>& src_ids, const std::vector<float>& src_distance, uint64_t src_input_k,
// uint64_t nq, uint64_t topk, bool ascending);
public:
TableFileSchemaPtr file_;

View File

@ -30,6 +30,7 @@ namespace ms = milvus::scheduler;
void
BuildResult(std::vector<int64_t>& output_ids,
std::vector<float>& output_distance,
uint64_t input_k,
uint64_t topk,
uint64_t nq,
bool ascending) {
@ -39,9 +40,15 @@ BuildResult(std::vector<int64_t>& output_ids,
output_distance.resize(nq * topk);
for (uint64_t i = 0; i < nq; i++) {
for (uint64_t j = 0; j < topk; j++) {
//insert valid items
for (uint64_t j = 0; j < input_k; j++) {
output_ids[i * topk + j] = (int64_t)(drand48() * 100000);
output_distance[i * topk + j] = ascending ? (j + drand48()) : ((topk - j) + drand48());
output_distance[i * topk + j] = ascending ? (j + drand48()) : ((input_k - j) + drand48());
}
//insert invalid items
for(uint64_t j = input_k; j < topk; j++) {
output_ids[i * topk + j] = -1;
output_distance[i * topk + j] = -1.0;
}
}
}
@ -83,23 +90,32 @@ CheckTopkResult(const std::vector<int64_t>& input_ids_1,
ASSERT_EQ(input_ids_1.size(), input_distance_1.size());
ASSERT_EQ(input_ids_2.size(), input_distance_2.size());
uint64_t input_k1 = input_ids_1.size() / nq;
uint64_t input_k2 = input_ids_2.size() / nq;
for (int64_t i = 0; i < nq; i++) {
std::vector<float>
src_vec(input_distance_1.begin() + i * input_k1, input_distance_1.begin() + (i + 1) * input_k1);
src_vec(input_distance_1.begin() + i * topk, input_distance_1.begin() + (i + 1) * topk);
src_vec.insert(src_vec.end(),
input_distance_2.begin() + i * input_k2,
input_distance_2.begin() + (i + 1) * input_k2);
input_distance_2.begin() + i * topk,
input_distance_2.begin() + (i + 1) * topk);
if (ascending) {
std::sort(src_vec.begin(), src_vec.end());
} else {
std::sort(src_vec.begin(), src_vec.end(), std::greater<float>());
}
uint64_t n = std::min(topk, input_k1 + input_k2);
//erase invalid items
std::vector<float>::iterator iter;
for (iter = src_vec.begin(); iter != src_vec.end();) {
if (*iter < 0.0)
iter = src_vec.erase(iter);
else
++iter;
}
uint64_t n = std::min(topk, result[i].size());
for (uint64_t j = 0; j < n; j++) {
if(result[i][j].first < 0) {
continue;
}
if (src_vec[j] != result[i][j].second) {
std::cout << src_vec[j] << " " << result[i][j].second << std::endl;
}
@ -114,8 +130,8 @@ void MergeTopkToResultSetTest(uint64_t topk_1, uint64_t topk_2, uint64_t nq, uin
std::vector<int64_t> ids1, ids2;
std::vector<float> dist1, dist2;
ms::ResultSet result;
BuildResult(ids1, dist1, topk_1, nq, ascending);
BuildResult(ids2, dist2, topk_2, nq, ascending);
BuildResult(ids1, dist1, topk_1, topk, nq, ascending);
BuildResult(ids2, dist2, topk_2, topk, nq, ascending);
ms::XSearchTask::MergeTopkToResultSet(ids1, dist1, topk_1, nq, topk, ascending, result);
ms::XSearchTask::MergeTopkToResultSet(ids2, dist2, topk_2, nq, topk, ascending, result);
CheckTopkResult(ids1, dist1, ids2, dist2, topk, nq, ascending, result);
@ -142,62 +158,64 @@ TEST(DBSearchTest, MERGE_RESULT_SET_TEST) {
MergeTopkToResultSetTest(TOP_K/2, TOP_K/3, NQ, TOP_K, false);
}
void MergeTopkArrayTest(uint64_t topk_1, uint64_t topk_2, uint64_t nq, uint64_t topk, bool ascending) {
std::vector<int64_t> ids1, ids2;
std::vector<float> dist1, dist2;
ms::ResultSet result;
BuildResult(ids1, dist1, topk_1, nq, ascending);
BuildResult(ids2, dist2, topk_2, nq, ascending);
uint64_t result_topk = std::min(topk, topk_1 + topk_2);
ms::XSearchTask::MergeTopkArray(ids1, dist1, topk_1, ids2, dist2, topk_2, nq, topk, ascending);
if (ids1.size() != result_topk * nq) {
std::cout << ids1.size() << " " << result_topk * nq << std::endl;
}
ASSERT_TRUE(ids1.size() == result_topk * nq);
ASSERT_TRUE(dist1.size() == result_topk * nq);
for (uint64_t i = 0; i < nq; i++) {
for (uint64_t k = 1; k < result_topk; k++) {
if (ascending) {
if (dist1[i * result_topk + k] < dist1[i * result_topk + k - 1]) {
std::cout << dist1[i * result_topk + k - 1] << " " << dist1[i * result_topk + k] << std::endl;
}
ASSERT_TRUE(dist1[i * result_topk + k] >= dist1[i * result_topk + k - 1]);
} else {
if (dist1[i * result_topk + k] > dist1[i * result_topk + k - 1]) {
std::cout << dist1[i * result_topk + k - 1] << " " << dist1[i * result_topk + k] << std::endl;
}
ASSERT_TRUE(dist1[i * result_topk + k] <= dist1[i * result_topk + k - 1]);
}
}
}
}
//void MergeTopkArrayTest(uint64_t topk_1, uint64_t topk_2, uint64_t nq, uint64_t topk, bool ascending) {
// std::vector<int64_t> ids1, ids2;
// std::vector<float> dist1, dist2;
// ms::ResultSet result;
// BuildResult(ids1, dist1, topk_1, topk, nq, ascending);
// BuildResult(ids2, dist2, topk_2, topk, nq, ascending);
// uint64_t result_topk = std::min(topk, topk_1 + topk_2);
// ms::XSearchTask::MergeTopkArray(ids1, dist1, topk_1, ids2, dist2, topk_2, nq, topk, ascending);
// if (ids1.size() != result_topk * nq) {
// std::cout << ids1.size() << " " << result_topk * nq << std::endl;
// }
// ASSERT_TRUE(ids1.size() == result_topk * nq);
// ASSERT_TRUE(dist1.size() == result_topk * nq);
// for (uint64_t i = 0; i < nq; i++) {
// for (uint64_t k = 1; k < result_topk; k++) {
// float f0 = dist1[i * topk + k - 1];
// float f1 = dist1[i * topk + k];
// if (ascending) {
// if (f1 < f0) {
// std::cout << f0 << " " << f1 << std::endl;
// }
// ASSERT_TRUE(f1 >= f0);
// } else {
// if (f1 > f0) {
// std::cout << f0 << " " << f1 << std::endl;
// }
// ASSERT_TRUE(f1 <= f0);
// }
// }
// }
//}
TEST(DBSearchTest, MERGE_ARRAY_TEST) {
uint64_t NQ = 15;
uint64_t TOP_K = 64;
//TEST(DBSearchTest, MERGE_ARRAY_TEST) {
// uint64_t NQ = 15;
// uint64_t TOP_K = 64;
//
// /* test1, id1/dist1 valid, id2/dist2 empty */
// MergeTopkArrayTest(TOP_K, 0, NQ, TOP_K, true);
// MergeTopkArrayTest(TOP_K, 0, NQ, TOP_K, false);
// MergeTopkArrayTest(0, TOP_K, NQ, TOP_K, true);
// MergeTopkArrayTest(0, TOP_K, NQ, TOP_K, false);
/* test1, id1/dist1 valid, id2/dist2 empty */
MergeTopkArrayTest(TOP_K, 0, NQ, TOP_K, true);
MergeTopkArrayTest(TOP_K, 0, NQ, TOP_K, false);
MergeTopkArrayTest(0, TOP_K, NQ, TOP_K, true);
MergeTopkArrayTest(0, TOP_K, NQ, TOP_K, false);
/* test2, id1/dist1 valid, id2/dist2 valid */
MergeTopkArrayTest(TOP_K, TOP_K, NQ, TOP_K, true);
MergeTopkArrayTest(TOP_K, TOP_K, NQ, TOP_K, false);
/* test3, id1/dist1 small topk */
MergeTopkArrayTest(TOP_K/2, TOP_K, NQ, TOP_K, true);
MergeTopkArrayTest(TOP_K/2, TOP_K, NQ, TOP_K, false);
MergeTopkArrayTest(TOP_K, TOP_K/2, NQ, TOP_K, true);
MergeTopkArrayTest(TOP_K, TOP_K/2, NQ, TOP_K, false);
/* test4, id1/dist1 small topk, id2/dist2 small topk */
MergeTopkArrayTest(TOP_K/2, TOP_K/3, NQ, TOP_K, true);
MergeTopkArrayTest(TOP_K/2, TOP_K/3, NQ, TOP_K, false);
MergeTopkArrayTest(TOP_K/3, TOP_K/2, NQ, TOP_K, true);
MergeTopkArrayTest(TOP_K/3, TOP_K/2, NQ, TOP_K, false);
}
// /* test2, id1/dist1 valid, id2/dist2 valid */
// MergeTopkArrayTest(TOP_K, TOP_K, NQ, TOP_K, true);
// MergeTopkArrayTest(TOP_K, TOP_K, NQ, TOP_K, false);
//
// /* test3, id1/dist1 small topk */
// MergeTopkArrayTest(TOP_K/2, TOP_K, NQ, TOP_K, true);
// MergeTopkArrayTest(TOP_K/2, TOP_K, NQ, TOP_K, false);
// MergeTopkArrayTest(TOP_K, TOP_K/2, NQ, TOP_K, true);
// MergeTopkArrayTest(TOP_K, TOP_K/2, NQ, TOP_K, false);
//
// /* test4, id1/dist1 small topk, id2/dist2 small topk */
// MergeTopkArrayTest(TOP_K/2, TOP_K/3, NQ, TOP_K, true);
// MergeTopkArrayTest(TOP_K/2, TOP_K/3, NQ, TOP_K, false);
// MergeTopkArrayTest(TOP_K/3, TOP_K/2, NQ, TOP_K, true);
// MergeTopkArrayTest(TOP_K/3, TOP_K/2, NQ, TOP_K, false);
//}
TEST(DBSearchTest, REDUCE_PERF_TEST) {
int32_t index_file_num = 478; /* sift1B dataset, index files num */
@ -217,7 +235,7 @@ TEST(DBSearchTest, REDUCE_PERF_TEST) {
/* generate testing data */
for (i = 0; i < index_file_num; i++) {
BuildResult(input_ids, input_distance, TOPK, NQ, ascending);
BuildResult(input_ids, input_distance, TOPK, TOPK, NQ, ascending);
id_vec.push_back(input_ids);
dist_vec.push_back(input_distance);
}
@ -255,114 +273,114 @@ TEST(DBSearchTest, REDUCE_PERF_TEST) {
rc1.RecordSection("reduce done");
///////////////////////////////////////////////////////////////////////////////////////
/* method-2 */
std::vector<std::vector<int64_t>> id_vec_2(index_file_num);
std::vector<std::vector<float>> dist_vec_2(index_file_num);
std::vector<uint64_t> k_vec_2(index_file_num);
for (i = 0; i < index_file_num; i++) {
CopyResult(id_vec_2[i], dist_vec_2[i], top_k, id_vec[i], dist_vec[i], TOPK, nq);
k_vec_2[i] = top_k;
}
std::string str2 = "Method-2 " + std::to_string(max_thread_num) + " " +
std::to_string(nq) + " " + std::to_string(top_k);
milvus::TimeRecorder rc2(str2);
for (step = 1; step < index_file_num; step *= 2) {
for (i = 0; i + step < index_file_num; i += step * 2) {
ms::XSearchTask::MergeTopkArray(id_vec_2[i], dist_vec_2[i], k_vec_2[i],
id_vec_2[i + step], dist_vec_2[i + step], k_vec_2[i + step],
nq, top_k, ascending);
}
}
ms::XSearchTask::MergeTopkToResultSet(id_vec_2[0],
dist_vec_2[0],
k_vec_2[0],
nq,
top_k,
ascending,
final_result_2);
ASSERT_EQ(final_result_2.size(), nq);
rc2.RecordSection("reduce done");
for (i = 0; i < nq; i++) {
ASSERT_EQ(final_result[i].size(), final_result_2[i].size());
for (k = 0; k < final_result[i].size(); k++) {
if (final_result[i][k].first != final_result_2[i][k].first) {
std::cout << i << " " << k << std::endl;
}
ASSERT_EQ(final_result[i][k].first, final_result_2[i][k].first);
ASSERT_EQ(final_result[i][k].second, final_result_2[i][k].second);
}
}
///////////////////////////////////////////////////////////////////////////////////////
/* method-3 parallel */
std::vector<std::vector<int64_t>> id_vec_3(index_file_num);
std::vector<std::vector<float>> dist_vec_3(index_file_num);
std::vector<uint64_t> k_vec_3(index_file_num);
for (i = 0; i < index_file_num; i++) {
CopyResult(id_vec_3[i], dist_vec_3[i], top_k, id_vec[i], dist_vec[i], TOPK, nq);
k_vec_3[i] = top_k;
}
std::string str3 = "Method-3 " + std::to_string(max_thread_num) + " " +
std::to_string(nq) + " " + std::to_string(top_k);
milvus::TimeRecorder rc3(str3);
for (step = 1; step < index_file_num; step *= 2) {
for (i = 0; i + step < index_file_num; i += step * 2) {
threads_list.push_back(
threadPool.enqueue(ms::XSearchTask::MergeTopkArray,
std::ref(id_vec_3[i]),
std::ref(dist_vec_3[i]),
std::ref(k_vec_3[i]),
std::ref(id_vec_3[i + step]),
std::ref(dist_vec_3[i + step]),
std::ref(k_vec_3[i + step]),
nq,
top_k,
ascending));
}
while (threads_list.size() > 0) {
int nready = 0;
for (auto it = threads_list.begin(); it != threads_list.end(); it = it) {
auto &p = *it;
std::chrono::milliseconds span(0);
if (p.wait_for(span) == std::future_status::ready) {
threads_list.erase(it++);
++nready;
} else {
++it;
}
}
if (nready == 0) {
std::this_thread::yield();
}
}
}
ms::XSearchTask::MergeTopkToResultSet(id_vec_3[0],
dist_vec_3[0],
k_vec_3[0],
nq,
top_k,
ascending,
final_result_3);
ASSERT_EQ(final_result_3.size(), nq);
rc3.RecordSection("reduce done");
for (i = 0; i < nq; i++) {
ASSERT_EQ(final_result[i].size(), final_result_3[i].size());
for (k = 0; k < final_result[i].size(); k++) {
ASSERT_EQ(final_result[i][k].first, final_result_3[i][k].first);
ASSERT_EQ(final_result[i][k].second, final_result_3[i][k].second);
}
}
// ///////////////////////////////////////////////////////////////////////////////////////
// /* method-2 */
// std::vector<std::vector<int64_t>> id_vec_2(index_file_num);
// std::vector<std::vector<float>> dist_vec_2(index_file_num);
// std::vector<uint64_t> k_vec_2(index_file_num);
// for (i = 0; i < index_file_num; i++) {
// CopyResult(id_vec_2[i], dist_vec_2[i], top_k, id_vec[i], dist_vec[i], TOPK, nq);
// k_vec_2[i] = top_k;
// }
//
// std::string str2 = "Method-2 " + std::to_string(max_thread_num) + " " +
// std::to_string(nq) + " " + std::to_string(top_k);
// milvus::TimeRecorder rc2(str2);
//
// for (step = 1; step < index_file_num; step *= 2) {
// for (i = 0; i + step < index_file_num; i += step * 2) {
// ms::XSearchTask::MergeTopkArray(id_vec_2[i], dist_vec_2[i], k_vec_2[i],
// id_vec_2[i + step], dist_vec_2[i + step], k_vec_2[i + step],
// nq, top_k, ascending);
// }
// }
// ms::XSearchTask::MergeTopkToResultSet(id_vec_2[0],
// dist_vec_2[0],
// k_vec_2[0],
// nq,
// top_k,
// ascending,
// final_result_2);
// ASSERT_EQ(final_result_2.size(), nq);
//
// rc2.RecordSection("reduce done");
//
// for (i = 0; i < nq; i++) {
// ASSERT_EQ(final_result[i].size(), final_result_2[i].size());
// for (k = 0; k < final_result[i].size(); k++) {
// if (final_result[i][k].first != final_result_2[i][k].first) {
// std::cout << i << " " << k << std::endl;
// }
// ASSERT_EQ(final_result[i][k].first, final_result_2[i][k].first);
// ASSERT_EQ(final_result[i][k].second, final_result_2[i][k].second);
// }
// }
//
// ///////////////////////////////////////////////////////////////////////////////////////
// /* method-3 parallel */
// std::vector<std::vector<int64_t>> id_vec_3(index_file_num);
// std::vector<std::vector<float>> dist_vec_3(index_file_num);
// std::vector<uint64_t> k_vec_3(index_file_num);
// for (i = 0; i < index_file_num; i++) {
// CopyResult(id_vec_3[i], dist_vec_3[i], top_k, id_vec[i], dist_vec[i], TOPK, nq);
// k_vec_3[i] = top_k;
// }
//
// std::string str3 = "Method-3 " + std::to_string(max_thread_num) + " " +
// std::to_string(nq) + " " + std::to_string(top_k);
// milvus::TimeRecorder rc3(str3);
//
// for (step = 1; step < index_file_num; step *= 2) {
// for (i = 0; i + step < index_file_num; i += step * 2) {
// threads_list.push_back(
// threadPool.enqueue(ms::XSearchTask::MergeTopkArray,
// std::ref(id_vec_3[i]),
// std::ref(dist_vec_3[i]),
// std::ref(k_vec_3[i]),
// std::ref(id_vec_3[i + step]),
// std::ref(dist_vec_3[i + step]),
// std::ref(k_vec_3[i + step]),
// nq,
// top_k,
// ascending));
// }
//
// while (threads_list.size() > 0) {
// int nready = 0;
// for (auto it = threads_list.begin(); it != threads_list.end(); it = it) {
// auto &p = *it;
// std::chrono::milliseconds span(0);
// if (p.wait_for(span) == std::future_status::ready) {
// threads_list.erase(it++);
// ++nready;
// } else {
// ++it;
// }
// }
//
// if (nready == 0) {
// std::this_thread::yield();
// }
// }
// }
// ms::XSearchTask::MergeTopkToResultSet(id_vec_3[0],
// dist_vec_3[0],
// k_vec_3[0],
// nq,
// top_k,
// ascending,
// final_result_3);
// ASSERT_EQ(final_result_3.size(), nq);
//
// rc3.RecordSection("reduce done");
//
// for (i = 0; i < nq; i++) {
// ASSERT_EQ(final_result[i].size(), final_result_3[i].size());
// for (k = 0; k < final_result[i].size(); k++) {
// ASSERT_EQ(final_result[i][k].first, final_result_3[i][k].first);
// ASSERT_EQ(final_result[i][k].second, final_result_3[i][k].second);
// }
// }
}
}
}