diff --git a/internal/core/src/indexbuilder/IndexWrapper.cpp b/internal/core/src/indexbuilder/IndexWrapper.cpp index 157f2e1674..fcced635e0 100644 --- a/internal/core/src/indexbuilder/IndexWrapper.cpp +++ b/internal/core/src/indexbuilder/IndexWrapper.cpp @@ -38,94 +38,95 @@ IndexWrapper::IndexWrapper(const char* serialized_type_params, const char* seria Assert(index_ != nullptr); } +template // ugly here, ParamsT will just be MapParams later void -IndexWrapper::parse() { - namespace indexcgo = milvus::proto::indexcgo; +IndexWrapper::parse_impl(const std::string& serialized_params_str, knowhere::Config& conf) { bool deserialized_success; - indexcgo::TypeParams type_config; - deserialized_success = google::protobuf::TextFormat::ParseFromString(type_params_, &type_config); + ParamsT params; + deserialized_success = google::protobuf::TextFormat::ParseFromString(serialized_params_str, ¶ms); Assert(deserialized_success); - indexcgo::IndexParams index_config; - deserialized_success = google::protobuf::TextFormat::ParseFromString(index_params_, &index_config); - Assert(deserialized_success); - - for (auto i = 0; i < type_config.params_size(); ++i) { - const auto& type_param = type_config.params(i); - const auto& key = type_param.key(); - const auto& value = type_param.value(); - type_config_[key] = value; - config_[key] = value; - } - - for (auto i = 0; i < index_config.params_size(); ++i) { - const auto& index_param = index_config.params(i); - const auto& key = index_param.key(); - const auto& value = index_param.value(); - index_config_[key] = value; - config_[key] = value; + for (auto i = 0; i < params.params_size(); ++i) { + const auto& param = params.params(i); + const auto& key = param.key(); + const auto& value = param.value(); + conf[key] = value; } auto stoi_closure = [](const std::string& s) -> int { return std::stoi(s); }; /***************************** meta *******************************/ - check_parameter(milvus::knowhere::meta::DIM, stoi_closure, std::nullopt); - check_parameter(milvus::knowhere::meta::TOPK, stoi_closure, std::nullopt); + check_parameter(conf, milvus::knowhere::meta::DIM, stoi_closure, std::nullopt); + check_parameter(conf, milvus::knowhere::meta::TOPK, stoi_closure, std::nullopt); /***************************** IVF Params *******************************/ - check_parameter(milvus::knowhere::IndexParams::nprobe, stoi_closure, std::nullopt); - check_parameter(milvus::knowhere::IndexParams::nlist, stoi_closure, std::nullopt); - check_parameter(milvus::knowhere::IndexParams::m, stoi_closure, std::nullopt); - check_parameter(milvus::knowhere::IndexParams::nbits, stoi_closure, std::nullopt); + check_parameter(conf, milvus::knowhere::IndexParams::nprobe, stoi_closure, std::nullopt); + check_parameter(conf, milvus::knowhere::IndexParams::nlist, stoi_closure, std::nullopt); + check_parameter(conf, milvus::knowhere::IndexParams::m, stoi_closure, std::nullopt); + check_parameter(conf, milvus::knowhere::IndexParams::nbits, stoi_closure, std::nullopt); /************************** NSG Parameter **************************/ - check_parameter(milvus::knowhere::IndexParams::knng, stoi_closure, std::nullopt); - check_parameter(milvus::knowhere::IndexParams::search_length, stoi_closure, std::nullopt); - check_parameter(milvus::knowhere::IndexParams::out_degree, stoi_closure, std::nullopt); - check_parameter(milvus::knowhere::IndexParams::candidate, stoi_closure, std::nullopt); + check_parameter(conf, milvus::knowhere::IndexParams::knng, stoi_closure, std::nullopt); + check_parameter(conf, milvus::knowhere::IndexParams::search_length, stoi_closure, std::nullopt); + check_parameter(conf, milvus::knowhere::IndexParams::out_degree, stoi_closure, std::nullopt); + check_parameter(conf, milvus::knowhere::IndexParams::candidate, stoi_closure, std::nullopt); /************************** HNSW Params *****************************/ - check_parameter(milvus::knowhere::IndexParams::efConstruction, stoi_closure, std::nullopt); - check_parameter(milvus::knowhere::IndexParams::M, stoi_closure, std::nullopt); - check_parameter(milvus::knowhere::IndexParams::ef, stoi_closure, std::nullopt); + check_parameter(conf, milvus::knowhere::IndexParams::efConstruction, stoi_closure, std::nullopt); + check_parameter(conf, milvus::knowhere::IndexParams::M, stoi_closure, std::nullopt); + check_parameter(conf, milvus::knowhere::IndexParams::ef, stoi_closure, std::nullopt); /************************** Annoy Params *****************************/ - check_parameter(milvus::knowhere::IndexParams::n_trees, stoi_closure, std::nullopt); - check_parameter(milvus::knowhere::IndexParams::search_k, stoi_closure, std::nullopt); + check_parameter(conf, milvus::knowhere::IndexParams::n_trees, stoi_closure, std::nullopt); + check_parameter(conf, milvus::knowhere::IndexParams::search_k, stoi_closure, std::nullopt); /************************** PQ Params *****************************/ - check_parameter(milvus::knowhere::IndexParams::PQM, stoi_closure, std::nullopt); + check_parameter(conf, milvus::knowhere::IndexParams::PQM, stoi_closure, std::nullopt); /************************** NGT Params *****************************/ - check_parameter(milvus::knowhere::IndexParams::edge_size, stoi_closure, std::nullopt); + check_parameter(conf, milvus::knowhere::IndexParams::edge_size, stoi_closure, std::nullopt); /************************** NGT Search Params *****************************/ - check_parameter(milvus::knowhere::IndexParams::epsilon, stoi_closure, std::nullopt); - check_parameter(milvus::knowhere::IndexParams::max_search_edges, stoi_closure, std::nullopt); + check_parameter(conf, milvus::knowhere::IndexParams::epsilon, stoi_closure, std::nullopt); + check_parameter(conf, milvus::knowhere::IndexParams::max_search_edges, stoi_closure, std::nullopt); /************************** NGT_PANNG Params *****************************/ - check_parameter(milvus::knowhere::IndexParams::forcedly_pruned_edge_size, stoi_closure, std::nullopt); - check_parameter(milvus::knowhere::IndexParams::selectively_pruned_edge_size, stoi_closure, std::nullopt); + check_parameter(conf, milvus::knowhere::IndexParams::forcedly_pruned_edge_size, stoi_closure, std::nullopt); + check_parameter(conf, milvus::knowhere::IndexParams::selectively_pruned_edge_size, stoi_closure, std::nullopt); /************************** NGT_ONNG Params *****************************/ - check_parameter(milvus::knowhere::IndexParams::outgoing_edge_size, stoi_closure, std::nullopt); - check_parameter(milvus::knowhere::IndexParams::incoming_edge_size, stoi_closure, std::nullopt); + check_parameter(conf, milvus::knowhere::IndexParams::outgoing_edge_size, stoi_closure, std::nullopt); + check_parameter(conf, milvus::knowhere::IndexParams::incoming_edge_size, stoi_closure, std::nullopt); /************************** Serialize Params *******************************/ - check_parameter(milvus::knowhere::INDEX_FILE_SLICE_SIZE_IN_MEGABYTE, stoi_closure, std::optional{4}); + check_parameter(conf, milvus::knowhere::INDEX_FILE_SLICE_SIZE_IN_MEGABYTE, stoi_closure, std::optional{4}); +} + +void +IndexWrapper::parse() { + namespace indexcgo = milvus::proto::indexcgo; + + parse_impl(type_params_, type_config_); + parse_impl(index_params_, index_config_); + + config_.update(type_config_); // just like dict().update in Python, amazing + config_.update(index_config_); } template void -IndexWrapper::check_parameter(const std::string& key, std::function fn, std::optional default_v) { - if (!config_.contains(key)) { +IndexWrapper::check_parameter(knowhere::Config& conf, + const std::string& key, + std::function fn, + std::optional default_v) { + if (!conf.contains(key)) { if (default_v.has_value()) { - config_[key] = default_v.value(); + conf[key] = default_v.value(); } } else { - auto value = config_[key]; - config_[key] = fn(value); + auto value = conf[key]; + conf[key] = fn(value); } } @@ -257,5 +258,38 @@ IndexWrapper::get_index_type() { return type.has_value() ? type.value() : knowhere::IndexEnum::INDEX_FAISS_IVFPQ; } +std::unique_ptr +IndexWrapper::Query(const knowhere::DatasetPtr& dataset) { + return std::move(QueryImpl(dataset, config_)); +} + +std::unique_ptr +IndexWrapper::QueryWithParam(const knowhere::DatasetPtr& dataset, const char* serialized_search_params) { + namespace indexcgo = milvus::proto::indexcgo; + milvus::knowhere::Config search_conf; + parse_impl(std::string(serialized_search_params), search_conf); + + return std::move(QueryImpl(dataset, search_conf)); +} + +std::unique_ptr +IndexWrapper::QueryImpl(const knowhere::DatasetPtr& dataset, const knowhere::Config& conf) { + auto res = index_->Query(dataset, conf, nullptr); + auto ids = res->Get(milvus::knowhere::meta::IDS); + auto distances = res->Get(milvus::knowhere::meta::DISTANCE); + auto nq = dataset->Get(milvus::knowhere::meta::ROWS); + auto k = config_[milvus::knowhere::meta::TOPK].get(); + + auto query_res = std::make_unique(); + query_res->nq = nq; + query_res->topk = k; + query_res->ids.resize(nq * k); + query_res->distances.resize(nq * k); + memcpy(query_res->ids.data(), ids, sizeof(int64_t) * nq * k); + memcpy(query_res->distances.data(), distances, sizeof(float) * nq * k); + + return std::move(query_res); +} + } // namespace indexbuilder } // namespace milvus diff --git a/internal/core/src/indexbuilder/IndexWrapper.h b/internal/core/src/indexbuilder/IndexWrapper.h index f0d75da54c..65c6f149fe 100644 --- a/internal/core/src/indexbuilder/IndexWrapper.h +++ b/internal/core/src/indexbuilder/IndexWrapper.h @@ -12,6 +12,7 @@ #include #include #include +#include #include "knowhere/index/vector_index/VecIndex.h" namespace milvus { @@ -38,6 +39,19 @@ class IndexWrapper { void Load(const char* serialized_sliced_blob_buffer, int32_t size); + struct QueryResult { + std::vector ids; + std::vector distances; + int64_t nq; + int64_t topk; + }; + + std::unique_ptr + Query(const knowhere::DatasetPtr& dataset); + + std::unique_ptr + QueryWithParam(const knowhere::DatasetPtr& dataset, const char* serialized_search_params); + private: void parse(); @@ -54,10 +68,18 @@ class IndexWrapper { template void - check_parameter(const std::string& key, + check_parameter(knowhere::Config& conf, + const std::string& key, std::function fn, std::optional default_v = std::nullopt); + template + void + parse_impl(const std::string& serialized_params_str, knowhere::Config& conf); + + std::unique_ptr + QueryImpl(const knowhere::DatasetPtr& dataset, const knowhere::Config& conf); + public: void BuildWithIds(const knowhere::DatasetPtr& dataset); diff --git a/internal/core/src/indexbuilder/index_c.cpp b/internal/core/src/indexbuilder/index_c.cpp index 26d766401f..217372700b 100644 --- a/internal/core/src/indexbuilder/index_c.cpp +++ b/internal/core/src/indexbuilder/index_c.cpp @@ -115,3 +115,153 @@ LoadFromSlicedBuffer(CIndex index, const char* serialized_sliced_blob_buffer, in } return status; } + +CStatus +QueryOnFloatVecIndex(CIndex index, int64_t float_value_num, const float* vectors, CIndexQueryResult* res) { + auto status = CStatus(); + try { + auto cIndex = (milvus::indexbuilder::IndexWrapper*)index; + auto dim = cIndex->dim(); + auto row_nums = float_value_num / dim; + auto query_ds = milvus::knowhere::GenDataset(row_nums, dim, vectors); + auto query_res = cIndex->Query(query_ds); + *res = query_res.release(); + + status.error_code = Success; + status.error_msg = ""; + } catch (std::runtime_error& e) { + status.error_code = UnexpectedException; + status.error_msg = strdup(e.what()); + } + return status; +} + +CStatus +QueryOnFloatVecIndexWithParam(CIndex index, + int64_t float_value_num, + const float* vectors, + const char* serialized_search_params, + CIndexQueryResult* res) { + auto status = CStatus(); + try { + auto cIndex = (milvus::indexbuilder::IndexWrapper*)index; + auto dim = cIndex->dim(); + auto row_nums = float_value_num / dim; + auto query_ds = milvus::knowhere::GenDataset(row_nums, dim, vectors); + auto query_res = cIndex->QueryWithParam(query_ds, serialized_search_params); + *res = query_res.release(); + + status.error_code = Success; + status.error_msg = ""; + } catch (std::runtime_error& e) { + status.error_code = UnexpectedException; + status.error_msg = strdup(e.what()); + } + return status; +} + +CStatus +QueryOnBinaryVecIndex(CIndex index, int64_t data_size, const uint8_t* vectors, CIndexQueryResult* res) { + auto status = CStatus(); + try { + auto cIndex = (milvus::indexbuilder::IndexWrapper*)index; + auto dim = cIndex->dim(); + auto row_nums = (data_size * 8) / dim; + auto query_ds = milvus::knowhere::GenDataset(row_nums, dim, vectors); + auto query_res = cIndex->Query(query_ds); + *res = query_res.release(); + + status.error_code = Success; + status.error_msg = ""; + } catch (std::runtime_error& e) { + status.error_code = UnexpectedException; + status.error_msg = strdup(e.what()); + } + return status; +} + +CStatus +QueryOnBinaryVecIndexWithParam(CIndex index, + int64_t data_size, + const uint8_t* vectors, + const char* serialized_search_params, + CIndexQueryResult* res) { + auto status = CStatus(); + try { + auto cIndex = (milvus::indexbuilder::IndexWrapper*)index; + auto dim = cIndex->dim(); + auto row_nums = (data_size * 8) / dim; + auto query_ds = milvus::knowhere::GenDataset(row_nums, dim, vectors); + auto query_res = cIndex->QueryWithParam(query_ds, serialized_search_params); + *res = query_res.release(); + + status.error_code = Success; + status.error_msg = ""; + } catch (std::runtime_error& e) { + status.error_code = UnexpectedException; + status.error_msg = strdup(e.what()); + } + return status; +} + +CStatus +CreateQueryResult(CIndexQueryResult* res) { + auto status = CStatus(); + try { + auto query_result = std::make_unique(); + *res = query_result.release(); + + status.error_code = Success; + status.error_msg = ""; + } catch (std::runtime_error& e) { + status.error_code = UnexpectedException; + status.error_msg = strdup(e.what()); + } + return status; +} + +int64_t +NqOfQueryResult(CIndexQueryResult res) { + auto c_res = (milvus::indexbuilder::IndexWrapper::QueryResult*)res; + return c_res->nq; +} + +int64_t +TopkOfQueryResult(CIndexQueryResult res) { + auto c_res = (milvus::indexbuilder::IndexWrapper::QueryResult*)res; + return c_res->topk; +} + +void +GetIdsOfQueryResult(CIndexQueryResult res, int64_t* ids) { + auto c_res = (milvus::indexbuilder::IndexWrapper::QueryResult*)res; + auto nq = c_res->nq; + auto k = c_res->topk; + // TODO: how could we avoid memory copy every time when this called + memcpy(ids, c_res->ids.data(), sizeof(int64_t) * nq * k); +} + +void +GetDistancesOfQueryResult(CIndexQueryResult res, float* distances) { + auto c_res = (milvus::indexbuilder::IndexWrapper::QueryResult*)res; + auto nq = c_res->nq; + auto k = c_res->topk; + // TODO: how could we avoid memory copy every time when this called + memcpy(distances, c_res->distances.data(), sizeof(float) * nq * k); +} + +CStatus +DeleteQueryResult(CIndexQueryResult res) { + auto status = CStatus(); + try { + auto c_res = (milvus::indexbuilder::IndexWrapper::QueryResult*)res; + delete c_res; + + status.error_code = Success; + status.error_msg = ""; + } catch (std::runtime_error& e) { + status.error_code = UnexpectedException; + status.error_msg = strdup(e.what()); + } + return status; +} diff --git a/internal/core/src/indexbuilder/index_c.h b/internal/core/src/indexbuilder/index_c.h index 2e8809f3d6..8acc626a13 100644 --- a/internal/core/src/indexbuilder/index_c.h +++ b/internal/core/src/indexbuilder/index_c.h @@ -31,6 +31,7 @@ extern "C" { #include "common/status_c.h" typedef void* CIndex; +typedef void* CIndexQueryResult; // TODO: how could we pass map between go and c++ more efficiently? // Solution: using protobuf instead of json, this way significantly increase programming efficiency @@ -53,6 +54,44 @@ SerializeToSlicedBuffer(CIndex index, int32_t* buffer_size, char** res_buffer); CStatus LoadFromSlicedBuffer(CIndex index, const char* serialized_sliced_blob_buffer, int32_t size); +CStatus +QueryOnFloatVecIndex(CIndex index, int64_t float_value_num, const float* vectors, CIndexQueryResult* res); + +CStatus +QueryOnFloatVecIndexWithParam(CIndex index, + int64_t float_value_num, + const float* vectors, + const char* serialized_search_params, + CIndexQueryResult* res); + +CStatus +QueryOnBinaryVecIndex(CIndex index, int64_t data_size, const uint8_t* vectors, CIndexQueryResult* res); + +CStatus +QueryOnBinaryVecIndexWithParam(CIndex index, + int64_t data_size, + const uint8_t* vectors, + const char* serialized_search_params, + CIndexQueryResult* res); + +CStatus +CreateQueryResult(CIndexQueryResult* res); + +int64_t +NqOfQueryResult(CIndexQueryResult res); + +int64_t +TopkOfQueryResult(CIndexQueryResult res); + +void +GetIdsOfQueryResult(CIndexQueryResult res, int64_t* ids); + +void +GetDistancesOfQueryResult(CIndexQueryResult res, float* distances); + +CStatus +DeleteQueryResult(CIndexQueryResult res); + #ifdef __cplusplus }; #endif diff --git a/internal/core/src/pb/index_cgo_msg.pb.cc b/internal/core/src/pb/index_cgo_msg.pb.cc index f79da963e1..834a834756 100644 --- a/internal/core/src/pb/index_cgo_msg.pb.cc +++ b/internal/core/src/pb/index_cgo_msg.pb.cc @@ -28,6 +28,10 @@ class IndexParamsDefaultTypeInternal { public: ::PROTOBUF_NAMESPACE_ID::internal::ExplicitlyConstructed _instance; } _IndexParams_default_instance_; +class MapParamsDefaultTypeInternal { + public: + ::PROTOBUF_NAMESPACE_ID::internal::ExplicitlyConstructed _instance; +} _MapParams_default_instance_; class BinaryDefaultTypeInternal { public: ::PROTOBUF_NAMESPACE_ID::internal::ExplicitlyConstructed _instance; @@ -83,6 +87,21 @@ static void InitDefaultsscc_info_IndexParams_index_5fcgo_5fmsg_2eproto() { {{ATOMIC_VAR_INIT(::PROTOBUF_NAMESPACE_ID::internal::SCCInfoBase::kUninitialized), 1, InitDefaultsscc_info_IndexParams_index_5fcgo_5fmsg_2eproto}, { &scc_info_KeyValuePair_common_2eproto.base,}}; +static void InitDefaultsscc_info_MapParams_index_5fcgo_5fmsg_2eproto() { + GOOGLE_PROTOBUF_VERIFY_VERSION; + + { + void* ptr = &::milvus::proto::indexcgo::_MapParams_default_instance_; + new (ptr) ::milvus::proto::indexcgo::MapParams(); + ::PROTOBUF_NAMESPACE_ID::internal::OnShutdownDestroyMessage(ptr); + } + ::milvus::proto::indexcgo::MapParams::InitAsDefaultInstance(); +} + +::PROTOBUF_NAMESPACE_ID::internal::SCCInfo<1> scc_info_MapParams_index_5fcgo_5fmsg_2eproto = + {{ATOMIC_VAR_INIT(::PROTOBUF_NAMESPACE_ID::internal::SCCInfoBase::kUninitialized), 1, InitDefaultsscc_info_MapParams_index_5fcgo_5fmsg_2eproto}, { + &scc_info_KeyValuePair_common_2eproto.base,}}; + static void InitDefaultsscc_info_TypeParams_index_5fcgo_5fmsg_2eproto() { GOOGLE_PROTOBUF_VERIFY_VERSION; @@ -98,7 +117,7 @@ static void InitDefaultsscc_info_TypeParams_index_5fcgo_5fmsg_2eproto() { {{ATOMIC_VAR_INIT(::PROTOBUF_NAMESPACE_ID::internal::SCCInfoBase::kUninitialized), 1, InitDefaultsscc_info_TypeParams_index_5fcgo_5fmsg_2eproto}, { &scc_info_KeyValuePair_common_2eproto.base,}}; -static ::PROTOBUF_NAMESPACE_ID::Metadata file_level_metadata_index_5fcgo_5fmsg_2eproto[4]; +static ::PROTOBUF_NAMESPACE_ID::Metadata file_level_metadata_index_5fcgo_5fmsg_2eproto[5]; static constexpr ::PROTOBUF_NAMESPACE_ID::EnumDescriptor const** file_level_enum_descriptors_index_5fcgo_5fmsg_2eproto = nullptr; static constexpr ::PROTOBUF_NAMESPACE_ID::ServiceDescriptor const** file_level_service_descriptors_index_5fcgo_5fmsg_2eproto = nullptr; @@ -116,6 +135,12 @@ const ::PROTOBUF_NAMESPACE_ID::uint32 TableStruct_index_5fcgo_5fmsg_2eproto::off ~0u, // no _weak_field_map_ PROTOBUF_FIELD_OFFSET(::milvus::proto::indexcgo::IndexParams, params_), ~0u, // no _has_bits_ + PROTOBUF_FIELD_OFFSET(::milvus::proto::indexcgo::MapParams, _internal_metadata_), + ~0u, // no _extensions_ + ~0u, // no _oneof_case_ + ~0u, // no _weak_field_map_ + PROTOBUF_FIELD_OFFSET(::milvus::proto::indexcgo::MapParams, params_), + ~0u, // no _has_bits_ PROTOBUF_FIELD_OFFSET(::milvus::proto::indexcgo::Binary, _internal_metadata_), ~0u, // no _extensions_ ~0u, // no _oneof_case_ @@ -132,13 +157,15 @@ const ::PROTOBUF_NAMESPACE_ID::uint32 TableStruct_index_5fcgo_5fmsg_2eproto::off static const ::PROTOBUF_NAMESPACE_ID::internal::MigrationSchema schemas[] PROTOBUF_SECTION_VARIABLE(protodesc_cold) = { { 0, -1, sizeof(::milvus::proto::indexcgo::TypeParams)}, { 6, -1, sizeof(::milvus::proto::indexcgo::IndexParams)}, - { 12, -1, sizeof(::milvus::proto::indexcgo::Binary)}, - { 19, -1, sizeof(::milvus::proto::indexcgo::BinarySet)}, + { 12, -1, sizeof(::milvus::proto::indexcgo::MapParams)}, + { 18, -1, sizeof(::milvus::proto::indexcgo::Binary)}, + { 25, -1, sizeof(::milvus::proto::indexcgo::BinarySet)}, }; static ::PROTOBUF_NAMESPACE_ID::Message const * const file_default_instances[] = { reinterpret_cast(&::milvus::proto::indexcgo::_TypeParams_default_instance_), reinterpret_cast(&::milvus::proto::indexcgo::_IndexParams_default_instance_), + reinterpret_cast(&::milvus::proto::indexcgo::_MapParams_default_instance_), reinterpret_cast(&::milvus::proto::indexcgo::_Binary_default_instance_), reinterpret_cast(&::milvus::proto::indexcgo::_BinarySet_default_instance_), }; @@ -148,29 +175,31 @@ const char descriptor_table_protodef_index_5fcgo_5fmsg_2eproto[] PROTOBUF_SECTIO "xcgo\032\014common.proto\"\?\n\nTypeParams\0221\n\006para" "ms\030\001 \003(\0132!.milvus.proto.common.KeyValueP" "air\"@\n\013IndexParams\0221\n\006params\030\001 \003(\0132!.mil" - "vus.proto.common.KeyValuePair\"$\n\006Binary\022" - "\013\n\003key\030\001 \001(\t\022\r\n\005value\030\002 \001(\014\"9\n\tBinarySet" - "\022,\n\005datas\030\001 \003(\0132\035.milvus.proto.indexcgo." - "BinaryBDZBgithub.com/zilliztech/milvus-d" - "istributed/internal/proto/indexcgopbb\006pr" - "oto3" + "vus.proto.common.KeyValuePair\">\n\tMapPara" + "ms\0221\n\006params\030\001 \003(\0132!.milvus.proto.common" + ".KeyValuePair\"$\n\006Binary\022\013\n\003key\030\001 \001(\t\022\r\n\005" + "value\030\002 \001(\014\"9\n\tBinarySet\022,\n\005datas\030\001 \003(\0132" + "\035.milvus.proto.indexcgo.BinaryBDZBgithub" + ".com/zilliztech/milvus-distributed/inter" + "nal/proto/indexcgopbb\006proto3" ; static const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable*const descriptor_table_index_5fcgo_5fmsg_2eproto_deps[1] = { &::descriptor_table_common_2eproto, }; -static ::PROTOBUF_NAMESPACE_ID::internal::SCCInfoBase*const descriptor_table_index_5fcgo_5fmsg_2eproto_sccs[4] = { +static ::PROTOBUF_NAMESPACE_ID::internal::SCCInfoBase*const descriptor_table_index_5fcgo_5fmsg_2eproto_sccs[5] = { &scc_info_Binary_index_5fcgo_5fmsg_2eproto.base, &scc_info_BinarySet_index_5fcgo_5fmsg_2eproto.base, &scc_info_IndexParams_index_5fcgo_5fmsg_2eproto.base, + &scc_info_MapParams_index_5fcgo_5fmsg_2eproto.base, &scc_info_TypeParams_index_5fcgo_5fmsg_2eproto.base, }; static ::PROTOBUF_NAMESPACE_ID::internal::once_flag descriptor_table_index_5fcgo_5fmsg_2eproto_once; static bool descriptor_table_index_5fcgo_5fmsg_2eproto_initialized = false; const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable descriptor_table_index_5fcgo_5fmsg_2eproto = { - &descriptor_table_index_5fcgo_5fmsg_2eproto_initialized, descriptor_table_protodef_index_5fcgo_5fmsg_2eproto, "index_cgo_msg.proto", 364, - &descriptor_table_index_5fcgo_5fmsg_2eproto_once, descriptor_table_index_5fcgo_5fmsg_2eproto_sccs, descriptor_table_index_5fcgo_5fmsg_2eproto_deps, 4, 1, + &descriptor_table_index_5fcgo_5fmsg_2eproto_initialized, descriptor_table_protodef_index_5fcgo_5fmsg_2eproto, "index_cgo_msg.proto", 428, + &descriptor_table_index_5fcgo_5fmsg_2eproto_once, descriptor_table_index_5fcgo_5fmsg_2eproto_sccs, descriptor_table_index_5fcgo_5fmsg_2eproto_deps, 5, 1, schemas, file_default_instances, TableStruct_index_5fcgo_5fmsg_2eproto::offsets, - file_level_metadata_index_5fcgo_5fmsg_2eproto, 4, file_level_enum_descriptors_index_5fcgo_5fmsg_2eproto, file_level_service_descriptors_index_5fcgo_5fmsg_2eproto, + file_level_metadata_index_5fcgo_5fmsg_2eproto, 5, file_level_enum_descriptors_index_5fcgo_5fmsg_2eproto, file_level_service_descriptors_index_5fcgo_5fmsg_2eproto, }; // Force running AddDescriptors() at dynamic initialization time. @@ -707,6 +736,270 @@ void IndexParams::InternalSwap(IndexParams* other) { } +// =================================================================== + +void MapParams::InitAsDefaultInstance() { +} +class MapParams::_Internal { + public: +}; + +void MapParams::clear_params() { + params_.Clear(); +} +MapParams::MapParams() + : ::PROTOBUF_NAMESPACE_ID::Message(), _internal_metadata_(nullptr) { + SharedCtor(); + // @@protoc_insertion_point(constructor:milvus.proto.indexcgo.MapParams) +} +MapParams::MapParams(const MapParams& from) + : ::PROTOBUF_NAMESPACE_ID::Message(), + _internal_metadata_(nullptr), + params_(from.params_) { + _internal_metadata_.MergeFrom(from._internal_metadata_); + // @@protoc_insertion_point(copy_constructor:milvus.proto.indexcgo.MapParams) +} + +void MapParams::SharedCtor() { + ::PROTOBUF_NAMESPACE_ID::internal::InitSCC(&scc_info_MapParams_index_5fcgo_5fmsg_2eproto.base); +} + +MapParams::~MapParams() { + // @@protoc_insertion_point(destructor:milvus.proto.indexcgo.MapParams) + SharedDtor(); +} + +void MapParams::SharedDtor() { +} + +void MapParams::SetCachedSize(int size) const { + _cached_size_.Set(size); +} +const MapParams& MapParams::default_instance() { + ::PROTOBUF_NAMESPACE_ID::internal::InitSCC(&::scc_info_MapParams_index_5fcgo_5fmsg_2eproto.base); + return *internal_default_instance(); +} + + +void MapParams::Clear() { +// @@protoc_insertion_point(message_clear_start:milvus.proto.indexcgo.MapParams) + ::PROTOBUF_NAMESPACE_ID::uint32 cached_has_bits = 0; + // Prevent compiler warnings about cached_has_bits being unused + (void) cached_has_bits; + + params_.Clear(); + _internal_metadata_.Clear(); +} + +#if GOOGLE_PROTOBUF_ENABLE_EXPERIMENTAL_PARSER +const char* MapParams::_InternalParse(const char* ptr, ::PROTOBUF_NAMESPACE_ID::internal::ParseContext* ctx) { +#define CHK_(x) if (PROTOBUF_PREDICT_FALSE(!(x))) goto failure + while (!ctx->Done(&ptr)) { + ::PROTOBUF_NAMESPACE_ID::uint32 tag; + ptr = ::PROTOBUF_NAMESPACE_ID::internal::ReadTag(ptr, &tag); + CHK_(ptr); + switch (tag >> 3) { + // repeated .milvus.proto.common.KeyValuePair params = 1; + case 1: + if (PROTOBUF_PREDICT_TRUE(static_cast<::PROTOBUF_NAMESPACE_ID::uint8>(tag) == 10)) { + ptr -= 1; + do { + ptr += 1; + ptr = ctx->ParseMessage(add_params(), ptr); + CHK_(ptr); + if (!ctx->DataAvailable(ptr)) break; + } while (::PROTOBUF_NAMESPACE_ID::internal::UnalignedLoad<::PROTOBUF_NAMESPACE_ID::uint8>(ptr) == 10); + } else goto handle_unusual; + continue; + default: { + handle_unusual: + if ((tag & 7) == 4 || tag == 0) { + ctx->SetLastTag(tag); + goto success; + } + ptr = UnknownFieldParse(tag, &_internal_metadata_, ptr, ctx); + CHK_(ptr != nullptr); + continue; + } + } // switch + } // while +success: + return ptr; +failure: + ptr = nullptr; + goto success; +#undef CHK_ +} +#else // GOOGLE_PROTOBUF_ENABLE_EXPERIMENTAL_PARSER +bool MapParams::MergePartialFromCodedStream( + ::PROTOBUF_NAMESPACE_ID::io::CodedInputStream* input) { +#define DO_(EXPRESSION) if (!PROTOBUF_PREDICT_TRUE(EXPRESSION)) goto failure + ::PROTOBUF_NAMESPACE_ID::uint32 tag; + // @@protoc_insertion_point(parse_start:milvus.proto.indexcgo.MapParams) + for (;;) { + ::std::pair<::PROTOBUF_NAMESPACE_ID::uint32, bool> p = input->ReadTagWithCutoffNoLastTag(127u); + tag = p.first; + if (!p.second) goto handle_unusual; + switch (::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::GetTagFieldNumber(tag)) { + // repeated .milvus.proto.common.KeyValuePair params = 1; + case 1: { + if (static_cast< ::PROTOBUF_NAMESPACE_ID::uint8>(tag) == (10 & 0xFF)) { + DO_(::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::ReadMessage( + input, add_params())); + } else { + goto handle_unusual; + } + break; + } + + default: { + handle_unusual: + if (tag == 0) { + goto success; + } + DO_(::PROTOBUF_NAMESPACE_ID::internal::WireFormat::SkipField( + input, tag, _internal_metadata_.mutable_unknown_fields())); + break; + } + } + } +success: + // @@protoc_insertion_point(parse_success:milvus.proto.indexcgo.MapParams) + return true; +failure: + // @@protoc_insertion_point(parse_failure:milvus.proto.indexcgo.MapParams) + return false; +#undef DO_ +} +#endif // GOOGLE_PROTOBUF_ENABLE_EXPERIMENTAL_PARSER + +void MapParams::SerializeWithCachedSizes( + ::PROTOBUF_NAMESPACE_ID::io::CodedOutputStream* output) const { + // @@protoc_insertion_point(serialize_start:milvus.proto.indexcgo.MapParams) + ::PROTOBUF_NAMESPACE_ID::uint32 cached_has_bits = 0; + (void) cached_has_bits; + + // repeated .milvus.proto.common.KeyValuePair params = 1; + for (unsigned int i = 0, + n = static_cast(this->params_size()); i < n; i++) { + ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::WriteMessageMaybeToArray( + 1, + this->params(static_cast(i)), + output); + } + + if (_internal_metadata_.have_unknown_fields()) { + ::PROTOBUF_NAMESPACE_ID::internal::WireFormat::SerializeUnknownFields( + _internal_metadata_.unknown_fields(), output); + } + // @@protoc_insertion_point(serialize_end:milvus.proto.indexcgo.MapParams) +} + +::PROTOBUF_NAMESPACE_ID::uint8* MapParams::InternalSerializeWithCachedSizesToArray( + ::PROTOBUF_NAMESPACE_ID::uint8* target) const { + // @@protoc_insertion_point(serialize_to_array_start:milvus.proto.indexcgo.MapParams) + ::PROTOBUF_NAMESPACE_ID::uint32 cached_has_bits = 0; + (void) cached_has_bits; + + // repeated .milvus.proto.common.KeyValuePair params = 1; + for (unsigned int i = 0, + n = static_cast(this->params_size()); i < n; i++) { + target = ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite:: + InternalWriteMessageToArray( + 1, this->params(static_cast(i)), target); + } + + if (_internal_metadata_.have_unknown_fields()) { + target = ::PROTOBUF_NAMESPACE_ID::internal::WireFormat::SerializeUnknownFieldsToArray( + _internal_metadata_.unknown_fields(), target); + } + // @@protoc_insertion_point(serialize_to_array_end:milvus.proto.indexcgo.MapParams) + return target; +} + +size_t MapParams::ByteSizeLong() const { +// @@protoc_insertion_point(message_byte_size_start:milvus.proto.indexcgo.MapParams) + size_t total_size = 0; + + if (_internal_metadata_.have_unknown_fields()) { + total_size += + ::PROTOBUF_NAMESPACE_ID::internal::WireFormat::ComputeUnknownFieldsSize( + _internal_metadata_.unknown_fields()); + } + ::PROTOBUF_NAMESPACE_ID::uint32 cached_has_bits = 0; + // Prevent compiler warnings about cached_has_bits being unused + (void) cached_has_bits; + + // repeated .milvus.proto.common.KeyValuePair params = 1; + { + unsigned int count = static_cast(this->params_size()); + total_size += 1UL * count; + for (unsigned int i = 0; i < count; i++) { + total_size += + ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::MessageSize( + this->params(static_cast(i))); + } + } + + int cached_size = ::PROTOBUF_NAMESPACE_ID::internal::ToCachedSize(total_size); + SetCachedSize(cached_size); + return total_size; +} + +void MapParams::MergeFrom(const ::PROTOBUF_NAMESPACE_ID::Message& from) { +// @@protoc_insertion_point(generalized_merge_from_start:milvus.proto.indexcgo.MapParams) + GOOGLE_DCHECK_NE(&from, this); + const MapParams* source = + ::PROTOBUF_NAMESPACE_ID::DynamicCastToGenerated( + &from); + if (source == nullptr) { + // @@protoc_insertion_point(generalized_merge_from_cast_fail:milvus.proto.indexcgo.MapParams) + ::PROTOBUF_NAMESPACE_ID::internal::ReflectionOps::Merge(from, this); + } else { + // @@protoc_insertion_point(generalized_merge_from_cast_success:milvus.proto.indexcgo.MapParams) + MergeFrom(*source); + } +} + +void MapParams::MergeFrom(const MapParams& from) { +// @@protoc_insertion_point(class_specific_merge_from_start:milvus.proto.indexcgo.MapParams) + GOOGLE_DCHECK_NE(&from, this); + _internal_metadata_.MergeFrom(from._internal_metadata_); + ::PROTOBUF_NAMESPACE_ID::uint32 cached_has_bits = 0; + (void) cached_has_bits; + + params_.MergeFrom(from.params_); +} + +void MapParams::CopyFrom(const ::PROTOBUF_NAMESPACE_ID::Message& from) { +// @@protoc_insertion_point(generalized_copy_from_start:milvus.proto.indexcgo.MapParams) + if (&from == this) return; + Clear(); + MergeFrom(from); +} + +void MapParams::CopyFrom(const MapParams& from) { +// @@protoc_insertion_point(class_specific_copy_from_start:milvus.proto.indexcgo.MapParams) + if (&from == this) return; + Clear(); + MergeFrom(from); +} + +bool MapParams::IsInitialized() const { + return true; +} + +void MapParams::InternalSwap(MapParams* other) { + using std::swap; + _internal_metadata_.Swap(&other->_internal_metadata_); + CastToBase(¶ms_)->InternalSwap(CastToBase(&other->params_)); +} + +::PROTOBUF_NAMESPACE_ID::Metadata MapParams::GetMetadata() const { + return GetMetadataStatic(); +} + + // =================================================================== void Binary::InitAsDefaultInstance() { @@ -1299,6 +1592,9 @@ template<> PROTOBUF_NOINLINE ::milvus::proto::indexcgo::TypeParams* Arena::Creat template<> PROTOBUF_NOINLINE ::milvus::proto::indexcgo::IndexParams* Arena::CreateMaybeMessage< ::milvus::proto::indexcgo::IndexParams >(Arena* arena) { return Arena::CreateInternal< ::milvus::proto::indexcgo::IndexParams >(arena); } +template<> PROTOBUF_NOINLINE ::milvus::proto::indexcgo::MapParams* Arena::CreateMaybeMessage< ::milvus::proto::indexcgo::MapParams >(Arena* arena) { + return Arena::CreateInternal< ::milvus::proto::indexcgo::MapParams >(arena); +} template<> PROTOBUF_NOINLINE ::milvus::proto::indexcgo::Binary* Arena::CreateMaybeMessage< ::milvus::proto::indexcgo::Binary >(Arena* arena) { return Arena::CreateInternal< ::milvus::proto::indexcgo::Binary >(arena); } diff --git a/internal/core/src/pb/index_cgo_msg.pb.h b/internal/core/src/pb/index_cgo_msg.pb.h index c741f2a9c1..ea6535683c 100644 --- a/internal/core/src/pb/index_cgo_msg.pb.h +++ b/internal/core/src/pb/index_cgo_msg.pb.h @@ -48,7 +48,7 @@ struct TableStruct_index_5fcgo_5fmsg_2eproto { PROTOBUF_SECTION_VARIABLE(protodesc_cold); static const ::PROTOBUF_NAMESPACE_ID::internal::AuxillaryParseTableField aux[] PROTOBUF_SECTION_VARIABLE(protodesc_cold); - static const ::PROTOBUF_NAMESPACE_ID::internal::ParseTable schema[4] + static const ::PROTOBUF_NAMESPACE_ID::internal::ParseTable schema[5] PROTOBUF_SECTION_VARIABLE(protodesc_cold); static const ::PROTOBUF_NAMESPACE_ID::internal::FieldMetadata field_metadata[]; static const ::PROTOBUF_NAMESPACE_ID::internal::SerializationTable serialization_table[]; @@ -67,6 +67,9 @@ extern BinarySetDefaultTypeInternal _BinarySet_default_instance_; class IndexParams; class IndexParamsDefaultTypeInternal; extern IndexParamsDefaultTypeInternal _IndexParams_default_instance_; +class MapParams; +class MapParamsDefaultTypeInternal; +extern MapParamsDefaultTypeInternal _MapParams_default_instance_; class TypeParams; class TypeParamsDefaultTypeInternal; extern TypeParamsDefaultTypeInternal _TypeParams_default_instance_; @@ -77,6 +80,7 @@ PROTOBUF_NAMESPACE_OPEN template<> ::milvus::proto::indexcgo::Binary* Arena::CreateMaybeMessage<::milvus::proto::indexcgo::Binary>(Arena*); template<> ::milvus::proto::indexcgo::BinarySet* Arena::CreateMaybeMessage<::milvus::proto::indexcgo::BinarySet>(Arena*); template<> ::milvus::proto::indexcgo::IndexParams* Arena::CreateMaybeMessage<::milvus::proto::indexcgo::IndexParams>(Arena*); +template<> ::milvus::proto::indexcgo::MapParams* Arena::CreateMaybeMessage<::milvus::proto::indexcgo::MapParams>(Arena*); template<> ::milvus::proto::indexcgo::TypeParams* Arena::CreateMaybeMessage<::milvus::proto::indexcgo::TypeParams>(Arena*); PROTOBUF_NAMESPACE_CLOSE namespace milvus { @@ -359,6 +363,143 @@ class IndexParams : }; // ------------------------------------------------------------------- +class MapParams : + public ::PROTOBUF_NAMESPACE_ID::Message /* @@protoc_insertion_point(class_definition:milvus.proto.indexcgo.MapParams) */ { + public: + MapParams(); + virtual ~MapParams(); + + MapParams(const MapParams& from); + MapParams(MapParams&& from) noexcept + : MapParams() { + *this = ::std::move(from); + } + + inline MapParams& operator=(const MapParams& from) { + CopyFrom(from); + return *this; + } + inline MapParams& operator=(MapParams&& from) noexcept { + if (GetArenaNoVirtual() == from.GetArenaNoVirtual()) { + if (this != &from) InternalSwap(&from); + } else { + CopyFrom(from); + } + return *this; + } + + static const ::PROTOBUF_NAMESPACE_ID::Descriptor* descriptor() { + return GetDescriptor(); + } + static const ::PROTOBUF_NAMESPACE_ID::Descriptor* GetDescriptor() { + return GetMetadataStatic().descriptor; + } + static const ::PROTOBUF_NAMESPACE_ID::Reflection* GetReflection() { + return GetMetadataStatic().reflection; + } + static const MapParams& default_instance(); + + static void InitAsDefaultInstance(); // FOR INTERNAL USE ONLY + static inline const MapParams* internal_default_instance() { + return reinterpret_cast( + &_MapParams_default_instance_); + } + static constexpr int kIndexInFileMessages = + 2; + + friend void swap(MapParams& a, MapParams& b) { + a.Swap(&b); + } + inline void Swap(MapParams* other) { + if (other == this) return; + InternalSwap(other); + } + + // implements Message ---------------------------------------------- + + inline MapParams* New() const final { + return CreateMaybeMessage(nullptr); + } + + MapParams* New(::PROTOBUF_NAMESPACE_ID::Arena* arena) const final { + return CreateMaybeMessage(arena); + } + void CopyFrom(const ::PROTOBUF_NAMESPACE_ID::Message& from) final; + void MergeFrom(const ::PROTOBUF_NAMESPACE_ID::Message& from) final; + void CopyFrom(const MapParams& from); + void MergeFrom(const MapParams& from); + PROTOBUF_ATTRIBUTE_REINITIALIZES void Clear() final; + bool IsInitialized() const final; + + size_t ByteSizeLong() const final; + #if GOOGLE_PROTOBUF_ENABLE_EXPERIMENTAL_PARSER + const char* _InternalParse(const char* ptr, ::PROTOBUF_NAMESPACE_ID::internal::ParseContext* ctx) final; + #else + bool MergePartialFromCodedStream( + ::PROTOBUF_NAMESPACE_ID::io::CodedInputStream* input) final; + #endif // GOOGLE_PROTOBUF_ENABLE_EXPERIMENTAL_PARSER + void SerializeWithCachedSizes( + ::PROTOBUF_NAMESPACE_ID::io::CodedOutputStream* output) const final; + ::PROTOBUF_NAMESPACE_ID::uint8* InternalSerializeWithCachedSizesToArray( + ::PROTOBUF_NAMESPACE_ID::uint8* target) const final; + int GetCachedSize() const final { return _cached_size_.Get(); } + + private: + inline void SharedCtor(); + inline void SharedDtor(); + void SetCachedSize(int size) const final; + void InternalSwap(MapParams* other); + friend class ::PROTOBUF_NAMESPACE_ID::internal::AnyMetadata; + static ::PROTOBUF_NAMESPACE_ID::StringPiece FullMessageName() { + return "milvus.proto.indexcgo.MapParams"; + } + private: + inline ::PROTOBUF_NAMESPACE_ID::Arena* GetArenaNoVirtual() const { + return nullptr; + } + inline void* MaybeArenaPtr() const { + return nullptr; + } + public: + + ::PROTOBUF_NAMESPACE_ID::Metadata GetMetadata() const final; + private: + static ::PROTOBUF_NAMESPACE_ID::Metadata GetMetadataStatic() { + ::PROTOBUF_NAMESPACE_ID::internal::AssignDescriptors(&::descriptor_table_index_5fcgo_5fmsg_2eproto); + return ::descriptor_table_index_5fcgo_5fmsg_2eproto.file_level_metadata[kIndexInFileMessages]; + } + + public: + + // nested types ---------------------------------------------------- + + // accessors ------------------------------------------------------- + + enum : int { + kParamsFieldNumber = 1, + }; + // repeated .milvus.proto.common.KeyValuePair params = 1; + int params_size() const; + void clear_params(); + ::milvus::proto::common::KeyValuePair* mutable_params(int index); + ::PROTOBUF_NAMESPACE_ID::RepeatedPtrField< ::milvus::proto::common::KeyValuePair >* + mutable_params(); + const ::milvus::proto::common::KeyValuePair& params(int index) const; + ::milvus::proto::common::KeyValuePair* add_params(); + const ::PROTOBUF_NAMESPACE_ID::RepeatedPtrField< ::milvus::proto::common::KeyValuePair >& + params() const; + + // @@protoc_insertion_point(class_scope:milvus.proto.indexcgo.MapParams) + private: + class _Internal; + + ::PROTOBUF_NAMESPACE_ID::internal::InternalMetadataWithArena _internal_metadata_; + ::PROTOBUF_NAMESPACE_ID::RepeatedPtrField< ::milvus::proto::common::KeyValuePair > params_; + mutable ::PROTOBUF_NAMESPACE_ID::internal::CachedSize _cached_size_; + friend struct ::TableStruct_index_5fcgo_5fmsg_2eproto; +}; +// ------------------------------------------------------------------- + class Binary : public ::PROTOBUF_NAMESPACE_ID::Message /* @@protoc_insertion_point(class_definition:milvus.proto.indexcgo.Binary) */ { public: @@ -401,7 +542,7 @@ class Binary : &_Binary_default_instance_); } static constexpr int kIndexInFileMessages = - 2; + 3; friend void swap(Binary& a, Binary& b) { a.Swap(&b); @@ -551,7 +692,7 @@ class BinarySet : &_BinarySet_default_instance_); } static constexpr int kIndexInFileMessages = - 3; + 4; friend void swap(BinarySet& a, BinarySet& b) { a.Swap(&b); @@ -715,6 +856,37 @@ IndexParams::params() const { // ------------------------------------------------------------------- +// MapParams + +// repeated .milvus.proto.common.KeyValuePair params = 1; +inline int MapParams::params_size() const { + return params_.size(); +} +inline ::milvus::proto::common::KeyValuePair* MapParams::mutable_params(int index) { + // @@protoc_insertion_point(field_mutable:milvus.proto.indexcgo.MapParams.params) + return params_.Mutable(index); +} +inline ::PROTOBUF_NAMESPACE_ID::RepeatedPtrField< ::milvus::proto::common::KeyValuePair >* +MapParams::mutable_params() { + // @@protoc_insertion_point(field_mutable_list:milvus.proto.indexcgo.MapParams.params) + return ¶ms_; +} +inline const ::milvus::proto::common::KeyValuePair& MapParams::params(int index) const { + // @@protoc_insertion_point(field_get:milvus.proto.indexcgo.MapParams.params) + return params_.Get(index); +} +inline ::milvus::proto::common::KeyValuePair* MapParams::add_params() { + // @@protoc_insertion_point(field_add:milvus.proto.indexcgo.MapParams.params) + return params_.Add(); +} +inline const ::PROTOBUF_NAMESPACE_ID::RepeatedPtrField< ::milvus::proto::common::KeyValuePair >& +MapParams::params() const { + // @@protoc_insertion_point(field_list:milvus.proto.indexcgo.MapParams.params) + return params_; +} + +// ------------------------------------------------------------------- + // Binary // string key = 1; @@ -862,6 +1034,8 @@ BinarySet::datas() const { // ------------------------------------------------------------------- +// ------------------------------------------------------------------- + // @@protoc_insertion_point(namespace_scope) diff --git a/internal/core/src/segcore/reduce_c.cpp b/internal/core/src/segcore/reduce_c.cpp index 52100cbbe4..68af924649 100644 --- a/internal/core/src/segcore/reduce_c.cpp +++ b/internal/core/src/segcore/reduce_c.cpp @@ -196,7 +196,7 @@ ReorganizeQueryResults(CMarshaledHits* c_marshaled_hits, auto loc = search_result->result_offsets_[j]; result_distances[loc] = search_result->result_distances_[j]; row_datas[loc] = search_result->row_data_[j]; - result_ids[loc] = search_result->result_ids_[j]; + memcpy(&result_ids[loc], search_result->row_data_[j].data(), sizeof(int64_t)); } count += size; } diff --git a/internal/indexbuilder/index.go b/internal/indexbuilder/index.go index c84dac4394..fceec4bcf0 100644 --- a/internal/indexbuilder/index.go +++ b/internal/indexbuilder/index.go @@ -31,6 +31,10 @@ type Index interface { Load([]*Blob) error BuildFloatVecIndexWithoutIds(vectors []float32) error BuildBinaryVecIndexWithoutIds(vectors []byte) error + QueryOnFloatVecIndex(vectors []float32) (QueryResult, error) + QueryOnBinaryVecIndex(vectors []byte) (QueryResult, error) + QueryOnFloatVecIndexWithParam(vectors []float32, params map[string]string) (QueryResult, error) + QueryOnBinaryVecIndexWithParam(vectors []byte, params map[string]string) (QueryResult, error) Delete() error } @@ -132,6 +136,8 @@ func (index *CIndex) Delete() error { DeleteIndex(CIndex index); */ C.DeleteIndex(index.indexPtr) + // TODO: check if index.indexPtr will be released by golang, though it occupies little memory + // C.free(index.indexPtr) return nil } @@ -174,3 +180,115 @@ func NewCIndex(typeParams, indexParams map[string]string) (Index, error) { indexPtr: indexPtr, }, nil } + +func (index *CIndex) QueryOnFloatVecIndex(vectors []float32) (QueryResult, error) { + if len(vectors) <= 0 { + return nil, errors.New("nq is zero") + } + res, err := CreateQueryResult() + if err != nil { + return nil, err + } + fn := func() C.CStatus { + cRes, ok := res.(*CQueryResult) + if !ok { + // TODO: ugly here, fix me later + panic("only CQueryResult is supported now!") + } + return C.QueryOnFloatVecIndex(index.indexPtr, (C.int64_t)(len(vectors)), (*C.float)(&vectors[0]), &cRes.ptr) + } + err = TryCatch(fn) + if err != nil { + return nil, err + } + return res, nil +} + +func (index *CIndex) QueryOnBinaryVecIndex(vectors []byte) (QueryResult, error) { + if len(vectors) <= 0 { + return nil, errors.New("nq is zero") + } + res, err := CreateQueryResult() + if err != nil { + return nil, err + } + fn := func() C.CStatus { + cRes, ok := res.(*CQueryResult) + if !ok { + // TODO: ugly here, fix me later + panic("only CQueryResult is supported now!") + } + return C.QueryOnBinaryVecIndex(index.indexPtr, (C.int64_t)(len(vectors)), (*C.uint8_t)(&vectors[0]), &cRes.ptr) + } + err = TryCatch(fn) + if err != nil { + return nil, err + } + return res, nil +} + +func (index *CIndex) QueryOnFloatVecIndexWithParam(vectors []float32, params map[string]string) (QueryResult, error) { + if len(vectors) <= 0 { + return nil, errors.New("nq is zero") + } + + protoParams := &indexcgopb.MapParams{ + Params: make([]*commonpb.KeyValuePair, 0), + } + for key, value := range params { + protoParams.Params = append(protoParams.Params, &commonpb.KeyValuePair{Key: key, Value: value}) + } + paramsStr := proto.MarshalTextString(protoParams) + paramsPointer := C.CString(paramsStr) + + res, err := CreateQueryResult() + if err != nil { + return nil, err + } + fn := func() C.CStatus { + cRes, ok := res.(*CQueryResult) + if !ok { + // TODO: ugly here, fix me later + panic("only CQueryResult is supported now!") + } + return C.QueryOnFloatVecIndexWithParam(index.indexPtr, (C.int64_t)(len(vectors)), (*C.float)(&vectors[0]), paramsPointer, &cRes.ptr) + } + err = TryCatch(fn) + if err != nil { + return nil, err + } + return res, nil +} + +func (index *CIndex) QueryOnBinaryVecIndexWithParam(vectors []byte, params map[string]string) (QueryResult, error) { + if len(vectors) <= 0 { + return nil, errors.New("nq is zero") + } + + protoParams := &indexcgopb.MapParams{ + Params: make([]*commonpb.KeyValuePair, 0), + } + for key, value := range params { + protoParams.Params = append(protoParams.Params, &commonpb.KeyValuePair{Key: key, Value: value}) + } + paramsStr := proto.MarshalTextString(protoParams) + paramsPointer := C.CString(paramsStr) + + res, err := CreateQueryResult() + if err != nil { + return nil, err + } + fn := func() C.CStatus { + cRes, ok := res.(*CQueryResult) + if !ok { + // TODO: ugly here, fix me later + panic("only CQueryResult is supported now!") + } + return C.QueryOnBinaryVecIndexWithParam(index.indexPtr, (C.int64_t)(len(vectors)), (*C.uint8_t)(&vectors[0]), paramsPointer, &cRes.ptr) + } + err = TryCatch(fn) + if err != nil { + return nil, err + } + return res, nil +} diff --git a/internal/indexbuilder/query_result.go b/internal/indexbuilder/query_result.go new file mode 100644 index 0000000000..4e53f66505 --- /dev/null +++ b/internal/indexbuilder/query_result.go @@ -0,0 +1,103 @@ +package indexbuilder + +/* + +#cgo CFLAGS: -I${SRCDIR}/../core/output/include + +#cgo LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_indexbuilder -Wl,-rpath=${SRCDIR}/../core/output/lib + +#include // free +#include "segcore/collection_c.h" +#include "indexbuilder/index_c.h" + +*/ +import "C" +import ( + "github.com/zilliztech/milvus-distributed/internal/errors" + "strconv" + "unsafe" +) + +type QueryResult interface { + Delete() error + NQ() int64 + TOPK() int64 + IDs() []int64 + Distances() []float32 +} + +type CQueryResult struct { + ptr C.CIndexQueryResult +} + +type CFunc func() C.CStatus + +func TryCatch(fn CFunc) error { + status := fn() + errorCode := status.error_code + if errorCode != 0 { + errorMsg := C.GoString(status.error_msg) + defer C.free(unsafe.Pointer(status.error_msg)) + return errors.New("error code = " + strconv.Itoa(int(errorCode)) + ", error msg = " + errorMsg) + } + return nil +} + +func CreateQueryResult() (QueryResult, error) { + var ptr C.CIndexQueryResult + fn := func() C.CStatus { + return C.CreateQueryResult(&ptr) + } + err := TryCatch(fn) + if err != nil { + return nil, err + } + return &CQueryResult{ + ptr: ptr, + }, nil +} + +func (qs *CQueryResult) Delete() error { + fn := func() C.CStatus { + return C.DeleteQueryResult(qs.ptr) + } + return TryCatch(fn) +} + +func (qs *CQueryResult) NQ() int64 { + return int64(C.NqOfQueryResult(qs.ptr)) +} + +func (qs *CQueryResult) TOPK() int64 { + return int64(C.TopkOfQueryResult(qs.ptr)) +} + +func (qs *CQueryResult) IDs() []int64 { + nq := qs.NQ() + topk := qs.TOPK() + + if nq <= 0 || topk <= 0 { + return []int64{} + } + + // TODO: how could we avoid memory copy every time when this called + ids := make([]int64, nq*topk) + C.GetIdsOfQueryResult(qs.ptr, (*C.int64_t)(&ids[0])) + + return ids +} + +func (qs *CQueryResult) Distances() []float32 { + nq := qs.NQ() + topk := qs.TOPK() + + if nq <= 0 || topk <= 0 { + return []float32{} + } + + // TODO: how could we avoid memory copy every time when this called + distances := make([]float32, nq*topk) + C.GetDistancesOfQueryResult(qs.ptr, (*C.float)(&distances[0])) + + return distances +} diff --git a/internal/master/collection_task.go b/internal/master/collection_task.go index fbf4e7a281..590b36c3f8 100644 --- a/internal/master/collection_task.go +++ b/internal/master/collection_task.go @@ -71,6 +71,22 @@ func (t *createCollectionTask) Execute() error { singleFiled.FieldID = int64(index + 100) } + zeroField := &schemapb.FieldSchema{ + FieldID: int64(0), + Name: "RowID", + IsPrimaryKey: false, + DataType: schemapb.DataType_INT64, + } + + oneField := &schemapb.FieldSchema{ + FieldID: int64(1), + Name: "Timestamp", + IsPrimaryKey: false, + DataType: schemapb.DataType_INT64, + } + + schema.Fields = append(schema.Fields, zeroField, oneField) + collectionID, err := t.sch.globalIDAllocator() if err != nil { return err diff --git a/internal/master/master_test.go b/internal/master/master_test.go index df9421dd54..0a44ed90e8 100644 --- a/internal/master/master_test.go +++ b/internal/master/master_test.go @@ -99,6 +99,7 @@ func TestMaster(t *testing.T) { // Creates server. ctx, cancel := context.WithCancel(context.Background()) svr, err := CreateServer(ctx) + if err != nil { log.Print("create server failed", zap.Error(err)) } @@ -326,16 +327,23 @@ func TestMaster(t *testing.T) { t.Logf("collection id = %d", collMeta.ID) assert.Equal(t, collMeta.Schema.Name, "col1") assert.Equal(t, collMeta.Schema.AutoID, false) - assert.Equal(t, len(collMeta.Schema.Fields), 2) + assert.Equal(t, len(collMeta.Schema.Fields), 4) assert.Equal(t, collMeta.Schema.Fields[0].Name, "col1_f1") assert.Equal(t, collMeta.Schema.Fields[1].Name, "col1_f2") + assert.Equal(t, collMeta.Schema.Fields[2].Name, "RowID") + assert.Equal(t, collMeta.Schema.Fields[3].Name, "Timestamp") assert.Equal(t, collMeta.Schema.Fields[0].DataType, schemapb.DataType_VECTOR_FLOAT) assert.Equal(t, collMeta.Schema.Fields[1].DataType, schemapb.DataType_VECTOR_BINARY) + assert.Equal(t, collMeta.Schema.Fields[2].DataType, schemapb.DataType_INT64) + assert.Equal(t, collMeta.Schema.Fields[3].DataType, schemapb.DataType_INT64) assert.Equal(t, len(collMeta.Schema.Fields[0].TypeParams), 2) assert.Equal(t, len(collMeta.Schema.Fields[0].IndexParams), 2) assert.Equal(t, len(collMeta.Schema.Fields[1].TypeParams), 2) assert.Equal(t, len(collMeta.Schema.Fields[1].IndexParams), 2) assert.Equal(t, int64(100), collMeta.Schema.Fields[0].FieldID) + assert.Equal(t, int64(101), collMeta.Schema.Fields[1].FieldID) + assert.Equal(t, int64(0), collMeta.Schema.Fields[2].FieldID) + assert.Equal(t, int64(1), collMeta.Schema.Fields[3].FieldID) assert.Equal(t, collMeta.Schema.Fields[0].TypeParams[0].Key, "col1_f1_tk1") assert.Equal(t, collMeta.Schema.Fields[0].TypeParams[1].Key, "col1_f1_tk2") assert.Equal(t, collMeta.Schema.Fields[0].TypeParams[0].Value, "col1_f1_tv1") @@ -651,9 +659,11 @@ func TestMaster(t *testing.T) { t.Logf("collection id = %d", collMeta.ID) assert.Equal(t, collMeta.Schema.Name, "col1") assert.Equal(t, collMeta.Schema.AutoID, false) - assert.Equal(t, len(collMeta.Schema.Fields), 2) + assert.Equal(t, len(collMeta.Schema.Fields), 4) assert.Equal(t, collMeta.Schema.Fields[0].Name, "col1_f1") assert.Equal(t, collMeta.Schema.Fields[1].Name, "col1_f2") + assert.Equal(t, collMeta.Schema.Fields[2].Name, "RowID") + assert.Equal(t, collMeta.Schema.Fields[3].Name, "Timestamp") assert.Equal(t, collMeta.Schema.Fields[0].DataType, schemapb.DataType_VECTOR_FLOAT) assert.Equal(t, collMeta.Schema.Fields[1].DataType, schemapb.DataType_VECTOR_BINARY) assert.Equal(t, len(collMeta.Schema.Fields[0].TypeParams), 2) @@ -919,7 +929,6 @@ func TestMaster(t *testing.T) { assert.Equal(t, createCollectionReq.ReqID, createCollectionMsg.CreateCollectionRequest.ReqID) assert.Equal(t, createCollectionReq.Timestamp, createCollectionMsg.CreateCollectionRequest.Timestamp) assert.Equal(t, createCollectionReq.ProxyID, createCollectionMsg.CreateCollectionRequest.ProxyID) - assert.Equal(t, createCollectionReq.Schema.Value, createCollectionMsg.CreateCollectionRequest.Schema.Value) ////////////////////////////CreatePartition//////////////////////// partitionName := "partitionName" + strconv.FormatUint(rand.Uint64(), 10) diff --git a/internal/proto/index_cgo_msg.proto b/internal/proto/index_cgo_msg.proto index 5c37844dac..d26df3eac0 100644 --- a/internal/proto/index_cgo_msg.proto +++ b/internal/proto/index_cgo_msg.proto @@ -13,6 +13,11 @@ message IndexParams { repeated common.KeyValuePair params = 1; } +// TypeParams & IndexParams will be replaced by MapParams later +message MapParams { + repeated common.KeyValuePair params = 1; +} + message Binary { string key = 1; bytes value = 2; diff --git a/internal/proto/indexcgopb/index_cgo_msg.pb.go b/internal/proto/indexcgopb/index_cgo_msg.pb.go index f49d4ae293..de98b5bb4f 100644 --- a/internal/proto/indexcgopb/index_cgo_msg.pb.go +++ b/internal/proto/indexcgopb/index_cgo_msg.pb.go @@ -99,6 +99,46 @@ func (m *IndexParams) GetParams() []*commonpb.KeyValuePair { return nil } +// TypeParams & IndexParams will be replaced by MapParams later +type MapParams struct { + Params []*commonpb.KeyValuePair `protobuf:"bytes,1,rep,name=params,proto3" json:"params,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *MapParams) Reset() { *m = MapParams{} } +func (m *MapParams) String() string { return proto.CompactTextString(m) } +func (*MapParams) ProtoMessage() {} +func (*MapParams) Descriptor() ([]byte, []int) { + return fileDescriptor_c009bd9544a7343c, []int{2} +} + +func (m *MapParams) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_MapParams.Unmarshal(m, b) +} +func (m *MapParams) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_MapParams.Marshal(b, m, deterministic) +} +func (m *MapParams) XXX_Merge(src proto.Message) { + xxx_messageInfo_MapParams.Merge(m, src) +} +func (m *MapParams) XXX_Size() int { + return xxx_messageInfo_MapParams.Size(m) +} +func (m *MapParams) XXX_DiscardUnknown() { + xxx_messageInfo_MapParams.DiscardUnknown(m) +} + +var xxx_messageInfo_MapParams proto.InternalMessageInfo + +func (m *MapParams) GetParams() []*commonpb.KeyValuePair { + if m != nil { + return m.Params + } + return nil +} + type Binary struct { Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` @@ -111,7 +151,7 @@ func (m *Binary) Reset() { *m = Binary{} } func (m *Binary) String() string { return proto.CompactTextString(m) } func (*Binary) ProtoMessage() {} func (*Binary) Descriptor() ([]byte, []int) { - return fileDescriptor_c009bd9544a7343c, []int{2} + return fileDescriptor_c009bd9544a7343c, []int{3} } func (m *Binary) XXX_Unmarshal(b []byte) error { @@ -157,7 +197,7 @@ func (m *BinarySet) Reset() { *m = BinarySet{} } func (m *BinarySet) String() string { return proto.CompactTextString(m) } func (*BinarySet) ProtoMessage() {} func (*BinarySet) Descriptor() ([]byte, []int) { - return fileDescriptor_c009bd9544a7343c, []int{3} + return fileDescriptor_c009bd9544a7343c, []int{4} } func (m *BinarySet) XXX_Unmarshal(b []byte) error { @@ -188,6 +228,7 @@ func (m *BinarySet) GetDatas() []*Binary { func init() { proto.RegisterType((*TypeParams)(nil), "milvus.proto.indexcgo.TypeParams") proto.RegisterType((*IndexParams)(nil), "milvus.proto.indexcgo.IndexParams") + proto.RegisterType((*MapParams)(nil), "milvus.proto.indexcgo.MapParams") proto.RegisterType((*Binary)(nil), "milvus.proto.indexcgo.Binary") proto.RegisterType((*BinarySet)(nil), "milvus.proto.indexcgo.BinarySet") } @@ -195,22 +236,22 @@ func init() { func init() { proto.RegisterFile("index_cgo_msg.proto", fileDescriptor_c009bd9544a7343c) } var fileDescriptor_c009bd9544a7343c = []byte{ - // 257 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x90, 0xbf, 0x4b, 0xc3, 0x40, - 0x14, 0xc7, 0x89, 0xa5, 0x81, 0xbe, 0x76, 0x90, 0x53, 0x21, 0x08, 0x42, 0xcc, 0x94, 0xc5, 0x3b, - 0xb1, 0x93, 0x9b, 0x04, 0x41, 0xc5, 0xa5, 0x44, 0x71, 0x70, 0x29, 0x97, 0xe4, 0x48, 0x1f, 0xde, - 0x8f, 0x70, 0xb9, 0x14, 0xd3, 0xbf, 0x5e, 0x92, 0x6b, 0x06, 0xc1, 0xcd, 0xed, 0xbd, 0xc7, 0xf7, - 0xf3, 0xf9, 0x1e, 0x07, 0x67, 0xa8, 0x2b, 0xf1, 0xbd, 0x2d, 0x6b, 0xb3, 0x55, 0x6d, 0x4d, 0x1b, - 0x6b, 0x9c, 0x21, 0x17, 0x0a, 0xe5, 0xbe, 0x6b, 0xfd, 0x46, 0xc7, 0x44, 0x59, 0x9b, 0xcb, 0x55, - 0x69, 0x94, 0x32, 0xda, 0x9f, 0x93, 0x27, 0x80, 0xf7, 0xbe, 0x11, 0x1b, 0x6e, 0xb9, 0x6a, 0xc9, - 0x3d, 0x84, 0xcd, 0x38, 0x45, 0x41, 0x3c, 0x4b, 0x97, 0x77, 0xd7, 0xf4, 0x97, 0xe3, 0x48, 0xbe, - 0x8a, 0xfe, 0x83, 0xcb, 0x4e, 0x6c, 0x38, 0xda, 0xfc, 0x08, 0x24, 0xcf, 0xb0, 0x7c, 0x19, 0x2a, - 0xfe, 0x6f, 0xba, 0x85, 0x30, 0x43, 0xcd, 0x6d, 0x4f, 0x4e, 0x61, 0xf6, 0x25, 0xfa, 0x28, 0x88, - 0x83, 0x74, 0x91, 0x0f, 0x23, 0x39, 0x87, 0xf9, 0x7e, 0x00, 0xa2, 0x93, 0x38, 0x48, 0x57, 0xb9, - 0x5f, 0x92, 0x07, 0x58, 0x78, 0xe2, 0x4d, 0x38, 0xb2, 0x86, 0x79, 0xc5, 0x1d, 0x9f, 0x8a, 0xaf, - 0xe8, 0x9f, 0xdf, 0x40, 0x3d, 0x90, 0xfb, 0x6c, 0xf6, 0xf8, 0x99, 0xd5, 0xe8, 0x76, 0x5d, 0x31, - 0xbc, 0x8c, 0x1d, 0x50, 0x4a, 0x3c, 0x38, 0x51, 0xee, 0x98, 0x87, 0x6f, 0x2a, 0x6c, 0x9d, 0xc5, - 0xa2, 0x73, 0xa2, 0x62, 0xa8, 0x9d, 0xb0, 0x9a, 0x4b, 0x36, 0x1a, 0xd9, 0x64, 0x6c, 0x8a, 0x22, - 0x1c, 0x2f, 0xeb, 0x9f, 0x00, 0x00, 0x00, 0xff, 0xff, 0xb4, 0xb0, 0x70, 0x01, 0x8f, 0x01, 0x00, - 0x00, + // 264 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x90, 0x4d, 0x4b, 0xc3, 0x40, + 0x10, 0x86, 0x89, 0xa5, 0x81, 0x4c, 0x7b, 0x90, 0xa8, 0x10, 0x04, 0x21, 0xe6, 0x94, 0x8b, 0x59, + 0xb1, 0x27, 0x6f, 0x12, 0xc4, 0x0f, 0x44, 0x28, 0x51, 0x3c, 0x78, 0x29, 0x9b, 0x64, 0x49, 0x07, + 0xf7, 0x8b, 0xcd, 0xa6, 0x98, 0xfe, 0x7a, 0x49, 0xb6, 0x3d, 0x08, 0xbd, 0xf5, 0x36, 0x33, 0xbc, + 0xcf, 0xf3, 0x2e, 0x0b, 0x67, 0x28, 0x6b, 0xf6, 0xbb, 0xaa, 0x1a, 0xb5, 0x12, 0x6d, 0x93, 0x69, + 0xa3, 0xac, 0x0a, 0x2f, 0x04, 0xf2, 0x4d, 0xd7, 0xba, 0x2d, 0x1b, 0x13, 0x55, 0xa3, 0x2e, 0xe7, + 0x95, 0x12, 0x42, 0x49, 0x77, 0x4e, 0x9e, 0x01, 0x3e, 0x7b, 0xcd, 0x96, 0xd4, 0x50, 0xd1, 0x86, + 0xf7, 0xe0, 0xeb, 0x71, 0x8a, 0xbc, 0x78, 0x92, 0xce, 0xee, 0xae, 0xb3, 0x7f, 0x8e, 0x1d, 0xf9, + 0xc6, 0xfa, 0x2f, 0xca, 0x3b, 0xb6, 0xa4, 0x68, 0x8a, 0x1d, 0x90, 0xbc, 0xc0, 0xec, 0x75, 0xa8, + 0x38, 0xde, 0xf4, 0x04, 0xc1, 0x3b, 0xd5, 0xc7, 0x7b, 0x6e, 0xc1, 0xcf, 0x51, 0x52, 0xd3, 0x87, + 0xa7, 0x30, 0xf9, 0x61, 0x7d, 0xe4, 0xc5, 0x5e, 0x1a, 0x14, 0xc3, 0x18, 0x9e, 0xc3, 0x74, 0x33, + 0x00, 0xd1, 0x49, 0xec, 0xa5, 0xf3, 0xc2, 0x2d, 0xc9, 0x03, 0x04, 0x8e, 0xf8, 0x60, 0x36, 0x5c, + 0xc0, 0xb4, 0xa6, 0x96, 0xee, 0x8b, 0xaf, 0xb2, 0x83, 0xdf, 0x99, 0x39, 0xa0, 0x70, 0xd9, 0xfc, + 0xf1, 0x3b, 0x6f, 0xd0, 0xae, 0xbb, 0x72, 0x78, 0x19, 0xd9, 0x22, 0xe7, 0xb8, 0xb5, 0xac, 0x5a, + 0x13, 0x07, 0xdf, 0xd4, 0xd8, 0x5a, 0x83, 0x65, 0x67, 0x59, 0x4d, 0x50, 0x5a, 0x66, 0x24, 0xe5, + 0x64, 0x34, 0x92, 0xbd, 0x51, 0x97, 0xa5, 0x3f, 0x5e, 0x16, 0x7f, 0x01, 0x00, 0x00, 0xff, 0xff, + 0xf4, 0x8e, 0xbe, 0x8f, 0xd7, 0x01, 0x00, 0x00, } diff --git a/internal/writenode/data_sync_service_test.go b/internal/writenode/data_sync_service_test.go index 4abf1aff91..a37425ec3e 100644 --- a/internal/writenode/data_sync_service_test.go +++ b/internal/writenode/data_sync_service_test.go @@ -221,6 +221,30 @@ func newMeta() { Description: "test collection", AutoID: false, Fields: []*schemapb.FieldSchema{ + { + FieldID: 1, + Name: "Timestamp", + Description: "test collection filed 1", + DataType: schemapb.DataType_INT64, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "col1_f1_tk2", + Value: "col1_f1_tv2", + }, + }, + }, + { + FieldID: 0, + Name: "RawID", + Description: "test collection filed 1", + DataType: schemapb.DataType_INT64, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "col1_f1_tk2", + Value: "col1_f1_tv2", + }, + }, + }, { FieldID: 100, Name: "col1_f1", diff --git a/internal/writenode/flow_graph_insert_buffer_node.go b/internal/writenode/flow_graph_insert_buffer_node.go index 4609a78472..bf6180d269 100644 --- a/internal/writenode/flow_graph_insert_buffer_node.go +++ b/internal/writenode/flow_graph_insert_buffer_node.go @@ -1,13 +1,14 @@ package writenode import ( + "bytes" "context" "encoding/binary" "log" - "math" "path" "strconv" "time" + "unsafe" "github.com/golang/protobuf/proto" "github.com/minio/minio-go/v7" @@ -114,29 +115,16 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg { } } - // Timestamps - _, ok = idata.Data[1].(*storage.Int64FieldData) - if !ok { - idata.Data[1] = &storage.Int64FieldData{ - Data: []int64{}, - NumRows: 0, - } - } - tsData := idata.Data[1].(*storage.Int64FieldData) - for _, ts := range msg.Timestamps { - tsData.Data = append(tsData.Data, int64(ts)) - } - tsData.NumRows += len(msg.Timestamps) - // 1.1 Get CollectionMeta from etcd segMeta, collMeta, err := ibNode.getMeta(currentSegID) if err != nil { // GOOSE TODO add error handler - log.Println("Get meta wrong") + log.Println("Get meta wrong:", err) + continue } // 1.2 Get Fields - var pos = 0 // Record position of blob + var pos int = 0 // Record position of blob for _, field := range collMeta.Schema.Fields { switch field.DataType { case schemapb.DataType_VECTOR_FLOAT: @@ -167,17 +155,17 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg { for _, blob := range msg.RowData { for j := 0; j < dim; j++ { - v := binary.LittleEndian.Uint32(blob.GetValue()[pos*4:]) - fieldData.Data = append(fieldData.Data, math.Float32frombits(v)) - pos++ + var v float32 + buf := bytes.NewBuffer(blob.GetValue()[pos:]) + if err := binary.Read(buf, binary.LittleEndian, &v); err != nil { + log.Println("binary.read float32 err:", err) + } + fieldData.Data = append(fieldData.Data, v) + pos += int(unsafe.Sizeof(*(&v))) } } + fieldData.NumRows += len(msg.RowIDs) - // log.Println(".Float vector data:\n", - // "..NumRows:", - // idata.Data[field.FieldID].(*storage.FloatVectorFieldData).NumRows, - // "..Dim:", - // idata.Data[field.FieldID].(*storage.FloatVectorFieldData).Dim) case schemapb.DataType_VECTOR_BINARY: var dim int @@ -205,20 +193,12 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg { fieldData := idata.Data[field.FieldID].(*storage.BinaryVectorFieldData) for _, blob := range msg.RowData { - for d := 0; d < dim/8; d++ { - v := binary.LittleEndian.Uint32(blob.GetValue()[pos*4:]) - fieldData.Data = append(fieldData.Data, byte(v)) - pos++ - } + bv := blob.GetValue()[pos : pos+(dim/8)] + fieldData.Data = append(fieldData.Data, bv...) + pos += len(bv) } fieldData.NumRows += len(msg.RowData) - // log.Println( - // ".Binary vector data:\n", - // "..NumRows:", - // idata.Data[field.FieldID].(*storage.BinaryVectorFieldData).NumRows, - // "..Dim:", - // idata.Data[field.FieldID].(*storage.BinaryVectorFieldData).Dim) case schemapb.DataType_BOOL: if _, ok := idata.Data[field.FieldID]; !ok { idata.Data[field.FieldID] = &storage.BoolFieldData{ @@ -229,18 +209,16 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg { fieldData := idata.Data[field.FieldID].(*storage.BoolFieldData) for _, blob := range msg.RowData { - boolInt := binary.LittleEndian.Uint32(blob.GetValue()[pos*4:]) - if boolInt == 1 { - fieldData.Data = append(fieldData.Data, true) - } else { - fieldData.Data = append(fieldData.Data, false) + var v bool + buf := bytes.NewReader(blob.GetValue()[pos:]) + if err := binary.Read(buf, binary.LittleEndian, &v); err != nil { + log.Println("binary.Read bool failed:", err) } - pos++ + fieldData.Data = append(fieldData.Data, v) + pos += int(unsafe.Sizeof(*(&v))) } fieldData.NumRows += len(msg.RowIDs) - // log.Println("Bool data:", - // idata.Data[field.FieldID].(*storage.BoolFieldData).Data) case schemapb.DataType_INT8: if _, ok := idata.Data[field.FieldID]; !ok { idata.Data[field.FieldID] = &storage.Int8FieldData{ @@ -251,13 +229,15 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg { fieldData := idata.Data[field.FieldID].(*storage.Int8FieldData) for _, blob := range msg.RowData { - v := binary.LittleEndian.Uint32(blob.GetValue()[pos*4:]) - fieldData.Data = append(fieldData.Data, int8(v)) - pos++ + var v int8 + buf := bytes.NewReader(blob.GetValue()[pos:]) + if err := binary.Read(buf, binary.LittleEndian, &v); err != nil { + log.Println("binary.Read int8 failed:", err) + } + fieldData.Data = append(fieldData.Data, v) + pos += int(unsafe.Sizeof(*(&v))) } fieldData.NumRows += len(msg.RowIDs) - // log.Println("Int8 data:", - // idata.Data[field.FieldID].(*storage.Int8FieldData).Data) case schemapb.DataType_INT16: if _, ok := idata.Data[field.FieldID]; !ok { @@ -269,14 +249,17 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg { fieldData := idata.Data[field.FieldID].(*storage.Int16FieldData) for _, blob := range msg.RowData { - v := binary.LittleEndian.Uint32(blob.GetValue()[pos*4:]) - fieldData.Data = append(fieldData.Data, int16(v)) - pos++ + var v int16 + buf := bytes.NewReader(blob.GetValue()[pos:]) + if err := binary.Read(buf, binary.LittleEndian, &v); err != nil { + log.Println("binary.Read int16 failed:", err) + } + fieldData.Data = append(fieldData.Data, v) + pos += int(unsafe.Sizeof(*(&v))) } fieldData.NumRows += len(msg.RowIDs) - // log.Println("Int16 data:", - // idata.Data[field.FieldID].(*storage.Int16FieldData).Data) + case schemapb.DataType_INT32: if _, ok := idata.Data[field.FieldID]; !ok { idata.Data[field.FieldID] = &storage.Int32FieldData{ @@ -287,13 +270,15 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg { fieldData := idata.Data[field.FieldID].(*storage.Int32FieldData) for _, blob := range msg.RowData { - v := binary.LittleEndian.Uint32(blob.GetValue()[pos*4:]) - fieldData.Data = append(fieldData.Data, int32(v)) - pos++ + var v int32 + buf := bytes.NewReader(blob.GetValue()[pos:]) + if err := binary.Read(buf, binary.LittleEndian, &v); err != nil { + log.Println("binary.Read int32 failed:", err) + } + fieldData.Data = append(fieldData.Data, v) + pos += int(unsafe.Sizeof(*(&v))) } fieldData.NumRows += len(msg.RowIDs) - // log.Println("Int32 data:", - // idata.Data[field.FieldID].(*storage.Int32FieldData).Data) case schemapb.DataType_INT64: if _, ok := idata.Data[field.FieldID]; !ok { @@ -304,15 +289,30 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg { } fieldData := idata.Data[field.FieldID].(*storage.Int64FieldData) - for _, blob := range msg.RowData { - v := binary.LittleEndian.Uint32(blob.GetValue()[pos*4:]) - fieldData.Data = append(fieldData.Data, int64(v)) - pos++ - } + switch field.FieldID { + case 0: + fieldData.Data = append(fieldData.Data, msg.RowIDs...) + fieldData.NumRows += len(msg.RowIDs) + case 1: + // Timestamps + for _, ts := range msg.Timestamps { + fieldData.Data = append(fieldData.Data, int64(ts)) + } + fieldData.NumRows += len(msg.Timestamps) + default: - fieldData.NumRows += len(msg.RowIDs) - // log.Println("Int64 data:", - // idata.Data[field.FieldID].(*storage.Int64FieldData).Data) + for _, blob := range msg.RowData { + var v int64 + buf := bytes.NewBuffer(blob.GetValue()[pos:]) + if err := binary.Read(buf, binary.LittleEndian, &v); err != nil { + log.Println("binary.Read int64 failed:", err) + } + fieldData.Data = append(fieldData.Data, v) + pos += int(unsafe.Sizeof(*(&v))) + } + + fieldData.NumRows += len(msg.RowIDs) + } case schemapb.DataType_FLOAT: if _, ok := idata.Data[field.FieldID]; !ok { @@ -324,14 +324,16 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg { fieldData := idata.Data[field.FieldID].(*storage.FloatFieldData) for _, blob := range msg.RowData { - v := binary.LittleEndian.Uint32(blob.GetValue()[pos*4:]) - fieldData.Data = append(fieldData.Data, math.Float32frombits(v)) - pos++ + var v float32 + buf := bytes.NewBuffer(blob.GetValue()[pos:]) + if err := binary.Read(buf, binary.LittleEndian, &v); err != nil { + log.Println("binary.Read float32 failed:", err) + } + fieldData.Data = append(fieldData.Data, v) + pos += int(unsafe.Sizeof(*(&v))) } fieldData.NumRows += len(msg.RowIDs) - // log.Println("Float32 data:", - // idata.Data[field.FieldID].(*storage.FloatFieldData).Data) case schemapb.DataType_DOUBLE: if _, ok := idata.Data[field.FieldID]; !ok { @@ -343,14 +345,16 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg { fieldData := idata.Data[field.FieldID].(*storage.DoubleFieldData) for _, blob := range msg.RowData { - v := binary.LittleEndian.Uint64(blob.GetValue()[pos*4:]) - fieldData.Data = append(fieldData.Data, math.Float64frombits(v)) - pos++ + var v float64 + buf := bytes.NewBuffer(blob.GetValue()[pos:]) + if err := binary.Read(buf, binary.LittleEndian, &v); err != nil { + log.Println("binary.Read float64 failed:", err) + } + fieldData.Data = append(fieldData.Data, v) + pos += int(unsafe.Sizeof(*(&v))) } fieldData.NumRows += len(msg.RowIDs) - // log.Println("Float64 data:", - // idata.Data[field.FieldID].(*storage.DoubleFieldData).Data) } } @@ -376,7 +380,7 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg { currentSegID, ibNode.insertBuffer.insertData[currentSegID]) if err != nil { - log.Println("generate binlog wrong") + log.Println("generate binlog wrong: ", err) } // clear buffer @@ -451,7 +455,7 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg { segMeta, collMeta, err := ibNode.getMeta(currentSegID) if err != nil { // GOOSE TODO add error handler - log.Println("Get meta wrong") + log.Println("Get meta wrong: ", err) } inCodec := storage.NewInsertCodec(collMeta) @@ -460,14 +464,14 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg { partitionID, err := typeutil.Hash32String(partitionTag) if err != nil { // GOOSE TODO add error handler - log.Println("partitionTag to partitionID Wrong") + log.Println("partitionTag to partitionID Wrong: ", err) } // buffer data to binlogs binLogs, err := inCodec.Serialize(partitionID, currentSegID, ibNode.insertBuffer.insertData[currentSegID]) if err != nil { - log.Println("generate binlog wrong") + log.Println("generate binlog wrong: ", err) } // clear buffer diff --git a/internal/writenode/flow_graph_insert_buffer_node_test.go b/internal/writenode/flow_graph_insert_buffer_node_test.go index b9f1b5ae68..ef268dd3ba 100644 --- a/internal/writenode/flow_graph_insert_buffer_node_test.go +++ b/internal/writenode/flow_graph_insert_buffer_node_test.go @@ -1,8 +1,10 @@ package writenode import ( + "bytes" "context" "encoding/binary" + "log" "math" "testing" "time" @@ -67,64 +69,78 @@ func genInsertMsg() insertMsg { binary.LittleEndian.PutUint32(buf, math.Float32bits(ele)) rawData = append(rawData, buf...) } + log.Println(len(rawData)) // Binary vector // Dimension of binary vector is 32 - var bvector = [4]byte{255, 255, 255, 0} - for _, ele := range bvector { - bs := make([]byte, 4) - binary.LittleEndian.PutUint32(bs, uint32(ele)) - rawData = append(rawData, bs...) - } + // size := 4, = 32 / 8 + var bvector = []byte{255, 255, 255, 0} + rawData = append(rawData, bvector...) + log.Println(len(rawData)) // Bool - bb := make([]byte, 4) var fieldBool = true - var fieldBoolInt uint32 - if fieldBool { - fieldBoolInt = 1 - } else { - fieldBoolInt = 0 + buf := new(bytes.Buffer) + if err := binary.Write(buf, binary.LittleEndian, fieldBool); err != nil { + panic(err) } - binary.LittleEndian.PutUint32(bb, fieldBoolInt) - rawData = append(rawData, bb...) + rawData = append(rawData, buf.Bytes()...) + log.Println(len(rawData)) // int8 var dataInt8 int8 = 100 - bint8 := make([]byte, 4) - binary.LittleEndian.PutUint32(bint8, uint32(dataInt8)) - rawData = append(rawData, bint8...) + bint8 := new(bytes.Buffer) + if err := binary.Write(bint8, binary.LittleEndian, dataInt8); err != nil { + panic(err) + } + rawData = append(rawData, bint8.Bytes()...) + log.Println(len(rawData)) // int16 var dataInt16 int16 = 200 - bint16 := make([]byte, 4) - binary.LittleEndian.PutUint32(bint16, uint32(dataInt16)) - rawData = append(rawData, bint16...) + bint16 := new(bytes.Buffer) + if err := binary.Write(bint16, binary.LittleEndian, dataInt16); err != nil { + panic(err) + } + rawData = append(rawData, bint16.Bytes()...) + log.Println(len(rawData)) // int32 var dataInt32 int32 = 300 - bint32 := make([]byte, 4) - binary.LittleEndian.PutUint32(bint32, uint32(dataInt32)) - rawData = append(rawData, bint32...) + bint32 := new(bytes.Buffer) + if err := binary.Write(bint32, binary.LittleEndian, dataInt32); err != nil { + panic(err) + } + rawData = append(rawData, bint32.Bytes()...) + log.Println(len(rawData)) // int64 var dataInt64 int64 = 300 - bint64 := make([]byte, 4) - binary.LittleEndian.PutUint32(bint64, uint32(dataInt64)) - rawData = append(rawData, bint64...) + bint64 := new(bytes.Buffer) + if err := binary.Write(bint64, binary.LittleEndian, dataInt64); err != nil { + panic(err) + } + rawData = append(rawData, bint64.Bytes()...) + log.Println(len(rawData)) // float32 var datafloat float32 = 1.1 - bfloat32 := make([]byte, 4) - binary.LittleEndian.PutUint32(bfloat32, math.Float32bits(datafloat)) - rawData = append(rawData, bfloat32...) + bfloat32 := new(bytes.Buffer) + if err := binary.Write(bfloat32, binary.LittleEndian, datafloat); err != nil { + panic(err) + } + rawData = append(rawData, bfloat32.Bytes()...) + log.Println(len(rawData)) // float64 var datafloat64 float64 = 2.2 - bfloat64 := make([]byte, 8) - binary.LittleEndian.PutUint64(bfloat64, math.Float64bits(datafloat64)) - rawData = append(rawData, bfloat64...) + bfloat64 := new(bytes.Buffer) + if err := binary.Write(bfloat64, binary.LittleEndian, datafloat64); err != nil { + panic(err) + } + rawData = append(rawData, bfloat64.Bytes()...) + log.Println(len(rawData)) timeRange := TimeRange{ timestampMin: 0,