From 8a37d15f48d1fa3bb7fca044232185439a8fce3b Mon Sep 17 00:00:00 2001 From: FluorineDog Date: Thu, 10 Sep 2020 15:06:20 +0800 Subject: [PATCH] Support Deleted Record Signed-off-by: FluorineDog --- core/src/dog_segment/SegmentNaive.cpp | 131 +++++++++++++++++++++----- core/src/dog_segment/SegmentNaive.h | 34 ++++++- 2 files changed, 142 insertions(+), 23 deletions(-) diff --git a/core/src/dog_segment/SegmentNaive.cpp b/core/src/dog_segment/SegmentNaive.cpp index 020ab58b6e..b807341123 100644 --- a/core/src/dog_segment/SegmentNaive.cpp +++ b/core/src/dog_segment/SegmentNaive.cpp @@ -37,7 +37,71 @@ SegmentNaive::PreInsert(int64_t size) { int64_t SegmentNaive::PreDelete(int64_t size) { - throw std::runtime_error("unimplemented"); + auto reserved_begin = deleted_record_.reserved.fetch_add(size); + return reserved_begin; +} + +auto SegmentNaive::get_deleted_bitmap(int64_t del_barrier, Timestamp query_timestamp, int64_t insert_barrier) -> std::shared_ptr { + auto old = deleted_record_.get_lru_entry(); + if(old->del_barrier == del_barrier) { + return old; + } + auto current = std::make_shared(*old); + auto& vec = current->bitmap; + + if(del_barrier < old->del_barrier) { + for(auto del_index = del_barrier; del_index < old->del_barrier; ++del_index) { + // get uid in delete logs + auto uid = deleted_record_.uids_[del_index]; + // map uid to corrensponding offsets, select the max one, which should be the target + // the max one should be closest to query_timestamp, so the delete log should refer to it + int64_t the_offset = -1; + auto [iter_b, iter_e] = uid2offset_.equal_range(uid); + for(auto iter = iter_b; iter != iter_e; ++iter) { + auto offset = iter->second; + if(record_.timestamps_[offset] < query_timestamp) { + assert(offset < vec.size()); + the_offset = std::max(the_offset, offset); + } + } + // if not found, skip + if(the_offset == -1) { + continue; + } + // otherwise, clear the flag + vec[the_offset] = false; + } + return current; + } else { + for(auto del_index = old->del_barrier; del_index < del_barrier; ++del_index) { + // get uid in delete logs + auto uid = deleted_record_.uids_[del_index]; + // map uid to corrensponding offsets, select the max one, which should be the target + // the max one should be closest to query_timestamp, so the delete log should refer to it + int64_t the_offset = -1; + auto [iter_b, iter_e] = uid2offset_.equal_range(uid); + for(auto iter = iter_b; iter != iter_e; ++iter) { + auto offset = iter->second; + if(offset >= insert_barrier){ + continue; + } + if(record_.timestamps_[offset] < query_timestamp) { + assert(offset < vec.size()); + the_offset = std::max(the_offset, offset); + } + } + + // if not found, skip + if(the_offset == -1) { + continue; + } + + // otherwise, set the flag + vec[the_offset] = true; + } + } + + return current; } Status @@ -88,7 +152,13 @@ SegmentNaive::Insert(int64_t reserved_begin, int64_t size, const int64_t* uids_r record_.entity_vec_[fid]->set_data_raw(reserved_begin, entities[fid].data(), size); } - record_.ack_responder_.AddSegment(reserved_begin, size); + for(int i = 0; i < uids.size(); ++i) { + auto uid = uids[i]; + // NOTE: this must be the last step, cannot be put above + uid2offset_.insert(std::make_pair(uid, reserved_begin + i)); + } + + record_.ack_responder_.AddSegment(reserved_begin, reserved_begin + size); return Status::OK(); // std::thread go(executor, std::move(uids), std::move(timestamps), std::move(entities)); @@ -123,8 +193,26 @@ SegmentNaive::Insert(int64_t reserved_begin, int64_t size, const int64_t* uids_r } Status -SegmentNaive::Delete(int64_t reserved_offset, int64_t size, const int64_t* primary_keys, const Timestamp* timestamps) { - throw std::runtime_error("unimplemented"); +SegmentNaive::Delete(int64_t reserved_begin, int64_t size, const int64_t* uids_raw, const Timestamp* timestamps_raw) { + std::vector> ordering; + ordering.resize(size); + // #pragma omp parallel for + for (int i = 0; i < size; ++i) { + ordering[i] = std::make_tuple(timestamps_raw[i], uids_raw[i]); + } + std::sort(ordering.begin(), ordering.end()); + std::vector uids(size); + std::vector timestamps(size); + // #pragma omp parallel for + for (int index = 0; index < size; ++index) { + auto [t, uid] = ordering[index]; + timestamps[index] = t; + uids[index] = uid; + } + deleted_record_.timestamps_.set_data(reserved_begin, timestamps.data(), size); + deleted_record_.uids_.set_data(reserved_begin, uids.data(), size); + deleted_record_.ack_responder_.AddSegment(reserved_begin, reserved_begin + size); + return Status::OK(); // for (int i = 0; i < size; ++i) { // auto key = primary_keys[i]; // auto time = timestamps[i]; @@ -171,6 +259,22 @@ SegmentNaive::QueryImpl(const query::QueryPtr& query, Timestamp timestamp, Query // return Status::OK(); } +template +int64_t get_barrier(const RecordType& record, Timestamp timestamp) { + auto& vec = record.timestamps_; + int64_t beg = 0; + int64_t end = record.ack_responder_.GetAck(); + while (beg < end) { + auto mid = (beg + end) / 2; + if (vec[mid] < timestamp) { + end = mid; + } else { + beg = mid + 1; + } + } + return beg; +} + Status SegmentNaive::Query(query::QueryPtr query_info, Timestamp timestamp, QueryResult& result) { // TODO: enable delete @@ -198,24 +302,7 @@ SegmentNaive::Query(query::QueryPtr query_info, Timestamp timestamp, QueryResult auto topK = query_info->topK; auto num_queries = query_info->num_queries; - - int64_t barrier = [&] - { - auto& vec = record_.timestamps_; - int64_t beg = 0; - int64_t end = record_.ack_responder_.GetAck(); - while (beg < end) { - auto mid = (beg + end) / 2; - if (vec[mid] < timestamp) { - end = mid; - } else { - beg = mid + 1; - } - - } - return beg; - }(); - + auto barrier = get_barrier(record_, timestamp); if(topK > barrier) { topK = barrier; diff --git a/core/src/dog_segment/SegmentNaive.h b/core/src/dog_segment/SegmentNaive.h index 5b32ede440..176ebe4fd8 100644 --- a/core/src/dog_segment/SegmentNaive.h +++ b/core/src/dog_segment/SegmentNaive.h @@ -105,6 +105,38 @@ class SegmentNaive : public SegmentBase { Record(const Schema& schema); }; + tbb::concurrent_unordered_multimap uid2offset_; + + struct DeletedRecord { + std::atomic reserved = 0; + AckResponder ack_responder_; + ConcurrentVector timestamps_; + ConcurrentVector uids_; + struct TmpBitmap { + // Just for query + int64_t del_barrier = 0; + std::vector bitmap; + }; + std::shared_ptr lru_; + std::shared_mutex shared_mutex_; + + DeletedRecord(): lru_(std::make_shared()) {} + auto get_lru_entry() { + std::shared_lock lck(shared_mutex_); + return lru_; + } + void insert_lru_entry(std::shared_ptr new_entry) { + std::lock_guard lck(shared_mutex_); + if(new_entry->del_barrier <= lru_->del_barrier) { + // DO NOTHING + return; + } + lru_ = std::move(new_entry); + } + }; + + std::shared_ptr get_deleted_bitmap(int64_t barrier, Timestamp query_timestamp, int64_t insert_barrier); + Status QueryImpl(const query::QueryPtr& query, Timestamp timestamp, QueryResult& results); @@ -134,7 +166,7 @@ class SegmentNaive : public SegmentBase { IndexMetaPtr index_meta_; std::atomic state_ = SegmentState::Open; Record record_; - + DeletedRecord deleted_record_; // tbb::concurrent_unordered_map internal_indexes_; // std::shared_ptr record_mutable_; // // to determined that if immutable data if available