remove deprecated (#6623)

* remove deprecated
  Signed-off-by: fluorinedog <fluorinedog@gmail.com>
* fix timeout
  Signed-off-by: fluorinedog <fluorinedog@gmail.com>

parent b9f5949680
commit 234954931f
@@ -23,8 +23,6 @@
 namespace milvus {
 namespace segcore {
-// using engine::DataChunk;
-// using engine::DataChunkPtr;
 using SearchResult = milvus::SearchResult;
 struct RowBasedRawData {
     void* raw_data;  // schema
@@ -41,14 +39,6 @@ int
 TestABI();

 class SegmentGrowing : public SegmentInternalInterface {
  public:
-    // definitions
-    enum class SegmentState {
-        Invalid = 0,
-        Open,    // able to insert data
-        Closed   // able to build index
-    };
-
- public:
     virtual void
     debug_disable_small_index() = 0;
@@ -76,18 +66,7 @@ class SegmentGrowing : public SegmentInternalInterface {
     virtual Status
     Delete(int64_t reserved_offset, int64_t size, const int64_t* row_ids, const Timestamp* timestamps) = 0;

-    virtual Status
-    LoadIndexing(const LoadIndexInfo& info) = 0;
-
-    // stop receive insert requests
-    virtual Status
-    Close() = 0;
-
  public:
-    // feature not implemented
-    virtual SegmentState
-    get_state() const = 0;
-
     virtual ssize_t
     get_deleted_count() const = 0;
 };
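With the Open/Closed lifecycle removed from SegmentGrowing, callers no longer close a growing segment in place; the updated tests instead build a sealed segment and search that. A minimal sketch of the replacement flow, assuming the SealedCreator test helper introduced later in this diff and fixtures (schema, dataset, load_info, plan, ph_group, time) set up as in the tests:

    // Sketch only: replacement for the removed Close()/get_state() flow.
    auto sealed_segment = SealedCreator(schema, dataset, load_info);
    auto sr = sealed_segment->Search(plan.get(), *ph_group, time);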
@@ -240,18 +240,6 @@ SegmentGrowingImpl::Delete(int64_t reserved_begin,
     // return Status::OK();
 }
-
-Status
-SegmentGrowingImpl::Close() {
-    if (this->record_.reserved != this->record_.ack_responder_.GetAck()) {
-        PanicInfo("insert not ready");
-    }
-    if (this->deleted_record_.reserved != this->deleted_record_.ack_responder_.GetAck()) {
-        PanicInfo("delete not ready");
-    }
-    state_ = SegmentState::Closed;
-    return Status::OK();
-}
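Close() refused to seal a segment until every reserved slot had been acknowledged for both the insert and the delete record. That is the invariant behind the two-phase write protocol used throughout segcore; a minimal sketch, where write_rows is a hypothetical stand-in for the actual copy step:

    // Reserve -> write -> ack, the protocol the removed Close() validated.
    int64_t begin = record.reserved.fetch_add(size);        // 1. reserve a slot range
    write_rows(record, begin, size);                        // 2. fill it (hypothetical helper)
    record.ack_responder_.AddSegment(begin, begin + size);  // 3. ack: range becomes visible
    // The segment is quiescent when record.reserved == record.ack_responder_.GetAck().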

 int64_t
 SegmentGrowingImpl::GetMemoryUsageInBytes() const {
     int64_t total_bytes = 0;
@@ -263,17 +251,6 @@ SegmentGrowingImpl::GetMemoryUsageInBytes() const {
     return total_bytes;
 }
-
-Status
-SegmentGrowingImpl::LoadIndexing(const LoadIndexInfo& info) {
-    auto field_offset = schema_->get_offset(FieldId(info.field_id));
-
-    Assert(info.index_params.count("metric_type"));
-    auto metric_type_str = info.index_params.at("metric_type");
-
-    sealed_indexing_record_.append_field_indexing(field_offset, GetMetricType(metric_type_str), info.index);
-    return Status::OK();
-}

 SpanBase
 SegmentGrowingImpl::chunk_data_impl(FieldOffset field_offset, int64_t chunk_id) const {
     auto vec = get_insert_record().get_field_data_base(field_offset);
@@ -63,11 +63,6 @@ class SegmentGrowingImpl : public SegmentGrowing {
     Status
     Delete(int64_t reserverd_offset, int64_t size, const int64_t* row_ids, const Timestamp* timestamps) override;
-
-    // stop receive insert requests
-    // will move data to immutable vector or something
-    Status
-    Close() override;

     int64_t
     GetMemoryUsageInBytes() const override;

@@ -106,6 +101,7 @@ class SegmentGrowingImpl : public SegmentGrowing {
         return indexing_record_.get_finished_ack();
     }

+    // deprecated
     const knowhere::Index*
     chunk_index_impl(FieldOffset field_offset, int64_t chunk_id) const final {
         return indexing_record_.get_field_indexing(field_offset).get_chunk_indexing(chunk_id);
@@ -117,6 +113,7 @@ class SegmentGrowingImpl : public SegmentGrowing {
     }

  public:
+    // only for debug
     void
     debug_disable_small_index() override {
         debug_disable_small_index_ = true;
@@ -127,11 +124,6 @@ class SegmentGrowingImpl : public SegmentGrowing {
         return record_.ack_responder_.GetAck();
     }
-
-    SegmentState
-    get_state() const override {
-        return state_.load(std::memory_order_relaxed);
-    }

     ssize_t
     get_deleted_count() const override {
         return 0;
@@ -160,9 +152,6 @@ class SegmentGrowingImpl : public SegmentGrowing {
     void
     bulk_subscript(FieldOffset field_offset, const int64_t* seg_offsets, int64_t count, void* output) const override;
-
-    Status
-    LoadIndexing(const LoadIndexInfo& info) override;

  public:
     friend std::unique_ptr<SegmentGrowing>
     CreateGrowingSegment(SchemaPtr schema, const SegcoreConfig& segcore_config);
@@ -216,7 +205,6 @@ class SegmentGrowingImpl : public SegmentGrowing {
  private:
     SegcoreConfig segcore_config_;
     SchemaPtr schema_;
-    std::atomic<SegmentState> state_ = SegmentState::Open;

     InsertRecord record_;
     DeletedRecord deleted_record_;
@@ -1,64 +0,0 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#if 0
#include "IndexMeta.h"
#include <mutex>
#include <cassert>
namespace milvus::segcore {

Status
IndexMeta::AddEntry(
    const std::string& index_name, const std::string& field_name_, IndexType type, IndexMode mode, IndexConfig config) {
    auto field_name = FieldName(field_name_);
    Entry entry{index_name, field_name, type, mode, std::move(config)};
    VerifyEntry(entry);

    if (entries_.count(index_name)) {
        throw std::invalid_argument("duplicate index_name");
    }
    // TODO: support multiple indexes for single field
    Assert(!lookups_.count(field_name));
    lookups_[field_name] = index_name;
    entries_[index_name] = std::move(entry);

    return Status::OK();
}

Status
IndexMeta::DropEntry(const std::string& index_name) {
    Assert(entries_.count(index_name));
    auto entry = std::move(entries_[index_name]);
    if (lookups_[entry.field_name] == index_name) {
        lookups_.erase(entry.field_name);
    }
    return Status::OK();
}

void
IndexMeta::VerifyEntry(const Entry& entry) {
    auto is_mode_valid = std::set{IndexMode::MODE_CPU, IndexMode::MODE_GPU}.count(entry.mode);
    if (!is_mode_valid) {
        throw std::invalid_argument("invalid mode");
    }

    auto& schema = *schema_;
    auto& field_meta = schema[entry.field_name];
    // TODO checking
    if (field_meta.is_vector()) {
        Assert(entry.type == knowhere::IndexEnum::INDEX_FAISS_IVFPQ);
    } else {
        Assert(false);
    }
}

}  // namespace milvus::segcore

#endif
@@ -1,79 +0,0 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License

#pragma once
//
//#include <shared_mutex>
//
//#include "common/Schema.h"
// #include "segcore/SegmentBase.h"
#if 0
#include "common/Schema.h"
#include "knowhere/index/IndexType.h"
#include "knowhere/common/Config.h"
#include <string>
#include <map>
#include <memory>
namespace milvus::segcore {
// TODO: this is
class IndexMeta {
 public:
    explicit IndexMeta(SchemaPtr schema) : schema_(schema) {
    }
    using IndexType = knowhere::IndexType;
    using IndexMode = knowhere::IndexMode;
    using IndexConfig = knowhere::Config;

    struct Entry {
        std::string index_name;
        FieldName field_name;
        IndexType type;
        IndexMode mode;
        IndexConfig config;
    };

    Status
    AddEntry(const std::string& index_name,
             const std::string& field_name,
             IndexType type,
             IndexMode mode,
             IndexConfig config);

    Status
    DropEntry(const std::string& index_name);

    const std::map<std::string, Entry>&
    get_entries() {
        return entries_;
    }

    const Entry&
    lookup_by_field(const FieldName& field_name) {
        AssertInfo(lookups_.count(field_name), field_name.get());
        auto index_name = lookups_.at(field_name);
        AssertInfo(entries_.count(index_name), index_name);
        return entries_.at(index_name);
    }

 private:
    void
    VerifyEntry(const Entry& entry);

 private:
    SchemaPtr schema_;
    std::map<std::string, Entry> entries_;      // index_name => Entry
    std::map<FieldName, std::string> lookups_;  // field_name => index_name
};

using IndexMetaPtr = std::shared_ptr<IndexMeta>;
}  // namespace milvus::segcore

#endif
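The class keeps two maps so an index entry can be reached either by index name or by the field it covers: lookups_ resolves field -> index name, and entries_ resolves name -> Entry, which is exactly what lookup_by_field() chains together. A small usage sketch against the declarations above (conf stands in for a prepared knowhere::Config):

    IndexMeta meta(schema);
    meta.AddEntry("fakeindex", "fakevec", knowhere::IndexEnum::INDEX_FAISS_IVFPQ,
                  knowhere::IndexMode::MODE_CPU, conf);
    const auto& entry = meta.lookup_by_field(FieldName("fakevec"));  // lookups_ then entries_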
@@ -1,464 +0,0 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#if 0

#include <segcore/deprecated/SegmentNaive.h>
#include <random>
#include <algorithm>
#include <numeric>
#include <thread>
#include <queue>

#include <knowhere/index/vector_index/adapter/VectorAdapter.h>
#include <knowhere/index/vector_index/VecIndexFactory.h>
#include <faiss/utils/distances.h>
#include <faiss/utils/BitsetView.h>
#include "segcore/Reduce.h"

namespace milvus::segcore {

int64_t
SegmentNaive::PreInsert(int64_t size) {
    auto reserved_begin = record_.reserved.fetch_add(size);
    return reserved_begin;
}

int64_t
SegmentNaive::PreDelete(int64_t size) {
    auto reserved_begin = deleted_record_.reserved.fetch_add(size);
    return reserved_begin;
}
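// Note on the next function: get_deleted_bitmap() maintains a cached snapshot
// (TmpBitmap) of which offsets are deleted as of a query timestamp. Instead of
// rebuilding the bitmap per query, it clones the most recent LRU entry and
// replays only the slice of the delete log between the old and new del_barrier,
// setting bits as the barrier advances and clearing them as it moves back.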
auto
SegmentNaive::get_deleted_bitmap(int64_t del_barrier, Timestamp query_timestamp, int64_t insert_barrier, bool force)
    -> std::shared_ptr<DeletedRecord::TmpBitmap> {
    auto old = deleted_record_.get_lru_entry();

    if (!force || old->bitmap_ptr->count() == insert_barrier) {
        if (old->del_barrier == del_barrier) {
            return old;
        }
    }

    auto current = old->clone(insert_barrier);
    current->del_barrier = del_barrier;

    auto bitmap = current->bitmap_ptr;
    if (del_barrier < old->del_barrier) {
        for (auto del_index = del_barrier; del_index < old->del_barrier; ++del_index) {
            // get uid in delete logs
            auto uid = deleted_record_.uids_[del_index];
            // map uid to corresponding offsets, select the max one, which should be the target
            // the max one should be closest to query_timestamp, so the delete log should refer to it
            int64_t the_offset = -1;
            auto [iter_b, iter_e] = uid2offset_.equal_range(uid);
            for (auto iter = iter_b; iter != iter_e; ++iter) {
                auto offset = iter->second;
                if (record_.timestamps_[offset] < query_timestamp) {
                    Assert(offset < insert_barrier);
                    the_offset = std::max(the_offset, offset);
                }
            }
            // if not found, skip
            if (the_offset == -1) {
                continue;
            }
            // otherwise, clear the flag
            bitmap->clear(the_offset);
        }
        return current;
    } else {
        for (auto del_index = old->del_barrier; del_index < del_barrier; ++del_index) {
            // get uid in delete logs
            auto uid = deleted_record_.uids_[del_index];
            // map uid to corresponding offsets, select the max one, which should be the target
            // the max one should be closest to query_timestamp, so the delete log should refer to it
            int64_t the_offset = -1;
            auto [iter_b, iter_e] = uid2offset_.equal_range(uid);
            for (auto iter = iter_b; iter != iter_e; ++iter) {
                auto offset = iter->second;
                if (offset >= insert_barrier) {
                    continue;
                }
                if (record_.timestamps_[offset] < query_timestamp) {
                    Assert(offset < insert_barrier);
                    the_offset = std::max(the_offset, offset);
                }
            }

            // if not found, skip
            if (the_offset == -1) {
                continue;
            }

            // otherwise, set the flag
            bitmap->set(the_offset);
        }
        this->deleted_record_.insert_lru_entry(current);
    }
    return current;
}
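// Note on the next function: Insert() turns a row-based, unordered batch into
// time-ordered columns. It sorts rows by (timestamp, uid), splits each row into
// per-field buffers using the schema's sizeof/offset tables, publishes the
// timestamps, uids, and field columns, and only then updates uid2offset_ and
// acks the range, so a reader never sees a uid mapped to an unpublished offset.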
Status
SegmentNaive::Insert(int64_t reserved_begin,
                     int64_t size,
                     const int64_t* uids_raw,
                     const Timestamp* timestamps_raw,
                     const RowBasedRawData& entities_raw) {
    Assert(entities_raw.count == size);
    if (entities_raw.sizeof_per_row != schema_->get_total_sizeof()) {
        std::string msg = "entity length = " + std::to_string(entities_raw.sizeof_per_row) +
                          ", schema length = " + std::to_string(schema_->get_total_sizeof());
        throw std::runtime_error(msg);
    }

    auto raw_data = reinterpret_cast<const char*>(entities_raw.raw_data);
    //    std::vector<char> entities(raw_data, raw_data + size * len_per_row);

    auto len_per_row = entities_raw.sizeof_per_row;
    std::vector<std::tuple<Timestamp, idx_t, int64_t>> ordering;
    ordering.resize(size);
    // #pragma omp parallel for
    for (int i = 0; i < size; ++i) {
        ordering[i] = std::make_tuple(timestamps_raw[i], uids_raw[i], i);
    }
    std::sort(ordering.begin(), ordering.end());
    auto sizeof_infos = schema_->get_sizeof_infos();
    std::vector<int> offset_infos(schema_->size() + 1, 0);
    std::partial_sum(sizeof_infos.begin(), sizeof_infos.end(), offset_infos.begin() + 1);
    std::vector<std::vector<char>> entities(schema_->size());

    for (int fid = 0; fid < schema_->size(); ++fid) {
        auto len = sizeof_infos[fid];
        entities[fid].resize(len * size);
    }

    std::vector<idx_t> uids(size);
    std::vector<Timestamp> timestamps(size);
    // #pragma omp parallel for
    for (int index = 0; index < size; ++index) {
        auto [t, uid, order_index] = ordering[index];
        timestamps[index] = t;
        uids[index] = uid;
        for (int fid = 0; fid < schema_->size(); ++fid) {
            auto len = sizeof_infos[fid];
            auto offset = offset_infos[fid];
            auto src = raw_data + offset + order_index * len_per_row;
            auto dst = entities[fid].data() + index * len;
            memcpy(dst, src, len);
        }
    }

    record_.timestamps_.set_data(reserved_begin, timestamps.data(), size);
    record_.uids_.set_data(reserved_begin, uids.data(), size);
    for (int fid = 0; fid < schema_->size(); ++fid) {
        record_.entity_vec_[fid]->set_data_raw(reserved_begin, entities[fid].data(), size);
    }

    for (int i = 0; i < uids.size(); ++i) {
        auto uid = uids[i];
        // NOTE: this must be the last step, cannot be put above
        uid2offset_.insert(std::make_pair(uid, reserved_begin + i));
    }

    record_.ack_responder_.AddSegment(reserved_begin, reserved_begin + size);
    return Status::OK();

    //    std::thread go(executor, std::move(uids), std::move(timestamps), std::move(entities));
    //    go.detach();
    //    const auto& schema = *schema_;
    //    auto record_ptr = GetMutableRecord();
    //    Assert(record_ptr);
    //    auto& record = *record_ptr;
    //    auto data_chunk = ColumnBasedDataChunk::from(row_values, schema);
    //
    //    // TODO: use shared_lock for better concurrency
    //    std::lock_guard lck(mutex_);
    //    Assert(state_ == SegmentState::Open);
    //    auto ack_id = ack_count_.load();
    //    record.uids_.grow_by(row_ids, row_ids + size);
    //    for (int64_t i = 0; i < size; ++i) {
    //        auto key = row_ids[i];
    //        auto internal_index = i + ack_id;
    //        internal_indexes_[key] = internal_index;
    //    }
    //    record.timestamps_.grow_by(timestamps, timestamps + size);
    //    for (int fid = 0; fid < schema.size(); ++fid) {
    //        auto field = schema[fid];
    //        auto total_len = field.get_sizeof() * size / sizeof(float);
    //        auto source_vec = data_chunk.entity_vecs[fid];
    //        record.entity_vecs_[fid].grow_by(source_vec.data(), source_vec.data() + total_len);
    //    }
    //
    //    // finish insert
    //    ack_count_ += size;
    //    return Status::OK();
}
Status
SegmentNaive::Delete(int64_t reserved_begin, int64_t size, const int64_t* uids_raw, const Timestamp* timestamps_raw) {
    std::vector<std::tuple<Timestamp, idx_t>> ordering;
    ordering.resize(size);
    // #pragma omp parallel for
    for (int i = 0; i < size; ++i) {
        ordering[i] = std::make_tuple(timestamps_raw[i], uids_raw[i]);
    }
    std::sort(ordering.begin(), ordering.end());
    std::vector<idx_t> uids(size);
    std::vector<Timestamp> timestamps(size);
    // #pragma omp parallel for
    for (int index = 0; index < size; ++index) {
        auto [t, uid] = ordering[index];
        timestamps[index] = t;
        uids[index] = uid;
    }
    deleted_record_.timestamps_.set_data(reserved_begin, timestamps.data(), size);
    deleted_record_.uids_.set_data(reserved_begin, uids.data(), size);
    deleted_record_.ack_responder_.AddSegment(reserved_begin, reserved_begin + size);
    return Status::OK();
    //    for (int i = 0; i < size; ++i) {
    //        auto key = row_ids[i];
    //        auto time = timestamps[i];
    //        delete_logs_.insert(std::make_pair(key, time));
    //    }
    //    return Status::OK();
}
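// Note on the next function: QueryImpl() is the indexed search path. It
// materializes the deletion bitmap for the query timestamp, wraps the query
// vectors in a knowhere dataset, and passes the bitmap to the index query as a
// blacklist, so deleted rows are filtered inside the index search itself.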
Status
SegmentNaive::QueryImpl(query::QueryDeprecatedPtr query_info, Timestamp timestamp, QueryResult& result) {
    auto ins_barrier = get_barrier(record_, timestamp);
    auto del_barrier = get_barrier(deleted_record_, timestamp);
    auto bitmap_holder = get_deleted_bitmap(del_barrier, timestamp, ins_barrier, true);
    Assert(bitmap_holder);
    Assert(bitmap_holder->bitmap_ptr->count() == ins_barrier);

    auto field_name = FieldName(query_info->field_name);
    auto field_offset = schema_->get_offset(field_name);
    auto& field = schema_->operator[](field_name);

    Assert(field.get_data_type() == DataType::VECTOR_FLOAT);
    auto dim = field.get_dim();
    auto bitmap = bitmap_holder->bitmap_ptr;
    auto topK = query_info->topK;
    auto num_queries = query_info->num_queries;
    auto vec_ptr = std::static_pointer_cast<ConcurrentVector<FloatVector>>(record_.entity_vec_.at(field_offset));
    auto index_entry = index_meta_->lookup_by_field(field_name);
    auto conf = index_entry.config;

    conf[milvus::knowhere::meta::TOPK] = query_info->topK;
    {
        auto count = 0;
        for (int i = 0; i < bitmap->count(); ++i) {
            if (bitmap->test(i)) {
                ++count;
            }
        }
        std::cout << "fuck " << count << std::endl;
    }

    auto indexing = std::static_pointer_cast<knowhere::VecIndex>(indexings_[index_entry.index_name]);
    auto ds = knowhere::GenDataset(query_info->num_queries, dim, query_info->query_raw_data.data());
    auto final = indexing->Query(ds, conf, bitmap);

    auto ids = final->Get<idx_t*>(knowhere::meta::IDS);
    auto distances = final->Get<float*>(knowhere::meta::DISTANCE);

    auto total_num = num_queries * topK;
    result.result_distances_.resize(total_num);

    result.num_queries_ = num_queries;
    result.topK_ = topK;

    std::copy_n(ids, total_num, result.internal_seg_offsets_.data());
    std::copy_n(distances, total_num, result.result_distances_.data());

    return Status::OK();
}

Status
SegmentNaive::QueryBruteForceImpl(query::QueryDeprecatedPtr query_info, Timestamp timestamp, QueryResult& results) {
    PanicInfo("deprecated");
}
Status
SegmentNaive::QuerySlowImpl(query::QueryDeprecatedPtr query_info, Timestamp timestamp, QueryResult& result) {
    auto ins_barrier = get_barrier(record_, timestamp);
    auto del_barrier = get_barrier(deleted_record_, timestamp);
    auto bitmap_holder = get_deleted_bitmap(del_barrier, timestamp, ins_barrier);
    Assert(bitmap_holder);
    auto field_name = FieldName(query_info->field_name);
    auto& field = schema_->operator[](field_name);
    Assert(field.get_data_type() == DataType::VECTOR_FLOAT);
    auto dim = field.get_dim();
    auto bitmap = bitmap_holder->bitmap_ptr;
    auto topK = query_info->topK;
    auto num_queries = query_info->num_queries;
    // TODO: optimize
    auto field_offset = schema_->get_offset(field_name);
    Assert(field_offset < record_.entity_vec_.size());
    auto vec_ptr = std::static_pointer_cast<ConcurrentVector<FloatVector>>(record_.entity_vec_.at(field_offset));
    std::vector<std::priority_queue<std::pair<float, int>>> records(num_queries);

    auto get_L2_distance = [dim](const float* a, const float* b) {
        float L2_distance = 0;
        for (auto i = 0; i < dim; ++i) {
            auto d = a[i] - b[i];
            L2_distance += d * d;
        }
        return L2_distance;
    };

    for (int64_t i = 0; i < ins_barrier; ++i) {
        if (i < bitmap->count() && bitmap->test(i)) {
            continue;
        }
        auto element = vec_ptr->get_element(i);
        for (auto query_id = 0; query_id < num_queries; ++query_id) {
            auto query_blob = query_info->query_raw_data.data() + query_id * dim;
            auto dis = get_L2_distance(query_blob, element);
            auto& record = records[query_id];
            if (record.size() < topK) {
                record.emplace(dis, i);
            } else if (record.top().first > dis) {
                record.emplace(dis, i);
                record.pop();
            }
        }
    }

    result.num_queries_ = num_queries;
    result.topK_ = topK;
    auto row_num = topK * num_queries;

    result.internal_seg_offsets_.resize(row_num);
    result.result_distances_.resize(row_num);

    for (int q_id = 0; q_id < num_queries; ++q_id) {
        // reverse
        for (int i = 0; i < topK; ++i) {
            auto dst_id = topK - 1 - i + q_id * topK;
            auto [dis, offset] = records[q_id].top();
            records[q_id].pop();
            result.internal_seg_offsets_[dst_id] = offset;
            result.result_distances_[dst_id] = dis;
        }
    }

    return Status::OK();
}
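QuerySlowImpl keeps one max-heap of (distance, offset) per query, capped at topK, then pops in reverse so distances come out ascending. The same pattern in isolation, free of segcore types (a sketch, not library code):

    #include <queue>
    #include <utility>
    #include <vector>

    // Top-K smallest distances via a size-capped max-heap, as in QuerySlowImpl above.
    std::vector<std::pair<float, int>>
    top_k_smallest(const std::vector<std::pair<float, int>>& candidates, size_t k) {
        std::priority_queue<std::pair<float, int>> heap;  // max element on top
        for (const auto& cand : candidates) {
            if (heap.size() < k) {
                heap.push(cand);
            } else if (heap.top().first > cand.first) {
                heap.push(cand);  // better than the current worst
                heap.pop();       // evict the current worst
            }
        }
        std::vector<std::pair<float, int>> result(heap.size());
        for (auto i = static_cast<int64_t>(heap.size()) - 1; i >= 0; --i) {
            result[i] = heap.top();  // reverse-pop: ascending by distance
            heap.pop();
        }
        return result;
    }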
Status
SegmentNaive::QueryDeprecated(query::QueryDeprecatedPtr query_info, Timestamp timestamp, QueryResult& result) {
    // TODO: enable delete
    // TODO: enable index
    // TODO: remove mock
    if (query_info == nullptr) {
        query_info = std::make_shared<query::QueryDeprecated>();
        query_info->field_name = "fakevec";
        query_info->topK = 10;
        query_info->num_queries = 1;

        auto dim = schema_->operator[](FieldName("fakevec")).get_dim();
        std::default_random_engine e(42);
        std::uniform_real_distribution<> dis(0.0, 1.0);
        query_info->query_raw_data.resize(query_info->num_queries * dim);
        for (auto& x : query_info->query_raw_data) {
            x = dis(e);
        }
    }

    if (index_ready_) {
        return QueryImpl(query_info, timestamp, result);
    } else {
        return QueryBruteForceImpl(query_info, timestamp, result);
    }
}

Status
SegmentNaive::Close() {
    if (this->record_.reserved != this->record_.ack_responder_.GetAck()) {
        PanicInfo("insert not ready");
    }
    if (this->deleted_record_.reserved != this->deleted_record_.ack_responder_.GetAck()) {
        PanicInfo("delete not ready");
    }
    state_ = SegmentState::Closed;
    return Status::OK();
}

template <typename Type>
knowhere::IndexPtr
SegmentNaive::BuildVecIndexImpl(const IndexMeta::Entry& entry) {
    PanicInfo("deprecated");
}

Status
SegmentNaive::BuildIndex(IndexMetaPtr remote_index_meta) {
    if (remote_index_meta == nullptr) {
        std::cout << "WARN: Null index ptr is detected, use default index" << std::endl;

        int dim = 0;
        std::string index_field_name;

        for (auto& field : schema_->get_fields()) {
            if (field.get_data_type() == DataType::VECTOR_FLOAT) {
                dim = field.get_dim();
                index_field_name = field.get_name().get();
            }
        }

        Assert(dim != 0);
        Assert(!index_field_name.empty());

        auto index_meta = std::make_shared<IndexMeta>(schema_);
        // TODO: this is a merge of query conf and insert conf
        // TODO: should be split into multiple configs
        auto conf = milvus::knowhere::Config{
            {milvus::knowhere::meta::DIM, dim},          {milvus::knowhere::IndexParams::nlist, 100},
            {milvus::knowhere::IndexParams::nprobe, 4},  {milvus::knowhere::IndexParams::m, 4},
            {milvus::knowhere::IndexParams::nbits, 8},   {milvus::knowhere::Metric::TYPE, milvus::knowhere::Metric::L2},
            {milvus::knowhere::meta::DEVICEID, 0},
        };
        index_meta->AddEntry("fakeindex", index_field_name, knowhere::IndexEnum::INDEX_FAISS_IVFPQ,
                             knowhere::IndexMode::MODE_CPU, conf);
        remote_index_meta = index_meta;
    }

    if (record_.ack_responder_.GetAck() < 1024 * 4) {
        return Status(SERVER_BUILD_INDEX_ERROR, "too few elements");
    }

    index_meta_ = remote_index_meta;
    for (auto& [index_name, entry] : index_meta_->get_entries()) {
        Assert(entry.index_name == index_name);
        const auto& field = (*schema_)[entry.field_name];

        if (field.is_vector()) {
            Assert(field.get_data_type() == engine::DataType::VECTOR_FLOAT);
            auto index_ptr = BuildVecIndexImpl<float>(entry);
            indexings_[index_name] = index_ptr;
        } else {
            throw std::runtime_error("unimplemented");
        }
    }

    index_ready_ = true;
    return Status::OK();
}

int64_t
SegmentNaive::GetMemoryUsageInBytes() {
    PanicInfo("Deprecated");
}

}  // namespace milvus::segcore

#endif
@@ -1,167 +0,0 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License

#pragma once
#if 0

#include <tbb/concurrent_priority_queue.h>
#include <tbb/concurrent_unordered_map.h>
#include <tbb/concurrent_vector.h>

#include <knowhere/index/vector_index/VecIndex.h>

#include "query/deprecated/GeneralQuery.h"
#include "utils/Status.h"
#include "exceptions/EasyAssert.h"
#include "segcore/SegmentBase.h"
#include "segcore/AckResponder.h"
#include "segcore/ConcurrentVector.h"
#include "segcore/DeletedRecord.h"
#include "segcore/InsertRecord.h"
#include <memory>
#include <shared_mutex>
#include <string>
#include <unordered_map>

namespace milvus::segcore {
class SegmentNaive : public SegmentBase {
 public:
    //    SegmentBase(std::shared_ptr<FieldsInfo> collection);

    int64_t
    PreInsert(int64_t size) override;

    // TODO: originally, id should be put into data_chunk
    // TODO: Is it ok to put them the other side?
    Status
    Insert(int64_t reserverd_offset,
           int64_t size,
           const int64_t* row_ids,
           const Timestamp* timestamps,
           const RowBasedRawData& values) override;

    int64_t
    PreDelete(int64_t size) override;

    // TODO: add id into delete log, possibly bitmap
    Status
    Delete(int64_t reserverd_offset, int64_t size, const int64_t* row_ids, const Timestamp* timestamps) override;

 private:
    // NOTE: now deprecated, remains for further copy out
    Status
    QueryDeprecated(query::QueryDeprecatedPtr query_info, Timestamp timestamp, QueryResult& results);

 public:
    Status
    Search(const query::Plan* Plan,
           const query::PlaceholderGroup* placeholder_groups[],
           const Timestamp timestamps[],
           int num_groups,
           QueryResult& results) override {
        PanicInfo("unimplemented");
    }

    // stop receiving insert requests
    // will move data to immutable vector or something
    Status
    Close() override;

    //    using IndexType = knowhere::IndexType;
    //    using IndexMode = knowhere::IndexMode;
    //    using IndexConfig = knowhere::Config;
    // BuildIndex with parameters; the segment must be in the Frozen state
    // NOTE: index_params contains several policies for several indexes
    // TODO: currently, index has to be set at startup, and can't be modified
    // AddIndex and DropIndex will be added later
    Status
    BuildIndex(IndexMetaPtr index_meta) override;

    Status
    LoadIndexing(const LoadIndexInfo& info) override {
        PanicInfo("unimplemented");
    }

    Status
    FillTargetEntry(const query::Plan* Plan, QueryResult& results) override {
        PanicInfo("unimplemented");
    }

    Status
    DropRawData(std::string_view field_name) override {
        // TODO: NO-OP
        return Status::OK();
    }

    Status
    LoadRawData(std::string_view field_name, const char* blob, int64_t blob_size) override {
        // TODO: NO-OP
        return Status::OK();
    }

    int64_t
    GetMemoryUsageInBytes() override;

 public:
    ssize_t
    get_row_count() const override {
        return record_.ack_responder_.GetAck();
    }

    SegmentState
    get_state() const override {
        return state_.load(std::memory_order_relaxed);
    }

    ssize_t
    get_deleted_count() const override {
        return 0;
    }

 public:
    friend std::unique_ptr<SegmentBase>
    CreateSegment(SchemaPtr schema);

    static constexpr int64_t deprecated_fixed_chunk_size = 32 * 1024;
    explicit SegmentNaive(const SchemaPtr& schema) : schema_(schema), record_(*schema, deprecated_fixed_chunk_size) {
    }

 private:
    std::shared_ptr<DeletedRecord::TmpBitmap>
    get_deleted_bitmap(int64_t del_barrier, Timestamp query_timestamp, int64_t insert_barrier, bool force = false);

    Status
    QueryImpl(query::QueryDeprecatedPtr query, Timestamp timestamp, QueryResult& results);

    Status
    QuerySlowImpl(query::QueryDeprecatedPtr query, Timestamp timestamp, QueryResult& results);

    Status
    QueryBruteForceImpl(query::QueryDeprecatedPtr query, Timestamp timestamp, QueryResult& results);

    template <typename Type>
    knowhere::IndexPtr
    BuildVecIndexImpl(const IndexMeta::Entry& entry);

 private:
    SchemaPtr schema_;
    std::atomic<SegmentState> state_ = SegmentState::Open;
    InsertRecord record_;
    DeletedRecord deleted_record_;

    std::atomic<bool> index_ready_ = false;
    IndexMetaPtr index_meta_;
    std::unordered_map<std::string, knowhere::IndexPtr> indexings_;  // index_name => indexing
    tbb::concurrent_unordered_multimap<idx_t, int64_t> uid2offset_;
};
}  // namespace milvus::segcore

#endif
@@ -238,49 +238,6 @@ DropSealedSegmentIndex(CSegmentInterface c_segment, int64_t field_id) {
     }
 }
-
-////////////////////////////// deprecated interfaces //////////////////////////////
-CStatus
-UpdateSegmentIndex(CSegmentInterface c_segment, CLoadIndexInfo c_load_index_info) {
-    auto status = CStatus();
-    try {
-        auto segment_interface = reinterpret_cast<milvus::segcore::SegmentInterface*>(c_segment);
-        auto segment = dynamic_cast<milvus::segcore::SegmentGrowing*>(segment_interface);
-        AssertInfo(segment != nullptr, "segment conversion failed");
-        auto load_index_info = (LoadIndexInfo*)c_load_index_info;
-        segment->LoadIndexing(*load_index_info);
-        return milvus::SuccessCStatus();
-    } catch (std::exception& e) {
-        return milvus::FailureCStatus(UnexpectedError, e.what());
-    }
-}
-
-CStatus
-LoadSealedSegmentMeta(CSegmentInterface c_segment, CProto LoadSegmentMetaProto) {
-    try {
-        auto segment_raw = (const milvus::segcore::SegmentGrowing*)c_segment;
-        auto segment = dynamic_cast<const milvus::segcore::SegmentSealed*>(segment_raw);
-
-        return milvus::SuccessCStatus();
-    } catch (std::exception& e) {
-        // TODO
-        return milvus::FailureCStatus(UnexpectedError, e.what());
-    }
-}
-
-int
-Close(CSegmentInterface c_segment) {
-    auto segment = (milvus::segcore::SegmentGrowing*)c_segment;
-    auto status = segment->Close();
-    return status.code();
-}
-
-bool
-IsOpened(CSegmentInterface c_segment) {
-    auto segment = (milvus::segcore::SegmentGrowing*)c_segment;
-    auto status = segment->get_state();
-    return status == milvus::segcore::SegmentGrowing::SegmentState::Open;
-}

 CProtoResult
 GetEntityByIds(CSegmentInterface c_segment, CRetrievePlan c_plan, uint64_t timestamp) {
     try {
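All of these wrappers, deprecated or not, follow the same C-boundary convention used in this file: catch C++ exceptions and translate them into a CStatus. The skeleton, reduced to its shape (names as in the diff; the body comment marks where real work goes):

    CStatus
    DoSomething(CSegmentInterface c_segment) {
        try {
            auto segment = reinterpret_cast<milvus::segcore::SegmentInterface*>(c_segment);
            // ... operate on segment ...
            return milvus::SuccessCStatus();
        } catch (std::exception& e) {
            return milvus::FailureCStatus(UnexpectedError, e.what());
        }
    }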
@@ -95,16 +95,6 @@ DropFieldData(CSegmentInterface c_segment, int64_t field_id);
 CStatus
 DropSealedSegmentIndex(CSegmentInterface c_segment, int64_t field_id);
-
-////////////////////////////// deprecated interfaces //////////////////////////////
-CStatus
-UpdateSegmentIndex(CSegmentInterface c_segment, CLoadIndexInfo c_load_index_info);
-
-int
-Close(CSegmentInterface c_segment);
-
-bool
-IsOpened(CSegmentInterface c_segment);

 #ifdef __cplusplus
 }
 #endif
@@ -320,28 +320,6 @@ TEST(CApiTest, SearchTestWithExpr) {
     DeleteSegment(segment);
 }
-
-TEST(CApiTest, IsOpenedTest) {
-    auto collection = NewCollection(get_default_schema_config());
-    auto segment = NewSegment(collection, 0, Growing);
-
-    auto is_opened = IsOpened(segment);
-    assert(is_opened);
-
-    DeleteCollection(collection);
-    DeleteSegment(segment);
-}
-
-TEST(CApiTest, CloseTest) {
-    auto collection = NewCollection(get_default_schema_config());
-    auto segment = NewSegment(collection, 0, Growing);
-
-    auto status = Close(segment);
-    assert(status == 0);
-
-    DeleteCollection(collection);
-    DeleteSegment(segment);
-}

 TEST(CApiTest, GetMemoryUsageInBytesTest) {
     auto collection = NewCollection(get_default_schema_config());
     auto segment = NewSegment(collection, 0, Growing);
@@ -868,7 +846,7 @@ TEST(CApiTest, LoadIndex_Search) {
     }
 }

-TEST(CApiTest, UpdateSegmentIndex_Without_Predicate) {
+TEST(CApiTest, Indexing_Without_Predicate) {
     // insert data to segment
     constexpr auto DIM = 16;
     constexpr auto K = 5;
@@ -970,11 +948,10 @@ TEST(CApiTest, UpdateSegmentIndex_Without_Predicate) {
     AppendFieldInfo(c_load_index_info, 100);
     AppendIndex(c_load_index_info, (CBinarySet)&binary_set);

-    status = UpdateSegmentIndex(segment, c_load_index_info);
-    assert(status.error_code == Success);
-
+    auto sealed_segment = SealedCreator(schema, dataset, *(LoadIndexInfo*)c_load_index_info);
     CSearchResult c_search_result_on_bigIndex;
-    auto res_after_load_index = Search(segment, plan, placeholderGroup, time, &c_search_result_on_bigIndex);
+    auto res_after_load_index =
+        Search(sealed_segment.get(), plan, placeholderGroup, time, &c_search_result_on_bigIndex);
     assert(res_after_load_index.error_code == Success);

     auto search_result_on_raw_index_json = SearchResultToJson(*search_result_on_raw_index);
@@ -993,7 +970,7 @@ TEST(CApiTest, UpdateSegmentIndex_Without_Predicate) {
     DeleteSegment(segment);
 }

-TEST(CApiTest, UpdateSegmentIndex_Expr_Without_Predicate) {
+TEST(CApiTest, Indexing_Expr_Without_Predicate) {
     // insert data to segment
     constexpr auto DIM = 16;
     constexpr auto K = 5;
@@ -1090,11 +1067,10 @@ TEST(CApiTest, UpdateSegmentIndex_Expr_Without_Predicate) {
     AppendFieldInfo(c_load_index_info, 100);
     AppendIndex(c_load_index_info, (CBinarySet)&binary_set);

-    status = UpdateSegmentIndex(segment, c_load_index_info);
-    assert(status.error_code == Success);
-
+    auto sealed_segment = SealedCreator(schema, dataset, *(LoadIndexInfo*)c_load_index_info);
     CSearchResult c_search_result_on_bigIndex;
-    auto res_after_load_index = Search(segment, plan, placeholderGroup, time, &c_search_result_on_bigIndex);
+    auto res_after_load_index =
+        Search(sealed_segment.get(), plan, placeholderGroup, time, &c_search_result_on_bigIndex);
     assert(res_after_load_index.error_code == Success);

     auto search_result_on_raw_index_json = SearchResultToJson(*search_result_on_raw_index);
@@ -1113,7 +1089,7 @@ TEST(CApiTest, UpdateSegmentIndex_Expr_Without_Predicate) {
     DeleteSegment(segment);
 }

-TEST(CApiTest, UpdateSegmentIndex_With_float_Predicate_Range) {
+TEST(CApiTest, Indexing_With_float_Predicate_Range) {
     // insert data to segment
     constexpr auto DIM = 16;
     constexpr auto K = 5;
@@ -1227,11 +1203,10 @@ TEST(CApiTest, UpdateSegmentIndex_With_float_Predicate_Range) {
     AppendFieldInfo(c_load_index_info, 100);
     AppendIndex(c_load_index_info, (CBinarySet)&binary_set);

-    status = UpdateSegmentIndex(segment, c_load_index_info);
-    assert(status.error_code == Success);
-
+    auto sealed_segment = SealedCreator(schema, dataset, *(LoadIndexInfo*)c_load_index_info);
     CSearchResult c_search_result_on_bigIndex;
-    auto res_after_load_index = Search(segment, plan, placeholderGroup, time, &c_search_result_on_bigIndex);
+    auto res_after_load_index =
+        Search(sealed_segment.get(), plan, placeholderGroup, time, &c_search_result_on_bigIndex);
     assert(res_after_load_index.error_code == Success);

     auto search_result_on_bigIndex = (*(SearchResult*)c_search_result_on_bigIndex);
@@ -1251,7 +1226,7 @@ TEST(CApiTest, UpdateSegmentIndex_With_float_Predicate_Range) {
     DeleteSegment(segment);
 }

-TEST(CApiTest, UpdateSegmentIndex_Expr_With_float_Predicate_Range) {
+TEST(CApiTest, Indexing_Expr_With_float_Predicate_Range) {
     // insert data to segment
     constexpr auto DIM = 16;
     constexpr auto K = 5;
@@ -1266,11 +1241,13 @@ TEST(CApiTest, UpdateSegmentIndex_Expr_With_float_Predicate_Range) {
     auto vec_col = dataset.get_col<float>(0);
     auto query_ptr = vec_col.data() + 420000 * DIM;

-    int64_t offset;
-    PreInsert(segment, N, &offset);
-    auto ins_res = Insert(segment, 0, N, dataset.row_ids_.data(), dataset.timestamps_.data(), dataset.raw_.raw_data,
-                          dataset.raw_.sizeof_per_row, dataset.raw_.count);
-    assert(ins_res.error_code == Success);
+    {
+        int64_t offset;
+        PreInsert(segment, N, &offset);
+        auto ins_res = Insert(segment, 0, N, dataset.row_ids_.data(), dataset.timestamps_.data(),
+                              dataset.raw_.raw_data, dataset.raw_.sizeof_per_row, dataset.raw_.count);
+        assert(ins_res.error_code == Success);
+    }

     const char* serialized_expr_plan = R"(vector_anns: <
                                              field_id: 100
@@ -1378,11 +1355,10 @@ TEST(CApiTest, UpdateSegmentIndex_Expr_With_float_Predicate_Range) {
     AppendFieldInfo(c_load_index_info, 100);
     AppendIndex(c_load_index_info, (CBinarySet)&binary_set);

-    status = UpdateSegmentIndex(segment, c_load_index_info);
-    assert(status.error_code == Success);
-
+    auto sealed_segment = SealedCreator(schema, dataset, *(LoadIndexInfo*)c_load_index_info);
     CSearchResult c_search_result_on_bigIndex;
-    auto res_after_load_index = Search(segment, plan, placeholderGroup, time, &c_search_result_on_bigIndex);
+    auto res_after_load_index =
+        Search(sealed_segment.get(), plan, placeholderGroup, time, &c_search_result_on_bigIndex);
     assert(res_after_load_index.error_code == Success);

     auto search_result_on_bigIndex = (*(SearchResult*)c_search_result_on_bigIndex);
@@ -1402,7 +1378,7 @@ TEST(CApiTest, UpdateSegmentIndex_Expr_With_float_Predicate_Range) {
     DeleteSegment(segment);
 }

-TEST(CApiTest, UpdateSegmentIndex_With_float_Predicate_Term) {
+TEST(CApiTest, Indexing_With_float_Predicate_Term) {
     // insert data to segment
     constexpr auto DIM = 16;
     constexpr auto K = 5;
@@ -1515,11 +1491,10 @@ TEST(CApiTest, UpdateSegmentIndex_With_float_Predicate_Term) {
     AppendFieldInfo(c_load_index_info, 100);
     AppendIndex(c_load_index_info, (CBinarySet)&binary_set);

-    status = UpdateSegmentIndex(segment, c_load_index_info);
-    assert(status.error_code == Success);
-
+    auto sealed_segment = SealedCreator(schema, dataset, *(LoadIndexInfo*)c_load_index_info);
     CSearchResult c_search_result_on_bigIndex;
-    auto res_after_load_index = Search(segment, plan, placeholderGroup, time, &c_search_result_on_bigIndex);
+    auto res_after_load_index =
+        Search(sealed_segment.get(), plan, placeholderGroup, time, &c_search_result_on_bigIndex);
     assert(res_after_load_index.error_code == Success);

     auto search_result_on_bigIndex = (*(SearchResult*)c_search_result_on_bigIndex);
@@ -1539,7 +1514,7 @@ TEST(CApiTest, UpdateSegmentIndex_With_float_Predicate_Term) {
     DeleteSegment(segment);
 }

-TEST(CApiTest, UpdateSegmentIndex_Expr_With_float_Predicate_Term) {
+TEST(CApiTest, Indexing_Expr_With_float_Predicate_Term) {
     // insert data to segment
     constexpr auto DIM = 16;
     constexpr auto K = 5;
@@ -1717,11 +1692,10 @@ TEST(CApiTest, UpdateSegmentIndex_Expr_With_float_Predicate_Term) {
     AppendFieldInfo(c_load_index_info, 100);
     AppendIndex(c_load_index_info, (CBinarySet)&binary_set);

-    status = UpdateSegmentIndex(segment, c_load_index_info);
-    assert(status.error_code == Success);
-
+    auto sealed_segment = SealedCreator(schema, dataset, *(LoadIndexInfo*)c_load_index_info);
     CSearchResult c_search_result_on_bigIndex;
-    auto res_after_load_index = Search(segment, plan, placeholderGroup, time, &c_search_result_on_bigIndex);
+    auto res_after_load_index =
+        Search(sealed_segment.get(), plan, placeholderGroup, time, &c_search_result_on_bigIndex);
     assert(res_after_load_index.error_code == Success);

     auto search_result_on_bigIndex = (*(SearchResult*)c_search_result_on_bigIndex);
@@ -1741,7 +1715,7 @@ TEST(CApiTest, UpdateSegmentIndex_Expr_With_float_Predicate_Term) {
     DeleteSegment(segment);
 }

-TEST(CApiTest, UpdateSegmentIndex_With_binary_Predicate_Range) {
+TEST(CApiTest, Indexing_With_binary_Predicate_Range) {
     // insert data to segment
     constexpr auto DIM = 16;
     constexpr auto K = 5;
@@ -1856,11 +1830,10 @@ TEST(CApiTest, UpdateSegmentIndex_With_binary_Predicate_Range) {
     AppendFieldInfo(c_load_index_info, 100);
     AppendIndex(c_load_index_info, (CBinarySet)&binary_set);

-    status = UpdateSegmentIndex(segment, c_load_index_info);
-    assert(status.error_code == Success);
-
+    auto sealed_segment = SealedCreator(schema, dataset, *(LoadIndexInfo*)c_load_index_info);
     CSearchResult c_search_result_on_bigIndex;
-    auto res_after_load_index = Search(segment, plan, placeholderGroup, time, &c_search_result_on_bigIndex);
+    auto res_after_load_index =
+        Search(sealed_segment.get(), plan, placeholderGroup, time, &c_search_result_on_bigIndex);
     assert(res_after_load_index.error_code == Success);

     auto search_result_on_bigIndex = (*(SearchResult*)c_search_result_on_bigIndex);
@@ -1880,7 +1853,7 @@ TEST(CApiTest, UpdateSegmentIndex_With_binary_Predicate_Range) {
     DeleteSegment(segment);
 }

-TEST(CApiTest, UpdateSegmentIndex_Expr_With_binary_Predicate_Range) {
+TEST(CApiTest, Indexing_Expr_With_binary_Predicate_Range) {
     // insert data to segment
     constexpr auto DIM = 16;
     constexpr auto K = 5;
@@ -2008,11 +1981,10 @@ TEST(CApiTest, UpdateSegmentIndex_Expr_With_binary_Predicate_Range) {
     AppendFieldInfo(c_load_index_info, 100);
     AppendIndex(c_load_index_info, (CBinarySet)&binary_set);

-    status = UpdateSegmentIndex(segment, c_load_index_info);
-    assert(status.error_code == Success);
-
+    auto sealed_segment = SealedCreator(schema, dataset, *(LoadIndexInfo*)c_load_index_info);
     CSearchResult c_search_result_on_bigIndex;
-    auto res_after_load_index = Search(segment, plan, placeholderGroup, time, &c_search_result_on_bigIndex);
+    auto res_after_load_index =
+        Search(sealed_segment.get(), plan, placeholderGroup, time, &c_search_result_on_bigIndex);
     assert(res_after_load_index.error_code == Success);

     auto search_result_on_bigIndex = (*(SearchResult*)c_search_result_on_bigIndex);
@@ -2032,7 +2004,7 @@ TEST(CApiTest, UpdateSegmentIndex_Expr_With_binary_Predicate_Range) {
     DeleteSegment(segment);
 }

-TEST(CApiTest, UpdateSegmentIndex_With_binary_Predicate_Term) {
+TEST(CApiTest, Indexing_With_binary_Predicate_Term) {
     // insert data to segment
     constexpr auto DIM = 16;
     constexpr auto K = 5;
@@ -2146,11 +2118,10 @@ TEST(CApiTest, UpdateSegmentIndex_With_binary_Predicate_Term) {
     AppendFieldInfo(c_load_index_info, 100);
     AppendIndex(c_load_index_info, (CBinarySet)&binary_set);

-    status = UpdateSegmentIndex(segment, c_load_index_info);
-    assert(status.error_code == Success);
-
+    auto sealed_segment = SealedCreator(schema, dataset, *(LoadIndexInfo*)c_load_index_info);
     CSearchResult c_search_result_on_bigIndex;
-    auto res_after_load_index = Search(segment, plan, placeholderGroup, time, &c_search_result_on_bigIndex);
+    auto res_after_load_index =
+        Search(sealed_segment.get(), plan, placeholderGroup, time, &c_search_result_on_bigIndex);
     assert(res_after_load_index.error_code == Success);

     std::vector<CSearchResult> results;
@@ -2177,7 +2148,7 @@ TEST(CApiTest, UpdateSegmentIndex_With_binary_Predicate_Term) {
     DeleteSegment(segment);
 }

-TEST(CApiTest, UpdateSegmentIndex_Expr_With_binary_Predicate_Term) {
+TEST(CApiTest, Indexing_Expr_With_binary_Predicate_Term) {
     // insert data to segment
     constexpr auto DIM = 16;
     constexpr auto K = 5;
@@ -2356,11 +2327,10 @@ TEST(CApiTest, UpdateSegmentIndex_Expr_With_binary_Predicate_Term) {
     AppendFieldInfo(c_load_index_info, 100);
     AppendIndex(c_load_index_info, (CBinarySet)&binary_set);

-    status = UpdateSegmentIndex(segment, c_load_index_info);
-    assert(status.error_code == Success);
-
+    auto sealed_segment = SealedCreator(schema, dataset, *(LoadIndexInfo*)c_load_index_info);
     CSearchResult c_search_result_on_bigIndex;
-    auto res_after_load_index = Search(segment, plan, placeholderGroup, time, &c_search_result_on_bigIndex);
+    auto res_after_load_index =
+        Search(sealed_segment.get(), plan, placeholderGroup, time, &c_search_result_on_bigIndex);
     assert(res_after_load_index.error_code == Success);

     std::vector<CSearchResult> results;
@@ -2567,11 +2537,10 @@ TEST(CApiTest, SealedSegment_search_float_Predicate_Range) {
     status = LoadFieldData(segment, c_ts_field_data);
     assert(status.error_code == Success);

-    status = UpdateSealedSegmentIndex(segment, c_load_index_info);
-    assert(status.error_code == Success);
-
+    auto sealed_segment = SealedCreator(schema, dataset, *(LoadIndexInfo*)c_load_index_info);
     CSearchResult c_search_result_on_bigIndex;
-    auto res_after_load_index = Search(segment, plan, placeholderGroup, time, &c_search_result_on_bigIndex);
+    auto res_after_load_index =
+        Search(sealed_segment.get(), plan, placeholderGroup, time, &c_search_result_on_bigIndex);
     assert(res_after_load_index.error_code == Success);

     auto search_result_on_bigIndex = (*(SearchResult*)c_search_result_on_bigIndex);
@@ -109,10 +109,8 @@ TEST(Sealed, without_predicate) {
     load_info.index = indexing;
     load_info.index_params["metric_type"] = "L2";

-    segment->LoadIndexing(load_info);
-    sr = SearchResult();
-
-    sr = segment->Search(plan.get(), *ph_group, time);
+    auto sealed_segment = SealedCreator(schema, dataset, load_info);
+    sr = sealed_segment->Search(plan.get(), *ph_group, time);

     auto post_result = SearchResultToJson(sr);
     std::cout << ref_result.dump(1);
@@ -201,10 +199,8 @@ TEST(Sealed, with_predicate) {
     load_info.index = indexing;
     load_info.index_params["metric_type"] = "L2";

-    segment->LoadIndexing(load_info);
-    sr = SearchResult();
-
-    sr = segment->Search(plan.get(), *ph_group, time);
+    auto sealed_segment = SealedCreator(schema, dataset, load_info);
+    sr = sealed_segment->Search(plan.get(), *ph_group, time);

     auto post_sr = sr;
     for (int i = 0; i < num_queries; ++i) {
@@ -107,7 +107,6 @@ TEST(SegmentCoreTest, MockTest) {
     segment->Insert(offset, N, uids.data(), timestamps.data(), data_chunk);
     SearchResult search_result;
     // segment->Query(nullptr, 0, query_result);
-    segment->Close();
     // segment->BuildIndex();
     int i = 0;
     i++;
@@ -18,6 +18,7 @@
 #include "segcore/SegmentSealed.h"
 #include "Constants.h"
 #include <boost/algorithm/string/predicate.hpp>
+#include "segcore/SegmentSealed.h"

 #include <knowhere/index/vector_index/VecIndex.h>
 #include <knowhere/index/vector_index/adapter/VectorAdapter.h>
@@ -321,6 +322,14 @@ SealedLoader(const GeneratedData& dataset, SegmentSealed& seg) {
     }
 }

+inline std::unique_ptr<SegmentSealed>
+SealedCreator(SchemaPtr schema, const GeneratedData& dataset, const LoadIndexInfo& index_info) {
+    auto segment = CreateSealedSegment(schema);
+    SealedLoader(dataset, *segment);
+    segment->LoadIndex(index_info);
+    return segment;
+}
+
 inline knowhere::VecIndexPtr
 GenIndexing(int64_t N, int64_t dim, const float* vec) {
     // {knowhere::IndexParams::nprobe, 10},
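SealedCreator above is the pivot of the test changes: wherever a test previously loaded an index into a growing segment via UpdateSegmentIndex and searched it, it now builds a sealed segment and searches that. Schematically, condensed from the hunks above:

    // Before (deprecated):
    //     status = UpdateSegmentIndex(segment, c_load_index_info);
    //     Search(segment, plan, placeholderGroup, time, &result);
    // After:
    //     auto sealed_segment = SealedCreator(schema, dataset, *(LoadIndexInfo*)c_load_index_info);
    //     Search(sealed_segment.get(), plan, placeholderGroup, time, &result);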
@@ -72,7 +72,7 @@ func TestHistorical_GlobalSealedSegments(t *testing.T) {
 	err := n.etcdKV.Save(segmentKey, segmentInfoStr)
 	assert.NoError(t, err)

-	time.Sleep(100 * time.Millisecond) // for etcd latency
+	time.Sleep(500 * time.Millisecond) // for etcd latency
 	segmentIDs = n.historical.getGlobalSegmentIDsByCollectionID(collectionID)
 	assert.Equal(t, 1, len(segmentIDs))
 	assert.Equal(t, segmentIDs[0], segmentID)
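The Go hunk above is the "fix timeout" half of the commit message: the fixed wait for etcd propagation grows from 100 ms to 500 ms, presumably because the shorter sleep was flaky under CI load.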