From 86687bd8ed58b3f481224d5988e4660420a9976b Mon Sep 17 00:00:00 2001 From: "cai.zhang" Date: Mon, 28 Oct 2024 15:19:30 +0800 Subject: [PATCH] enhance: Refine code for get_deleted_bitmap (#36819) issue: #33744 Check whether the PK is truly sorted in the debug mode. --------- Signed-off-by: Cai Zhang --- internal/core/src/segcore/CMakeLists.txt | 8 + .../core/src/segcore/SegmentSealedImpl.cpp | 137 +++++++----------- internal/core/src/segcore/SegmentSealedImpl.h | 6 - internal/core/src/segcore/Utils.h | 18 ++- 4 files changed, 74 insertions(+), 95 deletions(-) diff --git a/internal/core/src/segcore/CMakeLists.txt b/internal/core/src/segcore/CMakeLists.txt index 63eec8e63d..e2674454ac 100644 --- a/internal/core/src/segcore/CMakeLists.txt +++ b/internal/core/src/segcore/CMakeLists.txt @@ -12,3 +12,11 @@ add_source_at_current_directory_recursively() add_library(milvus_segcore OBJECT ${SOURCE_FILES}) + +if(CMAKE_BUILD_TYPE STREQUAL "Debug") + set(CHECK_SORTED ON) +else() + set(CHECK_SORTED OFF) +endif() + +add_definitions(-DCHECK_SORTED=${CHECK_SORTED}) \ No newline at end of file diff --git a/internal/core/src/segcore/SegmentSealedImpl.cpp b/internal/core/src/segcore/SegmentSealedImpl.cpp index dea5741918..4371735fa5 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.cpp +++ b/internal/core/src/segcore/SegmentSealedImpl.cpp @@ -54,6 +54,41 @@ namespace milvus::segcore { +#ifdef CHECK_SORTED +#define ASSERT_COLUMN_ORDERED(data_type, column) \ + { \ + switch (data_type) { \ + case DataType::INT64: { \ + auto col = \ + std::dynamic_pointer_cast(column); \ + auto pks = reinterpret_cast(col->Data()); \ + for (int i = 1; i < col->NumRows(); ++i) { \ + assert(pks[i - 1] <= pks[i] && \ + "INT64 Column is not ordered!"); \ + } \ + break; \ + } \ + case DataType::VARCHAR: { \ + auto col = std::dynamic_pointer_cast< \ + SingleChunkVariableColumn>(column); \ + auto pks = col->Views(); \ + for (int i = 1; i < col->NumRows(); ++i) { \ + assert(pks[i - 1] <= pks[i] && \ + 
"VARCHAR Column is not ordered!"); \ + } \ + break; \ + } \ + default: { \ + PanicInfo(DataTypeInvalid, \ + fmt::format("unsupported primary key data type", \ + data_type)); \ + } \ + } \ + } +#else +#define ASSERT_COLUMN_ORDERED(data_type, column) ((void)0) +#endif + static inline void set_bit(BitsetType& bitset, FieldId field_id, bool flag = true) { auto pos = field_id.get() - START_USER_FIELDID; @@ -458,11 +493,15 @@ SegmentSealedImpl::LoadFieldData(FieldId field_id, FieldDataInfo& data) { // set pks to offset // if the segments are already sorted by pk, there is no need to build a pk offset index. // it can directly perform a binary search on the pk column. - if (schema_->get_primary_field_id() == field_id && !is_sorted_by_pk_) { - AssertInfo(field_id.get() != -1, "Primary key is -1"); - AssertInfo(insert_record_.empty_pks(), "already exists"); - insert_record_.insert_pks(data_type, column); - insert_record_.seal_pks(); + if (schema_->get_primary_field_id() == field_id) { + if (!is_sorted_by_pk_) { + AssertInfo(field_id.get() != -1, "Primary key is -1"); + AssertInfo(insert_record_.empty_pks(), "already exists"); + insert_record_.insert_pks(data_type, column); + insert_record_.seal_pks(); + } else { + ASSERT_COLUMN_ORDERED(data_type, column); + } } bool use_temp_index = false; @@ -889,74 +928,6 @@ SegmentSealedImpl::search_pk(const PkType& pk, int64_t insert_barrier) const { return pk_offsets; } -std::shared_ptr -SegmentSealedImpl::get_deleted_bitmap_s(int64_t del_barrier, - int64_t insert_barrier, - DeletedRecord& delete_record, - Timestamp query_timestamp) const { - // if insert_barrier and del_barrier have not changed, use cache data directly - bool hit_cache = false; - int64_t old_del_barrier = 0; - auto current = delete_record.clone_lru_entry( - insert_barrier, del_barrier, old_del_barrier, hit_cache); - if (hit_cache) { - return current; - } - - auto bitmap = current->bitmap_ptr; - - int64_t start, end; - if (del_barrier < old_del_barrier) { - // in this 
case, ts of delete record[current_del_barrier : old_del_barrier] > query_timestamp - // so these deletion records do not take effect in query/search - // so bitmap corresponding to those pks in delete record[current_del_barrier:old_del_barrier] will be reset to 0 - // for example, current_del_barrier = 2, query_time = 120, the bitmap will be reset to [0, 1, 1, 0, 0, 0, 0, 0] - start = del_barrier; - end = old_del_barrier; - } else { - // the cache is not enough, so update bitmap using new pks in delete record[old_del_barrier:current_del_barrier] - // for example, current_del_barrier = 4, query_time = 300, bitmap will be updated to [0, 1, 1, 0, 1, 1, 0, 0] - start = old_del_barrier; - end = del_barrier; - } - - // Avoid invalid calculations when there are a lot of repeated delete pks - std::unordered_map delete_timestamps; - for (auto del_index = start; del_index < end; ++del_index) { - auto pk = delete_record.pks()[del_index]; - auto timestamp = delete_record.timestamps()[del_index]; - - delete_timestamps[pk] = timestamp > delete_timestamps[pk] - ? 
timestamp - : delete_timestamps[pk]; - } - - for (auto& [pk, timestamp] : delete_timestamps) { - auto segOffsets = search_pk(pk, insert_barrier); - for (auto offset : segOffsets) { - int64_t insert_row_offset = offset.get(); - - // The deletion record do not take effect in search/query, - // and reset bitmap to 0 - if (timestamp > query_timestamp) { - bitmap->reset(insert_row_offset); - continue; - } - // Insert after delete with same pk, delete will not task effect on this insert record, - // and reset bitmap to 0 - if (insert_record_.timestamps_[offset.get()] >= timestamp) { - bitmap->reset(insert_row_offset); - continue; - } - // insert data corresponding to the insert_row_offset will be ignored in search/query - bitmap->set(insert_row_offset); - } - } - - delete_record.insert_lru_entry(current); - return current; -} - void SegmentSealedImpl::mask_with_delete(BitsetTypeView& bitset, int64_t ins_barrier, @@ -968,16 +939,16 @@ SegmentSealedImpl::mask_with_delete(BitsetTypeView& bitset, auto bitmap_holder = std::shared_ptr(); - if (!is_sorted_by_pk_) { - bitmap_holder = get_deleted_bitmap(del_barrier, - ins_barrier, - deleted_record_, - insert_record_, - timestamp); - } else { - bitmap_holder = get_deleted_bitmap_s( - del_barrier, ins_barrier, deleted_record_, timestamp); - } + auto search_fn = [this](const PkType& pk, int64_t barrier) { + return this->search_pk(pk, barrier); + }; + bitmap_holder = get_deleted_bitmap(del_barrier, + ins_barrier, + deleted_record_, + insert_record_, + timestamp, + is_sorted_by_pk_, + search_fn); if (!bitmap_holder || !bitmap_holder->bitmap_ptr) { return; diff --git a/internal/core/src/segcore/SegmentSealedImpl.h b/internal/core/src/segcore/SegmentSealedImpl.h index 1c07c1047a..d5d6339925 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.h +++ b/internal/core/src/segcore/SegmentSealedImpl.h @@ -121,12 +121,6 @@ class SegmentSealedImpl : public SegmentSealed { std::vector search_pk(const PkType& pk, int64_t insert_barrier) 
const; - std::shared_ptr - get_deleted_bitmap_s(int64_t del_barrier, - int64_t insert_barrier, - DeletedRecord& delete_record, - Timestamp query_timestamp) const; - std::unique_ptr get_vector(FieldId field_id, const int64_t* ids, diff --git a/internal/core/src/segcore/Utils.h b/internal/core/src/segcore/Utils.h index 226e0da644..ed6336aa9d 100644 --- a/internal/core/src/segcore/Utils.h +++ b/internal/core/src/segcore/Utils.h @@ -110,11 +110,15 @@ MergeDataArray(std::vector& merge_bases, template std::shared_ptr -get_deleted_bitmap(int64_t del_barrier, - int64_t insert_barrier, - DeletedRecord& delete_record, - const InsertRecord& insert_record, - Timestamp query_timestamp) { +get_deleted_bitmap( + int64_t del_barrier, + int64_t insert_barrier, + DeletedRecord& delete_record, + const InsertRecord& insert_record, + Timestamp query_timestamp, + bool is_sorted_by_pk = false, + const std::function(const PkType&, int64_t)>& + search_fn = nullptr) { // if insert_barrier and del_barrier have not changed, use cache data directly bool hit_cache = false; int64_t old_del_barrier = 0; @@ -153,7 +157,9 @@ get_deleted_bitmap(int64_t del_barrier, } for (auto& [pk, timestamp] : delete_timestamps) { - auto segOffsets = insert_record.search_pk(pk, insert_barrier); + auto segOffsets = is_sorted_by_pk + ? search_fn(pk, insert_barrier) + : insert_record.search_pk(pk, insert_barrier); for (auto offset : segOffsets) { int64_t insert_row_offset = offset.get();