milvus/core/unittest/test_concurrent_vector.cpp
FluorineDog b80de55ac8 Add Testcase for indexing
Signed-off-by: FluorineDog <guilin.gou@zilliz.com>
2020-09-15 10:00:00 +08:00

130 lines
4.1 KiB
C++

// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include <gtest/gtest.h>
#include <iostream>
#include <random>
#include <string>
#include <thread>
#include <vector>
#include "dog_segment/ConcurrentVector.h"
#include "dog_segment/SegmentBase.h"
// #include "knowhere/index/vector_index/helpers/IndexParameter.h"
#include "dog_segment/SegmentBase.h"
#include "dog_segment/AckResponder.h"
using std::cin;
using std::cout;
using std::endl;
using namespace milvus::engine;
using namespace milvus::dog_segment;
using std::vector;
TEST(ConcurrentVector, TestABI) {
ASSERT_EQ(TestABI(), 42);
assert(true);
}
TEST(ConcurrentVector, TestSingle) {
auto dim = 8;
ConcurrentVector<int, false, 32> c_vec(dim);
std::default_random_engine e(42);
int data = 0;
auto total_count = 0;
for (int i = 0; i < 10000; ++i) {
int insert_size = e() % 150;
vector<int> vec(insert_size * dim);
for (auto& x : vec) {
x = data++;
}
c_vec.grow_to_at_least(total_count + insert_size);
c_vec.set_data(total_count, vec.data(), insert_size);
total_count += insert_size;
}
ASSERT_EQ(c_vec.chunk_size(), (total_count + 31) / 32);
for (int i = 0; i < total_count; ++i) {
for (int d = 0; d < dim; ++d) {
auto std_data = d + i * dim;
ASSERT_EQ(c_vec.get_element(i)[d], std_data);
}
}
}
TEST(ConcurrentVector, TestMultithreads) {
auto dim = 8;
constexpr int threads = 16;
std::vector<int64_t> total_counts(threads);
ConcurrentVector<int64_t, false, 32> c_vec(dim);
std::atomic<int64_t> ack_counter = 0;
// std::mutex mutex;
auto executor = [&](int thread_id) {
std::default_random_engine e(42 + thread_id);
int64_t data = 0;
int64_t total_count = 0;
for (int i = 0; i < 10000; ++i) {
// std::lock_guard lck(mutex);
int insert_size = e() % 150;
vector<int64_t> vec(insert_size * dim);
for (auto& x : vec) {
x = data++ * threads + thread_id;
}
auto offset = ack_counter.fetch_add(insert_size);
c_vec.grow_to_at_least(offset + insert_size);
c_vec.set_data(offset, vec.data(), insert_size);
total_count += insert_size;
}
assert(data == total_count * dim);
total_counts[thread_id] = total_count;
};
std::vector<std::thread> pool;
for (int i = 0; i < threads; ++i) {
pool.emplace_back(executor, i);
}
for (auto& thread : pool) {
thread.join();
}
std::vector<int64_t> counts(threads);
auto N = ack_counter.load();
for (int64_t i = 0; i < N; ++i) {
for (int d = 0; d < dim; ++d) {
auto data = c_vec.get_element(i)[d];
auto thread_id = data % threads;
auto raw_data = data / threads;
auto std_data = counts[thread_id]++;
ASSERT_EQ(raw_data, std_data) << data;
}
}
}
TEST(ConcurrentVector, TestAckSingle) {
std::vector<std::tuple<int64_t, int64_t, int64_t>> raw_data;
std::default_random_engine e(42);
AckResponder ack;
int N = 10000;
for(int i = 0; i < 10000; ++i) {
auto weight = i + e() % 100;
raw_data.emplace_back(weight, i, (i + 1));
}
std::sort(raw_data.begin(), raw_data.end());
for(auto [_, b, e]: raw_data) {
EXPECT_LE(ack.GetAck(), b);
ack.AddSegment(b, e);
auto seg = ack.GetAck();
EXPECT_GE(seg + 100, b);
}
EXPECT_EQ(ack.GetAck(), N);
}