From 070dfc77bf32a31ea56b12dbef17e92f102739c7 Mon Sep 17 00:00:00 2001 From: Buqian Zheng Date: Mon, 11 Mar 2024 14:45:02 +0800 Subject: [PATCH] feat: [Sparse Float Vector] segcore basics and index building (#30357) This commit adds sparse float vector support to segcore with the following: 1. data type enum declarations 2. Adds corresponding data structures for handling sparse float vectors in various scenarios, including: * FieldData as a bridge between the binlog and the in memory data structures * mmap::Column as the in memory representation of a sparse float vector column of a sealed segment; * ConcurrentVector as the in memory representation of a sparse float vector of a growing segment which supports inserts. 3. Adds logic in payload reader/writer to serialize/deserialize from/to binlog 4. Adds the ability to allow the index node to build sparse float vector index 5. Adds the ability to allow the query node to build growing index for growing segment and temp index for sealed segment without index built This commit also includes some code cleanup, comment improvements, and unit tests for sparse vectors. 
https://github.com/milvus-io/milvus/issues/29419 Signed-off-by: Buqian Zheng --- internal/core/src/common/FieldData.cpp | 23 +- internal/core/src/common/FieldData.h | 8 + internal/core/src/common/FieldDataInterface.h | 124 ++++++++++- internal/core/src/common/FieldMeta.h | 23 +- internal/core/src/common/Schema.cpp | 7 +- internal/core/src/common/Schema.h | 9 - internal/core/src/common/Span.h | 6 +- internal/core/src/common/Types.h | 6 +- internal/core/src/common/Utils.h | 48 +++++ internal/core/src/common/VectorTrait.h | 37 ++-- internal/core/src/common/type_c.h | 1 + internal/core/src/index/IndexFactory.cpp | 6 +- internal/core/src/index/Utils.cpp | 24 +++ internal/core/src/index/VectorMemIndex.cpp | 77 +++++-- internal/core/src/indexbuilder/IndexFactory.h | 2 + internal/core/src/indexbuilder/index_c.cpp | 26 +++ internal/core/src/indexbuilder/index_c.h | 8 + internal/core/src/mmap/Column.h | 92 +++++++- internal/core/src/mmap/Utils.h | 52 +---- internal/core/src/segcore/CMakeLists.txt | 2 - .../core/src/segcore/ConcurrentVector.cpp | 10 +- internal/core/src/segcore/ConcurrentVector.h | 198 ++++++++++-------- internal/core/src/segcore/FieldIndexing.cpp | 99 ++++++--- internal/core/src/segcore/FieldIndexing.h | 128 +++++++---- .../core/src/segcore/IndexConfigGenerator.cpp | 27 ++- .../core/src/segcore/IndexConfigGenerator.h | 2 + internal/core/src/segcore/InsertRecord.cpp | 12 -- internal/core/src/segcore/InsertRecord.h | 11 +- internal/core/src/segcore/ScalarIndex.cpp | 88 -------- internal/core/src/segcore/ScalarIndex.h | 66 ------ internal/core/src/segcore/SegmentGrowing.h | 2 +- .../core/src/segcore/SegmentGrowingImpl.cpp | 18 +- .../core/src/segcore/SegmentGrowingImpl.h | 2 +- .../core/src/segcore/SegmentSealedImpl.cpp | 39 +++- internal/core/src/segcore/SegmentSealedImpl.h | 1 - internal/core/src/segcore/Utils.cpp | 37 +++- internal/core/src/segcore/segment_c.cpp | 13 +- internal/core/src/storage/ChunkCache.cpp | 1 + internal/core/src/storage/ChunkManager.h | 
2 +- internal/core/src/storage/Event.cpp | 15 +- internal/core/src/storage/PayloadReader.cpp | 4 +- internal/core/src/storage/PayloadWriter.cpp | 7 +- internal/core/src/storage/Util.cpp | 40 +++- internal/core/src/storage/parquet_c.h | 2 + internal/core/unittest/test_binlog_index.cpp | 6 +- .../core/unittest/test_concurrent_vector.cpp | 6 +- internal/core/unittest/test_data_codec.cpp | 41 ++++ internal/core/unittest/test_growing_index.cpp | 10 +- internal/core/unittest/test_index_c_api.cpp | 64 ++++++ internal/core/unittest/test_index_wrapper.cpp | 122 ++++++----- internal/core/unittest/test_indexing.cpp | 4 +- .../core/unittest/test_range_search_sort.cpp | 12 +- internal/core/unittest/test_retrieve.cpp | 27 --- internal/core/unittest/test_utils.cpp | 11 +- internal/core/unittest/test_utils/Constants.h | 3 + internal/core/unittest/test_utils/DataGen.h | 109 ++++++++-- .../unittest/test_utils/c_api_test_utils.h | 25 --- .../test_utils/indexbuilder_test_utils.h | 9 + 58 files changed, 1229 insertions(+), 625 deletions(-) delete mode 100644 internal/core/src/segcore/InsertRecord.cpp delete mode 100644 internal/core/src/segcore/ScalarIndex.cpp delete mode 100644 internal/core/src/segcore/ScalarIndex.h diff --git a/internal/core/src/common/FieldData.cpp b/internal/core/src/common/FieldData.cpp index a9bd2f817f..5f6fb00d72 100644 --- a/internal/core/src/common/FieldData.cpp +++ b/internal/core/src/common/FieldData.cpp @@ -26,10 +26,10 @@ namespace milvus { -template +template void -FieldDataImpl::FillFieldData(const void* source, - ssize_t element_count) { +FieldDataImpl::FillFieldData(const void* source, + ssize_t element_count) { if (element_count == 0) { return; } @@ -57,9 +57,9 @@ GetDataInfoFromArray(const std::shared_ptr array) { return std::make_pair(typed_array->raw_values(), element_count); } -template +template void -FieldDataImpl::FillFieldData( +FieldDataImpl::FillFieldData( const std::shared_ptr array) { AssertInfo(array != nullptr, "null arrow array"); auto 
element_count = array->length(); @@ -159,6 +159,18 @@ FieldDataImpl::FillFieldData( array); return FillFieldData(array_info.first, array_info.second); } + case DataType::VECTOR_SPARSE_FLOAT: { + AssertInfo(array->type()->id() == arrow::Type::type::BINARY, + "inconsistent data type"); + auto arr = std::dynamic_pointer_cast(array); + std::vector> values; + for (size_t index = 0; index < element_count; ++index) { + auto view = arr->GetString(index); + values.push_back( + CopyAndWrapSparseRow(view.data(), view.size())); + } + return FillFieldData(values.data(), element_count); + } default: { throw SegcoreError(DataTypeInvalid, GetName() + "::FillFieldData" + @@ -186,6 +198,7 @@ template class FieldDataImpl; template class FieldDataImpl; template class FieldDataImpl; template class FieldDataImpl; +template class FieldDataImpl, true>; FieldDataPtr InitScalarFieldData(const DataType& type, int64_t cap_rows) { diff --git a/internal/core/src/common/FieldData.h b/internal/core/src/common/FieldData.h index b767231da3..60e0c74b3a 100644 --- a/internal/core/src/common/FieldData.h +++ b/internal/core/src/common/FieldData.h @@ -121,6 +121,14 @@ class FieldData : public FieldDataImpl { } }; +template <> +class FieldData : public FieldDataSparseVectorImpl { + public: + explicit FieldData(DataType data_type, int64_t buffered_num_rows = 0) + : FieldDataSparseVectorImpl(data_type, buffered_num_rows) { + } +}; + using FieldDataPtr = std::shared_ptr; using FieldDataChannel = Channel; using FieldDataChannelPtr = std::shared_ptr; diff --git a/internal/core/src/common/FieldDataInterface.h b/internal/core/src/common/FieldDataInterface.h index f23d8f57f7..f5ce6a4299 100644 --- a/internal/core/src/common/FieldDataInterface.h +++ b/internal/core/src/common/FieldDataInterface.h @@ -32,6 +32,7 @@ #include "common/VectorTrait.h" #include "common/EasyAssert.h" #include "common/Array.h" +#include "knowhere/dataset.h" namespace milvus { @@ -43,24 +44,33 @@ class FieldDataBase { } virtual 
~FieldDataBase() = default; + // For all FieldDataImpl subclasses, source is a pointer to element_count of + // Type virtual void FillFieldData(const void* source, ssize_t element_count) = 0; virtual void FillFieldData(const std::shared_ptr array) = 0; + // For all FieldDataImpl subclasses, this method returns Type* that points + // at all rows in this field data. virtual void* Data() = 0; + // For all FieldDataImpl subclasses, this method returns a Type* that points + // at the offset-th row of this field data. virtual const void* RawValue(ssize_t offset) const = 0; + // Returns the serialized bytes size of all rows. virtual int64_t Size() const = 0; + // Returns the serialized bytes size of the index-th row. virtual int64_t Size(ssize_t index) const = 0; + // Number of filled rows virtual size_t Length() const = 0; @@ -71,9 +81,11 @@ class FieldDataBase { Reserve(size_t cap) = 0; public: + // row capacity virtual int64_t get_num_rows() const = 0; + // each row is represented as how many Type elements virtual int64_t get_dim() const = 0; @@ -86,11 +98,9 @@ class FieldDataBase { const DataType data_type_; }; -template +template class FieldDataImpl : public FieldDataBase { public: - // constants - using Chunk = FixedVector; FieldDataImpl(FieldDataImpl&&) = delete; FieldDataImpl(const FieldDataImpl&) = delete; @@ -105,13 +115,16 @@ class FieldDataImpl : public FieldDataBase { int64_t buffered_num_rows = 0) : FieldDataBase(data_type), num_rows_(buffered_num_rows), - dim_(is_scalar ? 1 : dim) { + dim_(is_type_entire_row ? 1 : dim) { field_data_.resize(num_rows_ * dim_); } - explicit FieldDataImpl(size_t dim, DataType type, Chunk&& field_data) - : FieldDataBase(type), dim_(is_scalar ? 1 : dim) { + explicit FieldDataImpl(size_t dim, + DataType type, + FixedVector&& field_data) + : FieldDataBase(type), dim_(is_type_entire_row ? 
1 : dim) { field_data_ = std::move(field_data); + Assert(field_data.size() % dim == 0); num_rows_ = field_data.size() / dim; } @@ -122,10 +135,18 @@ class FieldDataImpl : public FieldDataBase { FillFieldData(const std::shared_ptr array) override; virtual void - FillFieldData(const std::shared_ptr& array){}; + FillFieldData(const std::shared_ptr& array) { + PanicInfo(NotImplemented, + "FillFieldData(const std::shared_ptr& " + "array) not implemented by default"); + } virtual void - FillFieldData(const std::shared_ptr& array){}; + FillFieldData(const std::shared_ptr& array) { + PanicInfo(NotImplemented, + "FillFieldData(const std::shared_ptr& " + "array) not implemented by default"); + } std::string GetName() const { @@ -209,9 +230,11 @@ class FieldDataImpl : public FieldDataBase { } protected: - Chunk field_data_; + FixedVector field_data_; + // number of elements field_data_ can hold int64_t num_rows_; mutable std::shared_mutex num_rows_mutex_; + // number of actual elements in field_data_ size_t length_{}; mutable std::shared_mutex tell_mutex_; @@ -322,6 +345,89 @@ class FieldDataJsonImpl : public FieldDataImpl { } }; +class FieldDataSparseVectorImpl + : public FieldDataImpl, true> { + public: + explicit FieldDataSparseVectorImpl(DataType data_type, + int64_t total_num_rows = 0) + : FieldDataImpl, true>( + /*dim=*/1, data_type, total_num_rows), + vec_dim_(0) { + AssertInfo(data_type == DataType::VECTOR_SPARSE_FLOAT, + "invalid data type for sparse vector"); + } + + int64_t + Size() const override { + int64_t data_size = 0; + for (size_t i = 0; i < length(); ++i) { + data_size += field_data_[i].data_byte_size(); + } + return data_size; + } + + int64_t + Size(ssize_t offset) const override { + AssertInfo(offset < get_num_rows(), + "field data subscript out of range"); + AssertInfo(offset < length(), + "subscript position don't has valid value"); + return field_data_[offset].data_byte_size(); + } + + // source is a pointer to element_count of + // 
knowhere::sparse::SparseRow + void + FillFieldData(const void* source, ssize_t element_count) override { + if (element_count == 0) { + return; + } + + std::lock_guard lck(tell_mutex_); + if (length_ + element_count > get_num_rows()) { + resize_field_data(length_ + element_count); + } + auto ptr = + static_cast*>(source); + for (int64_t i = 0; i < element_count; ++i) { + auto& row = ptr[i]; + vec_dim_ = std::max(vec_dim_, row.dim()); + } + std::copy_n(ptr, element_count, field_data_.data() + length_); + length_ += element_count; + } + + // each binary in array is a knowhere::sparse::SparseRow + void + FillFieldData(const std::shared_ptr& array) override { + auto n = array->length(); + if (n == 0) { + return; + } + + std::lock_guard lck(tell_mutex_); + if (length_ + n > get_num_rows()) { + resize_field_data(length_ + n); + } + + for (int64_t i = 0; i < array->length(); ++i) { + auto view = array->GetView(i); + auto& row = field_data_[length_ + i]; + row = CopyAndWrapSparseRow(view.data(), view.size()); + vec_dim_ = std::max(vec_dim_, row.dim()); + } + length_ += n; + } + + int64_t + Dim() const { + return vec_dim_; + } + + private: + int64_t vec_dim_; +}; + class FieldDataArrayImpl : public FieldDataImpl { public: explicit FieldDataArrayImpl(DataType data_type, int64_t total_num_rows = 0) diff --git a/internal/core/src/common/FieldMeta.h b/internal/core/src/common/FieldMeta.h index e1e50ae161..bb8d590fa0 100644 --- a/internal/core/src/common/FieldMeta.h +++ b/internal/core/src/common/FieldMeta.h @@ -54,6 +54,10 @@ datatype_sizeof(DataType data_type, int dim = 1) { case DataType::VECTOR_BFLOAT16: { return sizeof(bfloat16) * dim; } + // Not supporting VECTOR_SPARSE_FLOAT here intentionally. We can't + // easily estimately the size of a sparse float vector. Caller of this + // method must handle this case themselves and must not pass + // VECTOR_SPARSE_FLOAT data_type. 
default: { throw SegcoreError(DataTypeInvalid, fmt::format("invalid type is {}", data_type)); @@ -100,6 +104,9 @@ datatype_name(DataType data_type) { case DataType::VECTOR_BFLOAT16: { return "vector_bfloat16"; } + case DataType::VECTOR_SPARSE_FLOAT: { + return "vector_sparse_float"; + } default: { PanicInfo(DataTypeInvalid, "Unsupported DataType({})", data_type); } @@ -111,7 +118,13 @@ datatype_is_vector(DataType datatype) { return datatype == DataType::VECTOR_BINARY || datatype == DataType::VECTOR_FLOAT || datatype == DataType::VECTOR_FLOAT16 || - datatype == DataType::VECTOR_BFLOAT16; + datatype == DataType::VECTOR_BFLOAT16 || + datatype == DataType::VECTOR_SPARSE_FLOAT; +} + +inline bool +datatype_is_sparse_vector(DataType datatype) { + return datatype == DataType::VECTOR_SPARSE_FLOAT; } inline bool @@ -153,6 +166,7 @@ datatype_is_variable(DataType datatype) { case DataType::STRING: case DataType::ARRAY: case DataType::JSON: + case DataType::VECTOR_SPARSE_FLOAT: return true; default: return false; @@ -217,6 +231,8 @@ class FieldMeta { Assert(datatype_is_array(type_)); } + // pass in any value for dim for sparse vector is ok as it'll never be used: + // get_dim() not allowed to be invoked on a sparse vector field. FieldMeta(const FieldName& name, FieldId id, DataType type, @@ -232,6 +248,8 @@ class FieldMeta { int64_t get_dim() const { Assert(datatype_is_vector(type_)); + // should not attempt to get dim() of a sparse vector from schema. 
+ Assert(!datatype_is_sparse_vector(type_)); Assert(vector_info_.has_value()); return vector_info_->dim_; } @@ -282,6 +300,9 @@ class FieldMeta { size_t get_sizeof() const { + AssertInfo(!datatype_is_sparse_vector(type_), + "should not attempt to get_sizeof() of a sparse vector from " + "schema"); static const size_t ARRAY_SIZE = 128; static const size_t JSON_SIZE = 512; if (is_vector()) { diff --git a/internal/core/src/common/Schema.cpp b/internal/core/src/common/Schema.cpp index fae2cb6ed4..831e279b49 100644 --- a/internal/core/src/common/Schema.cpp +++ b/internal/core/src/common/Schema.cpp @@ -54,8 +54,11 @@ Schema::ParseFrom(const milvus::proto::schema::CollectionSchema& schema_proto) { auto type_map = RepeatedKeyValToMap(child.type_params()); auto index_map = RepeatedKeyValToMap(child.index_params()); - AssertInfo(type_map.count("dim"), "dim not found"); - auto dim = boost::lexical_cast(type_map.at("dim")); + int64_t dim = 0; + if (!datatype_is_sparse_vector(data_type)) { + AssertInfo(type_map.count("dim"), "dim not found"); + dim = boost::lexical_cast(type_map.at("dim")); + } if (!index_map.count("metric_type")) { schema->AddField(name, field_id, data_type, dim, std::nullopt); } else { diff --git a/internal/core/src/common/Schema.h b/internal/core/src/common/Schema.h index 71187f1004..b1068dd650 100644 --- a/internal/core/src/common/Schema.h +++ b/internal/core/src/common/Schema.h @@ -132,11 +132,6 @@ class Schema { return fields_.at(field_id); } - auto - get_total_sizeof() const { - return total_sizeof_; - } - FieldId get_field_id(const FieldName& field_name) const { AssertInfo(name_ids_.count(field_name), "Cannot find field_name"); @@ -181,9 +176,6 @@ class Schema { fields_.emplace(field_id, field_meta); field_ids_.emplace_back(field_id); - - auto field_sizeof = field_meta.get_sizeof(); - total_sizeof_ += field_sizeof; } private: @@ -197,7 +189,6 @@ class Schema { std::unordered_map name_ids_; // field_name -> field_id std::unordered_map id_names_; // 
field_id -> field_name - int64_t total_sizeof_ = 0; std::optional primary_field_id_opt_; }; diff --git a/internal/core/src/common/Span.h b/internal/core/src/common/Span.h index 4ab50fb99c..cc6cbf2b72 100644 --- a/internal/core/src/common/Span.h +++ b/internal/core/src/common/Span.h @@ -60,9 +60,9 @@ class Span; // TODO: refine Span to support T=FloatVector template -class Span< - T, - typename std::enable_if_t || std::is_same_v>> { +class Span || IsScalar || + std::is_same_v>> { public: using embedded_type = T; explicit Span(const T* data, int64_t row_count) diff --git a/internal/core/src/common/Types.h b/internal/core/src/common/Types.h index 2c822590dd..c0e742a031 100644 --- a/internal/core/src/common/Types.h +++ b/internal/core/src/common/Types.h @@ -80,6 +80,7 @@ enum class DataType { VECTOR_FLOAT = 101, VECTOR_FLOAT16 = 102, VECTOR_BFLOAT16 = 103, + VECTOR_SPARSE_FLOAT = 104, }; using Timestamp = uint64_t; // TODO: use TiKV-like timestamp @@ -92,7 +93,7 @@ using ScalarArray = proto::schema::ScalarField; using DataArray = proto::schema::FieldData; using VectorArray = proto::schema::VectorField; using IdArray = proto::schema::IDs; -using InsertData = proto::segcore::InsertRecord; +using InsertRecordProto = proto::segcore::InsertRecord; using PkType = std::variant; inline size_t @@ -379,6 +380,9 @@ struct fmt::formatter : formatter { case milvus::DataType::VECTOR_BFLOAT16: name = "VECTOR_BFLOAT16"; break; + case milvus::DataType::VECTOR_SPARSE_FLOAT: + name = "VECTOR_SPARSE_FLOAT"; + break; } return formatter::format(name, ctx); } diff --git a/internal/core/src/common/Utils.h b/internal/core/src/common/Utils.h index 9372e4c031..cd3948348e 100644 --- a/internal/core/src/common/Utils.h +++ b/internal/core/src/common/Utils.h @@ -31,6 +31,7 @@ #include "common/EasyAssert.h" #include "knowhere/dataset.h" #include "knowhere/expected.h" +#include "knowhere/sparse_utils.h" #include "simdjson.h" namespace milvus { @@ -213,4 +214,51 @@ GetCommonPrefix(const std::string& 
str1, const std::string& str2) { return str1.substr(0, i); } +inline knowhere::sparse::SparseRow +CopyAndWrapSparseRow(const void* data, size_t size) { + size_t num_elements = + size / knowhere::sparse::SparseRow::element_size(); + knowhere::sparse::SparseRow row(num_elements); + std::memcpy(row.data(), data, size); + // TODO(SPARSE): validate + return row; +} + +// Iterable is a list of bytes, each is a byte array representation of a single +// sparse float row. This helper function converts such byte arrays into a list +// of knowhere::sparse::SparseRow. The resulting list is a deep copy of +// the source data. +template +std::unique_ptr[]> +SparseBytesToRows(const Iterable& rows) { + AssertInfo(rows.size() > 0, "at least 1 sparse row should be provided"); + auto res = + std::make_unique[]>(rows.size()); + for (size_t i = 0; i < rows.size(); ++i) { + res[i] = + std::move(CopyAndWrapSparseRow(rows[i].data(), rows[i].size())); + } + return res; +} + +// SparseRowsToProto converts a vector of knowhere::sparse::SparseRow to +// a milvus::proto::schema::SparseFloatArray. The resulting proto is a deep copy +// of the source data. 
+inline void SparseRowsToProto(const knowhere::sparse::SparseRow* source, + int64_t rows, + milvus::proto::schema::SparseFloatArray* proto) { + int64_t max_dim = 0; + for (size_t i = 0; i < rows; ++i) { + if (source + i == nullptr) { + // empty row + proto->add_contents(); + continue; + } + auto& row = source[i]; + max_dim = std::max(max_dim, row.dim()); + proto->add_contents(row.data(), row.data_byte_size()); + } + proto->set_dim(max_dim); +} + } // namespace milvus diff --git a/internal/core/src/common/VectorTrait.h b/internal/core/src/common/VectorTrait.h index 28481e0988..8062910ce2 100644 --- a/internal/core/src/common/VectorTrait.h +++ b/internal/core/src/common/VectorTrait.h @@ -48,20 +48,11 @@ class BFloat16Vector : public VectorTrait { static constexpr auto metric_type = DataType::VECTOR_BFLOAT16; }; -template -inline constexpr int64_t -element_sizeof(int64_t dim) { - static_assert(std::is_base_of_v); - if constexpr (std::is_same_v) { - return dim * sizeof(float); - } else if constexpr (std::is_same_v) { - return dim * sizeof(float16); - } else if constexpr (std::is_same_v) { - return dim * sizeof(bfloat16); - } else { - return dim / 8; - } -} +class SparseFloatVector : public VectorTrait { + public: + using embedded_type = float; + static constexpr auto metric_type = DataType::VECTOR_SPARSE_FLOAT; +}; template constexpr bool IsVector = std::is_base_of_v; @@ -73,6 +64,10 @@ constexpr bool IsScalar = std::is_same_v || std::is_same_v || std::is_same_v; +template +constexpr bool IsSparse = std::is_same_v || + std::is_same_v>; + template struct EmbeddedTypeImpl; @@ -86,11 +81,15 @@ struct EmbeddedTypeImpl>> { using type = std::conditional_t< std::is_same_v, float, - std::conditional_t, - float16, - std::conditional_t, - bfloat16, - uint8_t>>>; + std::conditional_t< + std::is_same_v, + float16, + std::conditional_t< + std::is_same_v, + bfloat16, + std::conditional_t, + void, + uint8_t>>>>; }; template diff --git a/internal/core/src/common/type_c.h 
b/internal/core/src/common/type_c.h index 14aa83017e..acdf68a7bd 100644 --- a/internal/core/src/common/type_c.h +++ b/internal/core/src/common/type_c.h @@ -52,6 +52,7 @@ enum CDataType { FloatVector = 101, Float16Vector = 102, BFloat16Vector = 103, + SparseFloatVector = 104, }; typedef enum CDataType CDataType; diff --git a/internal/core/src/index/IndexFactory.cpp b/internal/core/src/index/IndexFactory.cpp index c0df7bd3bb..ac8cb432e5 100644 --- a/internal/core/src/index/IndexFactory.cpp +++ b/internal/core/src/index/IndexFactory.cpp @@ -205,7 +205,8 @@ IndexFactory::CreateVectorIndex( } } else { // create mem index switch (data_type) { - case DataType::VECTOR_FLOAT: { + case DataType::VECTOR_FLOAT: + case DataType::VECTOR_SPARSE_FLOAT: { return std::make_unique>( index_type, metric_type, version, file_manager_context); } @@ -311,7 +312,8 @@ IndexFactory::CreateVectorIndex( } } else { // create mem index switch (data_type) { - case DataType::VECTOR_FLOAT: { + case DataType::VECTOR_FLOAT: + case DataType::VECTOR_SPARSE_FLOAT: { return std::make_unique>( create_index_info, file_manager_context, space); } diff --git a/internal/core/src/index/Utils.cpp b/internal/core/src/index/Utils.cpp index b1d889e171..a9ad1cf1a0 100644 --- a/internal/core/src/index/Utils.cpp +++ b/internal/core/src/index/Utils.cpp @@ -68,6 +68,30 @@ unsupported_index_combinations() { static std::vector> ret{ std::make_tuple(knowhere::IndexEnum::INDEX_FAISS_BIN_IVFFLAT, knowhere::metric::L2), + std::make_tuple(knowhere::IndexEnum::INDEX_SPARSE_INVERTED_INDEX, + knowhere::metric::L2), + std::make_tuple(knowhere::IndexEnum::INDEX_SPARSE_INVERTED_INDEX, + knowhere::metric::COSINE), + std::make_tuple(knowhere::IndexEnum::INDEX_SPARSE_INVERTED_INDEX, + knowhere::metric::HAMMING), + std::make_tuple(knowhere::IndexEnum::INDEX_SPARSE_INVERTED_INDEX, + knowhere::metric::JACCARD), + std::make_tuple(knowhere::IndexEnum::INDEX_SPARSE_INVERTED_INDEX, + knowhere::metric::SUBSTRUCTURE), + 
std::make_tuple(knowhere::IndexEnum::INDEX_SPARSE_INVERTED_INDEX, + knowhere::metric::SUPERSTRUCTURE), + std::make_tuple(knowhere::IndexEnum::INDEX_SPARSE_WAND, + knowhere::metric::L2), + std::make_tuple(knowhere::IndexEnum::INDEX_SPARSE_WAND, + knowhere::metric::COSINE), + std::make_tuple(knowhere::IndexEnum::INDEX_SPARSE_WAND, + knowhere::metric::HAMMING), + std::make_tuple(knowhere::IndexEnum::INDEX_SPARSE_WAND, + knowhere::metric::JACCARD), + std::make_tuple(knowhere::IndexEnum::INDEX_SPARSE_WAND, + knowhere::metric::SUBSTRUCTURE), + std::make_tuple(knowhere::IndexEnum::INDEX_SPARSE_WAND, + knowhere::metric::SUPERSTRUCTURE), }; return ret; } diff --git a/internal/core/src/index/VectorMemIndex.cpp b/internal/core/src/index/VectorMemIndex.cpp index 28ad4eb93f..f37ae18acc 100644 --- a/internal/core/src/index/VectorMemIndex.cpp +++ b/internal/core/src/index/VectorMemIndex.cpp @@ -483,37 +483,68 @@ VectorMemIndex::Build(const Config& config) { auto insert_files = GetValueFromConfig>(config, "insert_files"); AssertInfo(insert_files.has_value(), - "insert file paths is empty when build disk ann index"); + "insert file paths is empty when building in memory index"); auto field_datas = file_manager_->CacheRawDataToMemory(insert_files.value()); - int64_t total_size = 0; - int64_t total_num_rows = 0; - int64_t dim = 0; - for (auto data : field_datas) { - total_size += data->Size(); - total_num_rows += data->get_num_rows(); - AssertInfo(dim == 0 || dim == data->get_dim(), - "inconsistent dim value between field datas!"); - dim = data->get_dim(); - } - - auto buf = std::shared_ptr(new uint8_t[total_size]); - int64_t offset = 0; - for (auto data : field_datas) { - std::memcpy(buf.get() + offset, data->Data(), data->Size()); - offset += data->Size(); - data.reset(); - } - field_datas.clear(); - Config build_config; build_config.update(config); build_config.erase("insert_files"); build_config.erase(VEC_OPT_FIELDS); + if (GetIndexType().find("SPARSE") == std::string::npos) { + 
int64_t total_size = 0; + int64_t total_num_rows = 0; + int64_t dim = 0; + for (auto data : field_datas) { + total_size += data->Size(); + total_num_rows += data->get_num_rows(); + AssertInfo(dim == 0 || dim == data->get_dim(), + "inconsistent dim value between field datas!"); + dim = data->get_dim(); + } - auto dataset = GenDataset(total_num_rows, dim, buf.get()); - BuildWithDataset(dataset, build_config); + auto buf = std::shared_ptr(new uint8_t[total_size]); + int64_t offset = 0; + // TODO: avoid copying + for (auto data : field_datas) { + std::memcpy(buf.get() + offset, data->Data(), data->Size()); + offset += data->Size(); + data.reset(); + } + field_datas.clear(); + + auto dataset = GenDataset(total_num_rows, dim, buf.get()); + BuildWithDataset(dataset, build_config); + } else { + // sparse + int64_t total_rows = 0; + int64_t dim = 0; + for (auto field_data : field_datas) { + total_rows += field_data->Length(); + dim = std::max( + dim, + std::dynamic_pointer_cast>( + field_data) + ->Dim()); + } + std::vector> vec(total_rows); + int64_t offset = 0; + for (auto field_data : field_datas) { + auto ptr = static_cast*>( + field_data->Data()); + AssertInfo(ptr, "failed to cast field data to sparse rows"); + for (size_t i = 0; i < field_data->Length(); ++i) { + // this does a deep copy of field_data's data. + // TODO: avoid copying by enforcing field data to give up + // ownership. 
+ vec[offset + i] = ptr[i]; + } + offset += field_data->Length(); + } + auto dataset = GenDataset(total_rows, dim, vec.data()); + dataset->SetIsSparse(true); + BuildWithDataset(dataset, build_config); + } } template diff --git a/internal/core/src/indexbuilder/IndexFactory.h b/internal/core/src/indexbuilder/IndexFactory.h index 3b6e6874de..cd361499b4 100644 --- a/internal/core/src/indexbuilder/IndexFactory.h +++ b/internal/core/src/indexbuilder/IndexFactory.h @@ -66,6 +66,7 @@ class IndexFactory { case DataType::VECTOR_FLOAT16: case DataType::VECTOR_BFLOAT16: case DataType::VECTOR_BINARY: + case DataType::VECTOR_SPARSE_FLOAT: return std::make_unique(type, config, context); default: throw SegcoreError( @@ -101,6 +102,7 @@ class IndexFactory { case DataType::VECTOR_BINARY: case DataType::VECTOR_FLOAT16: case DataType::VECTOR_BFLOAT16: + case DataType::VECTOR_SPARSE_FLOAT: return std::make_unique( type, field_name, dim, config, file_manager_context, space); default: diff --git a/internal/core/src/indexbuilder/index_c.cpp b/internal/core/src/indexbuilder/index_c.cpp index b921118586..1beba763b7 100644 --- a/internal/core/src/indexbuilder/index_c.cpp +++ b/internal/core/src/indexbuilder/index_c.cpp @@ -351,6 +351,32 @@ BuildBinaryVecIndex(CIndex index, int64_t data_size, const uint8_t* vectors) { return status; } +CStatus +BuildSparseFloatVecIndex(CIndex index, + int64_t row_num, + int64_t dim, + const uint8_t* vectors) { + auto status = CStatus(); + try { + AssertInfo( + index, + "failed to build sparse float vector index, passed index was null"); + auto real_index = + reinterpret_cast(index); + auto cIndex = + dynamic_cast(real_index); + auto ds = knowhere::GenDataSet(row_num, dim, vectors); + ds->SetIsSparse(true); + cIndex->Build(ds); + status.error_code = Success; + status.error_msg = ""; + } catch (std::exception& e) { + status.error_code = UnexpectedError; + status.error_msg = strdup(e.what()); + } + return status; +} + // field_data: // 1, serialized 
proto::schema::BoolArray, if type is bool; // 2, serialized proto::schema::StringArray, if type is string; diff --git a/internal/core/src/indexbuilder/index_c.h b/internal/core/src/indexbuilder/index_c.h index d13e121737..16cd76e453 100644 --- a/internal/core/src/indexbuilder/index_c.h +++ b/internal/core/src/indexbuilder/index_c.h @@ -20,6 +20,7 @@ extern "C" { #include "common/binary_set_c.h" #include "indexbuilder/type_c.h" +// used only in test CStatus CreateIndexV0(enum CDataType dtype, const char* serialized_type_params, @@ -43,6 +44,13 @@ BuildFloat16VecIndex(CIndex index, int64_t data_size, const uint8_t* vectors); CStatus BuildBFloat16VecIndex(CIndex index, int64_t data_size, const uint8_t* vectors); + +CStatus +BuildSparseFloatVecIndex(CIndex index, + int64_t row_num, + int64_t dim, + const uint8_t* vectors); + // field_data: // 1, serialized proto::schema::BoolArray, if type is bool; // 2, serialized proto::schema::StringArray, if type is string; diff --git a/internal/core/src/mmap/Column.h b/internal/core/src/mmap/Column.h index ce7c085b63..0bed86c02d 100644 --- a/internal/core/src/mmap/Column.h +++ b/internal/core/src/mmap/Column.h @@ -34,6 +34,10 @@ #include "fmt/format.h" #include "log/Log.h" #include "mmap/Utils.h" +#include "common/FieldData.h" +#include "common/FieldDataInterface.h" +#include "common/Array.h" +#include "knowhere/dataset.h" namespace milvus { @@ -134,7 +138,7 @@ class ColumnBase { column.size_ = 0; } - const char* + virtual const char* Data() const { return data_; } @@ -144,14 +148,14 @@ class ColumnBase { return num_rows_; }; - const size_t + virtual size_t ByteSize() const { return cap_size_ + padding_; } // The capacity of the column, - // DO NOT call this for variable length column. - size_t + // DO NOT call this for variable length column(including SparseFloatColumn). 
+ virtual size_t Capacity() const { return cap_size_ / type_size_; } @@ -159,8 +163,8 @@ class ColumnBase { virtual SpanBase Span() const = 0; - void - AppendBatch(const FieldDataPtr& data) { + virtual void + AppendBatch(const FieldDataPtr data) { size_t required_size = size_ + data->Size(); if (required_size > cap_size_) { Expand(required_size * 2 + padding_); @@ -174,7 +178,7 @@ class ColumnBase { } // Append one row - void + virtual void Append(const char* data, size_t size) { size_t required_size = size_ + size; if (required_size > cap_size_) { @@ -263,6 +267,80 @@ class Column : public ColumnBase { } }; +// mmap not yet supported, thus SparseFloatColumn is not using fields in super +// class such as ColumnBase::data. +class SparseFloatColumn : public ColumnBase { + public: + // memory mode ctor + SparseFloatColumn(const FieldMeta& field_meta) : ColumnBase(0, field_meta) { + } + // mmap mode ctor + SparseFloatColumn(const File& file, + size_t size, + const FieldMeta& field_meta) + : ColumnBase(file, size, field_meta) { + AssertInfo(false, "SparseFloatColumn mmap mode not supported"); + } + + SparseFloatColumn(SparseFloatColumn&& column) noexcept + : ColumnBase(std::move(column)), + dim_(column.dim_), + vec_(std::move(column.vec_)) { + } + + ~SparseFloatColumn() override = default; + + const char* + Data() const override { + return static_cast(static_cast(vec_.data())); + } + + // This is used to advice mmap prefetch, we don't currently support mmap for + // sparse float vector thus not implemented for now. 
+ size_t + ByteSize() const override { + throw std::runtime_error( + "ByteSize not supported for sparse float column"); + } + + size_t + Capacity() const override { + throw std::runtime_error( + "Capacity not supported for sparse float column"); + } + + SpanBase + Span() const override { + throw std::runtime_error("Span not supported for sparse float column"); + } + + void + AppendBatch(const FieldDataPtr data) override { + auto ptr = static_cast*>( + data->Data()); + vec_.insert(vec_.end(), ptr, ptr + data->Length()); + for (size_t i = 0; i < data->Length(); ++i) { + dim_ = std::max(dim_, ptr[i].dim()); + } + num_rows_ += data->Length(); + } + + void + Append(const char* data, size_t size) override { + throw std::runtime_error( + "Append not supported for sparse float column"); + } + + int64_t + Dim() const { + return dim_; + } + + private: + int64_t dim_ = 0; + std::vector> vec_; +}; + template class VariableColumn : public ColumnBase { public: diff --git a/internal/core/src/mmap/Utils.h b/internal/core/src/mmap/Utils.h index 7b1dc8308d..94219e4fdd 100644 --- a/internal/core/src/mmap/Utils.h +++ b/internal/core/src/mmap/Utils.h @@ -31,52 +31,6 @@ namespace milvus { -inline size_t -GetDataSize(const std::vector& datas) { - size_t total_size{0}; - for (const auto& data : datas) { - total_size += data->Size(); - } - - return total_size; -} - -inline void* -FillField(DataType data_type, const FieldDataPtr& data, void* dst) { - char* dest = reinterpret_cast(dst); - if (datatype_is_variable(data_type)) { - switch (data_type) { - case DataType::STRING: - case DataType::VARCHAR: { - for (ssize_t i = 0; i < data->get_num_rows(); ++i) { - auto str = - static_cast(data->RawValue(i)); - memcpy(dest, str->data(), str->size()); - dest += str->size(); - } - break; - } - case DataType::JSON: { - for (ssize_t i = 0; i < data->get_num_rows(); ++i) { - auto padded_string = - static_cast(data->RawValue(i))->data(); - memcpy(dest, padded_string.data(), padded_string.size()); - dest 
+= padded_string.size(); - } - break; - } - default: - PanicInfo( - DataTypeInvalid, "not supported data type {}", data_type); - } - } else { - memcpy(dst, data->Data(), data->Size()); - dest += data->Size(); - } - - return dest; -} - inline size_t WriteFieldData(File& file, DataType data_type, @@ -124,6 +78,12 @@ WriteFieldData(File& file, } break; } + case DataType::VECTOR_SPARSE_FLOAT: { + // TODO(SPARSE): this is for mmap to write data to disk so that + // the file can be mmaped into memory. + throw std::runtime_error( + "WriteFieldData for VECTOR_SPARSE_FLOAT not implemented"); + } default: PanicInfo(DataTypeInvalid, "not supported data type {}", diff --git a/internal/core/src/segcore/CMakeLists.txt b/internal/core/src/segcore/CMakeLists.txt index 4719a860d1..9986882e7a 100644 --- a/internal/core/src/segcore/CMakeLists.txt +++ b/internal/core/src/segcore/CMakeLists.txt @@ -24,7 +24,6 @@ set(SEGCORE_FILES SegmentGrowingImpl.cpp SegmentSealedImpl.cpp FieldIndexing.cpp - InsertRecord.cpp Reduce.cpp metrics_c.cpp plan_c.cpp @@ -35,7 +34,6 @@ set(SEGCORE_FILES SegcoreConfig.cpp IndexConfigGenerator.cpp segcore_init_c.cpp - ScalarIndex.cpp TimestampIndex.cpp Utils.cpp ConcurrentVector.cpp) diff --git a/internal/core/src/segcore/ConcurrentVector.cpp b/internal/core/src/segcore/ConcurrentVector.cpp index 3bf9f4ee20..0fc665d303 100644 --- a/internal/core/src/segcore/ConcurrentVector.cpp +++ b/internal/core/src/segcore/ConcurrentVector.cpp @@ -36,8 +36,16 @@ VectorBase::set_data_raw(ssize_t element_offset, } else if (field_meta.get_data_type() == DataType::VECTOR_BFLOAT16) { return set_data_raw( element_offset, VEC_FIELD_DATA(data, bfloat16), element_count); + } else if (field_meta.get_data_type() == + DataType::VECTOR_SPARSE_FLOAT) { + return set_data_raw( + element_offset, + SparseBytesToRows( + data->vectors().sparse_float_vector().contents()) + .get(), + element_count); } else { - PanicInfo(DataTypeInvalid, "unsupported"); + PanicInfo(DataTypeInvalid, "unsupported 
vector type"); } } diff --git a/internal/core/src/segcore/ConcurrentVector.h b/internal/core/src/segcore/ConcurrentVector.h index cdbac4e197..05287460d9 100644 --- a/internal/core/src/segcore/ConcurrentVector.h +++ b/internal/core/src/segcore/ConcurrentVector.h @@ -93,9 +93,6 @@ class VectorBase { } virtual ~VectorBase() = default; - virtual void - grow_to_at_least(int64_t element_count) = 0; - virtual void set_data_raw(ssize_t element_offset, const void* source, @@ -105,12 +102,13 @@ class VectorBase { set_data_raw(ssize_t element_offset, const std::vector& data) = 0; - void + virtual void set_data_raw(ssize_t element_offset, ssize_t element_count, const DataArray* data, const FieldMeta& field_meta); + // used only by sealed segment to load system field virtual void fill_chunk_data(const std::vector& data) = 0; @@ -135,7 +133,7 @@ class VectorBase { const int64_t size_per_chunk_; }; -template +template class ConcurrentVectorImpl : public VectorBase { public: // constants @@ -149,7 +147,7 @@ class ConcurrentVectorImpl : public VectorBase { operator=(const ConcurrentVectorImpl&) = delete; using TraitType = std::conditional_t< - is_scalar, + is_type_entire_row, Type, std::conditional_t< std::is_same_v, @@ -162,27 +160,16 @@ class ConcurrentVectorImpl : public VectorBase { BinaryVector>>>>; public: - explicit ConcurrentVectorImpl(ssize_t dim, int64_t size_per_chunk) - : VectorBase(size_per_chunk), Dim(is_scalar ? 1 : dim) { - // Assert(is_scalar ? 
dim == 1 : dim != 1); - } - - void - grow_to_at_least(int64_t element_count) override { - auto chunk_count = upper_div(element_count, size_per_chunk_); - chunks_.emplace_to_at_least(chunk_count, Dim * size_per_chunk_); - } - - void - grow_on_demand(int64_t element_count) { - auto chunk_count = upper_div(element_count, size_per_chunk_); - chunks_.emplace_to_at_least(chunk_count, Dim * element_count); + explicit ConcurrentVectorImpl(ssize_t elements_per_row, + int64_t size_per_chunk) + : VectorBase(size_per_chunk), + elements_per_row_(is_type_entire_row ? 1 : elements_per_row) { } Span get_span(int64_t chunk_id) const { auto& chunk = get_chunk(chunk_id); - if constexpr (is_scalar) { + if constexpr (is_type_entire_row) { return Span(chunk.data(), chunk.size()); } else if constexpr (std::is_same_v || // NOLINT std::is_same_v) { @@ -191,7 +178,8 @@ class ConcurrentVectorImpl : public VectorBase { } else { static_assert( std::is_same_v); - return Span(chunk.data(), chunk.size(), Dim); + return Span( + chunk.data(), chunk.size(), elements_per_row_); } } @@ -201,15 +189,14 @@ class ConcurrentVectorImpl : public VectorBase { } void - fill_chunk_data(const std::vector& datas) - override { // used only for sealed segment - AssertInfo(chunks_.size() == 0, "no empty concurrent vector"); + fill_chunk_data(const std::vector& datas) override { + AssertInfo(chunks_.size() == 0, "non empty concurrent vector"); int64_t element_count = 0; for (auto& field_data : datas) { element_count += field_data->get_num_rows(); } - chunks_.emplace_to_at_least(1, Dim * element_count); + chunks_.emplace_to_at_least(1, elements_per_row_ * element_count); int64_t offset = 0; for (auto& field_data : datas) { auto num_rows = field_data->get_num_rows(); @@ -236,11 +223,70 @@ class ConcurrentVectorImpl : public VectorBase { if (element_count == 0) { return; } - this->grow_to_at_least(element_offset + element_count); + chunks_.emplace_to_at_least( + upper_div(element_offset + element_count, 
size_per_chunk_), + elements_per_row_ * size_per_chunk_); set_data( element_offset, static_cast(source), element_count); } + const Chunk& + get_chunk(ssize_t chunk_index) const { + return chunks_[chunk_index]; + } + + Chunk& + get_chunk(ssize_t index) { + return chunks_[index]; + } + + const void* + get_chunk_data(ssize_t chunk_index) const override { + return chunks_[chunk_index].data(); + } + + // just for fun, don't use it directly + const Type* + get_element(ssize_t element_index) const { + auto chunk_id = element_index / size_per_chunk_; + auto chunk_offset = element_index % size_per_chunk_; + return get_chunk(chunk_id).data() + chunk_offset * elements_per_row_; + } + + const Type& + operator[](ssize_t element_index) const { + AssertInfo( + elements_per_row_ == 1, + fmt::format( + "The value of elements_per_row_ is not 1, elements_per_row_={}", + elements_per_row_)); + auto chunk_id = element_index / size_per_chunk_; + auto chunk_offset = element_index % size_per_chunk_; + return get_chunk(chunk_id)[chunk_offset]; + } + + ssize_t + num_chunk() const override { + return chunks_.size(); + } + + bool + empty() override { + for (size_t i = 0; i < chunks_.size(); i++) { + if (get_chunk(i).size() > 0) { + return false; + } + } + + return true; + } + + void + clear() { + chunks_.clear(); + } + + private: void set_data(ssize_t element_offset, const Type* source, @@ -277,60 +323,6 @@ class ConcurrentVectorImpl : public VectorBase { } } - const Chunk& - get_chunk(ssize_t chunk_index) const { - return chunks_[chunk_index]; - } - - Chunk& - get_chunk(ssize_t index) { - return chunks_[index]; - } - - const void* - get_chunk_data(ssize_t chunk_index) const override { - return chunks_[chunk_index].data(); - } - - // just for fun, don't use it directly - const Type* - get_element(ssize_t element_index) const { - auto chunk_id = element_index / size_per_chunk_; - auto chunk_offset = element_index % size_per_chunk_; - return get_chunk(chunk_id).data() + chunk_offset * Dim; - } 
- - const Type& - operator[](ssize_t element_index) const { - AssertInfo(Dim == 1, - fmt::format("The value of Dim is not 1, Dim={}", Dim)); - auto chunk_id = element_index / size_per_chunk_; - auto chunk_offset = element_index % size_per_chunk_; - return get_chunk(chunk_id)[chunk_offset]; - } - - ssize_t - num_chunk() const override { - return chunks_.size(); - } - - bool - empty() override { - for (size_t i = 0; i < chunks_.size(); i++) { - if (get_chunk(i).size() > 0) { - return false; - } - } - - return true; - } - - void - clear() { - chunks_.clear(); - } - - private: void fill_chunk(ssize_t chunk_id, ssize_t chunk_offset, @@ -349,12 +341,12 @@ class ConcurrentVectorImpl : public VectorBase { Chunk& chunk = chunks_[chunk_id]; auto ptr = chunk.data(); - std::copy_n(source + source_offset * Dim, - element_count * Dim, - ptr + chunk_offset * Dim); + std::copy_n(source + source_offset * elements_per_row_, + element_count * elements_per_row_, + ptr + chunk_offset * elements_per_row_); } - const ssize_t Dim; + const ssize_t elements_per_row_; private: ThreadSafeVector chunks_; @@ -370,6 +362,40 @@ class ConcurrentVector : public ConcurrentVectorImpl { } }; +template <> +class ConcurrentVector + : public ConcurrentVectorImpl, true> { + public: + explicit ConcurrentVector(int64_t size_per_chunk) + : ConcurrentVectorImpl, + true>::ConcurrentVectorImpl(1, size_per_chunk), + dim_(0) { + } + + void + set_data_raw(ssize_t element_offset, + const void* source, + ssize_t element_count) override { + auto* src = + static_cast*>(source); + for (int i = 0; i < element_count; ++i) { + dim_ = std::max(dim_, src[i].dim()); + } + ConcurrentVectorImpl, + true>::set_data_raw(element_offset, + source, + element_count); + } + + int64_t + Dim() const { + return dim_; + } + + private: + int64_t dim_; +}; + template <> class ConcurrentVector : public ConcurrentVectorImpl { diff --git a/internal/core/src/segcore/FieldIndexing.cpp b/internal/core/src/segcore/FieldIndexing.cpp index 
09af62b6e1..7455346080 100644 --- a/internal/core/src/segcore/FieldIndexing.cpp +++ b/internal/core/src/segcore/FieldIndexing.cpp @@ -11,6 +11,7 @@ #include #include + #include "common/EasyAssert.h" #include "fmt/format.h" #include "index/ScalarIndexSort.h" @@ -29,8 +30,8 @@ VectorFieldIndexing::VectorFieldIndexing(const FieldMeta& field_meta, int64_t segment_max_row_count, const SegcoreConfig& segcore_config) : FieldIndexing(field_meta, segcore_config), - build(false), - sync_with_index(false), + built_(false), + sync_with_index_(false), config_(std::make_unique(segment_max_row_count, field_index_meta, segcore_config, @@ -45,6 +46,7 @@ void VectorFieldIndexing::BuildIndexRange(int64_t ack_beg, int64_t ack_end, const VectorBase* vec_base) { + // No BuildIndexRange support for sparse vector. AssertInfo(field_meta_.get_data_type() == DataType::VECTOR_FLOAT, "Data type of vector field is not VECTOR_FLOAT"); auto dim = field_meta_.get_dim(); @@ -85,13 +87,65 @@ VectorFieldIndexing::GetDataFromIndex(const int64_t* seg_offsets, } void -VectorFieldIndexing::AppendSegmentIndex(int64_t reserved_offset, - int64_t size, - const VectorBase* field_raw_data, - const void* data_source) { +VectorFieldIndexing::AppendSegmentIndexSparse(int64_t reserved_offset, + int64_t size, + int64_t new_data_dim, + const VectorBase* field_raw_data, + const void* data_source) { + auto conf = get_build_params(); + auto source = dynamic_cast*>( + field_raw_data); + AssertInfo(source, + "field_raw_data can't cast to " + "ConcurrentVector type"); + AssertInfo(size > 0, "append 0 sparse rows to index is not allowed"); + if (!built_) { + AssertInfo(!sync_with_index_, "index marked synced before built"); + idx_t total_rows = reserved_offset + size; + idx_t chunk_id = 0; + auto dim = source->Dim(); + + while (total_rows > 0) { + auto mat = static_cast*>( + source->get_chunk_data(chunk_id)); + auto rows = std::min(source->get_size_per_chunk(), total_rows); + auto dataset = knowhere::GenDataSet(rows, dim, 
mat); + dataset->SetIsSparse(true); + try { + if (chunk_id == 0) { + index_->BuildWithDataset(dataset, conf); + } else { + index_->AddWithDataset(dataset, conf); + } + } catch (SegcoreError& error) { + LOG_ERROR("growing sparse index build error: {}", error.what()); + return; + } + index_cur_.fetch_add(rows); + total_rows -= rows; + chunk_id++; + } + built_ = true; + sync_with_index_ = true; + // if not built_, new rows in data_source have already been added to + // source(ConcurrentVector) and thus added to the + // index, thus no need to add again. + return; + } + + auto dataset = knowhere::GenDataSet(size, new_data_dim, data_source); + dataset->SetIsSparse(true); + index_->AddWithDataset(dataset, conf); + index_cur_.fetch_add(size); +} + +void +VectorFieldIndexing::AppendSegmentIndexDense(int64_t reserved_offset, + int64_t size, + const VectorBase* field_raw_data, + const void* data_source) { AssertInfo(field_meta_.get_data_type() == DataType::VECTOR_FLOAT, "Data type of vector field is not VECTOR_FLOAT"); - auto dim = field_meta_.get_dim(); auto conf = get_build_params(); auto source = @@ -100,8 +154,9 @@ VectorFieldIndexing::AppendSegmentIndex(int64_t reserved_offset, auto size_per_chunk = source->get_size_per_chunk(); //append vector [vector_id_beg, vector_id_end] into index //build index [vector_id_beg, build_threshold) when index not exist - if (!build) { + if (!built_) { idx_t vector_id_beg = index_cur_.load(); + Assert(vector_id_beg == 0); idx_t vector_id_end = get_build_threshold() - 1; auto chunk_id_beg = vector_id_beg / size_per_chunk; auto chunk_id_end = vector_id_end / size_per_chunk; @@ -143,7 +198,7 @@ VectorFieldIndexing::AppendSegmentIndex(int64_t reserved_offset, return; } index_cur_.fetch_add(vec_num); - build = true; + built_ = true; } //append rest data when index has built idx_t vector_id_beg = index_cur_.load(); @@ -153,11 +208,12 @@ VectorFieldIndexing::AppendSegmentIndex(int64_t reserved_offset, int64_t vec_num = vector_id_end - 
vector_id_beg + 1; if (vec_num <= 0) { - sync_with_index.store(true); + sync_with_index_.store(true); return; } - if (sync_with_index.load()) { + if (sync_with_index_.load()) { + Assert(size == vec_num); auto dataset = knowhere::GenDataSet(vec_num, dim, data_source); index_->AddWithDataset(dataset, conf); index_cur_.fetch_add(vec_num); @@ -179,7 +235,7 @@ VectorFieldIndexing::AppendSegmentIndex(int64_t reserved_offset, index_->AddWithDataset(dataset, conf); index_cur_.fetch_add(chunk_sz); } - sync_with_index.store(true); + sync_with_index_.store(true); } } @@ -188,6 +244,8 @@ VectorFieldIndexing::get_build_params() const { auto config = config_->GetBuildBaseParams(); config[knowhere::meta::DIM] = std::to_string(field_meta_.get_dim()); config[knowhere::meta::NUM_BUILD_THREAD] = std::to_string(1); + // for sparse float vector: drop_ratio_build config is not allowed to be set + // on growing segment index. return config; } @@ -203,7 +261,7 @@ VectorFieldIndexing::get_index_cursor() { } bool VectorFieldIndexing::sync_data_with_index() const { - return sync_with_index.load(); + return sync_with_index_.load(); } bool @@ -243,17 +301,10 @@ CreateIndex(const FieldMeta& field_meta, int64_t segment_max_row_count, const SegcoreConfig& segcore_config) { if (field_meta.is_vector()) { - if (field_meta.get_data_type() == DataType::VECTOR_FLOAT) { - return std::make_unique(field_meta, - field_index_meta, - segment_max_row_count, - segcore_config); - } else if (field_meta.get_data_type() == DataType::VECTOR_FLOAT16) { - return std::make_unique(field_meta, - field_index_meta, - segment_max_row_count, - segcore_config); - } else if (field_meta.get_data_type() == DataType::VECTOR_BFLOAT16) { + if (field_meta.get_data_type() == DataType::VECTOR_FLOAT || + field_meta.get_data_type() == DataType::VECTOR_FLOAT16 || + field_meta.get_data_type() == DataType::VECTOR_BFLOAT16 || + field_meta.get_data_type() == DataType::VECTOR_SPARSE_FLOAT) { return std::make_unique(field_meta, 
field_index_meta, segment_max_row_count, diff --git a/internal/core/src/segcore/FieldIndexing.h b/internal/core/src/segcore/FieldIndexing.h index 249a4b99da..0033a6b051 100644 --- a/internal/core/src/segcore/FieldIndexing.h +++ b/internal/core/src/segcore/FieldIndexing.h @@ -51,10 +51,18 @@ class FieldIndexing { const VectorBase* vec_base) = 0; virtual void - AppendSegmentIndex(int64_t reserved_offset, - int64_t size, - const VectorBase* vec_base, - const void* data_source) = 0; + AppendSegmentIndexDense(int64_t reserved_offset, + int64_t size, + const VectorBase* vec_base, + const void* data_source) = 0; + + // new_data_dim is the dimension of the new data being appended(data_source) + virtual void + AppendSegmentIndexSparse(int64_t reserved_offset, + int64_t size, + int64_t new_data_dim, + const VectorBase* vec_base, + const void* data_source) = 0; virtual void GetDataFromIndex(const int64_t* seg_offsets, @@ -109,12 +117,22 @@ class ScalarFieldIndexing : public FieldIndexing { const VectorBase* vec_base) override; void - AppendSegmentIndex(int64_t reserved_offset, - int64_t size, - const VectorBase* vec_base, - const void* data_source) override { + AppendSegmentIndexDense(int64_t reserved_offset, + int64_t size, + const VectorBase* vec_base, + const void* data_source) override { PanicInfo(Unsupported, - "scalar index don't support append segment index"); + "scalar index doesn't support append vector segment index"); + } + + void + AppendSegmentIndexSparse(int64_t reserved_offset, + int64_t size, + int64_t new_data_dim, + const VectorBase* vec_base, + const void* data_source) override { + PanicInfo(Unsupported, + "scalar index doesn't support append vector segment index"); } void @@ -171,10 +189,17 @@ class VectorFieldIndexing : public FieldIndexing { const VectorBase* vec_base) override; void - AppendSegmentIndex(int64_t reserved_offset, - int64_t size, - const VectorBase* field_raw_data, - const void* data_source) override; + AppendSegmentIndexDense(int64_t 
reserved_offset, + int64_t size, + const VectorBase* field_raw_data, + const void* data_source) override; + + void + AppendSegmentIndexSparse(int64_t reserved_offset, + int64_t size, + int64_t new_data_dim, + const VectorBase* field_raw_data, + const void* data_source) override; void GetDataFromIndex(const int64_t* seg_offsets, @@ -214,9 +239,13 @@ class VectorFieldIndexing : public FieldIndexing { get_search_params(const SearchInfo& searchInfo) const; private: + // current number of rows in index. std::atomic index_cur_ = 0; - std::atomic build; - std::atomic sync_with_index; + // whether the growing index has been built. + std::atomic built_; + // whether all inserted data has been added to growing index and can be + // searched. + std::atomic sync_with_index_; std::unique_ptr config_; std::unique_ptr index_; tbb::concurrent_vector> data_; @@ -283,19 +312,28 @@ class IndexingRecord { FieldId fieldId, const DataArray* stream_data, const InsertRecord& record) { - if (is_in(fieldId)) { - auto& indexing = field_indexings_.at(fieldId); - if (indexing->get_field_meta().is_vector() && - indexing->get_field_meta().get_data_type() == - DataType::VECTOR_FLOAT && - reserved_offset + size >= indexing->get_build_threshold()) { - auto field_raw_data = record.get_field_data_base(fieldId); - indexing->AppendSegmentIndex( - reserved_offset, - size, - field_raw_data, - stream_data->vectors().float_vector().data().data()); - } + if (!is_in(fieldId)) { + return; + } + auto& indexing = field_indexings_.at(fieldId); + auto type = indexing->get_field_meta().get_data_type(); + auto field_raw_data = record.get_field_data_base(fieldId); + if (type == DataType::VECTOR_FLOAT && + reserved_offset + size >= indexing->get_build_threshold()) { + indexing->AppendSegmentIndexDense( + reserved_offset, + size, + field_raw_data, + stream_data->vectors().float_vector().data().data()); + } else if (type == DataType::VECTOR_SPARSE_FLOAT) { + auto data = SparseBytesToRows( +
stream_data->vectors().sparse_float_vector().contents()); + indexing->AppendSegmentIndexSparse( + reserved_offset, + size, + stream_data->vectors().sparse_float_vector().dim(), + field_raw_data, + data.get()); } } @@ -307,16 +345,28 @@ class IndexingRecord { FieldId fieldId, const FieldDataPtr data, const InsertRecord& record) { - if (is_in(fieldId)) { - auto& indexing = field_indexings_.at(fieldId); - if (indexing->get_field_meta().is_vector() && - indexing->get_field_meta().get_data_type() == - DataType::VECTOR_FLOAT && - reserved_offset + size >= indexing->get_build_threshold()) { - auto vec_base = record.get_field_data_base(fieldId); - indexing->AppendSegmentIndex( - reserved_offset, size, vec_base, data->Data()); - } + if (!is_in(fieldId)) { + return; + } + auto& indexing = field_indexings_.at(fieldId); + auto type = indexing->get_field_meta().get_data_type(); + const void* p = data->Data(); + + if (type == DataType::VECTOR_FLOAT && + reserved_offset + size >= indexing->get_build_threshold()) { + auto vec_base = record.get_field_data_base(fieldId); + indexing->AppendSegmentIndexDense( + reserved_offset, size, vec_base, data->Data()); + } else if (type == DataType::VECTOR_SPARSE_FLOAT) { + auto vec_base = record.get_field_data_base(fieldId); + indexing->AppendSegmentIndexSparse( + reserved_offset, + size, + std::dynamic_pointer_cast>( + data) + ->Dim(), + vec_base, + p); } } @@ -396,14 +446,12 @@ class IndexingRecord { IndexMetaPtr index_meta_; const SegcoreConfig& segcore_config_; - private: // control info std::atomic resource_ack_ = 0; // std::atomic finished_ack_ = 0; AckResponder finished_ack_; std::mutex mutex_; - private: // field_offset => indexing std::map> field_indexings_; }; diff --git a/internal/core/src/segcore/IndexConfigGenerator.cpp b/internal/core/src/segcore/IndexConfigGenerator.cpp index 583b457d92..c9c3906672 100644 --- a/internal/core/src/segcore/IndexConfigGenerator.cpp +++ b/internal/core/src/segcore/IndexConfigGenerator.cpp @@ -20,8 
+20,23 @@ VecIndexConfig::VecIndexConfig(const int64_t max_index_row_cout, : max_index_row_count_(max_index_row_cout), config_(config) { origin_index_type_ = index_meta_.GetIndexType(); metric_type_ = index_meta_.GeMetricType(); + // Currently for dense vector index, if the segment is growing, we use IVFCC + // as the index type; if the segment is sealed but its index has not been + // built by the index node, we use IVFFLAT as the temp index type and + // release it once the index node has finished building the index and query + // node has loaded it. - index_type_ = support_index_types.at(segment_type); + // But for sparse vector index(INDEX_SPARSE_INVERTED_INDEX and + // INDEX_SPARSE_WAND), those index themselves can be used as the temp index + // type, so we can avoid the extra step of "release temp and load". + + if (origin_index_type_ == + knowhere::IndexEnum::INDEX_SPARSE_INVERTED_INDEX || + origin_index_type_ == knowhere::IndexEnum::INDEX_SPARSE_WAND) { + index_type_ = origin_index_type_; + } else { + index_type_ = support_index_types.at(segment_type); + } build_params_[knowhere::meta::METRIC_TYPE] = metric_type_; build_params_[knowhere::indexparam::NLIST] = std::to_string(config_.get_nlist()); @@ -29,6 +44,8 @@ VecIndexConfig::VecIndexConfig(const int64_t max_index_row_cout, std::max((int)(config_.get_chunk_rows() / config_.get_nlist()), 48)); search_params_[knowhere::indexparam::NPROBE] = std::to_string(config_.get_nprobe()); + // note for sparse vector index: drop_ratio_build is not allowed for growing + // segment index. LOG_INFO( "VecIndexConfig: origin_index_type={}, index_type={}, metric_type={}", origin_index_type_, @@ -38,6 +55,14 @@ VecIndexConfig::VecIndexConfig(const int64_t max_index_row_cout, int64_t VecIndexConfig::GetBuildThreshold() const noexcept { + // For sparse, do not impose a threshold and start using index with any + // number of rows.
Unlike dense vector index, growing sparse vector index + // does not require a minimum number of rows to train. + if (origin_index_type_ == + knowhere::IndexEnum::INDEX_SPARSE_INVERTED_INDEX || + origin_index_type_ == knowhere::IndexEnum::INDEX_SPARSE_WAND) { + return 0; + } assert(VecIndexConfig::index_build_ratio.count(index_type_)); auto ratio = VecIndexConfig::index_build_ratio.at(index_type_); assert(ratio >= 0.0 && ratio < 1.0); diff --git a/internal/core/src/segcore/IndexConfigGenerator.h b/internal/core/src/segcore/IndexConfigGenerator.h index 563e95e483..ce8c20b609 100644 --- a/internal/core/src/segcore/IndexConfigGenerator.h +++ b/internal/core/src/segcore/IndexConfigGenerator.h @@ -27,6 +27,8 @@ enum class IndexConfigLevel { SYSTEM_ASSIGN = 3 }; +// this is the config used for generating growing index or the temp sealed index +// when the segment is sealed before the index is built. class VecIndexConfig { inline static const std::map support_index_types = {{SegmentType::Growing, knowhere::IndexEnum::INDEX_FAISS_IVFFLAT_CC}, diff --git a/internal/core/src/segcore/InsertRecord.cpp b/internal/core/src/segcore/InsertRecord.cpp deleted file mode 100644 index be9cc0a85a..0000000000 --- a/internal/core/src/segcore/InsertRecord.cpp +++ /dev/null @@ -1,12 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. 
See the License for the specific language governing permissions and limitations under the License - -#include "InsertRecord.h" diff --git a/internal/core/src/segcore/InsertRecord.h b/internal/core/src/segcore/InsertRecord.h index cb79020036..1723e40910 100644 --- a/internal/core/src/segcore/InsertRecord.h +++ b/internal/core/src/segcore/InsertRecord.h @@ -346,6 +346,11 @@ struct InsertRecord { this->append_field_data( field_id, field_meta.get_dim(), size_per_chunk); continue; + } else if (field_meta.get_data_type() == + DataType::VECTOR_SPARSE_FLOAT) { + this->append_field_data(field_id, + size_per_chunk); + continue; } else { PanicInfo(DataTypeInvalid, fmt::format("unsupported vector type", @@ -524,8 +529,7 @@ struct InsertRecord { AssertInfo(fields_data_.find(field_id) != fields_data_.end(), "Cannot find field_data with field_id: " + std::to_string(field_id.get())); - auto ptr = fields_data_.at(field_id).get(); - return ptr; + return fields_data_.at(field_id).get(); } // get field data in given type, const version @@ -552,7 +556,7 @@ struct InsertRecord { template void append_field_data(FieldId field_id, int64_t size_per_chunk) { - static_assert(IsScalar); + static_assert(IsScalar || IsSparse); fields_data_.emplace( field_id, std::make_unique>(size_per_chunk)); } @@ -608,7 +612,6 @@ struct InsertRecord { std::unique_ptr pk2offset_; private: - // std::vector> fields_data_; std::unordered_map> fields_data_{}; mutable std::shared_mutex shared_mutex_{}; }; diff --git a/internal/core/src/segcore/ScalarIndex.cpp b/internal/core/src/segcore/ScalarIndex.cpp deleted file mode 100644 index c5aaacdd70..0000000000 --- a/internal/core/src/segcore/ScalarIndex.cpp +++ /dev/null @@ -1,88 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License - -#include "common/EasyAssert.h" -#include "ScalarIndex.h" - -namespace milvus::segcore { -std::pair, std::vector> -ScalarIndexVector::do_search_ids(const IdArray& ids) const { - auto res_ids = std::make_unique(); - // TODO: support string array - static_assert(std::is_same_v); - AssertInfo(ids.has_int_id(), "ids doesn't have int_id field"); - auto src_ids = ids.int_id(); - auto dst_ids = res_ids->mutable_int_id(); - std::vector dst_offsets; - - // TODO: a possible optimization: - // TODO: sort the input id array to make access cache friendly - - // assume no repeated key now - // TODO: support repeated key - for (auto id : src_ids.data()) { - using Pair = std::pair; - auto [iter_beg, iter_end] = - std::equal_range(mapping_.begin(), - mapping_.end(), - std::make_pair(id, SegOffset(0)), - [](const Pair& left, const Pair& right) { - return left.first < right.first; - }); - - for (auto& iter = iter_beg; iter != iter_end; iter++) { - auto [entry_id, entry_offset] = *iter; - dst_ids->add_data(entry_id); - dst_offsets.push_back(entry_offset); - } - } - return {std::move(res_ids), std::move(dst_offsets)}; -} - -std::pair, std::vector> -ScalarIndexVector::do_search_ids(const std::vector& ids) const { - std::vector dst_offsets; - std::vector dst_ids; - - for (auto id : ids) { - using Pair = std::pair; - auto [iter_beg, iter_end] = - std::equal_range(mapping_.begin(), - mapping_.end(), - std::make_pair(id, SegOffset(0)), - [](const Pair& left, const Pair& right) { - return left.first < right.first; - }); - - for (auto& iter = iter_beg; iter != iter_end; iter++) { - auto 
[entry_id, entry_offset] = *iter_beg; - dst_ids.emplace_back(entry_id); - dst_offsets.push_back(entry_offset); - } - } - return {std::move(dst_ids), std::move(dst_offsets)}; -} - -void -ScalarIndexVector::append_data(const ScalarIndexVector::T* ids, - int64_t count, - SegOffset base) { - for (int64_t i = 0; i < count; ++i) { - auto offset = base + SegOffset(i); - mapping_.emplace_back(ids[i], offset); - } -} - -void -ScalarIndexVector::build() { - std::sort(mapping_.begin(), mapping_.end()); -} -} // namespace milvus::segcore diff --git a/internal/core/src/segcore/ScalarIndex.h b/internal/core/src/segcore/ScalarIndex.h deleted file mode 100644 index ae3e846fce..0000000000 --- a/internal/core/src/segcore/ScalarIndex.h +++ /dev/null @@ -1,66 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. 
See the License for the specific language governing permissions and limitations under the License - -#pragma once - -#include -#include -#include -#include - -#include "common/Types.h" -#include "pb/schema.pb.h" - -namespace milvus::segcore { - -class ScalarIndexBase { - public: - virtual std::pair, std::vector> - do_search_ids(const IdArray& ids) const = 0; - virtual std::pair, std::vector> - do_search_ids(const std::vector& ids) const = 0; - virtual ~ScalarIndexBase() = default; - virtual std::string - debug() const = 0; -}; - -class ScalarIndexVector : public ScalarIndexBase { - using T = int64_t; - - public: - // TODO: use proto::schema::ids - void - append_data(const T* ids, int64_t count, SegOffset base); - - void - build(); - - std::pair, std::vector> - do_search_ids(const IdArray& ids) const override; - - std::pair, std::vector> - do_search_ids(const std::vector& ids) const override; - - std::string - debug() const override { - std::string dbg_str; - for (auto pr : mapping_) { - dbg_str += "<" + std::to_string(pr.first) + "->" + - std::to_string(pr.second.get()) + ">"; - } - return dbg_str; - } - - private: - std::vector> mapping_; -}; - -} // namespace milvus::segcore diff --git a/internal/core/src/segcore/SegmentGrowing.h b/internal/core/src/segcore/SegmentGrowing.h index f00b3fc3fc..5d51fe3cb5 100644 --- a/internal/core/src/segcore/SegmentGrowing.h +++ b/internal/core/src/segcore/SegmentGrowing.h @@ -32,7 +32,7 @@ class SegmentGrowing : public SegmentInternalInterface { int64_t size, const int64_t* row_ids, const Timestamp* timestamps, - const InsertData* insert_data) = 0; + const InsertRecordProto* insert_record_proto) = 0; SegmentType type() const override { diff --git a/internal/core/src/segcore/SegmentGrowingImpl.cpp b/internal/core/src/segcore/SegmentGrowingImpl.cpp index b9aca5f639..d78f80f303 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.cpp +++ b/internal/core/src/segcore/SegmentGrowingImpl.cpp @@ -87,15 +87,13 @@ 
SegmentGrowingImpl::Insert(int64_t reserved_offset, int64_t num_rows, const int64_t* row_ids, const Timestamp* timestamps_raw, - const InsertData* insert_data) { - AssertInfo(insert_data->num_rows() == num_rows, + const InsertRecordProto* insert_record_proto) { + AssertInfo(insert_record_proto->num_rows() == num_rows, "Entities_raw count not equal to insert size"); - // AssertInfo(insert_data->fields_data_size() == schema_->size(), - // "num fields of insert data not equal to num of schema fields"); // step 1: check insert data if valid std::unordered_map field_id_to_offset; int64_t field_offset = 0; - for (const auto& field : insert_data->fields_data()) { + for (const auto& field : insert_record_proto->fields_data()) { auto field_id = FieldId(field.field_id()); AssertInfo(!field_id_to_offset.count(field_id), "duplicate field data"); field_id_to_offset.emplace(field_id, field_offset++); @@ -122,7 +120,7 @@ SegmentGrowingImpl::Insert(int64_t reserved_offset, insert_record_.get_field_data_base(field_id)->set_data_raw( reserved_offset, num_rows, - &insert_data->fields_data(data_offset), + &insert_record_proto->fields_data(data_offset), field_meta); } //insert vector data into index @@ -131,13 +129,15 @@ SegmentGrowingImpl::Insert(int64_t reserved_offset, reserved_offset, num_rows, field_id, - &insert_data->fields_data(data_offset), + &insert_record_proto->fields_data(data_offset), insert_record_); } // update average row data size auto field_data_size = GetRawDataSizeOfDataArray( - &insert_data->fields_data(data_offset), field_meta, num_rows); + &insert_record_proto->fields_data(data_offset), + field_meta, + num_rows); if (datatype_is_variable(field_meta.get_data_type())) { SegmentInternalInterface::set_field_avg_size( field_id, num_rows, field_data_size); @@ -153,7 +153,7 @@ SegmentGrowingImpl::Insert(int64_t reserved_offset, AssertInfo(field_id.get() != INVALID_FIELD_ID, "Primary key is -1"); std::vector pks(num_rows); ParsePksFromFieldData( - pks, 
insert_data->fields_data(field_id_to_offset[field_id])); + pks, insert_record_proto->fields_data(field_id_to_offset[field_id])); for (int i = 0; i < num_rows; ++i) { insert_record_.insert_pk(pks[i], reserved_offset + i); } diff --git a/internal/core/src/segcore/SegmentGrowingImpl.h b/internal/core/src/segcore/SegmentGrowingImpl.h index 9d7a0668e9..d26fb6fb14 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.h +++ b/internal/core/src/segcore/SegmentGrowingImpl.h @@ -45,7 +45,7 @@ class SegmentGrowingImpl : public SegmentGrowing { int64_t size, const int64_t* row_ids, const Timestamp* timestamps, - const InsertData* insert_data) override; + const InsertRecordProto* insert_record_proto) override; bool Contain(const PkType& pk) const override { diff --git a/internal/core/src/segcore/SegmentSealedImpl.cpp b/internal/core/src/segcore/SegmentSealedImpl.cpp index cd0367cedf..120316529c 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.cpp +++ b/internal/core/src/segcore/SegmentSealedImpl.cpp @@ -37,6 +37,7 @@ #include "common/FieldData.h" #include "common/Types.h" #include "log/Log.h" +#include "mmap/Utils.h" #include "pb/schema.pb.h" #include "mmap/Types.h" #include "query/ScalarIndex.h" @@ -252,7 +253,7 @@ SegmentSealedImpl::LoadFieldData(const LoadFieldDataInfo& load_info) { field_data_info.channel->set_capacity(parallel_degree * 2); auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::MIDDLE); - auto load_future = pool.Submit( + pool.Submit( LoadFieldDatasFromRemote, insert_files, field_data_info.channel); LOG_INFO("segment {} submits load field {} task to thread pool", @@ -272,6 +273,7 @@ SegmentSealedImpl::LoadFieldData(const LoadFieldDataInfo& load_info) { void SegmentSealedImpl::LoadFieldDataV2(const LoadFieldDataInfo& load_info) { + // TODO(SPARSE): support storage v2 // NOTE: lock only when data is ready to avoid starvation // only one field for now, parallel load field data in golang size_t num_rows = 
storage::GetNumRowsForLoadInfo(load_info); @@ -435,6 +437,16 @@ SegmentSealedImpl::LoadFieldData(FieldId field_id, FieldDataInfo& data) { column = std::move(var_column); break; } + case milvus::DataType::VECTOR_SPARSE_FLOAT: { + auto col = std::make_shared(field_meta); + FieldDataPtr field_data; + while (data.channel->pop(field_data)) { + stats_.mem_size += field_data->Size(); + col->AppendBatch(field_data); + } + column = std::move(col); + break; + } default: { PanicInfo(DataTypeInvalid, fmt::format("unsupported data type", data_type)); @@ -566,6 +578,7 @@ SegmentSealedImpl::MapFieldData(const FieldId field_id, FieldDataInfo& data) { column = std::move(arr_column); break; } + // TODO(SPARSE) support mmap default: { PanicInfo(DataTypeInvalid, fmt::format("unsupported data type {}", data_type)); @@ -1514,14 +1527,17 @@ SegmentSealedImpl::generate_binlog_index(const FieldId field_id) { auto& field_index_meta = col_index_meta_->GetFieldIndexMeta(field_id); auto& index_params = field_index_meta.GetIndexParams(); + bool is_sparse = + field_meta.get_data_type() == DataType::VECTOR_SPARSE_FLOAT; + auto enable_binlog_index = [&]() { // checkout config if (!segcore_config_.get_enable_interim_segment_index()) { return false; } // check data type - if (!field_meta.is_vector() || - field_meta.get_data_type() != DataType::VECTOR_FLOAT) { + if (field_meta.get_data_type() != DataType::VECTOR_FLOAT && + !is_sparse) { return false; } // check index type @@ -1546,7 +1562,7 @@ SegmentSealedImpl::generate_binlog_index(const FieldId field_id) { std::shared_lock lck(mutex_); row_count = num_rows_.value(); } - auto dim = field_meta.get_dim(); + // generate index params auto field_binlog_config = std::unique_ptr( new VecIndexConfig(row_count, @@ -1556,19 +1572,24 @@ SegmentSealedImpl::generate_binlog_index(const FieldId field_id) { if (row_count < field_binlog_config->GetBuildThreshold()) { return false; } - auto build_config = field_binlog_config->GetBuildBaseParams(); - 
build_config[knowhere::meta::DIM] = std::to_string(dim); - build_config[knowhere::meta::NUM_BUILD_THREAD] = std::to_string(1); - auto index_metric = field_binlog_config->GetMetricType(); - std::shared_ptr vec_data{}; { std::shared_lock lck(mutex_); vec_data = fields_.at(field_id); } + auto dim = is_sparse + ? dynamic_cast(vec_data.get())->Dim() + : field_meta.get_dim(); + + auto build_config = field_binlog_config->GetBuildBaseParams(); + build_config[knowhere::meta::DIM] = std::to_string(dim); + build_config[knowhere::meta::NUM_BUILD_THREAD] = std::to_string(1); + auto index_metric = field_binlog_config->GetMetricType(); + auto dataset = knowhere::GenDataSet(row_count, dim, (void*)vec_data->Data()); dataset->SetIsOwner(false); + dataset->SetIsSparse(is_sparse); index::IndexBasePtr vec_index = std::make_unique>( diff --git a/internal/core/src/segcore/SegmentSealedImpl.h b/internal/core/src/segcore/SegmentSealedImpl.h index c1cb987041..713ffec7a3 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.h +++ b/internal/core/src/segcore/SegmentSealedImpl.h @@ -24,7 +24,6 @@ #include "ConcurrentVector.h" #include "DeletedRecord.h" -#include "ScalarIndex.h" #include "SealedIndexingRecord.h" #include "SegmentSealed.h" #include "TimestampIndex.h" diff --git a/internal/core/src/segcore/Utils.cpp b/internal/core/src/segcore/Utils.cpp index a418c53b40..b05adadba9 100644 --- a/internal/core/src/segcore/Utils.cpp +++ b/internal/core/src/segcore/Utils.cpp @@ -21,8 +21,9 @@ #include "index/ScalarIndex.h" #include "log/Log.h" #include "mmap/Utils.h" -#include "storage/ThreadPool.h" +#include "common/FieldData.h" #include "storage/RemoteChunkManagerSingleton.h" +#include "common/Common.h" #include "storage/ThreadPools.h" #include "storage/Util.h" @@ -205,6 +206,11 @@ GetRawDataSizeOfDataArray(const DataArray* data, break; } + case DataType::VECTOR_SPARSE_FLOAT: { + // TODO(SPARSE, size) + result += data->vectors().sparse_float_vector().ByteSizeLong(); + break; + } default: { 
PanicInfo( DataTypeInvalid, @@ -338,6 +344,10 @@ CreateVectorDataArray(int64_t count, const FieldMeta& field_meta) { obj->resize(length * sizeof(bfloat16)); break; } + case DataType::VECTOR_SPARSE_FLOAT: { + // does nothing here + break; + } default: { PanicInfo(DataTypeInvalid, fmt::format("unsupported datatype {}", data_type)); @@ -446,8 +456,11 @@ CreateVectorDataArrayFrom(const void* data_raw, field_meta.get_data_type())); auto vector_array = data_array->mutable_vectors(); - auto dim = field_meta.get_dim(); - vector_array->set_dim(dim); + auto dim = 0; + if (!datatype_is_sparse_vector(data_type)) { + dim = field_meta.get_dim(); + vector_array->set_dim(dim); + } switch (data_type) { case DataType::VECTOR_FLOAT: { auto length = count * dim; @@ -479,6 +492,15 @@ CreateVectorDataArrayFrom(const void* data_raw, obj->assign(data, length * sizeof(bfloat16)); break; } + case DataType::VECTOR_SPARSE_FLOAT: { + SparseRowsToProto( + reinterpret_cast*>( + data_raw), + count, + vector_array->mutable_sparse_float_vector()); + vector_array->set_dim(vector_array->sparse_float_vector().dim()); + break; + } default: { PanicInfo(DataTypeInvalid, fmt::format("unsupported datatype {}", data_type)); @@ -534,6 +556,15 @@ MergeDataArray( auto data = VEC_FIELD_DATA(src_field_data, binary); auto obj = vector_array->mutable_binary_vector(); obj->assign(data + src_offset * num_bytes, num_bytes); + } else if (field_meta.get_data_type() == + DataType::VECTOR_SPARSE_FLOAT) { + auto src = src_field_data->vectors().sparse_float_vector(); + auto dst = vector_array->mutable_sparse_float_vector(); + if (src.dim() > dst->dim()) { + dst->set_dim(src.dim()); + } + vector_array->set_dim(dst->dim()); + *dst->mutable_contents() = src.contents(); } else { PanicInfo(DataTypeInvalid, fmt::format("unsupported datatype {}", data_type)); diff --git a/internal/core/src/segcore/segment_c.cpp b/internal/core/src/segcore/segment_c.cpp index 75f697f93f..111294f915 100644 --- 
a/internal/core/src/segcore/segment_c.cpp +++ b/internal/core/src/segcore/segment_c.cpp @@ -207,12 +207,17 @@ Insert(CSegmentInterface c_segment, const uint64_t data_info_len) { try { auto segment = static_cast(c_segment); - auto insert_data = std::make_unique(); - auto suc = insert_data->ParseFromArray(data_info, data_info_len); + auto insert_record_proto = + std::make_unique(); + auto suc = + insert_record_proto->ParseFromArray(data_info, data_info_len); AssertInfo(suc, "failed to parse insert data from records"); - segment->Insert( - reserved_offset, size, row_ids, timestamps, insert_data.get()); + segment->Insert(reserved_offset, + size, + row_ids, + timestamps, + insert_record_proto.get()); return milvus::SuccessCStatus(); } catch (std::exception& e) { return milvus::FailureCStatus(&e); diff --git a/internal/core/src/storage/ChunkCache.cpp b/internal/core/src/storage/ChunkCache.cpp index 1e40dfe850..99638558ea 100644 --- a/internal/core/src/storage/ChunkCache.cpp +++ b/internal/core/src/storage/ChunkCache.cpp @@ -15,6 +15,7 @@ // limitations under the License. 
#include "ChunkCache.h" +#include "mmap/Utils.h" namespace milvus::storage { diff --git a/internal/core/src/storage/ChunkManager.h b/internal/core/src/storage/ChunkManager.h index 6b0cfb8091..9f51154ee6 100644 --- a/internal/core/src/storage/ChunkManager.h +++ b/internal/core/src/storage/ChunkManager.h @@ -58,7 +58,7 @@ class ChunkManager { Read(const std::string& filepath, void* buf, uint64_t len) = 0; /** - * @brief Write buffer to file with offset + * @brief Write buffer to file without offset * @param filepath * @param buf * @param len diff --git a/internal/core/src/storage/Event.cpp b/internal/core/src/storage/Event.cpp index 55ff73ced2..dbe0f8861a 100644 --- a/internal/core/src/storage/Event.cpp +++ b/internal/core/src/storage/Event.cpp @@ -215,7 +215,8 @@ std::vector BaseEventData::Serialize() { auto data_type = field_data->get_data_type(); std::shared_ptr payload_writer; - if (milvus::datatype_is_vector(data_type)) { + if (milvus::datatype_is_vector(data_type) && + data_type != DataType::VECTOR_SPARSE_FLOAT) { payload_writer = std::make_unique(data_type, field_data->get_dim()); } else { @@ -259,6 +260,18 @@ BaseEventData::Serialize() { } break; } + case DataType::VECTOR_SPARSE_FLOAT: { + for (size_t offset = 0; offset < field_data->get_num_rows(); + ++offset) { + auto row = + static_cast*>( + field_data->RawValue(offset)); + payload_writer->add_one_binary_payload( + static_cast(row->data()), + row->data_byte_size()); + } + break; + } default: { auto payload = Payload{data_type, diff --git a/internal/core/src/storage/PayloadReader.cpp b/internal/core/src/storage/PayloadReader.cpp index 2c1ae76fa7..0305b5ce4b 100644 --- a/internal/core/src/storage/PayloadReader.cpp +++ b/internal/core/src/storage/PayloadReader.cpp @@ -59,7 +59,9 @@ PayloadReader::init(std::shared_ptr input) { int64_t column_index = 0; auto file_meta = arrow_reader->parquet_reader()->metadata(); - dim_ = datatype_is_vector(column_type_) + // dim is unused for sparse float vector + dim_ = 
(datatype_is_vector(column_type_) && + column_type_ != DataType::VECTOR_SPARSE_FLOAT) ? GetDimensionFromFileMetaData( file_meta->schema()->Column(column_index), column_type_) : 1; diff --git a/internal/core/src/storage/PayloadWriter.cpp b/internal/core/src/storage/PayloadWriter.cpp index 54c47ed81e..52551a821f 100644 --- a/internal/core/src/storage/PayloadWriter.cpp +++ b/internal/core/src/storage/PayloadWriter.cpp @@ -31,6 +31,9 @@ PayloadWriter::PayloadWriter(const DataType column_type) // create payload writer for vector data type PayloadWriter::PayloadWriter(const DataType column_type, int dim) : column_type_(column_type) { + AssertInfo(column_type != DataType::VECTOR_SPARSE_FLOAT, + "PayloadWriter for Sparse Float Vector should be created " + "using the constructor without dimension"); init_dimension(dim); } @@ -58,7 +61,9 @@ PayloadWriter::add_one_string_payload(const char* str, int str_size) { void PayloadWriter::add_one_binary_payload(const uint8_t* data, int length) { AssertInfo(output_ == nullptr, "payload writer has been finished"); - AssertInfo(milvus::datatype_is_binary(column_type_), "mismatch data type"); + AssertInfo(milvus::datatype_is_binary(column_type_) || + milvus::datatype_is_sparse_vector(column_type_), + "mismatch data type"); AddOneBinaryToArrowBuilder(builder_, data, length); rows_.fetch_add(1); } diff --git a/internal/core/src/storage/Util.cpp b/internal/core/src/storage/Util.cpp index ed2c597d2a..31d5600515 100644 --- a/internal/core/src/storage/Util.cpp +++ b/internal/core/src/storage/Util.cpp @@ -39,8 +39,10 @@ #include "storage/OpenDALChunkManager.h" #endif #include "storage/Types.h" -#include "storage/ThreadPools.h" #include "storage/Util.h" +#include "storage/ThreadPools.h" +#include "storage/MemFileManagerImpl.h" +#include "storage/DiskFileManagerImpl.h" namespace milvus::storage { @@ -170,6 +172,12 @@ AddPayloadToArrowBuilder(std::shared_ptr builder, add_vector_payload(builder, const_cast(raw_data), length); break; } + case 
DataType::VECTOR_SPARSE_FLOAT: { + PanicInfo(DataTypeInvalid, + "Sparse Float Vector payload should be added by calling " + "add_one_binary_payload", + data_type); + } default: { PanicInfo(DataTypeInvalid, "unsupported data type {}", data_type); } @@ -242,6 +250,10 @@ CreateArrowBuilder(DataType data_type) { case DataType::JSON: { return std::make_shared(); } + // sparse float vector doesn't require a dim + case DataType::VECTOR_SPARSE_FLOAT: { + return std::make_shared(); + } default: { PanicInfo( DataTypeInvalid, "unsupported numeric data type {}", data_type); @@ -311,6 +323,10 @@ CreateArrowSchema(DataType data_type) { case DataType::JSON: { return arrow::schema({arrow::field("val", arrow::binary())}); } + // sparse float vector doesn't require a dim + case DataType::VECTOR_SPARSE_FLOAT: { + return arrow::schema({arrow::field("val", arrow::binary())}); + } default: { PanicInfo( DataTypeInvalid, "unsupported numeric data type {}", data_type); @@ -341,6 +357,9 @@ CreateArrowSchema(DataType data_type, int dim) { return arrow::schema({arrow::field( "val", arrow::fixed_size_binary(dim * sizeof(bfloat16)))}); } + case DataType::VECTOR_SPARSE_FLOAT: { + return arrow::schema({arrow::field("val", arrow::binary())}); + } default: { PanicInfo( DataTypeInvalid, "unsupported vector data type {}", data_type); @@ -364,6 +383,11 @@ GetDimensionFromFileMetaData(const parquet::ColumnDescriptor* schema, case DataType::VECTOR_BFLOAT16: { return schema->type_length() / sizeof(bfloat16); } + case DataType::VECTOR_SPARSE_FLOAT: { + PanicInfo(DataTypeInvalid, + fmt::format("GetDimensionFromFileMetaData should not be " + "called for sparse vector")); + } default: PanicInfo(DataTypeInvalid, "unsupported data type {}", data_type); } @@ -501,11 +525,12 @@ EncodeAndUploadFieldSlice(ChunkManager* chunk_manager, field_data->FillFieldData(buf, element_count); auto insertData = std::make_shared(field_data); insertData->SetFieldDataMeta(field_data_meta); - auto serialized_index_data = 
insertData->serialize_to_remote_file(); - auto serialized_index_size = serialized_index_data.size(); - chunk_manager->Write( - object_key, serialized_index_data.data(), serialized_index_size); - return std::make_pair(std::move(object_key), serialized_index_size); + auto serialized_inserted_data = insertData->serialize_to_remote_file(); + auto serialized_inserted_data_size = serialized_inserted_data.size(); + chunk_manager->Write(object_key, + serialized_inserted_data.data(), + serialized_inserted_data_size); + return std::make_pair(std::move(object_key), serialized_inserted_data_size); } std::vector>> @@ -738,6 +763,9 @@ CreateFieldData(const DataType& type, int64_t dim, int64_t total_num_rows) { case DataType::VECTOR_BFLOAT16: return std::make_shared>( dim, type, total_num_rows); + case DataType::VECTOR_SPARSE_FLOAT: + return std::make_shared>( + type, total_num_rows); default: throw SegcoreError( DataTypeInvalid, diff --git a/internal/core/src/storage/parquet_c.h b/internal/core/src/storage/parquet_c.h index db54eb7c63..0348c9461b 100644 --- a/internal/core/src/storage/parquet_c.h +++ b/internal/core/src/storage/parquet_c.h @@ -31,6 +31,8 @@ typedef struct CBuffer { } CBuffer; //============= payload writer ====================== +// TODO(SPARSE): CPayloadWriter is no longer used as we switch to the payload +// writer in golang. Thus not implementing sparse float vector support here. 
typedef void* CPayloadWriter; CPayloadWriter NewPayloadWriter(int columnType); diff --git a/internal/core/unittest/test_binlog_index.cpp b/internal/core/unittest/test_binlog_index.cpp index d4ddc870a6..2737af6c92 100644 --- a/internal/core/unittest/test_binlog_index.cpp +++ b/internal/core/unittest/test_binlog_index.cpp @@ -141,9 +141,9 @@ class BinlogIndexTest : public ::testing::TestWithParam { std::shared_ptr vec_data; }; -INSTANTIATE_TEST_CASE_P(MetricTypeParameters, - BinlogIndexTest, - ::testing::Values(knowhere::metric::L2)); +INSTANTIATE_TEST_SUITE_P(MetricTypeParameters, + BinlogIndexTest, + ::testing::Values(knowhere::metric::L2)); TEST_P(BinlogIndexTest, Accuracy) { IndexMetaPtr collection_index_meta = diff --git a/internal/core/unittest/test_concurrent_vector.cpp b/internal/core/unittest/test_concurrent_vector.cpp index eea65bd4fa..a59f07d0de 100644 --- a/internal/core/unittest/test_concurrent_vector.cpp +++ b/internal/core/unittest/test_concurrent_vector.cpp @@ -34,8 +34,7 @@ TEST(ConcurrentVector, TestSingle) { for (auto& x : vec) { x = data++; } - c_vec.grow_to_at_least(total_count + insert_size); - c_vec.set_data(total_count, vec.data(), insert_size); + c_vec.set_data_raw(total_count, vec.data(), insert_size); total_count += insert_size; } ASSERT_EQ(c_vec.num_chunk(), (total_count + 31) / 32); @@ -66,8 +65,7 @@ TEST(ConcurrentVector, TestMultithreads) { x = data++ * threads + thread_id; } auto offset = ack_counter.fetch_add(insert_size); - c_vec.grow_to_at_least(offset + insert_size); - c_vec.set_data(offset, vec.data(), insert_size); + c_vec.set_data_raw(offset, vec.data(), insert_size); total_count += insert_size; } assert(data == total_count * dim); diff --git a/internal/core/unittest/test_data_codec.cpp b/internal/core/unittest/test_data_codec.cpp index 36a6621bc9..0a4e7b36ff 100644 --- a/internal/core/unittest/test_data_codec.cpp +++ b/internal/core/unittest/test_data_codec.cpp @@ -22,6 +22,8 @@ #include "storage/Util.h" #include 
"common/Consts.h" #include "common/Json.h" +#include "test_utils/Constants.h" +#include "test_utils/DataGen.h" using namespace milvus; @@ -274,6 +276,45 @@ TEST(storage, InsertDataFloatVector) { ASSERT_EQ(data, new_data); } +TEST(storage, InsertDataSparseFloat) { + auto n_rows = 100; + auto vecs = milvus::segcore::GenerateRandomSparseFloatVector( + n_rows, kTestSparseDim, kTestSparseVectorDensity); + auto field_data = milvus::storage::CreateFieldData( + storage::DataType::VECTOR_SPARSE_FLOAT, kTestSparseDim, n_rows); + field_data->FillFieldData(vecs.get(), n_rows); + + storage::InsertData insert_data(field_data); + storage::FieldDataMeta field_data_meta{100, 101, 102, 103}; + insert_data.SetFieldDataMeta(field_data_meta); + insert_data.SetTimestamps(0, 100); + + auto serialized_bytes = insert_data.Serialize(storage::StorageType::Remote); + std::shared_ptr serialized_data_ptr(serialized_bytes.data(), + [&](uint8_t*) {}); + auto new_insert_data = storage::DeserializeFileData( + serialized_data_ptr, serialized_bytes.size()); + ASSERT_EQ(new_insert_data->GetCodecType(), storage::InsertDataType); + ASSERT_EQ(new_insert_data->GetTimeRage(), + std::make_pair(Timestamp(0), Timestamp(100))); + auto new_payload = new_insert_data->GetFieldData(); + ASSERT_TRUE(new_payload->get_data_type() == + storage::DataType::VECTOR_SPARSE_FLOAT); + ASSERT_EQ(new_payload->get_num_rows(), n_rows); + auto new_data = static_cast*>( + new_payload->Data()); + + for (auto i = 0; i < n_rows; ++i) { + auto& original = vecs[i]; + auto& new_vec = new_data[i]; + ASSERT_EQ(original.size(), new_vec.size()); + for (auto j = 0; j < original.size(); ++j) { + ASSERT_EQ(original[j].id, new_vec[j].id); + ASSERT_EQ(original[j].val, new_vec[j].val); + } + } +} + TEST(storage, InsertDataBinaryVector) { std::vector data = {1, 2, 3, 4, 5, 6, 7, 8}; int DIM = 16; diff --git a/internal/core/unittest/test_growing_index.cpp b/internal/core/unittest/test_growing_index.cpp index eb12a2793d..b7814ef5b6 100644 --- 
a/internal/core/unittest/test_growing_index.cpp +++ b/internal/core/unittest/test_growing_index.cpp @@ -155,11 +155,11 @@ class GrowingIndexGetVectorTest : public ::testing::TestWithParam { const char* metricType; }; -INSTANTIATE_TEST_CASE_P(IndexTypeParameters, - GrowingIndexGetVectorTest, - ::testing::Values(knowhere::metric::L2, - knowhere::metric::COSINE, - knowhere::metric::IP)); +INSTANTIATE_TEST_SUITE_P(IndexTypeParameters, + GrowingIndexGetVectorTest, + ::testing::Values(knowhere::metric::L2, + knowhere::metric::COSINE, + knowhere::metric::IP)); TEST_P(GrowingIndexGetVectorTest, GetVector) { auto schema = std::make_shared(); diff --git a/internal/core/unittest/test_index_c_api.cpp b/internal/core/unittest/test_index_c_api.cpp index f76f864d3e..042255028a 100644 --- a/internal/core/unittest/test_index_c_api.cpp +++ b/internal/core/unittest/test_index_c_api.cpp @@ -79,6 +79,70 @@ TEST(FloatVecIndex, All) { { DeleteBinarySet(binary_set); } } +TEST(SparseFloatVecIndex, All) { + auto index_type = knowhere::IndexEnum::INDEX_SPARSE_INVERTED_INDEX; + auto metric_type = knowhere::metric::IP; + indexcgo::TypeParams type_params; + indexcgo::IndexParams index_params; + std::tie(type_params, index_params) = + generate_params(index_type, metric_type); + std::string type_params_str, index_params_str; + bool ok = google::protobuf::TextFormat::PrintToString(type_params, + &type_params_str); + assert(ok); + ok = google::protobuf::TextFormat::PrintToString(index_params, + &index_params_str); + assert(ok); + auto dataset = GenDatasetWithDataType( + NB, metric_type, milvus::DataType::VECTOR_SPARSE_FLOAT); + auto xb_data = dataset.get_col>( + milvus::FieldId(100)); + CDataType dtype = SparseFloatVector; + CIndex index; + CStatus status; + CBinarySet binary_set; + CIndex copy_index; + + { + status = CreateIndexV0( + dtype, type_params_str.c_str(), index_params_str.c_str(), &index); + ASSERT_EQ(milvus::Success, status.error_code); + } + { + status = BuildSparseFloatVecIndex( + 
index, + NB, + kTestSparseDim, + static_cast( + static_cast(xb_data.data()))); + ASSERT_EQ(milvus::Success, status.error_code); + } + { + status = SerializeIndexToBinarySet(index, &binary_set); + ASSERT_EQ(milvus::Success, status.error_code); + } + { + status = CreateIndexV0(dtype, + type_params_str.c_str(), + index_params_str.c_str(), + &copy_index); + ASSERT_EQ(milvus::Success, status.error_code); + } + { + status = LoadIndexFromBinarySet(copy_index, binary_set); + ASSERT_EQ(milvus::Success, status.error_code); + } + { + status = DeleteIndex(index); + ASSERT_EQ(milvus::Success, status.error_code); + } + { + status = DeleteIndex(copy_index); + ASSERT_EQ(milvus::Success, status.error_code); + } + { DeleteBinarySet(binary_set); } +} + TEST(Float16VecIndex, All) { auto index_type = knowhere::IndexEnum::INDEX_FAISS_IVFPQ; auto metric_type = knowhere::metric::L2; diff --git a/internal/core/unittest/test_index_wrapper.cpp b/internal/core/unittest/test_index_wrapper.cpp index 1b5de55a2b..22529f31d5 100644 --- a/internal/core/unittest/test_index_wrapper.cpp +++ b/internal/core/unittest/test_index_wrapper.cpp @@ -59,35 +59,23 @@ class IndexWrapperTest : public ::testing::TestWithParam { search_conf = generate_search_conf(index_type, metric_type); - std::map is_binary_map = { - {knowhere::IndexEnum::INDEX_FAISS_IDMAP, false}, - {knowhere::IndexEnum::INDEX_FAISS_IVFPQ, false}, - {knowhere::IndexEnum::INDEX_FAISS_IVFFLAT, false}, - {knowhere::IndexEnum::INDEX_FAISS_IVFSQ8, false}, - {knowhere::IndexEnum::INDEX_FAISS_BIN_IVFFLAT, true}, - {knowhere::IndexEnum::INDEX_FAISS_BIN_IDMAP, true}, - {knowhere::IndexEnum::INDEX_HNSW, false}, + std::map index_to_vec_type = { + {knowhere::IndexEnum::INDEX_FAISS_IDMAP, DataType::VECTOR_FLOAT}, + {knowhere::IndexEnum::INDEX_FAISS_IVFPQ, DataType::VECTOR_FLOAT}, + {knowhere::IndexEnum::INDEX_FAISS_IVFFLAT, DataType::VECTOR_FLOAT}, + {knowhere::IndexEnum::INDEX_FAISS_IVFSQ8, DataType::VECTOR_FLOAT}, +
{knowhere::IndexEnum::INDEX_FAISS_BIN_IVFFLAT, + DataType::VECTOR_BINARY}, + {knowhere::IndexEnum::INDEX_FAISS_BIN_IDMAP, + DataType::VECTOR_BINARY}, + {knowhere::IndexEnum::INDEX_HNSW, DataType::VECTOR_FLOAT}, + {knowhere::IndexEnum::INDEX_SPARSE_INVERTED_INDEX, + DataType::VECTOR_SPARSE_FLOAT}, + {knowhere::IndexEnum::INDEX_SPARSE_WAND, + DataType::VECTOR_SPARSE_FLOAT}, }; - is_binary = is_binary_map[index_type]; - if (is_binary) { - vec_field_data_type = DataType::VECTOR_BINARY; - } else { - vec_field_data_type = DataType::VECTOR_FLOAT; - } - - auto dataset = GenDataset(NB, metric_type, is_binary); - if (!is_binary) { - xb_data = dataset.get_col(milvus::FieldId(100)); - xb_dataset = knowhere::GenDataSet(NB, DIM, xb_data.data()); - xq_dataset = knowhere::GenDataSet( - NQ, DIM, xb_data.data() + DIM * query_offset); - } else { - xb_bin_data = dataset.get_col(milvus::FieldId(100)); - xb_dataset = knowhere::GenDataSet(NB, DIM, xb_bin_data.data()); - xq_dataset = knowhere::GenDataSet( - NQ, DIM, xb_bin_data.data() + DIM * query_offset); - } + vec_field_data_type = index_to_vec_type[index_type]; } void @@ -101,18 +89,13 @@ class IndexWrapperTest : public ::testing::TestWithParam { std::string type_params_str, index_params_str; Config config; milvus::Config search_conf; - bool is_binary; DataType vec_field_data_type; - knowhere::DataSetPtr xb_dataset; - FixedVector xb_data; - FixedVector xb_bin_data; - knowhere::DataSetPtr xq_dataset; - int64_t query_offset = 100; - int64_t NB = 10000; + int64_t query_offset = 1; + int64_t NB = 10; StorageConfig storage_config_; }; -INSTANTIATE_TEST_CASE_P( +INSTANTIATE_TEST_SUITE_P( IndexTypeParameters, IndexWrapperTest, ::testing::Values( @@ -126,7 +109,11 @@ INSTANTIATE_TEST_CASE_P( knowhere::metric::JACCARD), std::pair(knowhere::IndexEnum::INDEX_FAISS_BIN_IDMAP, knowhere::metric::JACCARD), - std::pair(knowhere::IndexEnum::INDEX_HNSW, knowhere::metric::L2))); + std::pair(knowhere::IndexEnum::INDEX_HNSW, knowhere::metric::L2), + 
std::pair(knowhere::IndexEnum::INDEX_SPARSE_INVERTED_INDEX, + knowhere::metric::IP), + std::pair(knowhere::IndexEnum::INDEX_SPARSE_WAND, + knowhere::metric::IP))); TEST_P(IndexWrapperTest, BuildAndQuery) { milvus::storage::FieldDataMeta field_data_meta{1, 2, 3, 100}; @@ -139,20 +126,29 @@ TEST_P(IndexWrapperTest, BuildAndQuery) { std::to_string(knowhere::Version::GetCurrentVersion().VersionNumber()); auto index = milvus::indexbuilder::IndexFactory::GetInstance().CreateIndex( vec_field_data_type, config, file_manager_context); - - auto dataset = GenDataset(NB, metric_type, is_binary); knowhere::DataSetPtr xb_dataset; - FixedVector bin_vecs; - FixedVector f_vecs; - if (is_binary) { - bin_vecs = dataset.get_col(milvus::FieldId(100)); + if (vec_field_data_type == DataType::VECTOR_BINARY) { + auto dataset = GenDataset(NB, metric_type, true); + auto bin_vecs = dataset.get_col(milvus::FieldId(100)); xb_dataset = knowhere::GenDataSet(NB, DIM, bin_vecs.data()); + ASSERT_NO_THROW(index->Build(xb_dataset)); + } else if (vec_field_data_type == DataType::VECTOR_SPARSE_FLOAT) { + auto dataset = GenDatasetWithDataType( + NB, metric_type, milvus::DataType::VECTOR_SPARSE_FLOAT); + auto sparse_vecs = dataset.get_col>( + milvus::FieldId(100)); + xb_dataset = + knowhere::GenDataSet(NB, kTestSparseDim, sparse_vecs.data()); + xb_dataset->SetIsSparse(true); + ASSERT_NO_THROW(index->Build(xb_dataset)); } else { - f_vecs = dataset.get_col(milvus::FieldId(100)); + // VECTOR_FLOAT + auto dataset = GenDataset(NB, metric_type, false); + auto f_vecs = dataset.get_col(milvus::FieldId(100)); xb_dataset = knowhere::GenDataSet(NB, DIM, f_vecs.data()); + ASSERT_NO_THROW(index->Build(xb_dataset)); } - ASSERT_NO_THROW(index->Build(xb_dataset)); auto binary_set = index->Serialize(); FixedVector index_files; for (auto& binary : binary_set.binary_map_) { @@ -164,21 +160,53 @@ TEST_P(IndexWrapperTest, BuildAndQuery) { vec_field_data_type, config, file_manager_context); auto vec_index = 
static_cast(copy_index.get()); - ASSERT_EQ(vec_index->dim(), DIM); + if (vec_field_data_type != DataType::VECTOR_SPARSE_FLOAT) { + ASSERT_EQ(vec_index->dim(), DIM); + } ASSERT_NO_THROW(vec_index->Load(binary_set)); + if (vec_field_data_type == DataType::VECTOR_SPARSE_FLOAT) { + // TODO(SPARSE): complete test in PR adding search/query to sparse + // float vector. + return; + } + milvus::SearchInfo search_info; search_info.topk_ = K; search_info.metric_type_ = metric_type; search_info.search_params_ = search_conf; - auto result = vec_index->Query(xq_dataset, search_info, nullptr); + std::unique_ptr result; + if (vec_field_data_type == DataType::VECTOR_FLOAT) { + auto dataset = GenDataset(NB, metric_type, false); + auto xb_data = dataset.get_col(milvus::FieldId(100)); + auto xb_dataset = knowhere::GenDataSet(NB, DIM, xb_data.data()); + auto xq_dataset = + knowhere::GenDataSet(NQ, DIM, xb_data.data() + DIM * query_offset); + result = vec_index->Query(xq_dataset, search_info, nullptr); + } else if (vec_field_data_type == DataType::VECTOR_SPARSE_FLOAT) { + auto dataset = GenDatasetWithDataType( + NQ, metric_type, milvus::DataType::VECTOR_SPARSE_FLOAT); + auto xb_data = dataset.get_col>( + milvus::FieldId(100)); + auto xq_dataset = + knowhere::GenDataSet(NQ, kTestSparseDim, xb_data.data()); + xq_dataset->SetIsSparse(true); + result = vec_index->Query(xq_dataset, search_info, nullptr); + } else { + auto dataset = GenDataset(NB, metric_type, true); + auto xb_bin_data = dataset.get_col(milvus::FieldId(100)); + auto xb_dataset = knowhere::GenDataSet(NB, DIM, xb_bin_data.data()); + auto xq_dataset = knowhere::GenDataSet( + NQ, DIM, xb_bin_data.data() + DIM * query_offset); + result = vec_index->Query(xq_dataset, search_info, nullptr); + } EXPECT_EQ(result->total_nq_, NQ); EXPECT_EQ(result->unity_topK_, K); EXPECT_EQ(result->distances_.size(), NQ * K); EXPECT_EQ(result->seg_offsets_.size(), NQ * K); - if (!is_binary) { + if (vec_field_data_type == DataType::VECTOR_FLOAT) { 
EXPECT_EQ(result->seg_offsets_[0], query_offset); } } diff --git a/internal/core/unittest/test_indexing.cpp b/internal/core/unittest/test_indexing.cpp index 3a8efc18d6..dd1cfdf68c 100644 --- a/internal/core/unittest/test_indexing.cpp +++ b/internal/core/unittest/test_indexing.cpp @@ -383,7 +383,7 @@ class IndexTest : public ::testing::TestWithParam { StorageConfig storage_config_; }; -INSTANTIATE_TEST_CASE_P( +INSTANTIATE_TEST_SUITE_P( IndexTypeParameters, IndexTest, ::testing::Values( @@ -990,7 +990,7 @@ TEST(Indexing, SearchDiskAnnWithInvalidParam) { // boost::filesystem::path mmap_file_path; //}; // -//INSTANTIATE_TEST_CASE_P( +//INSTANTIATE_TEST_SUITE_P( // IndexTypeParameters, // IndexTestV2, // testing::Combine( diff --git a/internal/core/unittest/test_range_search_sort.cpp b/internal/core/unittest/test_range_search_sort.cpp index ac8208c7a9..bc95badde0 100644 --- a/internal/core/unittest/test_range_search_sort.cpp +++ b/internal/core/unittest/test_range_search_sort.cpp @@ -157,12 +157,12 @@ class RangeSearchSortTest float dist_min = 0.0, dist_max = 100.0; }; -INSTANTIATE_TEST_CASE_P(RangeSearchSortParameters, - RangeSearchSortTest, - ::testing::Values(knowhere::metric::L2, - knowhere::metric::IP, - knowhere::metric::JACCARD, - knowhere::metric::HAMMING)); +INSTANTIATE_TEST_SUITE_P(RangeSearchSortParameters, + RangeSearchSortTest, + ::testing::Values(knowhere::metric::L2, + knowhere::metric::IP, + knowhere::metric::JACCARD, + knowhere::metric::HAMMING)); TEST_P(RangeSearchSortTest, CheckRangeSearchSort) { auto res = milvus::ReGenRangeSearchResult(dataset, TOPK, N, metric_type); diff --git a/internal/core/unittest/test_retrieve.cpp b/internal/core/unittest/test_retrieve.cpp index 2115b6086a..0139d2e7c1 100644 --- a/internal/core/unittest/test_retrieve.cpp +++ b/internal/core/unittest/test_retrieve.cpp @@ -15,7 +15,6 @@ #include "knowhere/comp/index_param.h" #include "query/Expr.h" #include "query/ExprImpl.h" -#include "segcore/ScalarIndex.h" #include 
"test_utils/DataGen.h" #include "exec/expression/Expr.h" #include "plan/PlanNode.h" @@ -30,32 +29,6 @@ RetrieveUsingDefaultOutputSize(SegmentInterface* segment, return segment->Retrieve(plan, timestamp, DEFAULT_MAX_OUTPUT_SIZE); } -TEST(Retrieve, ScalarIndex) { - SUCCEED(); - auto index = std::make_unique(); - std::vector data; - int N = 1000; - auto req_ids = std::make_unique(); - auto req_ids_arr = req_ids->mutable_int_id(); - - for (int i = 0; i < N; ++i) { - data.push_back(i * 3 % N); - req_ids_arr->add_data(i); - } - index->append_data(data.data(), N, SegOffset(10000)); - index->build(); - - auto [res_ids, res_offsets] = index->do_search_ids(*req_ids); - auto res_ids_arr = res_ids->int_id(); - - for (int i = 0; i < N; ++i) { - auto res_offset = res_offsets[i].get() - 10000; - auto res_id = res_ids_arr.data(i); - auto std_id = (res_offset * 3 % N); - ASSERT_EQ(res_id, std_id); - } -} - TEST(Retrieve, AutoID) { auto schema = std::make_shared(); auto fid_64 = schema->AddDebugField("i64", DataType::INT64); diff --git a/internal/core/unittest/test_utils.cpp b/internal/core/unittest/test_utils.cpp index 81ff9f8d3d..ed247d3867 100644 --- a/internal/core/unittest/test_utils.cpp +++ b/internal/core/unittest/test_utils.cpp @@ -9,6 +9,10 @@ // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express // or implied. 
See the License for the specific language governing permissions and limitations under the License +#include +#include +#include + #include #include #include @@ -19,6 +23,8 @@ #include "common/Types.h" #include "common/Utils.h" #include "common/Exception.h" +#include "knowhere/sparse_utils.h" +#include "pb/schema.pb.h" #include "query/Utils.h" #include "test_utils/DataGen.h" @@ -131,8 +137,7 @@ TEST(Util, upper_bound) { std::vector data{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; ConcurrentVector timestamps(1); - timestamps.grow_to_at_least(data.size()); - timestamps.set_data(0, data.data(), data.size()); + timestamps.set_data_raw(0, data.data(), data.size()); ASSERT_EQ(1, upper_bound(timestamps, 0, data.size(), 0)); ASSERT_EQ(5, upper_bound(timestamps, 0, data.size(), 4)); @@ -206,4 +211,4 @@ TEST(Util, get_common_prefix) { str2 = ""; common_prefix = milvus::GetCommonPrefix(str1, str2); EXPECT_STREQ(common_prefix.c_str(), ""); -} \ No newline at end of file +} diff --git a/internal/core/unittest/test_utils/Constants.h b/internal/core/unittest/test_utils/Constants.h index 190853a968..dfeae7b77f 100644 --- a/internal/core/unittest/test_utils/Constants.h +++ b/internal/core/unittest/test_utils/Constants.h @@ -13,3 +13,6 @@ constexpr int64_t TestChunkSize = 32 * 1024; constexpr char TestLocalPath[] = "/tmp/milvus/local_data/"; constexpr char TestRemotePath[] = "/tmp/milvus/remote_data"; + +constexpr int64_t kTestSparseDim = 10000; +constexpr float kTestSparseVectorDensity = 0.0003; diff --git a/internal/core/unittest/test_utils/DataGen.h b/internal/core/unittest/test_utils/DataGen.h index 784446d510..2350cca978 100644 --- a/internal/core/unittest/test_utils/DataGen.h +++ b/internal/core/unittest/test_utils/DataGen.h @@ -16,7 +16,9 @@ #include #include #include +#include #include +#include #include "Constants.h" #include "common/EasyAssert.h" @@ -42,7 +44,7 @@ namespace milvus::segcore { struct GeneratedData { std::vector row_ids_; std::vector timestamps_; - InsertData* raw_; + 
InsertRecordProto* raw_; std::vector field_ids; SchemaPtr schema_; @@ -92,7 +94,8 @@ struct GeneratedData { } auto& field_meta = schema_->operator[](field_id); - if (field_meta.is_vector()) { + if (field_meta.is_vector() && + field_meta.get_data_type() != DataType::VECTOR_SPARSE_FLOAT) { if (field_meta.get_data_type() == DataType::VECTOR_FLOAT) { int len = raw_->num_rows() * field_meta.get_dim(); ret.resize(len); @@ -111,7 +114,6 @@ struct GeneratedData { std::copy_n(src_data, len, ret.data()); } else if (field_meta.get_data_type() == DataType::VECTOR_FLOAT16) { - // int len = raw_->num_rows() * field_meta.get_dim() * sizeof(float16); int len = raw_->num_rows() * field_meta.get_dim(); ret.resize(len); auto src_data = reinterpret_cast( @@ -119,7 +121,6 @@ struct GeneratedData { std::copy_n(src_data, len, ret.data()); } else if (field_meta.get_data_type() == DataType::VECTOR_BFLOAT16) { - // int len = raw_->num_rows() * field_meta.get_dim() * sizeof(bfloat16); int len = raw_->num_rows() * field_meta.get_dim(); ret.resize(len); auto src_data = reinterpret_cast( @@ -131,7 +132,13 @@ struct GeneratedData { return std::move(ret); } - if constexpr (std::is_same_v) { + if constexpr (std::is_same_v>) { + auto sparse_float_array = + target_field_data.vectors().sparse_float_vector(); + auto rows = SparseBytesToRows(sparse_float_array.contents()); + std::copy_n(rows.get(), raw_->num_rows(), ret.data()); + } else if constexpr (std::is_same_v) { auto ret_data = reinterpret_cast(ret.data()); auto src_data = target_field_data.scalars().array_data().data(); std::copy(src_data.begin(), src_data.end(), ret_data); @@ -238,19 +245,61 @@ struct GeneratedData { int array_len); }; -inline GeneratedData -DataGen(SchemaPtr schema, - int64_t N, - uint64_t seed = 42, - uint64_t ts_offset = 0, - int repeat_count = 1, - int array_len = 10) { +inline std::unique_ptr[]> +GenerateRandomSparseFloatVector(size_t rows, + size_t cols, + float density, + int seed = 42) { + int32_t num_elements = 
static_cast(rows * cols * density); + + std::mt19937 rng(seed); + auto real_distrib = std::uniform_real_distribution(0, 1); + auto row_distrib = std::uniform_int_distribution(0, rows - 1); + auto col_distrib = std::uniform_int_distribution(0, cols - 1); + + std::vector> data(rows); + + for (int32_t i = 0; i < num_elements; ++i) { + auto row = row_distrib(rng); + while (data[row].size() == (size_t)cols) { + row = row_distrib(rng); + } + auto col = col_distrib(rng); + while (data[row].find(col) != data[row].end()) { + col = col_distrib(rng); + } + auto val = real_distrib(rng); + data[row][col] = val; + } + + auto tensor = std::make_unique[]>(rows); + + for (int32_t i = 0; i < rows; ++i) { + if (data[i].size() == 0) { + continue; + } + knowhere::sparse::SparseRow row(data[i].size()); + size_t j = 0; + for (auto& [idx, val] : data[i]) { + row.set_at(j++, idx, val); + } + tensor[i] = std::move(row); + } + return tensor; +} + +inline GeneratedData DataGen(SchemaPtr schema, + int64_t N, + uint64_t seed = 42, + uint64_t ts_offset = 0, + int repeat_count = 1, + int array_len = 10) { using std::vector; std::default_random_engine er(seed); std::normal_distribution<> distr(0, 1); int offset = 0; - auto insert_data = std::make_unique(); + auto insert_data = std::make_unique(); auto insert_cols = [&insert_data]( auto& data, int64_t count, auto& field_meta) { auto array = milvus::segcore::CreateDataArrayFrom( @@ -309,6 +358,15 @@ DataGen(SchemaPtr schema, insert_cols(final, N, field_meta); break; } + case DataType::VECTOR_SPARSE_FLOAT: { + auto res = GenerateRandomSparseFloatVector( + N, kTestSparseDim, kTestSparseVectorDensity, seed); + auto array = milvus::segcore::CreateDataArrayFrom( + res.get(), N, field_meta); + insert_data->mutable_fields_data()->AddAllocated( + array.release()); + break; + } case DataType::VECTOR_BFLOAT16: { auto dim = field_meta.get_dim(); @@ -526,7 +584,7 @@ DataGenForJsonArray(SchemaPtr schema, std::default_random_engine er(seed); 
std::normal_distribution<> distr(0, 1); - auto insert_data = std::make_unique(); + auto insert_data = std::make_unique(); auto insert_cols = [&insert_data]( auto& data, int64_t count, auto& field_meta) { auto array = milvus::segcore::CreateDataArrayFrom( @@ -777,6 +835,23 @@ CreateBFloat16PlaceholderGroupFromBlob(int64_t num_queries, return raw_group; } +inline auto +CreateSparseFloatPlaceholderGroup(int64_t num_queries, int64_t seed = 42) { + namespace ser = milvus::proto::common; + ser::PlaceholderGroup raw_group; + auto value = raw_group.add_placeholders(); + + value->set_tag("$0"); + value->set_type(ser::PlaceholderType::SparseFloatVector); + auto sparse_vecs = GenerateRandomSparseFloatVector( + num_queries, kTestSparseDim, kTestSparseVectorDensity, seed); + for (int i = 0; i < num_queries; ++i) { + value->add_values(sparse_vecs[i].data(), + sparse_vecs[i].data_byte_size()); + } + return raw_group; +} + inline auto SearchResultToVector(const SearchResult& sr) { int64_t num_queries = sr.total_nq_; @@ -850,6 +925,12 @@ CreateFieldDataFromDataArray(ssize_t raw_count, createFieldData(raw_data, DataType::VECTOR_BFLOAT16, dim); break; } + case DataType::VECTOR_SPARSE_FLOAT: { + auto sparse_float_array = data->vectors().sparse_float_vector(); + auto rows = SparseBytesToRows(sparse_float_array.contents()); + createFieldData(rows.get(), DataType::VECTOR_SPARSE_FLOAT, 0); + break; + } default: { PanicInfo(Unsupported, "unsupported"); } diff --git a/internal/core/unittest/test_utils/c_api_test_utils.h b/internal/core/unittest/test_utils/c_api_test_utils.h index e57cb2615e..1e9c975fe6 100644 --- a/internal/core/unittest/test_utils/c_api_test_utils.h +++ b/internal/core/unittest/test_utils/c_api_test_utils.h @@ -37,31 +37,6 @@ using namespace milvus; using namespace milvus::segcore; namespace { -const char* -get_default_schema_config() { - static std::string conf = R"(name: "default-collection" - fields: < - fieldID: 100 - name: "fakevec" - data_type: FloatVector - 
type_params: < - key: "dim" - value: "16" - > - index_params: < - key: "metric_type" - value: "L2" - > - > - fields: < - fieldID: 101 - name: "age" - data_type: Int64 - is_primary_key: true - >)"; - static std::string fake_conf = ""; - return conf.c_str(); -} std::string generate_max_float_query_data(int all_nq, int max_float_nq) { diff --git a/internal/core/unittest/test_utils/indexbuilder_test_utils.h b/internal/core/unittest/test_utils/indexbuilder_test_utils.h index b3c4dda99a..fc1f3b67fc 100644 --- a/internal/core/unittest/test_utils/indexbuilder_test_utils.h +++ b/internal/core/unittest/test_utils/indexbuilder_test_utils.h @@ -98,6 +98,11 @@ generate_build_conf(const milvus::IndexType& index_type, {milvus::index::DISK_ANN_BUILD_DRAM_BUDGET, std::to_string(32)}, {milvus::index::DISK_ANN_BUILD_THREAD_NUM, std::to_string(2)}, }; + } else if (index_type == knowhere::IndexEnum::INDEX_SPARSE_INVERTED_INDEX || + index_type == knowhere::IndexEnum::INDEX_SPARSE_WAND) { + return knowhere::Json{ + {knowhere::meta::METRIC_TYPE, metric_type}, + }; } return knowhere::Json(); } @@ -235,6 +240,10 @@ GenDatasetWithDataType(int64_t N, schema->AddDebugField( "fakevec", milvus::DataType::VECTOR_FLOAT, dim, metric_type); return milvus::segcore::DataGen(schema, N); + } else if (data_type == milvus::DataType::VECTOR_SPARSE_FLOAT) { + schema->AddDebugField( + "fakevec", milvus::DataType::VECTOR_SPARSE_FLOAT, 0, metric_type); + return milvus::segcore::DataGen(schema, N); } else { schema->AddDebugField( "fakebinvec", milvus::DataType::VECTOR_BINARY, dim, metric_type);