Enable primary_key switch

Signed-off-by: FluorineDog <guilin.gou@zilliz.com>
This commit is contained in:
FluorineDog 2020-11-28 09:16:00 +08:00 committed by yefu.chen
parent 235e723eb6
commit c9fb34142c
53 changed files with 920 additions and 479 deletions

View File

@ -19,6 +19,8 @@ etcd:
address: localhost
port: 2379
rootPath: by-dev
metaSubPath: meta # metaRootPath = rootPath + '/' + metaSubPath
kvSubPath: kv # kvRootPath = rootPath + '/' + kvSubPath
segThreshold: 10000
pulsar:

View File

@ -3,7 +3,6 @@ package allocator
import (
"context"
"errors"
"fmt"
"log"
"sync"
"time"
@ -215,7 +214,6 @@ func (ta *Allocator) sync(timeout bool) {
if !timeout {
ta.tChan.Reset()
}
fmt.Println("synced")
}
func (ta *Allocator) finishSyncRequest() {

View File

@ -77,7 +77,6 @@ func (ia *IDAllocator) processFunc(req request) error {
idRequest := req.(*idRequest)
idRequest.id = ia.idStart
ia.idStart++
fmt.Println("process ID")
return nil
}

View File

@ -173,7 +173,6 @@ func (sa *SegIDAssigner) syncSegments() {
sa.segReqs = sa.segReqs[0:0]
fmt.Println("OOOOO", req.PerChannelReq)
resp, err := sa.masterClient.AssignSegmentID(ctx, req)
log.Printf("resp: %v", resp)
if resp.Status.GetErrorCode() != commonpb.ErrorCode_SUCCESS {
log.Panic("GRPC AssignSegmentID Failed")
@ -249,7 +248,6 @@ func (sa *SegIDAssigner) processFunc(req request) error {
}
}
segRequest.segInfo = resultSegInfo
fmt.Println("process segmentID")
return nil
}

View File

@ -2,7 +2,6 @@ package allocator
import (
"context"
"fmt"
"log"
"time"
@ -54,7 +53,6 @@ func (ta *TimestampAllocator) checkFunc(timeout bool) bool {
}
func (ta *TimestampAllocator) syncTs() {
fmt.Println("sync TS")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
req := &internalpb.TsoRequest{
PeerID: 1,
@ -62,7 +60,6 @@ func (ta *TimestampAllocator) syncTs() {
Count: ta.countPerRPC,
}
resp, err := ta.masterClient.AllocTimestamp(ctx, req)
log.Printf("resp: %v", resp)
cancel()
if err != nil {
@ -77,7 +74,6 @@ func (ta *TimestampAllocator) processFunc(req request) error {
tsoRequest := req.(*tsoRequest)
tsoRequest.timestamp = ta.lastTsBegin
ta.lastTsBegin++
fmt.Println("process tso")
return nil
}
@ -95,7 +91,6 @@ func (ta *TimestampAllocator) Alloc(count uint32) ([]Timestamp, error) {
}
req.count = count
ta.reqs <- req
fmt.Println("YYYYY ", len(ta.reqs))
req.Wait()
if !req.IsValid() {

View File

@ -29,3 +29,4 @@ add_subdirectory( pb )
add_subdirectory( segcore )
add_subdirectory( cache )
add_subdirectory( query )
add_subdirectory( common )

View File

@ -0,0 +1,8 @@
# Source files for the common utility library.
set(COMMON_SRC
Schema.cpp
)
# milvus_common: shared schema helpers (Schema::ParseFrom etc.).
add_library(milvus_common
${COMMON_SRC}
)
# Schema parsing reads the generated protobuf messages, hence milvus_proto.
target_link_libraries(milvus_common milvus_proto)

View File

@ -0,0 +1,46 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#include "common/Schema.h"
#include <google/protobuf/text_format.h>
namespace milvus {
// Build a Schema object from its protobuf CollectionSchema description.
// For every field it resolves the dimension from the "dim" type param
// (mandatory for vector fields, implicitly 1 for scalar fields) and records
// the offset of the at-most-one primary-key field.
std::shared_ptr<Schema>
Schema::ParseFrom(const milvus::proto::schema::CollectionSchema& schema_proto) {
auto schema = std::make_shared<Schema>();
// Propagate the collection-level autoID flag onto the schema.
schema->set_auto_id(schema_proto.autoid());
for (const milvus::proto::schema::FieldSchema& child : schema_proto.fields()) {
const auto& type_params = child.type_params();
// -1 means "dim not specified in type_params".
int64_t dim = -1;
auto data_type = DataType(child.data_type());
for (const auto& type_param : type_params) {
if (type_param.key() == "dim") {
dim = strtoll(type_param.value().c_str(), nullptr, 10);
}
}
if (field_is_vector(data_type)) {
// Vector fields must carry an explicit dimension.
AssertInfo(dim != -1, "dim not found");
} else {
// Scalar fields: dim may be omitted or explicitly 1; normalize to 1.
AssertInfo(dim == 1 || dim == -1, "Invalid dim field. Should be 1 or not exists");
dim = 1;
}
if (child.is_primary_key()) {
// At most one primary key is allowed; remember its field offset
// (the offset the field will get from the AddField below).
AssertInfo(!schema->primary_key_offset_opt_.has_value(), "repetitive primary key");
schema->primary_key_offset_opt_ = schema->size();
}
schema->AddField(child.name(), data_type, dim);
}
return schema;
}
} // namespace milvus

View File

@ -16,6 +16,8 @@
#include <string>
#include <unordered_map>
#include <memory>
#include <pb/schema.pb.h>
#include <optional>
namespace milvus {
@ -37,6 +39,11 @@ class Schema {
total_sizeof_ += field_sizeof;
}
void
set_auto_id(bool is_auto_id) {
is_auto_id_ = is_auto_id;
}
auto
begin() {
return fields_.begin();
@ -46,6 +53,13 @@ class Schema {
end() {
return fields_.end();
}
public:
bool
get_is_auto_id() const {
return is_auto_id_;
}
auto
begin() const {
return fields_.begin();
@ -100,6 +114,15 @@ class Schema {
return (*this)[offset];
}
std::optional<int>
get_primary_key_offset() const {
return primary_key_offset_opt_;
}
public:
static std::shared_ptr<Schema>
ParseFrom(const milvus::proto::schema::CollectionSchema& schema_proto);
private:
// this is where data holds
std::vector<FieldMeta> fields_;
@ -109,6 +132,8 @@ class Schema {
std::unordered_map<std::string, int> offsets_;
std::vector<int> sizeof_infos_;
int total_sizeof_ = 0;
bool is_auto_id_ = true;
std::optional<int> primary_key_offset_opt_;
};
using SchemaPtr = std::shared_ptr<Schema>;

View File

@ -72,6 +72,7 @@ const ::PROTOBUF_NAMESPACE_ID::uint32 TableStruct_schema_2eproto::offsets[] PROT
~0u, // no _oneof_case_
~0u, // no _weak_field_map_
PROTOBUF_FIELD_OFFSET(::milvus::proto::schema::FieldSchema, name_),
PROTOBUF_FIELD_OFFSET(::milvus::proto::schema::FieldSchema, is_primary_key_),
PROTOBUF_FIELD_OFFSET(::milvus::proto::schema::FieldSchema, description_),
PROTOBUF_FIELD_OFFSET(::milvus::proto::schema::FieldSchema, data_type_),
PROTOBUF_FIELD_OFFSET(::milvus::proto::schema::FieldSchema, type_params_),
@ -88,7 +89,7 @@ const ::PROTOBUF_NAMESPACE_ID::uint32 TableStruct_schema_2eproto::offsets[] PROT
};
static const ::PROTOBUF_NAMESPACE_ID::internal::MigrationSchema schemas[] PROTOBUF_SECTION_VARIABLE(protodesc_cold) = {
{ 0, -1, sizeof(::milvus::proto::schema::FieldSchema)},
{ 10, -1, sizeof(::milvus::proto::schema::CollectionSchema)},
{ 11, -1, sizeof(::milvus::proto::schema::CollectionSchema)},
};
static ::PROTOBUF_NAMESPACE_ID::Message const * const file_default_instances[] = {
@ -98,21 +99,21 @@ static ::PROTOBUF_NAMESPACE_ID::Message const * const file_default_instances[] =
const char descriptor_table_protodef_schema_2eproto[] PROTOBUF_SECTION_VARIABLE(protodesc_cold) =
"\n\014schema.proto\022\023milvus.proto.schema\032\014com"
"mon.proto\"\323\001\n\013FieldSchema\022\014\n\004name\030\001 \001(\t\022"
"\023\n\013description\030\002 \001(\t\0220\n\tdata_type\030\003 \001(\0162"
"\035.milvus.proto.schema.DataType\0226\n\013type_p"
"arams\030\004 \003(\0132!.milvus.proto.common.KeyVal"
"uePair\0227\n\014index_params\030\005 \003(\0132!.milvus.pr"
"oto.common.KeyValuePair\"w\n\020CollectionSch"
"ema\022\014\n\004name\030\001 \001(\t\022\023\n\013description\030\002 \001(\t\022\016"
"\n\006autoID\030\003 \001(\010\0220\n\006fields\030\004 \003(\0132 .milvus."
"proto.schema.FieldSchema*\221\001\n\010DataType\022\010\n"
"\004NONE\020\000\022\010\n\004BOOL\020\001\022\010\n\004INT8\020\002\022\t\n\005INT16\020\003\022\t"
"\n\005INT32\020\004\022\t\n\005INT64\020\005\022\t\n\005FLOAT\020\n\022\n\n\006DOUBL"
"E\020\013\022\n\n\006STRING\020\024\022\021\n\rVECTOR_BINARY\020d\022\020\n\014VE"
"CTOR_FLOAT\020eBBZ@github.com/zilliztech/mi"
"lvus-distributed/internal/proto/schemapb"
"b\006proto3"
"mon.proto\"\353\001\n\013FieldSchema\022\014\n\004name\030\001 \001(\t\022"
"\026\n\016is_primary_key\030\002 \001(\010\022\023\n\013description\030\003"
" \001(\t\0220\n\tdata_type\030\004 \001(\0162\035.milvus.proto.s"
"chema.DataType\0226\n\013type_params\030\005 \003(\0132!.mi"
"lvus.proto.common.KeyValuePair\0227\n\014index_"
"params\030\006 \003(\0132!.milvus.proto.common.KeyVa"
"luePair\"w\n\020CollectionSchema\022\014\n\004name\030\001 \001("
"\t\022\023\n\013description\030\002 \001(\t\022\016\n\006autoID\030\003 \001(\010\0220"
"\n\006fields\030\004 \003(\0132 .milvus.proto.schema.Fie"
"ldSchema*\221\001\n\010DataType\022\010\n\004NONE\020\000\022\010\n\004BOOL\020"
"\001\022\010\n\004INT8\020\002\022\t\n\005INT16\020\003\022\t\n\005INT32\020\004\022\t\n\005INT"
"64\020\005\022\t\n\005FLOAT\020\n\022\n\n\006DOUBLE\020\013\022\n\n\006STRING\020\024\022"
"\021\n\rVECTOR_BINARY\020d\022\020\n\014VECTOR_FLOAT\020eBBZ@"
"github.com/zilliztech/milvus-distributed"
"/internal/proto/schemapbb\006proto3"
;
static const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable*const descriptor_table_schema_2eproto_deps[1] = {
&::descriptor_table_common_2eproto,
@ -124,7 +125,7 @@ static ::PROTOBUF_NAMESPACE_ID::internal::SCCInfoBase*const descriptor_table_sch
static ::PROTOBUF_NAMESPACE_ID::internal::once_flag descriptor_table_schema_2eproto_once;
static bool descriptor_table_schema_2eproto_initialized = false;
const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable descriptor_table_schema_2eproto = {
&descriptor_table_schema_2eproto_initialized, descriptor_table_protodef_schema_2eproto, "schema.proto", 608,
&descriptor_table_schema_2eproto_initialized, descriptor_table_protodef_schema_2eproto, "schema.proto", 632,
&descriptor_table_schema_2eproto_once, descriptor_table_schema_2eproto_sccs, descriptor_table_schema_2eproto_deps, 2, 1,
schemas, file_default_instances, TableStruct_schema_2eproto::offsets,
file_level_metadata_schema_2eproto, 2, file_level_enum_descriptors_schema_2eproto, file_level_service_descriptors_schema_2eproto,
@ -192,7 +193,9 @@ FieldSchema::FieldSchema(const FieldSchema& from)
if (!from.description().empty()) {
description_.AssignWithDefault(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited(), from.description_);
}
data_type_ = from.data_type_;
::memcpy(&is_primary_key_, &from.is_primary_key_,
static_cast<size_t>(reinterpret_cast<char*>(&data_type_) -
reinterpret_cast<char*>(&is_primary_key_)) + sizeof(data_type_));
// @@protoc_insertion_point(copy_constructor:milvus.proto.schema.FieldSchema)
}
@ -200,7 +203,9 @@ void FieldSchema::SharedCtor() {
::PROTOBUF_NAMESPACE_ID::internal::InitSCC(&scc_info_FieldSchema_schema_2eproto.base);
name_.UnsafeSetDefault(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited());
description_.UnsafeSetDefault(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited());
data_type_ = 0;
::memset(&is_primary_key_, 0, static_cast<size_t>(
reinterpret_cast<char*>(&data_type_) -
reinterpret_cast<char*>(&is_primary_key_)) + sizeof(data_type_));
}
FieldSchema::~FieldSchema() {
@ -232,7 +237,9 @@ void FieldSchema::Clear() {
index_params_.Clear();
name_.ClearToEmptyNoArena(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited());
description_.ClearToEmptyNoArena(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited());
data_type_ = 0;
::memset(&is_primary_key_, 0, static_cast<size_t>(
reinterpret_cast<char*>(&data_type_) -
reinterpret_cast<char*>(&is_primary_key_)) + sizeof(data_type_));
_internal_metadata_.Clear();
}
@ -251,43 +258,50 @@ const char* FieldSchema::_InternalParse(const char* ptr, ::PROTOBUF_NAMESPACE_ID
CHK_(ptr);
} else goto handle_unusual;
continue;
// string description = 2;
// bool is_primary_key = 2;
case 2:
if (PROTOBUF_PREDICT_TRUE(static_cast<::PROTOBUF_NAMESPACE_ID::uint8>(tag) == 18)) {
if (PROTOBUF_PREDICT_TRUE(static_cast<::PROTOBUF_NAMESPACE_ID::uint8>(tag) == 16)) {
is_primary_key_ = ::PROTOBUF_NAMESPACE_ID::internal::ReadVarint(&ptr);
CHK_(ptr);
} else goto handle_unusual;
continue;
// string description = 3;
case 3:
if (PROTOBUF_PREDICT_TRUE(static_cast<::PROTOBUF_NAMESPACE_ID::uint8>(tag) == 26)) {
ptr = ::PROTOBUF_NAMESPACE_ID::internal::InlineGreedyStringParserUTF8(mutable_description(), ptr, ctx, "milvus.proto.schema.FieldSchema.description");
CHK_(ptr);
} else goto handle_unusual;
continue;
// .milvus.proto.schema.DataType data_type = 3;
case 3:
if (PROTOBUF_PREDICT_TRUE(static_cast<::PROTOBUF_NAMESPACE_ID::uint8>(tag) == 24)) {
// .milvus.proto.schema.DataType data_type = 4;
case 4:
if (PROTOBUF_PREDICT_TRUE(static_cast<::PROTOBUF_NAMESPACE_ID::uint8>(tag) == 32)) {
::PROTOBUF_NAMESPACE_ID::uint64 val = ::PROTOBUF_NAMESPACE_ID::internal::ReadVarint(&ptr);
CHK_(ptr);
set_data_type(static_cast<::milvus::proto::schema::DataType>(val));
} else goto handle_unusual;
continue;
// repeated .milvus.proto.common.KeyValuePair type_params = 4;
case 4:
if (PROTOBUF_PREDICT_TRUE(static_cast<::PROTOBUF_NAMESPACE_ID::uint8>(tag) == 34)) {
// repeated .milvus.proto.common.KeyValuePair type_params = 5;
case 5:
if (PROTOBUF_PREDICT_TRUE(static_cast<::PROTOBUF_NAMESPACE_ID::uint8>(tag) == 42)) {
ptr -= 1;
do {
ptr += 1;
ptr = ctx->ParseMessage(add_type_params(), ptr);
CHK_(ptr);
if (!ctx->DataAvailable(ptr)) break;
} while (::PROTOBUF_NAMESPACE_ID::internal::UnalignedLoad<::PROTOBUF_NAMESPACE_ID::uint8>(ptr) == 34);
} while (::PROTOBUF_NAMESPACE_ID::internal::UnalignedLoad<::PROTOBUF_NAMESPACE_ID::uint8>(ptr) == 42);
} else goto handle_unusual;
continue;
// repeated .milvus.proto.common.KeyValuePair index_params = 5;
case 5:
if (PROTOBUF_PREDICT_TRUE(static_cast<::PROTOBUF_NAMESPACE_ID::uint8>(tag) == 42)) {
// repeated .milvus.proto.common.KeyValuePair index_params = 6;
case 6:
if (PROTOBUF_PREDICT_TRUE(static_cast<::PROTOBUF_NAMESPACE_ID::uint8>(tag) == 50)) {
ptr -= 1;
do {
ptr += 1;
ptr = ctx->ParseMessage(add_index_params(), ptr);
CHK_(ptr);
if (!ctx->DataAvailable(ptr)) break;
} while (::PROTOBUF_NAMESPACE_ID::internal::UnalignedLoad<::PROTOBUF_NAMESPACE_ID::uint8>(ptr) == 42);
} while (::PROTOBUF_NAMESPACE_ID::internal::UnalignedLoad<::PROTOBUF_NAMESPACE_ID::uint8>(ptr) == 50);
} else goto handle_unusual;
continue;
default: {
@ -335,9 +349,22 @@ bool FieldSchema::MergePartialFromCodedStream(
break;
}
// string description = 2;
// bool is_primary_key = 2;
case 2: {
if (static_cast< ::PROTOBUF_NAMESPACE_ID::uint8>(tag) == (18 & 0xFF)) {
if (static_cast< ::PROTOBUF_NAMESPACE_ID::uint8>(tag) == (16 & 0xFF)) {
DO_((::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::ReadPrimitive<
bool, ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::TYPE_BOOL>(
input, &is_primary_key_)));
} else {
goto handle_unusual;
}
break;
}
// string description = 3;
case 3: {
if (static_cast< ::PROTOBUF_NAMESPACE_ID::uint8>(tag) == (26 & 0xFF)) {
DO_(::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::ReadString(
input, this->mutable_description()));
DO_(::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::VerifyUtf8String(
@ -350,9 +377,9 @@ bool FieldSchema::MergePartialFromCodedStream(
break;
}
// .milvus.proto.schema.DataType data_type = 3;
case 3: {
if (static_cast< ::PROTOBUF_NAMESPACE_ID::uint8>(tag) == (24 & 0xFF)) {
// .milvus.proto.schema.DataType data_type = 4;
case 4: {
if (static_cast< ::PROTOBUF_NAMESPACE_ID::uint8>(tag) == (32 & 0xFF)) {
int value = 0;
DO_((::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::ReadPrimitive<
int, ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::TYPE_ENUM>(
@ -364,9 +391,9 @@ bool FieldSchema::MergePartialFromCodedStream(
break;
}
// repeated .milvus.proto.common.KeyValuePair type_params = 4;
case 4: {
if (static_cast< ::PROTOBUF_NAMESPACE_ID::uint8>(tag) == (34 & 0xFF)) {
// repeated .milvus.proto.common.KeyValuePair type_params = 5;
case 5: {
if (static_cast< ::PROTOBUF_NAMESPACE_ID::uint8>(tag) == (42 & 0xFF)) {
DO_(::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::ReadMessage(
input, add_type_params()));
} else {
@ -375,9 +402,9 @@ bool FieldSchema::MergePartialFromCodedStream(
break;
}
// repeated .milvus.proto.common.KeyValuePair index_params = 5;
case 5: {
if (static_cast< ::PROTOBUF_NAMESPACE_ID::uint8>(tag) == (42 & 0xFF)) {
// repeated .milvus.proto.common.KeyValuePair index_params = 6;
case 6: {
if (static_cast< ::PROTOBUF_NAMESPACE_ID::uint8>(tag) == (50 & 0xFF)) {
DO_(::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::ReadMessage(
input, add_index_params()));
} else {
@ -423,36 +450,41 @@ void FieldSchema::SerializeWithCachedSizes(
1, this->name(), output);
}
// string description = 2;
// bool is_primary_key = 2;
if (this->is_primary_key() != 0) {
::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::WriteBool(2, this->is_primary_key(), output);
}
// string description = 3;
if (this->description().size() > 0) {
::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::VerifyUtf8String(
this->description().data(), static_cast<int>(this->description().length()),
::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::SERIALIZE,
"milvus.proto.schema.FieldSchema.description");
::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::WriteStringMaybeAliased(
2, this->description(), output);
3, this->description(), output);
}
// .milvus.proto.schema.DataType data_type = 3;
// .milvus.proto.schema.DataType data_type = 4;
if (this->data_type() != 0) {
::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::WriteEnum(
3, this->data_type(), output);
4, this->data_type(), output);
}
// repeated .milvus.proto.common.KeyValuePair type_params = 4;
// repeated .milvus.proto.common.KeyValuePair type_params = 5;
for (unsigned int i = 0,
n = static_cast<unsigned int>(this->type_params_size()); i < n; i++) {
::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::WriteMessageMaybeToArray(
4,
5,
this->type_params(static_cast<int>(i)),
output);
}
// repeated .milvus.proto.common.KeyValuePair index_params = 5;
// repeated .milvus.proto.common.KeyValuePair index_params = 6;
for (unsigned int i = 0,
n = static_cast<unsigned int>(this->index_params_size()); i < n; i++) {
::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::WriteMessageMaybeToArray(
5,
6,
this->index_params(static_cast<int>(i)),
output);
}
@ -481,7 +513,12 @@ void FieldSchema::SerializeWithCachedSizes(
1, this->name(), target);
}
// string description = 2;
// bool is_primary_key = 2;
if (this->is_primary_key() != 0) {
target = ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::WriteBoolToArray(2, this->is_primary_key(), target);
}
// string description = 3;
if (this->description().size() > 0) {
::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::VerifyUtf8String(
this->description().data(), static_cast<int>(this->description().length()),
@ -489,29 +526,29 @@ void FieldSchema::SerializeWithCachedSizes(
"milvus.proto.schema.FieldSchema.description");
target =
::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::WriteStringToArray(
2, this->description(), target);
3, this->description(), target);
}
// .milvus.proto.schema.DataType data_type = 3;
// .milvus.proto.schema.DataType data_type = 4;
if (this->data_type() != 0) {
target = ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::WriteEnumToArray(
3, this->data_type(), target);
4, this->data_type(), target);
}
// repeated .milvus.proto.common.KeyValuePair type_params = 4;
// repeated .milvus.proto.common.KeyValuePair type_params = 5;
for (unsigned int i = 0,
n = static_cast<unsigned int>(this->type_params_size()); i < n; i++) {
target = ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::
InternalWriteMessageToArray(
4, this->type_params(static_cast<int>(i)), target);
5, this->type_params(static_cast<int>(i)), target);
}
// repeated .milvus.proto.common.KeyValuePair index_params = 5;
// repeated .milvus.proto.common.KeyValuePair index_params = 6;
for (unsigned int i = 0,
n = static_cast<unsigned int>(this->index_params_size()); i < n; i++) {
target = ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::
InternalWriteMessageToArray(
5, this->index_params(static_cast<int>(i)), target);
6, this->index_params(static_cast<int>(i)), target);
}
if (_internal_metadata_.have_unknown_fields()) {
@ -535,7 +572,7 @@ size_t FieldSchema::ByteSizeLong() const {
// Prevent compiler warnings about cached_has_bits being unused
(void) cached_has_bits;
// repeated .milvus.proto.common.KeyValuePair type_params = 4;
// repeated .milvus.proto.common.KeyValuePair type_params = 5;
{
unsigned int count = static_cast<unsigned int>(this->type_params_size());
total_size += 1UL * count;
@ -546,7 +583,7 @@ size_t FieldSchema::ByteSizeLong() const {
}
}
// repeated .milvus.proto.common.KeyValuePair index_params = 5;
// repeated .milvus.proto.common.KeyValuePair index_params = 6;
{
unsigned int count = static_cast<unsigned int>(this->index_params_size());
total_size += 1UL * count;
@ -564,14 +601,19 @@ size_t FieldSchema::ByteSizeLong() const {
this->name());
}
// string description = 2;
// string description = 3;
if (this->description().size() > 0) {
total_size += 1 +
::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::StringSize(
this->description());
}
// .milvus.proto.schema.DataType data_type = 3;
// bool is_primary_key = 2;
if (this->is_primary_key() != 0) {
total_size += 1 + 1;
}
// .milvus.proto.schema.DataType data_type = 4;
if (this->data_type() != 0) {
total_size += 1 +
::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::EnumSize(this->data_type());
@ -614,6 +656,9 @@ void FieldSchema::MergeFrom(const FieldSchema& from) {
description_.AssignWithDefault(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited(), from.description_);
}
if (from.is_primary_key() != 0) {
set_is_primary_key(from.is_primary_key());
}
if (from.data_type() != 0) {
set_data_type(from.data_type());
}
@ -646,6 +691,7 @@ void FieldSchema::InternalSwap(FieldSchema* other) {
GetArenaNoVirtual());
description_.Swap(&other->description_, &::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited(),
GetArenaNoVirtual());
swap(is_primary_key_, other->is_primary_key_);
swap(data_type_, other->data_type_);
}

View File

@ -225,13 +225,14 @@ class FieldSchema :
// accessors -------------------------------------------------------
enum : int {
kTypeParamsFieldNumber = 4,
kIndexParamsFieldNumber = 5,
kTypeParamsFieldNumber = 5,
kIndexParamsFieldNumber = 6,
kNameFieldNumber = 1,
kDescriptionFieldNumber = 2,
kDataTypeFieldNumber = 3,
kDescriptionFieldNumber = 3,
kIsPrimaryKeyFieldNumber = 2,
kDataTypeFieldNumber = 4,
};
// repeated .milvus.proto.common.KeyValuePair type_params = 4;
// repeated .milvus.proto.common.KeyValuePair type_params = 5;
int type_params_size() const;
void clear_type_params();
::milvus::proto::common::KeyValuePair* mutable_type_params(int index);
@ -242,7 +243,7 @@ class FieldSchema :
const ::PROTOBUF_NAMESPACE_ID::RepeatedPtrField< ::milvus::proto::common::KeyValuePair >&
type_params() const;
// repeated .milvus.proto.common.KeyValuePair index_params = 5;
// repeated .milvus.proto.common.KeyValuePair index_params = 6;
int index_params_size() const;
void clear_index_params();
::milvus::proto::common::KeyValuePair* mutable_index_params(int index);
@ -264,7 +265,7 @@ class FieldSchema :
std::string* release_name();
void set_allocated_name(std::string* name);
// string description = 2;
// string description = 3;
void clear_description();
const std::string& description() const;
void set_description(const std::string& value);
@ -275,7 +276,12 @@ class FieldSchema :
std::string* release_description();
void set_allocated_description(std::string* description);
// .milvus.proto.schema.DataType data_type = 3;
// bool is_primary_key = 2;
void clear_is_primary_key();
bool is_primary_key() const;
void set_is_primary_key(bool value);
// .milvus.proto.schema.DataType data_type = 4;
void clear_data_type();
::milvus::proto::schema::DataType data_type() const;
void set_data_type(::milvus::proto::schema::DataType value);
@ -289,6 +295,7 @@ class FieldSchema :
::PROTOBUF_NAMESPACE_ID::RepeatedPtrField< ::milvus::proto::common::KeyValuePair > index_params_;
::PROTOBUF_NAMESPACE_ID::internal::ArenaStringPtr name_;
::PROTOBUF_NAMESPACE_ID::internal::ArenaStringPtr description_;
bool is_primary_key_;
int data_type_;
mutable ::PROTOBUF_NAMESPACE_ID::internal::CachedSize _cached_size_;
friend struct ::TableStruct_schema_2eproto;
@ -525,7 +532,21 @@ inline void FieldSchema::set_allocated_name(std::string* name) {
// @@protoc_insertion_point(field_set_allocated:milvus.proto.schema.FieldSchema.name)
}
// string description = 2;
// bool is_primary_key = 2;
inline void FieldSchema::clear_is_primary_key() {
is_primary_key_ = false;
}
inline bool FieldSchema::is_primary_key() const {
// @@protoc_insertion_point(field_get:milvus.proto.schema.FieldSchema.is_primary_key)
return is_primary_key_;
}
inline void FieldSchema::set_is_primary_key(bool value) {
is_primary_key_ = value;
// @@protoc_insertion_point(field_set:milvus.proto.schema.FieldSchema.is_primary_key)
}
// string description = 3;
inline void FieldSchema::clear_description() {
description_.ClearToEmptyNoArena(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited());
}
@ -576,7 +597,7 @@ inline void FieldSchema::set_allocated_description(std::string* description) {
// @@protoc_insertion_point(field_set_allocated:milvus.proto.schema.FieldSchema.description)
}
// .milvus.proto.schema.DataType data_type = 3;
// .milvus.proto.schema.DataType data_type = 4;
inline void FieldSchema::clear_data_type() {
data_type_ = 0;
}
@ -590,7 +611,7 @@ inline void FieldSchema::set_data_type(::milvus::proto::schema::DataType value)
// @@protoc_insertion_point(field_set:milvus.proto.schema.FieldSchema.data_type)
}
// repeated .milvus.proto.common.KeyValuePair type_params = 4;
// repeated .milvus.proto.common.KeyValuePair type_params = 5;
inline int FieldSchema::type_params_size() const {
return type_params_.size();
}
@ -617,7 +638,7 @@ FieldSchema::type_params() const {
return type_params_;
}
// repeated .milvus.proto.common.KeyValuePair index_params = 5;
// repeated .milvus.proto.common.KeyValuePair index_params = 6;
inline int FieldSchema::index_params_size() const {
return index_params_.size();
}

View File

@ -133,6 +133,12 @@ CreatePlanImplNaive(const Schema& schema, const std::string& dsl_str) {
PanicInfo("Unsupported DSL");
}
plan->plan_node_->predicate_ = std::move(predicate);
// TODO: target_entry parser
// if schema autoid is true,
// prepend target_entries_ with row_id
// else
// with primary_key
//
return plan;
}

View File

@ -35,6 +35,7 @@ struct Plan {
const Schema& schema_;
std::unique_ptr<VectorPlanNode> plan_node_;
std::map<std::string, FieldId> tag2field_; // PlaceholderName -> FieldId
std::vector<std::string> target_entries_;
// TODO: add move extra info
};

View File

@ -116,18 +116,21 @@ QueryBruteForceImpl(const segcore::SegmentSmallIndex& segment,
segcore::merge_into(num_queries, topK, final_dis.data(), final_uids.data(), buf_dis.data(), buf_uids.data());
}
// step 5: convert offset to uids
results.result_distances_ = std::move(final_dis);
results.internal_seg_offsets_ = std::move(final_uids);
results.topK_ = topK;
results.num_queries_ = num_queries;
// TODO: deprecated code begin
final_uids = results.internal_seg_offsets_;
for (auto& id : final_uids) {
if (id == -1) {
continue;
}
id = record.uids_[id];
}
results.result_ids_ = std::move(final_uids);
results.result_distances_ = std::move(final_dis);
results.topK_ = topK;
results.num_queries_ = num_queries;
// TODO: deprecated code end
return Status::OK();
}

View File

@ -19,6 +19,7 @@ add_library(milvus_segcore SHARED
target_link_libraries(milvus_segcore
tbb utils pthread knowhere log milvus_proto
dl backtrace
milvus_common
milvus_query
)

View File

@ -139,27 +139,7 @@ Collection::parse() {
}
collection_name_ = collection_meta.schema().name();
// TODO: delete print
std::cout << "create collection " << collection_meta.schema().name() << std::endl;
auto schema = std::make_shared<Schema>();
for (const milvus::proto::schema::FieldSchema& child : collection_meta.schema().fields()) {
const auto& type_params = child.type_params();
int64_t dim = 16;
for (const auto& type_param : type_params) {
if (type_param.key() == "dim") {
dim = strtoll(type_param.value().c_str(), nullptr, 10);
}
}
std::cout << "add Field, name :" << child.name() << ", datatype :" << child.data_type() << ", dim :" << dim
<< std::endl;
schema->AddField(std::string_view(child.name()), DataType(child.data_type()), dim);
}
/*
schema->AddField("fakevec", DataType::VECTOR_FLOAT, 16);
schema->AddField("age", DataType::INT32);
*/
schema_ = schema;
schema_ = Schema::ParseFrom(collection_meta.schema());
}
} // namespace milvus::segcore

View File

@ -72,6 +72,9 @@ class SegmentBase {
int num_groups,
QueryResult& results) = 0;
virtual Status
FillTargetEntry(const query::Plan* Plan, QueryResult& results) = 0;
// stop receive insert requests
virtual Status
Close() = 0;

View File

@ -84,6 +84,11 @@ class SegmentNaive : public SegmentBase {
Status
BuildIndex(IndexMetaPtr index_meta) override;
// FillTargetEntry is not implemented for SegmentNaive: it always panics.
// NOTE(review): presumably only SegmentSmallIndex is expected to serve
// this call — confirm no caller reaches the naive segment with a plan.
Status
FillTargetEntry(const query::Plan* Plan, QueryResult& results) override {
PanicInfo("unimplemented");
}
Status
DropRawData(std::string_view field_name) override {
// TODO: NO-OP

View File

@ -344,5 +344,39 @@ SegmentSmallIndex::Search(const query::Plan* plan,
results = visitor.get_moved_result(*plan->plan_node_);
return Status::OK();
}
// Populate results.row_data_ with one 8-byte identifier blob per search hit.
// If the schema uses auto-generated ids, the system row id from the insert
// record is returned; otherwise the user-defined INT64 primary-key column
// (located via its recorded schema offset) is read instead.
Status
SegmentSmallIndex::FillTargetEntry(const query::Plan* plan, QueryResult& results) {
AssertInfo(plan, "empty plan");
// All per-hit result vectors must agree in length, and row_data_ must
// not have been filled yet.
auto size = results.result_distances_.size();
Assert(results.internal_seg_offsets_.size() == size);
Assert(results.result_offsets_.size() == size);
Assert(results.row_data_.size() == 0);
if (plan->schema_.get_is_auto_id()) {
// Auto-id collection: translate each segment offset to its row id.
auto& uids = record_.uids_;
for (int64_t i = 0; i < size; ++i) {
auto seg_offset = results.internal_seg_offsets_[i];
auto row_id = uids[seg_offset];
// Serialize the id as a raw little-blob of sizeof(row_id) bytes.
std::vector<char> blob(sizeof(row_id));
memcpy(blob.data(), &row_id, sizeof(row_id));
results.row_data_.emplace_back(std::move(blob));
}
} else {
// User-supplied primary key: the schema must have recorded exactly
// one primary-key offset, and the column must be INT64.
auto key_offset_opt = schema_->get_primary_key_offset();
Assert(key_offset_opt.has_value());
auto key_offset = key_offset_opt.value();
auto field_meta = schema_->operator[](key_offset);
Assert(field_meta.get_data_type() == DataType::INT64);
auto uids = record_.get_scalar_entity<int64_t>(key_offset);
for (int64_t i = 0; i < size; ++i) {
auto seg_offset = results.internal_seg_offsets_[i];
auto row_id = uids->operator[](seg_offset);
std::vector<char> blob(sizeof(row_id));
memcpy(blob.data(), &row_id, sizeof(row_id));
results.row_data_.emplace_back(std::move(blob));
}
}
return Status::OK();
}
} // namespace milvus::segcore

View File

@ -33,29 +33,6 @@
#include <memory>
namespace milvus::segcore {
// struct ColumnBasedDataChunk {
// std::vector<std::vector<float>> entity_vecs;
//
// static ColumnBasedDataChunk
// from(const RowBasedRawData& source, const Schema& schema) {
// ColumnBasedDataChunk dest;
// auto count = source.count;
// auto raw_data = reinterpret_cast<const char*>(source.raw_data);
// auto align = source.sizeof_per_row;
// for (auto& field : schema) {
// auto len = field.get_sizeof();
// Assert(len % sizeof(float) == 0);
// std::vector<float> new_col(len * count / sizeof(float));
// for (int64_t i = 0; i < count; ++i) {
// memcpy(new_col.data() + i * len / sizeof(float), raw_data + i * align, len);
// }
// dest.entity_vecs.push_back(std::move(new_col));
// // offset the raw_data
// raw_data += len / sizeof(float);
// }
// return dest;
// }
//};
class SegmentSmallIndex : public SegmentBase {
public:
@ -164,20 +141,13 @@ class SegmentSmallIndex : public SegmentBase {
std::shared_ptr<DeletedRecord::TmpBitmap>
get_deleted_bitmap(int64_t del_barrier, Timestamp query_timestamp, int64_t insert_barrier, bool force = false);
// Status
// QueryBruteForceImpl(query::QueryPtr query, Timestamp timestamp, QueryResult& results);
// Status
// QueryBruteForceImpl(const query::QueryInfo& info,
// const float* query_data,
// int64_t num_queries,
// Timestamp timestamp,
// QueryResult& results);
template <typename Type>
knowhere::IndexPtr
BuildVecIndexImpl(const IndexMeta::Entry& entry);
Status
FillTargetEntry(const query::Plan* Plan, QueryResult& results) override;
private:
SchemaPtr schema_;
std::atomic<SegmentState> state_ = SegmentState::Open;

View File

@ -141,8 +141,11 @@ struct QueryResult {
QueryResult() = default;
QueryResult(uint64_t num_queries, uint64_t topK) : topK_(topK), num_queries_(num_queries) {
auto count = get_row_count();
result_ids_.resize(count);
result_distances_.resize(count);
internal_seg_offsets_.resize(count);
// TODO: deprecated
result_ids_.resize(count);
}
[[nodiscard]] uint64_t
@ -153,10 +156,24 @@ struct QueryResult {
uint64_t num_queries_;
uint64_t topK_;
// uint64_t total_row_count_; // total_row_count_ = topK * num_queries_
engine::ResultIds result_ids_; // top1, top2, ..;
engine::ResultDistances result_distances_;
// engine::DataChunkPtr data_chunk_;
// vector<tuple<Score, SegId, Offset>> data_reduced;
// vector<tuple<Score, SegId, Offset, RawData>>
// map<SegId, vector<tuple<DataOffset, ResLoc>>>
uint64_t seg_id_;
std::vector<float> result_distances_;
// TODO(gexi): utilize these field
std::vector<int64_t> internal_seg_offsets_;
std::vector<int64_t> result_offsets_;
std::vector<std::vector<char>> row_data_;
// TODO: deprecated, use row_data directly
std::vector<idx_t> result_ids_;
};
using QueryResultPtr = std::shared_ptr<QueryResult>;
} // namespace engine

View File

@ -20,6 +20,7 @@
#include "query/generated/ExecPlanNodeVisitor.h"
#include "query/PlanImpl.h"
#include "segcore/SegmentSmallIndex.h"
#include "pb/schema.pb.h"
using namespace milvus;
using namespace milvus::query;
@ -322,3 +323,81 @@ TEST(Query, ExecWihtoutPredicate) {
Json json{results};
std::cout << json.dump(2);
}
// Verifies that FillTargetEntry copies the primary-key column into
// QueryResult::row_data_: after a vector search, each result row's raw
// bytes must decode to the same value as the corresponding entry in
// result_ids_.
TEST(Query, FillSegment) {
namespace pb = milvus::proto;
// Build a collection schema in protobuf form: one 16-dim float vector
// field plus an INT32 field flagged as the primary key (the field this
// test expects FillTargetEntry to materialize).
pb::schema::CollectionSchema proto;
proto.set_name("col");
proto.set_description("asdfhsalkgfhsadg");
proto.set_autoid(true);
{
auto field = proto.add_fields();
field->set_name("fakevec");
field->set_is_primary_key(false);
field->set_description("asdgfsagf");
field->set_data_type(pb::schema::DataType::VECTOR_FLOAT);
auto param = field->add_type_params();
param->set_key("dim");
param->set_value("16");
}
{
auto field = proto.add_fields();
field->set_name("the_key");
field->set_is_primary_key(true);
field->set_description("asdgfsagf");
field->set_data_type(pb::schema::DataType::INT32);
}
auto schema = Schema::ParseFrom(proto);
auto segment = CreateSegment(schema);
// Populate the segment with N generated rows before searching.
int N = 100000;
auto dataset = DataGen(schema, N);
segment->PreInsert(N);
segment->Insert(0, N, dataset.row_ids_.data(), dataset.timestamps_.data(), dataset.raw_);
// DSL: plain top-5 L2 vector query on "fakevec", no scalar predicate.
std::string dsl = R"({
"bool": {
"must": [
{
"vector": {
"fakevec": {
"metric_type": "L2",
"params": {
"nprobe": 10
},
"query": "$0",
"topk": 5
}
}
}
]
}
})";
auto plan = CreatePlan(*schema, dsl);
// 10 query vectors of dim 16, seeded with 443.
auto ph_proto = CreatePlaceholderGroup(10, 16, 443);
auto ph = ParsePlaceholderGroup(plan.get(), ph_proto.SerializeAsString());
std::vector<const PlaceholderGroup*> groups = {ph.get()};
// Timestamp past all inserted rows so every row is visible to the search.
std::vector<Timestamp> timestamps = {N * 2UL};
QueryResult result;
segment->Search(plan.get(), groups.data(), timestamps.data(), 1, result);
// TODO: deprecated result_ids_
// With autoid, result_ids_ is expected to mirror the internal segment
// offsets — asserted here before FillTargetEntry runs.
ASSERT_EQ(result.result_ids_, result.internal_seg_offsets_);
auto topk = 5;
auto num_queries = 10;
result.result_offsets_.resize(topk * num_queries);
segment->FillTargetEntry(plan.get(), result);
// Each row_data_ entry must be exactly one int64 primary key whose value
// matches the deprecated result_ids_ at the same position.
auto ans = result.row_data_;
ASSERT_EQ(ans.size(), topk * num_queries);
int64_t std_index = 0;
for (auto& vec : ans) {
ASSERT_EQ(vec.size(), sizeof(int64_t));
int64_t val;
memcpy(&val, vec.data(), sizeof(int64_t));
auto std_val = result.result_ids_[std_index];
ASSERT_EQ(val, std_val);
++std_index;
}
}

View File

@ -33,7 +33,8 @@ func TestMaster_CollectionTask(t *testing.T) {
Port: Params.Port,
EtcdAddress: Params.EtcdAddress,
EtcdRootPath: "/test/root",
MetaRootPath: "/test/root/meta",
KvRootPath: "/test/root/kv",
PulsarAddress: Params.PulsarAddress,
ProxyIDList: []typeutil.UniqueID{1, 2},

View File

@ -32,7 +32,8 @@ func TestMaster_CreateCollection(t *testing.T) {
Port: Params.Port,
EtcdAddress: Params.EtcdAddress,
EtcdRootPath: "/test/root",
MetaRootPath: "/test/root/meta",
KvRootPath: "/test/root/kv",
PulsarAddress: Params.PulsarAddress,
ProxyIDList: []typeutil.UniqueID{1, 2},

View File

@ -82,8 +82,8 @@ func Init() {
func CreateServer(ctx context.Context) (*Master, error) {
//Init(etcdAddr, kvRootPath)
etcdAddress := Params.EtcdAddress
metaRootPath := Params.EtcdRootPath
kvRootPath := Params.EtcdRootPath
metaRootPath := Params.MetaRootPath
kvRootPath := Params.KvRootPath
pulsarAddr := Params.PulsarAddress
etcdClient, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddress}})

View File

@ -17,7 +17,8 @@ type ParamTable struct {
Port int
EtcdAddress string
EtcdRootPath string
MetaRootPath string
KvRootPath string
PulsarAddress string
// nodeID
@ -75,7 +76,8 @@ func (p *ParamTable) Init() {
p.initPort()
p.initEtcdAddress()
p.initEtcdRootPath()
p.initMetaRootPath()
p.initKvRootPath()
p.initPulsarAddress()
p.initProxyIDList()
@ -138,12 +140,28 @@ func (p *ParamTable) initPulsarAddress() {
p.PulsarAddress = addr
}
func (p *ParamTable) initEtcdRootPath() {
path, err := p.Load("etcd.rootpath")
func (p *ParamTable) initMetaRootPath() {
rootPath, err := p.Load("etcd.rootPath")
if err != nil {
panic(err)
}
p.EtcdRootPath = path
subPath, err := p.Load("etcd.metaSubPath")
if err != nil {
panic(err)
}
p.MetaRootPath = rootPath + "/" + subPath
}
// initKvRootPath composes the etcd KV root path as
// "<etcd.rootPath>/<etcd.kvSubPath>" and stores it in p.KvRootPath.
// It panics if either configuration key cannot be loaded, since the
// server cannot operate without a valid KV root.
func (p *ParamTable) initKvRootPath() {
	base, err := p.Load("etcd.rootPath")
	if err != nil {
		panic(err)
	}
	sub, err := p.Load("etcd.kvSubPath")
	if err != nil {
		panic(err)
	}
	p.KvRootPath = base + "/" + sub
}
func (p *ParamTable) initTopicNum() {

View File

@ -22,10 +22,16 @@ func TestParamTable_Port(t *testing.T) {
assert.Equal(t, port, 53100)
}
func TestParamTable_EtcdRootPath(t *testing.T) {
func TestParamTable_MetaRootPath(t *testing.T) {
Params.Init()
addr := Params.EtcdRootPath
assert.Equal(t, addr, "by-dev")
path := Params.MetaRootPath
assert.Equal(t, path, "by-dev/meta")
}
// TestParamTable_KVRootPath checks that Init derives the KV root path
// from the default config ("by-dev" joined with the "kv" sub-path).
func TestParamTable_KVRootPath(t *testing.T) {
	Params.Init()
	assert.Equal(t, Params.KvRootPath, "by-dev/kv")
}
func TestParamTable_TopicNum(t *testing.T) {

View File

@ -35,7 +35,8 @@ func TestMaster_Partition(t *testing.T) {
Port: Params.Port,
EtcdAddress: Params.EtcdAddress,
EtcdRootPath: "/test/root",
MetaRootPath: "/test/root/meta",
KvRootPath: "/test/root/kv",
PulsarAddress: Params.PulsarAddress,
ProxyIDList: []typeutil.UniqueID{1, 2},

View File

@ -236,7 +236,8 @@ func startupMaster() {
Port: Params.Port,
EtcdAddress: Params.EtcdAddress,
EtcdRootPath: rootPath,
MetaRootPath: "/test/root/meta",
KvRootPath: "/test/root/kv",
PulsarAddress: Params.PulsarAddress,
ProxyIDList: []typeutil.UniqueID{1, 2},

View File

@ -75,7 +75,6 @@ func (ttBarrier *softTimeTickBarrier) Start() error {
log.Printf("[softTimeTickBarrier] Warning: peerID %d not exist\n", ttmsg.PeerID)
continue
}
if ttmsg.Timestamp > oldT {
ttBarrier.peer2LastTt[ttmsg.PeerID] = ttmsg.Timestamp
@ -85,7 +84,6 @@ func (ttBarrier *softTimeTickBarrier) Start() error {
if ttBarrier.lastTt != 0 && ttBarrier.minTtInterval > ts-Timestamp(lastTt) {
continue
}
ttBarrier.outTt <- ts
}
}

View File

@ -30,10 +30,11 @@ enum DataType {
*/
message FieldSchema {
string name = 1;
string description = 2;
DataType data_type = 3;
repeated common.KeyValuePair type_params = 4;
repeated common.KeyValuePair index_params = 5;
bool is_primary_key = 2;
string description = 3;
DataType data_type = 4;
repeated common.KeyValuePair type_params = 5;
repeated common.KeyValuePair index_params = 6;
}
/**

View File

@ -79,10 +79,11 @@ func (DataType) EnumDescriptor() ([]byte, []int) {
// @brief Field schema
type FieldSchema struct {
Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
Description string `protobuf:"bytes,2,opt,name=description,proto3" json:"description,omitempty"`
DataType DataType `protobuf:"varint,3,opt,name=data_type,json=dataType,proto3,enum=milvus.proto.schema.DataType" json:"data_type,omitempty"`
TypeParams []*commonpb.KeyValuePair `protobuf:"bytes,4,rep,name=type_params,json=typeParams,proto3" json:"type_params,omitempty"`
IndexParams []*commonpb.KeyValuePair `protobuf:"bytes,5,rep,name=index_params,json=indexParams,proto3" json:"index_params,omitempty"`
IsPrimaryKey bool `protobuf:"varint,2,opt,name=is_primary_key,json=isPrimaryKey,proto3" json:"is_primary_key,omitempty"`
Description string `protobuf:"bytes,3,opt,name=description,proto3" json:"description,omitempty"`
DataType DataType `protobuf:"varint,4,opt,name=data_type,json=dataType,proto3,enum=milvus.proto.schema.DataType" json:"data_type,omitempty"`
TypeParams []*commonpb.KeyValuePair `protobuf:"bytes,5,rep,name=type_params,json=typeParams,proto3" json:"type_params,omitempty"`
IndexParams []*commonpb.KeyValuePair `protobuf:"bytes,6,rep,name=index_params,json=indexParams,proto3" json:"index_params,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
@ -120,6 +121,13 @@ func (m *FieldSchema) GetName() string {
return ""
}
// GetIsPrimaryKey reports whether this field is marked as the collection's
// primary key. Following protobuf getter convention, a nil receiver is
// safe and yields the zero value (false).
func (m *FieldSchema) GetIsPrimaryKey() bool {
	if m == nil {
		return false
	}
	return m.IsPrimaryKey
}
func (m *FieldSchema) GetDescription() string {
if m != nil {
return m.Description
@ -222,31 +230,34 @@ func init() {
func init() { proto.RegisterFile("schema.proto", fileDescriptor_1c5fb4d8cc22d66a) }
var fileDescriptor_1c5fb4d8cc22d66a = []byte{
// 416 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x91, 0x5f, 0x8b, 0xd4, 0x30,
0x14, 0xc5, 0xed, 0xfc, 0x63, 0xe6, 0x76, 0x94, 0x18, 0x45, 0x06, 0x41, 0xa8, 0xfb, 0x34, 0x08,
0xb6, 0x38, 0x2b, 0xcb, 0xe2, 0x93, 0xdb, 0xed, 0xac, 0x14, 0x87, 0x76, 0xc9, 0xd6, 0x05, 0x7d,
0x19, 0xd2, 0x26, 0x3a, 0x81, 0xf4, 0x0f, 0x6d, 0x2a, 0xce, 0x7e, 0x03, 0x1f, 0x7d, 0xf5, 0xd3,
0x4a, 0xd2, 0x0e, 0xac, 0x30, 0x0f, 0x82, 0x6f, 0xbf, 0x7b, 0x6f, 0xce, 0x4d, 0xce, 0x09, 0xcc,
0x9b, 0x6c, 0xc7, 0x73, 0xea, 0x56, 0x75, 0xa9, 0x4a, 0xfc, 0x24, 0x17, 0xf2, 0x7b, 0xdb, 0x74,
0x95, 0xdb, 0x8d, 0x9e, 0xcf, 0xb3, 0x32, 0xcf, 0xcb, 0xa2, 0x6b, 0x9e, 0xfc, 0x1c, 0x80, 0x7d,
0x25, 0xb8, 0x64, 0x37, 0x66, 0x8a, 0x31, 0x8c, 0x0a, 0x9a, 0xf3, 0x85, 0xe5, 0x58, 0xcb, 0x19,
0x31, 0x8c, 0x1d, 0xb0, 0x19, 0x6f, 0xb2, 0x5a, 0x54, 0x4a, 0x94, 0xc5, 0x62, 0x60, 0x46, 0xf7,
0x5b, 0xf8, 0x1d, 0xcc, 0x18, 0x55, 0x74, 0xab, 0xf6, 0x15, 0x5f, 0x0c, 0x1d, 0x6b, 0xf9, 0x68,
0xf5, 0xc2, 0x3d, 0x72, 0xb9, 0x1b, 0x50, 0x45, 0x93, 0x7d, 0xc5, 0xc9, 0x94, 0xf5, 0x84, 0x7d,
0xb0, 0xb5, 0x6c, 0x5b, 0xd1, 0x9a, 0xe6, 0xcd, 0x62, 0xe4, 0x0c, 0x97, 0xf6, 0xea, 0xe5, 0xdf,
0xea, 0xfe, 0xc9, 0x1f, 0xf9, 0xfe, 0x96, 0xca, 0x96, 0x5f, 0x53, 0x51, 0x13, 0xd0, 0xaa, 0x6b,
0x23, 0xc2, 0x01, 0xcc, 0x45, 0xc1, 0xf8, 0x8f, 0xc3, 0x92, 0xf1, 0xbf, 0x2e, 0xb1, 0x8d, 0xac,
0xdb, 0x72, 0xf2, 0xdb, 0x02, 0x74, 0x59, 0x4a, 0xc9, 0x33, 0x6d, 0xea, 0xbf, 0x02, 0x79, 0x06,
0x13, 0xda, 0xaa, 0x32, 0x0c, 0x4c, 0x1a, 0x53, 0xd2, 0x57, 0xf8, 0x1c, 0x26, 0x5f, 0x75, 0xda,
0x07, 0x9f, 0xce, 0xd1, 0x94, 0xee, 0x7d, 0x08, 0xe9, 0xcf, 0xbf, 0xfa, 0x65, 0xc1, 0xf4, 0x90,
0x1e, 0x9e, 0xc2, 0x28, 0x8a, 0xa3, 0x35, 0x7a, 0xa0, 0xc9, 0x8f, 0xe3, 0x0d, 0xb2, 0x34, 0x85,
0x51, 0x72, 0x8e, 0x06, 0x78, 0x06, 0xe3, 0x30, 0x4a, 0xde, 0x9c, 0xa1, 0x61, 0x8f, 0xa7, 0x2b,
0x34, 0xea, 0xf1, 0xec, 0x2d, 0x1a, 0x6b, 0xbc, 0xda, 0xc4, 0x17, 0x09, 0x02, 0x0c, 0x30, 0x09,
0xe2, 0x4f, 0xfe, 0x66, 0x8d, 0x6c, 0xcd, 0x37, 0x09, 0x09, 0xa3, 0x0f, 0xe8, 0x29, 0x7e, 0x0c,
0x0f, 0x6f, 0xd7, 0x97, 0x49, 0x4c, 0xb6, 0x7e, 0x18, 0x5d, 0x90, 0xcf, 0x88, 0x61, 0x04, 0xf3,
0xbe, 0xd5, 0x89, 0xb9, 0xef, 0x7f, 0x79, 0xff, 0x4d, 0xa8, 0x5d, 0x9b, 0xea, 0x6c, 0xbd, 0x3b,
0x21, 0xa5, 0xb8, 0x53, 0x3c, 0xdb, 0x79, 0x9d, 0xa9, 0xd7, 0x4c, 0x34, 0xaa, 0x16, 0x69, 0xab,
0x38, 0xf3, 0x44, 0xa1, 0x78, 0x5d, 0x50, 0xe9, 0x19, 0xa7, 0x5e, 0xe7, 0xb4, 0x4a, 0xd3, 0x89,
0xa9, 0x4f, 0xff, 0x04, 0x00, 0x00, 0xff, 0xff, 0x74, 0xb3, 0xe8, 0x78, 0xba, 0x02, 0x00, 0x00,
// 451 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x91, 0xdf, 0x6a, 0xd4, 0x40,
0x18, 0xc5, 0xcd, 0xfe, 0x09, 0xd9, 0x2f, 0xb1, 0x8c, 0xa3, 0x48, 0x10, 0x84, 0x58, 0xbc, 0x58,
0x04, 0x37, 0xb8, 0x95, 0x52, 0xbc, 0xb2, 0xe9, 0x6e, 0x25, 0x74, 0x49, 0x96, 0x69, 0x2c, 0xe8,
0x4d, 0x98, 0x4d, 0x46, 0x77, 0x30, 0xff, 0x48, 0x26, 0x62, 0xfa, 0x16, 0xde, 0xfa, 0x12, 0xbe,
0xa2, 0xcc, 0x24, 0x85, 0xaa, 0x45, 0xbc, 0xfb, 0x7d, 0xdf, 0xcc, 0x39, 0xcc, 0x39, 0x03, 0x56,
0x93, 0xec, 0x59, 0x4e, 0x17, 0x55, 0x5d, 0x8a, 0x12, 0x3f, 0xcc, 0x79, 0xf6, 0xb5, 0x6d, 0xfa,
0x69, 0xd1, 0x1f, 0x3d, 0xb1, 0x92, 0x32, 0xcf, 0xcb, 0xa2, 0x5f, 0x1e, 0xfe, 0x1c, 0x81, 0x79,
0xce, 0x59, 0x96, 0x5e, 0xaa, 0x53, 0x8c, 0x61, 0x52, 0xd0, 0x9c, 0xd9, 0x9a, 0xa3, 0xcd, 0x67,
0x44, 0x31, 0x7e, 0x0e, 0x07, 0xbc, 0x89, 0xab, 0x9a, 0xe7, 0xb4, 0xee, 0xe2, 0x2f, 0xac, 0xb3,
0x47, 0x8e, 0x36, 0x37, 0x88, 0xc5, 0x9b, 0x6d, 0xbf, 0xbc, 0x60, 0x1d, 0x76, 0xc0, 0x4c, 0x59,
0x93, 0xd4, 0xbc, 0x12, 0xbc, 0x2c, 0xec, 0xb1, 0x32, 0xb8, 0xbd, 0xc2, 0x6f, 0x60, 0x96, 0x52,
0x41, 0x63, 0xd1, 0x55, 0xcc, 0x9e, 0x38, 0xda, 0xfc, 0x60, 0xf9, 0x74, 0x71, 0xc7, 0x13, 0x17,
0x2b, 0x2a, 0x68, 0xd4, 0x55, 0x8c, 0x18, 0xe9, 0x40, 0xd8, 0x03, 0x53, 0xca, 0xe2, 0x8a, 0xd6,
0x34, 0x6f, 0xec, 0xa9, 0x33, 0x9e, 0x9b, 0xcb, 0x67, 0xbf, 0xab, 0x87, 0x60, 0x17, 0xac, 0xbb,
0xa2, 0x59, 0xcb, 0xb6, 0x94, 0xd7, 0x04, 0xa4, 0x6a, 0xab, 0x44, 0x78, 0x05, 0x16, 0x2f, 0x52,
0xf6, 0xed, 0xc6, 0x44, 0xff, 0x5f, 0x13, 0x53, 0xc9, 0x7a, 0x97, 0xc3, 0x1f, 0x1a, 0xa0, 0xb3,
0x32, 0xcb, 0x58, 0x22, 0x43, 0xfd, 0xa3, 0xb6, 0x3f, 0x0a, 0x19, 0xfd, 0x5d, 0xc8, 0x63, 0xd0,
0x69, 0x2b, 0x4a, 0x7f, 0xa5, 0xda, 0x32, 0xc8, 0x30, 0xe1, 0x13, 0xd0, 0x3f, 0xc9, 0x3f, 0x69,
0xec, 0x89, 0x7a, 0xa2, 0x73, 0x67, 0x4b, 0xb7, 0xbe, 0x8d, 0x0c, 0xf7, 0x5f, 0x7c, 0xd7, 0xc0,
0xb8, 0x69, 0x0f, 0x1b, 0x30, 0x09, 0xc2, 0x60, 0x8d, 0xee, 0x49, 0xf2, 0xc2, 0x70, 0x83, 0x34,
0x49, 0x7e, 0x10, 0x9d, 0xa0, 0x11, 0x9e, 0xc1, 0xd4, 0x0f, 0xa2, 0x57, 0xc7, 0x68, 0x3c, 0xe0,
0xd1, 0x12, 0x4d, 0x06, 0x3c, 0x7e, 0x8d, 0xa6, 0x12, 0xcf, 0x37, 0xe1, 0x69, 0x84, 0x00, 0x03,
0xe8, 0xab, 0xf0, 0xbd, 0xb7, 0x59, 0x23, 0x53, 0xf2, 0x65, 0x44, 0xfc, 0xe0, 0x1d, 0x7a, 0x84,
0x1f, 0xc0, 0xfd, 0xab, 0xf5, 0x59, 0x14, 0x92, 0xd8, 0xf3, 0x83, 0x53, 0xf2, 0x01, 0xa5, 0x18,
0x81, 0x35, 0xac, 0x7a, 0x31, 0xf3, 0xbc, 0x8f, 0x6f, 0x3f, 0x73, 0xb1, 0x6f, 0x77, 0xb2, 0x5b,
0xf7, 0x9a, 0x67, 0x19, 0xbf, 0x16, 0x2c, 0xd9, 0xbb, 0x7d, 0xa8, 0x97, 0x29, 0x6f, 0x44, 0xcd,
0x77, 0xad, 0x60, 0xa9, 0xcb, 0x0b, 0xc1, 0xea, 0x82, 0x66, 0xae, 0x4a, 0xea, 0xf6, 0x49, 0xab,
0xdd, 0x4e, 0x57, 0xf3, 0xd1, 0xaf, 0x00, 0x00, 0x00, 0xff, 0xff, 0x14, 0xa6, 0xc9, 0x55, 0xe0,
0x02, 0x00, 0x00,
}

View File

@ -4,10 +4,8 @@ import (
"context"
"errors"
"log"
"strconv"
"time"
"github.com/golang/protobuf/proto"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
@ -122,8 +120,8 @@ func (p *Proxy) Search(ctx context.Context, req *servicepb.Query) (*servicepb.Qu
qt := &QueryTask{
Condition: NewTaskCondition(ctx),
SearchRequest: internalpb.SearchRequest{
MsgType: internalpb.MsgType_kSearch,
Query: &commonpb.Blob{},
ProxyID: Params.ProxyID(),
ResultChannelID: Params.ProxyID(),
},
queryMsgStream: p.queryMsgStream,
resultBuf: make(chan []*internalpb.SearchResult),
@ -131,11 +129,6 @@ func (p *Proxy) Search(ctx context.Context, req *servicepb.Query) (*servicepb.Qu
}
var cancel func()
qt.ctx, cancel = context.WithTimeout(ctx, reqTimeoutInterval)
// Hack with test, shit here but no other ways
reqID, _ := strconv.Atoi(req.CollectionName[len(req.CollectionName)-1:])
qt.ReqID = int64(reqID)
queryBytes, _ := proto.Marshal(req)
qt.SearchRequest.Query.Value = queryBytes
log.Printf("grpc address of query task: %p", qt)
defer cancel()

View File

@ -37,13 +37,13 @@ var testNum = 10
func startMaster(ctx context.Context) {
master.Init()
etcdAddr := master.Params.EtcdAddress
rootPath := master.Params.EtcdRootPath
metaRootPath := master.Params.MetaRootPath
etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
if err != nil {
panic(err)
}
_, err = etcdCli.Delete(context.TODO(), rootPath, clientv3.WithPrefix())
_, err = etcdCli.Delete(context.TODO(), metaRootPath, clientv3.WithPrefix())
if err != nil {
panic(err)
}

View File

@ -334,6 +334,17 @@ func (qt *QueryTask) PreExecute() error {
return err
}
}
qt.MsgType = internalpb.MsgType_kSearch
if qt.query.PartitionTags == nil || len(qt.query.PartitionTags) <= 0 {
qt.query.PartitionTags = []string{Params.defaultPartitionTag()}
}
queryBytes, err := proto.Marshal(qt.query)
if err != nil {
return err
}
qt.Query = &commonpb.Blob{
Value: queryBytes,
}
return nil
}
@ -341,6 +352,7 @@ func (qt *QueryTask) Execute() error {
var tsMsg msgstream.TsMsg = &msgstream.SearchMsg{
SearchRequest: qt.SearchRequest,
BaseMsg: msgstream.BaseMsg{
HashValues: []int32{int32(Params.ProxyID())},
BeginTimestamp: qt.Timestamp,
EndTimestamp: qt.Timestamp,
},
@ -351,8 +363,12 @@ func (qt *QueryTask) Execute() error {
Msgs: make([]msgstream.TsMsg, 1),
}
msgPack.Msgs[0] = tsMsg
qt.queryMsgStream.Produce(msgPack)
return nil
err := qt.queryMsgStream.Produce(msgPack)
log.Printf("[Proxy] length of searchMsg: %v", len(msgPack.Msgs))
if err != nil {
log.Printf("[Proxy] send search request failed: %v", err)
}
return err
}
func (qt *QueryTask) PostExecute() error {
@ -367,35 +383,41 @@ func (qt *QueryTask) PostExecute() error {
qt.result = &servicepb.QueryResult{}
return nil
}
n := len(searchResults[0].Hits) // n
if n <= 0 {
qt.result = &servicepb.QueryResult{}
return nil
}
var hits [][]*servicepb.Hits = make([][]*servicepb.Hits, rlen)
for i, sr := range searchResults {
hits := make([][]*servicepb.Hits, rlen)
for i, searchResult := range searchResults {
hits[i] = make([]*servicepb.Hits, n)
for j, bs := range sr.Hits {
for j, bs := range searchResult.Hits {
hits[i][j] = &servicepb.Hits{}
err := proto.Unmarshal(bs, hits[i][j])
if err != nil {
return err
}
}
}
k := len(hits[0][0].IDs)
queryResult := &servicepb.QueryResult{
qt.result = &servicepb.QueryResult{
Status: &commonpb.Status{
ErrorCode: 0,
},
Hits: make([][]byte, 0),
}
// reduce by score, TODO: use better algorithm
// use merge-sort here, the number of ways to merge is `rlen`
// in this process, we must make sure:
// len(queryResult.Hits) == n
// len(queryResult.Hits[i].Ids) == k for i in range(n)
for i := 0; i < n; n++ { // n
for i := 0; i < n; i++ { // n
locs := make([]int, rlen)
reducedHits := &servicepb.Hits{}
reducedHits := &servicepb.Hits{
IDs: make([]int64, 0),
RowData: make([][]byte, 0),
Scores: make([]float32, 0),
}
for j := 0; j < k; j++ { // k
choice, minDistance := 0, float32(math.MaxFloat32)
for q, loc := range locs { // query num, the number of ways to merge
@ -407,7 +429,9 @@ func (qt *QueryTask) PostExecute() error {
}
choiceOffset := locs[choice]
reducedHits.IDs = append(reducedHits.IDs, hits[choice][i].IDs[choiceOffset])
reducedHits.RowData = append(reducedHits.RowData, hits[choice][i].RowData[choiceOffset])
if hits[choice][i].RowData != nil && len(hits[choice][i].RowData) > 0 {
reducedHits.RowData = append(reducedHits.RowData, hits[choice][i].RowData[choiceOffset])
}
reducedHits.Scores = append(reducedHits.Scores, hits[choice][i].Scores[choiceOffset])
locs[choice]++
}
@ -415,12 +439,11 @@ func (qt *QueryTask) PostExecute() error {
if err != nil {
return err
}
queryResult.Hits = append(queryResult.Hits, reducedHitsBs)
qt.result.Hits = append(qt.result.Hits, reducedHitsBs)
}
qt.result = queryResult
return nil
}
}
//return nil
}
type HasCollectionTask struct {

View File

@ -293,7 +293,7 @@ func (sched *TaskScheduler) processTask(t task, q TaskQueue) {
}
q.AddActiveTask(t)
log.Printf("query task add to active list ...")
log.Printf("task add to active list ...")
defer func() {
q.PopActiveTask(t.EndTs())
log.Printf("pop from active list ...")
@ -304,8 +304,9 @@ func (sched *TaskScheduler) processTask(t task, q TaskQueue) {
log.Printf("execute definition task failed, error = %v", err)
return
}
log.Printf("scheduler task done ...")
log.Printf("task execution done ...")
err = t.PostExecute()
log.Printf("post execute task done ...")
}
func (sched *TaskScheduler) definitionLoop() {
@ -437,6 +438,6 @@ func (sched *TaskScheduler) Close() {
func (sched *TaskScheduler) TaskDoneTest(ts Timestamp) bool {
ddTaskDone := sched.DdQueue.TaskDoneTest(ts)
dmTaskDone := sched.DmQueue.TaskDoneTest(ts)
dqTaskDone := sched.DqQueue.TaskDoneTest(ts)
return ddTaskDone && dmTaskDone && dqTaskDone
//dqTaskDone := sched.DqQueue.TaskDoneTest(ts)
return ddTaskDone && dmTaskDone && true
}

View File

@ -56,7 +56,6 @@ func newTimeTick(ctx context.Context,
}
func (tt *timeTick) tick() error {
if tt.lastTick == tt.currentTick {
ts, err := tt.tsoAllocator.AllocOne()
if err != nil {
@ -84,7 +83,7 @@ func (tt *timeTick) tick() error {
if err != nil {
log.Printf("proxy send time tick error: %v", err)
} else {
log.Printf("proxy send time tick message")
//log.Printf("proxy send time tick message")
}
tt.lastTick = tt.currentTick
return nil

View File

@ -19,8 +19,9 @@ func TestCollectionReplica_getCollectionNum(t *testing.T) {
collectionName := "collection0"
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
Name: "vec",
IsPrimaryKey: false,
DataType: schemapb.DataType_VECTOR_FLOAT,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -30,8 +31,9 @@ func TestCollectionReplica_getCollectionNum(t *testing.T) {
}
fieldInt := schemapb.FieldSchema{
Name: "age",
DataType: schemapb.DataType_INT32,
Name: "age",
IsPrimaryKey: false,
DataType: schemapb.DataType_INT32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -41,7 +43,8 @@ func TestCollectionReplica_getCollectionNum(t *testing.T) {
}
schema := schemapb.CollectionSchema{
Name: collectionName,
Name: collectionName,
AutoID: true,
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},
@ -72,8 +75,9 @@ func TestCollectionReplica_addCollection(t *testing.T) {
collectionName := "collection0"
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
Name: "vec",
IsPrimaryKey: false,
DataType: schemapb.DataType_VECTOR_FLOAT,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -83,8 +87,9 @@ func TestCollectionReplica_addCollection(t *testing.T) {
}
fieldInt := schemapb.FieldSchema{
Name: "age",
DataType: schemapb.DataType_INT32,
Name: "age",
IsPrimaryKey: false,
DataType: schemapb.DataType_INT32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -94,7 +99,8 @@ func TestCollectionReplica_addCollection(t *testing.T) {
}
schema := schemapb.CollectionSchema{
Name: collectionName,
Name: collectionName,
AutoID: true,
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},
@ -130,8 +136,9 @@ func TestCollectionReplica_removeCollection(t *testing.T) {
collectionName := "collection0"
collectionID := UniqueID(0)
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
Name: "vec",
IsPrimaryKey: false,
DataType: schemapb.DataType_VECTOR_FLOAT,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -141,8 +148,9 @@ func TestCollectionReplica_removeCollection(t *testing.T) {
}
fieldInt := schemapb.FieldSchema{
Name: "age",
DataType: schemapb.DataType_INT32,
Name: "age",
IsPrimaryKey: false,
DataType: schemapb.DataType_INT32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -152,7 +160,8 @@ func TestCollectionReplica_removeCollection(t *testing.T) {
}
schema := schemapb.CollectionSchema{
Name: collectionName,
Name: collectionName,
AutoID: true,
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},
@ -192,8 +201,9 @@ func TestCollectionReplica_getCollectionByID(t *testing.T) {
collectionName := "collection0"
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
Name: "vec",
IsPrimaryKey: false,
DataType: schemapb.DataType_VECTOR_FLOAT,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -203,8 +213,9 @@ func TestCollectionReplica_getCollectionByID(t *testing.T) {
}
fieldInt := schemapb.FieldSchema{
Name: "age",
DataType: schemapb.DataType_INT32,
Name: "age",
IsPrimaryKey: false,
DataType: schemapb.DataType_INT32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -214,7 +225,8 @@ func TestCollectionReplica_getCollectionByID(t *testing.T) {
}
schema := schemapb.CollectionSchema{
Name: "collection0",
Name: "collection0",
AutoID: true,
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},
@ -256,8 +268,9 @@ func TestCollectionReplica_getCollectionByName(t *testing.T) {
collectionName := "collection0"
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
Name: "vec",
IsPrimaryKey: false,
DataType: schemapb.DataType_VECTOR_FLOAT,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -267,8 +280,9 @@ func TestCollectionReplica_getCollectionByName(t *testing.T) {
}
fieldInt := schemapb.FieldSchema{
Name: "age",
DataType: schemapb.DataType_INT32,
Name: "age",
IsPrimaryKey: false,
DataType: schemapb.DataType_INT32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -278,7 +292,8 @@ func TestCollectionReplica_getCollectionByName(t *testing.T) {
}
schema := schemapb.CollectionSchema{
Name: "collection0",
Name: "collection0",
AutoID: true,
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},
@ -320,8 +335,9 @@ func TestCollectionReplica_hasCollection(t *testing.T) {
collectionName := "collection0"
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
Name: "vec",
IsPrimaryKey: false,
DataType: schemapb.DataType_VECTOR_FLOAT,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -331,8 +347,9 @@ func TestCollectionReplica_hasCollection(t *testing.T) {
}
fieldInt := schemapb.FieldSchema{
Name: "age",
DataType: schemapb.DataType_INT32,
Name: "age",
IsPrimaryKey: false,
DataType: schemapb.DataType_INT32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -342,7 +359,8 @@ func TestCollectionReplica_hasCollection(t *testing.T) {
}
schema := schemapb.CollectionSchema{
Name: collectionName,
Name: collectionName,
AutoID: true,
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},
@ -378,8 +396,9 @@ func TestCollectionReplica_getPartitionNum(t *testing.T) {
collectionName := "collection0"
collectionID := UniqueID(0)
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
Name: "vec",
IsPrimaryKey: false,
DataType: schemapb.DataType_VECTOR_FLOAT,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -389,8 +408,9 @@ func TestCollectionReplica_getPartitionNum(t *testing.T) {
}
fieldInt := schemapb.FieldSchema{
Name: "age",
DataType: schemapb.DataType_INT32,
Name: "age",
IsPrimaryKey: false,
DataType: schemapb.DataType_INT32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -400,7 +420,8 @@ func TestCollectionReplica_getPartitionNum(t *testing.T) {
}
schema := schemapb.CollectionSchema{
Name: "collection0",
Name: "collection0",
AutoID: true,
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},
@ -449,8 +470,9 @@ func TestCollectionReplica_addPartition(t *testing.T) {
collectionName := "collection0"
collectionID := UniqueID(0)
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
Name: "vec",
IsPrimaryKey: false,
DataType: schemapb.DataType_VECTOR_FLOAT,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -460,8 +482,9 @@ func TestCollectionReplica_addPartition(t *testing.T) {
}
fieldInt := schemapb.FieldSchema{
Name: "age",
DataType: schemapb.DataType_INT32,
Name: "age",
IsPrimaryKey: false,
DataType: schemapb.DataType_INT32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -471,7 +494,8 @@ func TestCollectionReplica_addPartition(t *testing.T) {
}
schema := schemapb.CollectionSchema{
Name: "collection0",
Name: "collection0",
AutoID: true,
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},
@ -517,8 +541,9 @@ func TestCollectionReplica_removePartition(t *testing.T) {
collectionID := UniqueID(0)
partitionTag := "default"
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
Name: "vec",
IsPrimaryKey: false,
DataType: schemapb.DataType_VECTOR_FLOAT,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -528,8 +553,9 @@ func TestCollectionReplica_removePartition(t *testing.T) {
}
fieldInt := schemapb.FieldSchema{
Name: "age",
DataType: schemapb.DataType_INT32,
Name: "age",
IsPrimaryKey: false,
DataType: schemapb.DataType_INT32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -539,7 +565,8 @@ func TestCollectionReplica_removePartition(t *testing.T) {
}
schema := schemapb.CollectionSchema{
Name: "collection0",
Name: "collection0",
AutoID: true,
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},
@ -586,8 +613,9 @@ func TestCollectionReplica_addPartitionsByCollectionMeta(t *testing.T) {
collectionName := "collection0"
collectionID := UniqueID(0)
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
Name: "vec",
IsPrimaryKey: false,
DataType: schemapb.DataType_VECTOR_FLOAT,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -597,8 +625,9 @@ func TestCollectionReplica_addPartitionsByCollectionMeta(t *testing.T) {
}
fieldInt := schemapb.FieldSchema{
Name: "age",
DataType: schemapb.DataType_INT32,
Name: "age",
IsPrimaryKey: false,
DataType: schemapb.DataType_INT32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -608,7 +637,8 @@ func TestCollectionReplica_addPartitionsByCollectionMeta(t *testing.T) {
}
schema := schemapb.CollectionSchema{
Name: "collection0",
Name: "collection0",
AutoID: true,
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},
@ -659,8 +689,9 @@ func TestCollectionReplica_removePartitionsByCollectionMeta(t *testing.T) {
collectionName := "collection0"
collectionID := UniqueID(0)
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
Name: "vec",
IsPrimaryKey: false,
DataType: schemapb.DataType_VECTOR_FLOAT,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -670,8 +701,9 @@ func TestCollectionReplica_removePartitionsByCollectionMeta(t *testing.T) {
}
fieldInt := schemapb.FieldSchema{
Name: "age",
DataType: schemapb.DataType_INT32,
Name: "age",
IsPrimaryKey: false,
DataType: schemapb.DataType_INT32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -681,7 +713,8 @@ func TestCollectionReplica_removePartitionsByCollectionMeta(t *testing.T) {
}
schema := schemapb.CollectionSchema{
Name: "collection0",
Name: "collection0",
AutoID: true,
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},
@ -732,8 +765,9 @@ func TestCollectionReplica_getPartitionByTag(t *testing.T) {
collectionName := "collection0"
collectionID := UniqueID(0)
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
Name: "vec",
IsPrimaryKey: false,
DataType: schemapb.DataType_VECTOR_FLOAT,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -743,8 +777,9 @@ func TestCollectionReplica_getPartitionByTag(t *testing.T) {
}
fieldInt := schemapb.FieldSchema{
Name: "age",
DataType: schemapb.DataType_INT32,
Name: "age",
IsPrimaryKey: false,
DataType: schemapb.DataType_INT32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -754,7 +789,8 @@ func TestCollectionReplica_getPartitionByTag(t *testing.T) {
}
schema := schemapb.CollectionSchema{
Name: "collection0",
Name: "collection0",
AutoID: true,
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},
@ -800,8 +836,9 @@ func TestCollectionReplica_hasPartition(t *testing.T) {
collectionName := "collection0"
collectionID := UniqueID(0)
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
Name: "vec",
IsPrimaryKey: false,
DataType: schemapb.DataType_VECTOR_FLOAT,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -811,8 +848,9 @@ func TestCollectionReplica_hasPartition(t *testing.T) {
}
fieldInt := schemapb.FieldSchema{
Name: "age",
DataType: schemapb.DataType_INT32,
Name: "age",
IsPrimaryKey: false,
DataType: schemapb.DataType_INT32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -822,7 +860,8 @@ func TestCollectionReplica_hasPartition(t *testing.T) {
}
schema := schemapb.CollectionSchema{
Name: "collection0",
Name: "collection0",
AutoID: true,
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},
@ -867,8 +906,9 @@ func TestCollectionReplica_addSegment(t *testing.T) {
collectionName := "collection0"
collectionID := UniqueID(0)
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
Name: "vec",
IsPrimaryKey: false,
DataType: schemapb.DataType_VECTOR_FLOAT,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -878,8 +918,9 @@ func TestCollectionReplica_addSegment(t *testing.T) {
}
fieldInt := schemapb.FieldSchema{
Name: "age",
DataType: schemapb.DataType_INT32,
Name: "age",
IsPrimaryKey: false,
DataType: schemapb.DataType_INT32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -889,7 +930,8 @@ func TestCollectionReplica_addSegment(t *testing.T) {
}
schema := schemapb.CollectionSchema{
Name: "collection0",
Name: "collection0",
AutoID: true,
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},
@ -938,8 +980,9 @@ func TestCollectionReplica_removeSegment(t *testing.T) {
collectionName := "collection0"
collectionID := UniqueID(0)
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
Name: "vec",
IsPrimaryKey: false,
DataType: schemapb.DataType_VECTOR_FLOAT,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -949,8 +992,9 @@ func TestCollectionReplica_removeSegment(t *testing.T) {
}
fieldInt := schemapb.FieldSchema{
Name: "age",
DataType: schemapb.DataType_INT32,
Name: "age",
IsPrimaryKey: false,
DataType: schemapb.DataType_INT32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -960,7 +1004,8 @@ func TestCollectionReplica_removeSegment(t *testing.T) {
}
schema := schemapb.CollectionSchema{
Name: "collection0",
Name: "collection0",
AutoID: true,
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},
@ -1011,8 +1056,9 @@ func TestCollectionReplica_getSegmentByID(t *testing.T) {
collectionName := "collection0"
collectionID := UniqueID(0)
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
Name: "vec",
IsPrimaryKey: false,
DataType: schemapb.DataType_VECTOR_FLOAT,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -1022,8 +1068,9 @@ func TestCollectionReplica_getSegmentByID(t *testing.T) {
}
fieldInt := schemapb.FieldSchema{
Name: "age",
DataType: schemapb.DataType_INT32,
Name: "age",
IsPrimaryKey: false,
DataType: schemapb.DataType_INT32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -1033,7 +1080,8 @@ func TestCollectionReplica_getSegmentByID(t *testing.T) {
}
schema := schemapb.CollectionSchema{
Name: "collection0",
Name: "collection0",
AutoID: true,
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},
@ -1082,8 +1130,9 @@ func TestCollectionReplica_hasSegment(t *testing.T) {
collectionName := "collection0"
collectionID := UniqueID(0)
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
Name: "vec",
IsPrimaryKey: false,
DataType: schemapb.DataType_VECTOR_FLOAT,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -1093,8 +1142,9 @@ func TestCollectionReplica_hasSegment(t *testing.T) {
}
fieldInt := schemapb.FieldSchema{
Name: "age",
DataType: schemapb.DataType_INT32,
Name: "age",
IsPrimaryKey: false,
DataType: schemapb.DataType_INT32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -1104,7 +1154,8 @@ func TestCollectionReplica_hasSegment(t *testing.T) {
}
schema := schemapb.CollectionSchema{
Name: "collection0",
Name: "collection0",
AutoID: true,
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},
@ -1157,8 +1208,9 @@ func TestCollectionReplica_freeAll(t *testing.T) {
collectionName := "collection0"
collectionID := UniqueID(0)
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
Name: "vec",
IsPrimaryKey: false,
DataType: schemapb.DataType_VECTOR_FLOAT,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -1168,8 +1220,9 @@ func TestCollectionReplica_freeAll(t *testing.T) {
}
fieldInt := schemapb.FieldSchema{
Name: "age",
DataType: schemapb.DataType_INT32,
Name: "age",
IsPrimaryKey: false,
DataType: schemapb.DataType_INT32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -1179,7 +1232,8 @@ func TestCollectionReplica_freeAll(t *testing.T) {
}
schema := schemapb.CollectionSchema{
Name: "collection0",
Name: "collection0",
AutoID: true,
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},

View File

@ -17,8 +17,9 @@ func TestCollection_Partitions(t *testing.T) {
collectionName := "collection0"
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
Name: "vec",
IsPrimaryKey: false,
DataType: schemapb.DataType_VECTOR_FLOAT,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -28,8 +29,9 @@ func TestCollection_Partitions(t *testing.T) {
}
fieldInt := schemapb.FieldSchema{
Name: "age",
DataType: schemapb.DataType_INT32,
Name: "age",
IsPrimaryKey: false,
DataType: schemapb.DataType_INT32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -39,7 +41,8 @@ func TestCollection_Partitions(t *testing.T) {
}
schema := schemapb.CollectionSchema{
Name: collectionName,
Name: collectionName,
AutoID: true,
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},
@ -77,8 +80,9 @@ func TestCollection_Partitions(t *testing.T) {
func TestCollection_newCollection(t *testing.T) {
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
Name: "vec",
IsPrimaryKey: false,
DataType: schemapb.DataType_VECTOR_FLOAT,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -88,8 +92,9 @@ func TestCollection_newCollection(t *testing.T) {
}
fieldInt := schemapb.FieldSchema{
Name: "age",
DataType: schemapb.DataType_INT32,
Name: "age",
IsPrimaryKey: false,
DataType: schemapb.DataType_INT32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -99,7 +104,8 @@ func TestCollection_newCollection(t *testing.T) {
}
schema := schemapb.CollectionSchema{
Name: "collection0",
Name: "collection0",
AutoID: true,
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},
@ -123,8 +129,9 @@ func TestCollection_newCollection(t *testing.T) {
func TestCollection_deleteCollection(t *testing.T) {
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
Name: "vec",
IsPrimaryKey: false,
DataType: schemapb.DataType_VECTOR_FLOAT,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -134,8 +141,9 @@ func TestCollection_deleteCollection(t *testing.T) {
}
fieldInt := schemapb.FieldSchema{
Name: "age",
DataType: schemapb.DataType_INT32,
Name: "age",
IsPrimaryKey: false,
DataType: schemapb.DataType_INT32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -145,7 +153,8 @@ func TestCollection_deleteCollection(t *testing.T) {
}
schema := schemapb.CollectionSchema{
Name: "collection0",
Name: "collection0",
AutoID: true,
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},

View File

@ -38,8 +38,9 @@ func TestDataSyncService_Start(t *testing.T) {
// init meta
collectionName := "collection0"
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
Name: "vec",
IsPrimaryKey: false,
DataType: schemapb.DataType_VECTOR_FLOAT,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -49,8 +50,9 @@ func TestDataSyncService_Start(t *testing.T) {
}
fieldInt := schemapb.FieldSchema{
Name: "age",
DataType: schemapb.DataType_INT32,
Name: "age",
IsPrimaryKey: false,
DataType: schemapb.DataType_INT32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -60,7 +62,8 @@ func TestDataSyncService_Start(t *testing.T) {
}
schema := schemapb.CollectionSchema{
Name: collectionName,
Name: collectionName,
AutoID: true,
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},

View File

@ -14,7 +14,7 @@ func (stNode *serviceTimeNode) Name() string {
}
func (stNode *serviceTimeNode) Operate(in []*Msg) []*Msg {
// fmt.Println("Do serviceTimeNode operation")
//fmt.Println("Do serviceTimeNode operation")
if len(in) != 1 {
log.Println("Invalid operate message input in serviceTimeNode, input length = ", len(in))
@ -29,6 +29,7 @@ func (stNode *serviceTimeNode) Operate(in []*Msg) []*Msg {
// update service time
(*(*stNode.replica).getTSafe()).set(serviceTimeMsg.timeRange.timestampMax)
//fmt.Println("update tSafe to:", getPhysicalTime(serviceTimeMsg.timeRange.timestampMax))
return nil
}

View File

@ -31,7 +31,7 @@ type metaService struct {
func newMetaService(ctx context.Context, replica *collectionReplica) *metaService {
ETCDAddr := Params.etcdAddress()
ETCDRootPath := Params.etcdRootPath()
MetaRootPath := Params.metaRootPath()
cli, _ := clientv3.New(clientv3.Config{
Endpoints: []string{ETCDAddr},
@ -40,7 +40,7 @@ func newMetaService(ctx context.Context, replica *collectionReplica) *metaServic
return &metaService{
ctx: ctx,
kvBase: kv.NewEtcdKV(cli, ETCDRootPath),
kvBase: kv.NewEtcdKV(cli, MetaRootPath),
replica: replica,
}
}
@ -71,21 +71,21 @@ func (mService *metaService) start() {
}
func GetCollectionObjID(key string) string {
ETCDRootPath := Params.etcdRootPath()
ETCDRootPath := Params.metaRootPath()
prefix := path.Join(ETCDRootPath, CollectionPrefix) + "/"
return strings.TrimPrefix(key, prefix)
}
func GetSegmentObjID(key string) string {
ETCDRootPath := Params.etcdRootPath()
ETCDRootPath := Params.metaRootPath()
prefix := path.Join(ETCDRootPath, SegmentPrefix) + "/"
return strings.TrimPrefix(key, prefix)
}
func isCollectionObj(key string) bool {
ETCDRootPath := Params.etcdRootPath()
ETCDRootPath := Params.metaRootPath()
prefix := path.Join(ETCDRootPath, CollectionPrefix) + "/"
prefix = strings.TrimSpace(prefix)
@ -95,7 +95,7 @@ func isCollectionObj(key string) bool {
}
func isSegmentObj(key string) bool {
ETCDRootPath := Params.etcdRootPath()
ETCDRootPath := Params.metaRootPath()
prefix := path.Join(ETCDRootPath, SegmentPrefix) + "/"
prefix = strings.TrimSpace(prefix)

View File

@ -64,24 +64,24 @@ func TestMetaService_getSegmentObjId(t *testing.T) {
}
func TestMetaService_isCollectionObj(t *testing.T) {
var key = "by-dev/collection/collection0"
var key = "by-dev/meta/collection/collection0"
var b1 = isCollectionObj(key)
assert.Equal(t, b1, true)
key = "by-dev/segment/segment0"
key = "by-dev/meta/segment/segment0"
var b2 = isCollectionObj(key)
assert.Equal(t, b2, false)
}
func TestMetaService_isSegmentObj(t *testing.T) {
var key = "by-dev/segment/segment0"
var key = "by-dev/meta/segment/segment0"
var b1 = isSegmentObj(key)
assert.Equal(t, b1, true)
key = "by-dev/collection/collection0"
key = "by-dev/meta/collection/collection0"
var b2 = isSegmentObj(key)
assert.Equal(t, b2, false)
@ -120,8 +120,9 @@ func TestMetaService_isSegmentChannelRangeInQueryNodeChannelRange(t *testing.T)
func TestMetaService_printCollectionStruct(t *testing.T) {
collectionName := "collection0"
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
Name: "vec",
IsPrimaryKey: false,
DataType: schemapb.DataType_VECTOR_FLOAT,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -131,8 +132,9 @@ func TestMetaService_printCollectionStruct(t *testing.T) {
}
fieldInt := schemapb.FieldSchema{
Name: "age",
DataType: schemapb.DataType_INT32,
Name: "age",
IsPrimaryKey: false,
DataType: schemapb.DataType_INT32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -142,7 +144,8 @@ func TestMetaService_printCollectionStruct(t *testing.T) {
}
schema := schemapb.CollectionSchema{
Name: collectionName,
Name: collectionName,
AutoID: true,
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},
@ -228,8 +231,9 @@ func TestMetaService_processSegmentCreate(t *testing.T) {
collectionName := "collection0"
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
Name: "vec",
IsPrimaryKey: false,
DataType: schemapb.DataType_VECTOR_FLOAT,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -239,8 +243,9 @@ func TestMetaService_processSegmentCreate(t *testing.T) {
}
fieldInt := schemapb.FieldSchema{
Name: "age",
DataType: schemapb.DataType_INT32,
Name: "age",
IsPrimaryKey: false,
DataType: schemapb.DataType_INT32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -250,7 +255,8 @@ func TestMetaService_processSegmentCreate(t *testing.T) {
}
schema := schemapb.CollectionSchema{
Name: collectionName,
Name: collectionName,
AutoID: true,
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},
@ -295,7 +301,7 @@ func TestMetaService_processCreate(t *testing.T) {
node := NewQueryNode(ctx, 0)
node.metaService = newMetaService(ctx, node.replica)
key1 := "by-dev/collection/0"
key1 := "by-dev/meta/collection/0"
msg1 := `schema: <
name: "test"
fields: <
@ -327,7 +333,7 @@ func TestMetaService_processCreate(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, collection.ID(), UniqueID(0))
key2 := "by-dev/segment/0"
key2 := "by-dev/meta/segment/0"
msg2 := `partition_tag: "default"
channel_start: 0
channel_end: 1
@ -351,8 +357,9 @@ func TestMetaService_processSegmentModify(t *testing.T) {
collectionName := "collection0"
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
Name: "vec",
IsPrimaryKey: false,
DataType: schemapb.DataType_VECTOR_FLOAT,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -362,8 +369,9 @@ func TestMetaService_processSegmentModify(t *testing.T) {
}
fieldInt := schemapb.FieldSchema{
Name: "age",
DataType: schemapb.DataType_INT32,
Name: "age",
IsPrimaryKey: false,
DataType: schemapb.DataType_INT32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -373,7 +381,8 @@ func TestMetaService_processSegmentModify(t *testing.T) {
}
schema := schemapb.CollectionSchema{
Name: collectionName,
Name: collectionName,
AutoID: true,
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},
@ -529,7 +538,7 @@ func TestMetaService_processModify(t *testing.T) {
node := NewQueryNode(ctx, 0)
node.metaService = newMetaService(ctx, node.replica)
key1 := "by-dev/collection/0"
key1 := "by-dev/meta/collection/0"
msg1 := `schema: <
name: "test"
fields: <
@ -576,7 +585,7 @@ func TestMetaService_processModify(t *testing.T) {
hasPartition = (*node.replica).hasPartition(UniqueID(0), "p3")
assert.Equal(t, hasPartition, false)
key2 := "by-dev/segment/0"
key2 := "by-dev/meta/segment/0"
msg2 := `partition_tag: "p1"
channel_start: 0
channel_end: 1
@ -656,8 +665,9 @@ func TestMetaService_processSegmentDelete(t *testing.T) {
collectionName := "collection0"
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
Name: "vec",
IsPrimaryKey: false,
DataType: schemapb.DataType_VECTOR_FLOAT,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -667,8 +677,9 @@ func TestMetaService_processSegmentDelete(t *testing.T) {
}
fieldInt := schemapb.FieldSchema{
Name: "age",
DataType: schemapb.DataType_INT32,
Name: "age",
IsPrimaryKey: false,
DataType: schemapb.DataType_INT32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -678,7 +689,8 @@ func TestMetaService_processSegmentDelete(t *testing.T) {
}
schema := schemapb.CollectionSchema{
Name: collectionName,
Name: collectionName,
AutoID: true,
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},
@ -772,7 +784,7 @@ func TestMetaService_processDelete(t *testing.T) {
node := NewQueryNode(ctx, 0)
node.metaService = newMetaService(ctx, node.replica)
key1 := "by-dev/collection/0"
key1 := "by-dev/meta/collection/0"
msg1 := `schema: <
name: "test"
fields: <
@ -804,7 +816,7 @@ func TestMetaService_processDelete(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, collection.ID(), UniqueID(0))
key2 := "by-dev/segment/0"
key2 := "by-dev/meta/segment/0"
msg2 := `partition_tag: "default"
channel_start: 0
channel_end: 1

View File

@ -199,12 +199,16 @@ func (p *ParamTable) etcdAddress() string {
return etcdAddress
}
func (p *ParamTable) etcdRootPath() string {
etcdRootPath, err := p.Load("etcd.rootpath")
func (p *ParamTable) metaRootPath() string {
rootPath, err := p.Load("etcd.rootPath")
if err != nil {
panic(err)
}
return etcdRootPath
subPath, err := p.Load("etcd.metaSubPath")
if err != nil {
panic(err)
}
return rootPath + "/" + subPath
}
func (p *ParamTable) gracefulTime() int64 {

View File

@ -120,3 +120,9 @@ func TestParamTable_statsChannelName(t *testing.T) {
name := Params.statsChannelName()
assert.Equal(t, name, "query-node-stats")
}
func TestParamTable_metaRootPath(t *testing.T) {
Params.Init()
path := Params.metaRootPath()
assert.Equal(t, path, "by-dev/meta")
}

View File

@ -17,8 +17,9 @@ func TestPartition_Segments(t *testing.T) {
collectionName := "collection0"
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
Name: "vec",
IsPrimaryKey: false,
DataType: schemapb.DataType_VECTOR_FLOAT,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -28,8 +29,9 @@ func TestPartition_Segments(t *testing.T) {
}
fieldInt := schemapb.FieldSchema{
Name: "age",
DataType: schemapb.DataType_INT32,
Name: "age",
IsPrimaryKey: false,
DataType: schemapb.DataType_INT32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -39,7 +41,8 @@ func TestPartition_Segments(t *testing.T) {
}
schema := schemapb.CollectionSchema{
Name: collectionName,
Name: collectionName,
AutoID: true,
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},

View File

@ -16,8 +16,9 @@ import (
func TestPlan_Plan(t *testing.T) {
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
Name: "vec",
IsPrimaryKey: false,
DataType: schemapb.DataType_VECTOR_FLOAT,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -27,8 +28,9 @@ func TestPlan_Plan(t *testing.T) {
}
fieldInt := schemapb.FieldSchema{
Name: "age",
DataType: schemapb.DataType_INT32,
Name: "age",
IsPrimaryKey: false,
DataType: schemapb.DataType_INT32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -38,7 +40,8 @@ func TestPlan_Plan(t *testing.T) {
}
schema := schemapb.CollectionSchema{
Name: "collection0",
Name: "collection0",
AutoID: true,
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},
@ -71,8 +74,9 @@ func TestPlan_Plan(t *testing.T) {
func TestPlan_PlaceholderGroup(t *testing.T) {
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
Name: "vec",
IsPrimaryKey: false,
DataType: schemapb.DataType_VECTOR_FLOAT,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -82,8 +86,9 @@ func TestPlan_PlaceholderGroup(t *testing.T) {
}
fieldInt := schemapb.FieldSchema{
Name: "age",
DataType: schemapb.DataType_INT32,
Name: "age",
IsPrimaryKey: false,
DataType: schemapb.DataType_INT32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -93,7 +98,8 @@ func TestPlan_PlaceholderGroup(t *testing.T) {
}
schema := schemapb.CollectionSchema{
Name: "collection0",
Name: "collection0",
AutoID: true,
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},

View File

@ -17,8 +17,9 @@ import (
func TestReduce_AllFunc(t *testing.T) {
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
Name: "vec",
IsPrimaryKey: false,
DataType: schemapb.DataType_VECTOR_FLOAT,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -28,8 +29,9 @@ func TestReduce_AllFunc(t *testing.T) {
}
fieldInt := schemapb.FieldSchema{
Name: "age",
DataType: schemapb.DataType_INT32,
Name: "age",
IsPrimaryKey: false,
DataType: schemapb.DataType_INT32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -39,7 +41,8 @@ func TestReduce_AllFunc(t *testing.T) {
}
schema := schemapb.CollectionSchema{
Name: "collection0",
Name: "collection0",
AutoID: true,
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},

View File

@ -4,11 +4,10 @@ import "C"
import (
"context"
"errors"
"github.com/golang/protobuf/proto"
"log"
"sync"
"github.com/golang/protobuf/proto"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
@ -146,7 +145,7 @@ func (ss *searchService) receiveSearchMsg() {
log.Println("publish FailedSearchResult failed, error message: ", err)
}
}
log.Println("Do search done, num of searchMsg = ", len(searchMsg))
log.Println("ReceiveSearchMsg, do search done, num of searchMsg = ", len(searchMsg))
}
}
}
@ -173,17 +172,16 @@ func (ss *searchService) doUnsolvedMsgSearch() {
}
for {
msgBufferLength := len(ss.msgBuffer)
if msgBufferLength <= 0 {
break
}
msg := <-ss.msgBuffer
if msg.EndTs() <= serviceTime {
searchMsg = append(searchMsg, msg)
continue
}
ss.unsolvedMsg = append(ss.unsolvedMsg, msg)
msgBufferLength := len(ss.msgBuffer)
if msgBufferLength <= 0 {
break
}
}
if len(searchMsg) <= 0 {
@ -199,7 +197,7 @@ func (ss *searchService) doUnsolvedMsgSearch() {
log.Println("publish FailedSearchResult failed, error message: ", err)
}
}
log.Println("Do search done, num of searchMsg = ", len(searchMsg))
log.Println("doUnsolvedMsgSearch, do search done, num of searchMsg = ", len(searchMsg))
}
}
}
@ -241,7 +239,10 @@ func (ss *searchService) search(msg msgstream.TsMsg) error {
return err
}
for _, segment := range partition.segments {
//fmt.Println("dsl = ", dsl)
searchResult, err := segment.segmentSearch(plan, placeholderGroups, []Timestamp{searchTimestamp})
if err != nil {
return err
}
@ -249,6 +250,15 @@ func (ss *searchService) search(msg msgstream.TsMsg) error {
}
}
if len(searchResults) <= 0 {
log.Println("search Failed, invalid partitionTag")
err = ss.publishFailedSearchResult(msg)
if err != nil {
log.Println("publish FailedSearchResult failed, error message: ", err)
}
return err
}
reducedSearchResult := reduceSearchResults(searchResults, int64(len(searchResults)))
marshaledHits := reducedSearchResult.reorganizeQueryResults(plan, placeholderGroups)
hitsBlob, err := marshaledHits.getHitsBlob()

View File

@ -30,8 +30,9 @@ func TestSearch_Search(t *testing.T) {
// init meta
collectionName := "collection0"
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
Name: "vec",
IsPrimaryKey: false,
DataType: schemapb.DataType_VECTOR_FLOAT,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -41,8 +42,9 @@ func TestSearch_Search(t *testing.T) {
}
fieldInt := schemapb.FieldSchema{
Name: "age",
DataType: schemapb.DataType_INT32,
Name: "age",
IsPrimaryKey: false,
DataType: schemapb.DataType_INT32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -52,7 +54,8 @@ func TestSearch_Search(t *testing.T) {
}
schema := schemapb.CollectionSchema{
Name: collectionName,
Name: collectionName,
AutoID: true,
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},

View File

@ -20,8 +20,9 @@ import (
//-------------------------------------------------------------------------------------- constructor and destructor
func TestSegment_newSegment(t *testing.T) {
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
Name: "vec",
IsPrimaryKey: false,
DataType: schemapb.DataType_VECTOR_FLOAT,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -31,8 +32,9 @@ func TestSegment_newSegment(t *testing.T) {
}
fieldInt := schemapb.FieldSchema{
Name: "age",
DataType: schemapb.DataType_INT32,
Name: "age",
IsPrimaryKey: false,
DataType: schemapb.DataType_INT32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -42,7 +44,8 @@ func TestSegment_newSegment(t *testing.T) {
}
schema := schemapb.CollectionSchema{
Name: "collection0",
Name: "collection0",
AutoID: true,
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},
@ -72,8 +75,9 @@ func TestSegment_newSegment(t *testing.T) {
func TestSegment_deleteSegment(t *testing.T) {
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
Name: "vec",
IsPrimaryKey: false,
DataType: schemapb.DataType_VECTOR_FLOAT,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -83,8 +87,9 @@ func TestSegment_deleteSegment(t *testing.T) {
}
fieldInt := schemapb.FieldSchema{
Name: "age",
DataType: schemapb.DataType_INT32,
Name: "age",
IsPrimaryKey: false,
DataType: schemapb.DataType_INT32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -94,7 +99,8 @@ func TestSegment_deleteSegment(t *testing.T) {
}
schema := schemapb.CollectionSchema{
Name: "collection0",
Name: "collection0",
AutoID: true,
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},
@ -126,8 +132,9 @@ func TestSegment_deleteSegment(t *testing.T) {
//-------------------------------------------------------------------------------------- stats functions
func TestSegment_getRowCount(t *testing.T) {
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
Name: "vec",
IsPrimaryKey: false,
DataType: schemapb.DataType_VECTOR_FLOAT,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -137,8 +144,9 @@ func TestSegment_getRowCount(t *testing.T) {
}
fieldInt := schemapb.FieldSchema{
Name: "age",
DataType: schemapb.DataType_INT32,
Name: "age",
IsPrimaryKey: false,
DataType: schemapb.DataType_INT32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -148,7 +156,8 @@ func TestSegment_getRowCount(t *testing.T) {
}
schema := schemapb.CollectionSchema{
Name: "collection0",
Name: "collection0",
AutoID: true,
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},
@ -211,8 +220,9 @@ func TestSegment_getRowCount(t *testing.T) {
func TestSegment_getDeletedCount(t *testing.T) {
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
Name: "vec",
IsPrimaryKey: false,
DataType: schemapb.DataType_VECTOR_FLOAT,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -222,8 +232,9 @@ func TestSegment_getDeletedCount(t *testing.T) {
}
fieldInt := schemapb.FieldSchema{
Name: "age",
DataType: schemapb.DataType_INT32,
Name: "age",
IsPrimaryKey: false,
DataType: schemapb.DataType_INT32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -233,7 +244,8 @@ func TestSegment_getDeletedCount(t *testing.T) {
}
schema := schemapb.CollectionSchema{
Name: "collection0",
Name: "collection0",
AutoID: true,
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},
@ -302,8 +314,9 @@ func TestSegment_getDeletedCount(t *testing.T) {
func TestSegment_getMemSize(t *testing.T) {
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
Name: "vec",
IsPrimaryKey: false,
DataType: schemapb.DataType_VECTOR_FLOAT,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -313,8 +326,9 @@ func TestSegment_getMemSize(t *testing.T) {
}
fieldInt := schemapb.FieldSchema{
Name: "age",
DataType: schemapb.DataType_INT32,
Name: "age",
IsPrimaryKey: false,
DataType: schemapb.DataType_INT32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -324,7 +338,8 @@ func TestSegment_getMemSize(t *testing.T) {
}
schema := schemapb.CollectionSchema{
Name: "collection0",
Name: "collection0",
AutoID: true,
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},
@ -388,8 +403,9 @@ func TestSegment_getMemSize(t *testing.T) {
//-------------------------------------------------------------------------------------- dm & search functions
func TestSegment_segmentInsert(t *testing.T) {
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
Name: "vec",
IsPrimaryKey: false,
DataType: schemapb.DataType_VECTOR_FLOAT,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -399,8 +415,9 @@ func TestSegment_segmentInsert(t *testing.T) {
}
fieldInt := schemapb.FieldSchema{
Name: "age",
DataType: schemapb.DataType_INT32,
Name: "age",
IsPrimaryKey: false,
DataType: schemapb.DataType_INT32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -410,7 +427,8 @@ func TestSegment_segmentInsert(t *testing.T) {
}
schema := schemapb.CollectionSchema{
Name: "collection0",
Name: "collection0",
AutoID: true,
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},
@ -469,8 +487,9 @@ func TestSegment_segmentInsert(t *testing.T) {
func TestSegment_segmentDelete(t *testing.T) {
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
Name: "vec",
IsPrimaryKey: false,
DataType: schemapb.DataType_VECTOR_FLOAT,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -480,8 +499,9 @@ func TestSegment_segmentDelete(t *testing.T) {
}
fieldInt := schemapb.FieldSchema{
Name: "age",
DataType: schemapb.DataType_INT32,
Name: "age",
IsPrimaryKey: false,
DataType: schemapb.DataType_INT32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -491,7 +511,8 @@ func TestSegment_segmentDelete(t *testing.T) {
}
schema := schemapb.CollectionSchema{
Name: "collection0",
Name: "collection0",
AutoID: true,
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},
@ -559,8 +580,9 @@ func TestSegment_segmentSearch(t *testing.T) {
defer cancel()
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
Name: "vec",
IsPrimaryKey: false,
DataType: schemapb.DataType_VECTOR_FLOAT,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -570,8 +592,9 @@ func TestSegment_segmentSearch(t *testing.T) {
}
fieldInt := schemapb.FieldSchema{
Name: "age",
DataType: schemapb.DataType_INT32,
Name: "age",
IsPrimaryKey: false,
DataType: schemapb.DataType_INT32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -581,7 +604,8 @@ func TestSegment_segmentSearch(t *testing.T) {
}
schema := schemapb.CollectionSchema{
Name: "collection0",
Name: "collection0",
AutoID: true,
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},
@ -683,8 +707,9 @@ func TestSegment_segmentSearch(t *testing.T) {
//-------------------------------------------------------------------------------------- preDm functions
func TestSegment_segmentPreInsert(t *testing.T) {
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
Name: "vec",
IsPrimaryKey: false,
DataType: schemapb.DataType_VECTOR_FLOAT,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -694,8 +719,9 @@ func TestSegment_segmentPreInsert(t *testing.T) {
}
fieldInt := schemapb.FieldSchema{
Name: "age",
DataType: schemapb.DataType_INT32,
Name: "age",
IsPrimaryKey: false,
DataType: schemapb.DataType_INT32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -705,7 +731,8 @@ func TestSegment_segmentPreInsert(t *testing.T) {
}
schema := schemapb.CollectionSchema{
Name: "collection0",
Name: "collection0",
AutoID: true,
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},
@ -759,8 +786,9 @@ func TestSegment_segmentPreInsert(t *testing.T) {
func TestSegment_segmentPreDelete(t *testing.T) {
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
Name: "vec",
IsPrimaryKey: false,
DataType: schemapb.DataType_VECTOR_FLOAT,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -770,8 +798,9 @@ func TestSegment_segmentPreDelete(t *testing.T) {
}
fieldInt := schemapb.FieldSchema{
Name: "age",
DataType: schemapb.DataType_INT32,
Name: "age",
IsPrimaryKey: false,
DataType: schemapb.DataType_INT32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -781,7 +810,8 @@ func TestSegment_segmentPreDelete(t *testing.T) {
}
schema := schemapb.CollectionSchema{
Name: "collection0",
Name: "collection0",
AutoID: true,
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},

View File

@ -33,8 +33,8 @@ func TestStatsService_start(t *testing.T) {
// init meta
collectionName := "collection0"
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
IsPrimaryKey: false,
DataType: schemapb.DataType_VECTOR_FLOAT,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -44,8 +44,9 @@ func TestStatsService_start(t *testing.T) {
}
fieldInt := schemapb.FieldSchema{
Name: "age",
DataType: schemapb.DataType_INT32,
Name: "age",
IsPrimaryKey: false,
DataType: schemapb.DataType_INT32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -55,7 +56,8 @@ func TestStatsService_start(t *testing.T) {
}
schema := schemapb.CollectionSchema{
Name: collectionName,
Name: collectionName,
AutoID: true,
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},
@ -114,8 +116,9 @@ func TestSegmentManagement_SegmentStatisticService(t *testing.T) {
// init meta
collectionName := "collection0"
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
Name: "vec",
IsPrimaryKey: false,
DataType: schemapb.DataType_VECTOR_FLOAT,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -125,8 +128,9 @@ func TestSegmentManagement_SegmentStatisticService(t *testing.T) {
}
fieldInt := schemapb.FieldSchema{
Name: "age",
DataType: schemapb.DataType_INT32,
Name: "age",
IsPrimaryKey: false,
DataType: schemapb.DataType_INT32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
@ -136,7 +140,8 @@ func TestSegmentManagement_SegmentStatisticService(t *testing.T) {
}
schema := schemapb.CollectionSchema{
Name: collectionName,
Name: collectionName,
AutoID: true,
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},

View File

@ -10,7 +10,7 @@ type tSafeWatcher struct {
func newTSafeWatcher() *tSafeWatcher {
return &tSafeWatcher{
notifyChan: make(chan bool),
notifyChan: make(chan bool, 1),
}
}