From c35079d7e7bffc284b87320bb6048f7d17f0940d Mon Sep 17 00:00:00 2001 From: "cai.zhang" Date: Wed, 20 Jan 2021 10:15:43 +0800 Subject: [PATCH] Update registerNode in indexservice Signed-off-by: cai.zhang --- .clang-tidy | 2 +- Makefile | 4 + .../distributed/indexnode/main.go | 1 - .../distributed/indexservice/main.go | 1 - internal/core/src/common/FieldMeta.h | 7 + internal/core/src/common/LoadInfo.h | 4 +- internal/core/src/common/Schema.h | 21 ++- internal/core/src/common/Types.h | 2 +- internal/core/src/query/Plan.cpp | 4 +- .../src/query/visitors/ExecExprVisitor.cpp | 4 +- internal/core/src/segcore/CMakeLists.txt | 3 +- .../core/src/segcore/SealedIndexingRecord.h | 5 +- .../core/src/segcore/SegmentGrowingImpl.cpp | 10 +- .../core/src/segcore/SegmentGrowingImpl.h | 4 +- internal/core/src/segcore/SegmentInterface.h | 22 ++- internal/core/src/segcore/SegmentSealed.h | 10 +- .../core/src/segcore/SegmentSealedImpl.cpp | 119 ++++++++++++ internal/core/src/segcore/SegmentSealedImpl.h | 84 +++++++++ internal/core/src/segcore/segment_c.cpp | 2 +- internal/core/unittest/test_sealed.cpp | 42 +++++ internal/core/unittest/test_span.cpp | 2 +- .../indexnode/{ => client}/client.go | 33 +--- internal/distributed/indexnode/paramtable.go | 177 ------------------ internal/distributed/indexnode/service.go | 56 +++--- .../indexservice/{ => client}/client.go | 35 +--- .../distributed/indexservice/paramtable.go | 144 -------------- internal/distributed/indexservice/service.go | 28 +-- internal/indexnode/indexnode.go | 49 +---- internal/indexnode/paramtable.go | 79 ++++++++ internal/indexservice/indexservice.go | 44 +++-- internal/indexservice/paramtable.go | 2 - 31 files changed, 448 insertions(+), 552 deletions(-) rename internal/distributed/indexnode/cmd/indexnode.go => cmd/distributed/indexnode/main.go (98%) rename internal/distributed/indexservice/cmd/indexserver.go => cmd/distributed/indexservice/main.go (98%) create mode 100644 internal/core/src/segcore/SegmentSealedImpl.cpp create mode 100644 internal/core/src/segcore/SegmentSealedImpl.h rename internal/distributed/indexnode/{ => client}/client.go (51%) delete mode 100644 internal/distributed/indexnode/paramtable.go rename internal/distributed/indexservice/{ => client}/client.go (68%) delete mode 100644 internal/distributed/indexservice/paramtable.go diff --git a/.clang-tidy b/.clang-tidy index 76ea553b13..5488f26a82 100644 --- a/.clang-tidy +++ b/.clang-tidy @@ -20,7 +20,7 @@ Checks: > -*, clang-diagnostic-*, -clang-diagnostic-error, clang-analyzer-*, -clang-analyzer-alpha*, google-*, -google-runtime-references, -google-readability-todo, - modernize-*, -modernize-pass-by-value, -modernize-use-equals-default, + modernize-*, -modernize-pass-by-value, -modernize-use-equals-default, -modernize-use-trailing-return-type, performance-faster-string-find, performance-for-range-copy, performance-implicit-conversion-in-loop, performance-inefficient-algorithm, performance-trivially-destructible, performance-inefficient-vector-operation, diff --git a/Makefile b/Makefile index 80ec5556ce..fd88db4bff 100644 --- a/Makefile +++ b/Makefile @@ -125,6 +125,10 @@ build-go: build-cpp @mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/binlog $(PWD)/cmd/binlog/main.go 1>/dev/null @echo "Building singlenode ..." @mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/singlenode $(PWD)/cmd/singlenode/main.go 1>/dev/null + @echo "Building distributed indexservice ..." + @mkdir -p $(INSTALL_PATH)/distributed && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/distributed/indexservice $(PWD)/cmd/distributed/indexservice/main.go 1>/dev/null + @echo "Building distributed indexnode ..." + @mkdir -p $(INSTALL_PATH)/distributed && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/distributed/indexnode $(PWD)/cmd/distributed/indexnode/main.go 1>/dev/null build-cpp: @(env bash $(PWD)/scripts/core_build.sh -f "$(CUSTOM_THIRDPARTY_PATH)") diff --git a/internal/distributed/indexnode/cmd/indexnode.go b/cmd/distributed/indexnode/main.go similarity index 98% rename from internal/distributed/indexnode/cmd/indexnode.go rename to cmd/distributed/indexnode/main.go index 681b6780f9..6773b5422a 100644 --- a/internal/distributed/indexnode/cmd/indexnode.go +++ b/cmd/distributed/indexnode/main.go @@ -23,7 +23,6 @@ import ( ) func main() { - grpcindexnode.Init() ctx, cancel := context.WithCancel(context.Background()) svr, err := grpcindexnode.CreateIndexNode(ctx) if err != nil { diff --git a/internal/distributed/indexservice/cmd/indexserver.go b/cmd/distributed/indexservice/main.go similarity index 98% rename from internal/distributed/indexservice/cmd/indexserver.go rename to cmd/distributed/indexservice/main.go index d41523a33b..01422fe542 100644 --- a/internal/distributed/indexservice/cmd/indexserver.go +++ b/cmd/distributed/indexservice/main.go @@ -23,7 +23,6 @@ import ( ) func main() { - grpcindexserver.Init() ctx, cancel := context.WithCancel(context.Background()) svr, err := grpcindexserver.CreateIndexServer(ctx) if err != nil { diff --git a/internal/core/src/common/FieldMeta.h b/internal/core/src/common/FieldMeta.h index f1967facac..b9bd2102a5 100644 --- a/internal/core/src/common/FieldMeta.h +++ b/internal/core/src/common/FieldMeta.h @@ -84,6 +84,13 @@ datatype_is_vector(DataType datatype) { struct FieldMeta { public: + FieldMeta(const FieldMeta&) = delete; + FieldMeta(FieldMeta&&) = default; + FieldMeta& + operator=(const FieldMeta&) = delete; + FieldMeta& + operator=(FieldMeta&&) = default; + FieldMeta(const FieldName& name, FieldId id, DataType type) : name_(name), id_(id), type_(type) { Assert(!is_vector()); } diff --git a/internal/core/src/common/LoadInfo.h b/internal/core/src/common/LoadInfo.h index 1218f2cedf..7b5b0af166 100644 --- a/internal/core/src/common/LoadInfo.h +++ b/internal/core/src/common/LoadInfo.h @@ -26,6 +26,6 @@ struct LoadIndexInfo { // NOTE: Refer to common/SystemProperty.cpp for details struct LoadFieldDataInfo { int64_t field_id; - void* blob; - int64_t row_count; + void* blob = nullptr; + int64_t row_count = -1; }; diff --git a/internal/core/src/common/Schema.h b/internal/core/src/common/Schema.h index b7bf858471..2a23e0d428 100644 --- a/internal/core/src/common/Schema.h +++ b/internal/core/src/common/Schema.h @@ -23,20 +23,24 @@ namespace milvus { class Schema { public: - void + FieldId AddDebugField(const std::string& name, DataType data_type) { static int64_t debug_id = 1000; - this->AddField(FieldName(name), FieldId(debug_id), data_type); - debug_id++; + auto field_id = FieldId(debug_id); + debug_id += 2; + this->AddField(FieldName(name), field_id, data_type); + return field_id; } // auto gen field_id for convenience - void + FieldId AddDebugField(const std::string& name, DataType data_type, int64_t dim, MetricType metric_type) { - static int64_t debug_id = 2000; - auto field_meta = FieldMeta(FieldName(name), FieldId(debug_id), data_type, dim, metric_type); - debug_id++; + static int64_t debug_id = 2001; + auto field_id = FieldId(debug_id); + debug_id += 2; + auto field_meta = FieldMeta(FieldName(name), field_id, data_type, dim, metric_type); this->AddField(std::move(field_meta)); + return field_id; } // scalar type @@ -141,13 +145,14 @@ class Schema { void AddField(FieldMeta&& field_meta) { auto offset = fields_.size(); - fields_.emplace_back(field_meta); AssertInfo(!name_offsets_.count(field_meta.get_name()), "duplicated field name"); name_offsets_.emplace(field_meta.get_name(), offset); AssertInfo(!id_offsets_.count(field_meta.get_id()), "duplicated field id"); id_offsets_.emplace(field_meta.get_id(), offset); + auto field_sizeof = field_meta.get_sizeof(); sizeof_infos_.push_back(std::move(field_sizeof)); + fields_.emplace_back(std::move(field_meta)); total_sizeof_ += field_sizeof; } diff --git a/internal/core/src/common/Types.h b/internal/core/src/common/Types.h index ee41de8086..21aebec1e4 100644 --- a/internal/core/src/common/Types.h +++ b/internal/core/src/common/Types.h @@ -41,7 +41,7 @@ template constexpr std::false_type always_false{}; template -using aligned_vector = std::vector>; +using aligned_vector = std::vector>; /////////////////////////////////////////////////////////////////////////////////////////////////// struct QueryResult { diff --git a/internal/core/src/query/Plan.cpp b/internal/core/src/query/Plan.cpp index 8fc1cc804d..9d0434d013 100644 --- a/internal/core/src/query/Plan.cpp +++ b/internal/core/src/query/Plan.cpp @@ -200,7 +200,7 @@ Parser::ParseVecNode(const Json& out_body) { auto field_offset = schema.get_offset(field_name); auto vec_node = [&]() -> std::unique_ptr { - auto field_meta = schema.operator[](field_name); + auto& field_meta = schema.operator[](field_name); auto data_type = field_meta.get_data_type(); if (data_type == DataType::VECTOR_FLOAT) { return std::make_unique(); @@ -252,7 +252,7 @@ template ExprPtr Parser::ParseRangeNodeImpl(const FieldName& field_name, const Json& body) { auto expr = std::make_unique>(); - auto field_meta = schema[field_name]; + auto& field_meta = schema[field_name]; auto data_type = field_meta.get_data_type(); expr->data_type_ = data_type; expr->field_offset_ = schema.get_offset(field_name); diff --git a/internal/core/src/query/visitors/ExecExprVisitor.cpp b/internal/core/src/query/visitors/ExecExprVisitor.cpp index 5d2b44c96f..0c3e85e4a2 100644 --- a/internal/core/src/query/visitors/ExecExprVisitor.cpp +++ b/internal/core/src/query/visitors/ExecExprVisitor.cpp @@ -130,7 +130,7 @@ ExecExprVisitor::ExecRangeVisitorImpl(RangeExprImpl& expr, IndexFunc index_fu // RetType results(vec.num_chunk()); auto indexing_barrier = segment_.num_chunk_index_safe(field_offset); - auto chunk_size = segment_.chunk_size(); + auto chunk_size = segment_.size_per_chunk(); auto num_chunk = upper_div(row_count_, chunk_size); RetType results; @@ -290,7 +290,7 @@ ExecExprVisitor::ExecTermVisitorImpl(TermExpr& expr_raw) -> RetType { auto& field_meta = schema[field_offset]; // auto vec_ptr = records.get_entity(field_offset); // auto& vec = *vec_ptr; - auto chunk_size = segment_.chunk_size(); + auto chunk_size = segment_.size_per_chunk(); auto num_chunk = upper_div(row_count_, chunk_size); RetType bitsets; diff --git a/internal/core/src/segcore/CMakeLists.txt b/internal/core/src/segcore/CMakeLists.txt index 2a5fa4b8f6..e51eb17b54 100644 --- a/internal/core/src/segcore/CMakeLists.txt +++ b/internal/core/src/segcore/CMakeLists.txt @@ -1,10 +1,11 @@ set(SEGCORE_FILES - SegmentGrowingImpl.cpp Collection.cpp collection_c.cpp segment_c.cpp SegmentGrowing.cpp + SegmentGrowingImpl.cpp + SegmentSealedImpl.cpp IndexingEntry.cpp InsertRecord.cpp Reduce.cpp diff --git a/internal/core/src/segcore/SealedIndexingRecord.h b/internal/core/src/segcore/SealedIndexingRecord.h index fcc9b4c4d1..edd6d85e0b 100644 --- a/internal/core/src/segcore/SealedIndexingRecord.h +++ b/internal/core/src/segcore/SealedIndexingRecord.h @@ -31,7 +31,10 @@ using SealedIndexingEntryPtr = std::unique_ptr; struct SealedIndexingRecord { void - add_entry(FieldOffset field_offset, SealedIndexingEntryPtr&& ptr) { + add_entry(FieldOffset field_offset, MetricType metric_type, knowhere::VecIndexPtr indexing) { + auto ptr = std::make_unique(); + ptr->indexing_ = indexing; + ptr->metric_type_ = metric_type; std::unique_lock lck(mutex_); entries_[field_offset] = std::move(ptr); } diff --git a/internal/core/src/segcore/SegmentGrowingImpl.cpp b/internal/core/src/segcore/SegmentGrowingImpl.cpp index 0f4ab51335..51f692cd2f 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.cpp +++ b/internal/core/src/segcore/SegmentGrowingImpl.cpp @@ -270,7 +270,7 @@ SegmentGrowingImpl::FillTargetEntry(const query::Plan* plan, QueryResult& result auto key_offset_opt = schema_->get_primary_key_offset(); Assert(key_offset_opt.has_value()); auto key_offset = key_offset_opt.value(); - auto field_meta = schema_->operator[](key_offset); + auto& field_meta = schema_->operator[](key_offset); Assert(field_meta.get_data_type() == DataType::INT64); auto uids = record_.get_entity(key_offset); for (int64_t i = 0; i < size; ++i) { @@ -290,12 +290,8 @@ SegmentGrowingImpl::LoadIndexing(const LoadIndexInfo& info) { Assert(info.index_params.count("metric_type")); auto metric_type_str = info.index_params.at("metric_type"); - auto entry = std::make_unique(); - entry->metric_type_ = GetMetricType(metric_type_str); - entry->indexing_ = info.index; - - sealed_indexing_record_.add_entry(field_offset, std::move(entry)); + sealed_indexing_record_.add_entry(field_offset, GetMetricType(metric_type_str), info.index); return Status::OK(); } @@ -306,7 +302,7 @@ SegmentGrowingImpl::chunk_data_impl(FieldOffset field_offset, int64_t chunk_id) } int64_t -SegmentGrowingImpl::get_safe_num_chunk() const { +SegmentGrowingImpl::num_chunk_data() const { auto size = get_insert_record().ack_responder_.GetAck(); return upper_div(size, chunk_size_); } diff --git a/internal/core/src/segcore/SegmentGrowingImpl.h b/internal/core/src/segcore/SegmentGrowingImpl.h index 15d129c355..d7dec5f523 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.h +++ b/internal/core/src/segcore/SegmentGrowingImpl.h @@ -105,7 +105,7 @@ class SegmentGrowingImpl : public SegmentGrowing { } int64_t - chunk_size() const final { + size_per_chunk() const final { return chunk_size_; } @@ -126,7 +126,7 @@ class SegmentGrowingImpl : public SegmentGrowing { } int64_t - get_safe_num_chunk() const override; + num_chunk_data() const override; Status LoadIndexing(const LoadIndexInfo& info) override; diff --git a/internal/core/src/segcore/SegmentInterface.h b/internal/core/src/segcore/SegmentInterface.h index 34c18b53ee..39a5327483 100644 --- a/internal/core/src/segcore/SegmentInterface.h +++ b/internal/core/src/segcore/SegmentInterface.h @@ -36,27 +36,21 @@ class SegmentInterface { virtual int64_t get_row_count() const = 0; + virtual const Schema& + get_schema() const = 0; + virtual ~SegmentInterface() = default; }; // internal API for DSL calculation class SegmentInternalInterface : public SegmentInterface { public: - virtual const Schema& - get_schema() const = 0; - - virtual int64_t - get_safe_num_chunk() const = 0; - template Span chunk_data(FieldOffset field_offset, int64_t chunk_id) const { return static_cast>(chunk_data_impl(field_offset, chunk_id)); } - virtual int64_t - num_chunk_index_safe(FieldOffset field_offset) const = 0; - template const knowhere::scalar::StructuredIndex& chunk_scalar_index(FieldOffset field_offset, int64_t chunk_id) const { @@ -68,8 +62,16 @@ class SegmentInternalInterface : public SegmentInterface { return *ptr; } + public: virtual int64_t - chunk_size() const = 0; + num_chunk_index_safe(FieldOffset field_offset) const = 0; + + virtual int64_t + num_chunk_data() const = 0; + + // return chunk_size for each chunk, renaming against confusion + virtual int64_t + size_per_chunk() const = 0; protected: // blob and row_count diff --git a/internal/core/src/segcore/SegmentSealed.h b/internal/core/src/segcore/SegmentSealed.h index 741d2cd777..6114ecbaa8 100644 --- a/internal/core/src/segcore/SegmentSealed.h +++ b/internal/core/src/segcore/SegmentSealed.h @@ -16,12 +16,8 @@ namespace milvus::segcore { -class SegmentSealed : public SegmentInterface { +class SegmentSealed : public SegmentInternalInterface { public: - virtual const Schema& - get_schema() = 0; - virtual int64_t - get_row_count() = 0; virtual void LoadIndex(const LoadIndexInfo& info) = 0; virtual void @@ -31,8 +27,6 @@ class SegmentSealed : public SegmentInterface { using SegmentSealedPtr = std::unique_ptr; SegmentSealedPtr -CreateSealedSegment(SchemaPtr schema, int64_t chunk_size = 32 * 1024) { - return nullptr; -} +CreateSealedSegment(SchemaPtr schema, int64_t chunk_size = 32 * 1024); } // namespace milvus::segcore diff --git a/internal/core/src/segcore/SegmentSealedImpl.cpp b/internal/core/src/segcore/SegmentSealedImpl.cpp new file mode 100644 index 0000000000..35fd8fa302 --- /dev/null +++ b/internal/core/src/segcore/SegmentSealedImpl.cpp @@ -0,0 +1,119 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License + +#include "segcore/SegmentSealedImpl.h" +namespace milvus::segcore { +void +SegmentSealedImpl::LoadIndex(const LoadIndexInfo& info) { + auto field_id = FieldId(info.field_id); + auto field_offset = schema_->get_offset(field_id); + + Assert(info.index_params.count("metric_type")); + auto metric_type_str = info.index_params.at("metric_type"); + auto row_count = info.index->Count(); + Assert(row_count > 0); + + std::unique_lock lck(mutex_); + if (row_count_opt_.has_value()) { + AssertInfo(row_count_opt_.value() == row_count, "load data has different row count from other columns"); + } else { + row_count_opt_ = row_count; + } + Assert(!vec_indexings_.is_ready(field_offset)); + vec_indexings_.add_entry(field_offset, GetMetricType(metric_type_str), info.index); + ++ready_count_; +} + +void +SegmentSealedImpl::LoadFieldData(const LoadFieldDataInfo& info) { + // TODO + Assert(info.row_count > 0); + auto field_id = FieldId(info.field_id); + auto field_offset = schema_->get_offset(field_id); + auto& field_meta = schema_->operator[](field_offset); + Assert(!field_meta.is_vector()); + auto element_sizeof = field_meta.get_sizeof(); + auto length_in_bytes = element_sizeof * info.row_count; + aligned_vector vecdata(length_in_bytes); + memcpy(vecdata.data(), info.blob, length_in_bytes); + std::unique_lock lck(mutex_); + if (row_count_opt_.has_value()) { + AssertInfo(row_count_opt_.value() == info.row_count, "load data has different row count from other columns"); + } else { + row_count_opt_ = info.row_count; + } + AssertInfo(columns_data_[field_offset.get()].empty(), "already exists"); + columns_data_[field_offset.get()] = std::move(vecdata); + ++ready_count_; +} + +int64_t +SegmentSealedImpl::num_chunk_index_safe(FieldOffset field_offset) const { + // TODO: support scalar index + return 0; +} + +int64_t +SegmentSealedImpl::num_chunk_data() const { + PanicInfo("unimplemented"); +} + +int64_t +SegmentSealedImpl::size_per_chunk() const { + PanicInfo("unimplemented"); +} + +SpanBase +SegmentSealedImpl::chunk_data_impl(FieldOffset field_offset, int64_t chunk_id) const { + PanicInfo("unimplemented"); +} + +const knowhere::Index* +SegmentSealedImpl::chunk_index_impl(FieldOffset field_offset, int64_t chunk_id) const { + PanicInfo("unimplemented"); +} + +void +SegmentSealedImpl::FillTargetEntry(const query::Plan* Plan, QueryResult& results) const { + PanicInfo("unimplemented"); +} + +QueryResult +SegmentSealedImpl::Search(const query::Plan* Plan, + const query::PlaceholderGroup** placeholder_groups, + const Timestamp* timestamps, + int64_t num_groups) const { + PanicInfo("unimplemented"); +} + +int64_t +SegmentSealedImpl::GetMemoryUsageInBytes() const { + PanicInfo("unimplemented"); +} + +int64_t +SegmentSealedImpl::get_row_count() const { + std::shared_lock lck(mutex_); + AssertInfo(row_count_opt_.has_value(), "Data not loaded"); + return row_count_opt_.value(); +} + +const Schema& +SegmentSealedImpl::get_schema() const { + return *schema_; +} + +SegmentSealedPtr +CreateSealedSegment(SchemaPtr schema, int64_t chunk_size) { + return std::make_unique(schema); +} + +} // namespace milvus::segcore diff --git a/internal/core/src/segcore/SegmentSealedImpl.h b/internal/core/src/segcore/SegmentSealedImpl.h new file mode 100644 index 0000000000..15a288e1eb --- /dev/null +++ b/internal/core/src/segcore/SegmentSealedImpl.h @@ -0,0 +1,84 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License + +#pragma once +#include "segcore/SegmentSealed.h" +#include "SealedIndexingRecord.h" +#include +#include + +namespace milvus::segcore { +class SegmentSealedImpl : public SegmentSealed { + public: + explicit SegmentSealedImpl(SchemaPtr schema) : schema_(schema), columns_data_(schema->size()) { + } + void + LoadIndex(const LoadIndexInfo& info) override; + void + LoadFieldData(const LoadFieldDataInfo& info) override; + + public: + void + FillTargetEntry(const query::Plan* Plan, QueryResult& results) const override; + + QueryResult + Search(const query::Plan* Plan, + const query::PlaceholderGroup* placeholder_groups[], + const Timestamp timestamps[], + int64_t num_groups) const override; + + int64_t + GetMemoryUsageInBytes() const override; + + int64_t + get_row_count() const override; + + const Schema& + get_schema() const override; + + public: + int64_t + num_chunk_index_safe(FieldOffset field_offset) const override; + + int64_t + num_chunk_data() const override; + + // return chunk_size for each chunk, renaming against confusion + int64_t + size_per_chunk() const override; + + protected: + // blob and row_count + SpanBase + chunk_data_impl(FieldOffset field_offset, int64_t chunk_id) const override; + + const knowhere::Index* + chunk_index_impl(FieldOffset field_offset, int64_t chunk_id) const override; + + private: + bool + is_all_ready() const { + return ready_count_ == schema_->size(); + } + + mutable std::shared_mutex mutex_; + std::atomic_int ready_count_ = 0; + + private: + // TOOD: generate index for scalar + std::optional row_count_opt_; + std::map scalar_indexings_; + SealedIndexingRecord vec_indexings_; + std::vector> columns_data_; + aligned_vector row_ids_; + SchemaPtr schema_; +}; +} // namespace milvus::segcore diff --git a/internal/core/src/segcore/segment_c.cpp b/internal/core/src/segcore/segment_c.cpp index 28070f0e68..02db6516b1 100644 --- a/internal/core/src/segcore/segment_c.cpp +++ b/internal/core/src/segcore/segment_c.cpp @@ -208,7 +208,7 @@ PreDelete(CSegmentInterface c_segment, int64_t size) { return segment->PreDelete(size); } -////////////////////////////// interfaces for growing segment ////////////////////////////// +////////////////////////////// interfaces for sealed segment ////////////////////////////// CStatus LoadFieldData(CSegmentInterface c_segment, CLoadFieldDataInfo load_field_data_info) { auto segment = (milvus::segcore::SegmentSealed*)c_segment; diff --git a/internal/core/unittest/test_sealed.cpp b/internal/core/unittest/test_sealed.cpp index 09cafb2937..1ab497b4ab 100644 --- a/internal/core/unittest/test_sealed.cpp +++ b/internal/core/unittest/test_sealed.cpp @@ -18,6 +18,7 @@ #include #include #include +#include "segcore/SegmentSealedImpl.h" using namespace milvus; using namespace milvus::segcore; @@ -213,4 +214,45 @@ TEST(Sealed, with_predicate) { ASSERT_EQ(post_qr.internal_seg_offsets_[offset], 420000 + i); ASSERT_EQ(post_qr.result_distances_[offset], 0.0); } +} + +TEST(Sealed, LoadFieldData) { + auto dim = 16; + auto topK = 5; + int64_t N = 1000 * 1000; + auto metric_type = MetricType::METRIC_L2; + auto schema = std::make_shared(); + auto fakevec_id = schema->AddDebugField("fakevec", DataType::VECTOR_FLOAT, dim, metric_type); + auto counter_id = schema->AddDebugField("counter", DataType::INT64); + auto dataset = DataGen(schema, N); + + auto fakevec = dataset.get_col(0); + + auto counter = dataset.get_col(1); + auto indexing = std::make_shared(); + + auto conf = knowhere::Config{{knowhere::meta::DIM, dim}, + {knowhere::meta::TOPK, topK}, + {knowhere::IndexParams::nlist, 100}, + {knowhere::IndexParams::nprobe, 10}, + {knowhere::Metric::TYPE, milvus::knowhere::Metric::L2}, + {knowhere::meta::DEVICEID, 0}}; + auto database = knowhere::GenDataset(N, dim, fakevec.data()); + indexing->Train(database, conf); + indexing->AddWithoutIds(database, conf); + + auto segment = CreateSealedSegment(schema); + LoadFieldDataInfo field_info; + field_info.field_id = counter_id.get(); + field_info.row_count = N; + field_info.blob = counter.data(); + segment->LoadFieldData(field_info); + + LoadIndexInfo vec_info; + vec_info.field_id = fakevec_id.get(); + vec_info.field_name = "fakevec"; + vec_info.index = indexing; + vec_info.index_params["metric_type"] = milvus::knowhere::Metric::L2; + segment->LoadIndex(vec_info); + int i = 1 + 1; } \ No newline at end of file diff --git a/internal/core/unittest/test_span.cpp b/internal/core/unittest/test_span.cpp index 7d9a6f3be7..d77c9baded 100644 --- a/internal/core/unittest/test_span.cpp +++ b/internal/core/unittest/test_span.cpp @@ -33,7 +33,7 @@ TEST(Span, Naive) { auto age_ptr = dataset.get_col(1); auto float_ptr = dataset.get_col(2); SegmentInternalInterface& interface = *segment; - auto num_chunk = interface.get_safe_num_chunk(); + auto num_chunk = interface.num_chunk_data(); ASSERT_EQ(num_chunk, upper_div(N, chunk_size)); auto row_count = interface.get_row_count(); ASSERT_EQ(N, row_count); diff --git a/internal/distributed/indexnode/client.go b/internal/distributed/indexnode/client/client.go similarity index 51% rename from internal/distributed/indexnode/client.go rename to internal/distributed/indexnode/client/client.go index 31ef7c0023..9263fde8a2 100644 --- a/internal/distributed/indexnode/client.go +++ b/internal/distributed/indexnode/client/client.go @@ -1,4 +1,4 @@ -package grpcindexnode +package grpcindexnodeclient import ( "context" @@ -7,7 +7,6 @@ import ( "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/indexpb" - "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" "google.golang.org/grpc" ) @@ -15,32 +14,6 @@ type Client struct { grpcClient indexpb.IndexNodeClient } -func (c Client) Init() { - //TODO:??? - panic("implement me") -} - -func (c Client) Start() { - //TODO:??? - panic("implement me") -} - -func (c Client) Stop() { - panic("implement me") -} - -func (c Client) GetComponentStates() (*internalpb2.ComponentStates, error) { - panic("implement me") -} - -func (c Client) GetTimeTickChannel() (string, error) { - panic("implement me") -} - -func (c Client) GetStatisticsChannel() (string, error) { - panic("implement me") -} - func (c Client) BuildIndex(req *indexpb.BuildIndexCmd) (*commonpb.Status, error) { ctx := context.TODO() @@ -53,9 +26,9 @@ func NewClient(nodeAddress string) *Client { defer cancel() conn, err := grpc.DialContext(ctx1, nodeAddress, grpc.WithInsecure(), grpc.WithBlock()) if err != nil { - log.Printf("IndexNode connect to IndexService failed, error= %v", err) + log.Printf("Connect to IndexNode failed, error= %v", err) } - log.Printf("IndexNode connected to IndexService, IndexService=%s", Params.Address) + log.Printf("Connected to IndexService, IndexService=%s", nodeAddress) return &Client{ grpcClient: indexpb.NewIndexNodeClient(conn), } diff --git a/internal/distributed/indexnode/paramtable.go b/internal/distributed/indexnode/paramtable.go deleted file mode 100644 index a2914c0c25..0000000000 --- a/internal/distributed/indexnode/paramtable.go +++ /dev/null @@ -1,177 +0,0 @@ -package grpcindexnode - -import ( - "net" - "strconv" - - "github.com/zilliztech/milvus-distributed/internal/util/paramtable" -) - -type ParamTable struct { - paramtable.BaseTable - - Address string - Port int - ServiceAddress string - ServicePort int - - NodeID int64 - - MasterAddress string - - EtcdAddress string - MetaRootPath string - - MinIOAddress string - MinIOAccessKeyID string - MinIOSecretAccessKey string - MinIOUseSSL bool - MinioBucketName string -} - -var Params ParamTable - -func (pt *ParamTable) Init() { - pt.BaseTable.Init() - pt.initAddress() - pt.initPort() - pt.initIndexServerAddr() - pt.initIndexServerPort() - pt.initEtcdAddress() - pt.initMasterAddress() - pt.initMetaRootPath() - pt.initMinIOAddress() - pt.initMinIOAccessKeyID() - pt.initMinIOSecretAccessKey() - pt.initMinIOUseSSL() - pt.initMinioBucketName() -} - -func (pt *ParamTable) initAddress() { - addr, err := pt.Load("indexNode.address") - if err != nil { - panic(err) - } - - hostName, _ := net.LookupHost(addr) - if len(hostName) <= 0 { - if ip := net.ParseIP(addr); ip == nil { - panic("invalid ip indexBuilder.address") - } - } - - port, err := pt.Load("indexNode.port") - if err != nil { - panic(err) - } - _, err = strconv.Atoi(port) - if err != nil { - panic(err) - } - - pt.Address = addr + ":" + port -} - -func (pt *ParamTable) initPort() { - pt.Port = pt.ParseInt("indexNode.port") -} - -func (pt *ParamTable) initIndexServerAddr() { - addr, err := pt.Load("indexServer.address") - if err != nil { - panic(err) - } - - hostName, _ := net.LookupHost(addr) - if len(hostName) <= 0 { - if ip := net.ParseIP(addr); ip == nil { - panic("invalid ip indexBuilder.address") - } - } - - port, err := pt.Load("indexServer.port") - if err != nil { - panic(err) - } - _, err = strconv.Atoi(port) - if err != nil { - panic(err) - } - - pt.ServiceAddress = addr + ":" + port -} - -func (pt ParamTable) initIndexServerPort() { - pt.ServicePort = pt.ParseInt("indexServer.port") -} - -func (pt *ParamTable) initEtcdAddress() { - addr, err := pt.Load("_EtcdAddress") - if err != nil { - panic(err) - } - pt.EtcdAddress = addr -} - -func (pt *ParamTable) initMetaRootPath() { - rootPath, err := pt.Load("etcd.rootPath") - if err != nil { - panic(err) - } - subPath, err := pt.Load("etcd.metaSubPath") - if err != nil { - panic(err) - } - pt.MetaRootPath = rootPath + "/" + subPath -} - -func (pt *ParamTable) initMasterAddress() { - ret, err := pt.Load("_MasterAddress") - if err != nil { - panic(err) - } - pt.MasterAddress = ret -} - -func (pt *ParamTable) initMinIOAddress() { - ret, err := pt.Load("_MinioAddress") - if err != nil { - panic(err) - } - pt.MinIOAddress = ret -} - -func (pt *ParamTable) initMinIOAccessKeyID() { - ret, err := pt.Load("minio.accessKeyID") - if err != nil { - panic(err) - } - pt.MinIOAccessKeyID = ret -} - -func (pt *ParamTable) initMinIOSecretAccessKey() { - ret, err := pt.Load("minio.secretAccessKey") - if err != nil { - panic(err) - } - pt.MinIOSecretAccessKey = ret -} - -func (pt *ParamTable) initMinIOUseSSL() { - ret, err := pt.Load("minio.useSSL") - if err != nil { - panic(err) - } - pt.MinIOUseSSL, err = strconv.ParseBool(ret) - if err != nil { - panic(err) - } -} - -func (pt *ParamTable) initMinioBucketName() { - bucketName, err := pt.Load("minio.bucketName") - if err != nil { - panic(err) - } - pt.MinioBucketName = bucketName -} diff --git a/internal/distributed/indexnode/service.go b/internal/distributed/indexnode/service.go index 4e3ca26fd1..187049c176 100644 --- a/internal/distributed/indexnode/service.go +++ b/internal/distributed/indexnode/service.go @@ -7,7 +7,7 @@ import ( "strconv" "sync" - grpcindexservice "github.com/zilliztech/milvus-distributed/internal/distributed/indexservice" + serviceclient "github.com/zilliztech/milvus-distributed/internal/distributed/indexservice/client" "github.com/zilliztech/milvus-distributed/internal/indexnode" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/indexpb" @@ -19,27 +19,29 @@ type Server struct { grpcServer *grpc.Server - indexNodeLoopCtx context.Context - indexNodeLoopCancel func() - indexNodeLoopWg sync.WaitGroup + loopCtx context.Context + loopCancel func() + loopWg sync.WaitGroup } -func NewGrpcServer(ctx context.Context, indexID int64) *Server { - +func NewGrpcServer(ctx context.Context, nodeID int64) *Server { + ctx1, cancel := context.WithCancel(ctx) return &Server{ - node: indexnode.NewIndexNode(ctx, indexID), + loopCtx: ctx1, + loopCancel: cancel, + node: indexnode.NewIndexNode(ctx, nodeID), } } func registerNode() error { - indexServiceClient := grpcindexservice.NewClient(Params.ServiceAddress) + indexServiceClient := serviceclient.NewClient(indexnode.Params.ServiceAddress) request := &indexpb.RegisterNodeRequest{ Base: nil, Address: &commonpb.Address{ - Ip: Params.Address, - Port: int64(Params.Port), + Ip: indexnode.Params.NodeIP, + Port: int64(indexnode.Params.NodePort), }, } resp, err := indexServiceClient.RegisterNode(request) @@ -48,17 +50,17 @@ func registerNode() error { return err } - Params.NodeID = resp.InitParams.NodeID - log.Println("Register indexNode successful with nodeID=", Params.NodeID) + indexnode.Params.NodeID = resp.InitParams.NodeID + log.Println("Register indexNode successful with nodeID=", indexnode.Params.NodeID) - err = Params.LoadFromKVPair(resp.InitParams.StartParams) + err = indexnode.Params.LoadFromKVPair(resp.InitParams.StartParams) return err } func (s *Server) grpcLoop() { - defer s.indexNodeLoopWg.Done() + defer s.loopWg.Done() - lis, err := net.Listen("tcp", ":"+strconv.Itoa(Params.Port)) + lis, err := net.Listen("tcp", ":"+strconv.Itoa(indexnode.Params.NodePort)) if err != nil { log.Fatalf("IndexNode grpc server fatal error=%v", err) } @@ -68,45 +70,41 @@ func (s *Server) grpcLoop() { if err = s.grpcServer.Serve(lis); err != nil { log.Fatalf("IndexNode grpc server fatal error=%v", err) } - log.Println("IndexNode grpc server starting...") } func (s *Server) startIndexNode() error { - s.indexNodeLoopWg.Add(1) + s.loopWg.Add(1) //TODO: How to make sure that grpc server has started successfully go s.grpcLoop() + log.Println("IndexNode grpc server start successfully") + err := registerNode() if err != nil { return err } - Params.Init() + indexnode.Params.Init() return nil } -func Init() { - Params.Init() +func (s *Server) Init() { + indexnode.Params.Init() + log.Println("IndexNode init successfully, nodeAddress=", indexnode.Params.NodeAddress) } func CreateIndexNode(ctx context.Context) (*Server, error) { - ctx1, cancel := context.WithCancel(ctx) - s := &Server{ - indexNodeLoopCtx: ctx1, - indexNodeLoopCancel: cancel, - } - - return s, nil + return NewGrpcServer(ctx, indexnode.Params.NodeID), nil } func (s *Server) Start() error { - + s.Init() return s.startIndexNode() } func (s *Server) Stop() { - s.indexNodeLoopWg.Wait() + s.loopWg.Wait() } func (s *Server) Close() { diff --git a/internal/distributed/indexservice/client.go b/internal/distributed/indexservice/client/client.go similarity index 68% rename from internal/distributed/indexservice/client.go rename to internal/distributed/indexservice/client/client.go index 05356938c9..e245ecd330 100644 --- a/internal/distributed/indexservice/client.go +++ b/internal/distributed/indexservice/client/client.go @@ -1,4 +1,4 @@ -package grpcindexservice +package grpcindexserviceclient import ( "context" @@ -6,41 +6,14 @@ import ( "time" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" - - "google.golang.org/grpc" - "github.com/zilliztech/milvus-distributed/internal/proto/indexpb" - "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" + "google.golang.org/grpc" ) type Client struct { grpcClient indexpb.IndexServiceClient } -func (g Client) Init() { - panic("implement me") -} - -func (g Client) Start() { - panic("implement me") -} - -func (g Client) Stop() { - panic("implement me") -} - -func (g Client) GetComponentStates() (*internalpb2.ComponentStates, error) { - panic("implement me") -} - -func (g Client) GetTimeTickChannel() (string, error) { - panic("implement me") -} - -func (g Client) GetStatisticsChannel() (string, error) { - panic("implement me") -} - func (g Client) RegisterNode(req *indexpb.RegisterNodeRequest) (*indexpb.RegisterNodeResponse, error) { ctx := context.TODO() @@ -80,9 +53,9 @@ func NewClient(address string) *Client { defer cancel() conn, err := grpc.DialContext(ctx1, address, grpc.WithInsecure(), grpc.WithBlock()) if err != nil { - log.Printf("IndexNode connect to IndexService failed, error= %v", err) + log.Printf("Connect to IndexService failed, error= %v", err) } - log.Printf("IndexNode connected to IndexService, IndexService=%s", Params.Address) + log.Printf("Connected to IndexService, IndexService=%s", address) return &Client{ grpcClient: indexpb.NewIndexServiceClient(conn), diff --git a/internal/distributed/indexservice/paramtable.go b/internal/distributed/indexservice/paramtable.go deleted file mode 100644 index 37640a906b..0000000000 --- a/internal/distributed/indexservice/paramtable.go +++ /dev/null @@ -1,144 +0,0 @@ -package grpcindexservice - -import ( - "net" - "strconv" - - "github.com/zilliztech/milvus-distributed/internal/util/paramtable" -) - -type ParamTable struct { - paramtable.BaseTable - - Address string - Port int - - NodeID int64 - - MasterAddress string - - EtcdAddress string - MetaRootPath string - - MinIOAddress string - MinIOAccessKeyID string - MinIOSecretAccessKey string - MinIOUseSSL bool - MinioBucketName string -} - -var Params ParamTable - -func (pt *ParamTable) Init() { - pt.BaseTable.Init() - pt.initAddress() - pt.initPort() - pt.initEtcdAddress() - pt.initMasterAddress() - pt.initMetaRootPath() - pt.initMinIOAddress() - pt.initMinIOAccessKeyID() - pt.initMinIOSecretAccessKey() - pt.initMinIOUseSSL() - pt.initMinioBucketName() -} - -func (pt *ParamTable) initAddress() { - addr, err := pt.Load("indexServer.address") - if err != nil { - panic(err) - } - - hostName, _ := net.LookupHost(addr) - if len(hostName) <= 0 { - if ip := net.ParseIP(addr); ip == nil { - panic("invalid ip indexBuilder.address") - } - } - - port, err := pt.Load("indexServer.port") - if err != nil { - panic(err) - } - _, err = strconv.Atoi(port) - if err != nil { - panic(err) - } - - pt.Address = addr + ":" + port -} - -func (pt *ParamTable) initPort() { - pt.Port = pt.ParseInt("indexServer.port") -} - -func (pt *ParamTable) initEtcdAddress() { - addr, err := pt.Load("_EtcdAddress") - if err != nil { - panic(err) - } - pt.EtcdAddress = addr -} - -func (pt *ParamTable) initMetaRootPath() { - rootPath, err := pt.Load("etcd.rootPath") - if err != nil { - panic(err) - } - subPath, err := pt.Load("etcd.metaSubPath") - if err != nil { - panic(err) - } - pt.MetaRootPath = rootPath + "/" + subPath -} - -func (pt *ParamTable) initMasterAddress() { - ret, err := pt.Load("_MasterAddress") - if err != nil { - panic(err) - } - pt.MasterAddress = ret -} - -func (pt *ParamTable) initMinIOAddress() { - ret, err := pt.Load("_MinioAddress") - if err != nil { - panic(err) - } - pt.MinIOAddress = ret -} - -func (pt *ParamTable) initMinIOAccessKeyID() { - ret, err := pt.Load("minio.accessKeyID") - if err != nil { - panic(err) - } - pt.MinIOAccessKeyID = ret -} - -func (pt *ParamTable) initMinIOSecretAccessKey() { - ret, err := pt.Load("minio.secretAccessKey") - if err != nil { - panic(err) - } - pt.MinIOSecretAccessKey = ret -} - -func (pt *ParamTable) initMinIOUseSSL() { - ret, err := pt.Load("minio.useSSL") - if err != nil { - panic(err) - } - pt.MinIOUseSSL, err = strconv.ParseBool(ret) - if err != nil { - panic(err) - } -} - -func (pt *ParamTable) initMinioBucketName() { - bucketName, err := pt.Load("minio.bucketName") - if err != nil { - panic(err) - } - pt.MinioBucketName = bucketName -} diff --git a/internal/distributed/indexservice/service.go b/internal/distributed/indexservice/service.go index 5434d64f53..7bf384d867 100644 --- a/internal/distributed/indexservice/service.go +++ b/internal/distributed/indexservice/service.go @@ -29,13 +29,11 @@ type Server struct { } func (s *Server) Init() { - log.Println("initing params ...") - Params.Init() + indexservice.Params.Init() } func (s *Server) Start() error { s.Init() - log.Println("stringing indexserver ...") return s.startIndexServer() } @@ -64,20 +62,6 @@ func (s *Server) RegisterNode(ctx context.Context, req *indexpb.RegisterNodeRequ func (s *Server) BuildIndex(ctx context.Context, req *indexpb.BuildIndexRequest) (*indexpb.BuildIndexResponse, error) { return s.server.BuildIndex(req) - //indexID := int64(0) - //request := &indexpb.BuildIndexCmd{ - // IndexID: indexID, - // Req: req, - //} - // - //indexNodeClient := grpcindexnode.NewClient() - // - //status, err := indexNodeClient.BuildIndex(request) - //response := &indexpb.BuildIndexResponse{ - // Status: status, - // IndexID: indexID, - //} - //return response, err } func (s *Server) GetIndexStates(ctx context.Context, req *indexpb.IndexStatesRequest) (*indexpb.IndexStatesResponse, error) { @@ -95,8 +79,6 @@ func (s *Server) NotifyBuildIndex(ctx context.Context, nty *indexpb.BuildIndexNo return s.server.NotifyBuildIndex(nty) } -//varindex - func NewServer() *Server { return &Server{ @@ -109,7 +91,7 @@ func (s *Server) grpcLoop() { defer s.loopWg.Done() log.Println("Starting start IndexServer") - lis, err := net.Listen("tcp", ":"+strconv.Itoa(Params.Port)) + lis, err := net.Listen("tcp", ":"+strconv.Itoa(indexservice.Params.Port)) if err != nil { log.Fatalf("IndexServer grpc server fatal error=%v", err) } @@ -121,20 +103,16 @@ func (s *Server) grpcLoop() { if err = s.grpcServer.Serve(lis); err != nil { log.Fatalf("IndexServer grpc server fatal error=%v", err) } - log.Println("IndexServer grpc server starting...") } func (s *Server) startIndexServer() error { s.loopWg.Add(1) go s.grpcLoop() + log.Println("IndexServer grpc server start successfully") return nil } -func Init() { - Params.Init() -} - func CreateIndexServer(ctx context.Context) (*Server, error) { ctx1, cancel := context.WithCancel(ctx) diff --git a/internal/indexnode/indexnode.go b/internal/indexnode/indexnode.go index 3d9aab88f4..01644c8cad 100644 --- a/internal/indexnode/indexnode.go +++ b/internal/indexnode/indexnode.go @@ -10,8 +10,6 @@ import ( "time" "github.com/zilliztech/milvus-distributed/internal/allocator" - "github.com/zilliztech/milvus-distributed/internal/errors" - "github.com/zilliztech/milvus-distributed/internal/indexservice" "github.com/zilliztech/milvus-distributed/internal/kv" etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd" miniokv "github.com/zilliztech/milvus-distributed/internal/kv/minio" @@ -43,8 +41,8 @@ type IndexNode struct { startCallbacks []func() closeCallbacks []func() - indexNodeID int64 - serviceClient indexservice.Interface // method factory + indexNodeID int64 + //serviceClient indexservice.Interface // method factory } func (i *IndexNode) Init() { @@ -72,58 +70,25 @@ func (i *IndexNode) GetStatisticsChannel() (string, error) { } func (i *IndexNode) BuildIndex(req *indexpb.BuildIndexCmd) (*commonpb.Status, error) { - //TODO: build index in index node - ctx := context.Background() - t := NewIndexAddTask() - t.req = req.Req - t.idAllocator = i.idAllocator - t.buildQueue = i.sched.IndexBuildQueue - t.table = i.metaTable - t.kv = i.kv - var cancel func() - t.ctx, cancel = context.WithTimeout(ctx, reqTimeoutInterval) - defer cancel() - fn := func() error { - select { - case <-ctx.Done(): - return errors.New("insert timeout") - default: - return i.sched.IndexAddQueue.Enqueue(t) - } - } - ret := &commonpb.Status{ + log.Println("Create index with indexID=", req.IndexID) + return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_SUCCESS, Reason: "", - } - - err := fn() - if err != nil { - ret.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR - ret.Reason = err.Error() - return ret, nil - } - - err = t.WaitToFinish() - if err != nil { - ret.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR - ret.Reason = err.Error() - return ret, nil - } - return ret, nil + }, nil } func CreateIndexNode(ctx context.Context) (*IndexNode, error) { return &IndexNode{}, nil } -func NewIndexNode(ctx context.Context, indexID int64) *IndexNode { +func NewIndexNode(ctx context.Context, nodeID int64) *IndexNode { ctx1, cancel := context.WithCancel(ctx) in := &IndexNode{ loopCtx: ctx1, loopCancel: cancel, - indexNodeID: indexID, + indexNodeID: nodeID, } return in diff --git a/internal/indexnode/paramtable.go b/internal/indexnode/paramtable.go index 2396fd095f..1954c6ee95 100644 --- a/internal/indexnode/paramtable.go +++ b/internal/indexnode/paramtable.go @@ -13,6 +13,14 @@ type ParamTable struct { Address string Port int + NodeAddress string + NodeIP string + NodePort int + ServiceAddress string + ServicePort int + + NodeID int64 + MasterAddress string EtcdAddress string @@ -31,6 +39,11 @@ func (pt *ParamTable) Init() { pt.BaseTable.Init() pt.initAddress() pt.initPort() + pt.initNodeAddress() + pt.initNodeIP() + pt.initNodePort() + pt.initIndexServerAddr() + pt.initIndexServerPort() pt.initEtcdAddress() pt.initMasterAddress() pt.initMetaRootPath() @@ -70,6 +83,72 @@ func (pt *ParamTable) initPort() { pt.Port = pt.ParseInt("indexBuilder.port") } +func (pt *ParamTable) initNodeAddress() { + addr, err := pt.Load("indexNode.address") + if err != nil { + panic(err) + } + + hostName, _ := net.LookupHost(addr) + if len(hostName) <= 0 { + if ip := net.ParseIP(addr); ip == nil { + panic("invalid ip indexBuilder.address") + } + } + + port, err := pt.Load("indexNode.port") + if err != nil { + panic(err) + } + _, err = strconv.Atoi(port) + if err != nil { + panic(err) + } + + pt.NodeAddress = addr + ":" + port +} + +func (pt *ParamTable) initNodeIP() { + addr, err := pt.Load("indexNode.address") + if err != nil { + panic(err) + } + pt.NodeIP = addr +} + +func (pt *ParamTable) initNodePort() { + pt.NodePort = pt.ParseInt("indexNode.port") +} + +func (pt *ParamTable) initIndexServerAddr() { + addr, err := pt.Load("indexServer.address") + if err != nil { + panic(err) + } + + hostName, _ := net.LookupHost(addr) + if len(hostName) <= 0 { + if ip := net.ParseIP(addr); ip == nil { + panic("invalid ip indexServer.address") + } + } + + port, err := pt.Load("indexServer.port") + if err != nil { + panic(err) + } + _, err = strconv.Atoi(port) + if err != nil { + panic(err) + } + + pt.ServiceAddress = addr + ":" + port +} + +func (pt ParamTable) initIndexServerPort() { + pt.ServicePort = pt.ParseInt("indexServer.port") +} + func (pt *ParamTable) initEtcdAddress() { addr, err := pt.Load("_EtcdAddress") if err != nil { diff --git a/internal/indexservice/indexservice.go b/internal/indexservice/indexservice.go index e0329a69ac..84b8a47ac4 100644 --- a/internal/indexservice/indexservice.go +++ b/internal/indexservice/indexservice.go @@ -8,7 +8,9 @@ import ( "time" "github.com/zilliztech/milvus-distributed/internal/allocator" + grpcindexnodeclient "github.com/zilliztech/milvus-distributed/internal/distributed/indexnode/client" "github.com/zilliztech/milvus-distributed/internal/errors" + "github.com/zilliztech/milvus-distributed/internal/indexnode" "github.com/zilliztech/milvus-distributed/internal/kv" etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" @@ -22,7 +24,7 @@ import ( type IndexService struct { // implement Service - //nodeClients [] .Interface + nodeClients []indexnode.Interface // factory method loopCtx context.Context loopCancel func() @@ -99,6 +101,11 @@ func (i *IndexService) RegisterNode(req *indexpb.RegisterNodeRequest) (*indexpb. i.nodeNum++ + nodeAddress := req.Address.Ip + ":" + strconv.FormatInt(req.Address.Port, 10) + log.Println(nodeAddress) + nodeClient := grpcindexnodeclient.NewClient(nodeAddress) + i.nodeClients = append(i.nodeClients, nodeClient) + return &indexpb.RegisterNodeResponse{ InitParams: &internalpb2.InitParams{ NodeID: nodeID, @@ -108,28 +115,19 @@ func (i *IndexService) RegisterNode(req *indexpb.RegisterNodeRequest) (*indexpb. } func (i *IndexService) BuildIndex(req *indexpb.BuildIndexRequest) (*indexpb.BuildIndexResponse, error) { - //TODO: Multiple indexes will build at same time. - //ctx := context.Background() - //indexNodeClient := indexnode.NewIndexNode(ctx, rand.Int63n(i.nodeNum)) - // - ////TODO: Allocator index ID - //indexID := int64(0) - // - //request := &indexpb.BuildIndexCmd{ - // IndexID: indexID, - // Req: req, - //} - // - //status, err := indexNodeClient.BuildIndex(request) - //if err != nil { - // return nil, err - //} - // - //return &indexpb.BuildIndexResponse{ - // Status: status, - // IndexID: indexID, - //}, nil - return nil, nil + + //TODO: Allocator ID + indexID := int64(0) + nodeClient := i.nodeClients[0] + request := &indexpb.BuildIndexCmd{ + IndexID: indexID, + Req: req, + } + status, err := nodeClient.BuildIndex(request) + return &indexpb.BuildIndexResponse{ + Status: status, + IndexID: indexID, + }, err } func (i *IndexService) GetIndexStates(req *indexpb.IndexStatesRequest) (*indexpb.IndexStatesResponse, error) { diff --git a/internal/indexservice/paramtable.go b/internal/indexservice/paramtable.go index 361b6f7607..f71d87cc16 100644 --- a/internal/indexservice/paramtable.go +++ b/internal/indexservice/paramtable.go @@ -13,8 +13,6 @@ type ParamTable struct { Address string Port int - NodeID int64 - MasterAddress string EtcdAddress string