// Patch summary: drops the Pk2OffsetType alias (an std::unordered_map looked up directly with a PkType) and
// introduces an abstract OffsetMap interface plus a templated OffsetHashMap. InsertRecord now holds the map
// behind a std::unique_ptr and creates it in its constructor according to the primary-key field's data type
// (DataType::INT64 or DataType::VARCHAR); search_pk / insert_pk / empty_pks forward to it under shared_mutex_.
//
// Behaviour notes for this patch:
//   * find_with_barrier keeps offsets with `offset < barrier`, the same test as the `offset < insert_barrier`
//     loop in the search_pk body removed below, so an offset equal to the barrier is not returned.
//   * find_with_timestamp keeps offsets whose timestamps[offset] <= timestamp, as the removed code did.
//   * OffsetHashMap's implementations are marked `override` so a signature drift from OffsetMap fails to compile.
//
// NOTE(review): the switch on the primary-key data type has no default case, so pk2offset_ stays null when the
// schema has no primary field or its type is neither INT64 nor VARCHAR, while search_pk / insert_pk / empty_pks
// dereference it unconditionally -- confirm no caller reaches them in that state, or add a guard.
// NOTE(review): TEST(OffsetMap, int64_t) allocates the map with `new` and never deletes it.
// NOTE(review): template argument lists and the target of the added #include look stripped in this copy of the
// patch (e.g. `std::variant;`) -- restore them from the real commit before applying.
diff --git a/internal/core/src/common/Types.h b/internal/core/src/common/Types.h index 246c70aa41..d5cdb7e686 100644 --- a/internal/core/src/common/Types.h +++ b/internal/core/src/common/Types.h @@ -72,9 +72,6 @@ using VectorArray = proto::schema::VectorField; using IdArray = proto::schema::IDs; using InsertData = proto::segcore::InsertRecord; using PkType = std::variant; -// tbb::concurrent_unordered_multimap equal_range too slow when multi repeated key -// using Pk2OffsetType = tbb::concurrent_unordered_multimap>; -using Pk2OffsetType = std::unordered_map, std::hash>; inline bool IsPrimaryKeyDataType(DataType data_type) { diff --git a/internal/core/src/segcore/InsertRecord.cpp b/internal/core/src/segcore/InsertRecord.cpp index 7961bbf2bf..10586c54ad 100644 --- a/internal/core/src/segcore/InsertRecord.cpp +++ b/internal/core/src/segcore/InsertRecord.cpp @@ -9,16 +9,30 @@ // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express // or implied. See the License for the specific language governing permissions and limitations under the License +#include #include "InsertRecord.h" namespace milvus::segcore { InsertRecord::InsertRecord(const Schema& schema, int64_t size_per_chunk) : row_ids_(size_per_chunk), timestamps_(size_per_chunk) { + std::optional pk_field_id = schema.get_primary_field_id(); + for (auto& field : schema) { auto field_id = field.first; auto& field_meta = field.second; - + if (pk2offset_ == nullptr && pk_field_id.has_value() && pk_field_id.value() == field_id) { + switch (field_meta.get_data_type()) { + case DataType::INT64: { + pk2offset_ = std::make_unique>(); + break; + } + case DataType::VARCHAR: { + pk2offset_ = std::make_unique>(); + break; + } + } + } if (field_meta.is_vector()) { if (field_meta.get_data_type() == DataType::VECTOR_FLOAT) { this->append_field_data(field_id, field_meta.get_dim(), size_per_chunk); diff --git a/internal/core/src/segcore/InsertRecord.h 
b/internal/core/src/segcore/InsertRecord.h index c6a4338f14..d17fdc5f70 100644 --- a/internal/core/src/segcore/InsertRecord.h +++ b/internal/core/src/segcore/InsertRecord.h @@ -24,6 +24,68 @@ namespace milvus::segcore { +class OffsetMap { + public: + virtual ~OffsetMap() = default; + + virtual std::vector + find_with_timestamp(const PkType pk, Timestamp timestamp, const ConcurrentVector& timestamps) const = 0; + + virtual std::vector + find_with_barrier(const PkType pk, int64_t barrier) const = 0; + + virtual void + insert(const PkType pk, int64_t offset) = 0; + + virtual bool + empty() const = 0; +}; + +template +class OffsetHashMap : public OffsetMap { + public: + std::vector + find_with_timestamp(const PkType pk, Timestamp timestamp, const ConcurrentVector& timestamps) const override { + std::vector res_offsets; + auto offset_iter = map_.find(std::get(pk)); + if (offset_iter != map_.end()) { + for (auto offset : offset_iter->second) { + if (timestamps[offset] <= timestamp) { + res_offsets.push_back(SegOffset(offset)); + } + } + } + return res_offsets; + } + + std::vector + find_with_barrier(const PkType pk, int64_t barrier) const override { + std::vector res_offsets; + auto offset_iter = map_.find(std::get(pk)); + if (offset_iter != map_.end()) { + for (auto offset : offset_iter->second) { + if (offset < barrier) { + res_offsets.push_back(SegOffset(offset)); + } + } + } + return res_offsets; + } + + void + insert(const PkType pk, int64_t offset) override { + map_[std::get(pk)].emplace_back(offset); + } + + bool + empty() const override { + return map_.empty(); + } + + private: + std::unordered_map> map_; +}; + struct InsertRecord { ConcurrentVector timestamps_; ConcurrentVector row_ids_; @@ -36,52 +98,32 @@ struct InsertRecord { TimestampIndex timestamp_index_; // pks to row offset - Pk2OffsetType pk2offset_; + std::unique_ptr pk2offset_; explicit InsertRecord(const Schema& schema, int64_t size_per_chunk); std::vector search_pk(const PkType pk, Timestamp timestamp) const { std::shared_lock 
lck(shared_mutex_); - std::vector res_offsets; - auto offset_iter = pk2offset_.find(pk); - if (offset_iter != pk2offset_.end()) { - for (auto offset : offset_iter->second) { - if (timestamps_[offset] <= timestamp) { - res_offsets.push_back(SegOffset(offset)); - } - } - } - - return res_offsets; + return pk2offset_->find_with_timestamp(pk, timestamp, timestamps_); } std::vector search_pk(const PkType pk, int64_t insert_barrier) const { std::shared_lock lck(shared_mutex_); - std::vector res_offsets; - auto offset_iter = pk2offset_.find(pk); - if (offset_iter != pk2offset_.end()) { - for (auto offset : offset_iter->second) { - if (offset < insert_barrier) { - res_offsets.push_back(SegOffset(offset)); - } - } - } - - return res_offsets; + return pk2offset_->find_with_barrier(pk, insert_barrier); } void insert_pk(const PkType pk, int64_t offset) { std::lock_guard lck(shared_mutex_); - pk2offset_[pk].emplace_back(offset); + pk2offset_->insert(pk, offset); } bool empty_pks() const { std::shared_lock lck(shared_mutex_); - return pk2offset_.empty(); + return pk2offset_->empty(); } // get field data without knowing the type diff --git a/internal/core/unittest/test_segcore.cpp b/internal/core/unittest/test_segcore.cpp index 01da289b3c..81eb3be30f 100644 --- a/internal/core/unittest/test_segcore.cpp +++ b/internal/core/unittest/test_segcore.cpp @@ -79,3 +79,37 @@ TEST(SegmentCoreTest, SmallIndex) { schema->AddDebugField("fakevec", DataType::VECTOR_FLOAT, 16, knowhere::metric::L2); schema->AddDebugField("age", DataType::INT32); } + +TEST(OffsetMap, int64_t){ + using namespace milvus::segcore; + OffsetMap *map= new OffsetHashMap(); + map->insert(PkType(int64_t(10)), 3); + std::vector offset = map->find_with_barrier(PkType(int64_t(10)),10); + ASSERT_EQ(offset[0].get(), int64_t(3)); +} + +TEST(InsertRecordTest, int64_t){ + using namespace milvus::segcore; + auto schema = std::make_shared(); + schema->AddDebugField("fakevec", DataType::VECTOR_FLOAT, 16, knowhere::metric::L2); + 
auto i64_fid = schema->AddDebugField("age", DataType::INT64); + schema->set_primary_field_id(i64_fid); + + auto record = milvus::segcore::InsertRecord(*schema, int64_t(32)); + record.insert_pk(PkType(int64_t(12)), int64_t(3)); + std::vector offset = record.search_pk(PkType(int64_t(12)), int64_t(10)); + ASSERT_EQ(offset[0].get(), int64_t(3)); +} + +TEST(InsertRecordTest, string){ + using namespace milvus::segcore; + auto schema = std::make_shared(); + schema->AddDebugField("fakevec", DataType::VECTOR_FLOAT, 16, knowhere::metric::L2); + auto i64_fid = schema->AddDebugField("name", DataType::VARCHAR); + schema->set_primary_field_id(i64_fid); + + auto record = milvus::segcore::InsertRecord(*schema, int64_t(32)); + record.insert_pk(PkType(std::string("test")), int64_t(3)); + std::vector offset = record.search_pk(PkType(std::string("test")), int64_t(10)); + ASSERT_EQ(offset[0].get(), int64_t(3)); +}