From 46a14da3ebaf59516289a8416a2a65ded2a1d52d Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Sat, 16 Jan 2021 15:04:32 +0800 Subject: [PATCH] Implement segment manager and loadSegment Signed-off-by: bigsheeper --- .../src/common/{LoadInfo.h => LoadIndex.h} | 8 +- internal/core/src/segcore/SegmentGrowing.h | 2 +- internal/core/src/segcore/SegmentSealed.h | 29 +- internal/core/src/segcore/load_index_c.cpp | 2 +- internal/core/src/segcore/segment_c.cpp | 2 +- internal/core/unittest/CMakeLists.txt | 1 - internal/core/unittest/test_c_api.cpp | 2 +- internal/core/unittest/test_load.cpp | 6 - internal/distributed/querynode/client.go | 54 +++- internal/distributed/querynode/service.go | 18 +- internal/querynode/api.go | 286 ++++++++---------- internal/querynode/query_node.go | 14 +- internal/querynode/segment_manager.go | 126 ++++++++ 13 files changed, 349 insertions(+), 201 deletions(-) rename internal/core/src/common/{LoadInfo.h => LoadIndex.h} (85%) delete mode 100644 internal/core/unittest/test_load.cpp create mode 100644 internal/querynode/segment_manager.go diff --git a/internal/core/src/common/LoadInfo.h b/internal/core/src/common/LoadIndex.h similarity index 85% rename from internal/core/src/common/LoadInfo.h rename to internal/core/src/common/LoadIndex.h index 654691f8d0..377b533082 100644 --- a/internal/core/src/common/LoadInfo.h +++ b/internal/core/src/common/LoadIndex.h @@ -13,7 +13,7 @@ #include #include -#include "knowhere/index/vector_index/VecIndex.h" +#include "../index/knowhere/knowhere/index/vector_index/VecIndex.h" struct LoadIndexInfo { std::string field_name; @@ -21,9 +21,3 @@ struct LoadIndexInfo { std::map index_params; milvus::knowhere::VecIndexPtr index; }; - -struct LoadFieldDataInfo { - int64_t field_id; - void* blob; - int64_t row_count; -}; diff --git a/internal/core/src/segcore/SegmentGrowing.h b/internal/core/src/segcore/SegmentGrowing.h index 9066f0be91..ffd794ca52 100644 --- a/internal/core/src/segcore/SegmentGrowing.h +++ b/internal/core/src/segcore/SegmentGrowing.h @@ -18,7 +18,7 @@ #include "query/deprecated/GeneralQuery.h" #include "query/Plan.h" -#include "common/LoadInfo.h" +#include "common/LoadIndex.h" #include "segcore/SegmentInterface.h" namespace milvus { diff --git a/internal/core/src/segcore/SegmentSealed.h b/internal/core/src/segcore/SegmentSealed.h index fd83f2fa03..a784f3f246 100644 --- a/internal/core/src/segcore/SegmentSealed.h +++ b/internal/core/src/segcore/SegmentSealed.h @@ -10,20 +10,17 @@ // or implied. See the License for the specific language governing permissions and limitations under the License #include "SegmentInterface.h" -#include "common/LoadInfo.h" -namespace milvus::segcore { - -class SegmentSealed { - public: - virtual const Schema& - get_schema() = 0; - virtual int64_t - get_row_count() = 0; - virtual void - LoadIndex(const LoadIndexInfo& info) = 0; - virtual void - LoadFieldData(const LoadFieldDataInfo& info) = 0; -}; - -} // namespace milvus::segcore +// class SegmentSealed : public SegmentInternalInterface { +// public: +// const Schema& get_schema() = 0; +// int64_t get_num_chunk() = 0; +// +// explicit SegmentSealed(SchemaPtr schema); +// void set_size(); +// void load_data(FieldId field_id, void* blob, int64_t blob_size); +// +// +// private: +// SchemaPtr schema_; +// } diff --git a/internal/core/src/segcore/load_index_c.cpp b/internal/core/src/segcore/load_index_c.cpp index 21d5637f1b..9afd0b6262 100644 --- a/internal/core/src/segcore/load_index_c.cpp +++ b/internal/core/src/segcore/load_index_c.cpp @@ -12,7 +12,7 @@ #include "index/knowhere/knowhere/common/BinarySet.h" #include "index/knowhere/knowhere/index/vector_index/VecIndexFactory.h" #include "segcore/load_index_c.h" -#include "common/LoadInfo.h" +#include "common/LoadIndex.h" #include "utils/EasyAssert.h" CStatus diff --git a/internal/core/src/segcore/segment_c.cpp b/internal/core/src/segcore/segment_c.cpp index 61b6cf85eb..523af18427 100644 --- a/internal/core/src/segcore/segment_c.cpp +++ b/internal/core/src/segcore/segment_c.cpp @@ -19,7 +19,7 @@ #include #include #include -#include "common/LoadInfo.h" +#include "common/LoadIndex.h" CSegmentBase NewSegment(CCollection collection, uint64_t segment_id) { diff --git a/internal/core/unittest/CMakeLists.txt b/internal/core/unittest/CMakeLists.txt index 443b3e32f2..2427de4e03 100644 --- a/internal/core/unittest/CMakeLists.txt +++ b/internal/core/unittest/CMakeLists.txt @@ -18,7 +18,6 @@ set(MILVUS_TEST_FILES test_reduce.cpp test_interface.cpp test_span.cpp - test_load.cpp ) add_executable(all_tests ${MILVUS_TEST_FILES} diff --git a/internal/core/unittest/test_c_api.cpp b/internal/core/unittest/test_c_api.cpp index d486812a29..7aba224f16 100644 --- a/internal/core/unittest/test_c_api.cpp +++ b/internal/core/unittest/test_c_api.cpp @@ -23,7 +23,7 @@ #include #include #include -#include +#include #include #include #include "test_utils/DataGen.h" diff --git a/internal/core/unittest/test_load.cpp b/internal/core/unittest/test_load.cpp deleted file mode 100644 index 9d6f2b1fd9..0000000000 --- a/internal/core/unittest/test_load.cpp +++ /dev/null @@ -1,6 +0,0 @@ -#include -#include "segcore/SegmentSealed.h" - -TEST(Load, Naive) { - -} \ No newline at end of file diff --git a/internal/distributed/querynode/client.go b/internal/distributed/querynode/client.go index 6e073c8b0f..51f70aea73 100644 --- a/internal/distributed/querynode/client.go +++ b/internal/distributed/querynode/client.go @@ -3,10 +3,60 @@ package querynode import ( "context" + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" + "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" "github.com/zilliztech/milvus-distributed/internal/proto/querypb" ) type Client struct { - ctx context.Context - querypb.QueryNodeClient + ctx context.Context + grpcClient querypb.QueryNodeClient +} + +func (c *Client) Init() { + panic("implement me") +} + +func (c *Client) Start() { + panic("implement me") +} + +func (c *Client) Stop() { + panic("implement me") +} + +func (c *Client) GetServiceStates() (internalpb2.ServiceStates, error) { + panic("implement me") +} + +func (c *Client) GetTimeTickChannel() (string, error) { + panic("implement me") +} + +func (c *Client) GetStatisticsChannel() (string, error) { + panic("implement me") +} + +func (c *Client) AddQueryChannel(in *querypb.AddQueryChannelsRequest) (*commonpb.Status, error) { + return c.grpcClient.AddQueryChannel(context.TODO(), in) +} + +func (c *Client) RemoveQueryChannel(in *querypb.RemoveQueryChannelsRequest) (*commonpb.Status, error) { + return c.grpcClient.RemoveQueryChannel(context.TODO(), in) +} + +func (c *Client) WatchDmChannels(in *querypb.WatchDmChannelsRequest) (*commonpb.Status, error) { + return c.grpcClient.WatchDmChannels(context.TODO(), in) +} + +func (c *Client) LoadSegments(in *querypb.LoadSegmentRequest) (*commonpb.Status, error) { + return c.grpcClient.LoadSegments(context.TODO(), in) +} + +func (c *Client) ReleaseSegments(in *querypb.ReleaseSegmentRequest) (*commonpb.Status, error) { + return c.grpcClient.ReleaseSegments(context.TODO(), in) +} + +func (c *Client) GetPartitionState(in *querypb.PartitionStatesRequest) (*querypb.PartitionStatesResponse, error) { + return c.grpcClient.GetPartitionState(context.TODO(), in) } diff --git a/internal/distributed/querynode/service.go b/internal/distributed/querynode/service.go index f9966b97b6..68a7b817a6 100644 --- a/internal/distributed/querynode/service.go +++ b/internal/distributed/querynode/service.go @@ -44,25 +44,31 @@ func (s *Server) Start() { } func (s *Server) AddQueryChannel(ctx context.Context, in *querypb.AddQueryChannelsRequest) (*commonpb.Status, error) { - return s.node.AddQueryChannel(ctx, in) + // ignore ctx + return s.node.AddQueryChannel(in) } func (s *Server) RemoveQueryChannel(ctx context.Context, in *querypb.RemoveQueryChannelsRequest) (*commonpb.Status, error) { - return s.node.RemoveQueryChannel(ctx, in) + // ignore ctx + return s.node.RemoveQueryChannel(in) } func (s *Server) WatchDmChannels(ctx context.Context, in *querypb.WatchDmChannelsRequest) (*commonpb.Status, error) { - return s.node.WatchDmChannels(ctx, in) + // ignore ctx + return s.node.WatchDmChannels(in) } func (s *Server) LoadSegments(ctx context.Context, in *querypb.LoadSegmentRequest) (*commonpb.Status, error) { - return s.node.LoadSegments(ctx, in) + // ignore ctx + return s.node.LoadSegments(in) } func (s *Server) ReleaseSegments(ctx context.Context, in *querypb.ReleaseSegmentRequest) (*commonpb.Status, error) { - return s.node.ReleaseSegments(ctx, in) + // ignore ctx + return s.node.ReleaseSegments(in) } func (s *Server) GetPartitionState(ctx context.Context, in *querypb.PartitionStatesRequest) (*querypb.PartitionStatesResponse, error) { - return s.node.GetPartitionState(ctx, in) + // ignore ctx + return s.node.GetPartitionState(in) } diff --git a/internal/querynode/api.go b/internal/querynode/api.go index 6d5afb2329..92f5f520ff 100644 --- a/internal/querynode/api.go +++ b/internal/querynode/api.go @@ -1,7 +1,6 @@ package querynode import ( - "context" "errors" "github.com/zilliztech/milvus-distributed/internal/msgstream" @@ -9,187 +8,168 @@ import ( queryPb "github.com/zilliztech/milvus-distributed/internal/proto/querypb" ) -func (node *QueryNode) AddQueryChannel(ctx context.Context, in *queryPb.AddQueryChannelsRequest) (*commonpb.Status, error) { - select { - case <-ctx.Done(): - errMsg := "context exceeded" +func (node *QueryNode) AddQueryChannel(in *queryPb.AddQueryChannelsRequest) (*commonpb.Status, error) { + if node.searchService == nil || node.searchService.searchMsgStream == nil { + errMsg := "null search service or null search message stream" status := &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, Reason: errMsg, } return status, errors.New(errMsg) - default: - if node.searchService == nil || node.searchService.searchMsgStream == nil { - errMsg := "null search service or null search message stream" - status := &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: errMsg, - } - - return status, errors.New(errMsg) - } - - searchStream, ok := node.searchService.searchMsgStream.(*msgstream.PulsarMsgStream) - if !ok { - errMsg := "type assertion failed for search message stream" - status := &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: errMsg, - } - - return status, errors.New(errMsg) - } - - resultStream, ok := node.searchService.searchResultMsgStream.(*msgstream.PulsarMsgStream) - if !ok { - errMsg := "type assertion failed for search result message stream" - status := &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: errMsg, - } - - return status, errors.New(errMsg) - } - - // add request channel - pulsarBufSize := Params.SearchPulsarBufSize - consumeChannels := []string{in.RequestChannelID} - consumeSubName := Params.MsgChannelSubName - unmarshalDispatcher := msgstream.NewUnmarshalDispatcher() - searchStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize) - - // add result channel - producerChannels := []string{in.ResultChannelID} - resultStream.CreatePulsarProducers(producerChannels) - - status := &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_SUCCESS, - } - return status, nil } -} -func (node *QueryNode) RemoveQueryChannel(ctx context.Context, in *queryPb.RemoveQueryChannelsRequest) (*commonpb.Status, error) { - select { - case <-ctx.Done(): - errMsg := "context exceeded" + searchStream, ok := node.searchService.searchMsgStream.(*msgstream.PulsarMsgStream) + if !ok { + errMsg := "type assertion failed for search message stream" status := &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, Reason: errMsg, } return status, errors.New(errMsg) - default: - if node.searchService == nil || node.searchService.searchMsgStream == nil { - errMsg := "null search service or null search result message stream" - status := &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: errMsg, - } - - return status, errors.New(errMsg) - } - - searchStream, ok := node.searchService.searchMsgStream.(*msgstream.PulsarMsgStream) - if !ok { - errMsg := "type assertion failed for search message stream" - status := &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: errMsg, - } - - return status, errors.New(errMsg) - } - - resultStream, ok := node.searchService.searchResultMsgStream.(*msgstream.PulsarMsgStream) - if !ok { - errMsg := "type assertion failed for search result message stream" - status := &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: errMsg, - } - - return status, errors.New(errMsg) - } - - // remove request channel - pulsarBufSize := Params.SearchPulsarBufSize - consumeChannels := []string{in.RequestChannelID} - consumeSubName := Params.MsgChannelSubName - unmarshalDispatcher := msgstream.NewUnmarshalDispatcher() - // TODO: searchStream.RemovePulsarConsumers(producerChannels) - searchStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize) - - // remove result channel - producerChannels := []string{in.ResultChannelID} - // TODO: resultStream.RemovePulsarProducer(producerChannels) - resultStream.CreatePulsarProducers(producerChannels) - - status := &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_SUCCESS, - } - return status, nil } -} -func (node *QueryNode) WatchDmChannels(ctx context.Context, in *queryPb.WatchDmChannelsRequest) (*commonpb.Status, error) { - select { - case <-ctx.Done(): - errMsg := "context exceeded" + resultStream, ok := node.searchService.searchResultMsgStream.(*msgstream.PulsarMsgStream) + if !ok { + errMsg := "type assertion failed for search result message stream" status := &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, Reason: errMsg, } return status, errors.New(errMsg) - default: - if node.dataSyncService == nil || node.dataSyncService.dmStream == nil { - errMsg := "null data sync service or null data manipulation stream" - status := &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: errMsg, - } - - return status, errors.New(errMsg) - } - - fgDMMsgStream, ok := node.dataSyncService.dmStream.(*msgstream.PulsarMsgStream) - if !ok { - errMsg := "type assertion failed for dm message stream" - status := &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: errMsg, - } - - return status, errors.New(errMsg) - } - - // add request channel - pulsarBufSize := Params.SearchPulsarBufSize - consumeChannels := in.ChannelIDs - consumeSubName := Params.MsgChannelSubName - unmarshalDispatcher := msgstream.NewUnmarshalDispatcher() - fgDMMsgStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize) - - status := &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_SUCCESS, - } - return status, nil } + + // add request channel + pulsarBufSize := Params.SearchPulsarBufSize + consumeChannels := []string{in.RequestChannelID} + consumeSubName := Params.MsgChannelSubName + unmarshalDispatcher := msgstream.NewUnmarshalDispatcher() + searchStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize) + + // add result channel + producerChannels := []string{in.ResultChannelID} + resultStream.CreatePulsarProducers(producerChannels) + + status := &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + } + return status, nil } -func (node *QueryNode) LoadSegments(ctx context.Context, in *queryPb.LoadSegmentRequest) (*commonpb.Status, error) { +func (node *QueryNode) RemoveQueryChannel(in *queryPb.RemoveQueryChannelsRequest) (*commonpb.Status, error) { + if node.searchService == nil || node.searchService.searchMsgStream == nil { + errMsg := "null search service or null search result message stream" + status := &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: errMsg, + } + + return status, errors.New(errMsg) + } + + searchStream, ok := node.searchService.searchMsgStream.(*msgstream.PulsarMsgStream) + if !ok { + errMsg := "type assertion failed for search message stream" + status := &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: errMsg, + } + + return status, errors.New(errMsg) + } + + resultStream, ok := node.searchService.searchResultMsgStream.(*msgstream.PulsarMsgStream) + if !ok { + errMsg := "type assertion failed for search result message stream" + status := &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: errMsg, + } + + return status, errors.New(errMsg) + } + + // remove request channel + pulsarBufSize := Params.SearchPulsarBufSize + consumeChannels := []string{in.RequestChannelID} + consumeSubName := Params.MsgChannelSubName + unmarshalDispatcher := msgstream.NewUnmarshalDispatcher() + // TODO: searchStream.RemovePulsarConsumers(producerChannels) + searchStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize) + + // remove result channel + producerChannels := []string{in.ResultChannelID} + // TODO: resultStream.RemovePulsarProducer(producerChannels) + resultStream.CreatePulsarProducers(producerChannels) + + status := &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + } + return status, nil +} + +func (node *QueryNode) WatchDmChannels(in *queryPb.WatchDmChannelsRequest) (*commonpb.Status, error) { + if node.dataSyncService == nil || node.dataSyncService.dmStream == nil { + errMsg := "null data sync service or null data manipulation stream" + status := &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: errMsg, + } + + return status, errors.New(errMsg) + } + + fgDMMsgStream, ok := node.dataSyncService.dmStream.(*msgstream.PulsarMsgStream) + if !ok { + errMsg := "type assertion failed for dm message stream" + status := &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: errMsg, + } + + return status, errors.New(errMsg) + } + + // add request channel + pulsarBufSize := Params.SearchPulsarBufSize + consumeChannels := in.ChannelIDs + consumeSubName := Params.MsgChannelSubName + unmarshalDispatcher := msgstream.NewUnmarshalDispatcher() + fgDMMsgStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize) + + status := &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + } + return status, nil +} + +func (node *QueryNode) LoadSegments(in *queryPb.LoadSegmentRequest) (*commonpb.Status, error) { + // TODO: support db + for _, segmentID := range in.SegmentIDs { + hasBeenBuiltIndex := segmentID > 0 // TODO: ??? + indexID := UniqueID(0) // TODO: ??? + err := node.segManager.loadSegment(segmentID, hasBeenBuiltIndex, indexID, in.FieldIDs) + if err != nil { + // TODO: return or continue + status := &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: err.Error(), + } + return status, err + } + } + + return nil, nil +} + +func (node *QueryNode) ReleaseSegments(in *queryPb.ReleaseSegmentRequest) (*commonpb.Status, error) { // TODO: implement return nil, nil } -func (node *QueryNode) ReleaseSegments(ctx context.Context, in *queryPb.ReleaseSegmentRequest) (*commonpb.Status, error) { - // TODO: implement - return nil, nil -} - -func (node *QueryNode) GetPartitionState(ctx context.Context, in *queryPb.PartitionStatesRequest) (*queryPb.PartitionStatesResponse, error) { +func (node *QueryNode) GetPartitionState(in *queryPb.PartitionStatesRequest) (*queryPb.PartitionStatesResponse, error) { // TODO: implement return nil, nil } diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index f7d941097a..d3426d88b2 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -29,12 +29,12 @@ type Node interface { Start() error Close() - AddQueryChannel(ctx context.Context, in *queryPb.AddQueryChannelsRequest) (*commonpb.Status, error) - RemoveQueryChannel(ctx context.Context, in *queryPb.RemoveQueryChannelsRequest) (*commonpb.Status, error) - WatchDmChannels(ctx context.Context, in *queryPb.WatchDmChannelsRequest) (*commonpb.Status, error) - LoadSegments(ctx context.Context, in *queryPb.LoadSegmentRequest) (*commonpb.Status, error) - ReleaseSegments(ctx context.Context, in *queryPb.ReleaseSegmentRequest) (*commonpb.Status, error) - GetPartitionState(ctx context.Context, in *queryPb.PartitionStatesRequest) (*queryPb.PartitionStatesResponse, error) + AddQueryChannel(in *queryPb.AddQueryChannelsRequest) (*commonpb.Status, error) + RemoveQueryChannel(in *queryPb.RemoveQueryChannelsRequest) (*commonpb.Status, error) + WatchDmChannels(in *queryPb.WatchDmChannelsRequest) (*commonpb.Status, error) + LoadSegments(in *queryPb.LoadSegmentRequest) (*commonpb.Status, error) + ReleaseSegments(in *queryPb.ReleaseSegmentRequest) (*commonpb.Status, error) + GetPartitionState(in *queryPb.PartitionStatesRequest) (*queryPb.PartitionStatesResponse, error) } type QueryNode struct { @@ -53,6 +53,8 @@ type QueryNode struct { loadIndexService *loadIndexService statsService *statsService + segManager *segmentManager + //opentracing tracer opentracing.Tracer closer io.Closer diff --git a/internal/querynode/segment_manager.go b/internal/querynode/segment_manager.go new file mode 100644 index 0000000000..d59b537c4d --- /dev/null +++ b/internal/querynode/segment_manager.go @@ -0,0 +1,126 @@ +package querynode + +import ( + "context" + "errors" + "fmt" + + "github.com/zilliztech/milvus-distributed/internal/kv" + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" + "github.com/zilliztech/milvus-distributed/internal/proto/datapb" + "github.com/zilliztech/milvus-distributed/internal/proto/indexpb" + queryPb "github.com/zilliztech/milvus-distributed/internal/proto/querypb" + "github.com/zilliztech/milvus-distributed/internal/querynode/client" + "github.com/zilliztech/milvus-distributed/internal/storage" +) + +type segmentManager struct { + replica collectionReplica + + // TODO: replace by client instead of grpc client + dataClient datapb.DataServiceClient + indexBuilderClient indexpb.IndexServiceClient + + queryNodeClient *client.Client + kv kv.Base // minio kv + iCodec storage.InsertCodec +} + +func (s *segmentManager) loadSegment(segmentID UniqueID, hasBeenBuiltIndex bool, indexID UniqueID, vecFieldIDs []int64) error { + // 1. load segment + req := &datapb.InsertBinlogPathRequest{ + SegmentID: segmentID, + } + + pathResponse, err := s.dataClient.GetInsertBinlogPaths(context.TODO(), req) + if err != nil { + return err + } + + if len(pathResponse.FieldIDs) != len(pathResponse.Paths) { + return errors.New("illegal InsertBinlogPathsResponse") + } + + for fieldID, i := range pathResponse.FieldIDs { + paths := pathResponse.Paths[i].Values + blobs := make([]*storage.Blob, 0) + for _, path := range paths { + binLog, err := s.kv.Load(path) + if err != nil { + // TODO: return or continue? + return err + } + blobs = append(blobs, &storage.Blob{ + Key: "", // TODO: key??? + Value: []byte(binLog), + }) + } + _, _, insertData, err := s.iCodec.Deserialize(blobs) + if err != nil { + // TODO: return or continue + return err + } + if len(insertData.Data) != 1 { + return errors.New("we expect only one field in deserialized insert data") + } + + for _, value := range insertData.Data { + switch fieldData := value.(type) { + case storage.BoolFieldData: + numRows := fieldData.NumRows + data := fieldData.Data + fmt.Println(numRows, data, fieldID) + // TODO: s.replica.addSegment() + case storage.Int8FieldData: + // TODO: s.replica.addSegment() + case storage.Int16FieldData: + // TODO: s.replica.addSegment() + case storage.Int32FieldData: + // TODO: s.replica.addSegment() + case storage.Int64FieldData: + // TODO: s.replica.addSegment() + case storage.FloatFieldData: + // TODO: s.replica.addSegment() + case storage.DoubleFieldData: + // TODO: s.replica.addSegment() + default: + // TODO: what if the index has not been built ? + // does the info from hasBeenBuiltIndex is synced with the dataService? + return errors.New("unsupported field data type") + } + } + } + + // 2. load index + // does the info from hasBeenBuiltIndex is synced with the dataService? + if !hasBeenBuiltIndex { + req := &indexpb.IndexFilePathRequest{ + IndexID: indexID, + } + pathResponse, err := s.indexBuilderClient.GetIndexFilePaths(context.TODO(), req) + if err != nil || pathResponse.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { + return err + } + targetSegment, err := s.replica.getSegmentByID(segmentID) + if err != nil { + return err + } + for _, vecFieldID := range vecFieldIDs { + targetIndexParam, ok := targetSegment.indexParam[vecFieldID] + if !ok { + return errors.New(fmt.Sprint("cannot found index params in segment ", segmentID, " with field = ", vecFieldID)) + } + err := s.queryNodeClient.LoadIndex(pathResponse.IndexFilePaths, segmentID, vecFieldID, "", targetIndexParam) + if err != nil { + return err + } + } + } + return nil +} + +func (s *segmentManager) releaseSegment(in *queryPb.ReleaseSegmentRequest) error { + // TODO: implement + // TODO: release specific field, we need segCore supply relevant interface + return nil +}