mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-11-30 02:48:45 +08:00
Optimize bitset usage (#26096)
Signed-off-by: longjiquan <jiquan.long@zilliz.com>
This commit is contained in:
parent
0e103efc5d
commit
bafb183a2b
@ -188,15 +188,17 @@ ExecPlanNodeVisitor::visit(RetrievePlanNode& node) {
|
||||
return;
|
||||
}
|
||||
|
||||
bitset_holder.flip();
|
||||
bool false_filtered_out = false;
|
||||
if (GetExprUsePkIndex() && IsTermExpr(node.predicate_.value().get())) {
|
||||
segment->search_ids_filter(
|
||||
segment->timestamp_filter(
|
||||
bitset_holder, expr_cached_pk_id_offsets_, timestamp_);
|
||||
} else {
|
||||
segment->search_ids_filter(bitset_holder, timestamp_);
|
||||
bitset_holder.flip();
|
||||
false_filtered_out = true;
|
||||
segment->timestamp_filter(bitset_holder, timestamp_);
|
||||
}
|
||||
retrieve_result.result_offsets_ =
|
||||
segment->find_first(node.limit_, bitset_holder);
|
||||
segment->find_first(node.limit_, bitset_holder, false_filtered_out);
|
||||
retrieve_result_opt_ = std::move(retrieve_result);
|
||||
}
|
||||
|
||||
|
@ -55,7 +55,9 @@ class OffsetMap {
|
||||
using OffsetType = int64_t;
|
||||
// TODO: in fact, we can retrieve the pk here. Not sure which way is more efficient.
|
||||
virtual std::vector<OffsetType>
|
||||
find_first(int64_t limit, const BitsetType& bitset) const = 0;
|
||||
find_first(int64_t limit,
|
||||
const BitsetType& bitset,
|
||||
bool false_filtered_out) const = 0;
|
||||
};
|
||||
|
||||
template <typename T>
|
||||
@ -85,29 +87,36 @@ class OffsetOrderedMap : public OffsetMap {
|
||||
}
|
||||
|
||||
std::vector<OffsetType>
|
||||
find_first(int64_t limit, const BitsetType& bitset) const override {
|
||||
find_first(int64_t limit,
|
||||
const BitsetType& bitset,
|
||||
bool false_filtered_out) const override {
|
||||
if (limit == Unlimited || limit == NoLimit) {
|
||||
limit = map_.size();
|
||||
}
|
||||
|
||||
// TODO: we can't retrieve pk by offset very conveniently.
|
||||
// Selectivity should be done outside.
|
||||
return find_first_by_index(limit, bitset);
|
||||
return find_first_by_index(limit, bitset, false_filtered_out);
|
||||
}
|
||||
|
||||
private:
|
||||
std::vector<OffsetType>
|
||||
find_first_by_index(int64_t limit, const BitsetType& bitset) const {
|
||||
find_first_by_index(int64_t limit,
|
||||
const BitsetType& bitset,
|
||||
bool false_filtered_out) const {
|
||||
std::vector<int64_t> seg_offsets;
|
||||
seg_offsets.reserve(limit);
|
||||
int64_t hit_num = 0; // avoid counting the number everytime.
|
||||
auto cnt = bitset.count();
|
||||
if (!false_filtered_out) {
|
||||
cnt = bitset.size() - bitset.count();
|
||||
}
|
||||
for (auto it = map_.begin(); it != map_.end(); it++) {
|
||||
if (hit_num >= limit || hit_num >= cnt) {
|
||||
break;
|
||||
}
|
||||
for (auto seg_offset : it->second) {
|
||||
if (bitset[seg_offset]) {
|
||||
if (!(bitset[seg_offset] ^ false_filtered_out)) {
|
||||
seg_offsets.push_back(seg_offset);
|
||||
hit_num++;
|
||||
}
|
||||
@ -163,7 +172,9 @@ class OffsetOrderedArray : public OffsetMap {
|
||||
}
|
||||
|
||||
std::vector<OffsetType>
|
||||
find_first(int64_t limit, const BitsetType& bitset) const override {
|
||||
find_first(int64_t limit,
|
||||
const BitsetType& bitset,
|
||||
bool false_filtered_out) const override {
|
||||
check_search();
|
||||
|
||||
if (limit == Unlimited || limit == NoLimit) {
|
||||
@ -172,21 +183,26 @@ class OffsetOrderedArray : public OffsetMap {
|
||||
|
||||
// TODO: we can't retrieve pk by offset very conveniently.
|
||||
// Selectivity should be done outside.
|
||||
return find_first_by_index(limit, bitset);
|
||||
return find_first_by_index(limit, bitset, false_filtered_out);
|
||||
}
|
||||
|
||||
private:
|
||||
std::vector<OffsetType>
|
||||
find_first_by_index(int64_t limit, const BitsetType& bitset) const {
|
||||
find_first_by_index(int64_t limit,
|
||||
const BitsetType& bitset,
|
||||
bool false_filtered_out) const {
|
||||
std::vector<int64_t> seg_offsets;
|
||||
seg_offsets.reserve(limit);
|
||||
int64_t hit_num = 0; // avoid counting the number everytime.
|
||||
auto cnt = bitset.count();
|
||||
if (!false_filtered_out) {
|
||||
cnt = bitset.size() - bitset.count();
|
||||
}
|
||||
for (auto it = array_.begin(); it != array_.end(); it++) {
|
||||
if (hit_num >= limit || hit_num >= cnt) {
|
||||
break;
|
||||
}
|
||||
if (bitset[it->second]) {
|
||||
if (!(bitset[it->second] ^ false_filtered_out)) {
|
||||
seg_offsets.push_back(it->second);
|
||||
hit_num++;
|
||||
}
|
||||
|
@ -235,8 +235,11 @@ class SegmentGrowingImpl : public SegmentGrowing {
|
||||
}
|
||||
|
||||
std::vector<OffsetMap::OffsetType>
|
||||
find_first(int64_t limit, const BitsetType& bitset) const override {
|
||||
return insert_record_.pk2offset_->find_first(limit, bitset);
|
||||
find_first(int64_t limit,
|
||||
const BitsetType& bitset,
|
||||
bool false_filtered_out) const override {
|
||||
return insert_record_.pk2offset_->find_first(
|
||||
limit, bitset, false_filtered_out);
|
||||
}
|
||||
|
||||
protected:
|
||||
|
@ -179,31 +179,39 @@ SegmentInternalInterface::get_real_count() const {
|
||||
}
|
||||
|
||||
void
|
||||
SegmentInternalInterface::search_ids_filter(BitsetType& bitset,
|
||||
Timestamp timestamp) const {
|
||||
SegmentInternalInterface::timestamp_filter(BitsetType& bitset,
|
||||
Timestamp timestamp) const {
|
||||
auto& timestamps = get_timestamps();
|
||||
for (int offset = bitset.find_first(); offset < bitset.size();
|
||||
offset = bitset.find_next(offset)) {
|
||||
auto cnt = bitset.size();
|
||||
if (timestamps[cnt - 1] <= timestamp) {
|
||||
// no need to filter out anything.
|
||||
return;
|
||||
}
|
||||
|
||||
auto pilot = upper_bound(timestamps, 0, cnt, timestamp);
|
||||
// offset bigger than pilot should be filtered out.
|
||||
for (int offset = pilot; offset < cnt; offset = bitset.find_next(offset)) {
|
||||
if (offset == BitsetType::npos) {
|
||||
return;
|
||||
}
|
||||
// You can't see an entity which is inserted after the point when you search.
|
||||
if (timestamps[offset] > timestamp) {
|
||||
bitset[offset] = false;
|
||||
}
|
||||
bitset[offset] = false;
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
SegmentInternalInterface::search_ids_filter(BitsetType& bitset,
|
||||
const std::vector<int64_t>& offsets,
|
||||
Timestamp timestamp) const {
|
||||
BitsetType bitset_copy = bitset;
|
||||
bitset.reset();
|
||||
SegmentInternalInterface::timestamp_filter(BitsetType& bitset,
|
||||
const std::vector<int64_t>& offsets,
|
||||
Timestamp timestamp) const {
|
||||
auto& timestamps = get_timestamps();
|
||||
auto cnt = bitset.size();
|
||||
if (timestamps[cnt - 1] <= timestamp) {
|
||||
// no need to filter out anything.
|
||||
return;
|
||||
}
|
||||
|
||||
// point query, faster than binary search.
|
||||
for (auto& offset : offsets) {
|
||||
// You can't see an entity which is inserted after the point when you search.
|
||||
if (bitset_copy[offset] && timestamps[offset] <= timestamp) {
|
||||
if (timestamps[offset] > timestamp) {
|
||||
bitset.set(offset, true);
|
||||
}
|
||||
}
|
||||
|
@ -186,16 +186,47 @@ class SegmentInternalInterface : public SegmentInterface {
|
||||
virtual std::pair<std::unique_ptr<IdArray>, std::vector<SegOffset>>
|
||||
search_ids(const IdArray& id_array, Timestamp timestamp) const = 0;
|
||||
|
||||
/**
|
||||
* Apply timestamp filtering on bitset, the query can't see an entity whose
|
||||
* timestamp is bigger than the timestamp of query.
|
||||
*
|
||||
* @param bitset The final bitset after scalar filtering and delta filtering,
|
||||
* `false` means that the entity will be filtered out.
|
||||
* @param timestamp The timestamp of query.
|
||||
*/
|
||||
void
|
||||
search_ids_filter(BitsetType& bitset, Timestamp timestamp) const;
|
||||
timestamp_filter(BitsetType& bitset, Timestamp timestamp) const;
|
||||
|
||||
/**
|
||||
* Apply timestamp filtering on bitset, the query can't see an entity whose
|
||||
* timestamp is bigger than the timestamp of query. The passed offsets are
|
||||
* all candidate entities.
|
||||
*
|
||||
* @param bitset The final bitset after scalar filtering and delta filtering,
|
||||
* `true` means that the entity will be filtered out.
|
||||
* @param offsets The segment offsets of all candidates.
|
||||
* @param timestamp The timestamp of query.
|
||||
*/
|
||||
void
|
||||
search_ids_filter(BitsetType& bitset,
|
||||
const std::vector<int64_t>& offsets,
|
||||
Timestamp timestamp) const;
|
||||
timestamp_filter(BitsetType& bitset,
|
||||
const std::vector<int64_t>& offsets,
|
||||
Timestamp timestamp) const;
|
||||
|
||||
/**
|
||||
* Sort all candidates in ascending order, and then return the limit smallest.
|
||||
* Bitset is used to check if the candidate will be filtered out. `false_filtered_out`
|
||||
* determines how to filter out candidates. If `false_filtered_out` is true, we will
|
||||
* filter all candidates whose related bit is false.
|
||||
*
|
||||
* @param limit
|
||||
* @param bitset
|
||||
* @param false_filtered_out
|
||||
* @return All candidates offsets.
|
||||
*/
|
||||
virtual std::vector<OffsetMap::OffsetType>
|
||||
find_first(int64_t limit, const BitsetType& bitset) const = 0;
|
||||
find_first(int64_t limit,
|
||||
const BitsetType& bitset,
|
||||
bool false_filtered_out) const = 0;
|
||||
|
||||
protected:
|
||||
// internal API: return chunk_data in span
|
||||
|
@ -111,8 +111,11 @@ class SegmentSealedImpl : public SegmentSealed {
|
||||
const Timestamp* timestamps) override;
|
||||
|
||||
std::vector<OffsetMap::OffsetType>
|
||||
find_first(int64_t limit, const BitsetType& bitset) const override {
|
||||
return insert_record_.pk2offset_->find_first(limit, bitset);
|
||||
find_first(int64_t limit,
|
||||
const BitsetType& bitset,
|
||||
bool false_filtered_out) const override {
|
||||
return insert_record_.pk2offset_->find_first(
|
||||
limit, bitset, false_filtered_out);
|
||||
}
|
||||
|
||||
protected:
|
||||
|
@ -588,4 +588,21 @@ LoadFieldDatasFromRemote(std::vector<std::string>& remote_files,
|
||||
|
||||
channel->close();
|
||||
}
|
||||
|
||||
int64_t
|
||||
upper_bound(const ConcurrentVector<Timestamp>& timestamps,
|
||||
int64_t first,
|
||||
int64_t last,
|
||||
Timestamp value) {
|
||||
int64_t mid;
|
||||
while (first < last) {
|
||||
mid = first + (last - first) / 2;
|
||||
if (value >= timestamps[mid]) {
|
||||
first = mid + 1;
|
||||
} else {
|
||||
last = mid;
|
||||
}
|
||||
}
|
||||
return first;
|
||||
}
|
||||
} // namespace milvus::segcore
|
||||
|
@ -9,6 +9,8 @@
|
||||
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
// or implied. See the License for the specific language governing permissions and limitations under the License
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <unordered_map>
|
||||
#include <exception>
|
||||
#include <memory>
|
||||
@ -151,4 +153,20 @@ ReverseDataFromIndex(const index::IndexBase* index,
|
||||
void
|
||||
LoadFieldDatasFromRemote(std::vector<std::string>& remote_files,
|
||||
storage::FieldDataChannelPtr channel);
|
||||
|
||||
/**
|
||||
* Returns an index pointing to the first element in the range [first, last) such that `value < element` is true
|
||||
* (i.e. that is strictly greater than value), or last if no such element is found.
|
||||
*
|
||||
* @param timestamps
|
||||
* @param first
|
||||
* @param last
|
||||
* @param value
|
||||
* @return The index of answer, last will be returned if no timestamp is bigger than the value.
|
||||
*/
|
||||
int64_t
|
||||
upper_bound(const ConcurrentVector<Timestamp>& timestamps,
|
||||
int64_t first,
|
||||
int64_t last,
|
||||
Timestamp value);
|
||||
} // namespace milvus::segcore
|
||||
|
@ -68,7 +68,7 @@ TYPED_TEST_P(TypedOffsetOrderedArrayTest, find_first) {
|
||||
std::vector<int64_t> offsets;
|
||||
|
||||
// not sealed.
|
||||
ASSERT_ANY_THROW(this->map_.find_first(Unlimited, {}));
|
||||
ASSERT_ANY_THROW(this->map_.find_first(Unlimited, {}, true));
|
||||
|
||||
// insert 10 entities.
|
||||
int num = 10;
|
||||
@ -83,12 +83,12 @@ TYPED_TEST_P(TypedOffsetOrderedArrayTest, find_first) {
|
||||
// all is satisfied.
|
||||
BitsetType all(num);
|
||||
all.set();
|
||||
offsets = this->map_.find_first(num / 2, all);
|
||||
offsets = this->map_.find_first(num / 2, all, true);
|
||||
ASSERT_EQ(num / 2, offsets.size());
|
||||
for (int i = 1; i < offsets.size(); i++) {
|
||||
ASSERT_TRUE(data[offsets[i - 1]] <= data[offsets[i]]);
|
||||
}
|
||||
offsets = this->map_.find_first(Unlimited, all);
|
||||
offsets = this->map_.find_first(Unlimited, all, true);
|
||||
ASSERT_EQ(num, offsets.size());
|
||||
for (int i = 1; i < offsets.size(); i++) {
|
||||
ASSERT_TRUE(data[offsets[i - 1]] <= data[offsets[i]]);
|
||||
@ -97,9 +97,9 @@ TYPED_TEST_P(TypedOffsetOrderedArrayTest, find_first) {
|
||||
// none is satisfied.
|
||||
BitsetType none(num);
|
||||
none.reset();
|
||||
offsets = this->map_.find_first(num / 2, none);
|
||||
offsets = this->map_.find_first(num / 2, none, true);
|
||||
ASSERT_EQ(0, offsets.size());
|
||||
offsets = this->map_.find_first(NoLimit, none);
|
||||
offsets = this->map_.find_first(NoLimit, none, true);
|
||||
ASSERT_EQ(0, offsets.size());
|
||||
}
|
||||
|
||||
|
@ -63,7 +63,7 @@ TYPED_TEST_P(TypedOffsetOrderedMapTest, find_first) {
|
||||
std::vector<int64_t> offsets;
|
||||
|
||||
// no data.
|
||||
offsets = this->map_.find_first(Unlimited, {});
|
||||
offsets = this->map_.find_first(Unlimited, {}, true);
|
||||
ASSERT_EQ(0, offsets.size());
|
||||
|
||||
// insert 10 entities.
|
||||
@ -76,12 +76,12 @@ TYPED_TEST_P(TypedOffsetOrderedMapTest, find_first) {
|
||||
// all is satisfied.
|
||||
BitsetType all(num);
|
||||
all.set();
|
||||
offsets = this->map_.find_first(num / 2, all);
|
||||
offsets = this->map_.find_first(num / 2, all, true);
|
||||
ASSERT_EQ(num / 2, offsets.size());
|
||||
for (int i = 1; i < offsets.size(); i++) {
|
||||
ASSERT_TRUE(data[offsets[i - 1]] <= data[offsets[i]]);
|
||||
}
|
||||
offsets = this->map_.find_first(Unlimited, all);
|
||||
offsets = this->map_.find_first(Unlimited, all, true);
|
||||
ASSERT_EQ(num, offsets.size());
|
||||
for (int i = 1; i < offsets.size(); i++) {
|
||||
ASSERT_TRUE(data[offsets[i - 1]] <= data[offsets[i]]);
|
||||
@ -90,9 +90,9 @@ TYPED_TEST_P(TypedOffsetOrderedMapTest, find_first) {
|
||||
// none is satisfied.
|
||||
BitsetType none(num);
|
||||
none.reset();
|
||||
offsets = this->map_.find_first(num / 2, none);
|
||||
offsets = this->map_.find_first(num / 2, none, true);
|
||||
ASSERT_EQ(0, offsets.size());
|
||||
offsets = this->map_.find_first(NoLimit, none);
|
||||
offsets = this->map_.find_first(NoLimit, none, true);
|
||||
ASSERT_EQ(0, offsets.size());
|
||||
}
|
||||
|
||||
|
@ -15,6 +15,7 @@
|
||||
#include "common/Utils.h"
|
||||
#include "query/Utils.h"
|
||||
#include "test_utils/DataGen.h"
|
||||
#include "common/Types.h"
|
||||
|
||||
TEST(Util, StringMatch) {
|
||||
using namespace milvus;
|
||||
@ -117,3 +118,18 @@ TEST(Util, OutOfRange) {
|
||||
ASSERT_TRUE(out_of_range<int32_t>(
|
||||
static_cast<int64_t>(std::numeric_limits<int32_t>::min()) - 1));
|
||||
}
|
||||
|
||||
TEST(Util, upper_bound) {
|
||||
using milvus::Timestamp;
|
||||
using milvus::segcore::ConcurrentVector;
|
||||
using milvus::segcore::upper_bound;
|
||||
|
||||
std::vector<Timestamp> data{0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
|
||||
ConcurrentVector<Timestamp> timestamps(1);
|
||||
timestamps.grow_to_at_least(data.size());
|
||||
timestamps.set_data(0, data.data(), data.size());
|
||||
|
||||
ASSERT_EQ(1, upper_bound(timestamps, 0, data.size(), 0));
|
||||
ASSERT_EQ(5, upper_bound(timestamps, 0, data.size(), 4));
|
||||
ASSERT_EQ(10, upper_bound(timestamps, 0, data.size(), 10));
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user