From 7554246ace5bbfa34b6167d62ded612ad3d3aaa8 Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Mon, 25 Jan 2021 18:33:10 +0800 Subject: [PATCH] Add segment seeking and use real client Signed-off-by: bigsheeper --- cmd/masterservice/main.go | 93 ++++++ configs/advanced/channel.yaml | 5 - configs/advanced/data_service.yaml | 13 - configs/advanced/master.yaml | 3 +- internal/datanode/allocator.go | 2 +- internal/datanode/collection.go | 15 +- internal/datanode/collection_replica.go | 7 +- internal/datanode/collection_replica_test.go | 6 +- internal/datanode/collection_test.go | 11 +- internal/datanode/data_node.go | 7 +- internal/datanode/data_sync_service_test.go | 3 +- internal/datanode/factory.go | 47 +++ internal/datanode/flow_graph_dd_node.go | 3 +- .../flow_graph_insert_buffer_node_test.go | 5 +- internal/datanode/meta_service.go | 150 ++++----- internal/datanode/meta_service_test.go | 120 +++---- internal/dataservice/param.go | 81 ++--- internal/dataservice/server.go | 213 +++++-------- internal/dataservice/stats_handler.go | 23 +- internal/distributed/masterservice/client.go | 77 +++-- .../masterservice/masterservice_test.go | 3 +- internal/distributed/masterservice/server.go | 29 +- internal/masterservice/master_service.go | 118 +++++++ internal/masterservice/param_table.go | 8 + internal/masterservice/param_table_test.go | 3 + internal/querynode/collection_replica.go | 40 +++ internal/querynode/data_sync_service.go | 9 +- .../querynode/flow_graph_filter_dm_node.go | 12 +- internal/querynode/param_table.go | 26 +- internal/querynode/partition.go | 5 +- internal/querynode/query_node.go | 34 +- internal/querynode/segment_manager.go | 119 ++++--- internal/querynode/segment_manager_test.go | 300 +++++++++++++++++- internal/timesync/timetick_watcher.go | 8 - 34 files changed, 1056 insertions(+), 542 deletions(-) create mode 100644 cmd/masterservice/main.go delete mode 100644 configs/advanced/data_service.yaml diff --git a/cmd/masterservice/main.go b/cmd/masterservice/main.go new file mode 100644 index 0000000000..9b050f9560 --- /dev/null +++ b/cmd/masterservice/main.go @@ -0,0 +1,93 @@ +package main + +import ( + "context" + "fmt" + "log" + "os" + "os/signal" + "syscall" + + ds "github.com/zilliztech/milvus-distributed/internal/dataservice" + dsc "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice" + isc "github.com/zilliztech/milvus-distributed/internal/distributed/indexservice/client" + msc "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice" + psc "github.com/zilliztech/milvus-distributed/internal/distributed/proxyservice" + is "github.com/zilliztech/milvus-distributed/internal/indexservice" + ms "github.com/zilliztech/milvus-distributed/internal/masterservice" + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" + "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" +) + +const reTryCnt = 3 + +func main() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + log.Printf("master service address : %s:%d", ms.Params.Address, ms.Params.Port) + + svr, err := msc.NewGrpcServer(ctx) + if err != nil { + panic(err) + } + + log.Printf("proxy service address : %s", psc.Params.NetworkAddress()) + //proxyService := psc.NewClient(ctx, psc.Params.NetworkAddress()) + + //TODO, test proxy service GetComponentStates, before set + + //if err = svr.SetProxyService(proxyService); err != nil { + // panic(err) + //} + + log.Printf("data service address : %s:%d", ds.Params.Address, ds.Params.Port) + dataService := dsc.NewClient(fmt.Sprintf("%s:%d", ds.Params.Address, ds.Params.Port)) + if err = dataService.Init(); err != nil { + panic(err) + } + if err = dataService.Start(); err != nil { + panic(err) + } + cnt := 0 + for cnt = 0; cnt < reTryCnt; cnt++ { + dsStates, err := dataService.GetComponentStates() + if err != nil { + continue + } + if dsStates.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { + continue + } + if dsStates.State.StateCode != internalpb2.StateCode_INITIALIZING && dsStates.State.StateCode != internalpb2.StateCode_HEALTHY { + continue + } + break + } + if cnt >= reTryCnt { + panic("connect to data service failed") + } + + //if err = svr.SetDataService(dataService); err != nil { + // panic(err) + //} + + log.Printf("index service address : %s", is.Params.Address) + indexService := isc.NewClient(is.Params.Address) + + if err = svr.SetIndexService(indexService); err != nil { + panic(err) + } + + if err = svr.Start(); err != nil { + panic(err) + } + + sc := make(chan os.Signal, 1) + signal.Notify(sc, + syscall.SIGHUP, + syscall.SIGINT, + syscall.SIGTERM, + syscall.SIGQUIT) + sig := <-sc + log.Printf("Got %s signal to exit", sig.String()) + _ = svr.Stop() +} diff --git a/configs/advanced/channel.yaml b/configs/advanced/channel.yaml index bbfe734441..a11c7fccb0 100644 --- a/configs/advanced/channel.yaml +++ b/configs/advanced/channel.yaml @@ -29,10 +29,6 @@ msgChannel: queryNodeStats: "query-node-stats" # cmd for loadIndex, flush, etc... cmd: "cmd" - dataServiceInsertChannel: "insert-channel-" - dataServiceStatistic: "dataservice-statistics-channel" - dataServiceTimeTick: "dataservice-timetick-channel" - dataServiceSegmentInfo: "segment-info-channel" # sub name generation rule: ${subNamePrefix}-${NodeID} subNamePrefix: @@ -41,7 +37,6 @@ msgChannel: queryNodeSubNamePrefix: "queryNode" writeNodeSubNamePrefix: "writeNode" # GOOSE TODO: remove this dataNodeSubNamePrefix: "dataNode" - dataServiceSubNamePrefix: "dataService" # default channel range [0, 1) channelRange: diff --git a/configs/advanced/data_service.yaml b/configs/advanced/data_service.yaml deleted file mode 100644 index 49c405e793..0000000000 --- a/configs/advanced/data_service.yaml +++ /dev/null @@ -1,13 +0,0 @@ -dataservice: - nodeID: 14040 - address: "127.0.0.1" - port: 13333 - segment: - # old name: segmentThreshold: 536870912 - size: 512 # MB - sizeFactor: 0.75 - defaultSizePerRecord: 1024 - # old name: segmentExpireDuration: 2000 - IDAssignExpiration: 2000 # ms - insertChannelNumPerCollection: 4 - dataNodeNum: 2 \ No newline at end of file diff --git a/configs/advanced/master.yaml b/configs/advanced/master.yaml index 69474322ae..3e1c4e7816 100644 --- a/configs/advanced/master.yaml +++ b/configs/advanced/master.yaml @@ -23,4 +23,5 @@ master: IDAssignExpiration: 2000 # ms maxPartitionNum: 4096 - nodeID: 100 \ No newline at end of file + nodeID: 100 + timeout: 5 # time out, 5 seconds \ No newline at end of file diff --git a/internal/datanode/allocator.go b/internal/datanode/allocator.go index 0cad7a46c3..83400a8ea2 100644 --- a/internal/datanode/allocator.go +++ b/internal/datanode/allocator.go @@ -25,7 +25,7 @@ func (alloc *allocatorImpl) allocID() (UniqueID, error) { resp, err := alloc.masterService.AllocID(&masterpb.IDRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_kShowCollections, - MsgID: 1, // GOOSE TODO add msg id + MsgID: 1, // GOOSE TODO Timestamp: 0, // GOOSE TODO SourceID: Params.NodeID, }, diff --git a/internal/datanode/collection.go b/internal/datanode/collection.go index 3610dc0b81..d489eef705 100644 --- a/internal/datanode/collection.go +++ b/internal/datanode/collection.go @@ -1,9 +1,6 @@ package datanode import ( - "log" - - "github.com/golang/protobuf/proto" "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" ) @@ -24,17 +21,9 @@ func (c *Collection) Schema() *schemapb.CollectionSchema { return c.schema } -func newCollection(collectionID UniqueID, schemaStr string) *Collection { - - var schema schemapb.CollectionSchema - err := proto.UnmarshalText(schemaStr, &schema) - if err != nil { - log.Println(err) - return nil - } - +func newCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) *Collection { var newCollection = &Collection{ - schema: &schema, + schema: schema, id: collectionID, } return newCollection diff --git a/internal/datanode/collection_replica.go b/internal/datanode/collection_replica.go index 5412a2ebd4..0be37979a4 100644 --- a/internal/datanode/collection_replica.go +++ b/internal/datanode/collection_replica.go @@ -6,13 +6,14 @@ import ( "github.com/zilliztech/milvus-distributed/internal/errors" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" + "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" ) type collectionReplica interface { // collection getCollectionNum() int - addCollection(collectionID UniqueID, schemaBlob string) error + addCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) error removeCollection(collectionID UniqueID) error getCollectionByID(collectionID UniqueID) (*Collection, error) getCollectionByName(collectionName string) (*Collection, error) @@ -162,11 +163,11 @@ func (colReplica *collectionReplicaImpl) getCollectionNum() int { return len(colReplica.collections) } -func (colReplica *collectionReplicaImpl) addCollection(collectionID UniqueID, schemaBlob string) error { +func (colReplica *collectionReplicaImpl) addCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) error { colReplica.mu.Lock() defer colReplica.mu.Unlock() - var newCollection = newCollection(collectionID, schemaBlob) + var newCollection = newCollection(collectionID, schema) colReplica.collections = append(colReplica.collections, newCollection) log.Println("Create collection: ", newCollection.Name()) diff --git a/internal/datanode/collection_replica_test.go b/internal/datanode/collection_replica_test.go index 25869712d7..c97d38dbdd 100644 --- a/internal/datanode/collection_replica_test.go +++ b/internal/datanode/collection_replica_test.go @@ -3,7 +3,6 @@ package datanode import ( "testing" - "github.com/golang/protobuf/proto" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -23,10 +22,7 @@ func initTestReplicaMeta(t *testing.T, replica collectionReplica, collectionName Factory := &MetaFactory{} collectionMeta := Factory.CollectionMetaFactory(collectionID, collectionName) - schemaBlob := proto.MarshalTextString(collectionMeta.Schema) - require.NotEqual(t, "", schemaBlob) - - var err = replica.addCollection(collectionMeta.ID, schemaBlob) + var err = replica.addCollection(collectionMeta.ID, collectionMeta.Schema) require.NoError(t, err) collection, err := replica.getCollectionByName(collectionName) diff --git a/internal/datanode/collection_test.go b/internal/datanode/collection_test.go index cce9bc1dcd..6f12bfd0bb 100644 --- a/internal/datanode/collection_test.go +++ b/internal/datanode/collection_test.go @@ -3,7 +3,6 @@ package datanode import ( "testing" - "github.com/golang/protobuf/proto" "github.com/stretchr/testify/assert" ) @@ -13,10 +12,7 @@ func TestCollection_newCollection(t *testing.T) { Factory := &MetaFactory{} collectionMeta := Factory.CollectionMetaFactory(collectionID, collectionName) - schemaBlob := proto.MarshalTextString(collectionMeta.Schema) - assert.NotEqual(t, "", schemaBlob) - - collection := newCollection(collectionMeta.ID, schemaBlob) + collection := newCollection(collectionMeta.ID, collectionMeta.Schema) assert.Equal(t, collection.Name(), collectionName) assert.Equal(t, collection.ID(), collectionID) } @@ -27,10 +23,7 @@ func TestCollection_deleteCollection(t *testing.T) { Factory := &MetaFactory{} collectionMeta := Factory.CollectionMetaFactory(collectionID, collectionName) - schemaBlob := proto.MarshalTextString(collectionMeta.Schema) - assert.NotEqual(t, "", schemaBlob) - - collection := newCollection(collectionMeta.ID, schemaBlob) + collection := newCollection(collectionMeta.ID, collectionMeta.Schema) assert.Equal(t, collection.Name(), collectionName) assert.Equal(t, collection.ID(), collectionID) } diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index 4be2c78d99..ced35ae9dc 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -26,8 +26,8 @@ const ( type ( Inteface interface { typeutil.Service + typeutil.Component - GetComponentStates() (*internalpb2.ComponentStates, error) WatchDmChannels(in *datapb.WatchDmChannelRequest) (*commonpb.Status, error) FlushSegments(in *datapb.FlushSegRequest) (*commonpb.Status, error) } @@ -43,6 +43,7 @@ type ( } DataNode struct { + // GOOSE TODO: complete interface with component ctx context.Context NodeID UniqueID Role string @@ -124,7 +125,7 @@ func (node *DataNode) Init() error { chanSize := 100 flushChan := make(chan *flushMsg, chanSize) node.dataSyncService = newDataSyncService(node.ctx, flushChan, replica, alloc) - node.metaService = newMetaService(node.ctx, replica) + node.metaService = newMetaService(node.ctx, replica, node.masterService) node.replica = replica // Opentracing @@ -154,7 +155,7 @@ func (node *DataNode) Init() error { func (node *DataNode) Start() error { go node.dataSyncService.start() - node.metaService.start() + node.metaService.init() return nil } diff --git a/internal/datanode/data_sync_service_test.go b/internal/datanode/data_sync_service_test.go index 0813dde84b..3b40ea1a20 100644 --- a/internal/datanode/data_sync_service_test.go +++ b/internal/datanode/data_sync_service_test.go @@ -7,7 +7,6 @@ import ( "testing" "time" - "github.com/golang/protobuf/proto" "github.com/stretchr/testify/assert" "github.com/zilliztech/milvus-distributed/internal/msgstream" @@ -42,7 +41,7 @@ func TestDataSyncService_Start(t *testing.T) { replica := newReplica() allocFactory := AllocatorFactory{} sync := newDataSyncService(ctx, flushChan, replica, allocFactory) - sync.replica.addCollection(collMeta.ID, proto.MarshalTextString(collMeta.Schema)) + sync.replica.addCollection(collMeta.ID, collMeta.Schema) go sync.start() // test data generate diff --git a/internal/datanode/factory.go b/internal/datanode/factory.go index d8a462a9e8..bb87ca008b 100644 --- a/internal/datanode/factory.go +++ b/internal/datanode/factory.go @@ -3,6 +3,8 @@ package datanode import ( "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb" + "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" + "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" ) @@ -15,6 +17,12 @@ type ( AllocatorFactory struct { } + + MasterServiceFactory struct { + ID UniqueID + collectionName string + collectionID UniqueID + } ) func (mf *MetaFactory) CollectionMetaFactory(collectionID UniqueID, collectionName string) *etcdpb.CollectionMeta { @@ -156,3 +164,42 @@ func (alloc AllocatorFactory) allocID() (UniqueID, error) { // GOOSE TODO: random ID generate return UniqueID(0), nil } + +func (m *MasterServiceFactory) setID(id UniqueID) { + m.ID = id // GOOSE TODO: random ID generator +} + +func (m *MasterServiceFactory) setCollectionID(id UniqueID) { + m.collectionID = id +} + +func (m *MasterServiceFactory) setCollectionName(name string) { + m.collectionName = name +} + +func (m *MasterServiceFactory) AllocID(in *masterpb.IDRequest) (*masterpb.IDResponse, error) { + resp := &masterpb.IDResponse{ + Status: &commonpb.Status{}, + ID: m.ID, + } + return resp, nil +} + +func (m *MasterServiceFactory) ShowCollections(in *milvuspb.ShowCollectionRequest) (*milvuspb.ShowCollectionResponse, error) { + resp := &milvuspb.ShowCollectionResponse{ + Status: &commonpb.Status{}, + CollectionNames: []string{m.collectionName}, + } + return resp, nil + +} +func (m *MasterServiceFactory) DescribeCollection(in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) { + f := MetaFactory{} + meta := f.CollectionMetaFactory(m.collectionID, m.collectionName) + resp := &milvuspb.DescribeCollectionResponse{ + Status: &commonpb.Status{}, + CollectionID: m.collectionID, + Schema: meta.Schema, + } + return resp, nil +} diff --git a/internal/datanode/flow_graph_dd_node.go b/internal/datanode/flow_graph_dd_node.go index d7432ae161..45676f19b6 100644 --- a/internal/datanode/flow_graph_dd_node.go +++ b/internal/datanode/flow_graph_dd_node.go @@ -224,9 +224,8 @@ func (ddNode *ddNode) createCollection(msg *msgstream.CreateCollectionMsg) { return } - schemaStr := proto.MarshalTextString(&schema) // add collection - err = ddNode.replica.addCollection(collectionID, schemaStr) + err = ddNode.replica.addCollection(collectionID, &schema) if err != nil { log.Println(err) return diff --git a/internal/datanode/flow_graph_insert_buffer_node_test.go b/internal/datanode/flow_graph_insert_buffer_node_test.go index 78b08901e2..82f0ef69e9 100644 --- a/internal/datanode/flow_graph_insert_buffer_node_test.go +++ b/internal/datanode/flow_graph_insert_buffer_node_test.go @@ -9,7 +9,6 @@ import ( "testing" "time" - "github.com/golang/protobuf/proto" "github.com/stretchr/testify/require" "github.com/zilliztech/milvus-distributed/internal/msgstream" @@ -39,11 +38,9 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) { Factory := &MetaFactory{} collMeta := Factory.CollectionMetaFactory(UniqueID(0), "coll1") - schemaBlob := proto.MarshalTextString(collMeta.Schema) - require.NotEqual(t, "", schemaBlob) replica := newReplica() - err = replica.addCollection(collMeta.ID, schemaBlob) + err = replica.addCollection(collMeta.ID, collMeta.Schema) require.NoError(t, err) // Params.FlushInsertBufSize = 2 diff --git a/internal/datanode/meta_service.go b/internal/datanode/meta_service.go index fa92d59fbc..c259af622f 100644 --- a/internal/datanode/meta_service.go +++ b/internal/datanode/meta_service.go @@ -4,73 +4,91 @@ import ( "context" "fmt" "log" - "path" "reflect" - "strings" - "time" - "github.com/golang/protobuf/proto" - "go.etcd.io/etcd/clientv3" - - etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd" + "github.com/zilliztech/milvus-distributed/internal/errors" + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb" + "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" ) type metaService struct { - ctx context.Context - kvBase *etcdkv.EtcdKV - replica collectionReplica + ctx context.Context + replica collectionReplica + masterClient MasterServiceInterface } -func newMetaService(ctx context.Context, replica collectionReplica) *metaService { - ETCDAddr := Params.EtcdAddress - MetaRootPath := Params.MetaRootPath - - cli, _ := clientv3.New(clientv3.Config{ - Endpoints: []string{ETCDAddr}, - DialTimeout: 5 * time.Second, - }) - +func newMetaService(ctx context.Context, replica collectionReplica, m MasterServiceInterface) *metaService { return &metaService{ - ctx: ctx, - kvBase: etcdkv.NewEtcdKV(cli, MetaRootPath), - replica: replica, + ctx: ctx, + replica: replica, + masterClient: m, } } -func (mService *metaService) start() { - // init from meta +func (mService *metaService) init() { err := mService.loadCollections() if err != nil { - log.Fatal("metaService loadCollections failed") + log.Fatal("metaService init failed:", err) } } -func GetCollectionObjID(key string) string { - ETCDRootPath := Params.MetaRootPath +func (mService *metaService) loadCollections() error { + names, err := mService.getCollectionNames() + if err != nil { + return err + } - prefix := path.Join(ETCDRootPath, CollectionPrefix) + "/" - return strings.TrimPrefix(key, prefix) + for _, name := range names { + err := mService.createCollection(name) + if err != nil { + return err + } + } + return nil } -func isCollectionObj(key string) bool { - ETCDRootPath := Params.MetaRootPath +func (mService *metaService) getCollectionNames() ([]string, error) { + req := &milvuspb.ShowCollectionRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_kShowCollections, + MsgID: 0, //GOOSE TODO + Timestamp: 0, // GOOSE TODO + SourceID: Params.NodeID, + }, + DbName: "default", // GOOSE TODO + } - prefix := path.Join(ETCDRootPath, CollectionPrefix) + "/" - prefix = strings.TrimSpace(prefix) - index := strings.Index(key, prefix) - - return index == 0 + response, err := mService.masterClient.ShowCollections(req) + if err != nil { + return nil, errors.Errorf("Get collection names from master service wrong: %v", err) + } + return response.GetCollectionNames(), nil } -func isSegmentObj(key string) bool { - ETCDRootPath := Params.MetaRootPath +func (mService *metaService) createCollection(name string) error { + req := &milvuspb.DescribeCollectionRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_kDescribeCollection, + MsgID: 0, //GOOSE TODO + Timestamp: 0, // GOOSE TODO + SourceID: Params.NodeID, + }, + DbName: "default", // GOOSE TODO + CollectionName: name, + } - prefix := path.Join(ETCDRootPath, SegmentPrefix) + "/" - prefix = strings.TrimSpace(prefix) - index := strings.Index(key, prefix) + response, err := mService.masterClient.DescribeCollection(req) + if err != nil { + return errors.Errorf("Describe collection %v from master service wrong: %v", name, err) + } - return index == 0 + err = mService.replica.addCollection(response.GetCollectionID(), response.GetSchema()) + if err != nil { + return errors.Errorf("Add collection %v into collReplica wrong: %v", name, err) + } + + return nil } func printCollectionStruct(obj *etcdpb.CollectionMeta) { @@ -85,51 +103,3 @@ func printCollectionStruct(obj *etcdpb.CollectionMeta) { fmt.Printf("Field: %s\tValue: %v\n", typeOfS.Field(i).Name, v.Field(i).Interface()) } } - -func (mService *metaService) processCollectionCreate(id string, value string) { - //println(fmt.Sprintf("Create Collection:$%s$", id)) - - col := mService.collectionUnmarshal(value) - if col != nil { - schema := col.Schema - schemaBlob := proto.MarshalTextString(schema) - err := mService.replica.addCollection(col.ID, schemaBlob) - if err != nil { - log.Println(err) - } - } -} - -func (mService *metaService) loadCollections() error { - keys, values, err := mService.kvBase.LoadWithPrefix(CollectionPrefix) - if err != nil { - return err - } - - for i := range keys { - objID := GetCollectionObjID(keys[i]) - mService.processCollectionCreate(objID, values[i]) - } - - return nil -} - -//----------------------------------------------------------------------- Unmarshal and Marshal -func (mService *metaService) collectionUnmarshal(value string) *etcdpb.CollectionMeta { - col := etcdpb.CollectionMeta{} - err := proto.UnmarshalText(value, &col) - if err != nil { - log.Println(err) - return nil - } - return &col -} - -func (mService *metaService) collectionMarshal(col *etcdpb.CollectionMeta) string { - value := proto.MarshalTextString(col) - if value == "" { - log.Println("marshal collection failed") - return "" - } - return value -} diff --git a/internal/datanode/meta_service_test.go b/internal/datanode/meta_service_test.go index 3154ecebd0..23d97085b5 100644 --- a/internal/datanode/meta_service_test.go +++ b/internal/datanode/meta_service_test.go @@ -7,94 +7,46 @@ import ( "github.com/stretchr/testify/assert" ) -func TestMetaService_start(t *testing.T) { +func TestMetaService_All(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() replica := newReplica() + mFactory := &MasterServiceFactory{} + mFactory.setCollectionID(0) + mFactory.setCollectionName("a-collection") + metaService := newMetaService(ctx, replica, mFactory) - metaService := newMetaService(ctx, replica) + t.Run("Test getCollectionNames", func(t *testing.T) { + names, err := metaService.getCollectionNames() + assert.NoError(t, err) + assert.Equal(t, 1, len(names)) + assert.Equal(t, "a-collection", names[0]) + }) + + t.Run("Test createCollection", func(t *testing.T) { + hasColletion := metaService.replica.hasCollection(0) + assert.False(t, hasColletion) + + err := metaService.createCollection("a-collection") + assert.NoError(t, err) + hasColletion = metaService.replica.hasCollection(0) + assert.True(t, hasColletion) + }) + + t.Run("Test loadCollections", func(t *testing.T) { + hasColletion := metaService.replica.hasCollection(1) + assert.False(t, hasColletion) + + mFactory.setCollectionID(1) + mFactory.setCollectionName("a-collection-1") + err := metaService.loadCollections() + assert.NoError(t, err) + + hasColletion = metaService.replica.hasCollection(1) + assert.True(t, hasColletion) + hasColletion = metaService.replica.hasCollection(0) + assert.True(t, hasColletion) + }) - metaService.start() -} - -func TestMetaService_getCollectionObjId(t *testing.T) { - var key = "/collection/collection0" - var collectionObjID1 = GetCollectionObjID(key) - - assert.Equal(t, collectionObjID1, "/collection/collection0") - - key = "fakeKey" - var collectionObjID2 = GetCollectionObjID(key) - - assert.Equal(t, collectionObjID2, "fakeKey") -} - -func TestMetaService_isCollectionObj(t *testing.T) { - var key = Params.MetaRootPath + "/collection/collection0" - var b1 = isCollectionObj(key) - - assert.Equal(t, b1, true) - - key = Params.MetaRootPath + "/segment/segment0" - var b2 = isCollectionObj(key) - - assert.Equal(t, b2, false) -} - -func TestMetaService_processCollectionCreate(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - replica := newReplica() - metaService := newMetaService(ctx, replica) - defer cancel() - id := "0" - value := `schema: < - name: "test" - fields: < - fieldID:100 - name: "vec" - data_type: VECTOR_FLOAT - type_params: < - key: "dim" - value: "16" - > - index_params: < - key: "metric_type" - value: "L2" - > - > - fields: < - fieldID:101 - name: "age" - data_type: INT32 - type_params: < - key: "dim" - value: "1" - > - > - > - segmentIDs: 0 - partition_tags: "default" - ` - - metaService.processCollectionCreate(id, value) - - collectionNum := replica.getCollectionNum() - assert.Equal(t, collectionNum, 1) - - collection, err := replica.getCollectionByName("test") - assert.NoError(t, err) - assert.Equal(t, collection.ID(), UniqueID(0)) -} - -func TestMetaService_loadCollections(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - replica := newReplica() - - metaService := newMetaService(ctx, replica) - - err2 := (*metaService).loadCollections() - assert.Nil(t, err2) } diff --git a/internal/dataservice/param.go b/internal/dataservice/param.go index 0e381ed15f..2e3f00e376 100644 --- a/internal/dataservice/param.go +++ b/internal/dataservice/param.go @@ -1,8 +1,6 @@ package dataservice import ( - "strconv" - "github.com/zilliztech/milvus-distributed/internal/util/paramtable" ) @@ -13,6 +11,8 @@ type ParamTable struct { Port int NodeID int64 + MasterAddress string + EtcdAddress string MetaRootPath string KvRootPath string @@ -31,7 +31,6 @@ type ParamTable struct { DataNodeNum int SegmentInfoChannelName string DataServiceSubscriptionName string - K2SChannelNames []string } var Params ParamTable @@ -40,14 +39,15 @@ func (p *ParamTable) Init() { // load yaml p.BaseTable.Init() - if err := p.LoadYaml("advanced/data_service.yaml"); err != nil { + err := p.LoadYaml("advanced/master.yaml") + if err != nil { panic(err) } // set members p.initAddress() p.initPort() - p.initNodeID() + p.NodeID = 1 // todo p.initEtcdAddress() p.initMetaRootPath() @@ -68,19 +68,15 @@ func (p *ParamTable) Init() { } func (p *ParamTable) initAddress() { - dataserviceAddress, err := p.Load("dataservice.address") + masterAddress, err := p.Load("master.address") if err != nil { panic(err) } - p.Address = dataserviceAddress + p.Address = masterAddress } func (p *ParamTable) initPort() { - p.Port = p.ParseInt("dataservice.port") -} - -func (p *ParamTable) initNodeID() { - p.NodeID = p.ParseInt64("dataservice.nodeID") + p.Port = p.ParseInt("master.port") } func (p *ParamTable) initEtcdAddress() { @@ -123,83 +119,46 @@ func (p *ParamTable) initKvRootPath() { p.KvRootPath = rootPath + "/" + subPath } func (p *ParamTable) initSegmentSize() { - p.SegmentSize = p.ParseFloat("dataservice.segment.size") + p.SegmentSize = p.ParseFloat("master.segment.size") } func (p *ParamTable) initSegmentSizeFactor() { - p.SegmentSizeFactor = p.ParseFloat("dataservice.segment.sizeFactor") + p.SegmentSizeFactor = p.ParseFloat("master.segment.sizeFactor") } func (p *ParamTable) initDefaultRecordSize() { - p.DefaultRecordSize = p.ParseInt64("dataservice.segment.defaultSizePerRecord") + p.DefaultRecordSize = p.ParseInt64("master.segment.defaultSizePerRecord") } +// TODO read from config/env func (p *ParamTable) initSegIDAssignExpiration() { - p.SegIDAssignExpiration = p.ParseInt64("dataservice.segment.IDAssignExpiration") //ms + p.SegIDAssignExpiration = 3000 //ms } func (p *ParamTable) initInsertChannelPrefixName() { - var err error - p.InsertChannelPrefixName, err = p.Load("msgChannel.chanNamePrefix.dataServiceInsertChannel") - if err != nil { - panic(err) - } + p.InsertChannelPrefixName = "insert-channel-" } func (p *ParamTable) initInsertChannelNumPerCollection() { - p.InsertChannelNumPerCollection = p.ParseInt64("dataservice.insertChannelNumPerCollection") + p.InsertChannelNumPerCollection = 4 } func (p *ParamTable) initStatisticsChannelName() { - var err error - p.StatisticsChannelName, err = p.Load("msgChannel.chanNamePrefix.dataServiceStatistic") - if err != nil { - panic(err) - } + p.StatisticsChannelName = "dataservice-statistics-channel" } func (p *ParamTable) initTimeTickChannelName() { - var err error - p.TimeTickChannelName, err = p.Load("msgChannel.chanNamePrefix.dataServiceTimeTick") - if err != nil { - panic(err) - } + p.TimeTickChannelName = "dataservice-timetick-channel" } func (p *ParamTable) initDataNodeNum() { - p.DataNodeNum = p.ParseInt("dataservice.dataNodeNum") + p.DataNodeNum = 2 } func (p *ParamTable) initSegmentInfoChannelName() { - var err error - p.SegmentInfoChannelName, err = p.Load("msgChannel.chanNamePrefix.dataServiceSegmentInfo") - if err != nil { - panic(err) - } + p.SegmentInfoChannelName = "segment-info-channel" } func (p *ParamTable) initDataServiceSubscriptionName() { - var err error - p.DataServiceSubscriptionName, err = p.Load("msgChannel.chanNamePrefix.dataServiceSubNamePrefix") - if err != nil { - panic(err) - } -} - -func (p *ParamTable) initK2SChannelNames() { - prefix, err := p.Load("msgChannel.chanNamePrefix.k2s") - if err != nil { - panic(err) - } - prefix += "-" - iRangeStr, err := p.Load("msgChannel.channelRange.k2s") - if err != nil { - panic(err) - } - channelIDs := paramtable.ConvertRangeToIntSlice(iRangeStr, ",") - var ret []string - for _, ID := range channelIDs { - ret = append(ret, prefix+strconv.Itoa(ID)) - } - p.K2SChannelNames = ret + p.DataServiceSubscriptionName = "dataserive-sub" } diff --git a/internal/dataservice/server.go b/internal/dataservice/server.go index f3c07d99f7..e3782b22ae 100644 --- a/internal/dataservice/server.go +++ b/internal/dataservice/server.go @@ -49,31 +49,28 @@ type ( UniqueID = typeutil.UniqueID Timestamp = typeutil.Timestamp Server struct { - ctx context.Context - serverLoopCtx context.Context - serverLoopCancel context.CancelFunc - serverLoopWg sync.WaitGroup - state internalpb2.StateCode - client *etcdkv.EtcdKV - meta *meta - segAllocator segmentAllocator - statsHandler *statsHandler - insertChannelMgr *insertChannelManager - allocator allocator - cluster *dataNodeCluster - msgProducer *timesync.MsgProducer - registerFinishCh chan struct{} - masterClient *masterservice.GrpcClient - ttMsgStream msgstream.MsgStream - k2sMsgStream msgstream.MsgStream - ddChannelName string - segmentInfoStream msgstream.MsgStream - segmentFlushStream msgstream.MsgStream + ctx context.Context + serverLoopCtx context.Context + serverLoopCancel context.CancelFunc + serverLoopWg sync.WaitGroup + state internalpb2.StateCode + client *etcdkv.EtcdKV + meta *meta + segAllocator segmentAllocator + statsHandler *statsHandler + insertChannelMgr *insertChannelManager + allocator allocator + cluster *dataNodeCluster + msgProducer *timesync.MsgProducer + registerFinishCh chan struct{} + masterClient *masterservice.GrpcClient + ttMsgStream msgstream.MsgStream + ddChannelName string + segmentInfoStream msgstream.MsgStream } ) func CreateServer(ctx context.Context, client *masterservice.GrpcClient) (*Server, error) { - Params.Init() ch := make(chan struct{}) return &Server{ ctx: ctx, @@ -86,29 +83,32 @@ func CreateServer(ctx context.Context, client *masterservice.GrpcClient) (*Serve } func (s *Server) Init() error { + Params.Init() return nil } func (s *Server) Start() error { - var err error s.allocator = newAllocatorImpl(s.masterClient) - if err = s.initMeta(); err != nil { + if err := s.initMeta(); err != nil { return err } s.statsHandler = newStatsHandler(s.meta) - s.segAllocator, err = newSegmentAllocator(s.meta, s.allocator) + segAllocator, err := newSegmentAllocator(s.meta, s.allocator) if err != nil { return err } - s.initSegmentInfoChannel() - if err = s.initMsgProducer(); err != nil { - return err - } + s.segAllocator = segAllocator + s.waitDataNodeRegister() + if err = s.loadMetaFromMaster(); err != nil { return err } + if err = s.initMsgProducer(); err != nil { + return err + } + + s.initSegmentInfoChannel() s.startServerLoop() - s.waitDataNodeRegister() s.state = internalpb2.StateCode_HEALTHY log.Println("start success") return nil @@ -128,28 +128,21 @@ func (s *Server) initMeta() error { return nil } -func (s *Server) initSegmentInfoChannel() { - segmentInfoStream := pulsarms.NewPulsarMsgStream(s.ctx, 1024) - segmentInfoStream.SetPulsarClient(Params.PulsarAddress) - segmentInfoStream.CreatePulsarProducers([]string{Params.SegmentInfoChannelName}) - s.segmentInfoStream = segmentInfoStream - s.segmentInfoStream.Start() +func (s *Server) waitDataNodeRegister() { + log.Println("waiting data node to register") + <-s.registerFinishCh + log.Println("all data nodes register") } + func (s *Server) initMsgProducer() error { - ttMsgStream := pulsarms.NewPulsarMsgStream(s.ctx, 1024) + ttMsgStream := pulsarms.NewPulsarTtMsgStream(s.ctx, 1024) ttMsgStream.SetPulsarClient(Params.PulsarAddress) ttMsgStream.CreatePulsarConsumers([]string{Params.TimeTickChannelName}, Params.DataServiceSubscriptionName, util.NewUnmarshalDispatcher(), 1024) s.ttMsgStream = ttMsgStream s.ttMsgStream.Start() timeTickBarrier := timesync.NewHardTimeTickBarrier(s.ttMsgStream, s.cluster.GetNodeIDs()) dataNodeTTWatcher := newDataNodeTimeTickWatcher(s.meta, s.segAllocator, s.cluster) - k2sStream := pulsarms.NewPulsarMsgStream(s.ctx, 1024) - k2sStream.SetPulsarClient(Params.PulsarAddress) - k2sStream.CreatePulsarProducers(Params.K2SChannelNames) - s.k2sMsgStream = k2sStream - s.k2sMsgStream.Start() - k2sMsgWatcher := timesync.NewMsgTimeTickWatcher(s.k2sMsgStream) - producer, err := timesync.NewTimeSyncMsgProducer(timeTickBarrier, dataNodeTTWatcher, k2sMsgWatcher) + producer, err := timesync.NewTimeSyncMsgProducer(timeTickBarrier, dataNodeTTWatcher) if err != nil { return err } @@ -158,6 +151,46 @@ func (s *Server) initMsgProducer() error { return nil } +func (s *Server) startServerLoop() { + s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx) + s.serverLoopWg.Add(1) + go s.startStatsChannel(s.serverLoopCtx) +} + +func (s *Server) startStatsChannel(ctx context.Context) { + defer s.serverLoopWg.Done() + statsStream := pulsarms.NewPulsarMsgStream(ctx, 1024) + statsStream.SetPulsarClient(Params.PulsarAddress) + statsStream.CreatePulsarConsumers([]string{Params.StatisticsChannelName}, Params.DataServiceSubscriptionName, util.NewUnmarshalDispatcher(), 1024) + statsStream.Start() + defer statsStream.Close() + for { + select { + case <-ctx.Done(): + return + default: + } + msgPack := statsStream.Consume() + for _, msg := range msgPack.Msgs { + statistics := msg.(*msgstream.SegmentStatisticsMsg) + for _, stat := range statistics.SegStats { + if err := s.statsHandler.HandleSegmentStat(stat); err != nil { + log.Println(err.Error()) + continue + } + } + } + } +} + +func (s *Server) initSegmentInfoChannel() { + segmentInfoStream := pulsarms.NewPulsarMsgStream(s.ctx, 1024) + segmentInfoStream.SetPulsarClient(Params.PulsarAddress) + segmentInfoStream.CreatePulsarProducers([]string{Params.SegmentInfoChannelName}) + s.segmentInfoStream = segmentInfoStream + s.segmentInfoStream.Start() +} + func (s *Server) loadMetaFromMaster() error { log.Println("loading collection meta from master") collections, err := s.masterClient.ShowCollections(&milvuspb.ShowCollectionRequest{ @@ -215,83 +248,9 @@ func (s *Server) loadMetaFromMaster() error { log.Println("load collection meta from master complete") return nil } -func (s *Server) startServerLoop() { - s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx) - s.serverLoopWg.Add(2) - go s.startStatsChannel(s.serverLoopCtx) - go s.startSegmentFlushChannel(s.serverLoopCtx) -} - -func (s *Server) startStatsChannel(ctx context.Context) { - defer s.serverLoopWg.Done() - statsStream := pulsarms.NewPulsarMsgStream(ctx, 1024) - statsStream.SetPulsarClient(Params.PulsarAddress) - statsStream.CreatePulsarConsumers([]string{Params.StatisticsChannelName}, Params.DataServiceSubscriptionName, util.NewUnmarshalDispatcher(), 1024) - statsStream.Start() - defer statsStream.Close() - for { - select { - case <-ctx.Done(): - return - default: - } - msgPack := statsStream.Consume() - for _, msg := range msgPack.Msgs { - statistics := msg.(*msgstream.SegmentStatisticsMsg) - for _, stat := range statistics.SegStats { - if err := s.statsHandler.HandleSegmentStat(stat); err != nil { - log.Println(err.Error()) - continue - } - } - } - } -} - -func (s *Server) startSegmentFlushChannel(ctx context.Context) { - defer s.serverLoopWg.Done() - flushStream := pulsarms.NewPulsarMsgStream(ctx, 1024) - flushStream.SetPulsarClient(Params.PulsarAddress) - flushStream.CreatePulsarConsumers([]string{Params.SegmentInfoChannelName}, Params.DataServiceSubscriptionName, util.NewUnmarshalDispatcher(), 1024) - flushStream.Start() - defer flushStream.Close() - for { - select { - case <-ctx.Done(): - log.Println("segment flush channel shut down") - return - default: - } - msgPack := flushStream.Consume() - for _, msg := range msgPack.Msgs { - if msg.Type() != commonpb.MsgType_kSegmentFlushDone { - continue - } - realMsg := msg.(*msgstream.FlushCompletedMsg) - - segmentInfo, err := s.meta.GetSegment(realMsg.SegmentID) - if err != nil { - log.Println(err.Error()) - continue - } - segmentInfo.FlushedTime = realMsg.BeginTimestamp - if err = s.meta.UpdateSegment(segmentInfo); err != nil { - log.Println(err.Error()) - continue - } - } - } -} - -func (s *Server) waitDataNodeRegister() { - log.Println("waiting data node to register") - <-s.registerFinishCh - log.Println("all data nodes register") -} func (s *Server) Stop() error { s.ttMsgStream.Close() - s.k2sMsgStream.Close() s.msgProducer.Close() s.segmentInfoStream.Close() s.stopServerLoop() @@ -439,23 +398,6 @@ func (s *Server) openNewSegment(collectionID UniqueID, partitionID UniqueID, cha if err = s.segAllocator.OpenSegment(segmentInfo); err != nil { return err } - infoMsg := &msgstream.SegmentInfoMsg{ - SegmentMsg: datapb.SegmentMsg{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_kSegmentInfo, - MsgID: 0, - Timestamp: 0, // todo - SourceID: 0, - }, - Segment: segmentInfo, - }, - } - msgPack := &pulsarms.MsgPack{ - Msgs: []msgstream.TsMsg{infoMsg}, - } - if err = s.segmentInfoStream.Produce(msgPack); err != nil { - return err - } return nil } @@ -480,8 +422,7 @@ func (s *Server) GetSegmentStates(req *datapb.SegmentStatesRequest) (*datapb.Seg resp.CreateTime = segmentInfo.OpenTime resp.SealedTime = segmentInfo.SealedTime resp.FlushedTime = segmentInfo.FlushedTime - resp.StartPositions = segmentInfo.StartPosition - resp.EndPositions = segmentInfo.EndPosition + // TODO start/end positions return resp, nil } diff --git a/internal/dataservice/stats_handler.go b/internal/dataservice/stats_handler.go index 68c113b2c1..2c1edc95f0 100644 --- a/internal/dataservice/stats_handler.go +++ b/internal/dataservice/stats_handler.go @@ -20,25 +20,10 @@ func (handler *statsHandler) HandleSegmentStat(segStats *internalpb2.SegmentStat return err } - if segStats.IsNewSegment { - segMeta.OpenTime = segStats.CreateTime - segMeta.StartPosition = append(segMeta.StartPosition, segStats.StartPositions...) - } - segMeta.SealedTime = segStats.EndTime - for _, pos := range segStats.EndPositions { - isNew := true - for _, epos := range segMeta.EndPosition { - if epos.ChannelName == pos.ChannelName { - epos.Timestamp = pos.Timestamp - epos.MsgID = pos.MsgID - isNew = false - break - } - } - if isNew { - segMeta.EndPosition = append(segMeta.EndPosition, pos) - } - } + //if segStats.IsNewSegment { + // segMeta.OpenTime = segStats.CreateTime + // segMeta.segStats.StartPositions + //} segMeta.NumRows = segStats.NumRows segMeta.MemSize = segStats.MemorySize diff --git a/internal/distributed/masterservice/client.go b/internal/distributed/masterservice/client.go index ff81bee94f..18af4df96b 100644 --- a/internal/distributed/masterservice/client.go +++ b/internal/distributed/masterservice/client.go @@ -5,6 +5,7 @@ import ( "time" "github.com/zilliztech/milvus-distributed/internal/errors" + cms "github.com/zilliztech/milvus-distributed/internal/masterservice" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" @@ -56,63 +57,93 @@ func (c *GrpcClient) Stop() error { } func (c *GrpcClient) GetComponentStates() (*internalpb2.ComponentStates, error) { - return c.grpcClient.GetComponentStatesRPC(context.Background(), &commonpb.Empty{}) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) + defer cancel() + return c.grpcClient.GetComponentStatesRPC(ctx, &commonpb.Empty{}) } //DDL request func (c *GrpcClient) CreateCollection(in *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) { - return c.grpcClient.CreateCollection(context.Background(), in) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) + defer cancel() + return c.grpcClient.CreateCollection(ctx, in) } func (c *GrpcClient) DropCollection(in *milvuspb.DropCollectionRequest) (*commonpb.Status, error) { - return c.grpcClient.DropCollection(context.Background(), in) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) + defer cancel() + return c.grpcClient.DropCollection(ctx, in) } func (c *GrpcClient) HasCollection(in *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) { - return c.grpcClient.HasCollection(context.Background(), in) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) + defer cancel() + return c.grpcClient.HasCollection(ctx, in) } func (c *GrpcClient) DescribeCollection(in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) { - return c.grpcClient.DescribeCollection(context.Background(), in) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) + defer cancel() + return c.grpcClient.DescribeCollection(ctx, in) } func (c *GrpcClient) ShowCollections(in *milvuspb.ShowCollectionRequest) (*milvuspb.ShowCollectionResponse, error) { - return c.grpcClient.ShowCollections(context.Background(), in) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) + defer cancel() + return c.grpcClient.ShowCollections(ctx, in) } func (c *GrpcClient) CreatePartition(in *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) { - return c.grpcClient.CreatePartition(context.Background(), in) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) + defer cancel() + return c.grpcClient.CreatePartition(ctx, in) } func (c *GrpcClient) DropPartition(in *milvuspb.DropPartitionRequest) (*commonpb.Status, error) { - return c.grpcClient.DropPartition(context.Background(), in) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) + defer cancel() + return c.grpcClient.DropPartition(ctx, in) } func (c *GrpcClient) HasPartition(in *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) { - return c.grpcClient.HasPartition(context.Background(), in) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) + defer cancel() + return c.grpcClient.HasPartition(ctx, in) } func (c *GrpcClient) ShowPartitions(in *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error) { - return c.grpcClient.ShowPartitions(context.Background(), in) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) + defer cancel() + return c.grpcClient.ShowPartitions(ctx, in) } //index builder service func (c *GrpcClient) CreateIndex(in *milvuspb.CreateIndexRequest) (*commonpb.Status, error) { - return c.grpcClient.CreateIndex(context.Background(), in) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) + defer cancel() + return c.grpcClient.CreateIndex(ctx, in) } func (c *GrpcClient) DescribeIndex(in *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) { - return c.grpcClient.DescribeIndex(context.Background(), in) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) + defer cancel() + return c.grpcClient.DescribeIndex(ctx, in) } //global timestamp allocator func (c *GrpcClient) AllocTimestamp(in *masterpb.TsoRequest) (*masterpb.TsoResponse, error) { - return c.grpcClient.AllocTimestamp(context.Background(), in) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) + defer cancel() + return c.grpcClient.AllocTimestamp(ctx, in) } func (c *GrpcClient) AllocID(in *masterpb.IDRequest) (*masterpb.IDResponse, error) { - return c.grpcClient.AllocID(context.Background(), in) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) + defer cancel() + return c.grpcClient.AllocID(ctx, in) } //receiver time tick from proxy service, and put it into this channel func (c *GrpcClient) GetTimeTickChannel() (string, error) { - rsp, err := c.grpcClient.GetTimeTickChannelRPC(context.Background(), &commonpb.Empty{}) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) + defer cancel() + rsp, err := c.grpcClient.GetTimeTickChannelRPC(ctx, &commonpb.Empty{}) if err != nil { return "", err } @@ -124,7 +155,9 @@ func (c *GrpcClient) GetTimeTickChannel() (string, error) { //receive ddl from rpc and time tick from proxy service, and put them into this channel func (c *GrpcClient) GetDdChannel() (string, error) { - rsp, err := c.grpcClient.GetDdChannelRPC(context.Background(), &commonpb.Empty{}) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) + defer cancel() + rsp, err := c.grpcClient.GetDdChannelRPC(ctx, &commonpb.Empty{}) if err != nil { return "", err } @@ -136,7 +169,9 @@ func (c *GrpcClient) GetDdChannel() (string, error) { //just define a channel, not used currently func (c *GrpcClient) GetStatisticsChannel() (string, error) { - rsp, err := c.grpcClient.GetStatisticsChannelRPC(context.Background(), &commonpb.Empty{}) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) + defer cancel() + rsp, err := c.grpcClient.GetStatisticsChannelRPC(ctx, &commonpb.Empty{}) if err != nil { return "", err } @@ -147,9 +182,13 @@ func (c *GrpcClient) GetStatisticsChannel() (string, error) { } func (c *GrpcClient) DescribeSegment(in *milvuspb.DescribeSegmentRequest) (*milvuspb.DescribeSegmentResponse, error) { - return c.grpcClient.DescribeSegment(context.Background(), in) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) + defer cancel() + return c.grpcClient.DescribeSegment(ctx, in) } func (c *GrpcClient) ShowSegments(in *milvuspb.ShowSegmentRequest) (*milvuspb.ShowSegmentResponse, error) { - return c.grpcClient.ShowSegments(context.Background(), in) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) + defer cancel() + return c.grpcClient.ShowSegments(ctx, in) } diff --git a/internal/distributed/masterservice/masterservice_test.go b/internal/distributed/masterservice/masterservice_test.go index 17ecc0acd6..f2e9514c69 100644 --- a/internal/distributed/masterservice/masterservice_test.go +++ b/internal/distributed/masterservice/masterservice_test.go @@ -1,6 +1,7 @@ package masterservice import ( + "context" "fmt" "math/rand" "regexp" @@ -26,7 +27,7 @@ func TestGrpcService(t *testing.T) { //cms.Params.Address = "127.0.0.1" cms.Params.Port = (randVal % 100) + 10000 - svr, err := NewGrpcServer() + svr, err := NewGrpcServer(context.Background()) assert.Nil(t, err) // cms.Params.NodeID = 0 diff --git a/internal/distributed/masterservice/server.go b/internal/distributed/masterservice/server.go index a531f860e3..e1731e3025 100644 --- a/internal/distributed/masterservice/server.go +++ b/internal/distributed/masterservice/server.go @@ -6,6 +6,7 @@ import ( "net" "sync" + "github.com/zilliztech/milvus-distributed/internal/errors" cms "github.com/zilliztech/milvus-distributed/internal/masterservice" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/datapb" @@ -26,10 +27,10 @@ type GrpcServer struct { cancel context.CancelFunc } -func NewGrpcServer() (*GrpcServer, error) { +func NewGrpcServer(ctx context.Context) (*GrpcServer, error) { s := &GrpcServer{} var err error - s.ctx, s.cancel = context.WithCancel(context.Background()) + s.ctx, s.cancel = context.WithCancel(ctx) if s.core, err = cms.NewCore(s.ctx); err != nil { return nil, err } @@ -73,6 +74,30 @@ func (s *GrpcServer) Stop() error { return err } +func (s *GrpcServer) SetProxyService(p cms.ProxyServiceInterface) error { + c, ok := s.core.(*cms.Core) + if !ok { + return errors.Errorf("set proxy service failed") + } + return c.SetProxyService(p) +} + +func (s *GrpcServer) SetDataService(p cms.DataServiceInterface) error { + c, ok := s.core.(*cms.Core) + if !ok { + return errors.Errorf("set data service failed") + } + return c.SetDataService(p) +} + +func (s *GrpcServer) SetIndexService(p cms.IndexServiceInterface) error { + c, ok := s.core.(*cms.Core) + if !ok { + return errors.Errorf("set index service failed") + } + return c.SetIndexService(p) +} + func (s *GrpcServer) GetComponentStatesRPC(ctx context.Context, empty *commonpb.Empty) (*internalpb2.ComponentStates, error) { return s.core.GetComponentStates() } diff --git a/internal/masterservice/master_service.go b/internal/masterservice/master_service.go index 5cf7eb90df..cc4338d4ab 100644 --- a/internal/masterservice/master_service.go +++ b/internal/masterservice/master_service.go @@ -2,6 +2,7 @@ package masterservice import ( "context" + "fmt" "log" "math/rand" "strconv" @@ -735,6 +736,13 @@ func (c *Core) GetStatisticsChannel() (string, error) { } func (c *Core) CreateCollection(in *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) { + code := c.stateCode.Load().(internalpb2.StateCode) + if code != internalpb2.StateCode_HEALTHY { + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]), + }, nil + } t := &CreateCollectionReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), @@ -758,6 +766,13 @@ func (c *Core) CreateCollection(in *milvuspb.CreateCollectionRequest) (*commonpb } func (c *Core) DropCollection(in *milvuspb.DropCollectionRequest) (*commonpb.Status, error) { + code := c.stateCode.Load().(internalpb2.StateCode) + if code != internalpb2.StateCode_HEALTHY { + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]), + }, nil + } t := &DropCollectionReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), @@ -780,6 +795,16 @@ func (c *Core) DropCollection(in *milvuspb.DropCollectionRequest) (*commonpb.Sta } func (c *Core) HasCollection(in *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) { + code := c.stateCode.Load().(internalpb2.StateCode) + if code != internalpb2.StateCode_HEALTHY { + return &milvuspb.BoolResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]), + }, + Value: false, + }, nil + } t := &HasCollectionReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), @@ -809,6 +834,17 @@ func (c *Core) HasCollection(in *milvuspb.HasCollectionRequest) (*milvuspb.BoolR } func (c *Core) DescribeCollection(in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) { + code := c.stateCode.Load().(internalpb2.StateCode) + if code != internalpb2.StateCode_HEALTHY { + return &milvuspb.DescribeCollectionResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]), + }, + Schema: nil, + CollectionID: 0, + }, nil + } t := &DescribeCollectionReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), @@ -836,6 +872,16 @@ func (c *Core) DescribeCollection(in *milvuspb.DescribeCollectionRequest) (*milv } func (c *Core) ShowCollections(in *milvuspb.ShowCollectionRequest) (*milvuspb.ShowCollectionResponse, error) { + code := c.stateCode.Load().(internalpb2.StateCode) + if code != internalpb2.StateCode_HEALTHY { + return &milvuspb.ShowCollectionResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]), + }, + CollectionNames: nil, + }, nil + } t := &ShowCollectionReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), @@ -865,6 +911,13 @@ func (c *Core) ShowCollections(in *milvuspb.ShowCollectionRequest) (*milvuspb.Sh } func (c *Core) CreatePartition(in *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) { + code := c.stateCode.Load().(internalpb2.StateCode) + if code != internalpb2.StateCode_HEALTHY { + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]), + }, nil + } t := &CreatePartitionReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), @@ -887,6 +940,13 @@ func (c *Core) CreatePartition(in *milvuspb.CreatePartitionRequest) (*commonpb.S } func (c *Core) DropPartition(in *milvuspb.DropPartitionRequest) (*commonpb.Status, error) { + code := c.stateCode.Load().(internalpb2.StateCode) + if code != internalpb2.StateCode_HEALTHY { + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]), + }, nil + } t := &DropPartitionReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), @@ -909,6 +969,16 @@ func (c *Core) DropPartition(in *milvuspb.DropPartitionRequest) (*commonpb.Statu } func (c *Core) HasPartition(in *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) { + code := c.stateCode.Load().(internalpb2.StateCode) + if code != internalpb2.StateCode_HEALTHY { + return &milvuspb.BoolResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]), + }, + Value: false, + }, nil + } t := &HasPartitionReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), @@ -938,6 +1008,17 @@ func (c *Core) HasPartition(in *milvuspb.HasPartitionRequest) (*milvuspb.BoolRes } func (c *Core) ShowPartitions(in *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error) { + code := c.stateCode.Load().(internalpb2.StateCode) + if code != internalpb2.StateCode_HEALTHY { + return &milvuspb.ShowPartitionResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]), + }, + PartitionNames: nil, + PartitionIDs: nil, + }, nil + } t := &ShowPartitionReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), @@ -968,6 +1049,13 @@ func (c *Core) ShowPartitions(in *milvuspb.ShowPartitionRequest) (*milvuspb.Show } func (c *Core) CreateIndex(in *milvuspb.CreateIndexRequest) (*commonpb.Status, error) { + code := c.stateCode.Load().(internalpb2.StateCode) + if code != internalpb2.StateCode_HEALTHY { + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]), + }, nil + } t := &CreateIndexReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), @@ -990,6 +1078,16 @@ func (c *Core) CreateIndex(in *milvuspb.CreateIndexRequest) (*commonpb.Status, e } func (c *Core) DescribeIndex(in *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) { + code := c.stateCode.Load().(internalpb2.StateCode) + if code != internalpb2.StateCode_HEALTHY { + return &milvuspb.DescribeIndexResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]), + }, + IndexDescriptions: nil, + }, nil + } t := &DescribeIndexReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), @@ -1020,6 +1118,16 @@ func (c *Core) DescribeIndex(in *milvuspb.DescribeIndexRequest) (*milvuspb.Descr } func (c *Core) DescribeSegment(in *milvuspb.DescribeSegmentRequest) (*milvuspb.DescribeSegmentResponse, error) { + code := c.stateCode.Load().(internalpb2.StateCode) + if code != internalpb2.StateCode_HEALTHY { + return &milvuspb.DescribeSegmentResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]), + }, + IndexID: 0, + }, nil + } t := &DescribeSegmentReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), @@ -1050,6 +1158,16 @@ func (c *Core) DescribeSegment(in *milvuspb.DescribeSegmentRequest) (*milvuspb.D } func (c *Core) ShowSegments(in *milvuspb.ShowSegmentRequest) (*milvuspb.ShowSegmentResponse, error) { + code := c.stateCode.Load().(internalpb2.StateCode) + if code != internalpb2.StateCode_HEALTHY { + return &milvuspb.ShowSegmentResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]), + }, + SegmentIDs: nil, + }, nil + } t := &ShowSegmentReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), diff --git a/internal/masterservice/param_table.go b/internal/masterservice/param_table.go index 61dde4ba09..b4f8c45f4b 100644 --- a/internal/masterservice/param_table.go +++ b/internal/masterservice/param_table.go @@ -27,6 +27,8 @@ type ParamTable struct { MaxPartitionNum int64 DefaultPartitionName string DefaultIndexName string + + Timeout int } func (p *ParamTable) Init() { @@ -54,6 +56,8 @@ func (p *ParamTable) Init() { p.initMaxPartitionNum() p.initDefaultPartitionName() p.initDefaultIndexName() + + p.initTimeout() } func (p *ParamTable) initAddress() { @@ -163,3 +167,7 @@ func (p *ParamTable) initDefaultIndexName() { } p.DefaultIndexName = name } + +func (p *ParamTable) initTimeout() { + p.Timeout = p.ParseInt("master.timeout") +} diff --git a/internal/masterservice/param_table_test.go b/internal/masterservice/param_table_test.go index 2c2b071e7c..af09ab4977 100644 --- a/internal/masterservice/param_table_test.go +++ b/internal/masterservice/param_table_test.go @@ -50,4 +50,7 @@ func TestParamTable(t *testing.T) { assert.NotEqual(t, Params.DefaultIndexName, "") t.Logf("default index name = %s", Params.DefaultIndexName) + + assert.NotZero(t, Params.Timeout) + t.Logf("master timeout = %d", Params.Timeout) } diff --git a/internal/querynode/collection_replica.go b/internal/querynode/collection_replica.go index 1e6ab95506..3c45dc7a86 100644 --- a/internal/querynode/collection_replica.go +++ b/internal/querynode/collection_replica.go @@ -54,6 +54,9 @@ type collectionReplica interface { getPartitionByTag(collectionID UniqueID, partitionTag string) (*Partition, error) getPartitionByID(collectionID UniqueID, partitionID UniqueID) (*Partition, error) hasPartition(collectionID UniqueID, partitionTag string) bool + enablePartitionDM(collectionID UniqueID, partitionID UniqueID) error + disablePartitionDM(collectionID UniqueID, partitionID UniqueID) error + getEnablePartitionDM(collectionID UniqueID, partitionID UniqueID) (bool, error) // segment getSegmentNum() int @@ -362,6 +365,43 @@ func (colReplica *collectionReplicaImpl) hasPartition(collectionID UniqueID, par return false } +func (colReplica *collectionReplicaImpl) enablePartitionDM(collectionID UniqueID, partitionID UniqueID) error { + colReplica.mu.Lock() + defer colReplica.mu.Unlock() + + partition, err := colReplica.getPartitionByIDPrivate(collectionID, partitionID) + if err != nil { + return err + } + + partition.enableDM = true + return nil +} + +func (colReplica *collectionReplicaImpl) disablePartitionDM(collectionID UniqueID, partitionID UniqueID) error { + colReplica.mu.Lock() + defer colReplica.mu.Unlock() + + partition, err := colReplica.getPartitionByIDPrivate(collectionID, partitionID) + if err != nil { + return err + } + + partition.enableDM = false + return nil +} + +func (colReplica *collectionReplicaImpl) getEnablePartitionDM(collectionID UniqueID, partitionID UniqueID) (bool, error) { + colReplica.mu.Lock() + defer colReplica.mu.Unlock() + + partition, err := colReplica.getPartitionByIDPrivate(collectionID, partitionID) + if err != nil { + return false, err + } + return partition.enableDM, nil +} + //----------------------------------------------------------------------------------------------------- segment func (colReplica *collectionReplicaImpl) getSegmentNum() int { colReplica.mu.RLock() diff --git a/internal/querynode/data_sync_service.go b/internal/querynode/data_sync_service.go index 608006ec61..04856af94d 100644 --- a/internal/querynode/data_sync_service.go +++ b/internal/querynode/data_sync_service.go @@ -19,17 +19,18 @@ type dataSyncService struct { } func newDataSyncService(ctx context.Context, replica collectionReplica) *dataSyncService { - - return &dataSyncService{ + service := &dataSyncService{ ctx: ctx, fg: nil, replica: replica, } + + service.initNodes() + return service } func (dsService *dataSyncService) start() { - dsService.initNodes() dsService.fg.Start() } @@ -47,7 +48,7 @@ func (dsService *dataSyncService) initNodes() { var dmStreamNode node = dsService.newDmInputNode(dsService.ctx) var ddStreamNode node = dsService.newDDInputNode(dsService.ctx) - var filterDmNode node = newFilteredDmNode() + var filterDmNode node = newFilteredDmNode(dsService.replica) var ddNode node = newDDNode(dsService.replica) var insertNode node = newInsertNode(dsService.replica) diff --git a/internal/querynode/flow_graph_filter_dm_node.go b/internal/querynode/flow_graph_filter_dm_node.go index acef0f2b04..5a75be2c56 100644 --- a/internal/querynode/flow_graph_filter_dm_node.go +++ b/internal/querynode/flow_graph_filter_dm_node.go @@ -12,7 +12,8 @@ import ( type filterDmNode struct { baseNode - ddMsg *ddMsg + ddMsg *ddMsg + replica collectionReplica } func (fdmNode *filterDmNode) Name() string { @@ -102,6 +103,12 @@ func (fdmNode *filterDmNode) Operate(in []*Msg) []*Msg { } func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg) *msgstream.InsertMsg { + // TODO: open this check + // check if partition dm enable + //if enable, _ := fdmNode.replica.getEnablePartitionDM(msg.CollectionID, msg.PartitionID); !enable { + // return nil + //} + // No dd record, do all insert requests. records, ok := fdmNode.ddMsg.collectionRecords[msg.CollectionName] if !ok { @@ -154,7 +161,7 @@ func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg return msg } -func newFilteredDmNode() *filterDmNode { +func newFilteredDmNode(replica collectionReplica) *filterDmNode { maxQueueLength := Params.FlowGraphMaxQueueLength maxParallelism := Params.FlowGraphMaxParallelism @@ -164,5 +171,6 @@ func newFilteredDmNode() *filterDmNode { return &filterDmNode{ baseNode: baseNode, + replica: replica, } } diff --git a/internal/querynode/param_table.go b/internal/querynode/param_table.go index e5e76d24a8..7a41c50e3e 100644 --- a/internal/querynode/param_table.go +++ b/internal/querynode/param_table.go @@ -12,9 +12,11 @@ import ( type ParamTable struct { paramtable.BaseTable - PulsarAddress string - ETCDAddress string - MetaRootPath string + PulsarAddress string + ETCDAddress string + MetaRootPath string + WriteNodeSegKvSubPath string + IndexBuilderAddress string QueryNodeIP string QueryNodePort int64 @@ -131,6 +133,8 @@ func (p *ParamTable) Init() { p.initPulsarAddress() p.initETCDAddress() p.initMetaRootPath() + p.initWriteNodeSegKvSubPath() + p.initIndexBuilderAddress() p.initGracefulTime() p.initMsgChannelSubName() @@ -246,6 +250,14 @@ func (p *ParamTable) initPulsarAddress() { p.PulsarAddress = url } +func (p *ParamTable) initIndexBuilderAddress() { + ret, err := p.Load("_IndexBuilderAddress") + if err != nil { + panic(err) + } + p.IndexBuilderAddress = ret +} + func (p *ParamTable) initInsertChannelRange() { insertChannelRange, err := p.Load("msgChannel.channelRange.insert") if err != nil { @@ -338,6 +350,14 @@ func (p *ParamTable) initMetaRootPath() { p.MetaRootPath = rootPath + "/" + subPath } +func (p *ParamTable) initWriteNodeSegKvSubPath() { + subPath, err := p.Load("etcd.writeNodeSegKvSubPath") + if err != nil { + panic(err) + } + p.WriteNodeSegKvSubPath = subPath + "/" +} + func (p *ParamTable) initGracefulTime() { p.GracefulTime = p.ParseInt64("queryNode.gracefulTime") } diff --git a/internal/querynode/partition.go b/internal/querynode/partition.go index 1b5279f113..5b835b8616 100644 --- a/internal/querynode/partition.go +++ b/internal/querynode/partition.go @@ -16,6 +16,7 @@ type Partition struct { partitionTag string id UniqueID segments []*Segment + enableDM bool } func (p *Partition) ID() UniqueID { @@ -33,6 +34,7 @@ func (p *Partition) Segments() *[]*Segment { func newPartition2(partitionTag string) *Partition { var newPartition = &Partition{ partitionTag: partitionTag, + enableDM: false, } return newPartition @@ -40,7 +42,8 @@ func newPartition2(partitionTag string) *Partition { func newPartition(partitionID UniqueID) *Partition { var newPartition = &Partition{ - id: partitionID, + id: partitionID, + enableDM: false, } return newPartition diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index 72f583cd30..7b8270877b 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -136,7 +136,7 @@ func (node *QueryNode) Start() error { node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica) node.loadIndexService = newLoadIndexService(node.queryNodeLoopCtx, node.replica) node.statsService = newStatsService(node.queryNodeLoopCtx, node.replica, node.loadIndexService.fieldStatsChan) - node.segManager = newSegmentManager(node.queryNodeLoopCtx, node.replica, node.loadIndexService.loadIndexReqChan) + node.segManager = newSegmentManager(node.queryNodeLoopCtx, node.replica, node.dataSyncService.dmStream, node.loadIndexService.loadIndexReqChan) // start services go node.dataSyncService.start() @@ -344,14 +344,31 @@ func (node *QueryNode) LoadSegments(in *queryPb.LoadSegmentRequest) (*commonpb.S segmentIDs := in.SegmentIDs fieldIDs := in.FieldIDs + err := node.replica.enablePartitionDM(collectionID, partitionID) + if err != nil { + status := &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: err.Error(), + } + return status, err + } + // segments are ordered before LoadSegments calling if in.LastSegmentState.State == datapb.SegmentState_SegmentGrowing { segmentNum := len(segmentIDs) - node.segManager.seekSegment(segmentIDs[segmentNum-1]) + positions := in.LastSegmentState.StartPositions + err = node.segManager.seekSegment(positions) + if err != nil { + status := &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: err.Error(), + } + return status, err + } segmentIDs = segmentIDs[:segmentNum-1] } - err := node.segManager.loadSegment(collectionID, partitionID, segmentIDs, fieldIDs) + err = node.segManager.loadSegment(collectionID, partitionID, segmentIDs, fieldIDs) if err != nil { status := &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, @@ -363,6 +380,17 @@ func (node *QueryNode) LoadSegments(in *queryPb.LoadSegmentRequest) (*commonpb.S } func (node *QueryNode) ReleaseSegments(in *queryPb.ReleaseSegmentRequest) (*commonpb.Status, error) { + for _, id := range in.PartitionIDs { + err := node.replica.enablePartitionDM(in.CollectionID, id) + if err != nil { + status := &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: err.Error(), + } + return status, err + } + } + // release all fields in the segments for _, id := range in.SegmentIDs { err := node.segManager.releaseSegment(id) diff --git a/internal/querynode/segment_manager.go b/internal/querynode/segment_manager.go index 72c880976b..0a189bbde9 100644 --- a/internal/querynode/segment_manager.go +++ b/internal/querynode/segment_manager.go @@ -5,6 +5,7 @@ import ( "errors" "strconv" + indexnodeclient "github.com/zilliztech/milvus-distributed/internal/indexnode/client" "github.com/zilliztech/milvus-distributed/internal/kv" miniokv "github.com/zilliztech/milvus-distributed/internal/kv/minio" "github.com/zilliztech/milvus-distributed/internal/msgstream" @@ -13,52 +14,31 @@ import ( "github.com/zilliztech/milvus-distributed/internal/proto/indexpb" internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" "github.com/zilliztech/milvus-distributed/internal/storage" + writerclient "github.com/zilliztech/milvus-distributed/internal/writenode/client" ) type segmentManager struct { replica collectionReplica + dmStream msgstream.MsgStream loadIndexReqChan chan []msgstream.TsMsg - // TODO: replace by client instead of grpc client - dataClient datapb.DataServiceClient - indexBuilderClient indexpb.IndexServiceClient + dataClient *writerclient.Client + indexClient *indexnodeclient.Client kv kv.Base // minio kv iCodec *storage.InsertCodec } -func newSegmentManager(ctx context.Context, replica collectionReplica, loadIndexReqChan chan []msgstream.TsMsg) *segmentManager { - bucketName := Params.MinioBucketName - option := &miniokv.Option{ - Address: Params.MinioEndPoint, - AccessKeyID: Params.MinioAccessKeyID, - SecretAccessKeyID: Params.MinioSecretAccessKey, - UseSSL: Params.MinioUseSSLStr, - BucketName: bucketName, - CreateBucket: true, - } - - minioKV, err := miniokv.NewMinIOKV(ctx, option) - if err != nil { - panic(err) - } - - return &segmentManager{ - replica: replica, - loadIndexReqChan: loadIndexReqChan, - - // TODO: init clients - dataClient: nil, - indexBuilderClient: nil, - - kv: minioKV, - iCodec: &storage.InsertCodec{}, - } -} - -func (s *segmentManager) seekSegment(segmentID UniqueID) { - // TODO: impl +func (s *segmentManager) seekSegment(positions []*internalPb.MsgPosition) error { + // TODO: open seek + //for _, position := range positions { + // err := s.dmStream.Seek(position) + // if err != nil { + // return err + // } + //} + return nil } func (s *segmentManager) loadSegment(collectionID UniqueID, partitionID UniqueID, segmentIDs []UniqueID, fieldIDs []int64) error { @@ -81,7 +61,11 @@ func (s *segmentManager) loadSegment(collectionID UniqueID, partitionID UniqueID } targetFields := s.filterOutNeedlessFields(paths, srcFieldIDs, fieldIDs) - // create segment + // replace segment + err = s.replica.removeSegment(segmentID) + if err != nil { + return err + } err = s.replica.addSegment(segmentID, partitionID, collectionID, segTypeSealed) if err != nil { return err @@ -118,16 +102,25 @@ func (s *segmentManager) getInsertBinlogPaths(segmentID UniqueID) ([]*internalPb SegmentID: segmentID, } - pathResponse, err := s.dataClient.GetInsertBinlogPaths(context.TODO(), insertBinlogPathRequest) + pathResponse, err := s.dataClient.GetInsertBinlogPaths(insertBinlogPathRequest.SegmentID) if err != nil { return nil, nil, err } - if len(pathResponse.FieldIDs) != len(pathResponse.Paths) { - return nil, nil, errors.New("illegal InsertBinlogPathsResponse") + //if len(pathResponse.FieldIDs) != len(pathResponse.Paths) { + // return nil, nil, errors.New("illegal InsertBinlogPathsResponse") + //} + + fieldIDs := make([]int64, 0) + paths := make([]*internalPb.StringList, 0) + for k, v := range pathResponse { + fieldIDs = append(fieldIDs, k) + paths = append(paths, &internalPb.StringList{ + Values: v, + }) } - return pathResponse.Paths, pathResponse.FieldIDs, nil + return paths, fieldIDs, nil } func (s *segmentManager) filterOutNeedlessFields(paths []*internalPb.StringList, srcFieldIDS []int64, dstFields []int64) map[int64]*internalPb.StringList { @@ -234,12 +227,15 @@ func (s *segmentManager) getIndexPaths(indexID UniqueID) ([]string, error) { indexFilePathRequest := &indexpb.IndexFilePathsRequest{ IndexIDs: []UniqueID{indexID}, } - pathResponse, err := s.indexBuilderClient.GetIndexFilePaths(context.TODO(), indexFilePathRequest) - if err != nil || pathResponse.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { + pathResponse, err := s.indexClient.GetIndexFilePaths(indexFilePathRequest.IndexIDs) + //if err != nil || pathResponse.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { + // return nil, err + //} + if err != nil { return nil, err } - return pathResponse.FilePaths[0].IndexFilePaths, nil + return pathResponse[0], nil } func (s *segmentManager) getIndexParam() (indexParam, error) { @@ -293,3 +289,42 @@ func (s *segmentManager) sendLoadIndex(indexPaths []string, messages := []msgstream.TsMsg{loadIndexMsg} s.loadIndexReqChan <- messages } + +func newSegmentManager(ctx context.Context, replica collectionReplica, dmStream msgstream.MsgStream, loadIndexReqChan chan []msgstream.TsMsg) *segmentManager { + bucketName := Params.MinioBucketName + option := &miniokv.Option{ + Address: Params.MinioEndPoint, + AccessKeyID: Params.MinioAccessKeyID, + SecretAccessKeyID: Params.MinioSecretAccessKey, + UseSSL: Params.MinioUseSSLStr, + BucketName: bucketName, + CreateBucket: true, + } + + minioKV, err := miniokv.NewMinIOKV(ctx, option) + if err != nil { + panic(err) + } + + dataClient, err := writerclient.NewWriterClient(Params.ETCDAddress, Params.MetaRootPath, Params.WriteNodeSegKvSubPath, nil) + if err != nil { + panic(err) + } + + indexClient, err := indexnodeclient.NewBuildIndexClient(ctx, Params.IndexBuilderAddress) + if err != nil { + panic(err) + } + + return &segmentManager{ + replica: replica, + dmStream: dmStream, + loadIndexReqChan: loadIndexReqChan, + + dataClient: dataClient, + indexClient: indexClient, + + kv: minioKV, + iCodec: &storage.InsertCodec{}, + } +} diff --git a/internal/querynode/segment_manager_test.go b/internal/querynode/segment_manager_test.go index 015703579e..9686e9f4b9 100644 --- a/internal/querynode/segment_manager_test.go +++ b/internal/querynode/segment_manager_test.go @@ -16,6 +16,9 @@ import ( "github.com/zilliztech/milvus-distributed/internal/indexnode" minioKV "github.com/zilliztech/milvus-distributed/internal/kv/minio" + "github.com/zilliztech/milvus-distributed/internal/msgstream" + "github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms" + "github.com/zilliztech/milvus-distributed/internal/msgstream/util" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" @@ -23,7 +26,7 @@ import ( "github.com/zilliztech/milvus-distributed/internal/storage" ) -func generateInsertBinLog(collectionID UniqueID, partitionID UniqueID, segmentID UniqueID) ([]*internalPb.StringList, []int64, error) { +func generateInsertBinLog(collectionID UniqueID, partitionID UniqueID, segmentID UniqueID, keyPrefix string) ([]*internalPb.StringList, []int64, error) { const ( msgLength = 1000 DIM = 16 @@ -108,10 +111,8 @@ func generateInsertBinLog(collectionID UniqueID, partitionID UniqueID, segmentID } // binLogs -> minIO/S3 - collIDStr := strconv.FormatInt(collectionID, 10) - partitionIDStr := strconv.FormatInt(partitionID, 10) segIDStr := strconv.FormatInt(segmentID, 10) - keyPrefix := path.Join("query-node-seg-manager-test-minio-prefix", collIDStr, partitionIDStr, segIDStr) + keyPrefix = path.Join(keyPrefix, segIDStr) paths := make([]*internalPb.StringList, 0) fieldIDs := make([]int64, 0) @@ -214,18 +215,197 @@ func generateIndex(segmentID UniqueID) ([]string, indexParam, error) { return indexPaths, indexParams, nil } +func doInsert(ctx context.Context, collectionName string, partitionTag string, segmentID UniqueID) error { + const msgLength = 1000 + const DIM = 16 + + var vec = [DIM]float32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16} + var rawData []byte + for _, ele := range vec { + buf := make([]byte, 4) + binary.LittleEndian.PutUint32(buf, math.Float32bits(ele)) + rawData = append(rawData, buf...) + } + bs := make([]byte, 4) + binary.LittleEndian.PutUint32(bs, 1) + rawData = append(rawData, bs...) + + timeRange := TimeRange{ + timestampMin: 0, + timestampMax: math.MaxUint64, + } + + // messages generate + insertMessages := make([]msgstream.TsMsg, 0) + for i := 0; i < msgLength; i++ { + var msg msgstream.TsMsg = &msgstream.InsertMsg{ + BaseMsg: msgstream.BaseMsg{ + HashValues: []uint32{ + uint32(i), + }, + }, + InsertRequest: internalPb.InsertRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_kInsert, + MsgID: 0, + Timestamp: uint64(i + 1000), + SourceID: 0, + }, + CollectionName: collectionName, + PartitionName: partitionTag, + SegmentID: segmentID, + ChannelID: "0", + Timestamps: []uint64{uint64(i + 1000)}, + RowIDs: []int64{int64(i)}, + RowData: []*commonpb.Blob{ + {Value: rawData}, + }, + }, + } + insertMessages = append(insertMessages, msg) + } + + msgPack := msgstream.MsgPack{ + BeginTs: timeRange.timestampMin, + EndTs: timeRange.timestampMax, + Msgs: insertMessages, + } + + // generate timeTick + timeTickMsgPack := msgstream.MsgPack{} + baseMsg := msgstream.BaseMsg{ + BeginTimestamp: 1000, + EndTimestamp: 1500, + HashValues: []uint32{0}, + } + timeTickResult := internalPb.TimeTickMsg{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_kTimeTick, + MsgID: 0, + Timestamp: 1000, + SourceID: 0, + }, + } + timeTickMsg := &msgstream.TimeTickMsg{ + BaseMsg: baseMsg, + TimeTickMsg: timeTickResult, + } + timeTickMsgPack.Msgs = append(timeTickMsgPack.Msgs, timeTickMsg) + + // pulsar produce + const receiveBufSize = 1024 + insertChannels := Params.InsertChannelNames + ddChannels := Params.DDChannelNames + pulsarURL := Params.PulsarAddress + + insertStream := pulsarms.NewPulsarMsgStream(ctx, receiveBufSize) + insertStream.SetPulsarClient(pulsarURL) + insertStream.CreatePulsarProducers(insertChannels) + unmarshalDispatcher := util.NewUnmarshalDispatcher() + insertStream.CreatePulsarConsumers(insertChannels, Params.MsgChannelSubName, unmarshalDispatcher, receiveBufSize) + + ddStream := pulsarms.NewPulsarMsgStream(ctx, receiveBufSize) + ddStream.SetPulsarClient(pulsarURL) + ddStream.CreatePulsarProducers(ddChannels) + + var insertMsgStream msgstream.MsgStream = insertStream + insertMsgStream.Start() + + var ddMsgStream msgstream.MsgStream = ddStream + ddMsgStream.Start() + + err := insertMsgStream.Produce(&msgPack) + if err != nil { + return err + } + + err = insertMsgStream.Broadcast(&timeTickMsgPack) + if err != nil { + return err + } + err = ddMsgStream.Broadcast(&timeTickMsgPack) + if err != nil { + return err + } + + //messages := insertStream.Consume() + //for _, msg := range messages.Msgs { + // + //} + + return nil +} + +func sentTimeTick(ctx context.Context) error { + timeTickMsgPack := msgstream.MsgPack{} + baseMsg := msgstream.BaseMsg{ + BeginTimestamp: 1500, + EndTimestamp: 2000, + HashValues: []uint32{0}, + } + timeTickResult := internalPb.TimeTickMsg{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_kTimeTick, + MsgID: 0, + Timestamp: math.MaxUint64, + SourceID: 0, + }, + } + timeTickMsg := &msgstream.TimeTickMsg{ + BaseMsg: baseMsg, + TimeTickMsg: timeTickResult, + } + timeTickMsgPack.Msgs = append(timeTickMsgPack.Msgs, timeTickMsg) + + // pulsar produce + const receiveBufSize = 1024 + insertChannels := Params.InsertChannelNames + ddChannels := Params.DDChannelNames + pulsarURL := Params.PulsarAddress + + insertStream := pulsarms.NewPulsarMsgStream(ctx, receiveBufSize) + insertStream.SetPulsarClient(pulsarURL) + insertStream.CreatePulsarProducers(insertChannels) + unmarshalDispatcher := util.NewUnmarshalDispatcher() + insertStream.CreatePulsarConsumers(insertChannels, Params.MsgChannelSubName, unmarshalDispatcher, receiveBufSize) + + ddStream := pulsarms.NewPulsarMsgStream(ctx, receiveBufSize) + ddStream.SetPulsarClient(pulsarURL) + ddStream.CreatePulsarProducers(ddChannels) + + var insertMsgStream msgstream.MsgStream = insertStream + insertMsgStream.Start() + + var ddMsgStream msgstream.MsgStream = ddStream + ddMsgStream.Start() + + err := insertMsgStream.Broadcast(&timeTickMsgPack) + if err != nil { + return err + } + err = ddMsgStream.Broadcast(&timeTickMsgPack) + if err != nil { + return err + } + return nil +} + func TestSegmentManager_load_release_and_search(t *testing.T) { collectionID := UniqueID(0) partitionID := UniqueID(1) segmentID := UniqueID(2) fieldIDs := []int64{0, 101} + // mock write insert bin log + keyPrefix := path.Join("query-node-seg-manager-test-minio-prefix", strconv.FormatInt(collectionID, 10), strconv.FormatInt(partitionID, 10)) + Params.WriteNodeSegKvSubPath = keyPrefix + node := newQueryNodeMock() defer node.Stop() ctx := node.queryNodeLoopCtx node.loadIndexService = newLoadIndexService(ctx, node.replica) - node.segManager = newSegmentManager(ctx, node.replica, node.loadIndexService.loadIndexReqChan) + node.segManager = newSegmentManager(ctx, node.replica, nil, node.loadIndexService.loadIndexReqChan) go node.loadIndexService.start() collectionName := "collection0" @@ -237,7 +417,7 @@ func TestSegmentManager_load_release_and_search(t *testing.T) { err = node.replica.addSegment(segmentID, partitionID, collectionID, segTypeSealed) assert.NoError(t, err) - paths, srcFieldIDs, err := generateInsertBinLog(collectionID, partitionID, segmentID) + paths, srcFieldIDs, err := generateInsertBinLog(collectionID, partitionID, segmentID, keyPrefix) assert.NoError(t, err) fieldsMap := node.segManager.filterOutNeedlessFields(paths, srcFieldIDs, fieldIDs) @@ -299,3 +479,111 @@ func TestSegmentManager_load_release_and_search(t *testing.T) { <-ctx.Done() } + +//// NOTE: start pulsar before test +//func TestSegmentManager_with_seek(t *testing.T) { +// collectionID := UniqueID(0) +// partitionID := UniqueID(1) +// //segmentID := UniqueID(2) +// fieldIDs := []int64{0, 101} +// +// //// mock write insert bin log +// //keyPrefix := path.Join("query-node-seg-manager-test-minio-prefix", strconv.FormatInt(collectionID, 10), strconv.FormatInt(partitionID, 10)) +// //Params.WriteNodeSegKvSubPath = keyPrefix + "/" +// node := newQueryNodeMock() +// +// ctx := node.queryNodeLoopCtx +// go node.Start() +// +// collectionName := "collection0" +// initTestMeta(t, node, collectionName, collectionID, 0) +// +// err := node.replica.addPartition(collectionID, partitionID) +// assert.NoError(t, err) +// +// //err = node.replica.addSegment(segmentID, partitionID, collectionID, segTypeSealed) +// //assert.NoError(t, err) +// +// //paths, srcFieldIDs, err := generateInsertBinLog(collectionID, partitionID, segmentID, keyPrefix) +// //assert.NoError(t, err) +// +// //fieldsMap := node.segManager.filterOutNeedlessFields(paths, srcFieldIDs, fieldIDs) +// //assert.Equal(t, len(fieldsMap), 2) +// +// segmentIDToInsert := UniqueID(3) +// err = doInsert(ctx, collectionName, "default", segmentIDToInsert) +// assert.NoError(t, err) +// +// startPositions := make([]*internalPb.MsgPosition, 0) +// for _, ch := range Params.InsertChannelNames { +// startPositions = append(startPositions, &internalPb.MsgPosition{ +// ChannelName: ch, +// }) +// } +// var positions []*internalPb.MsgPosition +// lastSegStates := &datapb.SegmentStatesResponse{ +// State: datapb.SegmentState_SegmentGrowing, +// StartPositions: positions, +// } +// loadReq := &querypb.LoadSegmentRequest{ +// CollectionID: collectionID, +// PartitionID: partitionID, +// SegmentIDs: []UniqueID{segmentIDToInsert}, +// FieldIDs: fieldIDs, +// LastSegmentState: lastSegStates, +// } +// _, err = node.LoadSegments(loadReq) +// assert.NoError(t, err) +// +// err = sentTimeTick(ctx) +// assert.NoError(t, err) +// +// // do search +// dslString := "{\"bool\": { \n\"vector\": {\n \"vec\": {\n \"metric_type\": \"L2\", \n \"params\": {\n \"nprobe\": 10 \n},\n \"query\": \"$0\",\"topk\": 10 \n } \n } \n } \n }" +// +// const DIM = 16 +// var searchRawData []byte +// var vec = [DIM]float32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16} +// for _, ele := range vec { +// buf := make([]byte, 4) +// binary.LittleEndian.PutUint32(buf, math.Float32bits(ele)) +// searchRawData = append(searchRawData, buf...) +// } +// placeholderValue := milvuspb.PlaceholderValue{ +// Tag: "$0", +// Type: milvuspb.PlaceholderType_VECTOR_FLOAT, +// Values: [][]byte{searchRawData}, +// } +// +// placeholderGroup := milvuspb.PlaceholderGroup{ +// Placeholders: []*milvuspb.PlaceholderValue{&placeholderValue}, +// } +// +// placeHolderGroupBlob, err := proto.Marshal(&placeholderGroup) +// assert.NoError(t, err) +// +// //searchTimestamp := Timestamp(1020) +// collection, err := node.replica.getCollectionByID(collectionID) +// assert.NoError(t, err) +// plan, err := createPlan(*collection, dslString) +// assert.NoError(t, err) +// holder, err := parserPlaceholderGroup(plan, placeHolderGroupBlob) +// assert.NoError(t, err) +// placeholderGroups := make([]*PlaceholderGroup, 0) +// placeholderGroups = append(placeholderGroups, holder) +// +// // wait for segment building index +// time.Sleep(3 * time.Second) +// +// //segment, err := node.replica.getSegmentByID(segmentIDToInsert) +// //assert.NoError(t, err) +// //_, err = segment.segmentSearch(plan, placeholderGroups, []Timestamp{searchTimestamp}) +// //assert.Nil(t, err) +// +// plan.delete() +// holder.delete() +// +// <-ctx.Done() +// err = node.Stop() +// assert.NoError(t, err) +//} diff --git a/internal/timesync/timetick_watcher.go b/internal/timesync/timetick_watcher.go index 9ba4c87d24..906453917f 100644 --- a/internal/timesync/timetick_watcher.go +++ b/internal/timesync/timetick_watcher.go @@ -17,14 +17,6 @@ type MsgTimeTickWatcher struct { msgQueue chan *ms.TimeTickMsg } -func NewMsgTimeTickWatcher(streams ...ms.MsgStream) *MsgTimeTickWatcher { - watcher := &MsgTimeTickWatcher{ - streams: streams, - msgQueue: make(chan *ms.TimeTickMsg), - } - return watcher -} - func (watcher *MsgTimeTickWatcher) Watch(msg *ms.TimeTickMsg) { watcher.msgQueue <- msg }