From e117a6097ada1b45fde5b60c7d4d210f76860c20 Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Tue, 17 Nov 2020 16:32:34 +0800 Subject: [PATCH] Fix pulsar url Signed-off-by: bigsheeper --- cmd/reader/reader.go | 4 +- ...seg_container.go => collection_replica.go} | 159 ++++++++++-------- ...ner_test.go => collection_replica_test.go} | 151 ++++++++--------- internal/reader/collection_test.go | 11 +- internal/reader/data_sync_service.go | 48 ++---- internal/reader/data_sync_service_test.go | 15 +- internal/reader/flow_graph_insert_node.go | 18 +- .../flow_graph_msg_stream_input_nodes.go | 10 +- .../reader/flow_graph_service_time_node.go | 8 +- internal/reader/meta_service.go | 26 +-- internal/reader/meta_service_test.go | 129 +++++++------- .../reader/{paramtable.go => param_table.go} | 17 ++ internal/reader/param_table_test.go | 25 +++ internal/reader/partition_test.go | 13 +- internal/reader/query_node.go | 47 +----- internal/reader/query_node_test.go | 6 +- internal/reader/reader.go | 4 +- internal/reader/search_service.go | 20 ++- internal/reader/search_service_test.go | 17 +- internal/reader/stats_service.go | 32 ++-- internal/reader/stats_service_test.go | 35 ++-- internal/util/flowgraph/flow_graph.go | 10 ++ 22 files changed, 405 insertions(+), 400 deletions(-) rename internal/reader/{col_seg_container.go => collection_replica.go} (52%) rename internal/reader/{col_seg_container_test.go => collection_replica_test.go} (74%) rename internal/reader/{paramtable.go => param_table.go} (64%) create mode 100644 internal/reader/param_table_test.go diff --git a/cmd/reader/reader.go b/cmd/reader/reader.go index cc8a48a01e..386ed51539 100644 --- a/cmd/reader/reader.go +++ b/cmd/reader/reader.go @@ -27,8 +27,8 @@ func main() { sig = <-sc cancel() }() - pulsarAddress, _ := reader.Params.PulsarAddress() - reader.StartQueryNode(ctx, pulsarAddress) + + reader.StartQueryNode(ctx) switch sig { case syscall.SIGTERM: diff --git a/internal/reader/col_seg_container.go b/internal/reader/collection_replica.go similarity index 52% rename from internal/reader/col_seg_container.go rename to internal/reader/collection_replica.go index 5020bf4ce7..0de8917cbf 100644 --- a/internal/reader/col_seg_container.go +++ b/internal/reader/collection_replica.go @@ -20,7 +20,19 @@ import ( "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb" ) -type container interface { +/* + * collectionReplica contains a in-memory local copy of persistent collections. + * In common cases, the system has multiple query nodes. Data of a collection will be + * distributed across all the available query nodes, and each query node's collectionReplica + * will maintain its own share (only part of the collection). + * Every replica tracks a value called tSafe which is the maximum timestamp that the replica + * is up-to-date. + */ +type collectionReplica interface { + // tSafe + getTSafe() Timestamp + setTSafe(t Timestamp) + // collection getCollectionNum() int addCollection(collMeta *etcdpb.CollectionMeta, colMetaBlob string) error @@ -44,36 +56,51 @@ type container interface { hasSegment(segmentID UniqueID) bool } -// TODO: rename -type colSegContainer struct { +type collectionReplicaImpl struct { + tSafeMu sync.Mutex + tSafe Timestamp + mu sync.RWMutex collections []*Collection segments map[UniqueID]*Segment } -//----------------------------------------------------------------------------------------------------- collection -func (container *colSegContainer) getCollectionNum() int { - container.mu.RLock() - defer container.mu.RUnlock() - - return len(container.collections) +//----------------------------------------------------------------------------------------------------- tSafe +func (colReplica *collectionReplicaImpl) getTSafe() Timestamp { + colReplica.tSafeMu.Lock() + defer colReplica.tSafeMu.Unlock() + return colReplica.tSafe } -func (container *colSegContainer) addCollection(collMeta *etcdpb.CollectionMeta, colMetaBlob string) error { - container.mu.Lock() - defer container.mu.Unlock() +func (colReplica *collectionReplicaImpl) setTSafe(t Timestamp) { + colReplica.tSafeMu.Lock() + colReplica.tSafe = t + colReplica.tSafeMu.Unlock() +} + +//----------------------------------------------------------------------------------------------------- collection +func (colReplica *collectionReplicaImpl) getCollectionNum() int { + colReplica.mu.RLock() + defer colReplica.mu.RUnlock() + + return len(colReplica.collections) +} + +func (colReplica *collectionReplicaImpl) addCollection(collMeta *etcdpb.CollectionMeta, colMetaBlob string) error { + colReplica.mu.Lock() + defer colReplica.mu.Unlock() var newCollection = newCollection(collMeta, colMetaBlob) - container.collections = append(container.collections, newCollection) + colReplica.collections = append(colReplica.collections, newCollection) return nil } -func (container *colSegContainer) removeCollection(collectionID UniqueID) error { - collection, err := container.getCollectionByID(collectionID) +func (colReplica *collectionReplicaImpl) removeCollection(collectionID UniqueID) error { + collection, err := colReplica.getCollectionByID(collectionID) - container.mu.Lock() - defer container.mu.Unlock() + colReplica.mu.Lock() + defer colReplica.mu.Unlock() if err != nil { return err @@ -82,11 +109,11 @@ func (container *colSegContainer) removeCollection(collectionID UniqueID) error deleteCollection(collection) tmpCollections := make([]*Collection, 0) - for _, col := range container.collections { + for _, col := range colReplica.collections { if col.ID() == collectionID { for _, p := range *col.Partitions() { for _, s := range *p.Segments() { - delete(container.segments, s.ID()) + delete(colReplica.segments, s.ID()) } } } else { @@ -94,15 +121,15 @@ func (container *colSegContainer) removeCollection(collectionID UniqueID) error } } - container.collections = tmpCollections + colReplica.collections = tmpCollections return nil } -func (container *colSegContainer) getCollectionByID(collectionID UniqueID) (*Collection, error) { - container.mu.RLock() - defer container.mu.RUnlock() +func (colReplica *collectionReplicaImpl) getCollectionByID(collectionID UniqueID) (*Collection, error) { + colReplica.mu.RLock() + defer colReplica.mu.RUnlock() - for _, collection := range container.collections { + for _, collection := range colReplica.collections { if collection.ID() == collectionID { return collection, nil } @@ -111,11 +138,11 @@ func (container *colSegContainer) getCollectionByID(collectionID UniqueID) (*Col return nil, errors.New("cannot find collection, id = " + strconv.FormatInt(collectionID, 10)) } -func (container *colSegContainer) getCollectionByName(collectionName string) (*Collection, error) { - container.mu.RLock() - defer container.mu.RUnlock() +func (colReplica *collectionReplicaImpl) getCollectionByName(collectionName string) (*Collection, error) { + colReplica.mu.RLock() + defer colReplica.mu.RUnlock() - for _, collection := range container.collections { + for _, collection := range colReplica.collections { if collection.Name() == collectionName { return collection, nil } @@ -125,14 +152,14 @@ func (container *colSegContainer) getCollectionByName(collectionName string) (*C } //----------------------------------------------------------------------------------------------------- partition -func (container *colSegContainer) addPartition(collectionID UniqueID, partitionTag string) error { - collection, err := container.getCollectionByID(collectionID) +func (colReplica *collectionReplicaImpl) addPartition(collectionID UniqueID, partitionTag string) error { + collection, err := colReplica.getCollectionByID(collectionID) if err != nil { return err } - container.mu.Lock() - defer container.mu.Unlock() + colReplica.mu.Lock() + defer colReplica.mu.Unlock() var newPartition = newPartition(partitionTag) @@ -140,20 +167,20 @@ func (container *colSegContainer) addPartition(collectionID UniqueID, partitionT return nil } -func (container *colSegContainer) removePartition(collectionID UniqueID, partitionTag string) error { - collection, err := container.getCollectionByID(collectionID) +func (colReplica *collectionReplicaImpl) removePartition(collectionID UniqueID, partitionTag string) error { + collection, err := colReplica.getCollectionByID(collectionID) if err != nil { return err } - container.mu.Lock() - defer container.mu.Unlock() + colReplica.mu.Lock() + defer colReplica.mu.Unlock() var tmpPartitions = make([]*Partition, 0) for _, p := range *collection.Partitions() { if p.Tag() == partitionTag { for _, s := range *p.Segments() { - delete(container.segments, s.ID()) + delete(colReplica.segments, s.ID()) } } else { tmpPartitions = append(tmpPartitions, p) @@ -164,14 +191,14 @@ func (container *colSegContainer) removePartition(collectionID UniqueID, partiti return nil } -func (container *colSegContainer) getPartitionByTag(collectionID UniqueID, partitionTag string) (*Partition, error) { - collection, err := container.getCollectionByID(collectionID) +func (colReplica *collectionReplicaImpl) getPartitionByTag(collectionID UniqueID, partitionTag string) (*Partition, error) { + collection, err := colReplica.getCollectionByID(collectionID) if err != nil { return nil, err } - container.mu.RLock() - defer container.mu.RUnlock() + colReplica.mu.RLock() + defer colReplica.mu.RUnlock() for _, p := range *collection.Partitions() { if p.Tag() == partitionTag { @@ -183,17 +210,17 @@ func (container *colSegContainer) getPartitionByTag(collectionID UniqueID, parti } //----------------------------------------------------------------------------------------------------- segment -func (container *colSegContainer) getSegmentNum() int { - container.mu.RLock() - defer container.mu.RUnlock() +func (colReplica *collectionReplicaImpl) getSegmentNum() int { + colReplica.mu.RLock() + defer colReplica.mu.RUnlock() - return len(container.segments) + return len(colReplica.segments) } -func (container *colSegContainer) getSegmentStatistics() *internalpb.QueryNodeSegStats { +func (colReplica *collectionReplicaImpl) getSegmentStatistics() *internalpb.QueryNodeSegStats { var statisticData = make([]*internalpb.SegmentStats, 0) - for segmentID, segment := range container.segments { + for segmentID, segment := range colReplica.segments { currentMemSize := segment.getMemSize() segment.lastMemSize = currentMemSize segmentNumOfRows := segment.getRowCount() @@ -215,36 +242,36 @@ func (container *colSegContainer) getSegmentStatistics() *internalpb.QueryNodeSe } } -func (container *colSegContainer) addSegment(segmentID UniqueID, partitionTag string, collectionID UniqueID) error { - collection, err := container.getCollectionByID(collectionID) +func (colReplica *collectionReplicaImpl) addSegment(segmentID UniqueID, partitionTag string, collectionID UniqueID) error { + collection, err := colReplica.getCollectionByID(collectionID) if err != nil { return err } - partition, err := container.getPartitionByTag(collectionID, partitionTag) + partition, err := colReplica.getPartitionByTag(collectionID, partitionTag) if err != nil { return err } - container.mu.Lock() - defer container.mu.Unlock() + colReplica.mu.Lock() + defer colReplica.mu.Unlock() var newSegment = newSegment(collection, segmentID) - container.segments[segmentID] = newSegment + colReplica.segments[segmentID] = newSegment *partition.Segments() = append(*partition.Segments(), newSegment) return nil } -func (container *colSegContainer) removeSegment(segmentID UniqueID) error { - container.mu.Lock() - defer container.mu.Unlock() +func (colReplica *collectionReplicaImpl) removeSegment(segmentID UniqueID) error { + colReplica.mu.Lock() + defer colReplica.mu.Unlock() var targetPartition *Partition var segmentIndex = -1 - for _, col := range container.collections { + for _, col := range colReplica.collections { for _, p := range *col.Partitions() { for i, s := range *p.Segments() { if s.ID() == segmentID { @@ -255,7 +282,7 @@ func (container *colSegContainer) removeSegment(segmentID UniqueID) error { } } - delete(container.segments, segmentID) + delete(colReplica.segments, segmentID) if targetPartition != nil && segmentIndex > 0 { targetPartition.segments = append(targetPartition.segments[:segmentIndex], targetPartition.segments[segmentIndex+1:]...) @@ -264,11 +291,11 @@ func (container *colSegContainer) removeSegment(segmentID UniqueID) error { return nil } -func (container *colSegContainer) getSegmentByID(segmentID UniqueID) (*Segment, error) { - container.mu.RLock() - defer container.mu.RUnlock() +func (colReplica *collectionReplicaImpl) getSegmentByID(segmentID UniqueID) (*Segment, error) { + colReplica.mu.RLock() + defer colReplica.mu.RUnlock() - targetSegment, ok := container.segments[segmentID] + targetSegment, ok := colReplica.segments[segmentID] if !ok { return nil, errors.New("cannot found segment with id = " + strconv.FormatInt(segmentID, 10)) @@ -277,11 +304,11 @@ func (container *colSegContainer) getSegmentByID(segmentID UniqueID) (*Segment, return targetSegment, nil } -func (container *colSegContainer) hasSegment(segmentID UniqueID) bool { - container.mu.RLock() - defer container.mu.RUnlock() +func (colReplica *collectionReplicaImpl) hasSegment(segmentID UniqueID) bool { + colReplica.mu.RLock() + defer colReplica.mu.RUnlock() - _, ok := container.segments[segmentID] + _, ok := colReplica.segments[segmentID] return ok } diff --git a/internal/reader/col_seg_container_test.go b/internal/reader/collection_replica_test.go similarity index 74% rename from internal/reader/col_seg_container_test.go rename to internal/reader/collection_replica_test.go index 0636997c1e..864cd38ed6 100644 --- a/internal/reader/col_seg_container_test.go +++ b/internal/reader/collection_replica_test.go @@ -15,8 +15,7 @@ import ( //----------------------------------------------------------------------------------------------------- collection func TestColSegContainer_addCollection(t *testing.T) { ctx := context.Background() - pulsarURL := "pulsar://localhost:6650" - node := NewQueryNode(ctx, 0, pulsarURL) + node := NewQueryNode(ctx, 0) collectionName := "collection0" fieldVec := schemapb.FieldSchema{ @@ -59,20 +58,19 @@ func TestColSegContainer_addCollection(t *testing.T) { collectionMetaBlob := proto.MarshalTextString(&collectionMeta) assert.NotEqual(t, "", collectionMetaBlob) - var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob) + var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob) assert.NoError(t, err) - collection, err := (*node.container).getCollectionByName(collectionName) + collection, err := (*node.replica).getCollectionByName(collectionName) assert.NoError(t, err) assert.Equal(t, collection.meta.Schema.Name, collectionName) assert.Equal(t, collection.meta.ID, UniqueID(0)) - assert.Equal(t, (*node.container).getCollectionNum(), 1) + assert.Equal(t, (*node.replica).getCollectionNum(), 1) } func TestColSegContainer_removeCollection(t *testing.T) { ctx := context.Background() - pulsarURL := "pulsar://localhost:6650" - node := NewQueryNode(ctx, 0, pulsarURL) + node := NewQueryNode(ctx, 0) collectionName := "collection0" collectionID := UniqueID(0) @@ -116,25 +114,24 @@ func TestColSegContainer_removeCollection(t *testing.T) { collectionMetaBlob := proto.MarshalTextString(&collectionMeta) assert.NotEqual(t, "", collectionMetaBlob) - var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob) + var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob) assert.NoError(t, err) - collection, err := (*node.container).getCollectionByName(collectionName) + collection, err := (*node.replica).getCollectionByName(collectionName) assert.NoError(t, err) assert.Equal(t, collection.meta.Schema.Name, collectionName) assert.Equal(t, collection.meta.ID, UniqueID(0)) - assert.Equal(t, (*node.container).getCollectionNum(), 1) + assert.Equal(t, (*node.replica).getCollectionNum(), 1) - err = (*node.container).removeCollection(collectionID) + err = (*node.replica).removeCollection(collectionID) assert.NoError(t, err) - assert.Equal(t, (*node.container).getCollectionNum(), 0) + assert.Equal(t, (*node.replica).getCollectionNum(), 0) } func TestColSegContainer_getCollectionByID(t *testing.T) { ctx := context.Background() - pulsarURL := "pulsar://localhost:6650" - node := NewQueryNode(ctx, 0, pulsarURL) + node := NewQueryNode(ctx, 0) collectionName := "collection0" fieldVec := schemapb.FieldSchema{ @@ -177,17 +174,17 @@ func TestColSegContainer_getCollectionByID(t *testing.T) { collectionMetaBlob := proto.MarshalTextString(&collectionMeta) assert.NotEqual(t, "", collectionMetaBlob) - var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob) + var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob) assert.NoError(t, err) - collection, err := (*node.container).getCollectionByName(collectionName) + collection, err := (*node.replica).getCollectionByName(collectionName) assert.NoError(t, err) assert.Equal(t, collection.meta.Schema.Name, collectionName) assert.Equal(t, collection.meta.ID, UniqueID(0)) - assert.Equal(t, (*node.container).getCollectionNum(), 1) + assert.Equal(t, (*node.replica).getCollectionNum(), 1) - targetCollection, err := (*node.container).getCollectionByID(UniqueID(0)) + targetCollection, err := (*node.replica).getCollectionByID(UniqueID(0)) assert.NoError(t, err) assert.NotNil(t, targetCollection) assert.Equal(t, targetCollection.meta.Schema.Name, "collection0") @@ -196,8 +193,7 @@ func TestColSegContainer_getCollectionByID(t *testing.T) { func TestColSegContainer_getCollectionByName(t *testing.T) { ctx := context.Background() - pulsarURL := "pulsar://localhost:6650" - node := NewQueryNode(ctx, 0, pulsarURL) + node := NewQueryNode(ctx, 0) collectionName := "collection0" fieldVec := schemapb.FieldSchema{ @@ -240,17 +236,17 @@ func TestColSegContainer_getCollectionByName(t *testing.T) { collectionMetaBlob := proto.MarshalTextString(&collectionMeta) assert.NotEqual(t, "", collectionMetaBlob) - var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob) + var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob) assert.NoError(t, err) - collection, err := (*node.container).getCollectionByName(collectionName) + collection, err := (*node.replica).getCollectionByName(collectionName) assert.NoError(t, err) assert.Equal(t, collection.meta.Schema.Name, collectionName) assert.Equal(t, collection.meta.ID, UniqueID(0)) - assert.Equal(t, (*node.container).getCollectionNum(), 1) + assert.Equal(t, (*node.replica).getCollectionNum(), 1) - targetCollection, err := (*node.container).getCollectionByName("collection0") + targetCollection, err := (*node.replica).getCollectionByName("collection0") assert.NoError(t, err) assert.NotNil(t, targetCollection) assert.Equal(t, targetCollection.meta.Schema.Name, "collection0") @@ -260,8 +256,7 @@ func TestColSegContainer_getCollectionByName(t *testing.T) { //----------------------------------------------------------------------------------------------------- partition func TestColSegContainer_addPartition(t *testing.T) { ctx := context.Background() - pulsarURL := "pulsar://localhost:6650" - node := NewQueryNode(ctx, 0, pulsarURL) + node := NewQueryNode(ctx, 0) collectionName := "collection0" collectionID := UniqueID(0) @@ -305,20 +300,20 @@ func TestColSegContainer_addPartition(t *testing.T) { collectionMetaBlob := proto.MarshalTextString(&collectionMeta) assert.NotEqual(t, "", collectionMetaBlob) - var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob) + var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob) assert.NoError(t, err) - collection, err := (*node.container).getCollectionByName(collectionName) + collection, err := (*node.replica).getCollectionByName(collectionName) assert.NoError(t, err) assert.Equal(t, collection.meta.Schema.Name, collectionName) assert.Equal(t, collection.meta.ID, collectionID) - assert.Equal(t, (*node.container).getCollectionNum(), 1) + assert.Equal(t, (*node.replica).getCollectionNum(), 1) for _, tag := range collectionMeta.PartitionTags { - err := (*node.container).addPartition(collectionID, tag) + err := (*node.replica).addPartition(collectionID, tag) assert.NoError(t, err) - partition, err := (*node.container).getPartitionByTag(collectionID, tag) + partition, err := (*node.replica).getPartitionByTag(collectionID, tag) assert.NoError(t, err) assert.Equal(t, partition.partitionTag, "default") } @@ -326,8 +321,7 @@ func TestColSegContainer_addPartition(t *testing.T) { func TestColSegContainer_removePartition(t *testing.T) { ctx := context.Background() - pulsarURL := "pulsar://localhost:6650" - node := NewQueryNode(ctx, 0, pulsarURL) + node := NewQueryNode(ctx, 0) collectionName := "collection0" collectionID := UniqueID(0) @@ -372,31 +366,30 @@ func TestColSegContainer_removePartition(t *testing.T) { collectionMetaBlob := proto.MarshalTextString(&collectionMeta) assert.NotEqual(t, "", collectionMetaBlob) - var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob) + var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob) assert.NoError(t, err) - collection, err := (*node.container).getCollectionByName(collectionName) + collection, err := (*node.replica).getCollectionByName(collectionName) assert.NoError(t, err) assert.Equal(t, collection.meta.Schema.Name, collectionName) assert.Equal(t, collection.meta.ID, collectionID) - assert.Equal(t, (*node.container).getCollectionNum(), 1) + assert.Equal(t, (*node.replica).getCollectionNum(), 1) for _, tag := range collectionMeta.PartitionTags { - err := (*node.container).addPartition(collectionID, tag) + err := (*node.replica).addPartition(collectionID, tag) assert.NoError(t, err) - partition, err := (*node.container).getPartitionByTag(collectionID, tag) + partition, err := (*node.replica).getPartitionByTag(collectionID, tag) assert.NoError(t, err) assert.Equal(t, partition.partitionTag, partitionTag) - err = (*node.container).removePartition(collectionID, partitionTag) + err = (*node.replica).removePartition(collectionID, partitionTag) assert.NoError(t, err) } } func TestColSegContainer_getPartitionByTag(t *testing.T) { ctx := context.Background() - pulsarURL := "pulsar://localhost:6650" - node := NewQueryNode(ctx, 0, pulsarURL) + node := NewQueryNode(ctx, 0) collectionName := "collection0" collectionID := UniqueID(0) @@ -440,20 +433,20 @@ func TestColSegContainer_getPartitionByTag(t *testing.T) { collectionMetaBlob := proto.MarshalTextString(&collectionMeta) assert.NotEqual(t, "", collectionMetaBlob) - var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob) + var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob) assert.NoError(t, err) - collection, err := (*node.container).getCollectionByName(collectionName) + collection, err := (*node.replica).getCollectionByName(collectionName) assert.NoError(t, err) assert.Equal(t, collection.meta.Schema.Name, collectionName) assert.Equal(t, collection.meta.ID, collectionID) - assert.Equal(t, (*node.container).getCollectionNum(), 1) + assert.Equal(t, (*node.replica).getCollectionNum(), 1) for _, tag := range collectionMeta.PartitionTags { - err := (*node.container).addPartition(collectionID, tag) + err := (*node.replica).addPartition(collectionID, tag) assert.NoError(t, err) - partition, err := (*node.container).getPartitionByTag(collectionID, tag) + partition, err := (*node.replica).getPartitionByTag(collectionID, tag) assert.NoError(t, err) assert.Equal(t, partition.partitionTag, "default") assert.NotNil(t, partition) @@ -463,8 +456,7 @@ func TestColSegContainer_getPartitionByTag(t *testing.T) { //----------------------------------------------------------------------------------------------------- segment func TestColSegContainer_addSegment(t *testing.T) { ctx := context.Background() - pulsarURL := "pulsar://localhost:6650" - node := NewQueryNode(ctx, 0, pulsarURL) + node := NewQueryNode(ctx, 0) collectionName := "collection0" collectionID := UniqueID(0) @@ -508,24 +500,24 @@ func TestColSegContainer_addSegment(t *testing.T) { collectionMetaBlob := proto.MarshalTextString(&collectionMeta) assert.NotEqual(t, "", collectionMetaBlob) - var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob) + var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob) assert.NoError(t, err) - collection, err := (*node.container).getCollectionByName(collectionName) + collection, err := (*node.replica).getCollectionByName(collectionName) assert.NoError(t, err) assert.Equal(t, collection.meta.Schema.Name, collectionName) assert.Equal(t, collection.meta.ID, UniqueID(0)) - assert.Equal(t, (*node.container).getCollectionNum(), 1) + assert.Equal(t, (*node.replica).getCollectionNum(), 1) - err = (*node.container).addPartition(collectionID, collectionMeta.PartitionTags[0]) + err = (*node.replica).addPartition(collectionID, collectionMeta.PartitionTags[0]) assert.NoError(t, err) const segmentNum = 3 for i := 0; i < segmentNum; i++ { - err := (*node.container).addSegment(UniqueID(i), collectionMeta.PartitionTags[0], collectionID) + err := (*node.replica).addSegment(UniqueID(i), collectionMeta.PartitionTags[0], collectionID) assert.NoError(t, err) - targetSeg, err := (*node.container).getSegmentByID(UniqueID(i)) + targetSeg, err := (*node.replica).getSegmentByID(UniqueID(i)) assert.NoError(t, err) assert.Equal(t, targetSeg.segmentID, UniqueID(i)) } @@ -533,8 +525,7 @@ func TestColSegContainer_addSegment(t *testing.T) { func TestColSegContainer_removeSegment(t *testing.T) { ctx := context.Background() - pulsarURL := "pulsar://localhost:6650" - node := NewQueryNode(ctx, 0, pulsarURL) + node := NewQueryNode(ctx, 0) collectionName := "collection0" collectionID := UniqueID(0) @@ -578,35 +569,34 @@ func TestColSegContainer_removeSegment(t *testing.T) { collectionMetaBlob := proto.MarshalTextString(&collectionMeta) assert.NotEqual(t, "", collectionMetaBlob) - var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob) + var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob) assert.NoError(t, err) - collection, err := (*node.container).getCollectionByName(collectionName) + collection, err := (*node.replica).getCollectionByName(collectionName) assert.NoError(t, err) assert.Equal(t, collection.meta.Schema.Name, collectionName) assert.Equal(t, collection.meta.ID, UniqueID(0)) - assert.Equal(t, (*node.container).getCollectionNum(), 1) + assert.Equal(t, (*node.replica).getCollectionNum(), 1) - err = (*node.container).addPartition(collectionID, collectionMeta.PartitionTags[0]) + err = (*node.replica).addPartition(collectionID, collectionMeta.PartitionTags[0]) assert.NoError(t, err) const segmentNum = 3 for i := 0; i < segmentNum; i++ { - err := (*node.container).addSegment(UniqueID(i), collectionMeta.PartitionTags[0], collectionID) + err := (*node.replica).addSegment(UniqueID(i), collectionMeta.PartitionTags[0], collectionID) assert.NoError(t, err) - targetSeg, err := (*node.container).getSegmentByID(UniqueID(i)) + targetSeg, err := (*node.replica).getSegmentByID(UniqueID(i)) assert.NoError(t, err) assert.Equal(t, targetSeg.segmentID, UniqueID(i)) - err = (*node.container).removeSegment(UniqueID(i)) + err = (*node.replica).removeSegment(UniqueID(i)) assert.NoError(t, err) } } func TestColSegContainer_getSegmentByID(t *testing.T) { ctx := context.Background() - pulsarURL := "pulsar://localhost:6650" - node := NewQueryNode(ctx, 0, pulsarURL) + node := NewQueryNode(ctx, 0) collectionName := "collection0" collectionID := UniqueID(0) @@ -650,24 +640,24 @@ func TestColSegContainer_getSegmentByID(t *testing.T) { collectionMetaBlob := proto.MarshalTextString(&collectionMeta) assert.NotEqual(t, "", collectionMetaBlob) - var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob) + var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob) assert.NoError(t, err) - collection, err := (*node.container).getCollectionByName(collectionName) + collection, err := (*node.replica).getCollectionByName(collectionName) assert.NoError(t, err) assert.Equal(t, collection.meta.Schema.Name, collectionName) assert.Equal(t, collection.meta.ID, UniqueID(0)) - assert.Equal(t, (*node.container).getCollectionNum(), 1) + assert.Equal(t, (*node.replica).getCollectionNum(), 1) - err = (*node.container).addPartition(collectionID, collectionMeta.PartitionTags[0]) + err = (*node.replica).addPartition(collectionID, collectionMeta.PartitionTags[0]) assert.NoError(t, err) const segmentNum = 3 for i := 0; i < segmentNum; i++ { - err := (*node.container).addSegment(UniqueID(i), collectionMeta.PartitionTags[0], collectionID) + err := (*node.replica).addSegment(UniqueID(i), collectionMeta.PartitionTags[0], collectionID) assert.NoError(t, err) - targetSeg, err := (*node.container).getSegmentByID(UniqueID(i)) + targetSeg, err := (*node.replica).getSegmentByID(UniqueID(i)) assert.NoError(t, err) assert.Equal(t, targetSeg.segmentID, UniqueID(i)) } @@ -675,8 +665,7 @@ func TestColSegContainer_getSegmentByID(t *testing.T) { func TestColSegContainer_hasSegment(t *testing.T) { ctx := context.Background() - pulsarURL := "pulsar://localhost:6650" - node := NewQueryNode(ctx, 0, pulsarURL) + node := NewQueryNode(ctx, 0) collectionName := "collection0" collectionID := UniqueID(0) @@ -720,29 +709,29 @@ func TestColSegContainer_hasSegment(t *testing.T) { collectionMetaBlob := proto.MarshalTextString(&collectionMeta) assert.NotEqual(t, "", collectionMetaBlob) - var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob) + var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob) assert.NoError(t, err) - collection, err := (*node.container).getCollectionByName(collectionName) + collection, err := (*node.replica).getCollectionByName(collectionName) assert.NoError(t, err) assert.Equal(t, collection.meta.Schema.Name, collectionName) assert.Equal(t, collection.meta.ID, UniqueID(0)) - assert.Equal(t, (*node.container).getCollectionNum(), 1) + assert.Equal(t, (*node.replica).getCollectionNum(), 1) - err = (*node.container).addPartition(collectionID, collectionMeta.PartitionTags[0]) + err = (*node.replica).addPartition(collectionID, collectionMeta.PartitionTags[0]) assert.NoError(t, err) const segmentNum = 3 for i := 0; i < segmentNum; i++ { - err := (*node.container).addSegment(UniqueID(i), collectionMeta.PartitionTags[0], collectionID) + err := (*node.replica).addSegment(UniqueID(i), collectionMeta.PartitionTags[0], collectionID) assert.NoError(t, err) - targetSeg, err := (*node.container).getSegmentByID(UniqueID(i)) + targetSeg, err := (*node.replica).getSegmentByID(UniqueID(i)) assert.NoError(t, err) assert.Equal(t, targetSeg.segmentID, UniqueID(i)) - hasSeg := (*node.container).hasSegment(UniqueID(i)) + hasSeg := (*node.replica).hasSegment(UniqueID(i)) assert.Equal(t, hasSeg, true) - hasSeg = (*node.container).hasSegment(UniqueID(i + 100)) + hasSeg = (*node.replica).hasSegment(UniqueID(i + 100)) assert.Equal(t, hasSeg, false) } } diff --git a/internal/reader/collection_test.go b/internal/reader/collection_test.go index 01cae1211b..58d8747a2e 100644 --- a/internal/reader/collection_test.go +++ b/internal/reader/collection_test.go @@ -13,8 +13,7 @@ import ( func TestCollection_Partitions(t *testing.T) { ctx := context.Background() - pulsarURL := "pulsar://localhost:6650" - node := NewQueryNode(ctx, 0, pulsarURL) + node := NewQueryNode(ctx, 0) collectionName := "collection0" fieldVec := schemapb.FieldSchema{ @@ -57,18 +56,18 @@ func TestCollection_Partitions(t *testing.T) { collectionMetaBlob := proto.MarshalTextString(&collectionMeta) assert.NotEqual(t, "", collectionMetaBlob) - var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob) + var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob) assert.NoError(t, err) - collection, err := (*node.container).getCollectionByName(collectionName) + collection, err := (*node.replica).getCollectionByName(collectionName) assert.NoError(t, err) assert.Equal(t, collection.meta.Schema.Name, "collection0") assert.Equal(t, collection.meta.ID, UniqueID(0)) - assert.Equal(t, (*node.container).getCollectionNum(), 1) + assert.Equal(t, (*node.replica).getCollectionNum(), 1) for _, tag := range collectionMeta.PartitionTags { - err := (*node.container).addPartition(collection.ID(), tag) + err := (*node.replica).addPartition(collection.ID(), tag) assert.NoError(t, err) } diff --git a/internal/reader/data_sync_service.go b/internal/reader/data_sync_service.go index 1f2dba1f4e..ffc24e382c 100644 --- a/internal/reader/data_sync_service.go +++ b/internal/reader/data_sync_service.go @@ -4,33 +4,23 @@ import ( "context" "log" - "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/util/flowgraph" ) type dataSyncService struct { - ctx context.Context - pulsarURL string - fg *flowgraph.TimeTickedFlowGraph + ctx context.Context + fg *flowgraph.TimeTickedFlowGraph - // input streams - dmStream *msgstream.MsgStream - // ddStream *msgstream.MsgStream - // k2sStream *msgstream.MsgStream - - node *QueryNode + replica *collectionReplica } -func newDataSyncService(ctx context.Context, node *QueryNode, pulsarURL string) *dataSyncService { +func newDataSyncService(ctx context.Context, replica *collectionReplica) *dataSyncService { return &dataSyncService{ - ctx: ctx, - pulsarURL: pulsarURL, - fg: nil, + ctx: ctx, + fg: nil, - dmStream: nil, - - node: node, + replica: replica, } } @@ -41,7 +31,6 @@ func (dsService *dataSyncService) start() { func (dsService *dataSyncService) close() { dsService.fg.Close() - (*dsService.dmStream).Close() } func (dsService *dataSyncService) initNodes() { @@ -49,10 +38,10 @@ func (dsService *dataSyncService) initNodes() { dsService.fg = flowgraph.NewTimeTickedFlowGraph(dsService.ctx) - var dmStreamNode Node = newDmInputNode(dsService.ctx, dsService.pulsarURL) + var dmStreamNode Node = newDmInputNode(dsService.ctx) var filterDmNode Node = newFilteredDmNode() - var insertNode Node = newInsertNode(dsService.node.container) - var serviceTimeNode Node = newServiceTimeNode(dsService.node) + var insertNode Node = newInsertNode(dsService.replica) + var serviceTimeNode Node = newServiceTimeNode(dsService.replica) dsService.fg.AddNode(&dmStreamNode) dsService.fg.AddNode(&filterDmNode) @@ -90,21 +79,4 @@ func (dsService *dataSyncService) initNodes() { if err != nil { log.Fatal("set edges failed in node:", serviceTimeNode.Name()) } - - dsService.setDmStream(&dmStreamNode) } - -func (dsService *dataSyncService) setDmStream(node *Node) { - if (*node).IsInputNode() { - inStream, ok := (*node).(*InputNode) - dsService.dmStream = inStream.InStream() - if !ok { - log.Fatal("Invalid inputNode") - } - } else { - log.Fatal("stream set failed") - } -} - -func (dsService *dataSyncService) setDdStream(node *Node) {} -func (dsService *dataSyncService) setK2sStream(node *Node) {} diff --git a/internal/reader/data_sync_service_test.go b/internal/reader/data_sync_service_test.go index 4284b74727..a3dbaa9b6e 100644 --- a/internal/reader/data_sync_service_test.go +++ b/internal/reader/data_sync_service_test.go @@ -19,6 +19,7 @@ import ( // NOTE: start pulsar before test func TestManipulationService_Start(t *testing.T) { + Params.Init() var ctx context.Context if closeWithDeadline { @@ -32,7 +33,7 @@ func TestManipulationService_Start(t *testing.T) { // init query node pulsarURL := "pulsar://localhost:6650" - node := NewQueryNode(ctx, 0, pulsarURL) + node := NewQueryNode(ctx, 0) // init meta collectionName := "collection0" @@ -76,20 +77,20 @@ func TestManipulationService_Start(t *testing.T) { collectionMetaBlob := proto.MarshalTextString(&collectionMeta) assert.NotEqual(t, "", collectionMetaBlob) - var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob) + var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob) assert.NoError(t, err) - collection, err := (*node.container).getCollectionByName(collectionName) + collection, err := (*node.replica).getCollectionByName(collectionName) assert.NoError(t, err) assert.Equal(t, collection.meta.Schema.Name, "collection0") assert.Equal(t, collection.meta.ID, UniqueID(0)) - assert.Equal(t, (*node.container).getCollectionNum(), 1) + assert.Equal(t, (*node.replica).getCollectionNum(), 1) - err = (*node.container).addPartition(collection.ID(), collectionMeta.PartitionTags[0]) + err = (*node.replica).addPartition(collection.ID(), collectionMeta.PartitionTags[0]) assert.NoError(t, err) segmentID := UniqueID(0) - err = (*node.container).addSegment(segmentID, collectionMeta.PartitionTags[0], UniqueID(0)) + err = (*node.replica).addSegment(segmentID, collectionMeta.PartitionTags[0], UniqueID(0)) assert.NoError(t, err) // test data generate @@ -168,7 +169,7 @@ func TestManipulationService_Start(t *testing.T) { assert.NoError(t, err) // dataSync - node.dataSyncService = newDataSyncService(node.ctx, node, node.pulsarURL) + node.dataSyncService = newDataSyncService(node.ctx, node.replica) go node.dataSyncService.start() node.Close() diff --git a/internal/reader/flow_graph_insert_node.go b/internal/reader/flow_graph_insert_node.go index 77500545e8..cb8c9b29ab 100644 --- a/internal/reader/flow_graph_insert_node.go +++ b/internal/reader/flow_graph_insert_node.go @@ -10,7 +10,7 @@ import ( type insertNode struct { BaseNode - container *container + replica *collectionReplica } type InsertData struct { @@ -58,13 +58,13 @@ func (iNode *insertNode) Operate(in []*Msg) []*Msg { insertData.insertRecords[task.SegmentID] = append(insertData.insertRecords[task.SegmentID], task.RowData...) // check if segment exists, if not, create this segment - if !(*iNode.container).hasSegment(task.SegmentID) { - collection, err := (*iNode.container).getCollectionByName(task.CollectionName) + if !(*iNode.replica).hasSegment(task.SegmentID) { + collection, err := (*iNode.replica).getCollectionByName(task.CollectionName) if err != nil { log.Println(err) continue } - err = (*iNode.container).addSegment(task.SegmentID, task.PartitionTag, collection.ID()) + err = (*iNode.replica).addSegment(task.SegmentID, task.PartitionTag, collection.ID()) if err != nil { log.Println(err) continue @@ -74,7 +74,7 @@ func (iNode *insertNode) Operate(in []*Msg) []*Msg { // 2. do preInsert for segmentID := range insertData.insertRecords { - var targetSegment, err = (*iNode.container).getSegmentByID(segmentID) + var targetSegment, err = (*iNode.replica).getSegmentByID(segmentID) if err != nil { log.Println("preInsert failed") // TODO: add error handling @@ -102,7 +102,7 @@ func (iNode *insertNode) Operate(in []*Msg) []*Msg { } func (iNode *insertNode) insert(insertData *InsertData, segmentID int64, wg *sync.WaitGroup) { - var targetSegment, err = (*iNode.container).getSegmentByID(segmentID) + var targetSegment, err = (*iNode.replica).getSegmentByID(segmentID) if err != nil { log.Println("cannot find segment:", segmentID) // TODO: add error handling @@ -125,13 +125,13 @@ func (iNode *insertNode) insert(insertData *InsertData, segmentID int64, wg *syn wg.Done() } -func newInsertNode(container *container) *insertNode { +func newInsertNode(replica *collectionReplica) *insertNode { baseNode := BaseNode{} baseNode.SetMaxQueueLength(maxQueueLength) baseNode.SetMaxParallelism(maxParallelism) return &insertNode{ - BaseNode: baseNode, - container: container, + BaseNode: baseNode, + replica: replica, } } diff --git a/internal/reader/flow_graph_msg_stream_input_nodes.go b/internal/reader/flow_graph_msg_stream_input_nodes.go index d00574c078..5eec6306c0 100644 --- a/internal/reader/flow_graph_msg_stream_input_nodes.go +++ b/internal/reader/flow_graph_msg_stream_input_nodes.go @@ -2,22 +2,28 @@ package reader import ( "context" + "log" "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/util/flowgraph" ) -func newDmInputNode(ctx context.Context, pulsarURL string) *flowgraph.InputNode { +func newDmInputNode(ctx context.Context) *flowgraph.InputNode { const ( receiveBufSize = 1024 pulsarBufSize = 1024 ) + msgStreamURL, err := Params.PulsarAddress() + if err != nil { + log.Fatal(err) + } + consumeChannels := []string{"insert"} consumeSubName := "insertSub" insertStream := msgstream.NewPulsarMsgStream(ctx, receiveBufSize) - insertStream.SetPulsarCient(pulsarURL) + insertStream.SetPulsarCient(msgStreamURL) unmarshalDispatcher := msgstream.NewUnmarshalDispatcher() insertStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize) diff --git a/internal/reader/flow_graph_service_time_node.go b/internal/reader/flow_graph_service_time_node.go index 50ca674ff8..25c6d74028 100644 --- a/internal/reader/flow_graph_service_time_node.go +++ b/internal/reader/flow_graph_service_time_node.go @@ -6,7 +6,7 @@ import ( type serviceTimeNode struct { BaseNode - node *QueryNode + replica *collectionReplica } func (stNode *serviceTimeNode) Name() string { @@ -28,17 +28,17 @@ func (stNode *serviceTimeNode) Operate(in []*Msg) []*Msg { } // update service time - stNode.node.tSafe.setTSafe(serviceTimeMsg.timeRange.timestampMax) + (*stNode.replica).setTSafe(serviceTimeMsg.timeRange.timestampMax) return nil } -func newServiceTimeNode(node *QueryNode) *serviceTimeNode { +func newServiceTimeNode(replica *collectionReplica) *serviceTimeNode { baseNode := BaseNode{} baseNode.SetMaxQueueLength(maxQueueLength) baseNode.SetMaxParallelism(maxParallelism) return &serviceTimeNode{ BaseNode: baseNode, - node: node, + replica: replica, } } diff --git a/internal/reader/meta_service.go b/internal/reader/meta_service.go index 8af38ca830..2b98161b2a 100644 --- a/internal/reader/meta_service.go +++ b/internal/reader/meta_service.go @@ -24,12 +24,12 @@ const ( ) type metaService struct { - ctx context.Context - kvBase *kv.EtcdKV - container *container + ctx context.Context + kvBase *kv.EtcdKV + replica *collectionReplica } -func newMetaService(ctx context.Context, container *container) *metaService { +func newMetaService(ctx context.Context, replica *collectionReplica) *metaService { ETCDAddr, err := Params.EtcdAddress() if err != nil { panic(err) @@ -46,9 +46,9 @@ func newMetaService(ctx context.Context, container *container) *metaService { }) return &metaService{ - ctx: ctx, - kvBase: kv.NewEtcdKV(cli, ETCDRootPath), - container: container, + ctx: ctx, + kvBase: kv.NewEtcdKV(cli, ETCDRootPath), + replica: replica, } } @@ -164,12 +164,12 @@ func (mService *metaService) processCollectionCreate(id string, value string) { col := mService.collectionUnmarshal(value) if col != nil { - err := (*mService.container).addCollection(col, value) + err := (*mService.replica).addCollection(col, value) if err != nil { log.Println(err) } for _, partitionTag := range col.PartitionTags { - err = (*mService.container).addPartition(col.ID, partitionTag) + err = (*mService.replica).addPartition(col.ID, partitionTag) if err != nil { log.Println(err) } @@ -187,7 +187,7 @@ func (mService *metaService) processSegmentCreate(id string, value string) { // TODO: what if seg == nil? We need to notify master and return rpc request failed if seg != nil { - err := (*mService.container).addSegment(seg.SegmentID, seg.PartitionTag, seg.CollectionID) + err := (*mService.replica).addSegment(seg.SegmentID, seg.PartitionTag, seg.CollectionID) if err != nil { log.Println(err) return @@ -216,7 +216,7 @@ func (mService *metaService) processSegmentModify(id string, value string) { } if seg != nil { - targetSegment, err := (*mService.container).getSegmentByID(seg.SegmentID) + targetSegment, err := (*mService.replica).getSegmentByID(seg.SegmentID) if err != nil { log.Println(err) return @@ -251,7 +251,7 @@ func (mService *metaService) processSegmentDelete(id string) { log.Println("Cannot parse segment id:" + id) } - err = (*mService.container).removeSegment(segmentID) + err = (*mService.replica).removeSegment(segmentID) if err != nil { log.Println(err) return @@ -266,7 +266,7 @@ func (mService *metaService) processCollectionDelete(id string) { log.Println("Cannot parse collection id:" + id) } - err = (*mService.container).removeCollection(collectionID) + err = (*mService.replica).removeCollection(collectionID) if err != nil { log.Println(err) return diff --git a/internal/reader/meta_service_test.go b/internal/reader/meta_service_test.go index 4c366b3535..b2c908aca3 100644 --- a/internal/reader/meta_service_test.go +++ b/internal/reader/meta_service_test.go @@ -27,9 +27,8 @@ func TestMetaService_start(t *testing.T) { } // init query node - pulsarURL := "pulsar://localhost:6650" - node := NewQueryNode(ctx, 0, pulsarURL) - node.metaService = newMetaService(ctx, node.container) + node := NewQueryNode(ctx, 0) + node.metaService = newMetaService(ctx, node.replica) (*node.metaService).start() } @@ -187,9 +186,8 @@ func TestMetaService_processCollectionCreate(t *testing.T) { defer cancel() // init metaService - pulsarURL := "pulsar://localhost:6650" - node := NewQueryNode(ctx, 0, pulsarURL) - node.metaService = newMetaService(ctx, node.container) + node := NewQueryNode(ctx, 0) + node.metaService = newMetaService(ctx, node.replica) id := "0" value := `schema: < @@ -217,10 +215,10 @@ func TestMetaService_processCollectionCreate(t *testing.T) { node.metaService.processCollectionCreate(id, value) - collectionNum := (*node.container).getCollectionNum() + collectionNum := (*node.replica).getCollectionNum() assert.Equal(t, collectionNum, 1) - collection, err := (*node.container).getCollectionByName("test") + collection, err := (*node.replica).getCollectionByName("test") assert.NoError(t, err) assert.Equal(t, collection.ID(), UniqueID(0)) } @@ -233,9 +231,8 @@ func TestMetaService_processSegmentCreate(t *testing.T) { defer cancel() // init metaService - pulsarURL := "pulsar://localhost:6650" - node := NewQueryNode(ctx, 0, pulsarURL) - node.metaService = newMetaService(ctx, node.container) + node := NewQueryNode(ctx, 0) + node.metaService = newMetaService(ctx, node.replica) collectionName := "collection0" fieldVec := schemapb.FieldSchema{ @@ -278,10 +275,10 @@ func TestMetaService_processSegmentCreate(t *testing.T) { colMetaBlob, err := proto.Marshal(&collectionMeta) assert.NoError(t, err) - err = (*node.container).addCollection(&collectionMeta, string(colMetaBlob)) + err = (*node.replica).addCollection(&collectionMeta, string(colMetaBlob)) assert.NoError(t, err) - err = (*node.container).addPartition(UniqueID(0), "default") + err = (*node.replica).addPartition(UniqueID(0), "default") assert.NoError(t, err) id := "0" @@ -293,7 +290,7 @@ func TestMetaService_processSegmentCreate(t *testing.T) { (*node.metaService).processSegmentCreate(id, value) - s, err := (*node.container).getSegmentByID(UniqueID(0)) + s, err := (*node.replica).getSegmentByID(UniqueID(0)) assert.NoError(t, err) assert.Equal(t, s.segmentID, UniqueID(0)) } @@ -306,9 +303,8 @@ func TestMetaService_processCreate(t *testing.T) { defer cancel() // init metaService - pulsarURL := "pulsar://localhost:6650" - node := NewQueryNode(ctx, 0, pulsarURL) - node.metaService = newMetaService(ctx, node.container) + node := NewQueryNode(ctx, 0) + node.metaService = newMetaService(ctx, node.replica) key1 := "by-dev/collection/0" msg1 := `schema: < @@ -335,10 +331,10 @@ func TestMetaService_processCreate(t *testing.T) { ` (*node.metaService).processCreate(key1, msg1) - collectionNum := (*node.container).getCollectionNum() + collectionNum := (*node.replica).getCollectionNum() assert.Equal(t, collectionNum, 1) - collection, err := (*node.container).getCollectionByName("test") + collection, err := (*node.replica).getCollectionByName("test") assert.NoError(t, err) assert.Equal(t, collection.ID(), UniqueID(0)) @@ -350,7 +346,7 @@ func TestMetaService_processCreate(t *testing.T) { ` (*node.metaService).processCreate(key2, msg2) - s, err := (*node.container).getSegmentByID(UniqueID(0)) + s, err := (*node.replica).getSegmentByID(UniqueID(0)) assert.NoError(t, err) assert.Equal(t, s.segmentID, UniqueID(0)) } @@ -363,9 +359,8 @@ func TestMetaService_processSegmentModify(t *testing.T) { defer cancel() // init metaService - pulsarURL := "pulsar://localhost:6650" - node := NewQueryNode(ctx, 0, pulsarURL) - node.metaService = newMetaService(ctx, node.container) + node := NewQueryNode(ctx, 0) + node.metaService = newMetaService(ctx, node.replica) collectionName := "collection0" fieldVec := schemapb.FieldSchema{ @@ -408,10 +403,10 @@ func TestMetaService_processSegmentModify(t *testing.T) { colMetaBlob, err := proto.Marshal(&collectionMeta) assert.NoError(t, err) - err = (*node.container).addCollection(&collectionMeta, string(colMetaBlob)) + err = (*node.replica).addCollection(&collectionMeta, string(colMetaBlob)) assert.NoError(t, err) - err = (*node.container).addPartition(UniqueID(0), "default") + err = (*node.replica).addPartition(UniqueID(0), "default") assert.NoError(t, err) id := "0" @@ -422,7 +417,7 @@ func TestMetaService_processSegmentModify(t *testing.T) { ` (*node.metaService).processSegmentCreate(id, value) - s, err := (*node.container).getSegmentByID(UniqueID(0)) + s, err := (*node.replica).getSegmentByID(UniqueID(0)) assert.NoError(t, err) assert.Equal(t, s.segmentID, UniqueID(0)) @@ -434,7 +429,7 @@ func TestMetaService_processSegmentModify(t *testing.T) { // TODO: modify segment for testing processCollectionModify (*node.metaService).processSegmentModify(id, newValue) - seg, err := (*node.container).getSegmentByID(UniqueID(0)) + seg, err := (*node.replica).getSegmentByID(UniqueID(0)) assert.NoError(t, err) assert.Equal(t, seg.segmentID, UniqueID(0)) } @@ -447,9 +442,8 @@ func TestMetaService_processCollectionModify(t *testing.T) { defer cancel() // init metaService - pulsarURL := "pulsar://localhost:6650" - node := NewQueryNode(ctx, 0, pulsarURL) - node.metaService = newMetaService(ctx, node.container) + node := NewQueryNode(ctx, 0) + node.metaService = newMetaService(ctx, node.replica) id := "0" value := `schema: < @@ -476,10 +470,10 @@ func TestMetaService_processCollectionModify(t *testing.T) { ` (*node.metaService).processCollectionCreate(id, value) - collectionNum := (*node.container).getCollectionNum() + collectionNum := (*node.replica).getCollectionNum() assert.Equal(t, collectionNum, 1) - collection, err := (*node.container).getCollectionByName("test") + collection, err := (*node.replica).getCollectionByName("test") assert.NoError(t, err) assert.Equal(t, collection.ID(), UniqueID(0)) @@ -508,7 +502,7 @@ func TestMetaService_processCollectionModify(t *testing.T) { ` (*node.metaService).processCollectionModify(id, newValue) - collection, err = (*node.container).getCollectionByName("test") + collection, err = (*node.replica).getCollectionByName("test") assert.NoError(t, err) assert.Equal(t, collection.ID(), UniqueID(0)) } @@ -521,9 +515,8 @@ func TestMetaService_processModify(t *testing.T) { defer cancel() // init metaService - pulsarURL := "pulsar://localhost:6650" - node := NewQueryNode(ctx, 0, pulsarURL) - node.metaService = newMetaService(ctx, node.container) + node := NewQueryNode(ctx, 0) + node.metaService = newMetaService(ctx, node.replica) key1 := "by-dev/collection/0" msg1 := `schema: < @@ -550,10 +543,10 @@ func TestMetaService_processModify(t *testing.T) { ` (*node.metaService).processCreate(key1, msg1) - collectionNum := (*node.container).getCollectionNum() + collectionNum := (*node.replica).getCollectionNum() assert.Equal(t, collectionNum, 1) - collection, err := (*node.container).getCollectionByName("test") + collection, err := (*node.replica).getCollectionByName("test") assert.NoError(t, err) assert.Equal(t, collection.ID(), UniqueID(0)) @@ -565,7 +558,7 @@ func TestMetaService_processModify(t *testing.T) { ` (*node.metaService).processCreate(key2, msg2) - s, err := (*node.container).getSegmentByID(UniqueID(0)) + s, err := (*node.replica).getSegmentByID(UniqueID(0)) assert.NoError(t, err) assert.Equal(t, s.segmentID, UniqueID(0)) @@ -595,7 +588,7 @@ func TestMetaService_processModify(t *testing.T) { ` (*node.metaService).processModify(key1, msg3) - collection, err = (*node.container).getCollectionByName("test") + collection, err = (*node.replica).getCollectionByName("test") assert.NoError(t, err) assert.Equal(t, collection.ID(), UniqueID(0)) @@ -607,7 +600,7 @@ func TestMetaService_processModify(t *testing.T) { // TODO: modify segment for testing processCollectionModify (*node.metaService).processModify(key2, msg4) - seg, err := (*node.container).getSegmentByID(UniqueID(0)) + seg, err := (*node.replica).getSegmentByID(UniqueID(0)) assert.NoError(t, err) assert.Equal(t, seg.segmentID, UniqueID(0)) } @@ -620,9 +613,8 @@ func TestMetaService_processSegmentDelete(t *testing.T) { defer cancel() // init metaService - pulsarURL := "pulsar://localhost:6650" - node := NewQueryNode(ctx, 0, pulsarURL) - node.metaService = newMetaService(ctx, node.container) + node := NewQueryNode(ctx, 0) + node.metaService = newMetaService(ctx, node.replica) collectionName := "collection0" fieldVec := schemapb.FieldSchema{ @@ -665,10 +657,10 @@ func TestMetaService_processSegmentDelete(t *testing.T) { colMetaBlob, err := proto.Marshal(&collectionMeta) assert.NoError(t, err) - err = (*node.container).addCollection(&collectionMeta, string(colMetaBlob)) + err = (*node.replica).addCollection(&collectionMeta, string(colMetaBlob)) assert.NoError(t, err) - err = (*node.container).addPartition(UniqueID(0), "default") + err = (*node.replica).addPartition(UniqueID(0), "default") assert.NoError(t, err) id := "0" @@ -679,12 +671,12 @@ func TestMetaService_processSegmentDelete(t *testing.T) { ` (*node.metaService).processSegmentCreate(id, value) - seg, err := (*node.container).getSegmentByID(UniqueID(0)) + seg, err := (*node.replica).getSegmentByID(UniqueID(0)) assert.NoError(t, err) assert.Equal(t, seg.segmentID, UniqueID(0)) (*node.metaService).processSegmentDelete("0") - mapSize := (*node.container).getSegmentNum() + mapSize := (*node.replica).getSegmentNum() assert.Equal(t, mapSize, 0) } @@ -696,9 +688,8 @@ func TestMetaService_processCollectionDelete(t *testing.T) { defer cancel() // init metaService - pulsarURL := "pulsar://localhost:6650" - node := NewQueryNode(ctx, 0, pulsarURL) - node.metaService = newMetaService(ctx, node.container) + node := NewQueryNode(ctx, 0) + node.metaService = newMetaService(ctx, node.replica) id := "0" value := `schema: < @@ -725,15 +716,15 @@ func TestMetaService_processCollectionDelete(t *testing.T) { ` (*node.metaService).processCollectionCreate(id, value) - collectionNum := (*node.container).getCollectionNum() + collectionNum := (*node.replica).getCollectionNum() assert.Equal(t, collectionNum, 1) - collection, err := (*node.container).getCollectionByName("test") + collection, err := (*node.replica).getCollectionByName("test") assert.NoError(t, err) assert.Equal(t, collection.ID(), UniqueID(0)) (*node.metaService).processCollectionDelete(id) - collectionNum = (*node.container).getCollectionNum() + collectionNum = (*node.replica).getCollectionNum() assert.Equal(t, collectionNum, 0) } @@ -745,9 +736,8 @@ func TestMetaService_processDelete(t *testing.T) { defer cancel() // init metaService - pulsarURL := "pulsar://localhost:6650" - node := NewQueryNode(ctx, 0, pulsarURL) - node.metaService = newMetaService(ctx, node.container) + node := NewQueryNode(ctx, 0) + node.metaService = newMetaService(ctx, node.replica) key1 := "by-dev/collection/0" msg1 := `schema: < @@ -774,10 +764,10 @@ func TestMetaService_processDelete(t *testing.T) { ` (*node.metaService).processCreate(key1, msg1) - collectionNum := (*node.container).getCollectionNum() + collectionNum := (*node.replica).getCollectionNum() assert.Equal(t, collectionNum, 1) - collection, err := (*node.container).getCollectionByName("test") + collection, err := (*node.replica).getCollectionByName("test") assert.NoError(t, err) assert.Equal(t, collection.ID(), UniqueID(0)) @@ -789,15 +779,15 @@ func TestMetaService_processDelete(t *testing.T) { ` (*node.metaService).processCreate(key2, msg2) - seg, err := (*node.container).getSegmentByID(UniqueID(0)) + seg, err := (*node.replica).getSegmentByID(UniqueID(0)) assert.NoError(t, err) assert.Equal(t, seg.segmentID, UniqueID(0)) (*node.metaService).processDelete(key1) - collectionsSize := (*node.container).getCollectionNum() + collectionsSize := (*node.replica).getCollectionNum() assert.Equal(t, collectionsSize, 0) - mapSize := (*node.container).getSegmentNum() + mapSize := (*node.replica).getSegmentNum() assert.Equal(t, mapSize, 0) } @@ -815,9 +805,8 @@ func TestMetaService_processResp(t *testing.T) { } // init metaService - pulsarURL := "pulsar://localhost:6650" - node := NewQueryNode(ctx, 0, pulsarURL) - node.metaService = newMetaService(ctx, node.container) + node := NewQueryNode(ctx, 0) + node.metaService = newMetaService(ctx, node.replica) metaChan := (*node.metaService).kvBase.WatchWithPrefix("") @@ -843,9 +832,8 @@ func TestMetaService_loadCollections(t *testing.T) { } // init metaService - pulsarURL := "pulsar://localhost:6650" - node := NewQueryNode(ctx, 0, pulsarURL) - node.metaService = newMetaService(ctx, node.container) + node := NewQueryNode(ctx, 0) + node.metaService = newMetaService(ctx, node.replica) err2 := (*node.metaService).loadCollections() assert.Nil(t, err2) @@ -865,9 +853,8 @@ func TestMetaService_loadSegments(t *testing.T) { } // init metaService - pulsarURL := "pulsar://localhost:6650" - node := NewQueryNode(ctx, 0, pulsarURL) - node.metaService = newMetaService(ctx, node.container) + node := NewQueryNode(ctx, 0) + node.metaService = newMetaService(ctx, node.replica) err2 := (*node.metaService).loadSegments() assert.Nil(t, err2) diff --git a/internal/reader/paramtable.go b/internal/reader/param_table.go similarity index 64% rename from internal/reader/paramtable.go rename to internal/reader/param_table.go index c60b5b2f87..ba13e99391 100644 --- a/internal/reader/paramtable.go +++ b/internal/reader/param_table.go @@ -16,6 +16,23 @@ func (p *ParamTable) InitParamTable() { p.Init() } +func (p *ParamTable) PulsarAddress() (string, error) { + url, err := p.Load("_PulsarAddress") + if err != nil { + panic(err) + } + return "pulsar://" + url, nil +} + +func (p *ParamTable) QueryNodeID() int { + queryNodeID, _ := p.Load("reader.clientid") + id, err := strconv.Atoi(queryNodeID) + if err != nil { + panic(err) + } + return id +} + func (p *ParamTable) TopicStart() int { topicStart, _ := p.Load("reader.topicstart") topicStartNum, err := strconv.Atoi(topicStart) diff --git a/internal/reader/param_table_test.go b/internal/reader/param_table_test.go new file mode 100644 index 0000000000..e8acc50487 --- /dev/null +++ b/internal/reader/param_table_test.go @@ -0,0 +1,25 @@ +package reader + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestParamTable_QueryNodeID(t *testing.T) { + Params.InitParamTable() + id := Params.QueryNodeID() + assert.Equal(t, id, 0) +} + +func TestParamTable_TopicStart(t *testing.T) { + Params.InitParamTable() + topicStart := Params.TopicStart() + assert.Equal(t, topicStart, 0) +} + +func TestParamTable_TopicEnd(t *testing.T) { + Params.InitParamTable() + topicEnd := Params.TopicEnd() + assert.Equal(t, topicEnd, 128) +} diff --git a/internal/reader/partition_test.go b/internal/reader/partition_test.go index 5311a21448..9c19eb0852 100644 --- a/internal/reader/partition_test.go +++ b/internal/reader/partition_test.go @@ -13,8 +13,7 @@ import ( func TestPartition_Segments(t *testing.T) { ctx := context.Background() - pulsarURL := "pulsar://localhost:6650" - node := NewQueryNode(ctx, 0, pulsarURL) + node := NewQueryNode(ctx, 0) collectionName := "collection0" fieldVec := schemapb.FieldSchema{ @@ -57,17 +56,17 @@ func TestPartition_Segments(t *testing.T) { collectionMetaBlob := proto.MarshalTextString(&collectionMeta) assert.NotEqual(t, "", collectionMetaBlob) - var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob) + var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob) assert.NoError(t, err) - collection, err := (*node.container).getCollectionByName(collectionName) + collection, err := (*node.replica).getCollectionByName(collectionName) assert.NoError(t, err) assert.Equal(t, collection.meta.Schema.Name, "collection0") assert.Equal(t, collection.meta.ID, UniqueID(0)) - assert.Equal(t, (*node.container).getCollectionNum(), 1) + assert.Equal(t, (*node.replica).getCollectionNum(), 1) for _, tag := range collectionMeta.PartitionTags { - err := (*node.container).addPartition(collection.ID(), tag) + err := (*node.replica).addPartition(collection.ID(), tag) assert.NoError(t, err) } @@ -78,7 +77,7 @@ func TestPartition_Segments(t *testing.T) { const segmentNum = 3 for i := 0; i < segmentNum; i++ { - err := (*node.container).addSegment(UniqueID(i), targetPartition.partitionTag, collection.ID()) + err := (*node.replica).addSegment(UniqueID(i), targetPartition.partitionTag, collection.ID()) assert.NoError(t, err) } diff --git a/internal/reader/query_node.go b/internal/reader/query_node.go index de06ece2d0..464fe35908 100644 --- a/internal/reader/query_node.go +++ b/internal/reader/query_node.go @@ -14,18 +14,14 @@ import "C" import ( "context" - "sync" ) type QueryNode struct { ctx context.Context QueryNodeID uint64 - pulsarURL string - tSafe tSafe - - container *container + replica *collectionReplica dataSyncService *dataSyncService metaService *metaService @@ -33,36 +29,21 @@ type QueryNode struct { statsService *statsService } -type tSafe interface { - getTSafe() Timestamp - setTSafe(t Timestamp) -} - -type serviceTime struct { - tSafeMu sync.Mutex - time Timestamp -} - -func NewQueryNode(ctx context.Context, queryNodeID uint64, pulsarURL string) *QueryNode { +func NewQueryNode(ctx context.Context, queryNodeID uint64) *QueryNode { segmentsMap := make(map[int64]*Segment) collections := make([]*Collection, 0) - var container container = &colSegContainer{ + var replica collectionReplica = &collectionReplicaImpl{ collections: collections, segments: segmentsMap, } - var tSafe tSafe = &serviceTime{} - return &QueryNode{ ctx: ctx, QueryNodeID: queryNodeID, - pulsarURL: pulsarURL, - tSafe: tSafe, - - container: &container, + replica: &replica, dataSyncService: nil, metaService: nil, @@ -72,10 +53,10 @@ func NewQueryNode(ctx context.Context, queryNodeID uint64, pulsarURL string) *Qu } func (node *QueryNode) Start() { - node.dataSyncService = newDataSyncService(node.ctx, node, node.pulsarURL) - node.searchService = newSearchService(node.ctx, node, node.pulsarURL) - node.metaService = newMetaService(node.ctx, node.container) - node.statsService = newStatsService(node.ctx, node.container, node.pulsarURL) + node.dataSyncService = newDataSyncService(node.ctx, node.replica) + node.searchService = newSearchService(node.ctx, node.replica) + node.metaService = newMetaService(node.ctx, node.replica) + node.statsService = newStatsService(node.ctx, node.replica) go node.dataSyncService.start() // go node.searchService.start() @@ -86,15 +67,3 @@ func (node *QueryNode) Start() { func (node *QueryNode) Close() { // TODO: close services } - -func (st *serviceTime) getTSafe() Timestamp { - st.tSafeMu.Lock() - defer st.tSafeMu.Unlock() - return st.time -} - -func (st *serviceTime) setTSafe(t Timestamp) { - st.tSafeMu.Lock() - st.time = t - st.tSafeMu.Unlock() -} diff --git a/internal/reader/query_node_test.go b/internal/reader/query_node_test.go index b7de266460..dcae19f3fa 100644 --- a/internal/reader/query_node_test.go +++ b/internal/reader/query_node_test.go @@ -23,10 +23,6 @@ func TestQueryNode_start(t *testing.T) { ctx = context.Background() } - pulsarAddr, err := Params.PulsarAddress() - if err != nil { - panic(err) - } - node := NewQueryNode(ctx, 0, "pulsar://"+pulsarAddr) + node := NewQueryNode(ctx, 0) node.Start() } diff --git a/internal/reader/reader.go b/internal/reader/reader.go index d4d6b7a5b6..91852eac13 100644 --- a/internal/reader/reader.go +++ b/internal/reader/reader.go @@ -8,8 +8,8 @@ func Init() { Params.Init() } -func StartQueryNode(ctx context.Context, pulsarURL string) { - node := NewQueryNode(ctx, 0, pulsarURL) +func StartQueryNode(ctx context.Context) { + node := NewQueryNode(ctx, 0) node.Start() } diff --git a/internal/reader/search_service.go b/internal/reader/search_service.go index 332c1c8c81..0a08039bd4 100644 --- a/internal/reader/search_service.go +++ b/internal/reader/search_service.go @@ -5,6 +5,7 @@ import ( "context" "errors" "fmt" + "log" "sort" "github.com/golang/protobuf/proto" @@ -19,7 +20,7 @@ type searchService struct { ctx context.Context cancel context.CancelFunc - node *QueryNode + replica *collectionReplica searchMsgStream *msgstream.MsgStream searchResultMsgStream *msgstream.MsgStream } @@ -31,24 +32,29 @@ type SearchResult struct { ResultDistances []float32 } -func newSearchService(ctx context.Context, node *QueryNode, pulsarURL string) *searchService { +func newSearchService(ctx context.Context, replica *collectionReplica) *searchService { const ( //TODO:: read config file receiveBufSize = 1024 pulsarBufSize = 1024 ) + msgStreamURL, err := Params.PulsarAddress() + if err != nil { + log.Fatal(err) + } + consumeChannels := []string{"search"} consumeSubName := "subSearch" searchStream := msgstream.NewPulsarMsgStream(ctx, receiveBufSize) - searchStream.SetPulsarCient(pulsarURL) + searchStream.SetPulsarCient(msgStreamURL) unmarshalDispatcher := msgstream.NewUnmarshalDispatcher() searchStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize) var inputStream msgstream.MsgStream = searchStream producerChannels := []string{"searchResult"} searchResultStream := msgstream.NewPulsarMsgStream(ctx, receiveBufSize) - searchResultStream.SetPulsarCient(pulsarURL) + searchResultStream.SetPulsarCient(msgStreamURL) searchResultStream.CreatePulsarProducers(producerChannels) var outputStream msgstream.MsgStream = searchResultStream @@ -57,7 +63,7 @@ func newSearchService(ctx context.Context, node *QueryNode, pulsarURL string) *s ctx: searchServiceCtx, cancel: searchServiceCancel, - node: node, + replica: replica, searchMsgStream: &inputStream, searchResultMsgStream: &outputStream, } @@ -120,7 +126,7 @@ func (ss *searchService) search(searchMessages []msgstream.TsMsg) error { } collectionName := query.CollectionName partitionTags := query.PartitionTags - collection, err := (*ss.node.container).getCollectionByName(collectionName) + collection, err := (*ss.replica).getCollectionByName(collectionName) if err != nil { return err } @@ -150,7 +156,7 @@ func (ss *searchService) search(searchMessages []msgstream.TsMsg) error { // 3. Do search in all segments for _, partitionTag := range partitionTags { - partition, err := (*ss.node.container).getPartitionByTag(collectionID, partitionTag) + partition, err := (*ss.replica).getPartitionByTag(collectionID, partitionTag) if err != nil { return err } diff --git a/internal/reader/search_service_test.go b/internal/reader/search_service_test.go index 42ab772b78..b8e9fcffcb 100644 --- a/internal/reader/search_service_test.go +++ b/internal/reader/search_service_test.go @@ -21,12 +21,13 @@ import ( ) func TestSearch_Search(t *testing.T) { + Params.Init() ctx, cancel := context.WithCancel(context.Background()) defer cancel() // init query node pulsarURL := "pulsar://localhost:6650" - node := NewQueryNode(ctx, 0, pulsarURL) + node := NewQueryNode(ctx, 0) // init meta collectionName := "collection0" @@ -70,20 +71,20 @@ func TestSearch_Search(t *testing.T) { collectionMetaBlob := proto.MarshalTextString(&collectionMeta) assert.NotEqual(t, "", collectionMetaBlob) - var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob) + var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob) assert.NoError(t, err) - collection, err := (*node.container).getCollectionByName(collectionName) + collection, err := (*node.replica).getCollectionByName(collectionName) assert.NoError(t, err) assert.Equal(t, collection.meta.Schema.Name, "collection0") assert.Equal(t, collection.meta.ID, UniqueID(0)) - assert.Equal(t, (*node.container).getCollectionNum(), 1) + assert.Equal(t, (*node.replica).getCollectionNum(), 1) - err = (*node.container).addPartition(collection.ID(), collectionMeta.PartitionTags[0]) + err = (*node.replica).addPartition(collection.ID(), collectionMeta.PartitionTags[0]) assert.NoError(t, err) segmentID := UniqueID(0) - err = (*node.container).addSegment(segmentID, collectionMeta.PartitionTags[0], UniqueID(0)) + err = (*node.replica).addSegment(segmentID, collectionMeta.PartitionTags[0], UniqueID(0)) assert.NoError(t, err) // test data generate @@ -162,7 +163,7 @@ func TestSearch_Search(t *testing.T) { assert.NoError(t, err) // dataSync - node.dataSyncService = newDataSyncService(node.ctx, node, node.pulsarURL) + node.dataSyncService = newDataSyncService(node.ctx, node.replica) go node.dataSyncService.start() time.Sleep(2 * time.Second) @@ -233,7 +234,7 @@ func TestSearch_Search(t *testing.T) { err = searchMsgStream.Produce(&msgPackSearch) assert.NoError(t, err) - node.searchService = newSearchService(node.ctx, node, node.pulsarURL) + node.searchService = newSearchService(node.ctx, node.replica) go node.searchService.start() time.Sleep(2 * time.Second) diff --git a/internal/reader/stats_service.go b/internal/reader/stats_service.go index 1487ceee4a..9f118e1dd1 100644 --- a/internal/reader/stats_service.go +++ b/internal/reader/stats_service.go @@ -13,21 +13,17 @@ import ( ) type statsService struct { - ctx context.Context - pulsarURL string - - msgStream *msgstream.MsgStream - - container *container + ctx context.Context + statsStream *msgstream.MsgStream + replica *collectionReplica } -func newStatsService(ctx context.Context, container *container, pulsarURL string) *statsService { +func newStatsService(ctx context.Context, replica *collectionReplica) *statsService { return &statsService{ - ctx: ctx, - pulsarURL: pulsarURL, - msgStream: nil, - container: container, + ctx: ctx, + statsStream: nil, + replica: replica, } } @@ -38,16 +34,20 @@ func (sService *statsService) start() { ) // start pulsar + msgStreamURL, err := Params.PulsarAddress() + if err != nil { + log.Fatal(err) + } producerChannels := []string{"statistic"} statsStream := msgstream.NewPulsarMsgStream(sService.ctx, receiveBufSize) - statsStream.SetPulsarCient(sService.pulsarURL) + statsStream.SetPulsarCient(msgStreamURL) statsStream.CreatePulsarProducers(producerChannels) var statsMsgStream msgstream.MsgStream = statsStream - sService.msgStream = &statsMsgStream - (*sService.msgStream).Start() + sService.statsStream = &statsMsgStream + (*sService.statsStream).Start() // start service fmt.Println("do segments statistic in ", strconv.Itoa(sleepMillisecondTime), "ms") @@ -62,7 +62,7 @@ func (sService *statsService) start() { } func (sService *statsService) sendSegmentStatistic() { - statisticData := (*sService.container).getSegmentStatistics() + statisticData := (*sService.replica).getSegmentStatistics() // fmt.Println("Publish segment statistic") // fmt.Println(statisticData) @@ -80,7 +80,7 @@ func (sService *statsService) publicStatistic(statistic *internalpb.QueryNodeSeg var msgPack = msgstream.MsgPack{ Msgs: []msgstream.TsMsg{msg}, } - err := (*sService.msgStream).Produce(&msgPack) + err := (*sService.statsStream).Produce(&msgPack) if err != nil { log.Println(err) } diff --git a/internal/reader/stats_service_test.go b/internal/reader/stats_service_test.go index d9f6ef04f7..7e1dfb23fe 100644 --- a/internal/reader/stats_service_test.go +++ b/internal/reader/stats_service_test.go @@ -15,6 +15,7 @@ import ( // NOTE: start pulsar before test func TestStatsService_start(t *testing.T) { + Params.Init() var ctx context.Context if closeWithDeadline { @@ -27,8 +28,7 @@ func TestStatsService_start(t *testing.T) { } // init query node - pulsarURL := "pulsar://localhost:6650" - node := NewQueryNode(ctx, 0, pulsarURL) + node := NewQueryNode(ctx, 0) // init meta collectionName := "collection0" @@ -72,29 +72,30 @@ func TestStatsService_start(t *testing.T) { collectionMetaBlob := proto.MarshalTextString(&collectionMeta) assert.NotEqual(t, "", collectionMetaBlob) - var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob) + var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob) assert.NoError(t, err) - collection, err := (*node.container).getCollectionByName(collectionName) + collection, err := (*node.replica).getCollectionByName(collectionName) assert.NoError(t, err) assert.Equal(t, collection.meta.Schema.Name, "collection0") assert.Equal(t, collection.meta.ID, UniqueID(0)) - assert.Equal(t, (*node.container).getCollectionNum(), 1) + assert.Equal(t, (*node.replica).getCollectionNum(), 1) - err = (*node.container).addPartition(collection.ID(), collectionMeta.PartitionTags[0]) + err = (*node.replica).addPartition(collection.ID(), collectionMeta.PartitionTags[0]) assert.NoError(t, err) segmentID := UniqueID(0) - err = (*node.container).addSegment(segmentID, collectionMeta.PartitionTags[0], UniqueID(0)) + err = (*node.replica).addSegment(segmentID, collectionMeta.PartitionTags[0], UniqueID(0)) assert.NoError(t, err) // start stats service - node.statsService = newStatsService(node.ctx, node.container, node.pulsarURL) + node.statsService = newStatsService(node.ctx, node.replica) node.statsService.start() } // NOTE: start pulsar before test func TestSegmentManagement_SegmentStatisticService(t *testing.T) { + Params.Init() var ctx context.Context if closeWithDeadline { @@ -108,7 +109,7 @@ func TestSegmentManagement_SegmentStatisticService(t *testing.T) { // init query node pulsarURL := "pulsar://localhost:6650" - node := NewQueryNode(ctx, 0, pulsarURL) + node := NewQueryNode(ctx, 0) // init meta collectionName := "collection0" @@ -152,20 +153,20 @@ func TestSegmentManagement_SegmentStatisticService(t *testing.T) { collectionMetaBlob := proto.MarshalTextString(&collectionMeta) assert.NotEqual(t, "", collectionMetaBlob) - var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob) + var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob) assert.NoError(t, err) - collection, err := (*node.container).getCollectionByName(collectionName) + collection, err := (*node.replica).getCollectionByName(collectionName) assert.NoError(t, err) assert.Equal(t, collection.meta.Schema.Name, "collection0") assert.Equal(t, collection.meta.ID, UniqueID(0)) - assert.Equal(t, (*node.container).getCollectionNum(), 1) + assert.Equal(t, (*node.replica).getCollectionNum(), 1) - err = (*node.container).addPartition(collection.ID(), collectionMeta.PartitionTags[0]) + err = (*node.replica).addPartition(collection.ID(), collectionMeta.PartitionTags[0]) assert.NoError(t, err) segmentID := UniqueID(0) - err = (*node.container).addSegment(segmentID, collectionMeta.PartitionTags[0], UniqueID(0)) + err = (*node.replica).addSegment(segmentID, collectionMeta.PartitionTags[0], UniqueID(0)) assert.NoError(t, err) const receiveBufSize = 1024 @@ -178,9 +179,9 @@ func TestSegmentManagement_SegmentStatisticService(t *testing.T) { var statsMsgStream msgstream.MsgStream = statsStream - node.statsService = newStatsService(node.ctx, node.container, node.pulsarURL) - node.statsService.msgStream = &statsMsgStream - (*node.statsService.msgStream).Start() + node.statsService = newStatsService(node.ctx, node.replica) + node.statsService.statsStream = &statsMsgStream + (*node.statsService.statsStream).Start() // send stats node.statsService.sendSegmentStatistic() diff --git a/internal/util/flowgraph/flow_graph.go b/internal/util/flowgraph/flow_graph.go index 822842c66c..79b49242e4 100644 --- a/internal/util/flowgraph/flow_graph.go +++ b/internal/util/flowgraph/flow_graph.go @@ -2,6 +2,7 @@ package flowgraph import ( "context" + "log" "sync" "github.com/zilliztech/milvus-distributed/internal/errors" @@ -68,6 +69,15 @@ func (fg *TimeTickedFlowGraph) Start() { func (fg *TimeTickedFlowGraph) Close() { for _, v := range fg.nodeCtx { + // close message stream + if (*v.node).IsInputNode() { + inStream, ok := (*v.node).(*InputNode) + if !ok { + log.Fatal("Invalid inputNode") + } + (*inStream.inStream).Close() + } + // close input channels v.Close() } }