mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-11-30 02:48:45 +08:00
enhance: support sparse vector mmap in growing segment type (#36566)
issue: https://github.com/milvus-io/milvus/issues/32984
related pr: https://github.com/milvus-io/milvus/pull/36565
Signed-off-by: cqy123456 <qianya.cheng@zilliz.com>
This commit is contained in:
parent
bb3ef5349f
commit
aa904be6ec
@ -78,7 +78,8 @@ constexpr bool IsVariableType =
|
||||
template <typename T>
|
||||
constexpr bool IsVariableTypeSupportInChunk =
|
||||
std::is_same_v<T, std::string> || std::is_same_v<T, Array> ||
|
||||
std::is_same_v<T, Json>;
|
||||
std::is_same_v<T, Json> ||
|
||||
std::is_same_v<T, knowhere::sparse::SparseRow<float>>;
|
||||
|
||||
template <typename T>
|
||||
using ChunkViewType = std::conditional_t<
|
||||
|
@ -40,14 +40,10 @@ struct FixedLengthChunk {
|
||||
size() {
|
||||
return size_;
|
||||
};
|
||||
Type
|
||||
get(const int i) const {
|
||||
return data_[i];
|
||||
}
|
||||
const Type&
|
||||
view(const int i) const {
|
||||
return data_[i];
|
||||
}
|
||||
};
|
||||
|
||||
private:
|
||||
int64_t size_ = 0;
|
||||
@ -73,19 +69,10 @@ struct VariableLengthChunk {
|
||||
throw std::runtime_error(
|
||||
"set should be a template specialization function");
|
||||
}
|
||||
inline Type
|
||||
get(const int i) const {
|
||||
throw std::runtime_error(
|
||||
"get should be a template specialization function");
|
||||
}
|
||||
const ChunkViewType<Type>&
|
||||
view(const int i) const {
|
||||
return data_[i];
|
||||
}
|
||||
const ChunkViewType<Type>&
|
||||
operator[](const int i) const {
|
||||
return view(i);
|
||||
}
|
||||
void*
|
||||
data() {
|
||||
return data_.data();
|
||||
@ -100,6 +87,8 @@ struct VariableLengthChunk {
|
||||
FixedVector<ChunkViewType<Type>> data_;
|
||||
storage::MmapChunkDescriptorPtr mmap_descriptor_ = nullptr;
|
||||
};
|
||||
|
||||
// Template specialization for string
|
||||
template <>
|
||||
inline void
|
||||
VariableLengthChunk<std::string>::set(const std::string* src,
|
||||
@ -128,12 +117,40 @@ VariableLengthChunk<std::string>::set(const std::string* src,
|
||||
offset += data_size;
|
||||
}
|
||||
}
|
||||
|
||||
// Template specialization for sparse vector
|
||||
template <>
|
||||
inline std::string
|
||||
VariableLengthChunk<std::string>::get(const int i) const {
|
||||
// copy to a string
|
||||
return std::string(data_[i]);
|
||||
inline void
|
||||
VariableLengthChunk<knowhere::sparse::SparseRow<float>>::set(
|
||||
const knowhere::sparse::SparseRow<float>* src,
|
||||
uint32_t begin,
|
||||
uint32_t length) {
|
||||
auto mcm = storage::MmapManager::GetInstance().GetMmapChunkManager();
|
||||
milvus::ErrorCode err_code;
|
||||
AssertInfo(
|
||||
begin + length <= size_,
|
||||
"failed to set a chunk with length: {} from beign {}, map_size={}",
|
||||
length,
|
||||
begin,
|
||||
size_);
|
||||
|
||||
size_t total_size = 0;
|
||||
for (auto i = 0; i < length; i++) {
|
||||
total_size += src[i].data_byte_size();
|
||||
}
|
||||
auto buf = (uint8_t*)mcm->Allocate(mmap_descriptor_, total_size);
|
||||
AssertInfo(buf != nullptr, "failed to allocate memory from mmap_manager.");
|
||||
for (auto i = 0, offset = 0; i < length; i++) {
|
||||
auto data_size = src[i].data_byte_size();
|
||||
uint8_t* data_ptr = buf + offset;
|
||||
std::memcpy(data_ptr, (uint8_t*)src[i].data(), data_size);
|
||||
data_[i + begin] =
|
||||
knowhere::sparse::SparseRow<float>(src[i].size(), data_ptr, false);
|
||||
offset += data_size;
|
||||
}
|
||||
}
|
||||
|
||||
// Template specialization for json
|
||||
template <>
|
||||
inline void
|
||||
VariableLengthChunk<Json>::set(const Json* src,
|
||||
@ -162,11 +179,8 @@ VariableLengthChunk<Json>::set(const Json* src,
|
||||
offset += data_size;
|
||||
}
|
||||
}
|
||||
template <>
|
||||
inline Json
|
||||
VariableLengthChunk<Json>::get(const int i) const {
|
||||
return std::move(Json(simdjson::padded_string(data_[i].data())));
|
||||
}
|
||||
|
||||
// Template specialization for array
|
||||
template <>
|
||||
inline void
|
||||
VariableLengthChunk<Array>::set(const Array* src,
|
||||
@ -198,14 +212,5 @@ VariableLengthChunk<Array>::set(const Array* src,
|
||||
offset += data_size;
|
||||
}
|
||||
}
|
||||
template <>
|
||||
inline Array
|
||||
VariableLengthChunk<Array>::get(const int i) const {
|
||||
auto array_view_i = data_[i];
|
||||
char* data = static_cast<char*>(const_cast<void*>(array_view_i.data()));
|
||||
return Array(data,
|
||||
array_view_i.byte_size(),
|
||||
array_view_i.get_element_type(),
|
||||
array_view_i.get_offsets_in_copy());
|
||||
}
|
||||
|
||||
} // namespace milvus
|
@ -32,8 +32,6 @@ class ChunkVectorBase {
|
||||
get_chunk_data(int64_t index) = 0;
|
||||
virtual int64_t
|
||||
get_chunk_size(int64_t index) = 0;
|
||||
virtual Type
|
||||
get_element(int64_t chunk_id, int64_t chunk_offset) = 0;
|
||||
virtual int64_t
|
||||
get_element_size() = 0;
|
||||
virtual int64_t
|
||||
@ -106,23 +104,6 @@ class ThreadSafeChunkVector : public ChunkVectorBase<Type> {
|
||||
}
|
||||
}
|
||||
|
||||
Type
|
||||
get_element(int64_t chunk_id, int64_t chunk_offset) override {
|
||||
std::shared_lock<std::shared_mutex> lck(mutex_);
|
||||
auto chunk = vec_[chunk_id];
|
||||
AssertInfo(
|
||||
chunk_id < this->counter_ && chunk_offset < chunk.size(),
|
||||
fmt::format("index out of range, index={}, chunk_offset={}, cap={}",
|
||||
chunk_id,
|
||||
chunk_offset,
|
||||
chunk.size()));
|
||||
if constexpr (IsMmap) {
|
||||
return chunk.get(chunk_offset);
|
||||
} else {
|
||||
return chunk[chunk_offset];
|
||||
}
|
||||
}
|
||||
|
||||
ChunkViewType<Type>
|
||||
view_element(int64_t chunk_id, int64_t chunk_offset) override {
|
||||
std::shared_lock<std::shared_mutex> lck(mutex_);
|
||||
@ -229,7 +210,6 @@ SelectChunkVectorPtr(storage::MmapChunkDescriptorPtr& mmap_descriptor) {
|
||||
return std::make_unique<ThreadSafeChunkVector<Type>>();
|
||||
}
|
||||
} else {
|
||||
// todo: sparse float vector support mmap
|
||||
return std::make_unique<ThreadSafeChunkVector<Type>>();
|
||||
}
|
||||
}
|
||||
|
@ -22,8 +22,7 @@
|
||||
using namespace milvus::segcore;
|
||||
using namespace milvus;
|
||||
namespace pb = milvus::proto;
|
||||
|
||||
class ChunkVectorTest : public testing::Test {
|
||||
class ChunkVectorTest : public ::testing::TestWithParam<bool> {
|
||||
public:
|
||||
void
|
||||
SetUp() override {
|
||||
@ -172,9 +171,126 @@ TEST_F(ChunkVectorTest, FillDataWithMmap) {
|
||||
num_inserted);
|
||||
EXPECT_EQ(float_array_result->scalars().array_data().data_size(),
|
||||
num_inserted);
|
||||
// checking dense/sparse vector
|
||||
auto fp32_vec_res =
|
||||
fp32_vec_result.get()->mutable_vectors()->float_vector().data();
|
||||
auto fp16_vec_res = (float16*)fp16_vec_result.get()
|
||||
->mutable_vectors()
|
||||
->float16_vector()
|
||||
.data();
|
||||
auto bf16_vec_res = (bfloat16*)bf16_vec_result.get()
|
||||
->mutable_vectors()
|
||||
->bfloat16_vector()
|
||||
.data();
|
||||
auto sparse_vec_res = SparseBytesToRows(
|
||||
sparse_vec_result->vectors().sparse_float_vector().contents());
|
||||
EXPECT_TRUE(fp32_vec_res.size() == num_inserted * dim);
|
||||
auto fp32_vec_gt = dataset.get_col<float>(fp32_vec);
|
||||
auto fp16_vec_gt = dataset.get_col<float16>(fp16_vec);
|
||||
auto bf16_vec_gt = dataset.get_col<bfloat16>(bf16_vec);
|
||||
auto sparse_vec_gt =
|
||||
dataset.get_col<knowhere::sparse::SparseRow<float>>(sparse_vec);
|
||||
|
||||
for (size_t i = 0; i < num_inserted; ++i) {
|
||||
auto id = ids_ds->GetIds()[i];
|
||||
// check dense vector
|
||||
for (size_t j = 0; j < 128; ++j) {
|
||||
EXPECT_TRUE(fp32_vec_res[i * dim + j] ==
|
||||
fp32_vec_gt[(id % per_batch) * dim + j]);
|
||||
EXPECT_TRUE(fp16_vec_res[i * dim + j] ==
|
||||
fp16_vec_gt[(id % per_batch) * dim + j]);
|
||||
EXPECT_TRUE(bf16_vec_res[i * dim + j] ==
|
||||
bf16_vec_gt[(id % per_batch) * dim + j]);
|
||||
}
|
||||
//check sparse vector
|
||||
auto actual_row = sparse_vec_res[i];
|
||||
auto expected_row = sparse_vec_gt[(id % per_batch)];
|
||||
EXPECT_TRUE(actual_row.size() == expected_row.size());
|
||||
for (size_t j = 0; j < actual_row.size(); ++j) {
|
||||
EXPECT_TRUE(actual_row[j].id == expected_row[j].id);
|
||||
EXPECT_TRUE(actual_row[j].val == expected_row[j].val);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
INSTANTIATE_TEST_SUITE_P(IsSparse, ChunkVectorTest, ::testing::Bool());
|
||||
// Search over a growing segment, parameterized by GetParam():
// true  -> the vector field is VECTOR_SPARSE_FLOAT,
// false -> the vector field is VECTOR_FLOAT.
//
// Inserts n_batch batches of per_batch generated rows. After every batch it
// runs a search with num_queries queries and checks the result counts, that
// no returned segment offset is -1, and that no distance is NaN.
//
// `metric_type`, `empty_index_meta` and `config` are used here but defined
// outside this block (presumably on the fixture — not visible here).
TEST_P(ChunkVectorTest, SearchWithMmap) {
    const bool is_sparse = GetParam();
    const auto data_type =
        is_sparse ? DataType::VECTOR_SPARSE_FLOAT : DataType::VECTOR_FLOAT;

    // Single source of truth for top-k: used both in the plan and in the
    // result-size checks below.
    const int64_t top_k = 5;
    const int64_t per_batch = 10000;
    const int64_t n_batch = 3;

    auto schema = std::make_shared<Schema>();
    auto pk = schema->AddDebugField("pk", DataType::INT64);
    schema->AddDebugField("random", DataType::DOUBLE);
    auto vec = schema->AddDebugField("embeddings", data_type, 128, metric_type);
    schema->set_primary_field_id(pk);

    auto segment = CreateGrowingSegment(schema, empty_index_meta, 11, config);
    auto segmentImplPtr = dynamic_cast<SegmentGrowingImpl*>(segment.get());
    // The pointer is dereferenced in the insert loop; stop here if the cast
    // failed instead of crashing later.
    ASSERT_NE(segmentImplPtr, nullptr);

    milvus::proto::plan::PlanNode plan_node;
    auto vector_anns = plan_node.mutable_vector_anns();
    vector_anns->set_vector_type(
        is_sparse ? milvus::proto::plan::VectorType::SparseFloatVector
                  : milvus::proto::plan::VectorType::FloatVector);
    vector_anns->set_placeholder_tag("$0");
    // NOTE(review): 102 is presumably the id assigned to "embeddings" (the
    // third debug field) — confirm against Schema::AddDebugField.
    vector_anns->set_field_id(102);
    auto query_info = vector_anns->mutable_query_info();
    query_info->set_topk(top_k);
    query_info->set_round_decimal(3);
    query_info->set_metric_type(metric_type);
    query_info->set_search_params(R"({"nprobe": 16})");
    auto plan_str = plan_node.SerializeAsString();

    for (int64_t batch = 0; batch < n_batch; batch++) {
        auto dataset = DataGen(schema, per_batch);
        auto offset = segment->PreInsert(per_batch);
        segment->Insert(offset,
                        per_batch,
                        dataset.row_ids_.data(),
                        dataset.timestamps_.data(),
                        dataset.raw_);

        // The insert record must expose the vector field under the type
        // that matches the test parameter.
        const VectorBase* field_data = nullptr;
        if (is_sparse) {
            field_data = segmentImplPtr->get_insert_record()
                             .get_data<milvus::SparseFloatVector>(vec);
        } else {
            field_data = segmentImplPtr->get_insert_record()
                             .get_data<milvus::FloatVector>(vec);
        }
        EXPECT_NE(field_data, nullptr);

        auto num_queries = 5;
        auto ph_group_raw =
            is_sparse ? CreateSparseFloatPlaceholderGroup(num_queries)
                      : CreatePlaceholderGroup(num_queries, 128, 1024);

        auto plan = milvus::query::CreateSearchPlanByExpr(
            *schema, plan_str.data(), plan_str.size());
        auto ph_group =
            ParsePlaceholderGroup(plan.get(), ph_group_raw.SerializeAsString());
        Timestamp timestamp = 1000000;
        auto sr = segment->Search(plan.get(), ph_group.get(), timestamp);
        EXPECT_EQ(sr->total_nq_, num_queries);
        EXPECT_EQ(sr->unity_topK_, top_k);
        EXPECT_EQ(sr->distances_.size(), num_queries * top_k);
        EXPECT_EQ(sr->seg_offsets_.size(), num_queries * top_k);
        // Distinct index names: the original inner `i` shadowed the batch
        // index.
        for (auto q = 0; q < num_queries; q++) {
            for (auto k = 0; k < top_k; k++) {
                EXPECT_NE(sr->seg_offsets_.data()[q * top_k + k], -1);
                EXPECT_FALSE(std::isnan(sr->distances_.data()[q * top_k + k]));
            }
        }
    }
}
|
||||
TEST_F(ChunkVectorTest, QueryWithMmap) {
|
||||
auto schema = std::make_shared<Schema>();
|
||||
schema->AddDebugField(
|
||||
|
Loading…
Reference in New Issue
Block a user