Support GetEntityByIDs in CGo, fix segcore bugs (#5563)

Signed-off-by: fluorinedog <fluorinedog@gmail.com>
This commit is contained in:
FluorineDog 2021-06-04 10:38:34 +08:00 committed by GitHub
parent f9e03c5468
commit 9a90313390
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
35 changed files with 2053 additions and 259 deletions

1
go.mod
View File

@ -13,6 +13,7 @@ require (
github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4 // indirect
github.com/frankban/quicktest v1.10.2 // indirect
github.com/go-basic/ipv4 v1.0.0
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
github.com/golang/mock v1.3.1
github.com/golang/protobuf v1.4.3

View File

@ -0,0 +1,34 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#include "common/type_c.h"
#include <string>
namespace milvus {
inline CProtoResult
AllocCProtoResult(const google::protobuf::Message& msg) {
auto size = msg.ByteSize();
void* buffer = malloc(size);
msg.SerializePartialToArray(buffer, size);
return CProtoResult{CStatus{Success}, CProto{buffer, size}};
}
inline CStatus
SuccessCStatus() {
return CStatus{Success, ""};
}
inline CStatus
FailureCStatus(ErrorCode error_code, const std::string& str) {
auto str_dup = strdup(str.c_str());
return CStatus{error_code, str_dup};
}
} // namespace milvus

View File

@ -14,6 +14,9 @@
#include "exceptions/EasyAssert.h"
#include <boost/bimap.hpp>
#include <boost/algorithm/string/case_conv.hpp>
#include "common/type_c.h"
#include "pb/schema.pb.h"
#include "CGoHelper.h"
namespace milvus {
@ -48,3 +51,16 @@ MetricTypeToName(MetricType metric_type) {
}
} // namespace milvus
CProtoResult
CTestBoolArrayPb(CProto pb) {
milvus::proto::schema::BoolArray bool_array;
bool_array.ParseFromArray(pb.proto_blob, pb.proto_size);
// std::cout << pb.proto_size << std::endl;
// std::cout << bool_array.DebugString() << std::endl;
for (auto& b : *bool_array.mutable_data()) {
b = !b;
}
// create bool proto
return milvus::AllocCProtoResult(bool_array);
}

View File

@ -36,12 +36,25 @@ typedef struct CStatus {
const char* error_msg;
} CStatus;
typedef struct CProto {
const void* proto_blob;
int64_t proto_size;
} CProto;
typedef struct CLoadFieldDataInfo {
int64_t field_id;
void* blob;
int64_t row_count;
} CLoadFieldDataInfo;
typedef struct CProtoResult {
CStatus status;
CProto proto;
} CProtoResult;
CProtoResult
CTestBoolArrayPb(CProto pb);
#ifdef __cplusplus
}
#endif

View File

@ -318,17 +318,20 @@ namespace NGT {
virtual uint8_t &operator[](size_t idx) const = 0;
void serialize(std::ostream &os, ObjectSpace *objectspace = 0) {
assert(objectspace != 0);
if(objectspace == 0) return; // make compiler happy;
size_t byteSize = objectspace->getByteSizeOfObject();
NGT::Serializer::write(os, (uint8_t*)&(*this)[0], byteSize);
}
void deserialize(std::istream &is, ObjectSpace *objectspace = 0) {
assert(objectspace != 0);
if(objectspace == 0) return; // make compiler happy;
size_t byteSize = objectspace->getByteSizeOfObject();
assert(&(*this)[0] != 0);
NGT::Serializer::read(is, (uint8_t*)&(*this)[0], byteSize);
}
void serializeAsText(std::ostream &os, ObjectSpace *objectspace = 0) {
assert(objectspace != 0);
if(objectspace == 0) return; // make compiler happy;
const std::type_info &t = objectspace->getObjectType();
size_t dimension = objectspace->getDimension();
void *ref = (void*)&(*this)[0];
@ -349,6 +352,7 @@ namespace NGT {
}
void deserializeAsText(std::ifstream &is, ObjectSpace *objectspace = 0) {
assert(objectspace != 0);
if(objectspace == 0) return;
const std::type_info &t = objectspace->getObjectType();
size_t dimension = objectspace->getDimension();
void *ref = (void*)&(*this)[0];
@ -375,6 +379,7 @@ namespace NGT {
public:
Object(NGT::ObjectSpace *os = 0):vector(0) {
assert(os != 0);
if(os == 0) return;
size_t s = os->getByteSizeOfObject();
construct(s);
}

View File

@ -663,7 +663,7 @@ namespace NGT {
size_t actualResultSize = 0;
std::vector<MeasuredValue> accuracies = evaluate(gtStream, resultStream, type, actualResultSize);
size_t size;
double distanceCount, visitCount, time;
double distanceCount = 0, visitCount = 0, time = 0;
calculateMeanValues(accuracies, accuracyRange.first, accuracyRange.second, size, distanceCount, visitCount, time);
if (distanceCount == 0) {
std::stringstream msg;

View File

@ -22,6 +22,8 @@ extern PROTOBUF_INTERNAL_EXPORT_plan_2eproto ::PROTOBUF_NAMESPACE_ID::internal::
extern PROTOBUF_INTERNAL_EXPORT_plan_2eproto ::PROTOBUF_NAMESPACE_ID::internal::SCCInfo<2> scc_info_RangeExpr_plan_2eproto;
extern PROTOBUF_INTERNAL_EXPORT_plan_2eproto ::PROTOBUF_NAMESPACE_ID::internal::SCCInfo<2> scc_info_TermExpr_plan_2eproto;
extern PROTOBUF_INTERNAL_EXPORT_plan_2eproto ::PROTOBUF_NAMESPACE_ID::internal::SCCInfo<2> scc_info_VectorANNS_plan_2eproto;
extern PROTOBUF_INTERNAL_EXPORT_schema_2eproto ::PROTOBUF_NAMESPACE_ID::internal::SCCInfo<2> scc_info_FieldData_schema_2eproto;
extern PROTOBUF_INTERNAL_EXPORT_schema_2eproto ::PROTOBUF_NAMESPACE_ID::internal::SCCInfo<2> scc_info_IDs_schema_2eproto;
namespace milvus {
namespace proto {
namespace plan {
@ -73,6 +75,14 @@ class PlanNodeDefaultTypeInternal {
::PROTOBUF_NAMESPACE_ID::internal::ExplicitlyConstructed<PlanNode> _instance;
const ::milvus::proto::plan::VectorANNS* vector_anns_;
} _PlanNode_default_instance_;
class RetrieveRequestDefaultTypeInternal {
public:
::PROTOBUF_NAMESPACE_ID::internal::ExplicitlyConstructed<RetrieveRequest> _instance;
} _RetrieveRequest_default_instance_;
class RetrieveResultsDefaultTypeInternal {
public:
::PROTOBUF_NAMESPACE_ID::internal::ExplicitlyConstructed<RetrieveResults> _instance;
} _RetrieveResults_default_instance_;
} // namespace plan
} // namespace proto
} // namespace milvus
@ -177,6 +187,37 @@ static void InitDefaultsscc_info_RangeExpr_plan_2eproto() {
&scc_info_ColumnInfo_plan_2eproto.base,
&scc_info_GenericValue_plan_2eproto.base,}};
static void InitDefaultsscc_info_RetrieveRequest_plan_2eproto() {
GOOGLE_PROTOBUF_VERIFY_VERSION;
{
void* ptr = &::milvus::proto::plan::_RetrieveRequest_default_instance_;
new (ptr) ::milvus::proto::plan::RetrieveRequest();
::PROTOBUF_NAMESPACE_ID::internal::OnShutdownDestroyMessage(ptr);
}
::milvus::proto::plan::RetrieveRequest::InitAsDefaultInstance();
}
::PROTOBUF_NAMESPACE_ID::internal::SCCInfo<1> scc_info_RetrieveRequest_plan_2eproto =
{{ATOMIC_VAR_INIT(::PROTOBUF_NAMESPACE_ID::internal::SCCInfoBase::kUninitialized), 1, InitDefaultsscc_info_RetrieveRequest_plan_2eproto}, {
&scc_info_IDs_schema_2eproto.base,}};
static void InitDefaultsscc_info_RetrieveResults_plan_2eproto() {
GOOGLE_PROTOBUF_VERIFY_VERSION;
{
void* ptr = &::milvus::proto::plan::_RetrieveResults_default_instance_;
new (ptr) ::milvus::proto::plan::RetrieveResults();
::PROTOBUF_NAMESPACE_ID::internal::OnShutdownDestroyMessage(ptr);
}
::milvus::proto::plan::RetrieveResults::InitAsDefaultInstance();
}
::PROTOBUF_NAMESPACE_ID::internal::SCCInfo<2> scc_info_RetrieveResults_plan_2eproto =
{{ATOMIC_VAR_INIT(::PROTOBUF_NAMESPACE_ID::internal::SCCInfoBase::kUninitialized), 2, InitDefaultsscc_info_RetrieveResults_plan_2eproto}, {
&scc_info_IDs_schema_2eproto.base,
&scc_info_FieldData_schema_2eproto.base,}};
static void InitDefaultsscc_info_TermExpr_plan_2eproto() {
GOOGLE_PROTOBUF_VERIFY_VERSION;
@ -209,7 +250,7 @@ static void InitDefaultsscc_info_VectorANNS_plan_2eproto() {
&scc_info_BinaryExpr_plan_2eproto.base,
&scc_info_QueryInfo_plan_2eproto.base,}};
static ::PROTOBUF_NAMESPACE_ID::Metadata file_level_metadata_plan_2eproto[10];
static ::PROTOBUF_NAMESPACE_ID::Metadata file_level_metadata_plan_2eproto[12];
static const ::PROTOBUF_NAMESPACE_ID::EnumDescriptor* file_level_enum_descriptors_plan_2eproto[3];
static constexpr ::PROTOBUF_NAMESPACE_ID::ServiceDescriptor const** file_level_service_descriptors_plan_2eproto = nullptr;
@ -295,6 +336,20 @@ const ::PROTOBUF_NAMESPACE_ID::uint32 TableStruct_plan_2eproto::offsets[] PROTOB
~0u, // no _weak_field_map_
offsetof(::milvus::proto::plan::PlanNodeDefaultTypeInternal, vector_anns_),
PROTOBUF_FIELD_OFFSET(::milvus::proto::plan::PlanNode, node_),
~0u, // no _has_bits_
PROTOBUF_FIELD_OFFSET(::milvus::proto::plan::RetrieveRequest, _internal_metadata_),
~0u, // no _extensions_
~0u, // no _oneof_case_
~0u, // no _weak_field_map_
PROTOBUF_FIELD_OFFSET(::milvus::proto::plan::RetrieveRequest, ids_),
PROTOBUF_FIELD_OFFSET(::milvus::proto::plan::RetrieveRequest, output_fields_),
~0u, // no _has_bits_
PROTOBUF_FIELD_OFFSET(::milvus::proto::plan::RetrieveResults, _internal_metadata_),
~0u, // no _extensions_
~0u, // no _oneof_case_
~0u, // no _weak_field_map_
PROTOBUF_FIELD_OFFSET(::milvus::proto::plan::RetrieveResults, ids_),
PROTOBUF_FIELD_OFFSET(::milvus::proto::plan::RetrieveResults, fields_data_),
};
static const ::PROTOBUF_NAMESPACE_ID::internal::MigrationSchema schemas[] PROTOBUF_SECTION_VARIABLE(protodesc_cold) = {
{ 0, -1, sizeof(::milvus::proto::plan::GenericValue)},
@ -307,6 +362,8 @@ static const ::PROTOBUF_NAMESPACE_ID::internal::MigrationSchema schemas[] PROTOB
{ 54, -1, sizeof(::milvus::proto::plan::Expr)},
{ 64, -1, sizeof(::milvus::proto::plan::VectorANNS)},
{ 74, -1, sizeof(::milvus::proto::plan::PlanNode)},
{ 81, -1, sizeof(::milvus::proto::plan::RetrieveRequest)},
{ 88, -1, sizeof(::milvus::proto::plan::RetrieveResults)},
};
static ::PROTOBUF_NAMESPACE_ID::Message const * const file_default_instances[] = {
@ -320,6 +377,8 @@ static ::PROTOBUF_NAMESPACE_ID::Message const * const file_default_instances[] =
reinterpret_cast<const ::PROTOBUF_NAMESPACE_ID::Message*>(&::milvus::proto::plan::_Expr_default_instance_),
reinterpret_cast<const ::PROTOBUF_NAMESPACE_ID::Message*>(&::milvus::proto::plan::_VectorANNS_default_instance_),
reinterpret_cast<const ::PROTOBUF_NAMESPACE_ID::Message*>(&::milvus::proto::plan::_PlanNode_default_instance_),
reinterpret_cast<const ::PROTOBUF_NAMESPACE_ID::Message*>(&::milvus::proto::plan::_RetrieveRequest_default_instance_),
reinterpret_cast<const ::PROTOBUF_NAMESPACE_ID::Message*>(&::milvus::proto::plan::_RetrieveResults_default_instance_),
};
const char descriptor_table_protodef_plan_2eproto[] PROTOBUF_SECTION_VARIABLE(protodesc_cold) =
@ -360,29 +419,36 @@ const char descriptor_table_protodef_plan_2eproto[] PROTOBUF_SECTION_VARIABLE(pr
"vus.proto.plan.QueryInfo\022\027\n\017placeholder_"
"tag\030\005 \001(\t\"H\n\010PlanNode\0224\n\013vector_anns\030\001 \001"
"(\0132\035.milvus.proto.plan.VectorANNSH\000B\006\n\004n"
"odeB3Z1github.com/milvus-io/milvus/inter"
"nal/proto/planpbb\006proto3"
"ode\"O\n\017RetrieveRequest\022%\n\003ids\030\001 \001(\0132\030.mi"
"lvus.proto.schema.IDs\022\025\n\routput_fields\030\002"
" \003(\t\"m\n\017RetrieveResults\022%\n\003ids\030\001 \001(\0132\030.m"
"ilvus.proto.schema.IDs\0223\n\013fields_data\030\002 "
"\003(\0132\036.milvus.proto.schema.FieldDataB3Z1g"
"ithub.com/milvus-io/milvus/internal/prot"
"o/planpbb\006proto3"
;
static const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable*const descriptor_table_plan_2eproto_deps[1] = {
&::descriptor_table_schema_2eproto,
};
static ::PROTOBUF_NAMESPACE_ID::internal::SCCInfoBase*const descriptor_table_plan_2eproto_sccs[8] = {
static ::PROTOBUF_NAMESPACE_ID::internal::SCCInfoBase*const descriptor_table_plan_2eproto_sccs[10] = {
&scc_info_BinaryExpr_plan_2eproto.base,
&scc_info_ColumnInfo_plan_2eproto.base,
&scc_info_GenericValue_plan_2eproto.base,
&scc_info_PlanNode_plan_2eproto.base,
&scc_info_QueryInfo_plan_2eproto.base,
&scc_info_RangeExpr_plan_2eproto.base,
&scc_info_RetrieveRequest_plan_2eproto.base,
&scc_info_RetrieveResults_plan_2eproto.base,
&scc_info_TermExpr_plan_2eproto.base,
&scc_info_VectorANNS_plan_2eproto.base,
};
static ::PROTOBUF_NAMESPACE_ID::internal::once_flag descriptor_table_plan_2eproto_once;
static bool descriptor_table_plan_2eproto_initialized = false;
const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable descriptor_table_plan_2eproto = {
&descriptor_table_plan_2eproto_initialized, descriptor_table_protodef_plan_2eproto, "plan.proto", 1544,
&descriptor_table_plan_2eproto_once, descriptor_table_plan_2eproto_sccs, descriptor_table_plan_2eproto_deps, 8, 1,
&descriptor_table_plan_2eproto_initialized, descriptor_table_protodef_plan_2eproto, "plan.proto", 1736,
&descriptor_table_plan_2eproto_once, descriptor_table_plan_2eproto_sccs, descriptor_table_plan_2eproto_deps, 10, 1,
schemas, file_default_instances, TableStruct_plan_2eproto::offsets,
file_level_metadata_plan_2eproto, 10, file_level_enum_descriptors_plan_2eproto, file_level_service_descriptors_plan_2eproto,
file_level_metadata_plan_2eproto, 12, file_level_enum_descriptors_plan_2eproto, file_level_service_descriptors_plan_2eproto,
};
// Force running AddDescriptors() at dynamic initialization time.
@ -4254,6 +4320,668 @@ void PlanNode::InternalSwap(PlanNode* other) {
}
// ===================================================================
void RetrieveRequest::InitAsDefaultInstance() {
::milvus::proto::plan::_RetrieveRequest_default_instance_._instance.get_mutable()->ids_ = const_cast< ::milvus::proto::schema::IDs*>(
::milvus::proto::schema::IDs::internal_default_instance());
}
class RetrieveRequest::_Internal {
public:
static const ::milvus::proto::schema::IDs& ids(const RetrieveRequest* msg);
};
const ::milvus::proto::schema::IDs&
RetrieveRequest::_Internal::ids(const RetrieveRequest* msg) {
return *msg->ids_;
}
void RetrieveRequest::clear_ids() {
if (GetArenaNoVirtual() == nullptr && ids_ != nullptr) {
delete ids_;
}
ids_ = nullptr;
}
RetrieveRequest::RetrieveRequest()
: ::PROTOBUF_NAMESPACE_ID::Message(), _internal_metadata_(nullptr) {
SharedCtor();
// @@protoc_insertion_point(constructor:milvus.proto.plan.RetrieveRequest)
}
RetrieveRequest::RetrieveRequest(const RetrieveRequest& from)
: ::PROTOBUF_NAMESPACE_ID::Message(),
_internal_metadata_(nullptr),
output_fields_(from.output_fields_) {
_internal_metadata_.MergeFrom(from._internal_metadata_);
if (from.has_ids()) {
ids_ = new ::milvus::proto::schema::IDs(*from.ids_);
} else {
ids_ = nullptr;
}
// @@protoc_insertion_point(copy_constructor:milvus.proto.plan.RetrieveRequest)
}
void RetrieveRequest::SharedCtor() {
::PROTOBUF_NAMESPACE_ID::internal::InitSCC(&scc_info_RetrieveRequest_plan_2eproto.base);
ids_ = nullptr;
}
RetrieveRequest::~RetrieveRequest() {
// @@protoc_insertion_point(destructor:milvus.proto.plan.RetrieveRequest)
SharedDtor();
}
void RetrieveRequest::SharedDtor() {
if (this != internal_default_instance()) delete ids_;
}
void RetrieveRequest::SetCachedSize(int size) const {
_cached_size_.Set(size);
}
const RetrieveRequest& RetrieveRequest::default_instance() {
::PROTOBUF_NAMESPACE_ID::internal::InitSCC(&::scc_info_RetrieveRequest_plan_2eproto.base);
return *internal_default_instance();
}
void RetrieveRequest::Clear() {
// @@protoc_insertion_point(message_clear_start:milvus.proto.plan.RetrieveRequest)
::PROTOBUF_NAMESPACE_ID::uint32 cached_has_bits = 0;
// Prevent compiler warnings about cached_has_bits being unused
(void) cached_has_bits;
output_fields_.Clear();
if (GetArenaNoVirtual() == nullptr && ids_ != nullptr) {
delete ids_;
}
ids_ = nullptr;
_internal_metadata_.Clear();
}
#if GOOGLE_PROTOBUF_ENABLE_EXPERIMENTAL_PARSER
const char* RetrieveRequest::_InternalParse(const char* ptr, ::PROTOBUF_NAMESPACE_ID::internal::ParseContext* ctx) {
#define CHK_(x) if (PROTOBUF_PREDICT_FALSE(!(x))) goto failure
while (!ctx->Done(&ptr)) {
::PROTOBUF_NAMESPACE_ID::uint32 tag;
ptr = ::PROTOBUF_NAMESPACE_ID::internal::ReadTag(ptr, &tag);
CHK_(ptr);
switch (tag >> 3) {
// .milvus.proto.schema.IDs ids = 1;
case 1:
if (PROTOBUF_PREDICT_TRUE(static_cast<::PROTOBUF_NAMESPACE_ID::uint8>(tag) == 10)) {
ptr = ctx->ParseMessage(mutable_ids(), ptr);
CHK_(ptr);
} else goto handle_unusual;
continue;
// repeated string output_fields = 2;
case 2:
if (PROTOBUF_PREDICT_TRUE(static_cast<::PROTOBUF_NAMESPACE_ID::uint8>(tag) == 18)) {
ptr -= 1;
do {
ptr += 1;
ptr = ::PROTOBUF_NAMESPACE_ID::internal::InlineGreedyStringParserUTF8(add_output_fields(), ptr, ctx, "milvus.proto.plan.RetrieveRequest.output_fields");
CHK_(ptr);
if (!ctx->DataAvailable(ptr)) break;
} while (::PROTOBUF_NAMESPACE_ID::internal::UnalignedLoad<::PROTOBUF_NAMESPACE_ID::uint8>(ptr) == 18);
} else goto handle_unusual;
continue;
default: {
handle_unusual:
if ((tag & 7) == 4 || tag == 0) {
ctx->SetLastTag(tag);
goto success;
}
ptr = UnknownFieldParse(tag, &_internal_metadata_, ptr, ctx);
CHK_(ptr != nullptr);
continue;
}
} // switch
} // while
success:
return ptr;
failure:
ptr = nullptr;
goto success;
#undef CHK_
}
#else // GOOGLE_PROTOBUF_ENABLE_EXPERIMENTAL_PARSER
bool RetrieveRequest::MergePartialFromCodedStream(
::PROTOBUF_NAMESPACE_ID::io::CodedInputStream* input) {
#define DO_(EXPRESSION) if (!PROTOBUF_PREDICT_TRUE(EXPRESSION)) goto failure
::PROTOBUF_NAMESPACE_ID::uint32 tag;
// @@protoc_insertion_point(parse_start:milvus.proto.plan.RetrieveRequest)
for (;;) {
::std::pair<::PROTOBUF_NAMESPACE_ID::uint32, bool> p = input->ReadTagWithCutoffNoLastTag(127u);
tag = p.first;
if (!p.second) goto handle_unusual;
switch (::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::GetTagFieldNumber(tag)) {
// .milvus.proto.schema.IDs ids = 1;
case 1: {
if (static_cast< ::PROTOBUF_NAMESPACE_ID::uint8>(tag) == (10 & 0xFF)) {
DO_(::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::ReadMessage(
input, mutable_ids()));
} else {
goto handle_unusual;
}
break;
}
// repeated string output_fields = 2;
case 2: {
if (static_cast< ::PROTOBUF_NAMESPACE_ID::uint8>(tag) == (18 & 0xFF)) {
DO_(::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::ReadString(
input, this->add_output_fields()));
DO_(::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::VerifyUtf8String(
this->output_fields(this->output_fields_size() - 1).data(),
static_cast<int>(this->output_fields(this->output_fields_size() - 1).length()),
::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::PARSE,
"milvus.proto.plan.RetrieveRequest.output_fields"));
} else {
goto handle_unusual;
}
break;
}
default: {
handle_unusual:
if (tag == 0) {
goto success;
}
DO_(::PROTOBUF_NAMESPACE_ID::internal::WireFormat::SkipField(
input, tag, _internal_metadata_.mutable_unknown_fields()));
break;
}
}
}
success:
// @@protoc_insertion_point(parse_success:milvus.proto.plan.RetrieveRequest)
return true;
failure:
// @@protoc_insertion_point(parse_failure:milvus.proto.plan.RetrieveRequest)
return false;
#undef DO_
}
#endif // GOOGLE_PROTOBUF_ENABLE_EXPERIMENTAL_PARSER
void RetrieveRequest::SerializeWithCachedSizes(
::PROTOBUF_NAMESPACE_ID::io::CodedOutputStream* output) const {
// @@protoc_insertion_point(serialize_start:milvus.proto.plan.RetrieveRequest)
::PROTOBUF_NAMESPACE_ID::uint32 cached_has_bits = 0;
(void) cached_has_bits;
// .milvus.proto.schema.IDs ids = 1;
if (this->has_ids()) {
::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::WriteMessageMaybeToArray(
1, _Internal::ids(this), output);
}
// repeated string output_fields = 2;
for (int i = 0, n = this->output_fields_size(); i < n; i++) {
::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::VerifyUtf8String(
this->output_fields(i).data(), static_cast<int>(this->output_fields(i).length()),
::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::SERIALIZE,
"milvus.proto.plan.RetrieveRequest.output_fields");
::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::WriteString(
2, this->output_fields(i), output);
}
if (_internal_metadata_.have_unknown_fields()) {
::PROTOBUF_NAMESPACE_ID::internal::WireFormat::SerializeUnknownFields(
_internal_metadata_.unknown_fields(), output);
}
// @@protoc_insertion_point(serialize_end:milvus.proto.plan.RetrieveRequest)
}
::PROTOBUF_NAMESPACE_ID::uint8* RetrieveRequest::InternalSerializeWithCachedSizesToArray(
::PROTOBUF_NAMESPACE_ID::uint8* target) const {
// @@protoc_insertion_point(serialize_to_array_start:milvus.proto.plan.RetrieveRequest)
::PROTOBUF_NAMESPACE_ID::uint32 cached_has_bits = 0;
(void) cached_has_bits;
// .milvus.proto.schema.IDs ids = 1;
if (this->has_ids()) {
target = ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::
InternalWriteMessageToArray(
1, _Internal::ids(this), target);
}
// repeated string output_fields = 2;
for (int i = 0, n = this->output_fields_size(); i < n; i++) {
::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::VerifyUtf8String(
this->output_fields(i).data(), static_cast<int>(this->output_fields(i).length()),
::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::SERIALIZE,
"milvus.proto.plan.RetrieveRequest.output_fields");
target = ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::
WriteStringToArray(2, this->output_fields(i), target);
}
if (_internal_metadata_.have_unknown_fields()) {
target = ::PROTOBUF_NAMESPACE_ID::internal::WireFormat::SerializeUnknownFieldsToArray(
_internal_metadata_.unknown_fields(), target);
}
// @@protoc_insertion_point(serialize_to_array_end:milvus.proto.plan.RetrieveRequest)
return target;
}
size_t RetrieveRequest::ByteSizeLong() const {
// @@protoc_insertion_point(message_byte_size_start:milvus.proto.plan.RetrieveRequest)
size_t total_size = 0;
if (_internal_metadata_.have_unknown_fields()) {
total_size +=
::PROTOBUF_NAMESPACE_ID::internal::WireFormat::ComputeUnknownFieldsSize(
_internal_metadata_.unknown_fields());
}
::PROTOBUF_NAMESPACE_ID::uint32 cached_has_bits = 0;
// Prevent compiler warnings about cached_has_bits being unused
(void) cached_has_bits;
// repeated string output_fields = 2;
total_size += 1 *
::PROTOBUF_NAMESPACE_ID::internal::FromIntSize(this->output_fields_size());
for (int i = 0, n = this->output_fields_size(); i < n; i++) {
total_size += ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::StringSize(
this->output_fields(i));
}
// .milvus.proto.schema.IDs ids = 1;
if (this->has_ids()) {
total_size += 1 +
::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::MessageSize(
*ids_);
}
int cached_size = ::PROTOBUF_NAMESPACE_ID::internal::ToCachedSize(total_size);
SetCachedSize(cached_size);
return total_size;
}
void RetrieveRequest::MergeFrom(const ::PROTOBUF_NAMESPACE_ID::Message& from) {
// @@protoc_insertion_point(generalized_merge_from_start:milvus.proto.plan.RetrieveRequest)
GOOGLE_DCHECK_NE(&from, this);
const RetrieveRequest* source =
::PROTOBUF_NAMESPACE_ID::DynamicCastToGenerated<RetrieveRequest>(
&from);
if (source == nullptr) {
// @@protoc_insertion_point(generalized_merge_from_cast_fail:milvus.proto.plan.RetrieveRequest)
::PROTOBUF_NAMESPACE_ID::internal::ReflectionOps::Merge(from, this);
} else {
// @@protoc_insertion_point(generalized_merge_from_cast_success:milvus.proto.plan.RetrieveRequest)
MergeFrom(*source);
}
}
void RetrieveRequest::MergeFrom(const RetrieveRequest& from) {
// @@protoc_insertion_point(class_specific_merge_from_start:milvus.proto.plan.RetrieveRequest)
GOOGLE_DCHECK_NE(&from, this);
_internal_metadata_.MergeFrom(from._internal_metadata_);
::PROTOBUF_NAMESPACE_ID::uint32 cached_has_bits = 0;
(void) cached_has_bits;
output_fields_.MergeFrom(from.output_fields_);
if (from.has_ids()) {
mutable_ids()->::milvus::proto::schema::IDs::MergeFrom(from.ids());
}
}
void RetrieveRequest::CopyFrom(const ::PROTOBUF_NAMESPACE_ID::Message& from) {
// @@protoc_insertion_point(generalized_copy_from_start:milvus.proto.plan.RetrieveRequest)
if (&from == this) return;
Clear();
MergeFrom(from);
}
void RetrieveRequest::CopyFrom(const RetrieveRequest& from) {
// @@protoc_insertion_point(class_specific_copy_from_start:milvus.proto.plan.RetrieveRequest)
if (&from == this) return;
Clear();
MergeFrom(from);
}
bool RetrieveRequest::IsInitialized() const {
return true;
}
void RetrieveRequest::InternalSwap(RetrieveRequest* other) {
using std::swap;
_internal_metadata_.Swap(&other->_internal_metadata_);
output_fields_.InternalSwap(CastToBase(&other->output_fields_));
swap(ids_, other->ids_);
}
::PROTOBUF_NAMESPACE_ID::Metadata RetrieveRequest::GetMetadata() const {
return GetMetadataStatic();
}
// ===================================================================
void RetrieveResults::InitAsDefaultInstance() {
::milvus::proto::plan::_RetrieveResults_default_instance_._instance.get_mutable()->ids_ = const_cast< ::milvus::proto::schema::IDs*>(
::milvus::proto::schema::IDs::internal_default_instance());
}
class RetrieveResults::_Internal {
public:
static const ::milvus::proto::schema::IDs& ids(const RetrieveResults* msg);
};
const ::milvus::proto::schema::IDs&
RetrieveResults::_Internal::ids(const RetrieveResults* msg) {
return *msg->ids_;
}
void RetrieveResults::clear_ids() {
if (GetArenaNoVirtual() == nullptr && ids_ != nullptr) {
delete ids_;
}
ids_ = nullptr;
}
void RetrieveResults::clear_fields_data() {
fields_data_.Clear();
}
RetrieveResults::RetrieveResults()
: ::PROTOBUF_NAMESPACE_ID::Message(), _internal_metadata_(nullptr) {
SharedCtor();
// @@protoc_insertion_point(constructor:milvus.proto.plan.RetrieveResults)
}
RetrieveResults::RetrieveResults(const RetrieveResults& from)
: ::PROTOBUF_NAMESPACE_ID::Message(),
_internal_metadata_(nullptr),
fields_data_(from.fields_data_) {
_internal_metadata_.MergeFrom(from._internal_metadata_);
if (from.has_ids()) {
ids_ = new ::milvus::proto::schema::IDs(*from.ids_);
} else {
ids_ = nullptr;
}
// @@protoc_insertion_point(copy_constructor:milvus.proto.plan.RetrieveResults)
}
void RetrieveResults::SharedCtor() {
::PROTOBUF_NAMESPACE_ID::internal::InitSCC(&scc_info_RetrieveResults_plan_2eproto.base);
ids_ = nullptr;
}
RetrieveResults::~RetrieveResults() {
// @@protoc_insertion_point(destructor:milvus.proto.plan.RetrieveResults)
SharedDtor();
}
void RetrieveResults::SharedDtor() {
if (this != internal_default_instance()) delete ids_;
}
void RetrieveResults::SetCachedSize(int size) const {
_cached_size_.Set(size);
}
const RetrieveResults& RetrieveResults::default_instance() {
::PROTOBUF_NAMESPACE_ID::internal::InitSCC(&::scc_info_RetrieveResults_plan_2eproto.base);
return *internal_default_instance();
}
void RetrieveResults::Clear() {
// @@protoc_insertion_point(message_clear_start:milvus.proto.plan.RetrieveResults)
::PROTOBUF_NAMESPACE_ID::uint32 cached_has_bits = 0;
// Prevent compiler warnings about cached_has_bits being unused
(void) cached_has_bits;
fields_data_.Clear();
if (GetArenaNoVirtual() == nullptr && ids_ != nullptr) {
delete ids_;
}
ids_ = nullptr;
_internal_metadata_.Clear();
}
#if GOOGLE_PROTOBUF_ENABLE_EXPERIMENTAL_PARSER
const char* RetrieveResults::_InternalParse(const char* ptr, ::PROTOBUF_NAMESPACE_ID::internal::ParseContext* ctx) {
#define CHK_(x) if (PROTOBUF_PREDICT_FALSE(!(x))) goto failure
while (!ctx->Done(&ptr)) {
::PROTOBUF_NAMESPACE_ID::uint32 tag;
ptr = ::PROTOBUF_NAMESPACE_ID::internal::ReadTag(ptr, &tag);
CHK_(ptr);
switch (tag >> 3) {
// .milvus.proto.schema.IDs ids = 1;
case 1:
if (PROTOBUF_PREDICT_TRUE(static_cast<::PROTOBUF_NAMESPACE_ID::uint8>(tag) == 10)) {
ptr = ctx->ParseMessage(mutable_ids(), ptr);
CHK_(ptr);
} else goto handle_unusual;
continue;
// repeated .milvus.proto.schema.FieldData fields_data = 2;
case 2:
if (PROTOBUF_PREDICT_TRUE(static_cast<::PROTOBUF_NAMESPACE_ID::uint8>(tag) == 18)) {
ptr -= 1;
do {
ptr += 1;
ptr = ctx->ParseMessage(add_fields_data(), ptr);
CHK_(ptr);
if (!ctx->DataAvailable(ptr)) break;
} while (::PROTOBUF_NAMESPACE_ID::internal::UnalignedLoad<::PROTOBUF_NAMESPACE_ID::uint8>(ptr) == 18);
} else goto handle_unusual;
continue;
default: {
handle_unusual:
if ((tag & 7) == 4 || tag == 0) {
ctx->SetLastTag(tag);
goto success;
}
ptr = UnknownFieldParse(tag, &_internal_metadata_, ptr, ctx);
CHK_(ptr != nullptr);
continue;
}
} // switch
} // while
success:
return ptr;
failure:
ptr = nullptr;
goto success;
#undef CHK_
}
#else // GOOGLE_PROTOBUF_ENABLE_EXPERIMENTAL_PARSER
bool RetrieveResults::MergePartialFromCodedStream(
::PROTOBUF_NAMESPACE_ID::io::CodedInputStream* input) {
#define DO_(EXPRESSION) if (!PROTOBUF_PREDICT_TRUE(EXPRESSION)) goto failure
::PROTOBUF_NAMESPACE_ID::uint32 tag;
// @@protoc_insertion_point(parse_start:milvus.proto.plan.RetrieveResults)
for (;;) {
::std::pair<::PROTOBUF_NAMESPACE_ID::uint32, bool> p = input->ReadTagWithCutoffNoLastTag(127u);
tag = p.first;
if (!p.second) goto handle_unusual;
switch (::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::GetTagFieldNumber(tag)) {
// .milvus.proto.schema.IDs ids = 1;
case 1: {
if (static_cast< ::PROTOBUF_NAMESPACE_ID::uint8>(tag) == (10 & 0xFF)) {
DO_(::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::ReadMessage(
input, mutable_ids()));
} else {
goto handle_unusual;
}
break;
}
// repeated .milvus.proto.schema.FieldData fields_data = 2;
case 2: {
if (static_cast< ::PROTOBUF_NAMESPACE_ID::uint8>(tag) == (18 & 0xFF)) {
DO_(::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::ReadMessage(
input, add_fields_data()));
} else {
goto handle_unusual;
}
break;
}
default: {
handle_unusual:
if (tag == 0) {
goto success;
}
DO_(::PROTOBUF_NAMESPACE_ID::internal::WireFormat::SkipField(
input, tag, _internal_metadata_.mutable_unknown_fields()));
break;
}
}
}
success:
// @@protoc_insertion_point(parse_success:milvus.proto.plan.RetrieveResults)
return true;
failure:
// @@protoc_insertion_point(parse_failure:milvus.proto.plan.RetrieveResults)
return false;
#undef DO_
}
#endif // GOOGLE_PROTOBUF_ENABLE_EXPERIMENTAL_PARSER
void RetrieveResults::SerializeWithCachedSizes(
::PROTOBUF_NAMESPACE_ID::io::CodedOutputStream* output) const {
// @@protoc_insertion_point(serialize_start:milvus.proto.plan.RetrieveResults)
::PROTOBUF_NAMESPACE_ID::uint32 cached_has_bits = 0;
(void) cached_has_bits;
// .milvus.proto.schema.IDs ids = 1;
if (this->has_ids()) {
::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::WriteMessageMaybeToArray(
1, _Internal::ids(this), output);
}
// repeated .milvus.proto.schema.FieldData fields_data = 2;
for (unsigned int i = 0,
n = static_cast<unsigned int>(this->fields_data_size()); i < n; i++) {
::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::WriteMessageMaybeToArray(
2,
this->fields_data(static_cast<int>(i)),
output);
}
if (_internal_metadata_.have_unknown_fields()) {
::PROTOBUF_NAMESPACE_ID::internal::WireFormat::SerializeUnknownFields(
_internal_metadata_.unknown_fields(), output);
}
// @@protoc_insertion_point(serialize_end:milvus.proto.plan.RetrieveResults)
}
::PROTOBUF_NAMESPACE_ID::uint8* RetrieveResults::InternalSerializeWithCachedSizesToArray(
::PROTOBUF_NAMESPACE_ID::uint8* target) const {
// @@protoc_insertion_point(serialize_to_array_start:milvus.proto.plan.RetrieveResults)
::PROTOBUF_NAMESPACE_ID::uint32 cached_has_bits = 0;
(void) cached_has_bits;
// .milvus.proto.schema.IDs ids = 1;
if (this->has_ids()) {
target = ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::
InternalWriteMessageToArray(
1, _Internal::ids(this), target);
}
// repeated .milvus.proto.schema.FieldData fields_data = 2;
for (unsigned int i = 0,
n = static_cast<unsigned int>(this->fields_data_size()); i < n; i++) {
target = ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::
InternalWriteMessageToArray(
2, this->fields_data(static_cast<int>(i)), target);
}
if (_internal_metadata_.have_unknown_fields()) {
target = ::PROTOBUF_NAMESPACE_ID::internal::WireFormat::SerializeUnknownFieldsToArray(
_internal_metadata_.unknown_fields(), target);
}
// @@protoc_insertion_point(serialize_to_array_end:milvus.proto.plan.RetrieveResults)
return target;
}
size_t RetrieveResults::ByteSizeLong() const {
// @@protoc_insertion_point(message_byte_size_start:milvus.proto.plan.RetrieveResults)
size_t total_size = 0;
if (_internal_metadata_.have_unknown_fields()) {
total_size +=
::PROTOBUF_NAMESPACE_ID::internal::WireFormat::ComputeUnknownFieldsSize(
_internal_metadata_.unknown_fields());
}
::PROTOBUF_NAMESPACE_ID::uint32 cached_has_bits = 0;
// Prevent compiler warnings about cached_has_bits being unused
(void) cached_has_bits;
// repeated .milvus.proto.schema.FieldData fields_data = 2;
{
unsigned int count = static_cast<unsigned int>(this->fields_data_size());
total_size += 1UL * count;
for (unsigned int i = 0; i < count; i++) {
total_size +=
::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::MessageSize(
this->fields_data(static_cast<int>(i)));
}
}
// .milvus.proto.schema.IDs ids = 1;
if (this->has_ids()) {
total_size += 1 +
::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::MessageSize(
*ids_);
}
int cached_size = ::PROTOBUF_NAMESPACE_ID::internal::ToCachedSize(total_size);
SetCachedSize(cached_size);
return total_size;
}
void RetrieveResults::MergeFrom(const ::PROTOBUF_NAMESPACE_ID::Message& from) {
// @@protoc_insertion_point(generalized_merge_from_start:milvus.proto.plan.RetrieveResults)
GOOGLE_DCHECK_NE(&from, this);
const RetrieveResults* source =
::PROTOBUF_NAMESPACE_ID::DynamicCastToGenerated<RetrieveResults>(
&from);
if (source == nullptr) {
// @@protoc_insertion_point(generalized_merge_from_cast_fail:milvus.proto.plan.RetrieveResults)
::PROTOBUF_NAMESPACE_ID::internal::ReflectionOps::Merge(from, this);
} else {
// @@protoc_insertion_point(generalized_merge_from_cast_success:milvus.proto.plan.RetrieveResults)
MergeFrom(*source);
}
}
void RetrieveResults::MergeFrom(const RetrieveResults& from) {
// @@protoc_insertion_point(class_specific_merge_from_start:milvus.proto.plan.RetrieveResults)
GOOGLE_DCHECK_NE(&from, this);
_internal_metadata_.MergeFrom(from._internal_metadata_);
::PROTOBUF_NAMESPACE_ID::uint32 cached_has_bits = 0;
(void) cached_has_bits;
fields_data_.MergeFrom(from.fields_data_);
if (from.has_ids()) {
mutable_ids()->::milvus::proto::schema::IDs::MergeFrom(from.ids());
}
}
void RetrieveResults::CopyFrom(const ::PROTOBUF_NAMESPACE_ID::Message& from) {
// @@protoc_insertion_point(generalized_copy_from_start:milvus.proto.plan.RetrieveResults)
if (&from == this) return;
Clear();
MergeFrom(from);
}
void RetrieveResults::CopyFrom(const RetrieveResults& from) {
// @@protoc_insertion_point(class_specific_copy_from_start:milvus.proto.plan.RetrieveResults)
if (&from == this) return;
Clear();
MergeFrom(from);
}
bool RetrieveResults::IsInitialized() const {
return true;
}
void RetrieveResults::InternalSwap(RetrieveResults* other) {
using std::swap;
_internal_metadata_.Swap(&other->_internal_metadata_);
CastToBase(&fields_data_)->InternalSwap(CastToBase(&other->fields_data_));
swap(ids_, other->ids_);
}
::PROTOBUF_NAMESPACE_ID::Metadata RetrieveResults::GetMetadata() const {
return GetMetadataStatic();
}
// @@protoc_insertion_point(namespace_scope)
} // namespace plan
} // namespace proto
@ -4289,6 +5017,12 @@ template<> PROTOBUF_NOINLINE ::milvus::proto::plan::VectorANNS* Arena::CreateMay
template<> PROTOBUF_NOINLINE ::milvus::proto::plan::PlanNode* Arena::CreateMaybeMessage< ::milvus::proto::plan::PlanNode >(Arena* arena) {
return Arena::CreateInternal< ::milvus::proto::plan::PlanNode >(arena);
}
template<> PROTOBUF_NOINLINE ::milvus::proto::plan::RetrieveRequest* Arena::CreateMaybeMessage< ::milvus::proto::plan::RetrieveRequest >(Arena* arena) {
return Arena::CreateInternal< ::milvus::proto::plan::RetrieveRequest >(arena);
}
template<> PROTOBUF_NOINLINE ::milvus::proto::plan::RetrieveResults* Arena::CreateMaybeMessage< ::milvus::proto::plan::RetrieveResults >(Arena* arena) {
return Arena::CreateInternal< ::milvus::proto::plan::RetrieveResults >(arena);
}
PROTOBUF_NAMESPACE_CLOSE
// @@protoc_insertion_point(global_scope)

View File

@ -49,7 +49,7 @@ struct TableStruct_plan_2eproto {
PROTOBUF_SECTION_VARIABLE(protodesc_cold);
static const ::PROTOBUF_NAMESPACE_ID::internal::AuxillaryParseTableField aux[]
PROTOBUF_SECTION_VARIABLE(protodesc_cold);
static const ::PROTOBUF_NAMESPACE_ID::internal::ParseTable schema[10]
static const ::PROTOBUF_NAMESPACE_ID::internal::ParseTable schema[12]
PROTOBUF_SECTION_VARIABLE(protodesc_cold);
static const ::PROTOBUF_NAMESPACE_ID::internal::FieldMetadata field_metadata[];
static const ::PROTOBUF_NAMESPACE_ID::internal::SerializationTable serialization_table[];
@ -80,6 +80,12 @@ extern QueryInfoDefaultTypeInternal _QueryInfo_default_instance_;
class RangeExpr;
class RangeExprDefaultTypeInternal;
extern RangeExprDefaultTypeInternal _RangeExpr_default_instance_;
class RetrieveRequest;
class RetrieveRequestDefaultTypeInternal;
extern RetrieveRequestDefaultTypeInternal _RetrieveRequest_default_instance_;
class RetrieveResults;
class RetrieveResultsDefaultTypeInternal;
extern RetrieveResultsDefaultTypeInternal _RetrieveResults_default_instance_;
class TermExpr;
class TermExprDefaultTypeInternal;
extern TermExprDefaultTypeInternal _TermExpr_default_instance_;
@ -100,6 +106,8 @@ template<> ::milvus::proto::plan::GenericValue* Arena::CreateMaybeMessage<::milv
template<> ::milvus::proto::plan::PlanNode* Arena::CreateMaybeMessage<::milvus::proto::plan::PlanNode>(Arena*);
template<> ::milvus::proto::plan::QueryInfo* Arena::CreateMaybeMessage<::milvus::proto::plan::QueryInfo>(Arena*);
template<> ::milvus::proto::plan::RangeExpr* Arena::CreateMaybeMessage<::milvus::proto::plan::RangeExpr>(Arena*);
template<> ::milvus::proto::plan::RetrieveRequest* Arena::CreateMaybeMessage<::milvus::proto::plan::RetrieveRequest>(Arena*);
template<> ::milvus::proto::plan::RetrieveResults* Arena::CreateMaybeMessage<::milvus::proto::plan::RetrieveResults>(Arena*);
template<> ::milvus::proto::plan::TermExpr* Arena::CreateMaybeMessage<::milvus::proto::plan::TermExpr>(Arena*);
template<> ::milvus::proto::plan::UnaryExpr* Arena::CreateMaybeMessage<::milvus::proto::plan::UnaryExpr>(Arena*);
template<> ::milvus::proto::plan::VectorANNS* Arena::CreateMaybeMessage<::milvus::proto::plan::VectorANNS>(Arena*);
@ -1865,6 +1873,306 @@ class PlanNode :
friend struct ::TableStruct_plan_2eproto;
};
// -------------------------------------------------------------------
class RetrieveRequest :
public ::PROTOBUF_NAMESPACE_ID::Message /* @@protoc_insertion_point(class_definition:milvus.proto.plan.RetrieveRequest) */ {
public:
RetrieveRequest();
virtual ~RetrieveRequest();
RetrieveRequest(const RetrieveRequest& from);
RetrieveRequest(RetrieveRequest&& from) noexcept
: RetrieveRequest() {
*this = ::std::move(from);
}
inline RetrieveRequest& operator=(const RetrieveRequest& from) {
CopyFrom(from);
return *this;
}
inline RetrieveRequest& operator=(RetrieveRequest&& from) noexcept {
if (GetArenaNoVirtual() == from.GetArenaNoVirtual()) {
if (this != &from) InternalSwap(&from);
} else {
CopyFrom(from);
}
return *this;
}
static const ::PROTOBUF_NAMESPACE_ID::Descriptor* descriptor() {
return GetDescriptor();
}
static const ::PROTOBUF_NAMESPACE_ID::Descriptor* GetDescriptor() {
return GetMetadataStatic().descriptor;
}
static const ::PROTOBUF_NAMESPACE_ID::Reflection* GetReflection() {
return GetMetadataStatic().reflection;
}
static const RetrieveRequest& default_instance();
static void InitAsDefaultInstance(); // FOR INTERNAL USE ONLY
static inline const RetrieveRequest* internal_default_instance() {
return reinterpret_cast<const RetrieveRequest*>(
&_RetrieveRequest_default_instance_);
}
static constexpr int kIndexInFileMessages =
10;
friend void swap(RetrieveRequest& a, RetrieveRequest& b) {
a.Swap(&b);
}
inline void Swap(RetrieveRequest* other) {
if (other == this) return;
InternalSwap(other);
}
// implements Message ----------------------------------------------
inline RetrieveRequest* New() const final {
return CreateMaybeMessage<RetrieveRequest>(nullptr);
}
RetrieveRequest* New(::PROTOBUF_NAMESPACE_ID::Arena* arena) const final {
return CreateMaybeMessage<RetrieveRequest>(arena);
}
void CopyFrom(const ::PROTOBUF_NAMESPACE_ID::Message& from) final;
void MergeFrom(const ::PROTOBUF_NAMESPACE_ID::Message& from) final;
void CopyFrom(const RetrieveRequest& from);
void MergeFrom(const RetrieveRequest& from);
PROTOBUF_ATTRIBUTE_REINITIALIZES void Clear() final;
bool IsInitialized() const final;
size_t ByteSizeLong() const final;
#if GOOGLE_PROTOBUF_ENABLE_EXPERIMENTAL_PARSER
const char* _InternalParse(const char* ptr, ::PROTOBUF_NAMESPACE_ID::internal::ParseContext* ctx) final;
#else
bool MergePartialFromCodedStream(
::PROTOBUF_NAMESPACE_ID::io::CodedInputStream* input) final;
#endif // GOOGLE_PROTOBUF_ENABLE_EXPERIMENTAL_PARSER
void SerializeWithCachedSizes(
::PROTOBUF_NAMESPACE_ID::io::CodedOutputStream* output) const final;
::PROTOBUF_NAMESPACE_ID::uint8* InternalSerializeWithCachedSizesToArray(
::PROTOBUF_NAMESPACE_ID::uint8* target) const final;
int GetCachedSize() const final { return _cached_size_.Get(); }
private:
inline void SharedCtor();
inline void SharedDtor();
void SetCachedSize(int size) const final;
void InternalSwap(RetrieveRequest* other);
friend class ::PROTOBUF_NAMESPACE_ID::internal::AnyMetadata;
static ::PROTOBUF_NAMESPACE_ID::StringPiece FullMessageName() {
return "milvus.proto.plan.RetrieveRequest";
}
private:
inline ::PROTOBUF_NAMESPACE_ID::Arena* GetArenaNoVirtual() const {
return nullptr;
}
inline void* MaybeArenaPtr() const {
return nullptr;
}
public:
::PROTOBUF_NAMESPACE_ID::Metadata GetMetadata() const final;
private:
static ::PROTOBUF_NAMESPACE_ID::Metadata GetMetadataStatic() {
::PROTOBUF_NAMESPACE_ID::internal::AssignDescriptors(&::descriptor_table_plan_2eproto);
return ::descriptor_table_plan_2eproto.file_level_metadata[kIndexInFileMessages];
}
public:
// nested types ----------------------------------------------------
// accessors -------------------------------------------------------
enum : int {
kOutputFieldsFieldNumber = 2,
kIdsFieldNumber = 1,
};
// repeated string output_fields = 2;
int output_fields_size() const;
void clear_output_fields();
const std::string& output_fields(int index) const;
std::string* mutable_output_fields(int index);
void set_output_fields(int index, const std::string& value);
void set_output_fields(int index, std::string&& value);
void set_output_fields(int index, const char* value);
void set_output_fields(int index, const char* value, size_t size);
std::string* add_output_fields();
void add_output_fields(const std::string& value);
void add_output_fields(std::string&& value);
void add_output_fields(const char* value);
void add_output_fields(const char* value, size_t size);
const ::PROTOBUF_NAMESPACE_ID::RepeatedPtrField<std::string>& output_fields() const;
::PROTOBUF_NAMESPACE_ID::RepeatedPtrField<std::string>* mutable_output_fields();
// .milvus.proto.schema.IDs ids = 1;
bool has_ids() const;
void clear_ids();
const ::milvus::proto::schema::IDs& ids() const;
::milvus::proto::schema::IDs* release_ids();
::milvus::proto::schema::IDs* mutable_ids();
void set_allocated_ids(::milvus::proto::schema::IDs* ids);
// @@protoc_insertion_point(class_scope:milvus.proto.plan.RetrieveRequest)
private:
class _Internal;
::PROTOBUF_NAMESPACE_ID::internal::InternalMetadataWithArena _internal_metadata_;
::PROTOBUF_NAMESPACE_ID::RepeatedPtrField<std::string> output_fields_;
::milvus::proto::schema::IDs* ids_;
mutable ::PROTOBUF_NAMESPACE_ID::internal::CachedSize _cached_size_;
friend struct ::TableStruct_plan_2eproto;
};
// -------------------------------------------------------------------
class RetrieveResults :
public ::PROTOBUF_NAMESPACE_ID::Message /* @@protoc_insertion_point(class_definition:milvus.proto.plan.RetrieveResults) */ {
public:
RetrieveResults();
virtual ~RetrieveResults();
RetrieveResults(const RetrieveResults& from);
RetrieveResults(RetrieveResults&& from) noexcept
: RetrieveResults() {
*this = ::std::move(from);
}
inline RetrieveResults& operator=(const RetrieveResults& from) {
CopyFrom(from);
return *this;
}
inline RetrieveResults& operator=(RetrieveResults&& from) noexcept {
if (GetArenaNoVirtual() == from.GetArenaNoVirtual()) {
if (this != &from) InternalSwap(&from);
} else {
CopyFrom(from);
}
return *this;
}
static const ::PROTOBUF_NAMESPACE_ID::Descriptor* descriptor() {
return GetDescriptor();
}
static const ::PROTOBUF_NAMESPACE_ID::Descriptor* GetDescriptor() {
return GetMetadataStatic().descriptor;
}
static const ::PROTOBUF_NAMESPACE_ID::Reflection* GetReflection() {
return GetMetadataStatic().reflection;
}
static const RetrieveResults& default_instance();
static void InitAsDefaultInstance(); // FOR INTERNAL USE ONLY
static inline const RetrieveResults* internal_default_instance() {
return reinterpret_cast<const RetrieveResults*>(
&_RetrieveResults_default_instance_);
}
static constexpr int kIndexInFileMessages =
11;
friend void swap(RetrieveResults& a, RetrieveResults& b) {
a.Swap(&b);
}
inline void Swap(RetrieveResults* other) {
if (other == this) return;
InternalSwap(other);
}
// implements Message ----------------------------------------------
inline RetrieveResults* New() const final {
return CreateMaybeMessage<RetrieveResults>(nullptr);
}
RetrieveResults* New(::PROTOBUF_NAMESPACE_ID::Arena* arena) const final {
return CreateMaybeMessage<RetrieveResults>(arena);
}
void CopyFrom(const ::PROTOBUF_NAMESPACE_ID::Message& from) final;
void MergeFrom(const ::PROTOBUF_NAMESPACE_ID::Message& from) final;
void CopyFrom(const RetrieveResults& from);
void MergeFrom(const RetrieveResults& from);
PROTOBUF_ATTRIBUTE_REINITIALIZES void Clear() final;
bool IsInitialized() const final;
size_t ByteSizeLong() const final;
#if GOOGLE_PROTOBUF_ENABLE_EXPERIMENTAL_PARSER
const char* _InternalParse(const char* ptr, ::PROTOBUF_NAMESPACE_ID::internal::ParseContext* ctx) final;
#else
bool MergePartialFromCodedStream(
::PROTOBUF_NAMESPACE_ID::io::CodedInputStream* input) final;
#endif // GOOGLE_PROTOBUF_ENABLE_EXPERIMENTAL_PARSER
void SerializeWithCachedSizes(
::PROTOBUF_NAMESPACE_ID::io::CodedOutputStream* output) const final;
::PROTOBUF_NAMESPACE_ID::uint8* InternalSerializeWithCachedSizesToArray(
::PROTOBUF_NAMESPACE_ID::uint8* target) const final;
int GetCachedSize() const final { return _cached_size_.Get(); }
private:
inline void SharedCtor();
inline void SharedDtor();
void SetCachedSize(int size) const final;
void InternalSwap(RetrieveResults* other);
friend class ::PROTOBUF_NAMESPACE_ID::internal::AnyMetadata;
static ::PROTOBUF_NAMESPACE_ID::StringPiece FullMessageName() {
return "milvus.proto.plan.RetrieveResults";
}
private:
inline ::PROTOBUF_NAMESPACE_ID::Arena* GetArenaNoVirtual() const {
return nullptr;
}
inline void* MaybeArenaPtr() const {
return nullptr;
}
public:
::PROTOBUF_NAMESPACE_ID::Metadata GetMetadata() const final;
private:
static ::PROTOBUF_NAMESPACE_ID::Metadata GetMetadataStatic() {
::PROTOBUF_NAMESPACE_ID::internal::AssignDescriptors(&::descriptor_table_plan_2eproto);
return ::descriptor_table_plan_2eproto.file_level_metadata[kIndexInFileMessages];
}
public:
// nested types ----------------------------------------------------
// accessors -------------------------------------------------------
enum : int {
kFieldsDataFieldNumber = 2,
kIdsFieldNumber = 1,
};
// repeated .milvus.proto.schema.FieldData fields_data = 2;
int fields_data_size() const;
void clear_fields_data();
::milvus::proto::schema::FieldData* mutable_fields_data(int index);
::PROTOBUF_NAMESPACE_ID::RepeatedPtrField< ::milvus::proto::schema::FieldData >*
mutable_fields_data();
const ::milvus::proto::schema::FieldData& fields_data(int index) const;
::milvus::proto::schema::FieldData* add_fields_data();
const ::PROTOBUF_NAMESPACE_ID::RepeatedPtrField< ::milvus::proto::schema::FieldData >&
fields_data() const;
// .milvus.proto.schema.IDs ids = 1;
bool has_ids() const;
void clear_ids();
const ::milvus::proto::schema::IDs& ids() const;
::milvus::proto::schema::IDs* release_ids();
::milvus::proto::schema::IDs* mutable_ids();
void set_allocated_ids(::milvus::proto::schema::IDs* ids);
// @@protoc_insertion_point(class_scope:milvus.proto.plan.RetrieveResults)
private:
class _Internal;
::PROTOBUF_NAMESPACE_ID::internal::InternalMetadataWithArena _internal_metadata_;
::PROTOBUF_NAMESPACE_ID::RepeatedPtrField< ::milvus::proto::schema::FieldData > fields_data_;
::milvus::proto::schema::IDs* ids_;
mutable ::PROTOBUF_NAMESPACE_ID::internal::CachedSize _cached_size_;
friend struct ::TableStruct_plan_2eproto;
};
// ===================================================================
@ -2929,6 +3237,196 @@ inline void PlanNode::clear_has_node() {
inline PlanNode::NodeCase PlanNode::node_case() const {
return PlanNode::NodeCase(_oneof_case_[0]);
}
// -------------------------------------------------------------------
// RetrieveRequest
// .milvus.proto.schema.IDs ids = 1;
inline bool RetrieveRequest::has_ids() const {
return this != internal_default_instance() && ids_ != nullptr;
}
inline const ::milvus::proto::schema::IDs& RetrieveRequest::ids() const {
const ::milvus::proto::schema::IDs* p = ids_;
// @@protoc_insertion_point(field_get:milvus.proto.plan.RetrieveRequest.ids)
return p != nullptr ? *p : *reinterpret_cast<const ::milvus::proto::schema::IDs*>(
&::milvus::proto::schema::_IDs_default_instance_);
}
inline ::milvus::proto::schema::IDs* RetrieveRequest::release_ids() {
// @@protoc_insertion_point(field_release:milvus.proto.plan.RetrieveRequest.ids)
::milvus::proto::schema::IDs* temp = ids_;
ids_ = nullptr;
return temp;
}
inline ::milvus::proto::schema::IDs* RetrieveRequest::mutable_ids() {
if (ids_ == nullptr) {
auto* p = CreateMaybeMessage<::milvus::proto::schema::IDs>(GetArenaNoVirtual());
ids_ = p;
}
// @@protoc_insertion_point(field_mutable:milvus.proto.plan.RetrieveRequest.ids)
return ids_;
}
inline void RetrieveRequest::set_allocated_ids(::milvus::proto::schema::IDs* ids) {
::PROTOBUF_NAMESPACE_ID::Arena* message_arena = GetArenaNoVirtual();
if (message_arena == nullptr) {
delete reinterpret_cast< ::PROTOBUF_NAMESPACE_ID::MessageLite*>(ids_);
}
if (ids) {
::PROTOBUF_NAMESPACE_ID::Arena* submessage_arena = nullptr;
if (message_arena != submessage_arena) {
ids = ::PROTOBUF_NAMESPACE_ID::internal::GetOwnedMessage(
message_arena, ids, submessage_arena);
}
} else {
}
ids_ = ids;
// @@protoc_insertion_point(field_set_allocated:milvus.proto.plan.RetrieveRequest.ids)
}
// repeated string output_fields = 2;
inline int RetrieveRequest::output_fields_size() const {
return output_fields_.size();
}
inline void RetrieveRequest::clear_output_fields() {
output_fields_.Clear();
}
inline const std::string& RetrieveRequest::output_fields(int index) const {
// @@protoc_insertion_point(field_get:milvus.proto.plan.RetrieveRequest.output_fields)
return output_fields_.Get(index);
}
inline std::string* RetrieveRequest::mutable_output_fields(int index) {
// @@protoc_insertion_point(field_mutable:milvus.proto.plan.RetrieveRequest.output_fields)
return output_fields_.Mutable(index);
}
inline void RetrieveRequest::set_output_fields(int index, const std::string& value) {
// @@protoc_insertion_point(field_set:milvus.proto.plan.RetrieveRequest.output_fields)
output_fields_.Mutable(index)->assign(value);
}
inline void RetrieveRequest::set_output_fields(int index, std::string&& value) {
// @@protoc_insertion_point(field_set:milvus.proto.plan.RetrieveRequest.output_fields)
output_fields_.Mutable(index)->assign(std::move(value));
}
inline void RetrieveRequest::set_output_fields(int index, const char* value) {
GOOGLE_DCHECK(value != nullptr);
output_fields_.Mutable(index)->assign(value);
// @@protoc_insertion_point(field_set_char:milvus.proto.plan.RetrieveRequest.output_fields)
}
inline void RetrieveRequest::set_output_fields(int index, const char* value, size_t size) {
output_fields_.Mutable(index)->assign(
reinterpret_cast<const char*>(value), size);
// @@protoc_insertion_point(field_set_pointer:milvus.proto.plan.RetrieveRequest.output_fields)
}
inline std::string* RetrieveRequest::add_output_fields() {
// @@protoc_insertion_point(field_add_mutable:milvus.proto.plan.RetrieveRequest.output_fields)
return output_fields_.Add();
}
inline void RetrieveRequest::add_output_fields(const std::string& value) {
output_fields_.Add()->assign(value);
// @@protoc_insertion_point(field_add:milvus.proto.plan.RetrieveRequest.output_fields)
}
inline void RetrieveRequest::add_output_fields(std::string&& value) {
output_fields_.Add(std::move(value));
// @@protoc_insertion_point(field_add:milvus.proto.plan.RetrieveRequest.output_fields)
}
inline void RetrieveRequest::add_output_fields(const char* value) {
GOOGLE_DCHECK(value != nullptr);
output_fields_.Add()->assign(value);
// @@protoc_insertion_point(field_add_char:milvus.proto.plan.RetrieveRequest.output_fields)
}
inline void RetrieveRequest::add_output_fields(const char* value, size_t size) {
output_fields_.Add()->assign(reinterpret_cast<const char*>(value), size);
// @@protoc_insertion_point(field_add_pointer:milvus.proto.plan.RetrieveRequest.output_fields)
}
inline const ::PROTOBUF_NAMESPACE_ID::RepeatedPtrField<std::string>&
RetrieveRequest::output_fields() const {
// @@protoc_insertion_point(field_list:milvus.proto.plan.RetrieveRequest.output_fields)
return output_fields_;
}
inline ::PROTOBUF_NAMESPACE_ID::RepeatedPtrField<std::string>*
RetrieveRequest::mutable_output_fields() {
// @@protoc_insertion_point(field_mutable_list:milvus.proto.plan.RetrieveRequest.output_fields)
return &output_fields_;
}
// -------------------------------------------------------------------
// RetrieveResults
// .milvus.proto.schema.IDs ids = 1;
inline bool RetrieveResults::has_ids() const {
return this != internal_default_instance() && ids_ != nullptr;
}
inline const ::milvus::proto::schema::IDs& RetrieveResults::ids() const {
const ::milvus::proto::schema::IDs* p = ids_;
// @@protoc_insertion_point(field_get:milvus.proto.plan.RetrieveResults.ids)
return p != nullptr ? *p : *reinterpret_cast<const ::milvus::proto::schema::IDs*>(
&::milvus::proto::schema::_IDs_default_instance_);
}
inline ::milvus::proto::schema::IDs* RetrieveResults::release_ids() {
// @@protoc_insertion_point(field_release:milvus.proto.plan.RetrieveResults.ids)
::milvus::proto::schema::IDs* temp = ids_;
ids_ = nullptr;
return temp;
}
inline ::milvus::proto::schema::IDs* RetrieveResults::mutable_ids() {
if (ids_ == nullptr) {
auto* p = CreateMaybeMessage<::milvus::proto::schema::IDs>(GetArenaNoVirtual());
ids_ = p;
}
// @@protoc_insertion_point(field_mutable:milvus.proto.plan.RetrieveResults.ids)
return ids_;
}
inline void RetrieveResults::set_allocated_ids(::milvus::proto::schema::IDs* ids) {
::PROTOBUF_NAMESPACE_ID::Arena* message_arena = GetArenaNoVirtual();
if (message_arena == nullptr) {
delete reinterpret_cast< ::PROTOBUF_NAMESPACE_ID::MessageLite*>(ids_);
}
if (ids) {
::PROTOBUF_NAMESPACE_ID::Arena* submessage_arena = nullptr;
if (message_arena != submessage_arena) {
ids = ::PROTOBUF_NAMESPACE_ID::internal::GetOwnedMessage(
message_arena, ids, submessage_arena);
}
} else {
}
ids_ = ids;
// @@protoc_insertion_point(field_set_allocated:milvus.proto.plan.RetrieveResults.ids)
}
// repeated .milvus.proto.schema.FieldData fields_data = 2;
inline int RetrieveResults::fields_data_size() const {
return fields_data_.size();
}
inline ::milvus::proto::schema::FieldData* RetrieveResults::mutable_fields_data(int index) {
// @@protoc_insertion_point(field_mutable:milvus.proto.plan.RetrieveResults.fields_data)
return fields_data_.Mutable(index);
}
inline ::PROTOBUF_NAMESPACE_ID::RepeatedPtrField< ::milvus::proto::schema::FieldData >*
RetrieveResults::mutable_fields_data() {
// @@protoc_insertion_point(field_mutable_list:milvus.proto.plan.RetrieveResults.fields_data)
return &fields_data_;
}
inline const ::milvus::proto::schema::FieldData& RetrieveResults::fields_data(int index) const {
// @@protoc_insertion_point(field_get:milvus.proto.plan.RetrieveResults.fields_data)
return fields_data_.Get(index);
}
inline ::milvus::proto::schema::FieldData* RetrieveResults::add_fields_data() {
// @@protoc_insertion_point(field_add:milvus.proto.plan.RetrieveResults.fields_data)
return fields_data_.Add();
}
inline const ::PROTOBUF_NAMESPACE_ID::RepeatedPtrField< ::milvus::proto::schema::FieldData >&
RetrieveResults::fields_data() const {
// @@protoc_insertion_point(field_list:milvus.proto.plan.RetrieveResults.fields_data)
return fields_data_;
}
#ifdef __GNUC__
#pragma GCC diagnostic pop
#endif // __GNUC__
@ -2950,6 +3448,10 @@ inline PlanNode::NodeCase PlanNode::node_case() const {
// -------------------------------------------------------------------
// -------------------------------------------------------------------
// -------------------------------------------------------------------
// @@protoc_insertion_point(namespace_scope)

View File

@ -479,6 +479,16 @@ GetNumOfQueries(const PlaceholderGroup* group) {
return group->at(0).num_of_queries_;
}
[[maybe_unused]] std::unique_ptr<RetrievePlan>
CreateRetrievePlan(const Schema& schema, proto::plan::RetrieveRequest&& request) {
auto plan = std::make_unique<RetrievePlan>();
plan->ids_ = std::unique_ptr<proto::schema::IDs>(request.release_ids());
for (auto& field_name : request.output_fields()) {
plan->field_offsets_.push_back(schema.get_offset(FieldName(field_name)));
}
return plan;
}
void
Plan::check_identical(Plan& other) {
Assert(&schema_ == &other.schema_);

View File

@ -14,6 +14,7 @@
#include <string_view>
#include <string>
#include "common/Schema.h"
#include "pb/plan.pb.h"
namespace milvus::query {
// NOTE: APIs for C wrapper
@ -21,6 +22,7 @@ namespace milvus::query {
// Incomplete Definition, shouldn't be instantiated
struct Plan;
struct PlaceholderGroup;
struct RetrievePlan;
std::unique_ptr<Plan>
CreatePlan(const Schema& schema, const std::string& dsl);
@ -35,6 +37,9 @@ ParsePlaceholderGroup(const Plan* plan, const std::string& placeholder_group_blo
int64_t
GetNumOfQueries(const PlaceholderGroup*);
std::unique_ptr<RetrievePlan>
CreateRetrievePlan(const Schema& schema, proto::plan::RetrieveRequest&& request);
// Query Overall TopK from Plan
// Used to alloc result memory at Go side
int64_t

View File

@ -80,6 +80,11 @@ struct Placeholder {
}
};
struct RetrievePlan {
std::unique_ptr<proto::schema::IDs> ids_;
std::vector<FieldOffset> field_offsets_;
};
using PlanPtr = std::unique_ptr<Plan>;
struct PlaceholderGroup : std::vector<Placeholder> {

View File

@ -449,4 +449,30 @@ SegmentGrowingImpl::Insert(int64_t reserved_offset,
do_insert(reserved_offset, size, row_ids.data(), timestamps.data(), columns_data);
}
std::pair<std::unique_ptr<IdArray>, std::vector<SegOffset>>
SegmentGrowingImpl::search_ids(const IdArray& id_array, Timestamp timestamp) const {
Assert(id_array.has_int_id());
auto& src_int_arr = id_array.int_id();
auto res_id_arr = std::make_unique<IdArray>();
auto res_int_id_arr = res_id_arr->mutable_int_id();
std::vector<SegOffset> res_offsets;
for (auto uid : src_int_arr.data()) {
auto [iter_b, iter_e] = uid2offset_.equal_range(uid);
SegOffset the_offset(-1);
for (auto iter = iter_b; iter != iter_e; ++iter) {
auto offset = SegOffset(iter->second);
if (record_.timestamps_[offset.get()] < timestamp) {
the_offset = std::max(the_offset, offset);
}
}
// if not found, skip
if (the_offset == SegOffset(-1)) {
continue;
}
res_int_id_arr->add_data(uid);
res_offsets.push_back(the_offset);
}
return {std::move(res_id_arr), std::move(res_offsets)};
}
} // namespace milvus::segcore

View File

@ -178,6 +178,9 @@ class SegmentGrowingImpl : public SegmentGrowing {
std::shared_ptr<DeletedRecord::TmpBitmap>
get_deleted_bitmap(int64_t del_barrier, Timestamp query_timestamp, int64_t insert_barrier, bool force = false);
std::pair<std::unique_ptr<IdArray>, std::vector<SegOffset>>
search_ids(const IdArray& id_array, Timestamp timestamp) const override;
protected:
int64_t
num_chunk() const override;

View File

@ -190,11 +190,13 @@ SegmentInternalInterface::BulkSubScript(FieldOffset field_offset, const SegOffse
}
}
std::unique_ptr<proto::milvus::RetrieveResults>
SegmentInternalInterface::GetEntityById(const std::vector<FieldOffset>& field_offsets, const IdArray& id_array) const {
auto results = std::make_unique<proto::milvus::RetrieveResults>();
std::unique_ptr<proto::plan::RetrieveResults>
SegmentInternalInterface::GetEntityById(const std::vector<FieldOffset>& field_offsets,
const IdArray& id_array,
Timestamp timestamp) const {
auto results = std::make_unique<proto::plan::RetrieveResults>();
auto [ids_, seg_offsets] = search_ids(id_array);
auto [ids_, seg_offsets] = search_ids(id_array, timestamp);
results->set_allocated_ids(ids_.release());
auto fields_data = results->mutable_fields_data();
@ -204,11 +206,4 @@ SegmentInternalInterface::GetEntityById(const std::vector<FieldOffset>& field_of
}
return results;
}
std::pair<std::unique_ptr<IdArray>, std::vector<SegOffset>>
SegmentInternalInterface::search_ids(const IdArray& id_array) const {
// TODO: protobuf in a nutshell
PanicInfo("unimplemented");
}
} // namespace milvus::segcore

View File

@ -39,8 +39,10 @@ class SegmentInterface {
const Timestamp timestamps[],
int64_t num_groups) const = 0;
virtual std::unique_ptr<proto::milvus::RetrieveResults>
GetEntityById(const std::vector<FieldOffset>& field_offsets, const IdArray& id_array) const = 0;
virtual std::unique_ptr<proto::plan::RetrieveResults>
GetEntityById(const std::vector<FieldOffset>& field_offsets,
const IdArray& id_array,
Timestamp timestamp) const = 0;
virtual int64_t
GetMemoryUsageInBytes() const = 0;
@ -86,8 +88,10 @@ class SegmentInternalInterface : public SegmentInterface {
void
FillTargetEntry(const query::Plan* plan, QueryResult& results) const override;
std::unique_ptr<proto::milvus::RetrieveResults>
GetEntityById(const std::vector<FieldOffset>& field_offsets, const IdArray& id_array) const override;
std::unique_ptr<proto::plan::RetrieveResults>
GetEntityById(const std::vector<FieldOffset>& field_offsets,
const IdArray& id_array,
Timestamp timestamp) const override;
public:
virtual void
@ -134,7 +138,7 @@ class SegmentInternalInterface : public SegmentInterface {
BulkSubScript(FieldOffset field_offset, const SegOffset* seg_offsets, int64_t count) const;
virtual std::pair<std::unique_ptr<IdArray>, std::vector<SegOffset>>
search_ids(const IdArray& id_array) const;
search_ids(const IdArray& id_array, Timestamp timestamp) const = 0;
virtual void
check_search(const query::Plan* plan) const = 0;

View File

@ -403,42 +403,13 @@ SegmentSealedImpl::HasFieldData(FieldId field_id) const {
}
std::pair<std::unique_ptr<IdArray>, std::vector<SegOffset>>
SegmentSealedImpl::search_ids(const IdArray& id_array) const {
SegmentSealedImpl::search_ids(const IdArray& id_array, Timestamp timestamp) const {
AssertInfo(id_array.has_int_id(), "string ids are not implemented");
auto arr = id_array.int_id();
Assert(primary_key_index_);
return primary_key_index_->do_search_ids(id_array);
}
// void
// SegmentSealedImpl::build_index_if_primary_key(FieldId field_id) {
// auto create_index = [](const int64_t* data, int64_t size) {
// Assert(size);
// auto pk_index = std::make_unique<ScalarIndexVector>();
// pk_index->append_data(data, size, SegOffset(0));
// pk_index->build();
// return pk_index;
// };
//
// if (SystemProperty::Instance().IsSystem(field_id)) {
// Assert(SystemProperty::Instance().GetSystemFieldType(field_id) == SystemFieldType::RowId);
// Assert(schema_->get_is_auto_id());
// Assert(row_count_opt_.has_value());
// auto row_count = row_count_opt_.value();
// Assert(row_count == row_ids_.size());
// primary_key_index_ = create_index(row_ids_.data(), row_count);
//
// } else if (this->schema_->get_primary_key_offset() == schema_->get_offset(field_id)) {
// auto pk_offset = schema_->get_offset(field_id);
// auto& field_data = field_datas_[pk_offset.get()];
// Assert(row_count_opt_.has_value());
// auto row_count = row_count_opt_.value();
// Assert(field_data.size() == row_count * sizeof(int64_t));
//
// primary_key_index_ = create_index((const int64_t*)field_data.data(), row_count);
// }
// }
SegmentSealedPtr
CreateSealedSegment(SchemaPtr schema) {
return std::make_unique<SegmentSealedImpl>(schema);

View File

@ -109,8 +109,8 @@ class SegmentSealedImpl : public SegmentSealed {
return system_ready_count_ == 1;
}
virtual std::pair<std::unique_ptr<IdArray>, std::vector<SegOffset>>
search_ids(const IdArray& id_array) const;
std::pair<std::unique_ptr<IdArray>, std::vector<SegOffset>>
search_ids(const IdArray& id_array, Timestamp timestamp) const override;
// virtual void
// build_index_if_primary_key(FieldId field_id);

View File

@ -130,3 +130,30 @@ DeletePlaceholderGroup(CPlaceholderGroup cPlaceholder_group) {
delete placeHolder_group;
// std::cout << "delete placeholder" << std::endl;
}
CStatus
CreateRetrievePlan(CCollection c_col, CProto retrieve_request, CRetrievePlan* output) {
auto col = (milvus::segcore::Collection*)c_col;
try {
milvus::proto::plan::RetrieveRequest request;
request.ParseFromArray(retrieve_request.proto_blob, retrieve_request.proto_size);
auto plan = milvus::query::CreateRetrievePlan(*col->get_schema(), std::move(request));
*output = plan.release();
auto status = CStatus();
status.error_code = Success;
status.error_msg = "";
return status;
} catch (std::exception& e) {
auto status = CStatus();
status.error_code = UnexpectedError;
status.error_msg = strdup(e.what());
return status;
}
}
void
DeleteRetrievePlan(CRetrievePlan c_plan) {
auto plan = (milvus::query::RetrievePlan*)c_plan;
delete plan;
}

View File

@ -20,6 +20,7 @@ extern "C" {
typedef void* CPlan;
typedef void* CPlaceholderGroup;
typedef void* CRetrievePlan;
CStatus
CreatePlan(CCollection col, const char* dsl, CPlan* res_plan);
@ -49,6 +50,12 @@ DeletePlan(CPlan plan);
void
DeletePlaceholderGroup(CPlaceholderGroup placeholder_group);
CStatus
CreateRetrievePlan(CCollection c_col, CProto retrieve_request, CRetrievePlan* output);
void
DeleteRetrievePlan(CRetrievePlan plan);
#ifdef __cplusplus
}
#endif

View File

@ -21,6 +21,7 @@
#include <knowhere/index/vector_index/VecIndex.h>
#include <knowhere/index/vector_index/adapter/VectorAdapter.h>
#include "common/Types.h"
#include "common/CGoHelper.h"
////////////////////////////// common interfaces //////////////////////////////
CSegmentInterface
@ -69,7 +70,6 @@ Search(CSegmentInterface c_segment,
uint64_t* timestamps,
int num_groups,
CQueryResult* result) {
auto status = CStatus();
auto query_result = std::make_unique<milvus::QueryResult>();
try {
auto segment = (milvus::segcore::SegmentInterface*)c_segment;
@ -85,19 +85,10 @@ Search(CSegmentInterface c_segment,
}
}
*result = query_result.release();
status.error_code = Success;
status.error_msg = "";
return milvus::SuccessCStatus();
} catch (std::exception& e) {
status.error_code = UnexpectedError;
status.error_msg = strdup(e.what());
return milvus::FailureCStatus(UnexpectedError, e.what());
}
// result_ids and result_distances have been allocated memory in goLang,
// so we don't need to malloc here.
// memcpy(result_ids, query_result.result_ids_.data(), query_result.get_row_count() * sizeof(long int));
// memcpy(result_distances, query_result.result_distances_.data(), query_result.get_row_count() * sizeof(float));
return status;
}
CStatus
@ -106,16 +97,12 @@ FillTargetEntry(CSegmentInterface c_segment, CPlan c_plan, CQueryResult c_result
auto plan = (milvus::query::Plan*)c_plan;
auto result = (milvus::QueryResult*)c_result;
auto status = CStatus();
try {
segment->FillTargetEntry(plan, *result);
status.error_code = Success;
status.error_msg = "";
} catch (std::runtime_error& e) {
status.error_code = UnexpectedError;
status.error_msg = strdup(e.what());
return milvus::SuccessCStatus();
} catch (std::exception& e) {
return milvus::FailureCStatus(UnexpectedError, e.what());
}
return status;
}
int64_t
@ -157,18 +144,10 @@ Insert(CSegmentInterface c_segment,
dataChunk.raw_data = raw_data;
dataChunk.sizeof_per_row = sizeof_per_row;
dataChunk.count = count;
auto res = segment->Insert(reserved_offset, size, row_ids, timestamps, dataChunk);
auto status = CStatus();
status.error_code = Success;
status.error_msg = "";
return status;
segment->Insert(reserved_offset, size, row_ids, timestamps, dataChunk);
return milvus::SuccessCStatus();
} catch (std::exception& e) {
auto status = CStatus();
status.error_code = UnexpectedError;
status.error_msg = strdup(e.what());
return status;
return milvus::FailureCStatus(UnexpectedError, e.what());
}
}
@ -177,15 +156,9 @@ PreInsert(CSegmentInterface c_segment, int64_t size, int64_t* offset) {
try {
auto segment = (milvus::segcore::SegmentGrowing*)c_segment;
*offset = segment->PreInsert(size);
auto status = CStatus();
status.error_code = Success;
status.error_msg = "";
return status;
return milvus::SuccessCStatus();
} catch (std::exception& e) {
auto status = CStatus();
status.error_code = UnexpectedError;
status.error_msg = strdup(e.what());
return status;
return milvus::FailureCStatus(UnexpectedError, e.what());
}
}
@ -199,16 +172,9 @@ Delete(CSegmentInterface c_segment,
try {
auto res = segment->Delete(reserved_offset, size, row_ids, timestamps);
auto status = CStatus();
status.error_code = Success;
status.error_msg = "";
return status;
return milvus::SuccessCStatus();
} catch (std::exception& e) {
auto status = CStatus();
status.error_code = UnexpectedError;
status.error_msg = strdup(e.what());
return status;
return milvus::FailureCStatus(UnexpectedError, e.what());
}
}
@ -229,34 +195,23 @@ LoadFieldData(CSegmentInterface c_segment, CLoadFieldDataInfo load_field_data_in
auto load_info =
LoadFieldDataInfo{load_field_data_info.field_id, load_field_data_info.blob, load_field_data_info.row_count};
segment->LoadFieldData(load_info);
auto status = CStatus();
status.error_code = Success;
status.error_msg = "";
return status;
return milvus::SuccessCStatus();
} catch (std::exception& e) {
auto status = CStatus();
status.error_code = UnexpectedError;
status.error_msg = strdup(e.what());
return status;
return milvus::FailureCStatus(UnexpectedError, e.what());
}
}
CStatus
UpdateSealedSegmentIndex(CSegmentInterface c_segment, CLoadIndexInfo c_load_index_info) {
auto status = CStatus();
try {
auto segment_interface = reinterpret_cast<milvus::segcore::SegmentInterface*>(c_segment);
auto segment = dynamic_cast<milvus::segcore::SegmentSealed*>(segment_interface);
AssertInfo(segment != nullptr, "segment conversion failed");
auto load_index_info = (LoadIndexInfo*)c_load_index_info;
segment->LoadIndex(*load_index_info);
status.error_code = Success;
status.error_msg = "";
return status;
return milvus::SuccessCStatus();
} catch (std::exception& e) {
status.error_code = UnexpectedError;
status.error_msg = strdup(e.what());
return status;
return milvus::FailureCStatus(UnexpectedError, e.what());
}
}
@ -267,33 +222,22 @@ DropFieldData(CSegmentInterface c_segment, int64_t field_id) {
auto segment = dynamic_cast<milvus::segcore::SegmentSealed*>(segment_interface);
AssertInfo(segment != nullptr, "segment conversion failed");
segment->DropFieldData(milvus::FieldId(field_id));
auto status = CStatus();
status.error_code = Success;
status.error_msg = "";
return status;
return milvus::SuccessCStatus();
} catch (std::exception& e) {
auto status = CStatus();
status.error_code = UnexpectedError;
status.error_msg = strdup(e.what());
return status;
return milvus::FailureCStatus(UnexpectedError, e.what());
}
}
CStatus
DropSealedSegmentIndex(CSegmentInterface c_segment, int64_t field_id) {
auto status = CStatus();
try {
auto segment_interface = reinterpret_cast<milvus::segcore::SegmentInterface*>(c_segment);
auto segment = dynamic_cast<milvus::segcore::SegmentSealed*>(segment_interface);
AssertInfo(segment != nullptr, "segment conversion failed");
segment->DropIndex(milvus::FieldId(field_id));
status.error_code = Success;
status.error_msg = "";
return status;
return milvus::SuccessCStatus();
} catch (std::exception& e) {
status.error_code = UnexpectedError;
status.error_msg = strdup(e.what());
return status;
return milvus::FailureCStatus(UnexpectedError, e.what());
}
}
@ -306,14 +250,10 @@ UpdateSegmentIndex(CSegmentInterface c_segment, CLoadIndexInfo c_load_index_info
auto segment = dynamic_cast<milvus::segcore::SegmentGrowing*>(segment_interface);
AssertInfo(segment != nullptr, "segment conversion failed");
auto load_index_info = (LoadIndexInfo*)c_load_index_info;
auto res = segment->LoadIndexing(*load_index_info);
status.error_code = res.code();
status.error_msg = "";
return status;
segment->LoadIndexing(*load_index_info);
return milvus::SuccessCStatus();
} catch (std::exception& e) {
status.error_code = UnexpectedError;
status.error_msg = strdup(e.what());
return status;
return milvus::FailureCStatus(UnexpectedError, e.what());
}
}
@ -335,3 +275,15 @@ IsOpened(CSegmentInterface c_segment) {
auto status = segment->get_state();
return status == milvus::segcore::SegmentGrowing::SegmentState::Open;
}
CProtoResult
GetEntityByIds(CSegmentInterface c_segment, CRetrievePlan c_plan, uint64_t timestamp) {
try {
auto segment = (const milvus::segcore::SegmentInterface*)c_segment;
auto plan = (const milvus::query::RetrievePlan*)c_plan;
auto result = segment->GetEntityById(plan->field_offsets_, *plan->ids_, timestamp);
return milvus::AllocCProtoResult(*result);
} catch (std::exception& e) {
return CProtoResult{milvus::FailureCStatus(UnexpectedError, e.what())};
}
}

View File

@ -44,6 +44,9 @@ Search(CSegmentInterface c_segment,
int num_groups,
CQueryResult* result);
CProtoResult
GetEntityByIds(CSegmentInterface c_segment, CRetrievePlan plan, uint64_t timestamp);
CStatus
FillTargetEntry(CSegmentInterface c_segment, CPlan c_plan, CQueryResult result);

View File

@ -68,7 +68,7 @@ TEST(GetEntityByIds, AUTOID) {
req_ids_arr->add_data(-1);
std::vector<FieldOffset> target_offsets{FieldOffset(0), FieldOffset(1)};
auto retrieve_results = segment->GetEntityById(target_offsets, *req_ids);
auto retrieve_results = segment->GetEntityById(target_offsets, *req_ids, 0);
auto ids = retrieve_results->ids().int_id();
Assert(retrieve_results->fields_data_size() == target_offsets.size());
FieldOffset field_offset(0);
@ -126,7 +126,7 @@ TEST(GetEntityByIds, PrimaryKey) {
req_ids_arr->add_data(-1);
std::vector<FieldOffset> target_offsets{FieldOffset(0), FieldOffset(1)};
auto retrieve_results = segment->GetEntityById(target_offsets, *req_ids);
auto retrieve_results = segment->GetEntityById(target_offsets, *req_ids, 0);
auto ids = retrieve_results->ids().int_id();
Assert(retrieve_results->fields_data_size() == target_offsets.size());
FieldOffset field_offset(0);

View File

@ -214,6 +214,7 @@ func (index *CIndexQuery) QueryOnBinaryVecIndex(vectors []byte) (QueryResult, er
if err != nil {
return nil, err
}
return res, nil
}

5
internal/proto/cgo.proto Normal file
View File

@ -0,0 +1,5 @@
syntax = "proto3";
package milvus.proto.plan;
option go_package = "github.com/milvus-io/milvus/internal/proto/cgo";
import "schema.proto";

View File

@ -85,4 +85,14 @@ message PlanNode {
oneof node {
VectorANNS vector_anns = 1;
}
}
}
message RetrieveRequest {
schema.IDs ids = 1;
repeated string output_fields = 2;
}
message RetrieveResults {
schema.IDs ids = 1;
repeated schema.FieldData fields_data = 2;
}

View File

@ -761,6 +761,100 @@ func (*PlanNode) XXX_OneofWrappers() []interface{} {
}
}
type RetrieveRequest struct {
Ids *schemapb.IDs `protobuf:"bytes,1,opt,name=ids,proto3" json:"ids,omitempty"`
OutputFields []string `protobuf:"bytes,2,rep,name=output_fields,json=outputFields,proto3" json:"output_fields,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *RetrieveRequest) Reset() { *m = RetrieveRequest{} }
func (m *RetrieveRequest) String() string { return proto.CompactTextString(m) }
func (*RetrieveRequest) ProtoMessage() {}
func (*RetrieveRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_2d655ab2f7683c23, []int{10}
}
func (m *RetrieveRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_RetrieveRequest.Unmarshal(m, b)
}
func (m *RetrieveRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_RetrieveRequest.Marshal(b, m, deterministic)
}
func (m *RetrieveRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_RetrieveRequest.Merge(m, src)
}
func (m *RetrieveRequest) XXX_Size() int {
return xxx_messageInfo_RetrieveRequest.Size(m)
}
func (m *RetrieveRequest) XXX_DiscardUnknown() {
xxx_messageInfo_RetrieveRequest.DiscardUnknown(m)
}
var xxx_messageInfo_RetrieveRequest proto.InternalMessageInfo
func (m *RetrieveRequest) GetIds() *schemapb.IDs {
if m != nil {
return m.Ids
}
return nil
}
func (m *RetrieveRequest) GetOutputFields() []string {
if m != nil {
return m.OutputFields
}
return nil
}
type RetrieveResults struct {
Ids *schemapb.IDs `protobuf:"bytes,1,opt,name=ids,proto3" json:"ids,omitempty"`
FieldsData []*schemapb.FieldData `protobuf:"bytes,2,rep,name=fields_data,json=fieldsData,proto3" json:"fields_data,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *RetrieveResults) Reset() { *m = RetrieveResults{} }
func (m *RetrieveResults) String() string { return proto.CompactTextString(m) }
func (*RetrieveResults) ProtoMessage() {}
func (*RetrieveResults) Descriptor() ([]byte, []int) {
return fileDescriptor_2d655ab2f7683c23, []int{11}
}
func (m *RetrieveResults) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_RetrieveResults.Unmarshal(m, b)
}
func (m *RetrieveResults) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_RetrieveResults.Marshal(b, m, deterministic)
}
func (m *RetrieveResults) XXX_Merge(src proto.Message) {
xxx_messageInfo_RetrieveResults.Merge(m, src)
}
func (m *RetrieveResults) XXX_Size() int {
return xxx_messageInfo_RetrieveResults.Size(m)
}
func (m *RetrieveResults) XXX_DiscardUnknown() {
xxx_messageInfo_RetrieveResults.DiscardUnknown(m)
}
var xxx_messageInfo_RetrieveResults proto.InternalMessageInfo
func (m *RetrieveResults) GetIds() *schemapb.IDs {
if m != nil {
return m.Ids
}
return nil
}
func (m *RetrieveResults) GetFieldsData() []*schemapb.FieldData {
if m != nil {
return m.FieldsData
}
return nil
}
func init() {
proto.RegisterEnum("milvus.proto.plan.RangeExpr_OpType", RangeExpr_OpType_name, RangeExpr_OpType_value)
proto.RegisterEnum("milvus.proto.plan.UnaryExpr_UnaryOp", UnaryExpr_UnaryOp_name, UnaryExpr_UnaryOp_value)
@ -775,62 +869,69 @@ func init() {
proto.RegisterType((*Expr)(nil), "milvus.proto.plan.Expr")
proto.RegisterType((*VectorANNS)(nil), "milvus.proto.plan.VectorANNS")
proto.RegisterType((*PlanNode)(nil), "milvus.proto.plan.PlanNode")
proto.RegisterType((*RetrieveRequest)(nil), "milvus.proto.plan.RetrieveRequest")
proto.RegisterType((*RetrieveResults)(nil), "milvus.proto.plan.RetrieveResults")
}
func init() { proto.RegisterFile("plan.proto", fileDescriptor_2d655ab2f7683c23) }
var fileDescriptor_2d655ab2f7683c23 = []byte{
// 829 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x54, 0x4f, 0x6f, 0x23, 0x35,
0x14, 0xcf, 0xcc, 0xe4, 0xcf, 0xcc, 0x4b, 0x36, 0x1b, 0x7c, 0x21, 0x50, 0x56, 0x8d, 0x66, 0x11,
0x44, 0x42, 0x9b, 0x8a, 0xec, 0xd2, 0x95, 0x16, 0x2d, 0xa2, 0x85, 0xd5, 0xb6, 0xd2, 0x2a, 0x5d,
0x86, 0xd0, 0x03, 0x97, 0x91, 0x33, 0xe3, 0x24, 0x16, 0x8e, 0xed, 0x7a, 0x9c, 0xa8, 0x3d, 0x73,
0xe3, 0xc6, 0xe7, 0xe0, 0x0b, 0x71, 0xe7, 0x8b, 0x20, 0xdb, 0xd3, 0x49, 0x83, 0xd2, 0x22, 0x24,
0x6e, 0x7e, 0x7f, 0x7e, 0xef, 0xf7, 0xfe, 0xf9, 0x01, 0x48, 0x86, 0xf9, 0x48, 0x2a, 0xa1, 0x05,
0xfa, 0x60, 0x45, 0xd9, 0x66, 0x5d, 0x38, 0x69, 0x64, 0x0c, 0x1f, 0x77, 0x8a, 0x6c, 0x49, 0x56,
0xd8, 0xa9, 0x62, 0x09, 0x9d, 0xb7, 0x84, 0x13, 0x45, 0xb3, 0x4b, 0xcc, 0xd6, 0x04, 0x1d, 0x40,
0x38, 0x13, 0x82, 0xa5, 0x1b, 0xcc, 0xfa, 0xde, 0xc0, 0x1b, 0x86, 0x67, 0xb5, 0xa4, 0x65, 0x34,
0x97, 0x98, 0xa1, 0x27, 0x10, 0x51, 0xae, 0x8f, 0x5f, 0x58, 0xab, 0x3f, 0xf0, 0x86, 0xc1, 0x59,
0x2d, 0x09, 0xad, 0xaa, 0x34, 0xcf, 0x99, 0xc0, 0xda, 0x9a, 0x83, 0x81, 0x37, 0xf4, 0x8c, 0xd9,
0xaa, 0x2e, 0x31, 0x3b, 0x6d, 0x40, 0xb0, 0xc1, 0x2c, 0x26, 0x10, 0xfd, 0xb0, 0x26, 0xea, 0xe6,
0x9c, 0xcf, 0x05, 0x42, 0x50, 0xd7, 0x42, 0xfe, 0x62, 0xa9, 0x82, 0xc4, 0xbe, 0xd1, 0x21, 0xb4,
0x57, 0x44, 0x2b, 0x9a, 0xa5, 0xfa, 0x46, 0x12, 0x1b, 0x28, 0x4a, 0xc0, 0xa9, 0xa6, 0x37, 0x92,
0xa0, 0xa7, 0xf0, 0xa8, 0x20, 0x58, 0x65, 0xcb, 0x54, 0x62, 0x85, 0x57, 0x45, 0xbf, 0x6e, 0x5d,
0x3a, 0x4e, 0xf9, 0xde, 0xea, 0xe2, 0x0c, 0xe0, 0x3b, 0xc1, 0xd6, 0x2b, 0x6e, 0x79, 0x3e, 0x82,
0x70, 0x4e, 0x09, 0xcb, 0x53, 0x9a, 0x97, 0x5c, 0x2d, 0x2b, 0x9f, 0xe7, 0xe8, 0x15, 0x44, 0x39,
0xd6, 0xd8, 0x91, 0x99, 0xa2, 0xba, 0xe3, 0x27, 0xa3, 0x9d, 0xb6, 0x95, 0x0d, 0xfb, 0x1e, 0x6b,
0x6c, 0xf8, 0x93, 0x30, 0x2f, 0x5f, 0xf1, 0x1f, 0x3e, 0x44, 0x09, 0xe6, 0x0b, 0xf2, 0xe6, 0x5a,
0x2a, 0xf4, 0x0d, 0xb4, 0x33, 0x4b, 0x99, 0x52, 0x3e, 0x17, 0x96, 0xa7, 0xfd, 0xcf, 0x58, 0x76,
0x36, 0xdb, 0xc4, 0x12, 0xc8, 0xb6, 0x49, 0x7e, 0x05, 0x81, 0x90, 0x45, 0xdf, 0x1f, 0x04, 0xc3,
0xee, 0xf8, 0xe9, 0x1e, 0x5c, 0x45, 0x35, 0xba, 0x90, 0x36, 0x13, 0xe3, 0x8f, 0x5e, 0x42, 0x73,
0x63, 0x66, 0x57, 0xf4, 0x83, 0x41, 0x30, 0x6c, 0x8f, 0x0f, 0xf7, 0x20, 0xef, 0xce, 0x38, 0x29,
0xdd, 0x63, 0x0e, 0x4d, 0x17, 0x07, 0xb5, 0xa1, 0x75, 0xce, 0x37, 0x98, 0xd1, 0xbc, 0x57, 0x43,
0x8f, 0xa1, 0xfd, 0x56, 0x11, 0xac, 0x89, 0x9a, 0x2e, 0x31, 0xef, 0x79, 0xa8, 0x07, 0x9d, 0x52,
0xf1, 0xe6, 0x6a, 0x8d, 0x59, 0xcf, 0x47, 0x1d, 0x08, 0xdf, 0x91, 0xa2, 0xb0, 0xf6, 0x00, 0x3d,
0x82, 0xc8, 0x48, 0xce, 0x58, 0x47, 0x11, 0x34, 0xdc, 0xb3, 0x61, 0xfc, 0x26, 0x42, 0x3b, 0xa9,
0x19, 0xff, 0xea, 0x41, 0x38, 0x25, 0x6a, 0xf5, 0xbf, 0x34, 0x6b, 0x5b, 0xb5, 0xff, 0xdf, 0xaa,
0xfe, 0xdd, 0x83, 0xe8, 0x27, 0x8e, 0xd5, 0x8d, 0x4d, 0xe3, 0x05, 0xf8, 0x42, 0x5a, 0xf6, 0xee,
0xf8, 0xd3, 0x3d, 0x21, 0x2a, 0x4f, 0xf7, 0xba, 0x90, 0x89, 0x2f, 0x24, 0x7a, 0x06, 0x8d, 0x6c,
0x49, 0x59, 0x6e, 0xf7, 0xa5, 0x3d, 0xfe, 0x70, 0x0f, 0xd0, 0x60, 0x12, 0xe7, 0x15, 0x1f, 0x42,
0xab, 0x44, 0xef, 0x76, 0xba, 0x05, 0xc1, 0x44, 0xe8, 0x9e, 0x17, 0xff, 0xe9, 0x01, 0x9c, 0xd2,
0x2a, 0xa9, 0xe3, 0x3b, 0x49, 0x7d, 0xb6, 0x27, 0xf6, 0xd6, 0xb5, 0x7c, 0x96, 0x69, 0x7d, 0x01,
0x75, 0x46, 0xe6, 0xfa, 0xdf, 0xb2, 0xb2, 0x4e, 0xa6, 0x06, 0x45, 0x17, 0x4b, 0x6d, 0x3f, 0xd8,
0x43, 0x35, 0x58, 0xaf, 0xf8, 0x18, 0xc2, 0x5b, 0xae, 0xdd, 0x22, 0xba, 0x00, 0xef, 0xc4, 0x82,
0x66, 0x98, 0x9d, 0xf0, 0xbc, 0xe7, 0xd9, 0x6d, 0x70, 0xf2, 0x85, 0xea, 0xf9, 0xf1, 0x6f, 0x3e,
0xd4, 0x6d, 0x51, 0xaf, 0x01, 0x94, 0xd9, 0xdf, 0x94, 0x5c, 0x4b, 0x55, 0xce, 0xfb, 0x93, 0x87,
0x96, 0xfc, 0xac, 0x96, 0x44, 0xaa, 0xfa, 0x5c, 0xaf, 0x20, 0xd2, 0x44, 0xad, 0x1c, 0xda, 0x15,
0x78, 0xb0, 0x07, 0x7d, 0xbb, 0x5f, 0xe6, 0xf2, 0xe8, 0xdb, 0x5d, 0x7b, 0x0d, 0xb0, 0x36, 0xa9,
0x3b, 0x70, 0x70, 0x2f, 0x75, 0x35, 0x6c, 0x43, 0xbd, 0xae, 0xc6, 0xf1, 0x2d, 0xb4, 0x67, 0x74,
0x8b, 0xaf, 0xdf, 0xbb, 0xaa, 0xdb, 0xb9, 0x9c, 0xd5, 0x12, 0x98, 0x55, 0xd2, 0x69, 0x13, 0xea,
0x06, 0x1a, 0xff, 0xe5, 0x01, 0x5c, 0x92, 0x4c, 0x0b, 0x75, 0x32, 0x99, 0xfc, 0x88, 0x0e, 0x20,
0xa2, 0x45, 0xea, 0xfc, 0xdc, 0xb5, 0x4d, 0x42, 0x5a, 0xb8, 0x28, 0x3b, 0x27, 0xcb, 0xdf, 0x3d,
0x59, 0x2f, 0x01, 0xa4, 0x22, 0x39, 0xcd, 0xb0, 0xb6, 0xbf, 0xfe, 0xc1, 0xf9, 0xdd, 0x71, 0x45,
0x5f, 0x03, 0x5c, 0x99, 0xdb, 0xeb, 0xfe, 0x5c, 0xfd, 0xde, 0x46, 0x54, 0x07, 0x3a, 0x89, 0xae,
0xaa, 0x5b, 0xfd, 0x39, 0x3c, 0x96, 0x0c, 0x67, 0x64, 0x29, 0x58, 0x4e, 0x54, 0xaa, 0xf1, 0xa2,
0xdf, 0xb0, 0x87, 0xb7, 0x7b, 0x47, 0x3d, 0xc5, 0x8b, 0x78, 0x0a, 0xe1, 0x7b, 0x86, 0xf9, 0x44,
0xe4, 0xc4, 0xf4, 0x6e, 0x63, 0x0b, 0x4e, 0x31, 0xe7, 0xc5, 0x03, 0xdf, 0x7c, 0xdb, 0x16, 0xd3,
0x3b, 0x87, 0x39, 0xe1, 0xbc, 0x30, 0xbd, 0xe3, 0x22, 0x27, 0xa7, 0xcf, 0x7f, 0xfe, 0x72, 0x41,
0xf5, 0x72, 0x3d, 0x1b, 0x65, 0x62, 0x75, 0xe4, 0x02, 0x3c, 0xa3, 0xa2, 0x7c, 0x1d, 0x51, 0xae,
0x89, 0xe2, 0x98, 0x1d, 0xd9, 0x98, 0x47, 0x26, 0xa6, 0x9c, 0xcd, 0x9a, 0x56, 0x7a, 0xfe, 0x77,
0x00, 0x00, 0x00, 0xff, 0xff, 0x17, 0x9f, 0xfa, 0x86, 0x14, 0x07, 0x00, 0x00,
// 911 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x55, 0x4f, 0x6f, 0x1b, 0x45,
0x14, 0xcf, 0xee, 0x3a, 0xf1, 0xee, 0xb3, 0xeb, 0x98, 0xb9, 0x60, 0x08, 0x25, 0xd6, 0x16, 0x81,
0x05, 0xaa, 0x23, 0xdc, 0x92, 0x4a, 0x45, 0x05, 0x12, 0x5a, 0x9a, 0x48, 0x95, 0x53, 0x06, 0x93,
0x03, 0x97, 0xd5, 0x78, 0x77, 0x6c, 0x8f, 0x18, 0xcf, 0x6c, 0x66, 0x67, 0xad, 0xe6, 0xc2, 0x85,
0x1b, 0x37, 0x3e, 0x07, 0x5f, 0x88, 0x3b, 0x5f, 0x04, 0xcd, 0xcc, 0xc6, 0x7f, 0x90, 0x13, 0x54,
0xa9, 0xb7, 0x99, 0xf7, 0xde, 0xef, 0xfd, 0xde, 0xbf, 0x79, 0x03, 0x90, 0x73, 0x22, 0xfa, 0xb9,
0x92, 0x5a, 0xa2, 0xf7, 0xe6, 0x8c, 0x2f, 0xca, 0xc2, 0xdd, 0xfa, 0x46, 0xf1, 0x61, 0xb3, 0x48,
0x67, 0x74, 0x4e, 0x9c, 0x28, 0xce, 0xa1, 0xf9, 0x92, 0x0a, 0xaa, 0x58, 0x7a, 0x49, 0x78, 0x49,
0xd1, 0x01, 0x84, 0x63, 0x29, 0x79, 0xb2, 0x20, 0xbc, 0xe3, 0x75, 0xbd, 0x5e, 0x78, 0xb6, 0x83,
0xeb, 0x46, 0x72, 0x49, 0x38, 0xba, 0x0f, 0x11, 0x13, 0xfa, 0xf8, 0xb1, 0xd5, 0xfa, 0x5d, 0xaf,
0x17, 0x9c, 0xed, 0xe0, 0xd0, 0x8a, 0x2a, 0xf5, 0x84, 0x4b, 0xa2, 0xad, 0x3a, 0xe8, 0x7a, 0x3d,
0xcf, 0xa8, 0xad, 0xe8, 0x92, 0xf0, 0xd3, 0x5d, 0x08, 0x16, 0x84, 0xc7, 0x14, 0xa2, 0x1f, 0x4b,
0xaa, 0xae, 0xcf, 0xc5, 0x44, 0x22, 0x04, 0x35, 0x2d, 0xf3, 0x5f, 0x2d, 0x55, 0x80, 0xed, 0x19,
0x1d, 0x42, 0x63, 0x4e, 0xb5, 0x62, 0x69, 0xa2, 0xaf, 0x73, 0x6a, 0x1d, 0x45, 0x18, 0x9c, 0x68,
0x74, 0x9d, 0x53, 0xf4, 0x00, 0xee, 0x15, 0x94, 0xa8, 0x74, 0x96, 0xe4, 0x44, 0x91, 0x79, 0xd1,
0xa9, 0x59, 0x93, 0xa6, 0x13, 0xbe, 0xb6, 0xb2, 0x38, 0x05, 0xf8, 0x5e, 0xf2, 0x72, 0x2e, 0x2c,
0xcf, 0x07, 0x10, 0x4e, 0x18, 0xe5, 0x59, 0xc2, 0xb2, 0x8a, 0xab, 0x6e, 0xef, 0xe7, 0x19, 0x7a,
0x0a, 0x51, 0x46, 0x34, 0x71, 0x64, 0x26, 0xa9, 0xd6, 0xe0, 0x7e, 0x7f, 0xa3, 0x6c, 0x55, 0xc1,
0x9e, 0x13, 0x4d, 0x0c, 0x3f, 0x0e, 0xb3, 0xea, 0x14, 0xff, 0xe5, 0x43, 0x84, 0x89, 0x98, 0xd2,
0x17, 0x6f, 0x72, 0x85, 0xbe, 0x81, 0x46, 0x6a, 0x29, 0x13, 0x26, 0x26, 0xd2, 0xf2, 0x34, 0xfe,
0xeb, 0xcb, 0xf6, 0x66, 0x15, 0x18, 0x86, 0x74, 0x15, 0xe4, 0x57, 0x10, 0xc8, 0xbc, 0xe8, 0xf8,
0xdd, 0xa0, 0xd7, 0x1a, 0x3c, 0xd8, 0x82, 0x5b, 0x52, 0xf5, 0x2f, 0x72, 0x1b, 0x89, 0xb1, 0x47,
0x4f, 0x60, 0x6f, 0x61, 0x7a, 0x57, 0x74, 0x82, 0x6e, 0xd0, 0x6b, 0x0c, 0x0e, 0xb7, 0x20, 0xd7,
0x7b, 0x8c, 0x2b, 0xf3, 0x58, 0xc0, 0x9e, 0xf3, 0x83, 0x1a, 0x50, 0x3f, 0x17, 0x0b, 0xc2, 0x59,
0xd6, 0xde, 0x41, 0xfb, 0xd0, 0x78, 0xa9, 0x28, 0xd1, 0x54, 0x8d, 0x66, 0x44, 0xb4, 0x3d, 0xd4,
0x86, 0x66, 0x25, 0x78, 0x71, 0x55, 0x12, 0xde, 0xf6, 0x51, 0x13, 0xc2, 0x57, 0xb4, 0x28, 0xac,
0x3e, 0x40, 0xf7, 0x20, 0x32, 0x37, 0xa7, 0xac, 0xa1, 0x08, 0x76, 0xdd, 0x71, 0xd7, 0xd8, 0x0d,
0xa5, 0x76, 0xb7, 0xbd, 0xf8, 0x77, 0x0f, 0xc2, 0x11, 0x55, 0xf3, 0x77, 0x52, 0xac, 0x55, 0xd6,
0xfe, 0xdb, 0x65, 0xfd, 0xa7, 0x07, 0xd1, 0xcf, 0x82, 0xa8, 0x6b, 0x1b, 0xc6, 0x63, 0xf0, 0x65,
0x6e, 0xd9, 0x5b, 0x83, 0x4f, 0xb6, 0xb8, 0x58, 0x5a, 0xba, 0xd3, 0x45, 0x8e, 0x7d, 0x99, 0xa3,
0x87, 0xb0, 0x9b, 0xce, 0x18, 0xcf, 0xec, 0xbc, 0x34, 0x06, 0xef, 0x6f, 0x01, 0x1a, 0x0c, 0x76,
0x56, 0xf1, 0x21, 0xd4, 0x2b, 0xf4, 0x66, 0xa5, 0xeb, 0x10, 0x0c, 0xa5, 0x6e, 0x7b, 0xf1, 0xdf,
0x1e, 0xc0, 0x29, 0x5b, 0x06, 0x75, 0xbc, 0x16, 0xd4, 0xa7, 0x5b, 0x7c, 0xaf, 0x4c, 0xab, 0x63,
0x15, 0xd6, 0x17, 0x50, 0xe3, 0x74, 0xa2, 0xff, 0x2f, 0x2a, 0x6b, 0x64, 0x72, 0x50, 0x6c, 0x3a,
0xd3, 0xf6, 0x81, 0xdd, 0x95, 0x83, 0xb5, 0x8a, 0x8f, 0x21, 0xbc, 0xe1, 0xda, 0x4c, 0xa2, 0x05,
0xf0, 0x4a, 0x4e, 0x59, 0x4a, 0xf8, 0x89, 0xc8, 0xda, 0x9e, 0x9d, 0x06, 0x77, 0xbf, 0x50, 0x6d,
0x3f, 0xfe, 0xc3, 0x87, 0x9a, 0x4d, 0xea, 0x19, 0x80, 0x32, 0xf3, 0x9b, 0xd0, 0x37, 0xb9, 0xaa,
0xfa, 0xfd, 0xd1, 0x5d, 0x43, 0x7e, 0xb6, 0x83, 0x23, 0xb5, 0x7c, 0x5c, 0x4f, 0x21, 0xd2, 0x54,
0xcd, 0x1d, 0xda, 0x25, 0x78, 0xb0, 0x05, 0x7d, 0x33, 0x5f, 0x66, 0xf3, 0xe8, 0x9b, 0x59, 0x7b,
0x06, 0x50, 0x9a, 0xd0, 0x1d, 0x38, 0xb8, 0x95, 0x7a, 0xd9, 0x6c, 0x43, 0x5d, 0x2e, 0xdb, 0xf1,
0x1d, 0x34, 0xc6, 0x6c, 0x85, 0xaf, 0xdd, 0x3a, 0xaa, 0xab, 0xbe, 0x9c, 0xed, 0x60, 0x18, 0x2f,
0x6f, 0xa7, 0x7b, 0x50, 0x33, 0xd0, 0xf8, 0x1f, 0x0f, 0xe0, 0x92, 0xa6, 0x5a, 0xaa, 0x93, 0xe1,
0xf0, 0x27, 0x74, 0x00, 0x11, 0x2b, 0x12, 0x67, 0xe7, 0xb6, 0x2d, 0x0e, 0x59, 0xe1, 0xbc, 0x6c,
0xac, 0x2c, 0x7f, 0x73, 0x65, 0x3d, 0x01, 0xc8, 0x15, 0xcd, 0x58, 0x4a, 0xb4, 0x7d, 0xf5, 0x77,
0xf6, 0x6f, 0xcd, 0x14, 0x7d, 0x0d, 0x70, 0x65, 0x76, 0xaf, 0x7b, 0x73, 0xb5, 0x5b, 0x0b, 0xb1,
0x5c, 0xd0, 0x38, 0xba, 0x5a, 0xee, 0xea, 0xcf, 0x60, 0x3f, 0xe7, 0x24, 0xa5, 0x33, 0xc9, 0x33,
0xaa, 0x12, 0x4d, 0xa6, 0x9d, 0x5d, 0xbb, 0x78, 0x5b, 0x6b, 0xe2, 0x11, 0x99, 0xc6, 0x23, 0x08,
0x5f, 0x73, 0x22, 0x86, 0x32, 0xa3, 0xa6, 0x76, 0x0b, 0x9b, 0x70, 0x42, 0x84, 0x28, 0xee, 0x78,
0xe6, 0xab, 0xb2, 0x98, 0xda, 0x39, 0xcc, 0x89, 0x10, 0x85, 0xa9, 0x9d, 0x90, 0x19, 0x8d, 0xc7,
0xb0, 0x8f, 0xcd, 0x1f, 0x40, 0x17, 0x14, 0xd3, 0xab, 0x92, 0x16, 0x1a, 0x7d, 0x0e, 0x01, 0xcb,
0x6e, 0x9c, 0x76, 0xb6, 0x2e, 0xed, 0xf3, 0xe7, 0x05, 0x36, 0x46, 0xe6, 0xd3, 0x90, 0xa5, 0xce,
0x4b, 0x9d, 0xd8, 0x2a, 0xba, 0xb5, 0x11, 0xe1, 0xa6, 0x13, 0xfe, 0x60, 0x65, 0xf1, 0x6f, 0xeb,
0x1c, 0x45, 0xc9, 0x75, 0xf1, 0x56, 0x1c, 0xdf, 0x42, 0xc3, 0x39, 0x4f, 0xcc, 0x0f, 0x51, 0x2d,
0xa6, 0x8f, 0xb7, 0x62, 0x2c, 0xa1, 0xf9, 0x51, 0x30, 0x38, 0x88, 0x39, 0x9f, 0x3e, 0xfa, 0xe5,
0xcb, 0x29, 0xd3, 0xb3, 0x72, 0xdc, 0x4f, 0xe5, 0xfc, 0xc8, 0xe1, 0x1e, 0x32, 0x59, 0x9d, 0x8e,
0x98, 0xd0, 0x54, 0x09, 0xc2, 0x8f, 0xac, 0xab, 0x23, 0x53, 0xb7, 0x7c, 0x3c, 0xde, 0xb3, 0xb7,
0x47, 0xff, 0x06, 0x00, 0x00, 0xff, 0xff, 0xeb, 0x2d, 0x71, 0x52, 0xf8, 0x07, 0x00, 0x00,
}

View File

@ -0,0 +1,104 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package querynode
/*
#cgo CFLAGS: -I${SRCDIR}/../core/output/include
#cgo LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_segcore -Wl,-rpath=${SRCDIR}/../core/output/lib
#include "segcore/collection_c.h"
#include "common/type_c.h"
#include "segcore/segment_c.h"
*/
import "C"
import (
"errors"
"fmt"
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"unsafe"
)
// ProtoCGo is protobuf created by go side,
// passed to c side
// memory is managed by go GC
type ProtoCGo struct {
CProto C.CProto
blob []byte
}
func MarshalForCGo(msg proto.Message) (*ProtoCGo, error) {
blob, err := proto.Marshal(msg)
if err != nil {
return nil, err
}
protoCGo := &ProtoCGo{
blob: blob,
CProto: C.CProto{
proto_size: (C.int64_t)(len(blob)),
proto_blob: unsafe.Pointer(&blob[0]),
},
}
return protoCGo, nil
}
func (protoCGo *ProtoCGo) destruct() {
// NOTE: at ProtoCGo, blob is go heap memory, no need to destruct
protoCGo.blob = nil
}
func HandleCStatus(status *C.CStatus, extraInfo string) error {
if status.error_code == 0 {
return nil
}
errorCode := status.error_code
errorName, ok := commonpb.ErrorCode_name[int32(errorCode)]
if !ok {
errorName = "UnknownError"
}
errorMsg := C.GoString(status.error_msg)
defer C.free(unsafe.Pointer(status.error_msg))
finalMsg := fmt.Sprintf("[%s] %s", errorName, errorMsg)
logMsg := fmt.Sprintf("%s, C Runtime Exception: %s\n", extraInfo, finalMsg)
log.Error(logMsg)
return errors.New(finalMsg)
}
func HandleCProtoResult(cRes *C.CProtoResult, msg proto.Message) error {
// Standalone CProto is protobuf created by C side,
// Passed from c side
// memory is managed manually
err := HandleCStatus(&cRes.status, "")
if err != nil {
return err
}
cpro := cRes.proto
blob := C.GoBytes(unsafe.Pointer(cpro.proto_blob), C.int32_t(cpro.proto_size))
defer C.free(cpro.proto_blob)
return proto.Unmarshal(blob, msg)
}
// TestBoolArray this function will accept a BoolArray input,
// and return a BoolArray output
// which negates all elements of the input
func TestBoolArray(cpb *ProtoCGo) (*schemapb.BoolArray, error) {
res := C.CTestBoolArrayPb(cpb.CProto)
ba := new(schemapb.BoolArray)
err := HandleCProtoResult(&res, ba)
return ba, err
}

View File

@ -0,0 +1,26 @@
package querynode
import (
"testing"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/stretchr/testify/assert"
)
func TestCGoHelper_Naive(t *testing.T) {
pb := schemapb.BoolArray{
Data: []bool{true, false, true, true, true},
}
cpb, err := MarshalForCGo(&pb)
assert.Nil(t, err)
// this function will accept a BoolArray input,
// and return a BoolArray output
// which negates all elements of the input
ba, err := TestBoolArray(cpb)
assert.Nil(t, err)
for index, b := range ba.Data {
assert.Equal(t, b, !pb.Data[index])
}
}

View File

@ -1,46 +0,0 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package querynode
/*
#cgo CFLAGS: -I${SRCDIR}/../core/output/include
#cgo LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_segcore -Wl,-rpath=${SRCDIR}/../core/output/lib
#include <malloc.h>
#include "common/type_c.h"
*/
import "C"
import (
"errors"
"fmt"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"unsafe"
)
func HandleCStatus(status *C.CStatus, extraInfo string) error {
if status.error_code == 0 {
return nil
}
errorCode := status.error_code
errorName, ok := commonpb.ErrorCode_name[int32(errorCode)]
if !ok {
errorName = "UnknownError"
}
errorMsg := C.GoString(status.error_msg)
defer C.free(unsafe.Pointer(status.error_msg))
finalMsg := fmt.Sprintf("[%s] %s", errorName, errorMsg)
logMsg := fmt.Sprintf("%s, C Runtime Exception: %s\n", extraInfo, finalMsg)
log.Error(logMsg)
return errors.New(finalMsg)
}

View File

@ -21,9 +21,9 @@ package querynode
*/
import "C"
import (
"unsafe"
"errors"
"github.com/milvus-io/milvus/internal/proto/planpb"
"unsafe"
)
type Plan struct {
@ -103,3 +103,27 @@ func (pg *searchRequest) getNumOfQuery() int64 {
func (pg *searchRequest) delete() {
C.DeletePlaceholderGroup(pg.cPlaceholderGroup)
}
type RetrievePlan struct {
RetrievePlanPtr C.CRetrievePlan
Timestamp uint64
}
func createRetrievePlan(col *Collection, msg *planpb.RetrieveRequest, timestamp uint64) (*RetrievePlan, error) {
protoCGo, err := MarshalForCGo(msg)
if err != nil {
return nil, err
}
plan := new(RetrievePlan)
plan.Timestamp = timestamp
status := C.CreateRetrievePlan(col.collectionPtr, protoCGo.CProto, &plan.RetrievePlanPtr)
err2 := HandleCStatus(&status, "create retrieve plan failed")
if err2 != nil {
return nil, err2
}
return plan, nil
}
func (plan *RetrievePlan) delete() {
C.DeleteRetrievePlan(plan.RetrievePlanPtr)
}

View File

@ -13,16 +13,21 @@ package querynode
import (
"context"
"errors"
"fmt"
"sync"
"github.com/golang/protobuf/proto"
oplog "github.com/opentracing/opentracing-go/log"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/planpb"
"github.com/milvus-io/milvus/internal/util/trace"
"github.com/milvus-io/milvus/internal/util/tsoutil"
oplog "github.com/opentracing/opentracing-go/log"
"go.uber.org/zap"
)
type retrieveCollection struct {
@ -288,8 +293,132 @@ func (rc *retrieveCollection) doUnsolvedMsgRetrieve() {
}
}
func mergeRetrieveResults(dataArr []*planpb.RetrieveResults) (*planpb.RetrieveResults, error) {
var final *planpb.RetrieveResults
for _, data := range dataArr {
if data == nil {
continue
}
if final == nil {
final = proto.Clone(data).(*planpb.RetrieveResults)
continue
}
proto.Merge(final.Ids, data.Ids)
if len(final.FieldsData) != len(data.FieldsData) {
return nil, fmt.Errorf("mismatch FieldData in RetrieveResults")
}
for i := range final.FieldsData {
proto.Merge(final.FieldsData[i], data.FieldsData[i])
}
}
return final, nil
}
func (rc *retrieveCollection) retrieve(retrieveMsg *msgstream.RetrieveMsg) error {
// TODO(yukun)
// step 1: get retrieve object and defer destruction
// step 2: for each segment, call retrieve to get ids proto buffer
// step 3: merge all proto in go
// step 4: publish results
// retrieveProtoBlob, err := proto.Marshal(&retrieveMsg.RetrieveRequest)
sp, ctx := trace.StartSpanFromContext(retrieveMsg.TraceCtx())
defer sp.Finish()
retrieveMsg.SetTraceCtx(ctx)
timestamp := retrieveMsg.Base.Timestamp
collectionID := retrieveMsg.CollectionID
collection, err := rc.historicalReplica.getCollectionByID(collectionID)
if err != nil {
return err
}
req := &planpb.RetrieveRequest{
Ids: retrieveMsg.Ids,
OutputFields: retrieveMsg.OutputFields,
}
plan, err := createRetrievePlan(collection, req, timestamp)
if err != nil {
return err
}
defer plan.delete()
var partitionIDsInHistorical []UniqueID
var partitionIDsInStreaming []UniqueID
partitionIDsInQuery := retrieveMsg.PartitionIDs
if len(partitionIDsInQuery) == 0 {
partitionIDsInHistoricalCol, err1 := rc.historicalReplica.getPartitionIDs(collectionID)
partitionIDsInStreamingCol, err2 := rc.streamingReplica.getPartitionIDs(collectionID)
if err1 != nil && err2 != nil {
return err2
}
if len(partitionIDsInHistoricalCol) == 0 {
return errors.New("none of this collection's partition has been loaded")
}
partitionIDsInHistorical = partitionIDsInHistoricalCol
partitionIDsInStreaming = partitionIDsInStreamingCol
} else {
for _, id := range partitionIDsInQuery {
_, err1 := rc.historicalReplica.getPartitionByID(id)
if err1 == nil {
partitionIDsInHistorical = append(partitionIDsInHistorical, id)
}
_, err2 := rc.streamingReplica.getPartitionByID(id)
if err2 == nil {
partitionIDsInStreaming = append(partitionIDsInStreaming, id)
}
if err1 != nil && err2 != nil {
return err2
}
}
}
var mergeList []*planpb.RetrieveResults
for _, partitionID := range partitionIDsInHistorical {
segmentIDs, err := rc.historicalReplica.getSegmentIDs(partitionID)
if err != nil {
return err
}
for _, segmentID := range segmentIDs {
segment, err := rc.historicalReplica.getSegmentByID(segmentID)
if err != nil {
return err
}
result, err := segment.segmentGetEntityByIds(plan)
if err != nil {
return err
}
mergeList = append(mergeList, result)
}
}
for _, partitionID := range partitionIDsInStreaming {
segmentIDs, err := rc.streamingReplica.getSegmentIDs(partitionID)
if err != nil {
return err
}
for _, segmentID := range segmentIDs {
segment, err := rc.streamingReplica.getSegmentByID(segmentID)
if err != nil {
return err
}
result, err := segment.segmentGetEntityByIds(plan)
if err != nil {
return err
}
mergeList = append(mergeList, result)
}
}
result, err := mergeRetrieveResults(mergeList)
if err != nil {
return err
}
resultChannelInt := 0
retrieveResultMsg := &msgstream.RetrieveResultMsg{
BaseMsg: msgstream.BaseMsg{Ctx: retrieveMsg.Ctx, HashValues: []uint32{uint32(resultChannelInt)}},
@ -300,12 +429,14 @@ func (rc *retrieveCollection) retrieve(retrieveMsg *msgstream.RetrieveMsg) error
SourceID: retrieveMsg.Base.SourceID,
},
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
Ids: result.Ids,
FieldsData: result.FieldsData,
ResultChannelID: retrieveMsg.ResultChannelID,
},
}
err := rc.publishRetrieveResult(retrieveResultMsg, retrieveMsg.CollectionID)
if err != nil {
return err
err3 := rc.publishRetrieveResult(retrieveResultMsg, retrieveMsg.CollectionID)
if err3 != nil {
return err3
}
return nil
}

View File

@ -0,0 +1,54 @@
package querynode
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/internal/proto/planpb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
)
func TestRetrieve_Merge(t *testing.T) {
col1 := &schemapb.FieldData{
Field: &schemapb.FieldData_Scalars{
Scalars: &schemapb.ScalarField{
Data: &schemapb.ScalarField_IntData{
IntData: &schemapb.IntArray{
Data: []int32{1, 2, 3},
},
},
},
},
}
col2 := &schemapb.FieldData{
Field: &schemapb.FieldData_Vectors{
Vectors: &schemapb.VectorField{
Data: &schemapb.VectorField_FloatVector{
FloatVector: &schemapb.FloatArray{
Data: []float32{1, 1, 2, 2, 3, 3},
},
},
},
},
}
subRes := &planpb.RetrieveResults{
Ids: &schemapb.IDs{
IdField: &schemapb.IDs_IntId{
IntId: &schemapb.LongArray{
Data: []int64{1, 2, 3},
},
},
},
FieldsData: []*schemapb.FieldData{
col1,
col2,
},
}
finalRes, err := mergeRetrieveResults([]*planpb.RetrieveResults{subRes, subRes})
assert.NoError(t, err)
println(finalRes.String())
}

View File

@ -306,17 +306,15 @@ func (s *searchCollection) search(searchMsg *msgstream.SearchMsg) error {
searchResults := make([]*SearchResult, 0)
matchedSegments := make([]*Segment, 0)
//log.Debug("search msg's partitionID = ", partitionIDsInQuery)
partitionIDsInHistoricalCol, err1 := s.historicalReplica.getPartitionIDs(collectionID)
partitionIDsInStreamingCol, err2 := s.streamingReplica.getPartitionIDs(collectionID)
if err1 != nil && err2 != nil {
return err2
}
var searchPartitionIDsInHistorical []UniqueID
var searchPartitionIDsInStreaming []UniqueID
partitionIDsInQuery := searchMsg.PartitionIDs
if len(partitionIDsInQuery) == 0 {
partitionIDsInHistoricalCol, err1 := s.historicalReplica.getPartitionIDs(collectionID)
partitionIDsInStreamingCol, err2 := s.streamingReplica.getPartitionIDs(collectionID)
if err1 != nil && err2 != nil {
return err2
}
if len(partitionIDsInHistoricalCol) == 0 {
return errors.New("none of this collection's partition has been loaded")
}
@ -324,11 +322,11 @@ func (s *searchCollection) search(searchMsg *msgstream.SearchMsg) error {
searchPartitionIDsInStreaming = partitionIDsInStreamingCol
} else {
for _, id := range partitionIDsInQuery {
_, err1 = s.historicalReplica.getPartitionByID(id)
_, err1 := s.historicalReplica.getPartitionByID(id)
if err1 == nil {
searchPartitionIDsInHistorical = append(searchPartitionIDsInHistorical, id)
}
_, err2 = s.streamingReplica.getPartitionByID(id)
_, err2 := s.streamingReplica.getPartitionByID(id)
if err2 == nil {
searchPartitionIDsInStreaming = append(searchPartitionIDsInStreaming, id)
}

View File

@ -24,6 +24,7 @@ package querynode
import "C"
import (
"fmt"
"github.com/milvus-io/milvus/internal/proto/planpb"
"strconv"
"sync"
"unsafe"
@ -234,6 +235,16 @@ func (s *Segment) segmentSearch(plan *Plan,
return &searchResult, nil
}
func (s *Segment) segmentGetEntityByIds(plan *RetrievePlan) (*planpb.RetrieveResults, error) {
resProto := C.GetEntityByIds(s.segmentPtr, plan.RetrievePlanPtr, C.uint64_t(plan.Timestamp))
result := new(planpb.RetrieveResults)
err := HandleCProtoResult(&resProto, result)
if err != nil {
return nil, err
}
return result, nil
}
func (s *Segment) fillTargetEntry(plan *Plan,
result *SearchResult) error {
if s.segmentPtr == nil {

View File

@ -22,6 +22,8 @@ import (
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/proto/planpb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
)
//-------------------------------------------------------------------------------------- constructor and destructor
@ -103,6 +105,66 @@ func TestSegment_getRowCount(t *testing.T) {
deleteCollection(collection)
}
func TestSegment_retrieve(t *testing.T) {
collectionID := UniqueID(0)
collectionMeta := genTestCollectionMeta(collectionID, false)
collection := newCollection(collectionMeta.ID, collectionMeta.Schema)
assert.Equal(t, collection.ID(), collectionID)
segmentID := UniqueID(0)
segment := newSegment(collection, segmentID, defaultPartitionID, collectionID, segmentTypeGrowing)
assert.Equal(t, segmentID, segment.segmentID)
ids := []int64{}
timestamps := []Timestamp{}
const DIM = 16
const N = 100
var records []*commonpb.Blob
for i := 0; i < N; i++ {
ids = append(ids, int64(i))
timestamps = append(timestamps, 0)
var vec = [DIM]float32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}
var rawData []byte
for _, ele := range vec {
buf := make([]byte, 4)
binary.LittleEndian.PutUint32(buf, math.Float32bits(ele+float32(i)*float32(N)))
rawData = append(rawData, buf...)
}
bs := make([]byte, 4)
binary.LittleEndian.PutUint32(bs, 1)
rawData = append(rawData, bs...)
blob := &commonpb.Blob{
Value: rawData,
}
records = append(records, blob)
}
offset, err := segment.segmentPreInsert(N)
assert.Nil(t, err)
assert.Equal(t, offset, int64(0))
err = segment.segmentInsert(offset, &ids, &timestamps, &records)
assert.NoError(t, err)
reqIds := &planpb.RetrieveRequest{
Ids: &schemapb.IDs{
IdField: &schemapb.IDs_IntId{
IntId: &schemapb.LongArray{
Data: []int64{2, 3, 1},
},
},
},
OutputFields: []string{"vec"},
}
plan, err := createRetrievePlan(collection, reqIds, 100)
defer plan.delete()
assert.NoError(t, err)
res, err := segment.segmentGetEntityByIds(plan)
assert.NoError(t, err)
assert.Equal(t, res.Ids.GetIntId().Data, []int64{2, 3, 1})
}
func TestSegment_getDeletedCount(t *testing.T) {
collectionID := UniqueID(0)
collectionMeta := genTestCollectionMeta(collectionID, false)