From aa904be6ec0b57788e3d28450cd5d21e2960920c Mon Sep 17 00:00:00 2001
From: cqy123456 <39671710+cqy123456@users.noreply.github.com>
Date: Tue, 15 Oct 2024 10:59:23 +0800
Subject: [PATCH] enhance: support sparse vector mmap in growing segment type
 (#36566)

issue: https://github.com/milvus-io/milvus/issues/32984
related pr: https://github.com/milvus-io/milvus/pull/36565

Signed-off-by: cqy123456
---
 internal/core/src/common/VectorTrait.h       |   3 +-
 internal/core/src/mmap/ChunkData.h           |  71 ++++++-----
 internal/core/src/mmap/ChunkVector.h         |  20 ----
 internal/core/unittest/test_chunk_vector.cpp | 120 ++++++++++++++++++-
 4 files changed, 158 insertions(+), 56 deletions(-)

diff --git a/internal/core/src/common/VectorTrait.h b/internal/core/src/common/VectorTrait.h
index d987acb41a..e021558bbd 100644
--- a/internal/core/src/common/VectorTrait.h
+++ b/internal/core/src/common/VectorTrait.h
@@ -78,7 +78,8 @@ constexpr bool IsVariableType =
 template <typename Type>
 constexpr bool IsVariableTypeSupportInChunk =
     std::is_same_v<Type, std::string> || std::is_same_v<Type, Json> ||
-    std::is_same_v<Type, Array>;
+    std::is_same_v<Type, Array> ||
+    std::is_same_v<Type, knowhere::sparse::SparseRow<float>>;
 
 template <typename Type>
 using ChunkViewType = std::conditional_t<
diff --git a/internal/core/src/mmap/ChunkData.h b/internal/core/src/mmap/ChunkData.h
index da2cefe915..d6a097e62d 100644
--- a/internal/core/src/mmap/ChunkData.h
+++ b/internal/core/src/mmap/ChunkData.h
@@ -40,14 +40,10 @@ struct FixedLengthChunk {
     size() {
         return size_;
     };
-    Type
-    get(const int i) const {
-        return data_[i];
-    }
     const Type&
     view(const int i) const {
         return data_[i];
-    }
+    };
 
  private:
     int64_t size_ = 0;
@@ -73,19 +69,10 @@ struct VariableLengthChunk {
         throw std::runtime_error(
             "set should be a template specialization function");
     }
-    inline Type
-    get(const int i) const {
-        throw std::runtime_error(
-            "get should be a template specialization function");
-    }
     const ChunkViewType<Type>&
     view(const int i) const {
         return data_[i];
     }
-    const ChunkViewType<Type>&
-    operator[](const int i) const {
-        return view(i);
-    }
     void*
     data() {
         return data_.data();
@@ -100,6 +87,8 @@ struct VariableLengthChunk {
     FixedVector<ChunkViewType<Type>> data_;
     storage::MmapChunkDescriptorPtr mmap_descriptor_ = nullptr;
 };
+
+// Template specialization for string
 template <>
 inline void
 VariableLengthChunk<std::string>::set(const std::string* src,
@@ -128,12 +117,40 @@ VariableLengthChunk<std::string>::set(const std::string* src,
         offset += data_size;
     }
 }
+
+// Template specialization for sparse vector
 template <>
-inline std::string
-VariableLengthChunk<std::string>::get(const int i) const {
-    // copy to a string
-    return std::string(data_[i]);
+inline void
+VariableLengthChunk<knowhere::sparse::SparseRow<float>>::set(
+    const knowhere::sparse::SparseRow<float>* src,
+    uint32_t begin,
+    uint32_t length) {
+    auto mcm = storage::MmapManager::GetInstance().GetMmapChunkManager();
+    milvus::ErrorCode err_code;
+    AssertInfo(
+        begin + length <= size_,
+        "failed to set a chunk with length: {} from begin {}, map_size={}",
+        length,
+        begin,
+        size_);
+
+    size_t total_size = 0;
+    for (auto i = 0; i < length; i++) {
+        total_size += src[i].data_byte_size();
+    }
+    auto buf = (uint8_t*)mcm->Allocate(mmap_descriptor_, total_size);
+    AssertInfo(buf != nullptr, "failed to allocate memory from mmap_manager.");
+    for (auto i = 0, offset = 0; i < length; i++) {
+        auto data_size = src[i].data_byte_size();
+        uint8_t* data_ptr = buf + offset;
+        std::memcpy(data_ptr, (uint8_t*)src[i].data(), data_size);
+        data_[i + begin] =
+            knowhere::sparse::SparseRow<float>(src[i].size(), data_ptr, false);
+        offset += data_size;
+    }
 }
+
+// Template specialization for json
 template <>
 inline void
 VariableLengthChunk<Json>::set(const Json* src,
@@ -162,11 +179,8 @@ VariableLengthChunk<Json>::set(const Json* src,
         offset += data_size;
     }
 }
-template <>
-inline Json
-VariableLengthChunk<Json>::get(const int i) const {
-    return std::move(Json(simdjson::padded_string(data_[i].data())));
-}
+
+// Template specialization for array
 template <>
 inline void
 VariableLengthChunk<Array>::set(const Array* src,
@@ -198,14 +212,5 @@ VariableLengthChunk<Array>::set(const Array* src,
         offset += data_size;
     }
 }
-template <>
-inline Array
-VariableLengthChunk<Array>::get(const int i) const {
-    auto array_view_i = data_[i];
-    char* data = static_cast<char*>(const_cast<void*>(array_view_i.data()));
-    return Array(data,
-                 array_view_i.byte_size(),
-                 array_view_i.get_element_type(),
-                 array_view_i.get_offsets_in_copy());
-}
+
 }  // namespace milvus
\ No newline at end of file
diff --git a/internal/core/src/mmap/ChunkVector.h b/internal/core/src/mmap/ChunkVector.h
index ed9ec7c0ce..dc059dcfcd 100644
--- a/internal/core/src/mmap/ChunkVector.h
+++ b/internal/core/src/mmap/ChunkVector.h
@@ -32,8 +32,6 @@ class ChunkVectorBase {
     get_chunk_data(int64_t index) = 0;
     virtual int64_t
     get_chunk_size(int64_t index) = 0;
-    virtual Type
-    get_element(int64_t chunk_id, int64_t chunk_offset) = 0;
     virtual int64_t
     get_element_size() = 0;
     virtual int64_t
@@ -106,23 +104,6 @@ class ThreadSafeChunkVector : public ChunkVectorBase<Type> {
         }
     }
 
-    Type
-    get_element(int64_t chunk_id, int64_t chunk_offset) override {
-        std::shared_lock lck(mutex_);
-        auto chunk = vec_[chunk_id];
-        AssertInfo(
-            chunk_id < this->counter_ && chunk_offset < chunk.size(),
-            fmt::format("index out of range, index={}, chunk_offset={}, cap={}",
-                        chunk_id,
-                        chunk_offset,
-                        chunk.size()));
-        if constexpr (IsMmap) {
-            return chunk.get(chunk_offset);
-        } else {
-            return chunk[chunk_offset];
-        }
-    }
-
     ChunkViewType<Type>
     view_element(int64_t chunk_id, int64_t chunk_offset) override {
         std::shared_lock lck(mutex_);
@@ -229,7 +210,6 @@ SelectChunkVectorPtr(storage::MmapChunkDescriptorPtr& mmap_descriptor) {
             return std::make_unique<ThreadSafeChunkVector<Type>>();
         }
     } else {
-        // todo: sparse float vector support mmap
        return std::make_unique<ThreadSafeChunkVector<Type>>();
     }
 }
diff --git a/internal/core/unittest/test_chunk_vector.cpp b/internal/core/unittest/test_chunk_vector.cpp
index 2e5991d4c5..66867bbbd0 100644
--- a/internal/core/unittest/test_chunk_vector.cpp
+++ b/internal/core/unittest/test_chunk_vector.cpp
@@ -22,8 +22,7 @@ using namespace milvus::segcore;
 using namespace milvus;
 namespace pb = milvus::proto;
 
-
-class ChunkVectorTest : public testing::Test {
+class ChunkVectorTest : public ::testing::TestWithParam<bool> {
  public:
     void
     SetUp() override {
@@ -172,9 +171,126 @@ TEST_F(ChunkVectorTest, FillDataWithMmap) {
                   num_inserted);
         EXPECT_EQ(float_array_result->scalars().array_data().data_size(),
                   num_inserted);
+        // checking dense/sparse vector
+        auto fp32_vec_res =
+            fp32_vec_result.get()->mutable_vectors()->float_vector().data();
+        auto fp16_vec_res = (float16*)fp16_vec_result.get()
+                                ->mutable_vectors()
+                                ->float16_vector()
+                                .data();
+        auto bf16_vec_res = (bfloat16*)bf16_vec_result.get()
+                                ->mutable_vectors()
+                                ->bfloat16_vector()
+                                .data();
+        auto sparse_vec_res = SparseBytesToRows(
+            sparse_vec_result->vectors().sparse_float_vector().contents());
+        EXPECT_TRUE(fp32_vec_res.size() == num_inserted * dim);
+        auto fp32_vec_gt = dataset.get_col<float>(fp32_vec);
+        auto fp16_vec_gt = dataset.get_col<float16>(fp16_vec);
+        auto bf16_vec_gt = dataset.get_col<bfloat16>(bf16_vec);
+        auto sparse_vec_gt =
+            dataset.get_col<knowhere::sparse::SparseRow<float>>(sparse_vec);
+
+        for (size_t i = 0; i < num_inserted; ++i) {
+            auto id = ids_ds->GetIds()[i];
+            // check dense vector
+            for (size_t j = 0; j < 128; ++j) {
+                EXPECT_TRUE(fp32_vec_res[i * dim + j] ==
+                            fp32_vec_gt[(id % per_batch) * dim + j]);
+                EXPECT_TRUE(fp16_vec_res[i * dim + j] ==
+                            fp16_vec_gt[(id % per_batch) * dim + j]);
+                EXPECT_TRUE(bf16_vec_res[i * dim + j] ==
+                            bf16_vec_gt[(id % per_batch) * dim + j]);
+            }
+            //check sparse vector
+            auto actual_row = sparse_vec_res[i];
+            auto expected_row = sparse_vec_gt[(id % per_batch)];
+            EXPECT_TRUE(actual_row.size() == expected_row.size());
+            for (size_t j = 0; j < actual_row.size(); ++j) {
+                EXPECT_TRUE(actual_row[j].id == expected_row[j].id);
+                EXPECT_TRUE(actual_row[j].val == expected_row[j].val);
+            }
+        }
     }
 }
 
+INSTANTIATE_TEST_SUITE_P(IsSparse, ChunkVectorTest, ::testing::Bool());
+TEST_P(ChunkVectorTest, SearchWithMmap) {
+    auto is_sparse = GetParam();
+    auto data_type =
+        is_sparse ? DataType::VECTOR_SPARSE_FLOAT : DataType::VECTOR_FLOAT;
+    auto schema = std::make_shared<Schema>();
+    auto pk = schema->AddDebugField("pk", DataType::INT64);
+    auto random = schema->AddDebugField("random", DataType::DOUBLE);
+    auto vec = schema->AddDebugField("embeddings", data_type, 128, metric_type);
+    schema->set_primary_field_id(pk);
+
+    auto segment = CreateGrowingSegment(schema, empty_index_meta, 11, config);
+    auto segmentImplPtr = dynamic_cast<SegmentGrowingImpl*>(segment.get());
+
+    milvus::proto::plan::PlanNode plan_node;
+    auto vector_anns = plan_node.mutable_vector_anns();
+    if (is_sparse) {
+        vector_anns->set_vector_type(
+            milvus::proto::plan::VectorType::SparseFloatVector);
+    } else {
+        vector_anns->set_vector_type(
+            milvus::proto::plan::VectorType::FloatVector);
+    }
+    vector_anns->set_placeholder_tag("$0");
+    vector_anns->set_field_id(102);
+    auto query_info = vector_anns->mutable_query_info();
+    query_info->set_topk(5);
+    query_info->set_round_decimal(3);
+    query_info->set_metric_type(metric_type);
+    query_info->set_search_params(R"({"nprobe": 16})");
+    auto plan_str = plan_node.SerializeAsString();
+
+    int64_t per_batch = 10000;
+    int64_t n_batch = 3;
+    int64_t top_k = 5;
+    for (int64_t i = 0; i < n_batch; i++) {
+        auto dataset = DataGen(schema, per_batch);
+        auto offset = segment->PreInsert(per_batch);
+        auto pks = dataset.get_col<int64_t>(pk);
+        segment->Insert(offset,
+                        per_batch,
+                        dataset.row_ids_.data(),
+                        dataset.timestamps_.data(),
+                        dataset.raw_);
+        const VectorBase* field_data = nullptr;
+        if (is_sparse) {
+            field_data = segmentImplPtr->get_insert_record()
+                             .get_data<milvus::SparseFloatVector>(vec);
+        } else {
+            field_data = segmentImplPtr->get_insert_record()
+                             .get_data<milvus::FloatVector>(vec);
+        }
+        auto inserted = (i + 1) * per_batch;
+
+        auto num_queries = 5;
+        auto ph_group_raw =
+            is_sparse ? CreateSparseFloatPlaceholderGroup(num_queries)
+                      : CreatePlaceholderGroup(num_queries, 128, 1024);
+
+        auto plan = milvus::query::CreateSearchPlanByExpr(
+            *schema, plan_str.data(), plan_str.size());
+        auto ph_group =
+            ParsePlaceholderGroup(plan.get(), ph_group_raw.SerializeAsString());
+        Timestamp timestamp = 1000000;
+        auto sr = segment->Search(plan.get(), ph_group.get(), timestamp);
+        EXPECT_EQ(sr->total_nq_, num_queries);
+        EXPECT_EQ(sr->unity_topK_, top_k);
+        EXPECT_EQ(sr->distances_.size(), num_queries * top_k);
+        EXPECT_EQ(sr->seg_offsets_.size(), num_queries * top_k);
+        for (auto i = 0; i < num_queries; i++) {
+            for (auto k = 0; k < top_k; k++) {
+                EXPECT_NE(sr->seg_offsets_.data()[i * top_k + k], -1);
+                EXPECT_FALSE(std::isnan(sr->distances_.data()[i * top_k + k]));
+            }
+        }
+    }
+}
 TEST_F(ChunkVectorTest, QueryWithMmap) {
     auto schema = std::make_shared<Schema>();
     schema->AddDebugField(