Add default fields when create collection

Signed-off-by: XuanYang-cn <xuan.yang@zilliz.com>
This commit is contained in:
XuanYang-cn 2021-01-04 12:03:29 +08:00 committed by yefu.chen
parent 7ed63c79ce
commit 03b5d32569
16 changed files with 1256 additions and 205 deletions

View File

@ -38,94 +38,95 @@ IndexWrapper::IndexWrapper(const char* serialized_type_params, const char* seria
Assert(index_ != nullptr);
}
template <typename ParamsT> // ugly here, ParamsT will just be MapParams later
void
IndexWrapper::parse() {
namespace indexcgo = milvus::proto::indexcgo;
IndexWrapper::parse_impl(const std::string& serialized_params_str, knowhere::Config& conf) {
bool deserialized_success;
indexcgo::TypeParams type_config;
deserialized_success = google::protobuf::TextFormat::ParseFromString(type_params_, &type_config);
ParamsT params;
deserialized_success = google::protobuf::TextFormat::ParseFromString(serialized_params_str, &params);
Assert(deserialized_success);
indexcgo::IndexParams index_config;
deserialized_success = google::protobuf::TextFormat::ParseFromString(index_params_, &index_config);
Assert(deserialized_success);
for (auto i = 0; i < type_config.params_size(); ++i) {
const auto& type_param = type_config.params(i);
const auto& key = type_param.key();
const auto& value = type_param.value();
type_config_[key] = value;
config_[key] = value;
}
for (auto i = 0; i < index_config.params_size(); ++i) {
const auto& index_param = index_config.params(i);
const auto& key = index_param.key();
const auto& value = index_param.value();
index_config_[key] = value;
config_[key] = value;
for (auto i = 0; i < params.params_size(); ++i) {
const auto& param = params.params(i);
const auto& key = param.key();
const auto& value = param.value();
conf[key] = value;
}
auto stoi_closure = [](const std::string& s) -> int { return std::stoi(s); };
/***************************** meta *******************************/
check_parameter<int>(milvus::knowhere::meta::DIM, stoi_closure, std::nullopt);
check_parameter<int>(milvus::knowhere::meta::TOPK, stoi_closure, std::nullopt);
check_parameter<int>(conf, milvus::knowhere::meta::DIM, stoi_closure, std::nullopt);
check_parameter<int>(conf, milvus::knowhere::meta::TOPK, stoi_closure, std::nullopt);
/***************************** IVF Params *******************************/
check_parameter<int>(milvus::knowhere::IndexParams::nprobe, stoi_closure, std::nullopt);
check_parameter<int>(milvus::knowhere::IndexParams::nlist, stoi_closure, std::nullopt);
check_parameter<int>(milvus::knowhere::IndexParams::m, stoi_closure, std::nullopt);
check_parameter<int>(milvus::knowhere::IndexParams::nbits, stoi_closure, std::nullopt);
check_parameter<int>(conf, milvus::knowhere::IndexParams::nprobe, stoi_closure, std::nullopt);
check_parameter<int>(conf, milvus::knowhere::IndexParams::nlist, stoi_closure, std::nullopt);
check_parameter<int>(conf, milvus::knowhere::IndexParams::m, stoi_closure, std::nullopt);
check_parameter<int>(conf, milvus::knowhere::IndexParams::nbits, stoi_closure, std::nullopt);
/************************** NSG Parameter **************************/
check_parameter<int>(milvus::knowhere::IndexParams::knng, stoi_closure, std::nullopt);
check_parameter<int>(milvus::knowhere::IndexParams::search_length, stoi_closure, std::nullopt);
check_parameter<int>(milvus::knowhere::IndexParams::out_degree, stoi_closure, std::nullopt);
check_parameter<int>(milvus::knowhere::IndexParams::candidate, stoi_closure, std::nullopt);
check_parameter<int>(conf, milvus::knowhere::IndexParams::knng, stoi_closure, std::nullopt);
check_parameter<int>(conf, milvus::knowhere::IndexParams::search_length, stoi_closure, std::nullopt);
check_parameter<int>(conf, milvus::knowhere::IndexParams::out_degree, stoi_closure, std::nullopt);
check_parameter<int>(conf, milvus::knowhere::IndexParams::candidate, stoi_closure, std::nullopt);
/************************** HNSW Params *****************************/
check_parameter<int>(milvus::knowhere::IndexParams::efConstruction, stoi_closure, std::nullopt);
check_parameter<int>(milvus::knowhere::IndexParams::M, stoi_closure, std::nullopt);
check_parameter<int>(milvus::knowhere::IndexParams::ef, stoi_closure, std::nullopt);
check_parameter<int>(conf, milvus::knowhere::IndexParams::efConstruction, stoi_closure, std::nullopt);
check_parameter<int>(conf, milvus::knowhere::IndexParams::M, stoi_closure, std::nullopt);
check_parameter<int>(conf, milvus::knowhere::IndexParams::ef, stoi_closure, std::nullopt);
/************************** Annoy Params *****************************/
check_parameter<int>(milvus::knowhere::IndexParams::n_trees, stoi_closure, std::nullopt);
check_parameter<int>(milvus::knowhere::IndexParams::search_k, stoi_closure, std::nullopt);
check_parameter<int>(conf, milvus::knowhere::IndexParams::n_trees, stoi_closure, std::nullopt);
check_parameter<int>(conf, milvus::knowhere::IndexParams::search_k, stoi_closure, std::nullopt);
/************************** PQ Params *****************************/
check_parameter<int>(milvus::knowhere::IndexParams::PQM, stoi_closure, std::nullopt);
check_parameter<int>(conf, milvus::knowhere::IndexParams::PQM, stoi_closure, std::nullopt);
/************************** NGT Params *****************************/
check_parameter<int>(milvus::knowhere::IndexParams::edge_size, stoi_closure, std::nullopt);
check_parameter<int>(conf, milvus::knowhere::IndexParams::edge_size, stoi_closure, std::nullopt);
/************************** NGT Search Params *****************************/
check_parameter<int>(milvus::knowhere::IndexParams::epsilon, stoi_closure, std::nullopt);
check_parameter<int>(milvus::knowhere::IndexParams::max_search_edges, stoi_closure, std::nullopt);
check_parameter<int>(conf, milvus::knowhere::IndexParams::epsilon, stoi_closure, std::nullopt);
check_parameter<int>(conf, milvus::knowhere::IndexParams::max_search_edges, stoi_closure, std::nullopt);
/************************** NGT_PANNG Params *****************************/
check_parameter<int>(milvus::knowhere::IndexParams::forcedly_pruned_edge_size, stoi_closure, std::nullopt);
check_parameter<int>(milvus::knowhere::IndexParams::selectively_pruned_edge_size, stoi_closure, std::nullopt);
check_parameter<int>(conf, milvus::knowhere::IndexParams::forcedly_pruned_edge_size, stoi_closure, std::nullopt);
check_parameter<int>(conf, milvus::knowhere::IndexParams::selectively_pruned_edge_size, stoi_closure, std::nullopt);
/************************** NGT_ONNG Params *****************************/
check_parameter<int>(milvus::knowhere::IndexParams::outgoing_edge_size, stoi_closure, std::nullopt);
check_parameter<int>(milvus::knowhere::IndexParams::incoming_edge_size, stoi_closure, std::nullopt);
check_parameter<int>(conf, milvus::knowhere::IndexParams::outgoing_edge_size, stoi_closure, std::nullopt);
check_parameter<int>(conf, milvus::knowhere::IndexParams::incoming_edge_size, stoi_closure, std::nullopt);
/************************** Serialize Params *******************************/
check_parameter<int>(milvus::knowhere::INDEX_FILE_SLICE_SIZE_IN_MEGABYTE, stoi_closure, std::optional{4});
check_parameter<int>(conf, milvus::knowhere::INDEX_FILE_SLICE_SIZE_IN_MEGABYTE, stoi_closure, std::optional{4});
}
void
IndexWrapper::parse() {
namespace indexcgo = milvus::proto::indexcgo;
parse_impl<indexcgo::TypeParams>(type_params_, type_config_);
parse_impl<indexcgo::IndexParams>(index_params_, index_config_);
config_.update(type_config_); // just like dict().update in Python, amazing
config_.update(index_config_);
}
template <typename T>
void
IndexWrapper::check_parameter(const std::string& key, std::function<T(std::string)> fn, std::optional<T> default_v) {
if (!config_.contains(key)) {
IndexWrapper::check_parameter(knowhere::Config& conf,
const std::string& key,
std::function<T(std::string)> fn,
std::optional<T> default_v) {
if (!conf.contains(key)) {
if (default_v.has_value()) {
config_[key] = default_v.value();
conf[key] = default_v.value();
}
} else {
auto value = config_[key];
config_[key] = fn(value);
auto value = conf[key];
conf[key] = fn(value);
}
}
@ -257,5 +258,38 @@ IndexWrapper::get_index_type() {
return type.has_value() ? type.value() : knowhere::IndexEnum::INDEX_FAISS_IVFPQ;
}
std::unique_ptr<IndexWrapper::QueryResult>
IndexWrapper::Query(const knowhere::DatasetPtr& dataset) {
return std::move(QueryImpl(dataset, config_));
}
std::unique_ptr<IndexWrapper::QueryResult>
IndexWrapper::QueryWithParam(const knowhere::DatasetPtr& dataset, const char* serialized_search_params) {
namespace indexcgo = milvus::proto::indexcgo;
milvus::knowhere::Config search_conf;
parse_impl<indexcgo::MapParams>(std::string(serialized_search_params), search_conf);
return std::move(QueryImpl(dataset, search_conf));
}
std::unique_ptr<IndexWrapper::QueryResult>
IndexWrapper::QueryImpl(const knowhere::DatasetPtr& dataset, const knowhere::Config& conf) {
auto res = index_->Query(dataset, conf, nullptr);
auto ids = res->Get<int64_t*>(milvus::knowhere::meta::IDS);
auto distances = res->Get<float*>(milvus::knowhere::meta::DISTANCE);
auto nq = dataset->Get<int64_t>(milvus::knowhere::meta::ROWS);
auto k = config_[milvus::knowhere::meta::TOPK].get<int64_t>();
auto query_res = std::make_unique<IndexWrapper::QueryResult>();
query_res->nq = nq;
query_res->topk = k;
query_res->ids.resize(nq * k);
query_res->distances.resize(nq * k);
memcpy(query_res->ids.data(), ids, sizeof(int64_t) * nq * k);
memcpy(query_res->distances.data(), distances, sizeof(float) * nq * k);
return std::move(query_res);
}
} // namespace indexbuilder
} // namespace milvus

View File

@ -12,6 +12,7 @@
#include <string>
#include <optional>
#include <vector>
#include <memory>
#include "knowhere/index/vector_index/VecIndex.h"
namespace milvus {
@ -38,6 +39,19 @@ class IndexWrapper {
void
Load(const char* serialized_sliced_blob_buffer, int32_t size);
struct QueryResult {
std::vector<milvus::knowhere::IDType> ids;
std::vector<float> distances;
int64_t nq;
int64_t topk;
};
std::unique_ptr<QueryResult>
Query(const knowhere::DatasetPtr& dataset);
std::unique_ptr<QueryResult>
QueryWithParam(const knowhere::DatasetPtr& dataset, const char* serialized_search_params);
private:
void
parse();
@ -54,10 +68,18 @@ class IndexWrapper {
template <typename T>
void
check_parameter(const std::string& key,
check_parameter(knowhere::Config& conf,
const std::string& key,
std::function<T(std::string)> fn,
std::optional<T> default_v = std::nullopt);
template <typename ParamsT>
void
parse_impl(const std::string& serialized_params_str, knowhere::Config& conf);
std::unique_ptr<QueryResult>
QueryImpl(const knowhere::DatasetPtr& dataset, const knowhere::Config& conf);
public:
void
BuildWithIds(const knowhere::DatasetPtr& dataset);

View File

@ -115,3 +115,153 @@ LoadFromSlicedBuffer(CIndex index, const char* serialized_sliced_blob_buffer, in
}
return status;
}
CStatus
QueryOnFloatVecIndex(CIndex index, int64_t float_value_num, const float* vectors, CIndexQueryResult* res) {
auto status = CStatus();
try {
auto cIndex = (milvus::indexbuilder::IndexWrapper*)index;
auto dim = cIndex->dim();
auto row_nums = float_value_num / dim;
auto query_ds = milvus::knowhere::GenDataset(row_nums, dim, vectors);
auto query_res = cIndex->Query(query_ds);
*res = query_res.release();
status.error_code = Success;
status.error_msg = "";
} catch (std::runtime_error& e) {
status.error_code = UnexpectedException;
status.error_msg = strdup(e.what());
}
return status;
}
CStatus
QueryOnFloatVecIndexWithParam(CIndex index,
int64_t float_value_num,
const float* vectors,
const char* serialized_search_params,
CIndexQueryResult* res) {
auto status = CStatus();
try {
auto cIndex = (milvus::indexbuilder::IndexWrapper*)index;
auto dim = cIndex->dim();
auto row_nums = float_value_num / dim;
auto query_ds = milvus::knowhere::GenDataset(row_nums, dim, vectors);
auto query_res = cIndex->QueryWithParam(query_ds, serialized_search_params);
*res = query_res.release();
status.error_code = Success;
status.error_msg = "";
} catch (std::runtime_error& e) {
status.error_code = UnexpectedException;
status.error_msg = strdup(e.what());
}
return status;
}
CStatus
QueryOnBinaryVecIndex(CIndex index, int64_t data_size, const uint8_t* vectors, CIndexQueryResult* res) {
auto status = CStatus();
try {
auto cIndex = (milvus::indexbuilder::IndexWrapper*)index;
auto dim = cIndex->dim();
auto row_nums = (data_size * 8) / dim;
auto query_ds = milvus::knowhere::GenDataset(row_nums, dim, vectors);
auto query_res = cIndex->Query(query_ds);
*res = query_res.release();
status.error_code = Success;
status.error_msg = "";
} catch (std::runtime_error& e) {
status.error_code = UnexpectedException;
status.error_msg = strdup(e.what());
}
return status;
}
CStatus
QueryOnBinaryVecIndexWithParam(CIndex index,
int64_t data_size,
const uint8_t* vectors,
const char* serialized_search_params,
CIndexQueryResult* res) {
auto status = CStatus();
try {
auto cIndex = (milvus::indexbuilder::IndexWrapper*)index;
auto dim = cIndex->dim();
auto row_nums = (data_size * 8) / dim;
auto query_ds = milvus::knowhere::GenDataset(row_nums, dim, vectors);
auto query_res = cIndex->QueryWithParam(query_ds, serialized_search_params);
*res = query_res.release();
status.error_code = Success;
status.error_msg = "";
} catch (std::runtime_error& e) {
status.error_code = UnexpectedException;
status.error_msg = strdup(e.what());
}
return status;
}
CStatus
CreateQueryResult(CIndexQueryResult* res) {
auto status = CStatus();
try {
auto query_result = std::make_unique<milvus::indexbuilder::IndexWrapper::QueryResult>();
*res = query_result.release();
status.error_code = Success;
status.error_msg = "";
} catch (std::runtime_error& e) {
status.error_code = UnexpectedException;
status.error_msg = strdup(e.what());
}
return status;
}
int64_t
NqOfQueryResult(CIndexQueryResult res) {
auto c_res = (milvus::indexbuilder::IndexWrapper::QueryResult*)res;
return c_res->nq;
}
int64_t
TopkOfQueryResult(CIndexQueryResult res) {
auto c_res = (milvus::indexbuilder::IndexWrapper::QueryResult*)res;
return c_res->topk;
}
void
GetIdsOfQueryResult(CIndexQueryResult res, int64_t* ids) {
auto c_res = (milvus::indexbuilder::IndexWrapper::QueryResult*)res;
auto nq = c_res->nq;
auto k = c_res->topk;
// TODO: how could we avoid memory copy every time when this called
memcpy(ids, c_res->ids.data(), sizeof(int64_t) * nq * k);
}
void
GetDistancesOfQueryResult(CIndexQueryResult res, float* distances) {
auto c_res = (milvus::indexbuilder::IndexWrapper::QueryResult*)res;
auto nq = c_res->nq;
auto k = c_res->topk;
// TODO: how could we avoid memory copy every time when this called
memcpy(distances, c_res->distances.data(), sizeof(float) * nq * k);
}
CStatus
DeleteQueryResult(CIndexQueryResult res) {
auto status = CStatus();
try {
auto c_res = (milvus::indexbuilder::IndexWrapper::QueryResult*)res;
delete c_res;
status.error_code = Success;
status.error_msg = "";
} catch (std::runtime_error& e) {
status.error_code = UnexpectedException;
status.error_msg = strdup(e.what());
}
return status;
}

View File

@ -31,6 +31,7 @@ extern "C" {
#include "common/status_c.h"
typedef void* CIndex;
typedef void* CIndexQueryResult;
// TODO: how could we pass map between go and c++ more efficiently?
// Solution: using protobuf instead of json, this way significantly increase programming efficiency
@ -53,6 +54,44 @@ SerializeToSlicedBuffer(CIndex index, int32_t* buffer_size, char** res_buffer);
CStatus
LoadFromSlicedBuffer(CIndex index, const char* serialized_sliced_blob_buffer, int32_t size);
CStatus
QueryOnFloatVecIndex(CIndex index, int64_t float_value_num, const float* vectors, CIndexQueryResult* res);
CStatus
QueryOnFloatVecIndexWithParam(CIndex index,
int64_t float_value_num,
const float* vectors,
const char* serialized_search_params,
CIndexQueryResult* res);
CStatus
QueryOnBinaryVecIndex(CIndex index, int64_t data_size, const uint8_t* vectors, CIndexQueryResult* res);
CStatus
QueryOnBinaryVecIndexWithParam(CIndex index,
int64_t data_size,
const uint8_t* vectors,
const char* serialized_search_params,
CIndexQueryResult* res);
CStatus
CreateQueryResult(CIndexQueryResult* res);
int64_t
NqOfQueryResult(CIndexQueryResult res);
int64_t
TopkOfQueryResult(CIndexQueryResult res);
void
GetIdsOfQueryResult(CIndexQueryResult res, int64_t* ids);
void
GetDistancesOfQueryResult(CIndexQueryResult res, float* distances);
CStatus
DeleteQueryResult(CIndexQueryResult res);
#ifdef __cplusplus
};
#endif

View File

@ -28,6 +28,10 @@ class IndexParamsDefaultTypeInternal {
public:
::PROTOBUF_NAMESPACE_ID::internal::ExplicitlyConstructed<IndexParams> _instance;
} _IndexParams_default_instance_;
class MapParamsDefaultTypeInternal {
public:
::PROTOBUF_NAMESPACE_ID::internal::ExplicitlyConstructed<MapParams> _instance;
} _MapParams_default_instance_;
class BinaryDefaultTypeInternal {
public:
::PROTOBUF_NAMESPACE_ID::internal::ExplicitlyConstructed<Binary> _instance;
@ -83,6 +87,21 @@ static void InitDefaultsscc_info_IndexParams_index_5fcgo_5fmsg_2eproto() {
{{ATOMIC_VAR_INIT(::PROTOBUF_NAMESPACE_ID::internal::SCCInfoBase::kUninitialized), 1, InitDefaultsscc_info_IndexParams_index_5fcgo_5fmsg_2eproto}, {
&scc_info_KeyValuePair_common_2eproto.base,}};
static void InitDefaultsscc_info_MapParams_index_5fcgo_5fmsg_2eproto() {
GOOGLE_PROTOBUF_VERIFY_VERSION;
{
void* ptr = &::milvus::proto::indexcgo::_MapParams_default_instance_;
new (ptr) ::milvus::proto::indexcgo::MapParams();
::PROTOBUF_NAMESPACE_ID::internal::OnShutdownDestroyMessage(ptr);
}
::milvus::proto::indexcgo::MapParams::InitAsDefaultInstance();
}
::PROTOBUF_NAMESPACE_ID::internal::SCCInfo<1> scc_info_MapParams_index_5fcgo_5fmsg_2eproto =
{{ATOMIC_VAR_INIT(::PROTOBUF_NAMESPACE_ID::internal::SCCInfoBase::kUninitialized), 1, InitDefaultsscc_info_MapParams_index_5fcgo_5fmsg_2eproto}, {
&scc_info_KeyValuePair_common_2eproto.base,}};
static void InitDefaultsscc_info_TypeParams_index_5fcgo_5fmsg_2eproto() {
GOOGLE_PROTOBUF_VERIFY_VERSION;
@ -98,7 +117,7 @@ static void InitDefaultsscc_info_TypeParams_index_5fcgo_5fmsg_2eproto() {
{{ATOMIC_VAR_INIT(::PROTOBUF_NAMESPACE_ID::internal::SCCInfoBase::kUninitialized), 1, InitDefaultsscc_info_TypeParams_index_5fcgo_5fmsg_2eproto}, {
&scc_info_KeyValuePair_common_2eproto.base,}};
static ::PROTOBUF_NAMESPACE_ID::Metadata file_level_metadata_index_5fcgo_5fmsg_2eproto[4];
static ::PROTOBUF_NAMESPACE_ID::Metadata file_level_metadata_index_5fcgo_5fmsg_2eproto[5];
static constexpr ::PROTOBUF_NAMESPACE_ID::EnumDescriptor const** file_level_enum_descriptors_index_5fcgo_5fmsg_2eproto = nullptr;
static constexpr ::PROTOBUF_NAMESPACE_ID::ServiceDescriptor const** file_level_service_descriptors_index_5fcgo_5fmsg_2eproto = nullptr;
@ -116,6 +135,12 @@ const ::PROTOBUF_NAMESPACE_ID::uint32 TableStruct_index_5fcgo_5fmsg_2eproto::off
~0u, // no _weak_field_map_
PROTOBUF_FIELD_OFFSET(::milvus::proto::indexcgo::IndexParams, params_),
~0u, // no _has_bits_
PROTOBUF_FIELD_OFFSET(::milvus::proto::indexcgo::MapParams, _internal_metadata_),
~0u, // no _extensions_
~0u, // no _oneof_case_
~0u, // no _weak_field_map_
PROTOBUF_FIELD_OFFSET(::milvus::proto::indexcgo::MapParams, params_),
~0u, // no _has_bits_
PROTOBUF_FIELD_OFFSET(::milvus::proto::indexcgo::Binary, _internal_metadata_),
~0u, // no _extensions_
~0u, // no _oneof_case_
@ -132,13 +157,15 @@ const ::PROTOBUF_NAMESPACE_ID::uint32 TableStruct_index_5fcgo_5fmsg_2eproto::off
static const ::PROTOBUF_NAMESPACE_ID::internal::MigrationSchema schemas[] PROTOBUF_SECTION_VARIABLE(protodesc_cold) = {
{ 0, -1, sizeof(::milvus::proto::indexcgo::TypeParams)},
{ 6, -1, sizeof(::milvus::proto::indexcgo::IndexParams)},
{ 12, -1, sizeof(::milvus::proto::indexcgo::Binary)},
{ 19, -1, sizeof(::milvus::proto::indexcgo::BinarySet)},
{ 12, -1, sizeof(::milvus::proto::indexcgo::MapParams)},
{ 18, -1, sizeof(::milvus::proto::indexcgo::Binary)},
{ 25, -1, sizeof(::milvus::proto::indexcgo::BinarySet)},
};
static ::PROTOBUF_NAMESPACE_ID::Message const * const file_default_instances[] = {
reinterpret_cast<const ::PROTOBUF_NAMESPACE_ID::Message*>(&::milvus::proto::indexcgo::_TypeParams_default_instance_),
reinterpret_cast<const ::PROTOBUF_NAMESPACE_ID::Message*>(&::milvus::proto::indexcgo::_IndexParams_default_instance_),
reinterpret_cast<const ::PROTOBUF_NAMESPACE_ID::Message*>(&::milvus::proto::indexcgo::_MapParams_default_instance_),
reinterpret_cast<const ::PROTOBUF_NAMESPACE_ID::Message*>(&::milvus::proto::indexcgo::_Binary_default_instance_),
reinterpret_cast<const ::PROTOBUF_NAMESPACE_ID::Message*>(&::milvus::proto::indexcgo::_BinarySet_default_instance_),
};
@ -148,29 +175,31 @@ const char descriptor_table_protodef_index_5fcgo_5fmsg_2eproto[] PROTOBUF_SECTIO
"xcgo\032\014common.proto\"\?\n\nTypeParams\0221\n\006para"
"ms\030\001 \003(\0132!.milvus.proto.common.KeyValueP"
"air\"@\n\013IndexParams\0221\n\006params\030\001 \003(\0132!.mil"
"vus.proto.common.KeyValuePair\"$\n\006Binary\022"
"\013\n\003key\030\001 \001(\t\022\r\n\005value\030\002 \001(\014\"9\n\tBinarySet"
"\022,\n\005datas\030\001 \003(\0132\035.milvus.proto.indexcgo."
"BinaryBDZBgithub.com/zilliztech/milvus-d"
"istributed/internal/proto/indexcgopbb\006pr"
"oto3"
"vus.proto.common.KeyValuePair\">\n\tMapPara"
"ms\0221\n\006params\030\001 \003(\0132!.milvus.proto.common"
".KeyValuePair\"$\n\006Binary\022\013\n\003key\030\001 \001(\t\022\r\n\005"
"value\030\002 \001(\014\"9\n\tBinarySet\022,\n\005datas\030\001 \003(\0132"
"\035.milvus.proto.indexcgo.BinaryBDZBgithub"
".com/zilliztech/milvus-distributed/inter"
"nal/proto/indexcgopbb\006proto3"
;
static const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable*const descriptor_table_index_5fcgo_5fmsg_2eproto_deps[1] = {
&::descriptor_table_common_2eproto,
};
static ::PROTOBUF_NAMESPACE_ID::internal::SCCInfoBase*const descriptor_table_index_5fcgo_5fmsg_2eproto_sccs[4] = {
static ::PROTOBUF_NAMESPACE_ID::internal::SCCInfoBase*const descriptor_table_index_5fcgo_5fmsg_2eproto_sccs[5] = {
&scc_info_Binary_index_5fcgo_5fmsg_2eproto.base,
&scc_info_BinarySet_index_5fcgo_5fmsg_2eproto.base,
&scc_info_IndexParams_index_5fcgo_5fmsg_2eproto.base,
&scc_info_MapParams_index_5fcgo_5fmsg_2eproto.base,
&scc_info_TypeParams_index_5fcgo_5fmsg_2eproto.base,
};
static ::PROTOBUF_NAMESPACE_ID::internal::once_flag descriptor_table_index_5fcgo_5fmsg_2eproto_once;
static bool descriptor_table_index_5fcgo_5fmsg_2eproto_initialized = false;
const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable descriptor_table_index_5fcgo_5fmsg_2eproto = {
&descriptor_table_index_5fcgo_5fmsg_2eproto_initialized, descriptor_table_protodef_index_5fcgo_5fmsg_2eproto, "index_cgo_msg.proto", 364,
&descriptor_table_index_5fcgo_5fmsg_2eproto_once, descriptor_table_index_5fcgo_5fmsg_2eproto_sccs, descriptor_table_index_5fcgo_5fmsg_2eproto_deps, 4, 1,
&descriptor_table_index_5fcgo_5fmsg_2eproto_initialized, descriptor_table_protodef_index_5fcgo_5fmsg_2eproto, "index_cgo_msg.proto", 428,
&descriptor_table_index_5fcgo_5fmsg_2eproto_once, descriptor_table_index_5fcgo_5fmsg_2eproto_sccs, descriptor_table_index_5fcgo_5fmsg_2eproto_deps, 5, 1,
schemas, file_default_instances, TableStruct_index_5fcgo_5fmsg_2eproto::offsets,
file_level_metadata_index_5fcgo_5fmsg_2eproto, 4, file_level_enum_descriptors_index_5fcgo_5fmsg_2eproto, file_level_service_descriptors_index_5fcgo_5fmsg_2eproto,
file_level_metadata_index_5fcgo_5fmsg_2eproto, 5, file_level_enum_descriptors_index_5fcgo_5fmsg_2eproto, file_level_service_descriptors_index_5fcgo_5fmsg_2eproto,
};
// Force running AddDescriptors() at dynamic initialization time.
@ -707,6 +736,270 @@ void IndexParams::InternalSwap(IndexParams* other) {
}
// ===================================================================
void MapParams::InitAsDefaultInstance() {
}
class MapParams::_Internal {
public:
};
void MapParams::clear_params() {
params_.Clear();
}
MapParams::MapParams()
: ::PROTOBUF_NAMESPACE_ID::Message(), _internal_metadata_(nullptr) {
SharedCtor();
// @@protoc_insertion_point(constructor:milvus.proto.indexcgo.MapParams)
}
MapParams::MapParams(const MapParams& from)
: ::PROTOBUF_NAMESPACE_ID::Message(),
_internal_metadata_(nullptr),
params_(from.params_) {
_internal_metadata_.MergeFrom(from._internal_metadata_);
// @@protoc_insertion_point(copy_constructor:milvus.proto.indexcgo.MapParams)
}
void MapParams::SharedCtor() {
::PROTOBUF_NAMESPACE_ID::internal::InitSCC(&scc_info_MapParams_index_5fcgo_5fmsg_2eproto.base);
}
MapParams::~MapParams() {
// @@protoc_insertion_point(destructor:milvus.proto.indexcgo.MapParams)
SharedDtor();
}
void MapParams::SharedDtor() {
}
void MapParams::SetCachedSize(int size) const {
_cached_size_.Set(size);
}
const MapParams& MapParams::default_instance() {
::PROTOBUF_NAMESPACE_ID::internal::InitSCC(&::scc_info_MapParams_index_5fcgo_5fmsg_2eproto.base);
return *internal_default_instance();
}
void MapParams::Clear() {
// @@protoc_insertion_point(message_clear_start:milvus.proto.indexcgo.MapParams)
::PROTOBUF_NAMESPACE_ID::uint32 cached_has_bits = 0;
// Prevent compiler warnings about cached_has_bits being unused
(void) cached_has_bits;
params_.Clear();
_internal_metadata_.Clear();
}
#if GOOGLE_PROTOBUF_ENABLE_EXPERIMENTAL_PARSER
const char* MapParams::_InternalParse(const char* ptr, ::PROTOBUF_NAMESPACE_ID::internal::ParseContext* ctx) {
#define CHK_(x) if (PROTOBUF_PREDICT_FALSE(!(x))) goto failure
while (!ctx->Done(&ptr)) {
::PROTOBUF_NAMESPACE_ID::uint32 tag;
ptr = ::PROTOBUF_NAMESPACE_ID::internal::ReadTag(ptr, &tag);
CHK_(ptr);
switch (tag >> 3) {
// repeated .milvus.proto.common.KeyValuePair params = 1;
case 1:
if (PROTOBUF_PREDICT_TRUE(static_cast<::PROTOBUF_NAMESPACE_ID::uint8>(tag) == 10)) {
ptr -= 1;
do {
ptr += 1;
ptr = ctx->ParseMessage(add_params(), ptr);
CHK_(ptr);
if (!ctx->DataAvailable(ptr)) break;
} while (::PROTOBUF_NAMESPACE_ID::internal::UnalignedLoad<::PROTOBUF_NAMESPACE_ID::uint8>(ptr) == 10);
} else goto handle_unusual;
continue;
default: {
handle_unusual:
if ((tag & 7) == 4 || tag == 0) {
ctx->SetLastTag(tag);
goto success;
}
ptr = UnknownFieldParse(tag, &_internal_metadata_, ptr, ctx);
CHK_(ptr != nullptr);
continue;
}
} // switch
} // while
success:
return ptr;
failure:
ptr = nullptr;
goto success;
#undef CHK_
}
#else // GOOGLE_PROTOBUF_ENABLE_EXPERIMENTAL_PARSER
bool MapParams::MergePartialFromCodedStream(
::PROTOBUF_NAMESPACE_ID::io::CodedInputStream* input) {
#define DO_(EXPRESSION) if (!PROTOBUF_PREDICT_TRUE(EXPRESSION)) goto failure
::PROTOBUF_NAMESPACE_ID::uint32 tag;
// @@protoc_insertion_point(parse_start:milvus.proto.indexcgo.MapParams)
for (;;) {
::std::pair<::PROTOBUF_NAMESPACE_ID::uint32, bool> p = input->ReadTagWithCutoffNoLastTag(127u);
tag = p.first;
if (!p.second) goto handle_unusual;
switch (::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::GetTagFieldNumber(tag)) {
// repeated .milvus.proto.common.KeyValuePair params = 1;
case 1: {
if (static_cast< ::PROTOBUF_NAMESPACE_ID::uint8>(tag) == (10 & 0xFF)) {
DO_(::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::ReadMessage(
input, add_params()));
} else {
goto handle_unusual;
}
break;
}
default: {
handle_unusual:
if (tag == 0) {
goto success;
}
DO_(::PROTOBUF_NAMESPACE_ID::internal::WireFormat::SkipField(
input, tag, _internal_metadata_.mutable_unknown_fields()));
break;
}
}
}
success:
// @@protoc_insertion_point(parse_success:milvus.proto.indexcgo.MapParams)
return true;
failure:
// @@protoc_insertion_point(parse_failure:milvus.proto.indexcgo.MapParams)
return false;
#undef DO_
}
#endif // GOOGLE_PROTOBUF_ENABLE_EXPERIMENTAL_PARSER
void MapParams::SerializeWithCachedSizes(
::PROTOBUF_NAMESPACE_ID::io::CodedOutputStream* output) const {
// @@protoc_insertion_point(serialize_start:milvus.proto.indexcgo.MapParams)
::PROTOBUF_NAMESPACE_ID::uint32 cached_has_bits = 0;
(void) cached_has_bits;
// repeated .milvus.proto.common.KeyValuePair params = 1;
for (unsigned int i = 0,
n = static_cast<unsigned int>(this->params_size()); i < n; i++) {
::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::WriteMessageMaybeToArray(
1,
this->params(static_cast<int>(i)),
output);
}
if (_internal_metadata_.have_unknown_fields()) {
::PROTOBUF_NAMESPACE_ID::internal::WireFormat::SerializeUnknownFields(
_internal_metadata_.unknown_fields(), output);
}
// @@protoc_insertion_point(serialize_end:milvus.proto.indexcgo.MapParams)
}
::PROTOBUF_NAMESPACE_ID::uint8* MapParams::InternalSerializeWithCachedSizesToArray(
::PROTOBUF_NAMESPACE_ID::uint8* target) const {
// @@protoc_insertion_point(serialize_to_array_start:milvus.proto.indexcgo.MapParams)
::PROTOBUF_NAMESPACE_ID::uint32 cached_has_bits = 0;
(void) cached_has_bits;
// repeated .milvus.proto.common.KeyValuePair params = 1;
for (unsigned int i = 0,
n = static_cast<unsigned int>(this->params_size()); i < n; i++) {
target = ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::
InternalWriteMessageToArray(
1, this->params(static_cast<int>(i)), target);
}
if (_internal_metadata_.have_unknown_fields()) {
target = ::PROTOBUF_NAMESPACE_ID::internal::WireFormat::SerializeUnknownFieldsToArray(
_internal_metadata_.unknown_fields(), target);
}
// @@protoc_insertion_point(serialize_to_array_end:milvus.proto.indexcgo.MapParams)
return target;
}
size_t MapParams::ByteSizeLong() const {
// @@protoc_insertion_point(message_byte_size_start:milvus.proto.indexcgo.MapParams)
size_t total_size = 0;
if (_internal_metadata_.have_unknown_fields()) {
total_size +=
::PROTOBUF_NAMESPACE_ID::internal::WireFormat::ComputeUnknownFieldsSize(
_internal_metadata_.unknown_fields());
}
::PROTOBUF_NAMESPACE_ID::uint32 cached_has_bits = 0;
// Prevent compiler warnings about cached_has_bits being unused
(void) cached_has_bits;
// repeated .milvus.proto.common.KeyValuePair params = 1;
{
unsigned int count = static_cast<unsigned int>(this->params_size());
total_size += 1UL * count;
for (unsigned int i = 0; i < count; i++) {
total_size +=
::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::MessageSize(
this->params(static_cast<int>(i)));
}
}
int cached_size = ::PROTOBUF_NAMESPACE_ID::internal::ToCachedSize(total_size);
SetCachedSize(cached_size);
return total_size;
}
void MapParams::MergeFrom(const ::PROTOBUF_NAMESPACE_ID::Message& from) {
// @@protoc_insertion_point(generalized_merge_from_start:milvus.proto.indexcgo.MapParams)
GOOGLE_DCHECK_NE(&from, this);
const MapParams* source =
::PROTOBUF_NAMESPACE_ID::DynamicCastToGenerated<MapParams>(
&from);
if (source == nullptr) {
// @@protoc_insertion_point(generalized_merge_from_cast_fail:milvus.proto.indexcgo.MapParams)
::PROTOBUF_NAMESPACE_ID::internal::ReflectionOps::Merge(from, this);
} else {
// @@protoc_insertion_point(generalized_merge_from_cast_success:milvus.proto.indexcgo.MapParams)
MergeFrom(*source);
}
}
void MapParams::MergeFrom(const MapParams& from) {
// @@protoc_insertion_point(class_specific_merge_from_start:milvus.proto.indexcgo.MapParams)
GOOGLE_DCHECK_NE(&from, this);
_internal_metadata_.MergeFrom(from._internal_metadata_);
::PROTOBUF_NAMESPACE_ID::uint32 cached_has_bits = 0;
(void) cached_has_bits;
params_.MergeFrom(from.params_);
}
void MapParams::CopyFrom(const ::PROTOBUF_NAMESPACE_ID::Message& from) {
// @@protoc_insertion_point(generalized_copy_from_start:milvus.proto.indexcgo.MapParams)
if (&from == this) return;
Clear();
MergeFrom(from);
}
void MapParams::CopyFrom(const MapParams& from) {
// @@protoc_insertion_point(class_specific_copy_from_start:milvus.proto.indexcgo.MapParams)
if (&from == this) return;
Clear();
MergeFrom(from);
}
bool MapParams::IsInitialized() const {
return true;
}
void MapParams::InternalSwap(MapParams* other) {
using std::swap;
_internal_metadata_.Swap(&other->_internal_metadata_);
CastToBase(&params_)->InternalSwap(CastToBase(&other->params_));
}
::PROTOBUF_NAMESPACE_ID::Metadata MapParams::GetMetadata() const {
return GetMetadataStatic();
}
// ===================================================================
void Binary::InitAsDefaultInstance() {
@ -1299,6 +1592,9 @@ template<> PROTOBUF_NOINLINE ::milvus::proto::indexcgo::TypeParams* Arena::Creat
template<> PROTOBUF_NOINLINE ::milvus::proto::indexcgo::IndexParams* Arena::CreateMaybeMessage< ::milvus::proto::indexcgo::IndexParams >(Arena* arena) {
return Arena::CreateInternal< ::milvus::proto::indexcgo::IndexParams >(arena);
}
template<> PROTOBUF_NOINLINE ::milvus::proto::indexcgo::MapParams* Arena::CreateMaybeMessage< ::milvus::proto::indexcgo::MapParams >(Arena* arena) {
return Arena::CreateInternal< ::milvus::proto::indexcgo::MapParams >(arena);
}
template<> PROTOBUF_NOINLINE ::milvus::proto::indexcgo::Binary* Arena::CreateMaybeMessage< ::milvus::proto::indexcgo::Binary >(Arena* arena) {
return Arena::CreateInternal< ::milvus::proto::indexcgo::Binary >(arena);
}

View File

@ -48,7 +48,7 @@ struct TableStruct_index_5fcgo_5fmsg_2eproto {
PROTOBUF_SECTION_VARIABLE(protodesc_cold);
static const ::PROTOBUF_NAMESPACE_ID::internal::AuxillaryParseTableField aux[]
PROTOBUF_SECTION_VARIABLE(protodesc_cold);
static const ::PROTOBUF_NAMESPACE_ID::internal::ParseTable schema[4]
static const ::PROTOBUF_NAMESPACE_ID::internal::ParseTable schema[5]
PROTOBUF_SECTION_VARIABLE(protodesc_cold);
static const ::PROTOBUF_NAMESPACE_ID::internal::FieldMetadata field_metadata[];
static const ::PROTOBUF_NAMESPACE_ID::internal::SerializationTable serialization_table[];
@ -67,6 +67,9 @@ extern BinarySetDefaultTypeInternal _BinarySet_default_instance_;
class IndexParams;
class IndexParamsDefaultTypeInternal;
extern IndexParamsDefaultTypeInternal _IndexParams_default_instance_;
class MapParams;
class MapParamsDefaultTypeInternal;
extern MapParamsDefaultTypeInternal _MapParams_default_instance_;
class TypeParams;
class TypeParamsDefaultTypeInternal;
extern TypeParamsDefaultTypeInternal _TypeParams_default_instance_;
@ -77,6 +80,7 @@ PROTOBUF_NAMESPACE_OPEN
template<> ::milvus::proto::indexcgo::Binary* Arena::CreateMaybeMessage<::milvus::proto::indexcgo::Binary>(Arena*);
template<> ::milvus::proto::indexcgo::BinarySet* Arena::CreateMaybeMessage<::milvus::proto::indexcgo::BinarySet>(Arena*);
template<> ::milvus::proto::indexcgo::IndexParams* Arena::CreateMaybeMessage<::milvus::proto::indexcgo::IndexParams>(Arena*);
template<> ::milvus::proto::indexcgo::MapParams* Arena::CreateMaybeMessage<::milvus::proto::indexcgo::MapParams>(Arena*);
template<> ::milvus::proto::indexcgo::TypeParams* Arena::CreateMaybeMessage<::milvus::proto::indexcgo::TypeParams>(Arena*);
PROTOBUF_NAMESPACE_CLOSE
namespace milvus {
@ -359,6 +363,143 @@ class IndexParams :
};
// -------------------------------------------------------------------
class MapParams :
public ::PROTOBUF_NAMESPACE_ID::Message /* @@protoc_insertion_point(class_definition:milvus.proto.indexcgo.MapParams) */ {
public:
MapParams();
virtual ~MapParams();
MapParams(const MapParams& from);
MapParams(MapParams&& from) noexcept
: MapParams() {
*this = ::std::move(from);
}
inline MapParams& operator=(const MapParams& from) {
CopyFrom(from);
return *this;
}
inline MapParams& operator=(MapParams&& from) noexcept {
if (GetArenaNoVirtual() == from.GetArenaNoVirtual()) {
if (this != &from) InternalSwap(&from);
} else {
CopyFrom(from);
}
return *this;
}
static const ::PROTOBUF_NAMESPACE_ID::Descriptor* descriptor() {
return GetDescriptor();
}
static const ::PROTOBUF_NAMESPACE_ID::Descriptor* GetDescriptor() {
return GetMetadataStatic().descriptor;
}
static const ::PROTOBUF_NAMESPACE_ID::Reflection* GetReflection() {
return GetMetadataStatic().reflection;
}
static const MapParams& default_instance();
static void InitAsDefaultInstance(); // FOR INTERNAL USE ONLY
static inline const MapParams* internal_default_instance() {
return reinterpret_cast<const MapParams*>(
&_MapParams_default_instance_);
}
static constexpr int kIndexInFileMessages =
2;
friend void swap(MapParams& a, MapParams& b) {
a.Swap(&b);
}
inline void Swap(MapParams* other) {
if (other == this) return;
InternalSwap(other);
}
// implements Message ----------------------------------------------
inline MapParams* New() const final {
return CreateMaybeMessage<MapParams>(nullptr);
}
MapParams* New(::PROTOBUF_NAMESPACE_ID::Arena* arena) const final {
return CreateMaybeMessage<MapParams>(arena);
}
void CopyFrom(const ::PROTOBUF_NAMESPACE_ID::Message& from) final;
void MergeFrom(const ::PROTOBUF_NAMESPACE_ID::Message& from) final;
void CopyFrom(const MapParams& from);
void MergeFrom(const MapParams& from);
PROTOBUF_ATTRIBUTE_REINITIALIZES void Clear() final;
bool IsInitialized() const final;
size_t ByteSizeLong() const final;
#if GOOGLE_PROTOBUF_ENABLE_EXPERIMENTAL_PARSER
const char* _InternalParse(const char* ptr, ::PROTOBUF_NAMESPACE_ID::internal::ParseContext* ctx) final;
#else
bool MergePartialFromCodedStream(
::PROTOBUF_NAMESPACE_ID::io::CodedInputStream* input) final;
#endif // GOOGLE_PROTOBUF_ENABLE_EXPERIMENTAL_PARSER
void SerializeWithCachedSizes(
::PROTOBUF_NAMESPACE_ID::io::CodedOutputStream* output) const final;
::PROTOBUF_NAMESPACE_ID::uint8* InternalSerializeWithCachedSizesToArray(
::PROTOBUF_NAMESPACE_ID::uint8* target) const final;
int GetCachedSize() const final { return _cached_size_.Get(); }
private:
inline void SharedCtor();
inline void SharedDtor();
void SetCachedSize(int size) const final;
void InternalSwap(MapParams* other);
friend class ::PROTOBUF_NAMESPACE_ID::internal::AnyMetadata;
static ::PROTOBUF_NAMESPACE_ID::StringPiece FullMessageName() {
return "milvus.proto.indexcgo.MapParams";
}
private:
inline ::PROTOBUF_NAMESPACE_ID::Arena* GetArenaNoVirtual() const {
return nullptr;
}
inline void* MaybeArenaPtr() const {
return nullptr;
}
public:
::PROTOBUF_NAMESPACE_ID::Metadata GetMetadata() const final;
private:
static ::PROTOBUF_NAMESPACE_ID::Metadata GetMetadataStatic() {
::PROTOBUF_NAMESPACE_ID::internal::AssignDescriptors(&::descriptor_table_index_5fcgo_5fmsg_2eproto);
return ::descriptor_table_index_5fcgo_5fmsg_2eproto.file_level_metadata[kIndexInFileMessages];
}
public:
// nested types ----------------------------------------------------
// accessors -------------------------------------------------------
enum : int {
kParamsFieldNumber = 1,
};
// repeated .milvus.proto.common.KeyValuePair params = 1;
int params_size() const;
void clear_params();
::milvus::proto::common::KeyValuePair* mutable_params(int index);
::PROTOBUF_NAMESPACE_ID::RepeatedPtrField< ::milvus::proto::common::KeyValuePair >*
mutable_params();
const ::milvus::proto::common::KeyValuePair& params(int index) const;
::milvus::proto::common::KeyValuePair* add_params();
const ::PROTOBUF_NAMESPACE_ID::RepeatedPtrField< ::milvus::proto::common::KeyValuePair >&
params() const;
// @@protoc_insertion_point(class_scope:milvus.proto.indexcgo.MapParams)
private:
class _Internal;
::PROTOBUF_NAMESPACE_ID::internal::InternalMetadataWithArena _internal_metadata_;
::PROTOBUF_NAMESPACE_ID::RepeatedPtrField< ::milvus::proto::common::KeyValuePair > params_;
mutable ::PROTOBUF_NAMESPACE_ID::internal::CachedSize _cached_size_;
friend struct ::TableStruct_index_5fcgo_5fmsg_2eproto;
};
// -------------------------------------------------------------------
class Binary :
public ::PROTOBUF_NAMESPACE_ID::Message /* @@protoc_insertion_point(class_definition:milvus.proto.indexcgo.Binary) */ {
public:
@ -401,7 +542,7 @@ class Binary :
&_Binary_default_instance_);
}
static constexpr int kIndexInFileMessages =
2;
3;
friend void swap(Binary& a, Binary& b) {
a.Swap(&b);
@ -551,7 +692,7 @@ class BinarySet :
&_BinarySet_default_instance_);
}
static constexpr int kIndexInFileMessages =
3;
4;
friend void swap(BinarySet& a, BinarySet& b) {
a.Swap(&b);
@ -715,6 +856,37 @@ IndexParams::params() const {
// -------------------------------------------------------------------
// MapParams
// repeated .milvus.proto.common.KeyValuePair params = 1;
inline int MapParams::params_size() const {
return params_.size();
}
inline ::milvus::proto::common::KeyValuePair* MapParams::mutable_params(int index) {
// @@protoc_insertion_point(field_mutable:milvus.proto.indexcgo.MapParams.params)
return params_.Mutable(index);
}
inline ::PROTOBUF_NAMESPACE_ID::RepeatedPtrField< ::milvus::proto::common::KeyValuePair >*
MapParams::mutable_params() {
// @@protoc_insertion_point(field_mutable_list:milvus.proto.indexcgo.MapParams.params)
return &params_;
}
inline const ::milvus::proto::common::KeyValuePair& MapParams::params(int index) const {
// @@protoc_insertion_point(field_get:milvus.proto.indexcgo.MapParams.params)
return params_.Get(index);
}
inline ::milvus::proto::common::KeyValuePair* MapParams::add_params() {
// @@protoc_insertion_point(field_add:milvus.proto.indexcgo.MapParams.params)
return params_.Add();
}
inline const ::PROTOBUF_NAMESPACE_ID::RepeatedPtrField< ::milvus::proto::common::KeyValuePair >&
MapParams::params() const {
// @@protoc_insertion_point(field_list:milvus.proto.indexcgo.MapParams.params)
return params_;
}
// -------------------------------------------------------------------
// Binary
// string key = 1;
@ -862,6 +1034,8 @@ BinarySet::datas() const {
// -------------------------------------------------------------------
// -------------------------------------------------------------------
// @@protoc_insertion_point(namespace_scope)

View File

@ -196,7 +196,7 @@ ReorganizeQueryResults(CMarshaledHits* c_marshaled_hits,
auto loc = search_result->result_offsets_[j];
result_distances[loc] = search_result->result_distances_[j];
row_datas[loc] = search_result->row_data_[j];
result_ids[loc] = search_result->result_ids_[j];
memcpy(&result_ids[loc], search_result->row_data_[j].data(), sizeof(int64_t));
}
count += size;
}

View File

@ -31,6 +31,10 @@ type Index interface {
Load([]*Blob) error
BuildFloatVecIndexWithoutIds(vectors []float32) error
BuildBinaryVecIndexWithoutIds(vectors []byte) error
QueryOnFloatVecIndex(vectors []float32) (QueryResult, error)
QueryOnBinaryVecIndex(vectors []byte) (QueryResult, error)
QueryOnFloatVecIndexWithParam(vectors []float32, params map[string]string) (QueryResult, error)
QueryOnBinaryVecIndexWithParam(vectors []byte, params map[string]string) (QueryResult, error)
Delete() error
}
@ -132,6 +136,8 @@ func (index *CIndex) Delete() error {
DeleteIndex(CIndex index);
*/
C.DeleteIndex(index.indexPtr)
// TODO: check if index.indexPtr will be released by golang, though it occupies little memory
// C.free(index.indexPtr)
return nil
}
@ -174,3 +180,115 @@ func NewCIndex(typeParams, indexParams map[string]string) (Index, error) {
indexPtr: indexPtr,
}, nil
}
func (index *CIndex) QueryOnFloatVecIndex(vectors []float32) (QueryResult, error) {
if len(vectors) <= 0 {
return nil, errors.New("nq is zero")
}
res, err := CreateQueryResult()
if err != nil {
return nil, err
}
fn := func() C.CStatus {
cRes, ok := res.(*CQueryResult)
if !ok {
// TODO: ugly here, fix me later
panic("only CQueryResult is supported now!")
}
return C.QueryOnFloatVecIndex(index.indexPtr, (C.int64_t)(len(vectors)), (*C.float)(&vectors[0]), &cRes.ptr)
}
err = TryCatch(fn)
if err != nil {
return nil, err
}
return res, nil
}
func (index *CIndex) QueryOnBinaryVecIndex(vectors []byte) (QueryResult, error) {
if len(vectors) <= 0 {
return nil, errors.New("nq is zero")
}
res, err := CreateQueryResult()
if err != nil {
return nil, err
}
fn := func() C.CStatus {
cRes, ok := res.(*CQueryResult)
if !ok {
// TODO: ugly here, fix me later
panic("only CQueryResult is supported now!")
}
return C.QueryOnBinaryVecIndex(index.indexPtr, (C.int64_t)(len(vectors)), (*C.uint8_t)(&vectors[0]), &cRes.ptr)
}
err = TryCatch(fn)
if err != nil {
return nil, err
}
return res, nil
}
func (index *CIndex) QueryOnFloatVecIndexWithParam(vectors []float32, params map[string]string) (QueryResult, error) {
if len(vectors) <= 0 {
return nil, errors.New("nq is zero")
}
protoParams := &indexcgopb.MapParams{
Params: make([]*commonpb.KeyValuePair, 0),
}
for key, value := range params {
protoParams.Params = append(protoParams.Params, &commonpb.KeyValuePair{Key: key, Value: value})
}
paramsStr := proto.MarshalTextString(protoParams)
paramsPointer := C.CString(paramsStr)
res, err := CreateQueryResult()
if err != nil {
return nil, err
}
fn := func() C.CStatus {
cRes, ok := res.(*CQueryResult)
if !ok {
// TODO: ugly here, fix me later
panic("only CQueryResult is supported now!")
}
return C.QueryOnFloatVecIndexWithParam(index.indexPtr, (C.int64_t)(len(vectors)), (*C.float)(&vectors[0]), paramsPointer, &cRes.ptr)
}
err = TryCatch(fn)
if err != nil {
return nil, err
}
return res, nil
}
func (index *CIndex) QueryOnBinaryVecIndexWithParam(vectors []byte, params map[string]string) (QueryResult, error) {
if len(vectors) <= 0 {
return nil, errors.New("nq is zero")
}
protoParams := &indexcgopb.MapParams{
Params: make([]*commonpb.KeyValuePair, 0),
}
for key, value := range params {
protoParams.Params = append(protoParams.Params, &commonpb.KeyValuePair{Key: key, Value: value})
}
paramsStr := proto.MarshalTextString(protoParams)
paramsPointer := C.CString(paramsStr)
res, err := CreateQueryResult()
if err != nil {
return nil, err
}
fn := func() C.CStatus {
cRes, ok := res.(*CQueryResult)
if !ok {
// TODO: ugly here, fix me later
panic("only CQueryResult is supported now!")
}
return C.QueryOnBinaryVecIndexWithParam(index.indexPtr, (C.int64_t)(len(vectors)), (*C.uint8_t)(&vectors[0]), paramsPointer, &cRes.ptr)
}
err = TryCatch(fn)
if err != nil {
return nil, err
}
return res, nil
}

View File

@ -0,0 +1,103 @@
package indexbuilder
/*
#cgo CFLAGS: -I${SRCDIR}/../core/output/include
#cgo LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_indexbuilder -Wl,-rpath=${SRCDIR}/../core/output/lib
#include <stdlib.h> // free
#include "segcore/collection_c.h"
#include "indexbuilder/index_c.h"
*/
import "C"
import (
"github.com/zilliztech/milvus-distributed/internal/errors"
"strconv"
"unsafe"
)
type QueryResult interface {
Delete() error
NQ() int64
TOPK() int64
IDs() []int64
Distances() []float32
}
type CQueryResult struct {
ptr C.CIndexQueryResult
}
type CFunc func() C.CStatus
func TryCatch(fn CFunc) error {
status := fn()
errorCode := status.error_code
if errorCode != 0 {
errorMsg := C.GoString(status.error_msg)
defer C.free(unsafe.Pointer(status.error_msg))
return errors.New("error code = " + strconv.Itoa(int(errorCode)) + ", error msg = " + errorMsg)
}
return nil
}
func CreateQueryResult() (QueryResult, error) {
var ptr C.CIndexQueryResult
fn := func() C.CStatus {
return C.CreateQueryResult(&ptr)
}
err := TryCatch(fn)
if err != nil {
return nil, err
}
return &CQueryResult{
ptr: ptr,
}, nil
}
func (qs *CQueryResult) Delete() error {
fn := func() C.CStatus {
return C.DeleteQueryResult(qs.ptr)
}
return TryCatch(fn)
}
func (qs *CQueryResult) NQ() int64 {
return int64(C.NqOfQueryResult(qs.ptr))
}
func (qs *CQueryResult) TOPK() int64 {
return int64(C.TopkOfQueryResult(qs.ptr))
}
func (qs *CQueryResult) IDs() []int64 {
nq := qs.NQ()
topk := qs.TOPK()
if nq <= 0 || topk <= 0 {
return []int64{}
}
// TODO: how could we avoid memory copy every time when this called
ids := make([]int64, nq*topk)
C.GetIdsOfQueryResult(qs.ptr, (*C.int64_t)(&ids[0]))
return ids
}
func (qs *CQueryResult) Distances() []float32 {
nq := qs.NQ()
topk := qs.TOPK()
if nq <= 0 || topk <= 0 {
return []float32{}
}
// TODO: how could we avoid memory copy every time when this called
distances := make([]float32, nq*topk)
C.GetDistancesOfQueryResult(qs.ptr, (*C.float)(&distances[0]))
return distances
}

View File

@ -71,6 +71,22 @@ func (t *createCollectionTask) Execute() error {
singleFiled.FieldID = int64(index + 100)
}
zeroField := &schemapb.FieldSchema{
FieldID: int64(0),
Name: "RowID",
IsPrimaryKey: false,
DataType: schemapb.DataType_INT64,
}
oneField := &schemapb.FieldSchema{
FieldID: int64(1),
Name: "Timestamp",
IsPrimaryKey: false,
DataType: schemapb.DataType_INT64,
}
schema.Fields = append(schema.Fields, zeroField, oneField)
collectionID, err := t.sch.globalIDAllocator()
if err != nil {
return err

View File

@ -99,6 +99,7 @@ func TestMaster(t *testing.T) {
// Creates server.
ctx, cancel := context.WithCancel(context.Background())
svr, err := CreateServer(ctx)
if err != nil {
log.Print("create server failed", zap.Error(err))
}
@ -326,16 +327,23 @@ func TestMaster(t *testing.T) {
t.Logf("collection id = %d", collMeta.ID)
assert.Equal(t, collMeta.Schema.Name, "col1")
assert.Equal(t, collMeta.Schema.AutoID, false)
assert.Equal(t, len(collMeta.Schema.Fields), 2)
assert.Equal(t, len(collMeta.Schema.Fields), 4)
assert.Equal(t, collMeta.Schema.Fields[0].Name, "col1_f1")
assert.Equal(t, collMeta.Schema.Fields[1].Name, "col1_f2")
assert.Equal(t, collMeta.Schema.Fields[2].Name, "RowID")
assert.Equal(t, collMeta.Schema.Fields[3].Name, "Timestamp")
assert.Equal(t, collMeta.Schema.Fields[0].DataType, schemapb.DataType_VECTOR_FLOAT)
assert.Equal(t, collMeta.Schema.Fields[1].DataType, schemapb.DataType_VECTOR_BINARY)
assert.Equal(t, collMeta.Schema.Fields[2].DataType, schemapb.DataType_INT64)
assert.Equal(t, collMeta.Schema.Fields[3].DataType, schemapb.DataType_INT64)
assert.Equal(t, len(collMeta.Schema.Fields[0].TypeParams), 2)
assert.Equal(t, len(collMeta.Schema.Fields[0].IndexParams), 2)
assert.Equal(t, len(collMeta.Schema.Fields[1].TypeParams), 2)
assert.Equal(t, len(collMeta.Schema.Fields[1].IndexParams), 2)
assert.Equal(t, int64(100), collMeta.Schema.Fields[0].FieldID)
assert.Equal(t, int64(101), collMeta.Schema.Fields[1].FieldID)
assert.Equal(t, int64(0), collMeta.Schema.Fields[2].FieldID)
assert.Equal(t, int64(1), collMeta.Schema.Fields[3].FieldID)
assert.Equal(t, collMeta.Schema.Fields[0].TypeParams[0].Key, "col1_f1_tk1")
assert.Equal(t, collMeta.Schema.Fields[0].TypeParams[1].Key, "col1_f1_tk2")
assert.Equal(t, collMeta.Schema.Fields[0].TypeParams[0].Value, "col1_f1_tv1")
@ -651,9 +659,11 @@ func TestMaster(t *testing.T) {
t.Logf("collection id = %d", collMeta.ID)
assert.Equal(t, collMeta.Schema.Name, "col1")
assert.Equal(t, collMeta.Schema.AutoID, false)
assert.Equal(t, len(collMeta.Schema.Fields), 2)
assert.Equal(t, len(collMeta.Schema.Fields), 4)
assert.Equal(t, collMeta.Schema.Fields[0].Name, "col1_f1")
assert.Equal(t, collMeta.Schema.Fields[1].Name, "col1_f2")
assert.Equal(t, collMeta.Schema.Fields[2].Name, "RowID")
assert.Equal(t, collMeta.Schema.Fields[3].Name, "Timestamp")
assert.Equal(t, collMeta.Schema.Fields[0].DataType, schemapb.DataType_VECTOR_FLOAT)
assert.Equal(t, collMeta.Schema.Fields[1].DataType, schemapb.DataType_VECTOR_BINARY)
assert.Equal(t, len(collMeta.Schema.Fields[0].TypeParams), 2)
@ -919,7 +929,6 @@ func TestMaster(t *testing.T) {
assert.Equal(t, createCollectionReq.ReqID, createCollectionMsg.CreateCollectionRequest.ReqID)
assert.Equal(t, createCollectionReq.Timestamp, createCollectionMsg.CreateCollectionRequest.Timestamp)
assert.Equal(t, createCollectionReq.ProxyID, createCollectionMsg.CreateCollectionRequest.ProxyID)
assert.Equal(t, createCollectionReq.Schema.Value, createCollectionMsg.CreateCollectionRequest.Schema.Value)
////////////////////////////CreatePartition////////////////////////
partitionName := "partitionName" + strconv.FormatUint(rand.Uint64(), 10)

View File

@ -13,6 +13,11 @@ message IndexParams {
repeated common.KeyValuePair params = 1;
}
// TypeParams & IndexParams will be replaced by MapParams later
message MapParams {
repeated common.KeyValuePair params = 1;
}
message Binary {
string key = 1;
bytes value = 2;

View File

@ -99,6 +99,46 @@ func (m *IndexParams) GetParams() []*commonpb.KeyValuePair {
return nil
}
// TypeParams & IndexParams will be replaced by MapParams later
type MapParams struct {
Params []*commonpb.KeyValuePair `protobuf:"bytes,1,rep,name=params,proto3" json:"params,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *MapParams) Reset() { *m = MapParams{} }
func (m *MapParams) String() string { return proto.CompactTextString(m) }
func (*MapParams) ProtoMessage() {}
func (*MapParams) Descriptor() ([]byte, []int) {
return fileDescriptor_c009bd9544a7343c, []int{2}
}
func (m *MapParams) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_MapParams.Unmarshal(m, b)
}
func (m *MapParams) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_MapParams.Marshal(b, m, deterministic)
}
func (m *MapParams) XXX_Merge(src proto.Message) {
xxx_messageInfo_MapParams.Merge(m, src)
}
func (m *MapParams) XXX_Size() int {
return xxx_messageInfo_MapParams.Size(m)
}
func (m *MapParams) XXX_DiscardUnknown() {
xxx_messageInfo_MapParams.DiscardUnknown(m)
}
var xxx_messageInfo_MapParams proto.InternalMessageInfo
func (m *MapParams) GetParams() []*commonpb.KeyValuePair {
if m != nil {
return m.Params
}
return nil
}
type Binary struct {
Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`
Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"`
@ -111,7 +151,7 @@ func (m *Binary) Reset() { *m = Binary{} }
func (m *Binary) String() string { return proto.CompactTextString(m) }
func (*Binary) ProtoMessage() {}
func (*Binary) Descriptor() ([]byte, []int) {
return fileDescriptor_c009bd9544a7343c, []int{2}
return fileDescriptor_c009bd9544a7343c, []int{3}
}
func (m *Binary) XXX_Unmarshal(b []byte) error {
@ -157,7 +197,7 @@ func (m *BinarySet) Reset() { *m = BinarySet{} }
func (m *BinarySet) String() string { return proto.CompactTextString(m) }
func (*BinarySet) ProtoMessage() {}
func (*BinarySet) Descriptor() ([]byte, []int) {
return fileDescriptor_c009bd9544a7343c, []int{3}
return fileDescriptor_c009bd9544a7343c, []int{4}
}
func (m *BinarySet) XXX_Unmarshal(b []byte) error {
@ -188,6 +228,7 @@ func (m *BinarySet) GetDatas() []*Binary {
func init() {
proto.RegisterType((*TypeParams)(nil), "milvus.proto.indexcgo.TypeParams")
proto.RegisterType((*IndexParams)(nil), "milvus.proto.indexcgo.IndexParams")
proto.RegisterType((*MapParams)(nil), "milvus.proto.indexcgo.MapParams")
proto.RegisterType((*Binary)(nil), "milvus.proto.indexcgo.Binary")
proto.RegisterType((*BinarySet)(nil), "milvus.proto.indexcgo.BinarySet")
}
@ -195,22 +236,22 @@ func init() {
func init() { proto.RegisterFile("index_cgo_msg.proto", fileDescriptor_c009bd9544a7343c) }
var fileDescriptor_c009bd9544a7343c = []byte{
// 257 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x90, 0xbf, 0x4b, 0xc3, 0x40,
0x14, 0xc7, 0x89, 0xa5, 0x81, 0xbe, 0x76, 0x90, 0x53, 0x21, 0x08, 0x42, 0xcc, 0x94, 0xc5, 0x3b,
0xb1, 0x93, 0x9b, 0x04, 0x41, 0xc5, 0xa5, 0x44, 0x71, 0x70, 0x29, 0x97, 0xe4, 0x48, 0x1f, 0xde,
0x8f, 0x70, 0xb9, 0x14, 0xd3, 0xbf, 0x5e, 0x92, 0x6b, 0x06, 0xc1, 0xcd, 0xed, 0xbd, 0xc7, 0xf7,
0xf3, 0xf9, 0x1e, 0x07, 0x67, 0xa8, 0x2b, 0xf1, 0xbd, 0x2d, 0x6b, 0xb3, 0x55, 0x6d, 0x4d, 0x1b,
0x6b, 0x9c, 0x21, 0x17, 0x0a, 0xe5, 0xbe, 0x6b, 0xfd, 0x46, 0xc7, 0x44, 0x59, 0x9b, 0xcb, 0x55,
0x69, 0x94, 0x32, 0xda, 0x9f, 0x93, 0x27, 0x80, 0xf7, 0xbe, 0x11, 0x1b, 0x6e, 0xb9, 0x6a, 0xc9,
0x3d, 0x84, 0xcd, 0x38, 0x45, 0x41, 0x3c, 0x4b, 0x97, 0x77, 0xd7, 0xf4, 0x97, 0xe3, 0x48, 0xbe,
0x8a, 0xfe, 0x83, 0xcb, 0x4e, 0x6c, 0x38, 0xda, 0xfc, 0x08, 0x24, 0xcf, 0xb0, 0x7c, 0x19, 0x2a,
0xfe, 0x6f, 0xba, 0x85, 0x30, 0x43, 0xcd, 0x6d, 0x4f, 0x4e, 0x61, 0xf6, 0x25, 0xfa, 0x28, 0x88,
0x83, 0x74, 0x91, 0x0f, 0x23, 0x39, 0x87, 0xf9, 0x7e, 0x00, 0xa2, 0x93, 0x38, 0x48, 0x57, 0xb9,
0x5f, 0x92, 0x07, 0x58, 0x78, 0xe2, 0x4d, 0x38, 0xb2, 0x86, 0x79, 0xc5, 0x1d, 0x9f, 0x8a, 0xaf,
0xe8, 0x9f, 0xdf, 0x40, 0x3d, 0x90, 0xfb, 0x6c, 0xf6, 0xf8, 0x99, 0xd5, 0xe8, 0x76, 0x5d, 0x31,
0xbc, 0x8c, 0x1d, 0x50, 0x4a, 0x3c, 0x38, 0x51, 0xee, 0x98, 0x87, 0x6f, 0x2a, 0x6c, 0x9d, 0xc5,
0xa2, 0x73, 0xa2, 0x62, 0xa8, 0x9d, 0xb0, 0x9a, 0x4b, 0x36, 0x1a, 0xd9, 0x64, 0x6c, 0x8a, 0x22,
0x1c, 0x2f, 0xeb, 0x9f, 0x00, 0x00, 0x00, 0xff, 0xff, 0xb4, 0xb0, 0x70, 0x01, 0x8f, 0x01, 0x00,
0x00,
// 264 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x90, 0x4d, 0x4b, 0xc3, 0x40,
0x10, 0x86, 0x89, 0xa5, 0x81, 0x4c, 0x7b, 0x90, 0xa8, 0x10, 0x04, 0x21, 0xe6, 0x94, 0x8b, 0x59,
0xb1, 0x27, 0x6f, 0x12, 0xc4, 0x0f, 0x44, 0x28, 0x51, 0x3c, 0x78, 0x29, 0x9b, 0x64, 0x49, 0x07,
0xf7, 0x8b, 0xcd, 0xa6, 0x98, 0xfe, 0x7a, 0x49, 0xb6, 0x3d, 0x08, 0xbd, 0xf5, 0x36, 0x33, 0xbc,
0xcf, 0xf3, 0x2e, 0x0b, 0x67, 0x28, 0x6b, 0xf6, 0xbb, 0xaa, 0x1a, 0xb5, 0x12, 0x6d, 0x93, 0x69,
0xa3, 0xac, 0x0a, 0x2f, 0x04, 0xf2, 0x4d, 0xd7, 0xba, 0x2d, 0x1b, 0x13, 0x55, 0xa3, 0x2e, 0xe7,
0x95, 0x12, 0x42, 0x49, 0x77, 0x4e, 0x9e, 0x01, 0x3e, 0x7b, 0xcd, 0x96, 0xd4, 0x50, 0xd1, 0x86,
0xf7, 0xe0, 0xeb, 0x71, 0x8a, 0xbc, 0x78, 0x92, 0xce, 0xee, 0xae, 0xb3, 0x7f, 0x8e, 0x1d, 0xf9,
0xc6, 0xfa, 0x2f, 0xca, 0x3b, 0xb6, 0xa4, 0x68, 0x8a, 0x1d, 0x90, 0xbc, 0xc0, 0xec, 0x75, 0xa8,
0x38, 0xde, 0xf4, 0x04, 0xc1, 0x3b, 0xd5, 0xc7, 0x7b, 0x6e, 0xc1, 0xcf, 0x51, 0x52, 0xd3, 0x87,
0xa7, 0x30, 0xf9, 0x61, 0x7d, 0xe4, 0xc5, 0x5e, 0x1a, 0x14, 0xc3, 0x18, 0x9e, 0xc3, 0x74, 0x33,
0x00, 0xd1, 0x49, 0xec, 0xa5, 0xf3, 0xc2, 0x2d, 0xc9, 0x03, 0x04, 0x8e, 0xf8, 0x60, 0x36, 0x5c,
0xc0, 0xb4, 0xa6, 0x96, 0xee, 0x8b, 0xaf, 0xb2, 0x83, 0xdf, 0x99, 0x39, 0xa0, 0x70, 0xd9, 0xfc,
0xf1, 0x3b, 0x6f, 0xd0, 0xae, 0xbb, 0x72, 0x78, 0x19, 0xd9, 0x22, 0xe7, 0xb8, 0xb5, 0xac, 0x5a,
0x13, 0x07, 0xdf, 0xd4, 0xd8, 0x5a, 0x83, 0x65, 0x67, 0x59, 0x4d, 0x50, 0x5a, 0x66, 0x24, 0xe5,
0x64, 0x34, 0x92, 0xbd, 0x51, 0x97, 0xa5, 0x3f, 0x5e, 0x16, 0x7f, 0x01, 0x00, 0x00, 0xff, 0xff,
0xf4, 0x8e, 0xbe, 0x8f, 0xd7, 0x01, 0x00, 0x00,
}

View File

@ -221,6 +221,30 @@ func newMeta() {
Description: "test collection",
AutoID: false,
Fields: []*schemapb.FieldSchema{
{
FieldID: 1,
Name: "Timestamp",
Description: "test collection filed 1",
DataType: schemapb.DataType_INT64,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "col1_f1_tk2",
Value: "col1_f1_tv2",
},
},
},
{
FieldID: 0,
Name: "RawID",
Description: "test collection filed 1",
DataType: schemapb.DataType_INT64,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "col1_f1_tk2",
Value: "col1_f1_tv2",
},
},
},
{
FieldID: 100,
Name: "col1_f1",

View File

@ -1,13 +1,14 @@
package writenode
import (
"bytes"
"context"
"encoding/binary"
"log"
"math"
"path"
"strconv"
"time"
"unsafe"
"github.com/golang/protobuf/proto"
"github.com/minio/minio-go/v7"
@ -114,29 +115,16 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
}
}
// Timestamps
_, ok = idata.Data[1].(*storage.Int64FieldData)
if !ok {
idata.Data[1] = &storage.Int64FieldData{
Data: []int64{},
NumRows: 0,
}
}
tsData := idata.Data[1].(*storage.Int64FieldData)
for _, ts := range msg.Timestamps {
tsData.Data = append(tsData.Data, int64(ts))
}
tsData.NumRows += len(msg.Timestamps)
// 1.1 Get CollectionMeta from etcd
segMeta, collMeta, err := ibNode.getMeta(currentSegID)
if err != nil {
// GOOSE TODO add error handler
log.Println("Get meta wrong")
log.Println("Get meta wrong:", err)
continue
}
// 1.2 Get Fields
var pos = 0 // Record position of blob
var pos int = 0 // Record position of blob
for _, field := range collMeta.Schema.Fields {
switch field.DataType {
case schemapb.DataType_VECTOR_FLOAT:
@ -167,17 +155,17 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
for _, blob := range msg.RowData {
for j := 0; j < dim; j++ {
v := binary.LittleEndian.Uint32(blob.GetValue()[pos*4:])
fieldData.Data = append(fieldData.Data, math.Float32frombits(v))
pos++
var v float32
buf := bytes.NewBuffer(blob.GetValue()[pos:])
if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
log.Println("binary.read float32 err:", err)
}
fieldData.Data = append(fieldData.Data, v)
pos += int(unsafe.Sizeof(*(&v)))
}
}
fieldData.NumRows += len(msg.RowIDs)
// log.Println(".Float vector data:\n",
// "..NumRows:",
// idata.Data[field.FieldID].(*storage.FloatVectorFieldData).NumRows,
// "..Dim:",
// idata.Data[field.FieldID].(*storage.FloatVectorFieldData).Dim)
case schemapb.DataType_VECTOR_BINARY:
var dim int
@ -205,20 +193,12 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
fieldData := idata.Data[field.FieldID].(*storage.BinaryVectorFieldData)
for _, blob := range msg.RowData {
for d := 0; d < dim/8; d++ {
v := binary.LittleEndian.Uint32(blob.GetValue()[pos*4:])
fieldData.Data = append(fieldData.Data, byte(v))
pos++
}
bv := blob.GetValue()[pos : pos+(dim/8)]
fieldData.Data = append(fieldData.Data, bv...)
pos += len(bv)
}
fieldData.NumRows += len(msg.RowData)
// log.Println(
// ".Binary vector data:\n",
// "..NumRows:",
// idata.Data[field.FieldID].(*storage.BinaryVectorFieldData).NumRows,
// "..Dim:",
// idata.Data[field.FieldID].(*storage.BinaryVectorFieldData).Dim)
case schemapb.DataType_BOOL:
if _, ok := idata.Data[field.FieldID]; !ok {
idata.Data[field.FieldID] = &storage.BoolFieldData{
@ -229,18 +209,16 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
fieldData := idata.Data[field.FieldID].(*storage.BoolFieldData)
for _, blob := range msg.RowData {
boolInt := binary.LittleEndian.Uint32(blob.GetValue()[pos*4:])
if boolInt == 1 {
fieldData.Data = append(fieldData.Data, true)
} else {
fieldData.Data = append(fieldData.Data, false)
var v bool
buf := bytes.NewReader(blob.GetValue()[pos:])
if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
log.Println("binary.Read bool failed:", err)
}
pos++
fieldData.Data = append(fieldData.Data, v)
pos += int(unsafe.Sizeof(*(&v)))
}
fieldData.NumRows += len(msg.RowIDs)
// log.Println("Bool data:",
// idata.Data[field.FieldID].(*storage.BoolFieldData).Data)
case schemapb.DataType_INT8:
if _, ok := idata.Data[field.FieldID]; !ok {
idata.Data[field.FieldID] = &storage.Int8FieldData{
@ -251,13 +229,15 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
fieldData := idata.Data[field.FieldID].(*storage.Int8FieldData)
for _, blob := range msg.RowData {
v := binary.LittleEndian.Uint32(blob.GetValue()[pos*4:])
fieldData.Data = append(fieldData.Data, int8(v))
pos++
var v int8
buf := bytes.NewReader(blob.GetValue()[pos:])
if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
log.Println("binary.Read int8 failed:", err)
}
fieldData.Data = append(fieldData.Data, v)
pos += int(unsafe.Sizeof(*(&v)))
}
fieldData.NumRows += len(msg.RowIDs)
// log.Println("Int8 data:",
// idata.Data[field.FieldID].(*storage.Int8FieldData).Data)
case schemapb.DataType_INT16:
if _, ok := idata.Data[field.FieldID]; !ok {
@ -269,14 +249,17 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
fieldData := idata.Data[field.FieldID].(*storage.Int16FieldData)
for _, blob := range msg.RowData {
v := binary.LittleEndian.Uint32(blob.GetValue()[pos*4:])
fieldData.Data = append(fieldData.Data, int16(v))
pos++
var v int16
buf := bytes.NewReader(blob.GetValue()[pos:])
if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
log.Println("binary.Read int16 failed:", err)
}
fieldData.Data = append(fieldData.Data, v)
pos += int(unsafe.Sizeof(*(&v)))
}
fieldData.NumRows += len(msg.RowIDs)
// log.Println("Int16 data:",
// idata.Data[field.FieldID].(*storage.Int16FieldData).Data)
case schemapb.DataType_INT32:
if _, ok := idata.Data[field.FieldID]; !ok {
idata.Data[field.FieldID] = &storage.Int32FieldData{
@ -287,13 +270,15 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
fieldData := idata.Data[field.FieldID].(*storage.Int32FieldData)
for _, blob := range msg.RowData {
v := binary.LittleEndian.Uint32(blob.GetValue()[pos*4:])
fieldData.Data = append(fieldData.Data, int32(v))
pos++
var v int32
buf := bytes.NewReader(blob.GetValue()[pos:])
if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
log.Println("binary.Read int32 failed:", err)
}
fieldData.Data = append(fieldData.Data, v)
pos += int(unsafe.Sizeof(*(&v)))
}
fieldData.NumRows += len(msg.RowIDs)
// log.Println("Int32 data:",
// idata.Data[field.FieldID].(*storage.Int32FieldData).Data)
case schemapb.DataType_INT64:
if _, ok := idata.Data[field.FieldID]; !ok {
@ -304,15 +289,30 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
}
fieldData := idata.Data[field.FieldID].(*storage.Int64FieldData)
for _, blob := range msg.RowData {
v := binary.LittleEndian.Uint32(blob.GetValue()[pos*4:])
fieldData.Data = append(fieldData.Data, int64(v))
pos++
}
switch field.FieldID {
case 0:
fieldData.Data = append(fieldData.Data, msg.RowIDs...)
fieldData.NumRows += len(msg.RowIDs)
case 1:
// Timestamps
for _, ts := range msg.Timestamps {
fieldData.Data = append(fieldData.Data, int64(ts))
}
fieldData.NumRows += len(msg.Timestamps)
default:
fieldData.NumRows += len(msg.RowIDs)
// log.Println("Int64 data:",
// idata.Data[field.FieldID].(*storage.Int64FieldData).Data)
for _, blob := range msg.RowData {
var v int64
buf := bytes.NewBuffer(blob.GetValue()[pos:])
if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
log.Println("binary.Read int64 failed:", err)
}
fieldData.Data = append(fieldData.Data, v)
pos += int(unsafe.Sizeof(*(&v)))
}
fieldData.NumRows += len(msg.RowIDs)
}
case schemapb.DataType_FLOAT:
if _, ok := idata.Data[field.FieldID]; !ok {
@ -324,14 +324,16 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
fieldData := idata.Data[field.FieldID].(*storage.FloatFieldData)
for _, blob := range msg.RowData {
v := binary.LittleEndian.Uint32(blob.GetValue()[pos*4:])
fieldData.Data = append(fieldData.Data, math.Float32frombits(v))
pos++
var v float32
buf := bytes.NewBuffer(blob.GetValue()[pos:])
if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
log.Println("binary.Read float32 failed:", err)
}
fieldData.Data = append(fieldData.Data, v)
pos += int(unsafe.Sizeof(*(&v)))
}
fieldData.NumRows += len(msg.RowIDs)
// log.Println("Float32 data:",
// idata.Data[field.FieldID].(*storage.FloatFieldData).Data)
case schemapb.DataType_DOUBLE:
if _, ok := idata.Data[field.FieldID]; !ok {
@ -343,14 +345,16 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
fieldData := idata.Data[field.FieldID].(*storage.DoubleFieldData)
for _, blob := range msg.RowData {
v := binary.LittleEndian.Uint64(blob.GetValue()[pos*4:])
fieldData.Data = append(fieldData.Data, math.Float64frombits(v))
pos++
var v float64
buf := bytes.NewBuffer(blob.GetValue()[pos:])
if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
log.Println("binary.Read float64 failed:", err)
}
fieldData.Data = append(fieldData.Data, v)
pos += int(unsafe.Sizeof(*(&v)))
}
fieldData.NumRows += len(msg.RowIDs)
// log.Println("Float64 data:",
// idata.Data[field.FieldID].(*storage.DoubleFieldData).Data)
}
}
@ -376,7 +380,7 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
currentSegID, ibNode.insertBuffer.insertData[currentSegID])
if err != nil {
log.Println("generate binlog wrong")
log.Println("generate binlog wrong: ", err)
}
// clear buffer
@ -451,7 +455,7 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
segMeta, collMeta, err := ibNode.getMeta(currentSegID)
if err != nil {
// GOOSE TODO add error handler
log.Println("Get meta wrong")
log.Println("Get meta wrong: ", err)
}
inCodec := storage.NewInsertCodec(collMeta)
@ -460,14 +464,14 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
partitionID, err := typeutil.Hash32String(partitionTag)
if err != nil {
// GOOSE TODO add error handler
log.Println("partitionTag to partitionID Wrong")
log.Println("partitionTag to partitionID Wrong: ", err)
}
// buffer data to binlogs
binLogs, err := inCodec.Serialize(partitionID,
currentSegID, ibNode.insertBuffer.insertData[currentSegID])
if err != nil {
log.Println("generate binlog wrong")
log.Println("generate binlog wrong: ", err)
}
// clear buffer

View File

@ -1,8 +1,10 @@
package writenode
import (
"bytes"
"context"
"encoding/binary"
"log"
"math"
"testing"
"time"
@ -67,64 +69,78 @@ func genInsertMsg() insertMsg {
binary.LittleEndian.PutUint32(buf, math.Float32bits(ele))
rawData = append(rawData, buf...)
}
log.Println(len(rawData))
// Binary vector
// Dimension of binary vector is 32
var bvector = [4]byte{255, 255, 255, 0}
for _, ele := range bvector {
bs := make([]byte, 4)
binary.LittleEndian.PutUint32(bs, uint32(ele))
rawData = append(rawData, bs...)
}
// size := 4, = 32 / 8
var bvector = []byte{255, 255, 255, 0}
rawData = append(rawData, bvector...)
log.Println(len(rawData))
// Bool
bb := make([]byte, 4)
var fieldBool = true
var fieldBoolInt uint32
if fieldBool {
fieldBoolInt = 1
} else {
fieldBoolInt = 0
buf := new(bytes.Buffer)
if err := binary.Write(buf, binary.LittleEndian, fieldBool); err != nil {
panic(err)
}
binary.LittleEndian.PutUint32(bb, fieldBoolInt)
rawData = append(rawData, bb...)
rawData = append(rawData, buf.Bytes()...)
log.Println(len(rawData))
// int8
var dataInt8 int8 = 100
bint8 := make([]byte, 4)
binary.LittleEndian.PutUint32(bint8, uint32(dataInt8))
rawData = append(rawData, bint8...)
bint8 := new(bytes.Buffer)
if err := binary.Write(bint8, binary.LittleEndian, dataInt8); err != nil {
panic(err)
}
rawData = append(rawData, bint8.Bytes()...)
log.Println(len(rawData))
// int16
var dataInt16 int16 = 200
bint16 := make([]byte, 4)
binary.LittleEndian.PutUint32(bint16, uint32(dataInt16))
rawData = append(rawData, bint16...)
bint16 := new(bytes.Buffer)
if err := binary.Write(bint16, binary.LittleEndian, dataInt16); err != nil {
panic(err)
}
rawData = append(rawData, bint16.Bytes()...)
log.Println(len(rawData))
// int32
var dataInt32 int32 = 300
bint32 := make([]byte, 4)
binary.LittleEndian.PutUint32(bint32, uint32(dataInt32))
rawData = append(rawData, bint32...)
bint32 := new(bytes.Buffer)
if err := binary.Write(bint32, binary.LittleEndian, dataInt32); err != nil {
panic(err)
}
rawData = append(rawData, bint32.Bytes()...)
log.Println(len(rawData))
// int64
var dataInt64 int64 = 300
bint64 := make([]byte, 4)
binary.LittleEndian.PutUint32(bint64, uint32(dataInt64))
rawData = append(rawData, bint64...)
bint64 := new(bytes.Buffer)
if err := binary.Write(bint64, binary.LittleEndian, dataInt64); err != nil {
panic(err)
}
rawData = append(rawData, bint64.Bytes()...)
log.Println(len(rawData))
// float32
var datafloat float32 = 1.1
bfloat32 := make([]byte, 4)
binary.LittleEndian.PutUint32(bfloat32, math.Float32bits(datafloat))
rawData = append(rawData, bfloat32...)
bfloat32 := new(bytes.Buffer)
if err := binary.Write(bfloat32, binary.LittleEndian, datafloat); err != nil {
panic(err)
}
rawData = append(rawData, bfloat32.Bytes()...)
log.Println(len(rawData))
// float64
var datafloat64 float64 = 2.2
bfloat64 := make([]byte, 8)
binary.LittleEndian.PutUint64(bfloat64, math.Float64bits(datafloat64))
rawData = append(rawData, bfloat64...)
bfloat64 := new(bytes.Buffer)
if err := binary.Write(bfloat64, binary.LittleEndian, datafloat64); err != nil {
panic(err)
}
rawData = append(rawData, bfloat64.Bytes()...)
log.Println(len(rawData))
timeRange := TimeRange{
timestampMin: 0,