Fix pulsar url

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
bigsheeper 2020-11-17 16:32:34 +08:00 committed by yefu.chen
parent ad089eafe5
commit e117a6097a
22 changed files with 405 additions and 400 deletions

View File

@ -27,8 +27,8 @@ func main() {
sig = <-sc
cancel()
}()
pulsarAddress, _ := reader.Params.PulsarAddress()
reader.StartQueryNode(ctx, pulsarAddress)
reader.StartQueryNode(ctx)
switch sig {
case syscall.SIGTERM:

View File

@ -20,7 +20,19 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
)
type container interface {
/*
* collectionReplica contains a in-memory local copy of persistent collections.
* In common cases, the system has multiple query nodes. Data of a collection will be
* distributed across all the available query nodes, and each query node's collectionReplica
* will maintain its own share (only part of the collection).
* Every replica tracks a value called tSafe which is the maximum timestamp that the replica
* is up-to-date.
*/
type collectionReplica interface {
// tSafe
getTSafe() Timestamp
setTSafe(t Timestamp)
// collection
getCollectionNum() int
addCollection(collMeta *etcdpb.CollectionMeta, colMetaBlob string) error
@ -44,36 +56,51 @@ type container interface {
hasSegment(segmentID UniqueID) bool
}
// TODO: rename
type colSegContainer struct {
type collectionReplicaImpl struct {
tSafeMu sync.Mutex
tSafe Timestamp
mu sync.RWMutex
collections []*Collection
segments map[UniqueID]*Segment
}
//----------------------------------------------------------------------------------------------------- collection
func (container *colSegContainer) getCollectionNum() int {
container.mu.RLock()
defer container.mu.RUnlock()
return len(container.collections)
//----------------------------------------------------------------------------------------------------- tSafe
func (colReplica *collectionReplicaImpl) getTSafe() Timestamp {
colReplica.tSafeMu.Lock()
defer colReplica.tSafeMu.Unlock()
return colReplica.tSafe
}
func (container *colSegContainer) addCollection(collMeta *etcdpb.CollectionMeta, colMetaBlob string) error {
container.mu.Lock()
defer container.mu.Unlock()
func (colReplica *collectionReplicaImpl) setTSafe(t Timestamp) {
colReplica.tSafeMu.Lock()
colReplica.tSafe = t
colReplica.tSafeMu.Unlock()
}
//----------------------------------------------------------------------------------------------------- collection
func (colReplica *collectionReplicaImpl) getCollectionNum() int {
colReplica.mu.RLock()
defer colReplica.mu.RUnlock()
return len(colReplica.collections)
}
func (colReplica *collectionReplicaImpl) addCollection(collMeta *etcdpb.CollectionMeta, colMetaBlob string) error {
colReplica.mu.Lock()
defer colReplica.mu.Unlock()
var newCollection = newCollection(collMeta, colMetaBlob)
container.collections = append(container.collections, newCollection)
colReplica.collections = append(colReplica.collections, newCollection)
return nil
}
func (container *colSegContainer) removeCollection(collectionID UniqueID) error {
collection, err := container.getCollectionByID(collectionID)
func (colReplica *collectionReplicaImpl) removeCollection(collectionID UniqueID) error {
collection, err := colReplica.getCollectionByID(collectionID)
container.mu.Lock()
defer container.mu.Unlock()
colReplica.mu.Lock()
defer colReplica.mu.Unlock()
if err != nil {
return err
@ -82,11 +109,11 @@ func (container *colSegContainer) removeCollection(collectionID UniqueID) error
deleteCollection(collection)
tmpCollections := make([]*Collection, 0)
for _, col := range container.collections {
for _, col := range colReplica.collections {
if col.ID() == collectionID {
for _, p := range *col.Partitions() {
for _, s := range *p.Segments() {
delete(container.segments, s.ID())
delete(colReplica.segments, s.ID())
}
}
} else {
@ -94,15 +121,15 @@ func (container *colSegContainer) removeCollection(collectionID UniqueID) error
}
}
container.collections = tmpCollections
colReplica.collections = tmpCollections
return nil
}
func (container *colSegContainer) getCollectionByID(collectionID UniqueID) (*Collection, error) {
container.mu.RLock()
defer container.mu.RUnlock()
func (colReplica *collectionReplicaImpl) getCollectionByID(collectionID UniqueID) (*Collection, error) {
colReplica.mu.RLock()
defer colReplica.mu.RUnlock()
for _, collection := range container.collections {
for _, collection := range colReplica.collections {
if collection.ID() == collectionID {
return collection, nil
}
@ -111,11 +138,11 @@ func (container *colSegContainer) getCollectionByID(collectionID UniqueID) (*Col
return nil, errors.New("cannot find collection, id = " + strconv.FormatInt(collectionID, 10))
}
func (container *colSegContainer) getCollectionByName(collectionName string) (*Collection, error) {
container.mu.RLock()
defer container.mu.RUnlock()
func (colReplica *collectionReplicaImpl) getCollectionByName(collectionName string) (*Collection, error) {
colReplica.mu.RLock()
defer colReplica.mu.RUnlock()
for _, collection := range container.collections {
for _, collection := range colReplica.collections {
if collection.Name() == collectionName {
return collection, nil
}
@ -125,14 +152,14 @@ func (container *colSegContainer) getCollectionByName(collectionName string) (*C
}
//----------------------------------------------------------------------------------------------------- partition
func (container *colSegContainer) addPartition(collectionID UniqueID, partitionTag string) error {
collection, err := container.getCollectionByID(collectionID)
func (colReplica *collectionReplicaImpl) addPartition(collectionID UniqueID, partitionTag string) error {
collection, err := colReplica.getCollectionByID(collectionID)
if err != nil {
return err
}
container.mu.Lock()
defer container.mu.Unlock()
colReplica.mu.Lock()
defer colReplica.mu.Unlock()
var newPartition = newPartition(partitionTag)
@ -140,20 +167,20 @@ func (container *colSegContainer) addPartition(collectionID UniqueID, partitionT
return nil
}
func (container *colSegContainer) removePartition(collectionID UniqueID, partitionTag string) error {
collection, err := container.getCollectionByID(collectionID)
func (colReplica *collectionReplicaImpl) removePartition(collectionID UniqueID, partitionTag string) error {
collection, err := colReplica.getCollectionByID(collectionID)
if err != nil {
return err
}
container.mu.Lock()
defer container.mu.Unlock()
colReplica.mu.Lock()
defer colReplica.mu.Unlock()
var tmpPartitions = make([]*Partition, 0)
for _, p := range *collection.Partitions() {
if p.Tag() == partitionTag {
for _, s := range *p.Segments() {
delete(container.segments, s.ID())
delete(colReplica.segments, s.ID())
}
} else {
tmpPartitions = append(tmpPartitions, p)
@ -164,14 +191,14 @@ func (container *colSegContainer) removePartition(collectionID UniqueID, partiti
return nil
}
func (container *colSegContainer) getPartitionByTag(collectionID UniqueID, partitionTag string) (*Partition, error) {
collection, err := container.getCollectionByID(collectionID)
func (colReplica *collectionReplicaImpl) getPartitionByTag(collectionID UniqueID, partitionTag string) (*Partition, error) {
collection, err := colReplica.getCollectionByID(collectionID)
if err != nil {
return nil, err
}
container.mu.RLock()
defer container.mu.RUnlock()
colReplica.mu.RLock()
defer colReplica.mu.RUnlock()
for _, p := range *collection.Partitions() {
if p.Tag() == partitionTag {
@ -183,17 +210,17 @@ func (container *colSegContainer) getPartitionByTag(collectionID UniqueID, parti
}
//----------------------------------------------------------------------------------------------------- segment
func (container *colSegContainer) getSegmentNum() int {
container.mu.RLock()
defer container.mu.RUnlock()
func (colReplica *collectionReplicaImpl) getSegmentNum() int {
colReplica.mu.RLock()
defer colReplica.mu.RUnlock()
return len(container.segments)
return len(colReplica.segments)
}
func (container *colSegContainer) getSegmentStatistics() *internalpb.QueryNodeSegStats {
func (colReplica *collectionReplicaImpl) getSegmentStatistics() *internalpb.QueryNodeSegStats {
var statisticData = make([]*internalpb.SegmentStats, 0)
for segmentID, segment := range container.segments {
for segmentID, segment := range colReplica.segments {
currentMemSize := segment.getMemSize()
segment.lastMemSize = currentMemSize
segmentNumOfRows := segment.getRowCount()
@ -215,36 +242,36 @@ func (container *colSegContainer) getSegmentStatistics() *internalpb.QueryNodeSe
}
}
func (container *colSegContainer) addSegment(segmentID UniqueID, partitionTag string, collectionID UniqueID) error {
collection, err := container.getCollectionByID(collectionID)
func (colReplica *collectionReplicaImpl) addSegment(segmentID UniqueID, partitionTag string, collectionID UniqueID) error {
collection, err := colReplica.getCollectionByID(collectionID)
if err != nil {
return err
}
partition, err := container.getPartitionByTag(collectionID, partitionTag)
partition, err := colReplica.getPartitionByTag(collectionID, partitionTag)
if err != nil {
return err
}
container.mu.Lock()
defer container.mu.Unlock()
colReplica.mu.Lock()
defer colReplica.mu.Unlock()
var newSegment = newSegment(collection, segmentID)
container.segments[segmentID] = newSegment
colReplica.segments[segmentID] = newSegment
*partition.Segments() = append(*partition.Segments(), newSegment)
return nil
}
func (container *colSegContainer) removeSegment(segmentID UniqueID) error {
container.mu.Lock()
defer container.mu.Unlock()
func (colReplica *collectionReplicaImpl) removeSegment(segmentID UniqueID) error {
colReplica.mu.Lock()
defer colReplica.mu.Unlock()
var targetPartition *Partition
var segmentIndex = -1
for _, col := range container.collections {
for _, col := range colReplica.collections {
for _, p := range *col.Partitions() {
for i, s := range *p.Segments() {
if s.ID() == segmentID {
@ -255,7 +282,7 @@ func (container *colSegContainer) removeSegment(segmentID UniqueID) error {
}
}
delete(container.segments, segmentID)
delete(colReplica.segments, segmentID)
if targetPartition != nil && segmentIndex > 0 {
targetPartition.segments = append(targetPartition.segments[:segmentIndex], targetPartition.segments[segmentIndex+1:]...)
@ -264,11 +291,11 @@ func (container *colSegContainer) removeSegment(segmentID UniqueID) error {
return nil
}
func (container *colSegContainer) getSegmentByID(segmentID UniqueID) (*Segment, error) {
container.mu.RLock()
defer container.mu.RUnlock()
func (colReplica *collectionReplicaImpl) getSegmentByID(segmentID UniqueID) (*Segment, error) {
colReplica.mu.RLock()
defer colReplica.mu.RUnlock()
targetSegment, ok := container.segments[segmentID]
targetSegment, ok := colReplica.segments[segmentID]
if !ok {
return nil, errors.New("cannot found segment with id = " + strconv.FormatInt(segmentID, 10))
@ -277,11 +304,11 @@ func (container *colSegContainer) getSegmentByID(segmentID UniqueID) (*Segment,
return targetSegment, nil
}
func (container *colSegContainer) hasSegment(segmentID UniqueID) bool {
container.mu.RLock()
defer container.mu.RUnlock()
func (colReplica *collectionReplicaImpl) hasSegment(segmentID UniqueID) bool {
colReplica.mu.RLock()
defer colReplica.mu.RUnlock()
_, ok := container.segments[segmentID]
_, ok := colReplica.segments[segmentID]
return ok
}

View File

@ -15,8 +15,7 @@ import (
//----------------------------------------------------------------------------------------------------- collection
func TestColSegContainer_addCollection(t *testing.T) {
ctx := context.Background()
pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0, pulsarURL)
node := NewQueryNode(ctx, 0)
collectionName := "collection0"
fieldVec := schemapb.FieldSchema{
@ -59,20 +58,19 @@ func TestColSegContainer_addCollection(t *testing.T) {
collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
assert.NotEqual(t, "", collectionMetaBlob)
var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob)
assert.NoError(t, err)
collection, err := (*node.container).getCollectionByName(collectionName)
collection, err := (*node.replica).getCollectionByName(collectionName)
assert.NoError(t, err)
assert.Equal(t, collection.meta.Schema.Name, collectionName)
assert.Equal(t, collection.meta.ID, UniqueID(0))
assert.Equal(t, (*node.container).getCollectionNum(), 1)
assert.Equal(t, (*node.replica).getCollectionNum(), 1)
}
func TestColSegContainer_removeCollection(t *testing.T) {
ctx := context.Background()
pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0, pulsarURL)
node := NewQueryNode(ctx, 0)
collectionName := "collection0"
collectionID := UniqueID(0)
@ -116,25 +114,24 @@ func TestColSegContainer_removeCollection(t *testing.T) {
collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
assert.NotEqual(t, "", collectionMetaBlob)
var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob)
assert.NoError(t, err)
collection, err := (*node.container).getCollectionByName(collectionName)
collection, err := (*node.replica).getCollectionByName(collectionName)
assert.NoError(t, err)
assert.Equal(t, collection.meta.Schema.Name, collectionName)
assert.Equal(t, collection.meta.ID, UniqueID(0))
assert.Equal(t, (*node.container).getCollectionNum(), 1)
assert.Equal(t, (*node.replica).getCollectionNum(), 1)
err = (*node.container).removeCollection(collectionID)
err = (*node.replica).removeCollection(collectionID)
assert.NoError(t, err)
assert.Equal(t, (*node.container).getCollectionNum(), 0)
assert.Equal(t, (*node.replica).getCollectionNum(), 0)
}
func TestColSegContainer_getCollectionByID(t *testing.T) {
ctx := context.Background()
pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0, pulsarURL)
node := NewQueryNode(ctx, 0)
collectionName := "collection0"
fieldVec := schemapb.FieldSchema{
@ -177,17 +174,17 @@ func TestColSegContainer_getCollectionByID(t *testing.T) {
collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
assert.NotEqual(t, "", collectionMetaBlob)
var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob)
assert.NoError(t, err)
collection, err := (*node.container).getCollectionByName(collectionName)
collection, err := (*node.replica).getCollectionByName(collectionName)
assert.NoError(t, err)
assert.Equal(t, collection.meta.Schema.Name, collectionName)
assert.Equal(t, collection.meta.ID, UniqueID(0))
assert.Equal(t, (*node.container).getCollectionNum(), 1)
assert.Equal(t, (*node.replica).getCollectionNum(), 1)
targetCollection, err := (*node.container).getCollectionByID(UniqueID(0))
targetCollection, err := (*node.replica).getCollectionByID(UniqueID(0))
assert.NoError(t, err)
assert.NotNil(t, targetCollection)
assert.Equal(t, targetCollection.meta.Schema.Name, "collection0")
@ -196,8 +193,7 @@ func TestColSegContainer_getCollectionByID(t *testing.T) {
func TestColSegContainer_getCollectionByName(t *testing.T) {
ctx := context.Background()
pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0, pulsarURL)
node := NewQueryNode(ctx, 0)
collectionName := "collection0"
fieldVec := schemapb.FieldSchema{
@ -240,17 +236,17 @@ func TestColSegContainer_getCollectionByName(t *testing.T) {
collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
assert.NotEqual(t, "", collectionMetaBlob)
var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob)
assert.NoError(t, err)
collection, err := (*node.container).getCollectionByName(collectionName)
collection, err := (*node.replica).getCollectionByName(collectionName)
assert.NoError(t, err)
assert.Equal(t, collection.meta.Schema.Name, collectionName)
assert.Equal(t, collection.meta.ID, UniqueID(0))
assert.Equal(t, (*node.container).getCollectionNum(), 1)
assert.Equal(t, (*node.replica).getCollectionNum(), 1)
targetCollection, err := (*node.container).getCollectionByName("collection0")
targetCollection, err := (*node.replica).getCollectionByName("collection0")
assert.NoError(t, err)
assert.NotNil(t, targetCollection)
assert.Equal(t, targetCollection.meta.Schema.Name, "collection0")
@ -260,8 +256,7 @@ func TestColSegContainer_getCollectionByName(t *testing.T) {
//----------------------------------------------------------------------------------------------------- partition
func TestColSegContainer_addPartition(t *testing.T) {
ctx := context.Background()
pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0, pulsarURL)
node := NewQueryNode(ctx, 0)
collectionName := "collection0"
collectionID := UniqueID(0)
@ -305,20 +300,20 @@ func TestColSegContainer_addPartition(t *testing.T) {
collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
assert.NotEqual(t, "", collectionMetaBlob)
var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob)
assert.NoError(t, err)
collection, err := (*node.container).getCollectionByName(collectionName)
collection, err := (*node.replica).getCollectionByName(collectionName)
assert.NoError(t, err)
assert.Equal(t, collection.meta.Schema.Name, collectionName)
assert.Equal(t, collection.meta.ID, collectionID)
assert.Equal(t, (*node.container).getCollectionNum(), 1)
assert.Equal(t, (*node.replica).getCollectionNum(), 1)
for _, tag := range collectionMeta.PartitionTags {
err := (*node.container).addPartition(collectionID, tag)
err := (*node.replica).addPartition(collectionID, tag)
assert.NoError(t, err)
partition, err := (*node.container).getPartitionByTag(collectionID, tag)
partition, err := (*node.replica).getPartitionByTag(collectionID, tag)
assert.NoError(t, err)
assert.Equal(t, partition.partitionTag, "default")
}
@ -326,8 +321,7 @@ func TestColSegContainer_addPartition(t *testing.T) {
func TestColSegContainer_removePartition(t *testing.T) {
ctx := context.Background()
pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0, pulsarURL)
node := NewQueryNode(ctx, 0)
collectionName := "collection0"
collectionID := UniqueID(0)
@ -372,31 +366,30 @@ func TestColSegContainer_removePartition(t *testing.T) {
collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
assert.NotEqual(t, "", collectionMetaBlob)
var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob)
assert.NoError(t, err)
collection, err := (*node.container).getCollectionByName(collectionName)
collection, err := (*node.replica).getCollectionByName(collectionName)
assert.NoError(t, err)
assert.Equal(t, collection.meta.Schema.Name, collectionName)
assert.Equal(t, collection.meta.ID, collectionID)
assert.Equal(t, (*node.container).getCollectionNum(), 1)
assert.Equal(t, (*node.replica).getCollectionNum(), 1)
for _, tag := range collectionMeta.PartitionTags {
err := (*node.container).addPartition(collectionID, tag)
err := (*node.replica).addPartition(collectionID, tag)
assert.NoError(t, err)
partition, err := (*node.container).getPartitionByTag(collectionID, tag)
partition, err := (*node.replica).getPartitionByTag(collectionID, tag)
assert.NoError(t, err)
assert.Equal(t, partition.partitionTag, partitionTag)
err = (*node.container).removePartition(collectionID, partitionTag)
err = (*node.replica).removePartition(collectionID, partitionTag)
assert.NoError(t, err)
}
}
func TestColSegContainer_getPartitionByTag(t *testing.T) {
ctx := context.Background()
pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0, pulsarURL)
node := NewQueryNode(ctx, 0)
collectionName := "collection0"
collectionID := UniqueID(0)
@ -440,20 +433,20 @@ func TestColSegContainer_getPartitionByTag(t *testing.T) {
collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
assert.NotEqual(t, "", collectionMetaBlob)
var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob)
assert.NoError(t, err)
collection, err := (*node.container).getCollectionByName(collectionName)
collection, err := (*node.replica).getCollectionByName(collectionName)
assert.NoError(t, err)
assert.Equal(t, collection.meta.Schema.Name, collectionName)
assert.Equal(t, collection.meta.ID, collectionID)
assert.Equal(t, (*node.container).getCollectionNum(), 1)
assert.Equal(t, (*node.replica).getCollectionNum(), 1)
for _, tag := range collectionMeta.PartitionTags {
err := (*node.container).addPartition(collectionID, tag)
err := (*node.replica).addPartition(collectionID, tag)
assert.NoError(t, err)
partition, err := (*node.container).getPartitionByTag(collectionID, tag)
partition, err := (*node.replica).getPartitionByTag(collectionID, tag)
assert.NoError(t, err)
assert.Equal(t, partition.partitionTag, "default")
assert.NotNil(t, partition)
@ -463,8 +456,7 @@ func TestColSegContainer_getPartitionByTag(t *testing.T) {
//----------------------------------------------------------------------------------------------------- segment
func TestColSegContainer_addSegment(t *testing.T) {
ctx := context.Background()
pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0, pulsarURL)
node := NewQueryNode(ctx, 0)
collectionName := "collection0"
collectionID := UniqueID(0)
@ -508,24 +500,24 @@ func TestColSegContainer_addSegment(t *testing.T) {
collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
assert.NotEqual(t, "", collectionMetaBlob)
var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob)
assert.NoError(t, err)
collection, err := (*node.container).getCollectionByName(collectionName)
collection, err := (*node.replica).getCollectionByName(collectionName)
assert.NoError(t, err)
assert.Equal(t, collection.meta.Schema.Name, collectionName)
assert.Equal(t, collection.meta.ID, UniqueID(0))
assert.Equal(t, (*node.container).getCollectionNum(), 1)
assert.Equal(t, (*node.replica).getCollectionNum(), 1)
err = (*node.container).addPartition(collectionID, collectionMeta.PartitionTags[0])
err = (*node.replica).addPartition(collectionID, collectionMeta.PartitionTags[0])
assert.NoError(t, err)
const segmentNum = 3
for i := 0; i < segmentNum; i++ {
err := (*node.container).addSegment(UniqueID(i), collectionMeta.PartitionTags[0], collectionID)
err := (*node.replica).addSegment(UniqueID(i), collectionMeta.PartitionTags[0], collectionID)
assert.NoError(t, err)
targetSeg, err := (*node.container).getSegmentByID(UniqueID(i))
targetSeg, err := (*node.replica).getSegmentByID(UniqueID(i))
assert.NoError(t, err)
assert.Equal(t, targetSeg.segmentID, UniqueID(i))
}
@ -533,8 +525,7 @@ func TestColSegContainer_addSegment(t *testing.T) {
func TestColSegContainer_removeSegment(t *testing.T) {
ctx := context.Background()
pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0, pulsarURL)
node := NewQueryNode(ctx, 0)
collectionName := "collection0"
collectionID := UniqueID(0)
@ -578,35 +569,34 @@ func TestColSegContainer_removeSegment(t *testing.T) {
collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
assert.NotEqual(t, "", collectionMetaBlob)
var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob)
assert.NoError(t, err)
collection, err := (*node.container).getCollectionByName(collectionName)
collection, err := (*node.replica).getCollectionByName(collectionName)
assert.NoError(t, err)
assert.Equal(t, collection.meta.Schema.Name, collectionName)
assert.Equal(t, collection.meta.ID, UniqueID(0))
assert.Equal(t, (*node.container).getCollectionNum(), 1)
assert.Equal(t, (*node.replica).getCollectionNum(), 1)
err = (*node.container).addPartition(collectionID, collectionMeta.PartitionTags[0])
err = (*node.replica).addPartition(collectionID, collectionMeta.PartitionTags[0])
assert.NoError(t, err)
const segmentNum = 3
for i := 0; i < segmentNum; i++ {
err := (*node.container).addSegment(UniqueID(i), collectionMeta.PartitionTags[0], collectionID)
err := (*node.replica).addSegment(UniqueID(i), collectionMeta.PartitionTags[0], collectionID)
assert.NoError(t, err)
targetSeg, err := (*node.container).getSegmentByID(UniqueID(i))
targetSeg, err := (*node.replica).getSegmentByID(UniqueID(i))
assert.NoError(t, err)
assert.Equal(t, targetSeg.segmentID, UniqueID(i))
err = (*node.container).removeSegment(UniqueID(i))
err = (*node.replica).removeSegment(UniqueID(i))
assert.NoError(t, err)
}
}
func TestColSegContainer_getSegmentByID(t *testing.T) {
ctx := context.Background()
pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0, pulsarURL)
node := NewQueryNode(ctx, 0)
collectionName := "collection0"
collectionID := UniqueID(0)
@ -650,24 +640,24 @@ func TestColSegContainer_getSegmentByID(t *testing.T) {
collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
assert.NotEqual(t, "", collectionMetaBlob)
var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob)
assert.NoError(t, err)
collection, err := (*node.container).getCollectionByName(collectionName)
collection, err := (*node.replica).getCollectionByName(collectionName)
assert.NoError(t, err)
assert.Equal(t, collection.meta.Schema.Name, collectionName)
assert.Equal(t, collection.meta.ID, UniqueID(0))
assert.Equal(t, (*node.container).getCollectionNum(), 1)
assert.Equal(t, (*node.replica).getCollectionNum(), 1)
err = (*node.container).addPartition(collectionID, collectionMeta.PartitionTags[0])
err = (*node.replica).addPartition(collectionID, collectionMeta.PartitionTags[0])
assert.NoError(t, err)
const segmentNum = 3
for i := 0; i < segmentNum; i++ {
err := (*node.container).addSegment(UniqueID(i), collectionMeta.PartitionTags[0], collectionID)
err := (*node.replica).addSegment(UniqueID(i), collectionMeta.PartitionTags[0], collectionID)
assert.NoError(t, err)
targetSeg, err := (*node.container).getSegmentByID(UniqueID(i))
targetSeg, err := (*node.replica).getSegmentByID(UniqueID(i))
assert.NoError(t, err)
assert.Equal(t, targetSeg.segmentID, UniqueID(i))
}
@ -675,8 +665,7 @@ func TestColSegContainer_getSegmentByID(t *testing.T) {
func TestColSegContainer_hasSegment(t *testing.T) {
ctx := context.Background()
pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0, pulsarURL)
node := NewQueryNode(ctx, 0)
collectionName := "collection0"
collectionID := UniqueID(0)
@ -720,29 +709,29 @@ func TestColSegContainer_hasSegment(t *testing.T) {
collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
assert.NotEqual(t, "", collectionMetaBlob)
var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob)
assert.NoError(t, err)
collection, err := (*node.container).getCollectionByName(collectionName)
collection, err := (*node.replica).getCollectionByName(collectionName)
assert.NoError(t, err)
assert.Equal(t, collection.meta.Schema.Name, collectionName)
assert.Equal(t, collection.meta.ID, UniqueID(0))
assert.Equal(t, (*node.container).getCollectionNum(), 1)
assert.Equal(t, (*node.replica).getCollectionNum(), 1)
err = (*node.container).addPartition(collectionID, collectionMeta.PartitionTags[0])
err = (*node.replica).addPartition(collectionID, collectionMeta.PartitionTags[0])
assert.NoError(t, err)
const segmentNum = 3
for i := 0; i < segmentNum; i++ {
err := (*node.container).addSegment(UniqueID(i), collectionMeta.PartitionTags[0], collectionID)
err := (*node.replica).addSegment(UniqueID(i), collectionMeta.PartitionTags[0], collectionID)
assert.NoError(t, err)
targetSeg, err := (*node.container).getSegmentByID(UniqueID(i))
targetSeg, err := (*node.replica).getSegmentByID(UniqueID(i))
assert.NoError(t, err)
assert.Equal(t, targetSeg.segmentID, UniqueID(i))
hasSeg := (*node.container).hasSegment(UniqueID(i))
hasSeg := (*node.replica).hasSegment(UniqueID(i))
assert.Equal(t, hasSeg, true)
hasSeg = (*node.container).hasSegment(UniqueID(i + 100))
hasSeg = (*node.replica).hasSegment(UniqueID(i + 100))
assert.Equal(t, hasSeg, false)
}
}

View File

@ -13,8 +13,7 @@ import (
func TestCollection_Partitions(t *testing.T) {
ctx := context.Background()
pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0, pulsarURL)
node := NewQueryNode(ctx, 0)
collectionName := "collection0"
fieldVec := schemapb.FieldSchema{
@ -57,18 +56,18 @@ func TestCollection_Partitions(t *testing.T) {
collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
assert.NotEqual(t, "", collectionMetaBlob)
var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob)
assert.NoError(t, err)
collection, err := (*node.container).getCollectionByName(collectionName)
collection, err := (*node.replica).getCollectionByName(collectionName)
assert.NoError(t, err)
assert.Equal(t, collection.meta.Schema.Name, "collection0")
assert.Equal(t, collection.meta.ID, UniqueID(0))
assert.Equal(t, (*node.container).getCollectionNum(), 1)
assert.Equal(t, (*node.replica).getCollectionNum(), 1)
for _, tag := range collectionMeta.PartitionTags {
err := (*node.container).addPartition(collection.ID(), tag)
err := (*node.replica).addPartition(collection.ID(), tag)
assert.NoError(t, err)
}

View File

@ -4,33 +4,23 @@ import (
"context"
"log"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/util/flowgraph"
)
type dataSyncService struct {
ctx context.Context
pulsarURL string
fg *flowgraph.TimeTickedFlowGraph
ctx context.Context
fg *flowgraph.TimeTickedFlowGraph
// input streams
dmStream *msgstream.MsgStream
// ddStream *msgstream.MsgStream
// k2sStream *msgstream.MsgStream
node *QueryNode
replica *collectionReplica
}
func newDataSyncService(ctx context.Context, node *QueryNode, pulsarURL string) *dataSyncService {
func newDataSyncService(ctx context.Context, replica *collectionReplica) *dataSyncService {
return &dataSyncService{
ctx: ctx,
pulsarURL: pulsarURL,
fg: nil,
ctx: ctx,
fg: nil,
dmStream: nil,
node: node,
replica: replica,
}
}
@ -41,7 +31,6 @@ func (dsService *dataSyncService) start() {
func (dsService *dataSyncService) close() {
dsService.fg.Close()
(*dsService.dmStream).Close()
}
func (dsService *dataSyncService) initNodes() {
@ -49,10 +38,10 @@ func (dsService *dataSyncService) initNodes() {
dsService.fg = flowgraph.NewTimeTickedFlowGraph(dsService.ctx)
var dmStreamNode Node = newDmInputNode(dsService.ctx, dsService.pulsarURL)
var dmStreamNode Node = newDmInputNode(dsService.ctx)
var filterDmNode Node = newFilteredDmNode()
var insertNode Node = newInsertNode(dsService.node.container)
var serviceTimeNode Node = newServiceTimeNode(dsService.node)
var insertNode Node = newInsertNode(dsService.replica)
var serviceTimeNode Node = newServiceTimeNode(dsService.replica)
dsService.fg.AddNode(&dmStreamNode)
dsService.fg.AddNode(&filterDmNode)
@ -90,21 +79,4 @@ func (dsService *dataSyncService) initNodes() {
if err != nil {
log.Fatal("set edges failed in node:", serviceTimeNode.Name())
}
dsService.setDmStream(&dmStreamNode)
}
func (dsService *dataSyncService) setDmStream(node *Node) {
if (*node).IsInputNode() {
inStream, ok := (*node).(*InputNode)
dsService.dmStream = inStream.InStream()
if !ok {
log.Fatal("Invalid inputNode")
}
} else {
log.Fatal("stream set failed")
}
}
func (dsService *dataSyncService) setDdStream(node *Node) {}
func (dsService *dataSyncService) setK2sStream(node *Node) {}

View File

@ -19,6 +19,7 @@ import (
// NOTE: start pulsar before test
func TestManipulationService_Start(t *testing.T) {
Params.Init()
var ctx context.Context
if closeWithDeadline {
@ -32,7 +33,7 @@ func TestManipulationService_Start(t *testing.T) {
// init query node
pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0, pulsarURL)
node := NewQueryNode(ctx, 0)
// init meta
collectionName := "collection0"
@ -76,20 +77,20 @@ func TestManipulationService_Start(t *testing.T) {
collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
assert.NotEqual(t, "", collectionMetaBlob)
var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob)
assert.NoError(t, err)
collection, err := (*node.container).getCollectionByName(collectionName)
collection, err := (*node.replica).getCollectionByName(collectionName)
assert.NoError(t, err)
assert.Equal(t, collection.meta.Schema.Name, "collection0")
assert.Equal(t, collection.meta.ID, UniqueID(0))
assert.Equal(t, (*node.container).getCollectionNum(), 1)
assert.Equal(t, (*node.replica).getCollectionNum(), 1)
err = (*node.container).addPartition(collection.ID(), collectionMeta.PartitionTags[0])
err = (*node.replica).addPartition(collection.ID(), collectionMeta.PartitionTags[0])
assert.NoError(t, err)
segmentID := UniqueID(0)
err = (*node.container).addSegment(segmentID, collectionMeta.PartitionTags[0], UniqueID(0))
err = (*node.replica).addSegment(segmentID, collectionMeta.PartitionTags[0], UniqueID(0))
assert.NoError(t, err)
// test data generate
@ -168,7 +169,7 @@ func TestManipulationService_Start(t *testing.T) {
assert.NoError(t, err)
// dataSync
node.dataSyncService = newDataSyncService(node.ctx, node, node.pulsarURL)
node.dataSyncService = newDataSyncService(node.ctx, node.replica)
go node.dataSyncService.start()
node.Close()

View File

@ -10,7 +10,7 @@ import (
type insertNode struct {
BaseNode
container *container
replica *collectionReplica
}
type InsertData struct {
@ -58,13 +58,13 @@ func (iNode *insertNode) Operate(in []*Msg) []*Msg {
insertData.insertRecords[task.SegmentID] = append(insertData.insertRecords[task.SegmentID], task.RowData...)
// check if segment exists, if not, create this segment
if !(*iNode.container).hasSegment(task.SegmentID) {
collection, err := (*iNode.container).getCollectionByName(task.CollectionName)
if !(*iNode.replica).hasSegment(task.SegmentID) {
collection, err := (*iNode.replica).getCollectionByName(task.CollectionName)
if err != nil {
log.Println(err)
continue
}
err = (*iNode.container).addSegment(task.SegmentID, task.PartitionTag, collection.ID())
err = (*iNode.replica).addSegment(task.SegmentID, task.PartitionTag, collection.ID())
if err != nil {
log.Println(err)
continue
@ -74,7 +74,7 @@ func (iNode *insertNode) Operate(in []*Msg) []*Msg {
// 2. do preInsert
for segmentID := range insertData.insertRecords {
var targetSegment, err = (*iNode.container).getSegmentByID(segmentID)
var targetSegment, err = (*iNode.replica).getSegmentByID(segmentID)
if err != nil {
log.Println("preInsert failed")
// TODO: add error handling
@ -102,7 +102,7 @@ func (iNode *insertNode) Operate(in []*Msg) []*Msg {
}
func (iNode *insertNode) insert(insertData *InsertData, segmentID int64, wg *sync.WaitGroup) {
var targetSegment, err = (*iNode.container).getSegmentByID(segmentID)
var targetSegment, err = (*iNode.replica).getSegmentByID(segmentID)
if err != nil {
log.Println("cannot find segment:", segmentID)
// TODO: add error handling
@ -125,13 +125,13 @@ func (iNode *insertNode) insert(insertData *InsertData, segmentID int64, wg *syn
wg.Done()
}
func newInsertNode(container *container) *insertNode {
func newInsertNode(replica *collectionReplica) *insertNode {
baseNode := BaseNode{}
baseNode.SetMaxQueueLength(maxQueueLength)
baseNode.SetMaxParallelism(maxParallelism)
return &insertNode{
BaseNode: baseNode,
container: container,
BaseNode: baseNode,
replica: replica,
}
}

View File

@ -2,22 +2,28 @@ package reader
import (
"context"
"log"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/util/flowgraph"
)
func newDmInputNode(ctx context.Context, pulsarURL string) *flowgraph.InputNode {
func newDmInputNode(ctx context.Context) *flowgraph.InputNode {
const (
receiveBufSize = 1024
pulsarBufSize = 1024
)
msgStreamURL, err := Params.PulsarAddress()
if err != nil {
log.Fatal(err)
}
consumeChannels := []string{"insert"}
consumeSubName := "insertSub"
insertStream := msgstream.NewPulsarMsgStream(ctx, receiveBufSize)
insertStream.SetPulsarCient(pulsarURL)
insertStream.SetPulsarCient(msgStreamURL)
unmarshalDispatcher := msgstream.NewUnmarshalDispatcher()
insertStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize)

View File

@ -6,7 +6,7 @@ import (
type serviceTimeNode struct {
BaseNode
node *QueryNode
replica *collectionReplica
}
func (stNode *serviceTimeNode) Name() string {
@ -28,17 +28,17 @@ func (stNode *serviceTimeNode) Operate(in []*Msg) []*Msg {
}
// update service time
stNode.node.tSafe.setTSafe(serviceTimeMsg.timeRange.timestampMax)
(*stNode.replica).setTSafe(serviceTimeMsg.timeRange.timestampMax)
return nil
}
func newServiceTimeNode(node *QueryNode) *serviceTimeNode {
func newServiceTimeNode(replica *collectionReplica) *serviceTimeNode {
baseNode := BaseNode{}
baseNode.SetMaxQueueLength(maxQueueLength)
baseNode.SetMaxParallelism(maxParallelism)
return &serviceTimeNode{
BaseNode: baseNode,
node: node,
replica: replica,
}
}

View File

@ -24,12 +24,12 @@ const (
)
type metaService struct {
ctx context.Context
kvBase *kv.EtcdKV
container *container
ctx context.Context
kvBase *kv.EtcdKV
replica *collectionReplica
}
func newMetaService(ctx context.Context, container *container) *metaService {
func newMetaService(ctx context.Context, replica *collectionReplica) *metaService {
ETCDAddr, err := Params.EtcdAddress()
if err != nil {
panic(err)
@ -46,9 +46,9 @@ func newMetaService(ctx context.Context, container *container) *metaService {
})
return &metaService{
ctx: ctx,
kvBase: kv.NewEtcdKV(cli, ETCDRootPath),
container: container,
ctx: ctx,
kvBase: kv.NewEtcdKV(cli, ETCDRootPath),
replica: replica,
}
}
@ -164,12 +164,12 @@ func (mService *metaService) processCollectionCreate(id string, value string) {
col := mService.collectionUnmarshal(value)
if col != nil {
err := (*mService.container).addCollection(col, value)
err := (*mService.replica).addCollection(col, value)
if err != nil {
log.Println(err)
}
for _, partitionTag := range col.PartitionTags {
err = (*mService.container).addPartition(col.ID, partitionTag)
err = (*mService.replica).addPartition(col.ID, partitionTag)
if err != nil {
log.Println(err)
}
@ -187,7 +187,7 @@ func (mService *metaService) processSegmentCreate(id string, value string) {
// TODO: what if seg == nil? We need to notify master and return rpc request failed
if seg != nil {
err := (*mService.container).addSegment(seg.SegmentID, seg.PartitionTag, seg.CollectionID)
err := (*mService.replica).addSegment(seg.SegmentID, seg.PartitionTag, seg.CollectionID)
if err != nil {
log.Println(err)
return
@ -216,7 +216,7 @@ func (mService *metaService) processSegmentModify(id string, value string) {
}
if seg != nil {
targetSegment, err := (*mService.container).getSegmentByID(seg.SegmentID)
targetSegment, err := (*mService.replica).getSegmentByID(seg.SegmentID)
if err != nil {
log.Println(err)
return
@ -251,7 +251,7 @@ func (mService *metaService) processSegmentDelete(id string) {
log.Println("Cannot parse segment id:" + id)
}
err = (*mService.container).removeSegment(segmentID)
err = (*mService.replica).removeSegment(segmentID)
if err != nil {
log.Println(err)
return
@ -266,7 +266,7 @@ func (mService *metaService) processCollectionDelete(id string) {
log.Println("Cannot parse collection id:" + id)
}
err = (*mService.container).removeCollection(collectionID)
err = (*mService.replica).removeCollection(collectionID)
if err != nil {
log.Println(err)
return

View File

@ -27,9 +27,8 @@ func TestMetaService_start(t *testing.T) {
}
// init query node
pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0, pulsarURL)
node.metaService = newMetaService(ctx, node.container)
node := NewQueryNode(ctx, 0)
node.metaService = newMetaService(ctx, node.replica)
(*node.metaService).start()
}
@ -187,9 +186,8 @@ func TestMetaService_processCollectionCreate(t *testing.T) {
defer cancel()
// init metaService
pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0, pulsarURL)
node.metaService = newMetaService(ctx, node.container)
node := NewQueryNode(ctx, 0)
node.metaService = newMetaService(ctx, node.replica)
id := "0"
value := `schema: <
@ -217,10 +215,10 @@ func TestMetaService_processCollectionCreate(t *testing.T) {
node.metaService.processCollectionCreate(id, value)
collectionNum := (*node.container).getCollectionNum()
collectionNum := (*node.replica).getCollectionNum()
assert.Equal(t, collectionNum, 1)
collection, err := (*node.container).getCollectionByName("test")
collection, err := (*node.replica).getCollectionByName("test")
assert.NoError(t, err)
assert.Equal(t, collection.ID(), UniqueID(0))
}
@ -233,9 +231,8 @@ func TestMetaService_processSegmentCreate(t *testing.T) {
defer cancel()
// init metaService
pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0, pulsarURL)
node.metaService = newMetaService(ctx, node.container)
node := NewQueryNode(ctx, 0)
node.metaService = newMetaService(ctx, node.replica)
collectionName := "collection0"
fieldVec := schemapb.FieldSchema{
@ -278,10 +275,10 @@ func TestMetaService_processSegmentCreate(t *testing.T) {
colMetaBlob, err := proto.Marshal(&collectionMeta)
assert.NoError(t, err)
err = (*node.container).addCollection(&collectionMeta, string(colMetaBlob))
err = (*node.replica).addCollection(&collectionMeta, string(colMetaBlob))
assert.NoError(t, err)
err = (*node.container).addPartition(UniqueID(0), "default")
err = (*node.replica).addPartition(UniqueID(0), "default")
assert.NoError(t, err)
id := "0"
@ -293,7 +290,7 @@ func TestMetaService_processSegmentCreate(t *testing.T) {
(*node.metaService).processSegmentCreate(id, value)
s, err := (*node.container).getSegmentByID(UniqueID(0))
s, err := (*node.replica).getSegmentByID(UniqueID(0))
assert.NoError(t, err)
assert.Equal(t, s.segmentID, UniqueID(0))
}
@ -306,9 +303,8 @@ func TestMetaService_processCreate(t *testing.T) {
defer cancel()
// init metaService
pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0, pulsarURL)
node.metaService = newMetaService(ctx, node.container)
node := NewQueryNode(ctx, 0)
node.metaService = newMetaService(ctx, node.replica)
key1 := "by-dev/collection/0"
msg1 := `schema: <
@ -335,10 +331,10 @@ func TestMetaService_processCreate(t *testing.T) {
`
(*node.metaService).processCreate(key1, msg1)
collectionNum := (*node.container).getCollectionNum()
collectionNum := (*node.replica).getCollectionNum()
assert.Equal(t, collectionNum, 1)
collection, err := (*node.container).getCollectionByName("test")
collection, err := (*node.replica).getCollectionByName("test")
assert.NoError(t, err)
assert.Equal(t, collection.ID(), UniqueID(0))
@ -350,7 +346,7 @@ func TestMetaService_processCreate(t *testing.T) {
`
(*node.metaService).processCreate(key2, msg2)
s, err := (*node.container).getSegmentByID(UniqueID(0))
s, err := (*node.replica).getSegmentByID(UniqueID(0))
assert.NoError(t, err)
assert.Equal(t, s.segmentID, UniqueID(0))
}
@ -363,9 +359,8 @@ func TestMetaService_processSegmentModify(t *testing.T) {
defer cancel()
// init metaService
pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0, pulsarURL)
node.metaService = newMetaService(ctx, node.container)
node := NewQueryNode(ctx, 0)
node.metaService = newMetaService(ctx, node.replica)
collectionName := "collection0"
fieldVec := schemapb.FieldSchema{
@ -408,10 +403,10 @@ func TestMetaService_processSegmentModify(t *testing.T) {
colMetaBlob, err := proto.Marshal(&collectionMeta)
assert.NoError(t, err)
err = (*node.container).addCollection(&collectionMeta, string(colMetaBlob))
err = (*node.replica).addCollection(&collectionMeta, string(colMetaBlob))
assert.NoError(t, err)
err = (*node.container).addPartition(UniqueID(0), "default")
err = (*node.replica).addPartition(UniqueID(0), "default")
assert.NoError(t, err)
id := "0"
@ -422,7 +417,7 @@ func TestMetaService_processSegmentModify(t *testing.T) {
`
(*node.metaService).processSegmentCreate(id, value)
s, err := (*node.container).getSegmentByID(UniqueID(0))
s, err := (*node.replica).getSegmentByID(UniqueID(0))
assert.NoError(t, err)
assert.Equal(t, s.segmentID, UniqueID(0))
@ -434,7 +429,7 @@ func TestMetaService_processSegmentModify(t *testing.T) {
// TODO: modify segment for testing processCollectionModify
(*node.metaService).processSegmentModify(id, newValue)
seg, err := (*node.container).getSegmentByID(UniqueID(0))
seg, err := (*node.replica).getSegmentByID(UniqueID(0))
assert.NoError(t, err)
assert.Equal(t, seg.segmentID, UniqueID(0))
}
@ -447,9 +442,8 @@ func TestMetaService_processCollectionModify(t *testing.T) {
defer cancel()
// init metaService
pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0, pulsarURL)
node.metaService = newMetaService(ctx, node.container)
node := NewQueryNode(ctx, 0)
node.metaService = newMetaService(ctx, node.replica)
id := "0"
value := `schema: <
@ -476,10 +470,10 @@ func TestMetaService_processCollectionModify(t *testing.T) {
`
(*node.metaService).processCollectionCreate(id, value)
collectionNum := (*node.container).getCollectionNum()
collectionNum := (*node.replica).getCollectionNum()
assert.Equal(t, collectionNum, 1)
collection, err := (*node.container).getCollectionByName("test")
collection, err := (*node.replica).getCollectionByName("test")
assert.NoError(t, err)
assert.Equal(t, collection.ID(), UniqueID(0))
@ -508,7 +502,7 @@ func TestMetaService_processCollectionModify(t *testing.T) {
`
(*node.metaService).processCollectionModify(id, newValue)
collection, err = (*node.container).getCollectionByName("test")
collection, err = (*node.replica).getCollectionByName("test")
assert.NoError(t, err)
assert.Equal(t, collection.ID(), UniqueID(0))
}
@ -521,9 +515,8 @@ func TestMetaService_processModify(t *testing.T) {
defer cancel()
// init metaService
pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0, pulsarURL)
node.metaService = newMetaService(ctx, node.container)
node := NewQueryNode(ctx, 0)
node.metaService = newMetaService(ctx, node.replica)
key1 := "by-dev/collection/0"
msg1 := `schema: <
@ -550,10 +543,10 @@ func TestMetaService_processModify(t *testing.T) {
`
(*node.metaService).processCreate(key1, msg1)
collectionNum := (*node.container).getCollectionNum()
collectionNum := (*node.replica).getCollectionNum()
assert.Equal(t, collectionNum, 1)
collection, err := (*node.container).getCollectionByName("test")
collection, err := (*node.replica).getCollectionByName("test")
assert.NoError(t, err)
assert.Equal(t, collection.ID(), UniqueID(0))
@ -565,7 +558,7 @@ func TestMetaService_processModify(t *testing.T) {
`
(*node.metaService).processCreate(key2, msg2)
s, err := (*node.container).getSegmentByID(UniqueID(0))
s, err := (*node.replica).getSegmentByID(UniqueID(0))
assert.NoError(t, err)
assert.Equal(t, s.segmentID, UniqueID(0))
@ -595,7 +588,7 @@ func TestMetaService_processModify(t *testing.T) {
`
(*node.metaService).processModify(key1, msg3)
collection, err = (*node.container).getCollectionByName("test")
collection, err = (*node.replica).getCollectionByName("test")
assert.NoError(t, err)
assert.Equal(t, collection.ID(), UniqueID(0))
@ -607,7 +600,7 @@ func TestMetaService_processModify(t *testing.T) {
// TODO: modify segment for testing processCollectionModify
(*node.metaService).processModify(key2, msg4)
seg, err := (*node.container).getSegmentByID(UniqueID(0))
seg, err := (*node.replica).getSegmentByID(UniqueID(0))
assert.NoError(t, err)
assert.Equal(t, seg.segmentID, UniqueID(0))
}
@ -620,9 +613,8 @@ func TestMetaService_processSegmentDelete(t *testing.T) {
defer cancel()
// init metaService
pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0, pulsarURL)
node.metaService = newMetaService(ctx, node.container)
node := NewQueryNode(ctx, 0)
node.metaService = newMetaService(ctx, node.replica)
collectionName := "collection0"
fieldVec := schemapb.FieldSchema{
@ -665,10 +657,10 @@ func TestMetaService_processSegmentDelete(t *testing.T) {
colMetaBlob, err := proto.Marshal(&collectionMeta)
assert.NoError(t, err)
err = (*node.container).addCollection(&collectionMeta, string(colMetaBlob))
err = (*node.replica).addCollection(&collectionMeta, string(colMetaBlob))
assert.NoError(t, err)
err = (*node.container).addPartition(UniqueID(0), "default")
err = (*node.replica).addPartition(UniqueID(0), "default")
assert.NoError(t, err)
id := "0"
@ -679,12 +671,12 @@ func TestMetaService_processSegmentDelete(t *testing.T) {
`
(*node.metaService).processSegmentCreate(id, value)
seg, err := (*node.container).getSegmentByID(UniqueID(0))
seg, err := (*node.replica).getSegmentByID(UniqueID(0))
assert.NoError(t, err)
assert.Equal(t, seg.segmentID, UniqueID(0))
(*node.metaService).processSegmentDelete("0")
mapSize := (*node.container).getSegmentNum()
mapSize := (*node.replica).getSegmentNum()
assert.Equal(t, mapSize, 0)
}
@ -696,9 +688,8 @@ func TestMetaService_processCollectionDelete(t *testing.T) {
defer cancel()
// init metaService
pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0, pulsarURL)
node.metaService = newMetaService(ctx, node.container)
node := NewQueryNode(ctx, 0)
node.metaService = newMetaService(ctx, node.replica)
id := "0"
value := `schema: <
@ -725,15 +716,15 @@ func TestMetaService_processCollectionDelete(t *testing.T) {
`
(*node.metaService).processCollectionCreate(id, value)
collectionNum := (*node.container).getCollectionNum()
collectionNum := (*node.replica).getCollectionNum()
assert.Equal(t, collectionNum, 1)
collection, err := (*node.container).getCollectionByName("test")
collection, err := (*node.replica).getCollectionByName("test")
assert.NoError(t, err)
assert.Equal(t, collection.ID(), UniqueID(0))
(*node.metaService).processCollectionDelete(id)
collectionNum = (*node.container).getCollectionNum()
collectionNum = (*node.replica).getCollectionNum()
assert.Equal(t, collectionNum, 0)
}
@ -745,9 +736,8 @@ func TestMetaService_processDelete(t *testing.T) {
defer cancel()
// init metaService
pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0, pulsarURL)
node.metaService = newMetaService(ctx, node.container)
node := NewQueryNode(ctx, 0)
node.metaService = newMetaService(ctx, node.replica)
key1 := "by-dev/collection/0"
msg1 := `schema: <
@ -774,10 +764,10 @@ func TestMetaService_processDelete(t *testing.T) {
`
(*node.metaService).processCreate(key1, msg1)
collectionNum := (*node.container).getCollectionNum()
collectionNum := (*node.replica).getCollectionNum()
assert.Equal(t, collectionNum, 1)
collection, err := (*node.container).getCollectionByName("test")
collection, err := (*node.replica).getCollectionByName("test")
assert.NoError(t, err)
assert.Equal(t, collection.ID(), UniqueID(0))
@ -789,15 +779,15 @@ func TestMetaService_processDelete(t *testing.T) {
`
(*node.metaService).processCreate(key2, msg2)
seg, err := (*node.container).getSegmentByID(UniqueID(0))
seg, err := (*node.replica).getSegmentByID(UniqueID(0))
assert.NoError(t, err)
assert.Equal(t, seg.segmentID, UniqueID(0))
(*node.metaService).processDelete(key1)
collectionsSize := (*node.container).getCollectionNum()
collectionsSize := (*node.replica).getCollectionNum()
assert.Equal(t, collectionsSize, 0)
mapSize := (*node.container).getSegmentNum()
mapSize := (*node.replica).getSegmentNum()
assert.Equal(t, mapSize, 0)
}
@ -815,9 +805,8 @@ func TestMetaService_processResp(t *testing.T) {
}
// init metaService
pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0, pulsarURL)
node.metaService = newMetaService(ctx, node.container)
node := NewQueryNode(ctx, 0)
node.metaService = newMetaService(ctx, node.replica)
metaChan := (*node.metaService).kvBase.WatchWithPrefix("")
@ -843,9 +832,8 @@ func TestMetaService_loadCollections(t *testing.T) {
}
// init metaService
pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0, pulsarURL)
node.metaService = newMetaService(ctx, node.container)
node := NewQueryNode(ctx, 0)
node.metaService = newMetaService(ctx, node.replica)
err2 := (*node.metaService).loadCollections()
assert.Nil(t, err2)
@ -865,9 +853,8 @@ func TestMetaService_loadSegments(t *testing.T) {
}
// init metaService
pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0, pulsarURL)
node.metaService = newMetaService(ctx, node.container)
node := NewQueryNode(ctx, 0)
node.metaService = newMetaService(ctx, node.replica)
err2 := (*node.metaService).loadSegments()
assert.Nil(t, err2)

View File

@ -16,6 +16,23 @@ func (p *ParamTable) InitParamTable() {
p.Init()
}
func (p *ParamTable) PulsarAddress() (string, error) {
url, err := p.Load("_PulsarAddress")
if err != nil {
panic(err)
}
return "pulsar://" + url, nil
}
func (p *ParamTable) QueryNodeID() int {
queryNodeID, _ := p.Load("reader.clientid")
id, err := strconv.Atoi(queryNodeID)
if err != nil {
panic(err)
}
return id
}
func (p *ParamTable) TopicStart() int {
topicStart, _ := p.Load("reader.topicstart")
topicStartNum, err := strconv.Atoi(topicStart)

View File

@ -0,0 +1,25 @@
package reader
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestParamTable_QueryNodeID(t *testing.T) {
Params.InitParamTable()
id := Params.QueryNodeID()
assert.Equal(t, id, 0)
}
func TestParamTable_TopicStart(t *testing.T) {
Params.InitParamTable()
topicStart := Params.TopicStart()
assert.Equal(t, topicStart, 0)
}
func TestParamTable_TopicEnd(t *testing.T) {
Params.InitParamTable()
topicEnd := Params.TopicEnd()
assert.Equal(t, topicEnd, 128)
}

View File

@ -13,8 +13,7 @@ import (
func TestPartition_Segments(t *testing.T) {
ctx := context.Background()
pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0, pulsarURL)
node := NewQueryNode(ctx, 0)
collectionName := "collection0"
fieldVec := schemapb.FieldSchema{
@ -57,17 +56,17 @@ func TestPartition_Segments(t *testing.T) {
collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
assert.NotEqual(t, "", collectionMetaBlob)
var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob)
assert.NoError(t, err)
collection, err := (*node.container).getCollectionByName(collectionName)
collection, err := (*node.replica).getCollectionByName(collectionName)
assert.NoError(t, err)
assert.Equal(t, collection.meta.Schema.Name, "collection0")
assert.Equal(t, collection.meta.ID, UniqueID(0))
assert.Equal(t, (*node.container).getCollectionNum(), 1)
assert.Equal(t, (*node.replica).getCollectionNum(), 1)
for _, tag := range collectionMeta.PartitionTags {
err := (*node.container).addPartition(collection.ID(), tag)
err := (*node.replica).addPartition(collection.ID(), tag)
assert.NoError(t, err)
}
@ -78,7 +77,7 @@ func TestPartition_Segments(t *testing.T) {
const segmentNum = 3
for i := 0; i < segmentNum; i++ {
err := (*node.container).addSegment(UniqueID(i), targetPartition.partitionTag, collection.ID())
err := (*node.replica).addSegment(UniqueID(i), targetPartition.partitionTag, collection.ID())
assert.NoError(t, err)
}

View File

@ -14,18 +14,14 @@ import "C"
import (
"context"
"sync"
)
type QueryNode struct {
ctx context.Context
QueryNodeID uint64
pulsarURL string
tSafe tSafe
container *container
replica *collectionReplica
dataSyncService *dataSyncService
metaService *metaService
@ -33,36 +29,21 @@ type QueryNode struct {
statsService *statsService
}
type tSafe interface {
getTSafe() Timestamp
setTSafe(t Timestamp)
}
type serviceTime struct {
tSafeMu sync.Mutex
time Timestamp
}
func NewQueryNode(ctx context.Context, queryNodeID uint64, pulsarURL string) *QueryNode {
func NewQueryNode(ctx context.Context, queryNodeID uint64) *QueryNode {
segmentsMap := make(map[int64]*Segment)
collections := make([]*Collection, 0)
var container container = &colSegContainer{
var replica collectionReplica = &collectionReplicaImpl{
collections: collections,
segments: segmentsMap,
}
var tSafe tSafe = &serviceTime{}
return &QueryNode{
ctx: ctx,
QueryNodeID: queryNodeID,
pulsarURL: pulsarURL,
tSafe: tSafe,
container: &container,
replica: &replica,
dataSyncService: nil,
metaService: nil,
@ -72,10 +53,10 @@ func NewQueryNode(ctx context.Context, queryNodeID uint64, pulsarURL string) *Qu
}
func (node *QueryNode) Start() {
node.dataSyncService = newDataSyncService(node.ctx, node, node.pulsarURL)
node.searchService = newSearchService(node.ctx, node, node.pulsarURL)
node.metaService = newMetaService(node.ctx, node.container)
node.statsService = newStatsService(node.ctx, node.container, node.pulsarURL)
node.dataSyncService = newDataSyncService(node.ctx, node.replica)
node.searchService = newSearchService(node.ctx, node.replica)
node.metaService = newMetaService(node.ctx, node.replica)
node.statsService = newStatsService(node.ctx, node.replica)
go node.dataSyncService.start()
// go node.searchService.start()
@ -86,15 +67,3 @@ func (node *QueryNode) Start() {
func (node *QueryNode) Close() {
// TODO: close services
}
func (st *serviceTime) getTSafe() Timestamp {
st.tSafeMu.Lock()
defer st.tSafeMu.Unlock()
return st.time
}
func (st *serviceTime) setTSafe(t Timestamp) {
st.tSafeMu.Lock()
st.time = t
st.tSafeMu.Unlock()
}

View File

@ -23,10 +23,6 @@ func TestQueryNode_start(t *testing.T) {
ctx = context.Background()
}
pulsarAddr, err := Params.PulsarAddress()
if err != nil {
panic(err)
}
node := NewQueryNode(ctx, 0, "pulsar://"+pulsarAddr)
node := NewQueryNode(ctx, 0)
node.Start()
}

View File

@ -8,8 +8,8 @@ func Init() {
Params.Init()
}
func StartQueryNode(ctx context.Context, pulsarURL string) {
node := NewQueryNode(ctx, 0, pulsarURL)
func StartQueryNode(ctx context.Context) {
node := NewQueryNode(ctx, 0)
node.Start()
}

View File

@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"log"
"sort"
"github.com/golang/protobuf/proto"
@ -19,7 +20,7 @@ type searchService struct {
ctx context.Context
cancel context.CancelFunc
node *QueryNode
replica *collectionReplica
searchMsgStream *msgstream.MsgStream
searchResultMsgStream *msgstream.MsgStream
}
@ -31,24 +32,29 @@ type SearchResult struct {
ResultDistances []float32
}
func newSearchService(ctx context.Context, node *QueryNode, pulsarURL string) *searchService {
func newSearchService(ctx context.Context, replica *collectionReplica) *searchService {
const (
//TODO:: read config file
receiveBufSize = 1024
pulsarBufSize = 1024
)
msgStreamURL, err := Params.PulsarAddress()
if err != nil {
log.Fatal(err)
}
consumeChannels := []string{"search"}
consumeSubName := "subSearch"
searchStream := msgstream.NewPulsarMsgStream(ctx, receiveBufSize)
searchStream.SetPulsarCient(pulsarURL)
searchStream.SetPulsarCient(msgStreamURL)
unmarshalDispatcher := msgstream.NewUnmarshalDispatcher()
searchStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize)
var inputStream msgstream.MsgStream = searchStream
producerChannels := []string{"searchResult"}
searchResultStream := msgstream.NewPulsarMsgStream(ctx, receiveBufSize)
searchResultStream.SetPulsarCient(pulsarURL)
searchResultStream.SetPulsarCient(msgStreamURL)
searchResultStream.CreatePulsarProducers(producerChannels)
var outputStream msgstream.MsgStream = searchResultStream
@ -57,7 +63,7 @@ func newSearchService(ctx context.Context, node *QueryNode, pulsarURL string) *s
ctx: searchServiceCtx,
cancel: searchServiceCancel,
node: node,
replica: replica,
searchMsgStream: &inputStream,
searchResultMsgStream: &outputStream,
}
@ -120,7 +126,7 @@ func (ss *searchService) search(searchMessages []msgstream.TsMsg) error {
}
collectionName := query.CollectionName
partitionTags := query.PartitionTags
collection, err := (*ss.node.container).getCollectionByName(collectionName)
collection, err := (*ss.replica).getCollectionByName(collectionName)
if err != nil {
return err
}
@ -150,7 +156,7 @@ func (ss *searchService) search(searchMessages []msgstream.TsMsg) error {
// 3. Do search in all segments
for _, partitionTag := range partitionTags {
partition, err := (*ss.node.container).getPartitionByTag(collectionID, partitionTag)
partition, err := (*ss.replica).getPartitionByTag(collectionID, partitionTag)
if err != nil {
return err
}

View File

@ -21,12 +21,13 @@ import (
)
func TestSearch_Search(t *testing.T) {
Params.Init()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// init query node
pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0, pulsarURL)
node := NewQueryNode(ctx, 0)
// init meta
collectionName := "collection0"
@ -70,20 +71,20 @@ func TestSearch_Search(t *testing.T) {
collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
assert.NotEqual(t, "", collectionMetaBlob)
var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob)
assert.NoError(t, err)
collection, err := (*node.container).getCollectionByName(collectionName)
collection, err := (*node.replica).getCollectionByName(collectionName)
assert.NoError(t, err)
assert.Equal(t, collection.meta.Schema.Name, "collection0")
assert.Equal(t, collection.meta.ID, UniqueID(0))
assert.Equal(t, (*node.container).getCollectionNum(), 1)
assert.Equal(t, (*node.replica).getCollectionNum(), 1)
err = (*node.container).addPartition(collection.ID(), collectionMeta.PartitionTags[0])
err = (*node.replica).addPartition(collection.ID(), collectionMeta.PartitionTags[0])
assert.NoError(t, err)
segmentID := UniqueID(0)
err = (*node.container).addSegment(segmentID, collectionMeta.PartitionTags[0], UniqueID(0))
err = (*node.replica).addSegment(segmentID, collectionMeta.PartitionTags[0], UniqueID(0))
assert.NoError(t, err)
// test data generate
@ -162,7 +163,7 @@ func TestSearch_Search(t *testing.T) {
assert.NoError(t, err)
// dataSync
node.dataSyncService = newDataSyncService(node.ctx, node, node.pulsarURL)
node.dataSyncService = newDataSyncService(node.ctx, node.replica)
go node.dataSyncService.start()
time.Sleep(2 * time.Second)
@ -233,7 +234,7 @@ func TestSearch_Search(t *testing.T) {
err = searchMsgStream.Produce(&msgPackSearch)
assert.NoError(t, err)
node.searchService = newSearchService(node.ctx, node, node.pulsarURL)
node.searchService = newSearchService(node.ctx, node.replica)
go node.searchService.start()
time.Sleep(2 * time.Second)

View File

@ -13,21 +13,17 @@ import (
)
type statsService struct {
ctx context.Context
pulsarURL string
msgStream *msgstream.MsgStream
container *container
ctx context.Context
statsStream *msgstream.MsgStream
replica *collectionReplica
}
func newStatsService(ctx context.Context, container *container, pulsarURL string) *statsService {
func newStatsService(ctx context.Context, replica *collectionReplica) *statsService {
return &statsService{
ctx: ctx,
pulsarURL: pulsarURL,
msgStream: nil,
container: container,
ctx: ctx,
statsStream: nil,
replica: replica,
}
}
@ -38,16 +34,20 @@ func (sService *statsService) start() {
)
// start pulsar
msgStreamURL, err := Params.PulsarAddress()
if err != nil {
log.Fatal(err)
}
producerChannels := []string{"statistic"}
statsStream := msgstream.NewPulsarMsgStream(sService.ctx, receiveBufSize)
statsStream.SetPulsarCient(sService.pulsarURL)
statsStream.SetPulsarCient(msgStreamURL)
statsStream.CreatePulsarProducers(producerChannels)
var statsMsgStream msgstream.MsgStream = statsStream
sService.msgStream = &statsMsgStream
(*sService.msgStream).Start()
sService.statsStream = &statsMsgStream
(*sService.statsStream).Start()
// start service
fmt.Println("do segments statistic in ", strconv.Itoa(sleepMillisecondTime), "ms")
@ -62,7 +62,7 @@ func (sService *statsService) start() {
}
func (sService *statsService) sendSegmentStatistic() {
statisticData := (*sService.container).getSegmentStatistics()
statisticData := (*sService.replica).getSegmentStatistics()
// fmt.Println("Publish segment statistic")
// fmt.Println(statisticData)
@ -80,7 +80,7 @@ func (sService *statsService) publicStatistic(statistic *internalpb.QueryNodeSeg
var msgPack = msgstream.MsgPack{
Msgs: []msgstream.TsMsg{msg},
}
err := (*sService.msgStream).Produce(&msgPack)
err := (*sService.statsStream).Produce(&msgPack)
if err != nil {
log.Println(err)
}

View File

@ -15,6 +15,7 @@ import (
// NOTE: start pulsar before test
func TestStatsService_start(t *testing.T) {
Params.Init()
var ctx context.Context
if closeWithDeadline {
@ -27,8 +28,7 @@ func TestStatsService_start(t *testing.T) {
}
// init query node
pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0, pulsarURL)
node := NewQueryNode(ctx, 0)
// init meta
collectionName := "collection0"
@ -72,29 +72,30 @@ func TestStatsService_start(t *testing.T) {
collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
assert.NotEqual(t, "", collectionMetaBlob)
var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob)
assert.NoError(t, err)
collection, err := (*node.container).getCollectionByName(collectionName)
collection, err := (*node.replica).getCollectionByName(collectionName)
assert.NoError(t, err)
assert.Equal(t, collection.meta.Schema.Name, "collection0")
assert.Equal(t, collection.meta.ID, UniqueID(0))
assert.Equal(t, (*node.container).getCollectionNum(), 1)
assert.Equal(t, (*node.replica).getCollectionNum(), 1)
err = (*node.container).addPartition(collection.ID(), collectionMeta.PartitionTags[0])
err = (*node.replica).addPartition(collection.ID(), collectionMeta.PartitionTags[0])
assert.NoError(t, err)
segmentID := UniqueID(0)
err = (*node.container).addSegment(segmentID, collectionMeta.PartitionTags[0], UniqueID(0))
err = (*node.replica).addSegment(segmentID, collectionMeta.PartitionTags[0], UniqueID(0))
assert.NoError(t, err)
// start stats service
node.statsService = newStatsService(node.ctx, node.container, node.pulsarURL)
node.statsService = newStatsService(node.ctx, node.replica)
node.statsService.start()
}
// NOTE: start pulsar before test
func TestSegmentManagement_SegmentStatisticService(t *testing.T) {
Params.Init()
var ctx context.Context
if closeWithDeadline {
@ -108,7 +109,7 @@ func TestSegmentManagement_SegmentStatisticService(t *testing.T) {
// init query node
pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0, pulsarURL)
node := NewQueryNode(ctx, 0)
// init meta
collectionName := "collection0"
@ -152,20 +153,20 @@ func TestSegmentManagement_SegmentStatisticService(t *testing.T) {
collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
assert.NotEqual(t, "", collectionMetaBlob)
var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob)
assert.NoError(t, err)
collection, err := (*node.container).getCollectionByName(collectionName)
collection, err := (*node.replica).getCollectionByName(collectionName)
assert.NoError(t, err)
assert.Equal(t, collection.meta.Schema.Name, "collection0")
assert.Equal(t, collection.meta.ID, UniqueID(0))
assert.Equal(t, (*node.container).getCollectionNum(), 1)
assert.Equal(t, (*node.replica).getCollectionNum(), 1)
err = (*node.container).addPartition(collection.ID(), collectionMeta.PartitionTags[0])
err = (*node.replica).addPartition(collection.ID(), collectionMeta.PartitionTags[0])
assert.NoError(t, err)
segmentID := UniqueID(0)
err = (*node.container).addSegment(segmentID, collectionMeta.PartitionTags[0], UniqueID(0))
err = (*node.replica).addSegment(segmentID, collectionMeta.PartitionTags[0], UniqueID(0))
assert.NoError(t, err)
const receiveBufSize = 1024
@ -178,9 +179,9 @@ func TestSegmentManagement_SegmentStatisticService(t *testing.T) {
var statsMsgStream msgstream.MsgStream = statsStream
node.statsService = newStatsService(node.ctx, node.container, node.pulsarURL)
node.statsService.msgStream = &statsMsgStream
(*node.statsService.msgStream).Start()
node.statsService = newStatsService(node.ctx, node.replica)
node.statsService.statsStream = &statsMsgStream
(*node.statsService.statsStream).Start()
// send stats
node.statsService.sendSegmentStatistic()

View File

@ -2,6 +2,7 @@ package flowgraph
import (
"context"
"log"
"sync"
"github.com/zilliztech/milvus-distributed/internal/errors"
@ -68,6 +69,15 @@ func (fg *TimeTickedFlowGraph) Start() {
func (fg *TimeTickedFlowGraph) Close() {
for _, v := range fg.nodeCtx {
// close message stream
if (*v.node).IsInputNode() {
inStream, ok := (*v.node).(*InputNode)
if !ok {
log.Fatal("Invalid inputNode")
}
(*inStream.inStream).Close()
}
// close input channels
v.Close()
}
}