Add rocksmq unittest with goroutines

Signed-off-by: yukun <kun.yu@zilliz.com>
This commit is contained in:
yukun 2021-01-20 16:46:58 +08:00 committed by yefu.chen
parent c35079d7e7
commit 527c0c49df
19 changed files with 708 additions and 295 deletions

View File

@ -125,10 +125,6 @@ build-go: build-cpp
@mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/binlog $(PWD)/cmd/binlog/main.go 1>/dev/null
@echo "Building singlenode ..."
@mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/singlenode $(PWD)/cmd/singlenode/main.go 1>/dev/null
@echo "Building distributed indexservice ..."
@mkdir -p $(INSTALL_PATH)/distributed && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/distributed/indexservice $(PWD)/cmd/distributed/indexservice/main.go 1>/dev/null
@echo "Building distributed indexnode ..."
@mkdir -p $(INSTALL_PATH)/distributed && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/distributed/indexnode $(PWD)/cmd/distributed/indexnode/main.go 1>/dev/null
build-cpp:
@(env bash $(PWD)/scripts/core_build.sh -f "$(CUSTOM_THIRDPARTY_PATH)")

View File

@ -14,13 +14,11 @@ type Master interface {
DropCollection(req DropCollectionRequest) error
HasCollection(req HasCollectionRequest) (bool, error)
DescribeCollection(req DescribeCollectionRequest) (DescribeCollectionResponse, error)
GetCollectionStatistics(req CollectionStatsRequest) (CollectionStatsResponse, error)
ShowCollections(req ShowCollectionRequest) (ShowCollectionResponse, error)
CreatePartition(req CreatePartitionRequest) error
DropPartition(req DropPartitionRequest) error
HasPartition(req HasPartitionRequest) (bool, error)
GetPartitionStatistics(req PartitionStatsRequest) (PartitionStatsResponse, error)
ShowPartitions(req ShowPartitionRequest) (ShowPartitionResponse, error)
DescribeSegment(req DescribeSegmentRequest) (DescribeSegmentResponse, error)

View File

@ -23,6 +23,9 @@ type DataService interface {
GetInsertBinlogPaths(req InsertBinlogPathRequest) (InsertBinlogPathsResponse, error)
GetInsertChannels(req InsertChannelRequest) ([]string, error)
GetCollectionStatistics(req CollectionStatsRequest) (CollectionStatsResponse, error)
GetPartitionStatistics(req PartitionStatsRequest) (PartitionStatsResponse, error)
}
```
@ -164,6 +167,35 @@ type InsertChannelRequest struct {
```
* *GetCollectionStatistics*
```go
type CollectionStatsRequest struct {
MsgBase
DbName string
CollectionName string
}
type CollectionStatsResponse struct {
Stats []KeyValuePair
}
```
* *GetPartitionStatistics*
```go
type PartitionStatsRequest struct {
MsgBase
DbName string
CollectionName string
PartitionName string
}
type PartitionStatsResponse struct {
Stats []KeyValuePair
}
```
#### 8.2 Insert Channel
@ -199,7 +231,6 @@ type DataNode interface {
```
* *WatchDmChannels*
```go

View File

@ -1,4 +1,4 @@
package grpcindexnodeclient
package grpcindexnode
import (
"context"
@ -7,6 +7,7 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/indexpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"google.golang.org/grpc"
)
@ -14,6 +15,32 @@ type Client struct {
grpcClient indexpb.IndexNodeClient
}
func (c Client) Init() {
//TODO:???
panic("implement me")
}
func (c Client) Start() {
//TODO:???
panic("implement me")
}
func (c Client) Stop() {
panic("implement me")
}
func (c Client) GetComponentStates() (*internalpb2.ComponentStates, error) {
panic("implement me")
}
func (c Client) GetTimeTickChannel() (string, error) {
panic("implement me")
}
func (c Client) GetStatisticsChannel() (string, error) {
panic("implement me")
}
func (c Client) BuildIndex(req *indexpb.BuildIndexCmd) (*commonpb.Status, error) {
ctx := context.TODO()
@ -26,9 +53,9 @@ func NewClient(nodeAddress string) *Client {
defer cancel()
conn, err := grpc.DialContext(ctx1, nodeAddress, grpc.WithInsecure(), grpc.WithBlock())
if err != nil {
log.Printf("Connect to IndexNode failed, error= %v", err)
log.Printf("IndexNode connect to IndexService failed, error= %v", err)
}
log.Printf("Connected to IndexService, IndexService=%s", nodeAddress)
log.Printf("IndexNode connected to IndexService, IndexService=%s", Params.Address)
return &Client{
grpcClient: indexpb.NewIndexNodeClient(conn),
}

View File

@ -23,6 +23,7 @@ import (
)
func main() {
grpcindexnode.Init()
ctx, cancel := context.WithCancel(context.Background())
svr, err := grpcindexnode.CreateIndexNode(ctx)
if err != nil {

View File

@ -0,0 +1,177 @@
package grpcindexnode
import (
"net"
"strconv"
"github.com/zilliztech/milvus-distributed/internal/util/paramtable"
)
type ParamTable struct {
paramtable.BaseTable
Address string
Port int
ServiceAddress string
ServicePort int
NodeID int64
MasterAddress string
EtcdAddress string
MetaRootPath string
MinIOAddress string
MinIOAccessKeyID string
MinIOSecretAccessKey string
MinIOUseSSL bool
MinioBucketName string
}
var Params ParamTable
func (pt *ParamTable) Init() {
pt.BaseTable.Init()
pt.initAddress()
pt.initPort()
pt.initIndexServerAddr()
pt.initIndexServerPort()
pt.initEtcdAddress()
pt.initMasterAddress()
pt.initMetaRootPath()
pt.initMinIOAddress()
pt.initMinIOAccessKeyID()
pt.initMinIOSecretAccessKey()
pt.initMinIOUseSSL()
pt.initMinioBucketName()
}
func (pt *ParamTable) initAddress() {
addr, err := pt.Load("indexNode.address")
if err != nil {
panic(err)
}
hostName, _ := net.LookupHost(addr)
if len(hostName) <= 0 {
if ip := net.ParseIP(addr); ip == nil {
panic("invalid ip indexBuilder.address")
}
}
port, err := pt.Load("indexNode.port")
if err != nil {
panic(err)
}
_, err = strconv.Atoi(port)
if err != nil {
panic(err)
}
pt.Address = addr + ":" + port
}
func (pt *ParamTable) initPort() {
pt.Port = pt.ParseInt("indexNode.port")
}
func (pt *ParamTable) initIndexServerAddr() {
addr, err := pt.Load("indexServer.address")
if err != nil {
panic(err)
}
hostName, _ := net.LookupHost(addr)
if len(hostName) <= 0 {
if ip := net.ParseIP(addr); ip == nil {
panic("invalid ip indexBuilder.address")
}
}
port, err := pt.Load("indexServer.port")
if err != nil {
panic(err)
}
_, err = strconv.Atoi(port)
if err != nil {
panic(err)
}
pt.ServiceAddress = addr + ":" + port
}
func (pt ParamTable) initIndexServerPort() {
pt.ServicePort = pt.ParseInt("indexServer.port")
}
func (pt *ParamTable) initEtcdAddress() {
addr, err := pt.Load("_EtcdAddress")
if err != nil {
panic(err)
}
pt.EtcdAddress = addr
}
func (pt *ParamTable) initMetaRootPath() {
rootPath, err := pt.Load("etcd.rootPath")
if err != nil {
panic(err)
}
subPath, err := pt.Load("etcd.metaSubPath")
if err != nil {
panic(err)
}
pt.MetaRootPath = rootPath + "/" + subPath
}
func (pt *ParamTable) initMasterAddress() {
ret, err := pt.Load("_MasterAddress")
if err != nil {
panic(err)
}
pt.MasterAddress = ret
}
func (pt *ParamTable) initMinIOAddress() {
ret, err := pt.Load("_MinioAddress")
if err != nil {
panic(err)
}
pt.MinIOAddress = ret
}
func (pt *ParamTable) initMinIOAccessKeyID() {
ret, err := pt.Load("minio.accessKeyID")
if err != nil {
panic(err)
}
pt.MinIOAccessKeyID = ret
}
func (pt *ParamTable) initMinIOSecretAccessKey() {
ret, err := pt.Load("minio.secretAccessKey")
if err != nil {
panic(err)
}
pt.MinIOSecretAccessKey = ret
}
func (pt *ParamTable) initMinIOUseSSL() {
ret, err := pt.Load("minio.useSSL")
if err != nil {
panic(err)
}
pt.MinIOUseSSL, err = strconv.ParseBool(ret)
if err != nil {
panic(err)
}
}
func (pt *ParamTable) initMinioBucketName() {
bucketName, err := pt.Load("minio.bucketName")
if err != nil {
panic(err)
}
pt.MinioBucketName = bucketName
}

View File

@ -7,7 +7,7 @@ import (
"strconv"
"sync"
serviceclient "github.com/zilliztech/milvus-distributed/internal/distributed/indexservice/client"
grpcindexservice "github.com/zilliztech/milvus-distributed/internal/distributed/indexservice"
"github.com/zilliztech/milvus-distributed/internal/indexnode"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/indexpb"
@ -19,29 +19,27 @@ type Server struct {
grpcServer *grpc.Server
loopCtx context.Context
loopCancel func()
loopWg sync.WaitGroup
indexNodeLoopCtx context.Context
indexNodeLoopCancel func()
indexNodeLoopWg sync.WaitGroup
}
func NewGrpcServer(ctx context.Context, nodeID int64) *Server {
ctx1, cancel := context.WithCancel(ctx)
func NewGrpcServer(ctx context.Context, indexID int64) *Server {
return &Server{
loopCtx: ctx1,
loopCancel: cancel,
node: indexnode.NewIndexNode(ctx, nodeID),
node: indexnode.NewIndexNode(ctx, indexID),
}
}
func registerNode() error {
indexServiceClient := serviceclient.NewClient(indexnode.Params.ServiceAddress)
indexServiceClient := grpcindexservice.NewClient(Params.ServiceAddress)
request := &indexpb.RegisterNodeRequest{
Base: nil,
Address: &commonpb.Address{
Ip: indexnode.Params.NodeIP,
Port: int64(indexnode.Params.NodePort),
Ip: Params.Address,
Port: int64(Params.Port),
},
}
resp, err := indexServiceClient.RegisterNode(request)
@ -50,17 +48,17 @@ func registerNode() error {
return err
}
indexnode.Params.NodeID = resp.InitParams.NodeID
log.Println("Register indexNode successful with nodeID=", indexnode.Params.NodeID)
Params.NodeID = resp.InitParams.NodeID
log.Println("Register indexNode successful with nodeID=", Params.NodeID)
err = indexnode.Params.LoadFromKVPair(resp.InitParams.StartParams)
err = Params.LoadFromKVPair(resp.InitParams.StartParams)
return err
}
func (s *Server) grpcLoop() {
defer s.loopWg.Done()
defer s.indexNodeLoopWg.Done()
lis, err := net.Listen("tcp", ":"+strconv.Itoa(indexnode.Params.NodePort))
lis, err := net.Listen("tcp", ":"+strconv.Itoa(Params.Port))
if err != nil {
log.Fatalf("IndexNode grpc server fatal error=%v", err)
}
@ -70,41 +68,45 @@ func (s *Server) grpcLoop() {
if err = s.grpcServer.Serve(lis); err != nil {
log.Fatalf("IndexNode grpc server fatal error=%v", err)
}
log.Println("IndexNode grpc server starting...")
}
func (s *Server) startIndexNode() error {
s.loopWg.Add(1)
s.indexNodeLoopWg.Add(1)
//TODO: How to make sure that grpc server has started successfully
go s.grpcLoop()
log.Println("IndexNode grpc server start successfully")
err := registerNode()
if err != nil {
return err
}
indexnode.Params.Init()
Params.Init()
return nil
}
func (s *Server) Init() {
indexnode.Params.Init()
log.Println("IndexNode init successfully, nodeAddress=", indexnode.Params.NodeAddress)
func Init() {
Params.Init()
}
func CreateIndexNode(ctx context.Context) (*Server, error) {
return NewGrpcServer(ctx, indexnode.Params.NodeID), nil
ctx1, cancel := context.WithCancel(ctx)
s := &Server{
indexNodeLoopCtx: ctx1,
indexNodeLoopCancel: cancel,
}
return s, nil
}
func (s *Server) Start() error {
s.Init()
return s.startIndexNode()
}
func (s *Server) Stop() {
s.loopWg.Wait()
s.indexNodeLoopWg.Wait()
}
func (s *Server) Close() {

View File

@ -1,4 +1,4 @@
package grpcindexserviceclient
package grpcindexservice
import (
"context"
@ -6,14 +6,41 @@ import (
"time"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/indexpb"
"google.golang.org/grpc"
"github.com/zilliztech/milvus-distributed/internal/proto/indexpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
)
type Client struct {
grpcClient indexpb.IndexServiceClient
}
func (g Client) Init() {
panic("implement me")
}
func (g Client) Start() {
panic("implement me")
}
func (g Client) Stop() {
panic("implement me")
}
func (g Client) GetComponentStates() (*internalpb2.ComponentStates, error) {
panic("implement me")
}
func (g Client) GetTimeTickChannel() (string, error) {
panic("implement me")
}
func (g Client) GetStatisticsChannel() (string, error) {
panic("implement me")
}
func (g Client) RegisterNode(req *indexpb.RegisterNodeRequest) (*indexpb.RegisterNodeResponse, error) {
ctx := context.TODO()
@ -53,9 +80,9 @@ func NewClient(address string) *Client {
defer cancel()
conn, err := grpc.DialContext(ctx1, address, grpc.WithInsecure(), grpc.WithBlock())
if err != nil {
log.Printf("Connect to IndexService failed, error= %v", err)
log.Printf("IndexNode connect to IndexService failed, error= %v", err)
}
log.Printf("Connected to IndexService, IndexService=%s", address)
log.Printf("IndexNode connected to IndexService, IndexService=%s", Params.Address)
return &Client{
grpcClient: indexpb.NewIndexServiceClient(conn),

View File

@ -23,6 +23,7 @@ import (
)
func main() {
grpcindexserver.Init()
ctx, cancel := context.WithCancel(context.Background())
svr, err := grpcindexserver.CreateIndexServer(ctx)
if err != nil {

View File

@ -0,0 +1,144 @@
package grpcindexservice
import (
"net"
"strconv"
"github.com/zilliztech/milvus-distributed/internal/util/paramtable"
)
type ParamTable struct {
paramtable.BaseTable
Address string
Port int
NodeID int64
MasterAddress string
EtcdAddress string
MetaRootPath string
MinIOAddress string
MinIOAccessKeyID string
MinIOSecretAccessKey string
MinIOUseSSL bool
MinioBucketName string
}
var Params ParamTable
func (pt *ParamTable) Init() {
pt.BaseTable.Init()
pt.initAddress()
pt.initPort()
pt.initEtcdAddress()
pt.initMasterAddress()
pt.initMetaRootPath()
pt.initMinIOAddress()
pt.initMinIOAccessKeyID()
pt.initMinIOSecretAccessKey()
pt.initMinIOUseSSL()
pt.initMinioBucketName()
}
func (pt *ParamTable) initAddress() {
addr, err := pt.Load("indexServer.address")
if err != nil {
panic(err)
}
hostName, _ := net.LookupHost(addr)
if len(hostName) <= 0 {
if ip := net.ParseIP(addr); ip == nil {
panic("invalid ip indexBuilder.address")
}
}
port, err := pt.Load("indexServer.port")
if err != nil {
panic(err)
}
_, err = strconv.Atoi(port)
if err != nil {
panic(err)
}
pt.Address = addr + ":" + port
}
func (pt *ParamTable) initPort() {
pt.Port = pt.ParseInt("indexServer.port")
}
func (pt *ParamTable) initEtcdAddress() {
addr, err := pt.Load("_EtcdAddress")
if err != nil {
panic(err)
}
pt.EtcdAddress = addr
}
func (pt *ParamTable) initMetaRootPath() {
rootPath, err := pt.Load("etcd.rootPath")
if err != nil {
panic(err)
}
subPath, err := pt.Load("etcd.metaSubPath")
if err != nil {
panic(err)
}
pt.MetaRootPath = rootPath + "/" + subPath
}
func (pt *ParamTable) initMasterAddress() {
ret, err := pt.Load("_MasterAddress")
if err != nil {
panic(err)
}
pt.MasterAddress = ret
}
func (pt *ParamTable) initMinIOAddress() {
ret, err := pt.Load("_MinioAddress")
if err != nil {
panic(err)
}
pt.MinIOAddress = ret
}
func (pt *ParamTable) initMinIOAccessKeyID() {
ret, err := pt.Load("minio.accessKeyID")
if err != nil {
panic(err)
}
pt.MinIOAccessKeyID = ret
}
func (pt *ParamTable) initMinIOSecretAccessKey() {
ret, err := pt.Load("minio.secretAccessKey")
if err != nil {
panic(err)
}
pt.MinIOSecretAccessKey = ret
}
func (pt *ParamTable) initMinIOUseSSL() {
ret, err := pt.Load("minio.useSSL")
if err != nil {
panic(err)
}
pt.MinIOUseSSL, err = strconv.ParseBool(ret)
if err != nil {
panic(err)
}
}
func (pt *ParamTable) initMinioBucketName() {
bucketName, err := pt.Load("minio.bucketName")
if err != nil {
panic(err)
}
pt.MinioBucketName = bucketName
}

View File

@ -29,11 +29,13 @@ type Server struct {
}
func (s *Server) Init() {
indexservice.Params.Init()
log.Println("initing params ...")
Params.Init()
}
func (s *Server) Start() error {
s.Init()
log.Println("stringing indexserver ...")
return s.startIndexServer()
}
@ -62,6 +64,20 @@ func (s *Server) RegisterNode(ctx context.Context, req *indexpb.RegisterNodeRequ
func (s *Server) BuildIndex(ctx context.Context, req *indexpb.BuildIndexRequest) (*indexpb.BuildIndexResponse, error) {
return s.server.BuildIndex(req)
//indexID := int64(0)
//request := &indexpb.BuildIndexCmd{
// IndexID: indexID,
// Req: req,
//}
//
//indexNodeClient := grpcindexnode.NewClient()
//
//status, err := indexNodeClient.BuildIndex(request)
//response := &indexpb.BuildIndexResponse{
// Status: status,
// IndexID: indexID,
//}
//return response, err
}
func (s *Server) GetIndexStates(ctx context.Context, req *indexpb.IndexStatesRequest) (*indexpb.IndexStatesResponse, error) {
@ -79,6 +95,8 @@ func (s *Server) NotifyBuildIndex(ctx context.Context, nty *indexpb.BuildIndexNo
return s.server.NotifyBuildIndex(nty)
}
//varindex
func NewServer() *Server {
return &Server{
@ -91,7 +109,7 @@ func (s *Server) grpcLoop() {
defer s.loopWg.Done()
log.Println("Starting start IndexServer")
lis, err := net.Listen("tcp", ":"+strconv.Itoa(indexservice.Params.Port))
lis, err := net.Listen("tcp", ":"+strconv.Itoa(Params.Port))
if err != nil {
log.Fatalf("IndexServer grpc server fatal error=%v", err)
}
@ -103,16 +121,20 @@ func (s *Server) grpcLoop() {
if err = s.grpcServer.Serve(lis); err != nil {
log.Fatalf("IndexServer grpc server fatal error=%v", err)
}
log.Println("IndexServer grpc server starting...")
}
func (s *Server) startIndexServer() error {
s.loopWg.Add(1)
go s.grpcLoop()
log.Println("IndexServer grpc server start successfully")
return nil
}
func Init() {
Params.Init()
}
func CreateIndexServer(ctx context.Context) (*Server, error) {
ctx1, cancel := context.WithCancel(ctx)

View File

@ -10,6 +10,8 @@ import (
"time"
"github.com/zilliztech/milvus-distributed/internal/allocator"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/indexservice"
"github.com/zilliztech/milvus-distributed/internal/kv"
etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
miniokv "github.com/zilliztech/milvus-distributed/internal/kv/minio"
@ -41,8 +43,8 @@ type IndexNode struct {
startCallbacks []func()
closeCallbacks []func()
indexNodeID int64
//serviceClient indexservice.Interface // method factory
indexNodeID int64
serviceClient indexservice.Interface // method factory
}
func (i *IndexNode) Init() {
@ -70,25 +72,58 @@ func (i *IndexNode) GetStatisticsChannel() (string, error) {
}
func (i *IndexNode) BuildIndex(req *indexpb.BuildIndexCmd) (*commonpb.Status, error) {
//TODO: build index in index node
ctx := context.Background()
t := NewIndexAddTask()
t.req = req.Req
t.idAllocator = i.idAllocator
t.buildQueue = i.sched.IndexBuildQueue
t.table = i.metaTable
t.kv = i.kv
var cancel func()
t.ctx, cancel = context.WithTimeout(ctx, reqTimeoutInterval)
defer cancel()
log.Println("Create index with indexID=", req.IndexID)
return &commonpb.Status{
fn := func() error {
select {
case <-ctx.Done():
return errors.New("insert timeout")
default:
return i.sched.IndexAddQueue.Enqueue(t)
}
}
ret := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
Reason: "",
}, nil
}
err := fn()
if err != nil {
ret.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR
ret.Reason = err.Error()
return ret, nil
}
err = t.WaitToFinish()
if err != nil {
ret.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR
ret.Reason = err.Error()
return ret, nil
}
return ret, nil
}
func CreateIndexNode(ctx context.Context) (*IndexNode, error) {
return &IndexNode{}, nil
}
func NewIndexNode(ctx context.Context, nodeID int64) *IndexNode {
func NewIndexNode(ctx context.Context, indexID int64) *IndexNode {
ctx1, cancel := context.WithCancel(ctx)
in := &IndexNode{
loopCtx: ctx1,
loopCancel: cancel,
indexNodeID: nodeID,
indexNodeID: indexID,
}
return in

View File

@ -13,14 +13,6 @@ type ParamTable struct {
Address string
Port int
NodeAddress string
NodeIP string
NodePort int
ServiceAddress string
ServicePort int
NodeID int64
MasterAddress string
EtcdAddress string
@ -39,11 +31,6 @@ func (pt *ParamTable) Init() {
pt.BaseTable.Init()
pt.initAddress()
pt.initPort()
pt.initNodeAddress()
pt.initNodeIP()
pt.initNodePort()
pt.initIndexServerAddr()
pt.initIndexServerPort()
pt.initEtcdAddress()
pt.initMasterAddress()
pt.initMetaRootPath()
@ -83,72 +70,6 @@ func (pt *ParamTable) initPort() {
pt.Port = pt.ParseInt("indexBuilder.port")
}
func (pt *ParamTable) initNodeAddress() {
addr, err := pt.Load("indexNode.address")
if err != nil {
panic(err)
}
hostName, _ := net.LookupHost(addr)
if len(hostName) <= 0 {
if ip := net.ParseIP(addr); ip == nil {
panic("invalid ip indexBuilder.address")
}
}
port, err := pt.Load("indexNode.port")
if err != nil {
panic(err)
}
_, err = strconv.Atoi(port)
if err != nil {
panic(err)
}
pt.NodeAddress = addr + ":" + port
}
func (pt *ParamTable) initNodeIP() {
addr, err := pt.Load("indexNode.address")
if err != nil {
panic(err)
}
pt.NodeIP = addr
}
func (pt *ParamTable) initNodePort() {
pt.NodePort = pt.ParseInt("indexNode.port")
}
func (pt *ParamTable) initIndexServerAddr() {
addr, err := pt.Load("indexServer.address")
if err != nil {
panic(err)
}
hostName, _ := net.LookupHost(addr)
if len(hostName) <= 0 {
if ip := net.ParseIP(addr); ip == nil {
panic("invalid ip indexServer.address")
}
}
port, err := pt.Load("indexServer.port")
if err != nil {
panic(err)
}
_, err = strconv.Atoi(port)
if err != nil {
panic(err)
}
pt.ServiceAddress = addr + ":" + port
}
func (pt ParamTable) initIndexServerPort() {
pt.ServicePort = pt.ParseInt("indexServer.port")
}
func (pt *ParamTable) initEtcdAddress() {
addr, err := pt.Load("_EtcdAddress")
if err != nil {

View File

@ -8,9 +8,7 @@ import (
"time"
"github.com/zilliztech/milvus-distributed/internal/allocator"
grpcindexnodeclient "github.com/zilliztech/milvus-distributed/internal/distributed/indexnode/client"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/indexnode"
"github.com/zilliztech/milvus-distributed/internal/kv"
etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
@ -24,7 +22,7 @@ import (
type IndexService struct {
// implement Service
nodeClients []indexnode.Interface
//nodeClients [] .Interface
// factory method
loopCtx context.Context
loopCancel func()
@ -101,11 +99,6 @@ func (i *IndexService) RegisterNode(req *indexpb.RegisterNodeRequest) (*indexpb.
i.nodeNum++
nodeAddress := req.Address.Ip + ":" + strconv.FormatInt(req.Address.Port, 10)
log.Println(nodeAddress)
nodeClient := grpcindexnodeclient.NewClient(nodeAddress)
i.nodeClients = append(i.nodeClients, nodeClient)
return &indexpb.RegisterNodeResponse{
InitParams: &internalpb2.InitParams{
NodeID: nodeID,
@ -115,19 +108,28 @@ func (i *IndexService) RegisterNode(req *indexpb.RegisterNodeRequest) (*indexpb.
}
func (i *IndexService) BuildIndex(req *indexpb.BuildIndexRequest) (*indexpb.BuildIndexResponse, error) {
//TODO: Allocator ID
indexID := int64(0)
nodeClient := i.nodeClients[0]
request := &indexpb.BuildIndexCmd{
IndexID: indexID,
Req: req,
}
status, err := nodeClient.BuildIndex(request)
return &indexpb.BuildIndexResponse{
Status: status,
IndexID: indexID,
}, err
//TODO: Multiple indexes will build at same time.
//ctx := context.Background()
//indexNodeClient := indexnode.NewIndexNode(ctx, rand.Int63n(i.nodeNum))
//
////TODO: Allocator index ID
//indexID := int64(0)
//
//request := &indexpb.BuildIndexCmd{
// IndexID: indexID,
// Req: req,
//}
//
//status, err := indexNodeClient.BuildIndex(request)
//if err != nil {
// return nil, err
//}
//
//return &indexpb.BuildIndexResponse{
// Status: status,
// IndexID: indexID,
//}, nil
return nil, nil
}
func (i *IndexService) GetIndexStates(req *indexpb.IndexStatesRequest) (*indexpb.IndexStatesResponse, error) {

View File

@ -13,6 +13,8 @@ type ParamTable struct {
Address string
Port int
NodeID int64
MasterAddress string
EtcdAddress string

View File

@ -20,7 +20,7 @@ message ComponentInfo {
message ComponentStates {
ComponentInfo state = 1;
repeated common.KeyValuePair subcomponent_states = 2;
repeated ComponentInfo subcomponent_states = 2;
}
message NodeInfo {

View File

@ -113,11 +113,11 @@ func (m *ComponentInfo) GetExtraInfo() []*commonpb.KeyValuePair {
}
type ComponentStates struct {
State *ComponentInfo `protobuf:"bytes,1,opt,name=state,proto3" json:"state,omitempty"`
SubcomponentStates []*commonpb.KeyValuePair `protobuf:"bytes,2,rep,name=subcomponent_states,json=subcomponentStates,proto3" json:"subcomponent_states,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
State *ComponentInfo `protobuf:"bytes,1,opt,name=state,proto3" json:"state,omitempty"`
SubcomponentStates []*ComponentInfo `protobuf:"bytes,2,rep,name=subcomponent_states,json=subcomponentStates,proto3" json:"subcomponent_states,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *ComponentStates) Reset() { *m = ComponentStates{} }
@ -152,7 +152,7 @@ func (m *ComponentStates) GetState() *ComponentInfo {
return nil
}
func (m *ComponentStates) GetSubcomponentStates() []*commonpb.KeyValuePair {
func (m *ComponentStates) GetSubcomponentStates() []*ComponentInfo {
if m != nil {
return m.SubcomponentStates
}
@ -1497,85 +1497,85 @@ func init() {
func init() { proto.RegisterFile("internal.proto", fileDescriptor_41f4a519b878ee3b) }
var fileDescriptor_41f4a519b878ee3b = []byte{
// 1272 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xec, 0x57, 0x4f, 0x6f, 0x1b, 0xc5,
0x1b, 0xfe, 0xad, 0xed, 0xf8, 0xcf, 0x6b, 0x27, 0xcd, 0x6f, 0xe9, 0x9f, 0x2d, 0x14, 0xea, 0x2e,
0x05, 0x0c, 0x88, 0xa4, 0x4a, 0x11, 0x42, 0x5c, 0xda, 0x24, 0x6e, 0xe9, 0xaa, 0x49, 0x08, 0x13,
0xab, 0x12, 0xbd, 0xac, 0xc6, 0xde, 0x89, 0x3d, 0x74, 0x77, 0xc7, 0x9d, 0x99, 0x6d, 0xea, 0x9c,
0xb9, 0x21, 0xb8, 0x71, 0xe4, 0x00, 0x1f, 0x04, 0x24, 0x4e, 0x48, 0x5c, 0x11, 0x27, 0x3e, 0x09,
0x27, 0x34, 0x33, 0xbb, 0x6b, 0x3b, 0xdd, 0x44, 0x69, 0x00, 0xa1, 0x4a, 0xdc, 0x76, 0x9e, 0x79,
0xe7, 0x9d, 0xf7, 0x79, 0xe6, 0x9d, 0x67, 0x77, 0x61, 0x89, 0xc6, 0x92, 0xf0, 0x18, 0x87, 0x2b,
0x63, 0xce, 0x24, 0xb3, 0x2f, 0x44, 0x34, 0x7c, 0x92, 0x08, 0x33, 0x5a, 0xc9, 0x26, 0x5f, 0x6e,
0x0d, 0x58, 0x14, 0xb1, 0xd8, 0xc0, 0xee, 0x0f, 0x16, 0x2c, 0x6e, 0xb2, 0x68, 0xcc, 0x62, 0x12,
0x4b, 0x2f, 0xde, 0x67, 0xf6, 0x45, 0xa8, 0xc6, 0x2c, 0x20, 0x5e, 0xd7, 0xb1, 0xda, 0x56, 0xa7,
0x8c, 0xd2, 0x91, 0x6d, 0x43, 0x85, 0xb3, 0x90, 0x38, 0xa5, 0xb6, 0xd5, 0x69, 0x20, 0xfd, 0x6c,
0xdf, 0x02, 0x10, 0x12, 0x4b, 0xe2, 0x0f, 0x58, 0x40, 0x9c, 0x72, 0xdb, 0xea, 0x2c, 0xad, 0xb5,
0x57, 0x0a, 0xf7, 0x5d, 0xd9, 0x53, 0x81, 0x9b, 0x2c, 0x20, 0xa8, 0x21, 0xb2, 0x47, 0xfb, 0x36,
0x00, 0x79, 0x2a, 0x39, 0xf6, 0x69, 0xbc, 0xcf, 0x9c, 0x4a, 0xbb, 0xdc, 0x69, 0xae, 0x5d, 0x9b,
0x4f, 0x90, 0x96, 0x7b, 0x9f, 0x4c, 0x1e, 0xe0, 0x30, 0x21, 0xbb, 0x98, 0x72, 0xd4, 0xd0, 0x8b,
0x54, 0xb9, 0xee, 0x77, 0x16, 0x9c, 0xcb, 0x09, 0xe8, 0x3d, 0x84, 0xfd, 0x11, 0x2c, 0xe8, 0x2d,
0x34, 0x83, 0xe6, 0xda, 0xf5, 0x63, 0x2a, 0x9a, 0xe3, 0x8d, 0xcc, 0x12, 0x1b, 0xc1, 0x4b, 0x22,
0xe9, 0x0f, 0xb2, 0x29, 0x5f, 0xa3, 0xc2, 0x29, 0x9d, 0xb6, 0x34, 0x7b, 0x76, 0xb5, 0xa9, 0xc7,
0x7d, 0x00, 0xf5, 0x1d, 0x25, 0xa2, 0x92, 0xf7, 0x03, 0xa8, 0xe1, 0x20, 0xe0, 0x44, 0x88, 0xb4,
0xba, 0x2b, 0x85, 0x39, 0xd7, 0x4d, 0x0c, 0xca, 0x82, 0x8b, 0xe4, 0x77, 0x3f, 0x07, 0xf0, 0x62,
0x2a, 0x77, 0x31, 0xc7, 0x91, 0x38, 0xf6, 0xe0, 0xba, 0xd0, 0x12, 0x12, 0x73, 0xe9, 0x8f, 0x75,
0xdc, 0xe9, 0xa9, 0x34, 0xf5, 0x32, 0x93, 0xdd, 0xbd, 0x0e, 0xb0, 0x27, 0x39, 0x8d, 0x87, 0x5b,
0x54, 0x48, 0xb5, 0xd7, 0x13, 0x15, 0x67, 0xb2, 0x35, 0x50, 0x3a, 0x72, 0x6f, 0x41, 0xb3, 0x47,
0x23, 0xd2, 0xa3, 0x83, 0x47, 0xdb, 0x62, 0x68, 0xdf, 0x80, 0x4a, 0x1f, 0x0b, 0x72, 0x22, 0xd3,
0x6d, 0x31, 0xdc, 0xc0, 0x82, 0x20, 0x1d, 0xe9, 0xfe, 0x6e, 0xc1, 0xa5, 0x4d, 0x4e, 0x74, 0x7f,
0x84, 0x21, 0x19, 0x48, 0xca, 0x62, 0x44, 0x1e, 0x27, 0x44, 0xc8, 0xe7, 0xcf, 0x66, 0x5f, 0x82,
0x5a, 0xd0, 0xf7, 0x63, 0x1c, 0x65, 0xba, 0x55, 0x83, 0xfe, 0x0e, 0x8e, 0x88, 0xfd, 0x26, 0x2c,
0x0d, 0xf2, 0xfc, 0x0a, 0xd1, 0xcd, 0xdb, 0x40, 0x47, 0x50, 0xa5, 0x7a, 0xd0, 0xf7, 0xba, 0x4e,
0x45, 0x2b, 0xaa, 0x9f, 0x6d, 0x17, 0x5a, 0xd3, 0x28, 0xaf, 0xeb, 0x2c, 0xe8, 0xb9, 0x39, 0x4c,
0xe9, 0x23, 0x06, 0x23, 0x12, 0x61, 0xa7, 0xda, 0xb6, 0x3a, 0x2d, 0x94, 0x8e, 0xdc, 0x9f, 0x2c,
0xb8, 0xd0, 0xe5, 0x6c, 0xfc, 0x22, 0x93, 0x73, 0xbf, 0x2a, 0xc1, 0x45, 0x73, 0x46, 0xbb, 0x98,
0x4b, 0xfa, 0x0f, 0xb1, 0x78, 0x0b, 0xce, 0x4d, 0x77, 0x35, 0x01, 0xc5, 0x34, 0xde, 0x80, 0xa5,
0x71, 0x56, 0x87, 0x89, 0xab, 0xe8, 0xb8, 0xc5, 0x1c, 0x9d, 0x63, 0xbb, 0x70, 0x02, 0xdb, 0x6a,
0xc1, 0x51, 0xb6, 0xa1, 0x99, 0x27, 0xf2, 0xba, 0x4e, 0x4d, 0x87, 0xcc, 0x42, 0xee, 0x97, 0x25,
0x38, 0xaf, 0x0e, 0xf5, 0x3f, 0x35, 0x94, 0x1a, 0x3f, 0x96, 0xc0, 0x36, 0xdd, 0xe1, 0xc5, 0x01,
0x79, 0xfa, 0x6f, 0x6a, 0xf1, 0x2a, 0xc0, 0x3e, 0x25, 0x61, 0x30, 0xab, 0x43, 0x43, 0x23, 0x7f,
0x49, 0x03, 0x07, 0x6a, 0x3a, 0x49, 0xce, 0x3f, 0x1b, 0x2a, 0xab, 0x35, 0xaf, 0xb3, 0xd4, 0x6a,
0xeb, 0xa7, 0xb6, 0x5a, 0xbd, 0x2c, 0xb5, 0xda, 0x5f, 0x4b, 0xb0, 0xe8, 0xc5, 0x82, 0x70, 0xf9,
0x02, 0x34, 0xd2, 0x15, 0x68, 0x08, 0x32, 0x8c, 0xd4, 0x5b, 0x34, 0x53, 0x72, 0x0a, 0xa8, 0xd9,
0xc1, 0x08, 0xc7, 0x31, 0x09, 0x53, 0x2d, 0x1b, 0x68, 0x0a, 0xd8, 0xaf, 0x01, 0x48, 0x1a, 0x11,
0x21, 0x71, 0x34, 0x16, 0x4e, 0xad, 0x5d, 0xee, 0x54, 0xd0, 0x0c, 0xa2, 0x5c, 0x94, 0xb3, 0x03,
0xaf, 0x6b, 0x84, 0x2c, 0xa3, 0x74, 0x64, 0xbf, 0x0f, 0x75, 0xce, 0x0e, 0xfc, 0x00, 0x4b, 0xec,
0x34, 0xb4, 0xc4, 0x97, 0x0b, 0x25, 0xd9, 0x08, 0x59, 0x1f, 0xd5, 0x38, 0x3b, 0xe8, 0x62, 0x89,
0xdd, 0x6f, 0x2d, 0x58, 0xdc, 0x23, 0x98, 0x0f, 0x46, 0x67, 0x97, 0xf5, 0x6d, 0x58, 0xe6, 0x44,
0x24, 0xa1, 0xf4, 0xa7, 0xb4, 0x8c, 0xbe, 0xe7, 0x0c, 0xbe, 0x99, 0x93, 0x5b, 0x85, 0x85, 0xc7,
0x09, 0xe1, 0x13, 0x2d, 0xef, 0x89, 0x15, 0x9a, 0x38, 0xf7, 0xb7, 0x99, 0xfa, 0x54, 0x2a, 0x71,
0x86, 0xfa, 0x6e, 0x42, 0x55, 0x7d, 0xb0, 0x24, 0x42, 0x57, 0xd5, 0x5c, 0x7b, 0xa5, 0x70, 0xcd,
0x9e, 0x0e, 0x41, 0x69, 0x68, 0x21, 0xa9, 0x72, 0x31, 0xa9, 0xab, 0xd0, 0x8c, 0x88, 0xe4, 0x74,
0xe0, 0xcb, 0xc9, 0x38, 0xeb, 0x08, 0x30, 0x50, 0x6f, 0x32, 0xd6, 0x77, 0x6a, 0x44, 0xa5, 0x70,
0x16, 0xda, 0xe5, 0x4e, 0x0b, 0xe9, 0x67, 0xf7, 0x17, 0x0b, 0x16, 0xbb, 0x24, 0x24, 0x92, 0x9c,
0x5d, 0xf8, 0x82, 0xb6, 0x2d, 0x15, 0xb6, 0xed, 0x5c, 0xc7, 0x95, 0x4f, 0xee, 0xb8, 0xca, 0x33,
0x1d, 0x77, 0x0d, 0x5a, 0x63, 0x4e, 0x23, 0xcc, 0x27, 0xfe, 0x23, 0x32, 0x31, 0x34, 0x94, 0xbf,
0x19, 0xec, 0x3e, 0x99, 0x08, 0xf7, 0x7b, 0x0b, 0xea, 0x77, 0xc3, 0x44, 0x8c, 0xce, 0xf4, 0x81,
0x33, 0x7f, 0x5f, 0x4a, 0x47, 0xef, 0xcb, 0x51, 0xfb, 0x29, 0x17, 0xd8, 0x8f, 0x0b, 0xad, 0xfc,
0x0a, 0xf6, 0xf0, 0x30, 0x3d, 0x84, 0x39, 0xcc, 0xfd, 0xc3, 0x82, 0xc6, 0x16, 0xc3, 0x81, 0xb6,
0xe0, 0xbf, 0xbd, 0xca, 0x2b, 0x30, 0x75, 0xd1, 0x4c, 0xe3, 0xa9, 0xad, 0xce, 0xd8, 0x63, 0x65,
0xde, 0x1e, 0xaf, 0x42, 0x93, 0xaa, 0x82, 0xfc, 0x31, 0x96, 0x23, 0x23, 0x6e, 0x03, 0x81, 0x86,
0x76, 0x15, 0xa2, 0xfc, 0x33, 0x0b, 0xd0, 0xfe, 0x59, 0x3d, 0xb5, 0x7f, 0xa6, 0x49, 0xb4, 0x7f,
0x7e, 0x61, 0xa9, 0xef, 0xe2, 0x80, 0x3c, 0x55, 0x7d, 0xfe, 0x6c, 0x52, 0xeb, 0x2c, 0x49, 0xed,
0x1b, 0x70, 0x3e, 0x4e, 0x22, 0x9f, 0x93, 0x10, 0x4b, 0x12, 0xf8, 0xa9, 0x18, 0x22, 0x15, 0xc7,
0x8e, 0x93, 0x08, 0x99, 0xa9, 0xbd, 0x74, 0xc6, 0xfd, 0xda, 0x02, 0xb8, 0xab, 0x98, 0x9b, 0x32,
0x8e, 0x1e, 0xad, 0x75, 0xf2, 0x9b, 0xa5, 0x34, 0x2f, 0xdd, 0x46, 0x26, 0x9d, 0xba, 0xb3, 0xc2,
0x29, 0x17, 0x71, 0xc8, 0x7f, 0x6c, 0xa6, 0xe4, 0x53, 0x75, 0xf5, 0xb3, 0xfb, 0x8d, 0x05, 0xad,
0xb4, 0x3a, 0x53, 0xd2, 0xdc, 0x29, 0x5b, 0x47, 0x4f, 0x59, 0xdf, 0xf5, 0x88, 0xf1, 0x89, 0x2f,
0xe8, 0x21, 0x49, 0x0b, 0x02, 0x03, 0xed, 0xd1, 0x43, 0x62, 0x5f, 0x86, 0xba, 0x96, 0x84, 0x1d,
0x88, 0xb4, 0x51, 0x6b, 0x4a, 0x06, 0x76, 0x20, 0xec, 0x77, 0xe1, 0xff, 0x9c, 0x0c, 0x48, 0x2c,
0xc3, 0x89, 0x1f, 0xb1, 0x80, 0xee, 0x53, 0x12, 0xe8, 0x6e, 0xa8, 0xa3, 0xe5, 0x6c, 0x62, 0x3b,
0xc5, 0xdd, 0x9f, 0x2d, 0x58, 0xfa, 0x54, 0x59, 0xa0, 0xfa, 0x49, 0x32, 0x95, 0x3d, 0x7f, 0xc7,
0xde, 0xd6, 0x5c, 0x52, 0x79, 0xcc, 0x2f, 0xce, 0xeb, 0xc7, 0xfd, 0x89, 0xce, 0x68, 0x80, 0xea,
0x82, 0x0c, 0xcd, 0x9e, 0x1b, 0xd0, 0x34, 0x5f, 0x0b, 0xa7, 0x91, 0x78, 0x7a, 0xb0, 0xc8, 0x7c,
0x63, 0x18, 0x89, 0x03, 0x68, 0x6e, 0x8b, 0xe1, 0x2e, 0x13, 0xfa, 0x26, 0x2a, 0x3b, 0x49, 0xbd,
0xc7, 0x58, 0x96, 0xa5, 0xef, 0x4a, 0x33, 0xc5, 0xf4, 0x6d, 0x39, 0x0f, 0x0b, 0x91, 0x18, 0xe6,
0xaf, 0x11, 0x33, 0x50, 0x27, 0x93, 0xbb, 0x92, 0xd6, 0xb6, 0x82, 0xa6, 0xc0, 0x3b, 0x1f, 0x42,
0x23, 0xff, 0x9b, 0xb6, 0x97, 0xa1, 0xe5, 0xed, 0x78, 0x3d, 0x6f, 0x7d, 0xcb, 0x7b, 0xe8, 0xed,
0x7c, 0xbc, 0xfc, 0x3f, 0xbb, 0x09, 0xb5, 0x7b, 0x77, 0xd6, 0xb7, 0x7a, 0xf7, 0x3e, 0x5b, 0xb6,
0xec, 0x16, 0xd4, 0xd7, 0x37, 0x76, 0x3e, 0x41, 0xdb, 0xeb, 0x5b, 0xcb, 0xa5, 0x8d, 0x3b, 0x0f,
0x37, 0x87, 0x54, 0x8e, 0x92, 0xbe, 0x12, 0x71, 0xf5, 0x90, 0x86, 0x21, 0x3d, 0x94, 0x64, 0x30,
0x5a, 0x35, 0x2c, 0xdf, 0x0b, 0xa8, 0x90, 0x9c, 0xf6, 0x13, 0x49, 0x82, 0xd5, 0x8c, 0xeb, 0xaa,
0xa6, 0x9e, 0x0f, 0xc7, 0xfd, 0xb5, 0x7e, 0x55, 0x43, 0x37, 0xff, 0x0c, 0x00, 0x00, 0xff, 0xff,
0x6b, 0x80, 0x63, 0x47, 0x73, 0x10, 0x00, 0x00,
// 1275 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xec, 0x57, 0xcd, 0x6f, 0x1b, 0x45,
0x14, 0x67, 0x6d, 0xc7, 0x1f, 0xcf, 0x4e, 0x1a, 0x96, 0x7e, 0x6c, 0xa1, 0x50, 0x77, 0x29, 0x60,
0x40, 0x24, 0x55, 0x8a, 0x10, 0xe2, 0xd2, 0x26, 0x71, 0x4b, 0x57, 0x4d, 0x42, 0x98, 0x98, 0x4a,
0xf4, 0xb2, 0x1a, 0x7b, 0x27, 0xf6, 0xd0, 0xdd, 0x1d, 0x77, 0x66, 0xdc, 0xd4, 0x39, 0x73, 0x43,
0x70, 0xe3, 0xc8, 0x05, 0xf1, 0x77, 0x80, 0xc4, 0x09, 0x89, 0x2b, 0xe2, 0xc4, 0x5f, 0xc2, 0x09,
0xcd, 0xc7, 0xfa, 0x23, 0xdd, 0x46, 0x69, 0x00, 0xa1, 0x4a, 0xdc, 0x76, 0x7e, 0xf3, 0xf6, 0xed,
0xfb, 0xfd, 0xde, 0x9b, 0xdf, 0xee, 0xc2, 0x12, 0x4d, 0x25, 0xe1, 0x29, 0x8e, 0x57, 0x86, 0x9c,
0x49, 0xe6, 0x9e, 0x4b, 0x68, 0xfc, 0x68, 0x24, 0xcc, 0x6a, 0x25, 0xdb, 0x7c, 0xb9, 0xd1, 0x63,
0x49, 0xc2, 0x52, 0x03, 0xfb, 0x3f, 0x3a, 0xb0, 0xb8, 0xc9, 0x92, 0x21, 0x4b, 0x49, 0x2a, 0x83,
0x74, 0x9f, 0xb9, 0xe7, 0xa1, 0x9c, 0xb2, 0x88, 0x04, 0x6d, 0xcf, 0x69, 0x3a, 0xad, 0x22, 0xb2,
0x2b, 0xd7, 0x85, 0x12, 0x67, 0x31, 0xf1, 0x0a, 0x4d, 0xa7, 0x55, 0x43, 0xfa, 0xda, 0xbd, 0x01,
0x20, 0x24, 0x96, 0x24, 0xec, 0xb1, 0x88, 0x78, 0xc5, 0xa6, 0xd3, 0x5a, 0x5a, 0x6b, 0xae, 0xe4,
0x3e, 0x77, 0x65, 0x4f, 0x05, 0x6e, 0xb2, 0x88, 0xa0, 0x9a, 0xc8, 0x2e, 0xdd, 0x9b, 0x00, 0xe4,
0xb1, 0xe4, 0x38, 0xa4, 0xe9, 0x3e, 0xf3, 0x4a, 0xcd, 0x62, 0xab, 0xbe, 0x76, 0x65, 0x3e, 0x81,
0x2d, 0xf7, 0x2e, 0x19, 0xdf, 0xc3, 0xf1, 0x88, 0xec, 0x62, 0xca, 0x51, 0x4d, 0xdf, 0xa4, 0xca,
0xf5, 0x7f, 0x70, 0xe0, 0xcc, 0x84, 0x80, 0x7e, 0x86, 0x70, 0x3f, 0x82, 0x05, 0xfd, 0x08, 0xcd,
0xa0, 0xbe, 0x76, 0xf5, 0x29, 0x15, 0xcd, 0xf1, 0x46, 0xe6, 0x16, 0xf7, 0x33, 0x78, 0x49, 0x8c,
0xba, 0xbd, 0x6c, 0x2b, 0xd4, 0xa8, 0xf0, 0x0a, 0xba, 0xb4, 0x93, 0x65, 0x72, 0x67, 0x13, 0x98,
0x92, 0xfc, 0x7b, 0x50, 0xdd, 0x51, 0x3a, 0x2a, 0x85, 0x3f, 0x80, 0x0a, 0x8e, 0x22, 0x4e, 0x84,
0xb0, 0x05, 0x5e, 0xca, 0x65, 0xbc, 0x6e, 0x62, 0x50, 0x16, 0x9c, 0xd7, 0x01, 0xff, 0x0b, 0x80,
0x20, 0xa5, 0x72, 0x17, 0x73, 0x9c, 0x88, 0xa7, 0xf6, 0xae, 0x0d, 0x0d, 0x21, 0x31, 0x97, 0xe1,
0x50, 0xc7, 0x59, 0x36, 0x27, 0x10, 0xba, 0xae, 0x6f, 0x33, 0xd9, 0xfd, 0xab, 0x00, 0x7b, 0x92,
0xd3, 0xb4, 0xbf, 0x45, 0x85, 0x54, 0xcf, 0x7a, 0xa4, 0xe2, 0x4c, 0xb6, 0x1a, 0xb2, 0x2b, 0xff,
0x06, 0xd4, 0x3b, 0x34, 0x21, 0x1d, 0xda, 0x7b, 0xb0, 0x2d, 0xfa, 0xee, 0x35, 0x28, 0x75, 0xb1,
0x20, 0xc7, 0x32, 0xdd, 0x16, 0xfd, 0x0d, 0x2c, 0x08, 0xd2, 0x91, 0xfe, 0x1f, 0x0e, 0x5c, 0xd8,
0xe4, 0x44, 0x8f, 0x48, 0x1c, 0x93, 0x9e, 0xa4, 0x2c, 0x45, 0xe4, 0xe1, 0x88, 0x08, 0xf9, 0xec,
0xd9, 0xdc, 0x0b, 0x50, 0x89, 0xba, 0x61, 0x8a, 0x93, 0x4c, 0xb7, 0x72, 0xd4, 0xdd, 0xc1, 0x09,
0x71, 0xdf, 0x84, 0xa5, 0xde, 0x24, 0xbf, 0x42, 0xf4, 0xfc, 0xd6, 0xd0, 0x11, 0x54, 0xa9, 0x1e,
0x75, 0x83, 0xb6, 0x57, 0xd2, 0x8a, 0xea, 0x6b, 0xd7, 0x87, 0xc6, 0x34, 0x2a, 0x68, 0x7b, 0x0b,
0x7a, 0x6f, 0x0e, 0x53, 0xfa, 0x88, 0xde, 0x80, 0x24, 0xd8, 0x2b, 0x37, 0x9d, 0x56, 0x03, 0xd9,
0x95, 0xff, 0xb3, 0x03, 0xe7, 0xda, 0x9c, 0x0d, 0x9f, 0x67, 0x72, 0xfe, 0xd7, 0x05, 0x38, 0x6f,
0x7a, 0xb4, 0x8b, 0xb9, 0xa4, 0xff, 0x12, 0x8b, 0xb7, 0xe0, 0xcc, 0xf4, 0xa9, 0x26, 0x20, 0x9f,
0xc6, 0x1b, 0xb0, 0x34, 0xcc, 0xea, 0x30, 0x71, 0x25, 0x1d, 0xb7, 0x38, 0x41, 0xe7, 0xd8, 0x2e,
0x1c, 0xc3, 0xb6, 0x9c, 0xd3, 0xca, 0x26, 0xd4, 0x27, 0x89, 0x82, 0xb6, 0x57, 0xd1, 0x21, 0xb3,
0x90, 0xff, 0x55, 0x01, 0xce, 0xaa, 0xa6, 0xfe, 0xaf, 0x86, 0x52, 0xe3, 0xa7, 0x02, 0xb8, 0x66,
0x3a, 0x82, 0x34, 0x22, 0x8f, 0xff, 0x4b, 0x2d, 0x5e, 0x05, 0xd8, 0xa7, 0x24, 0x8e, 0x66, 0x75,
0xa8, 0x69, 0xe4, 0x6f, 0x69, 0xe0, 0x41, 0x45, 0x27, 0x99, 0xf0, 0xcf, 0x96, 0xca, 0x6a, 0xcd,
0x1b, 0xcd, 0x5a, 0x6d, 0xf5, 0xc4, 0x56, 0xab, 0x6f, 0xb3, 0x56, 0xfb, 0x5b, 0x01, 0x16, 0x83,
0x54, 0x10, 0x2e, 0x9f, 0x83, 0x41, 0xba, 0x04, 0x35, 0x41, 0xfa, 0x89, 0x7a, 0xfd, 0x65, 0x4a,
0x4e, 0x01, 0xb5, 0xdb, 0x1b, 0xe0, 0x34, 0x25, 0xb1, 0xd5, 0xb2, 0x86, 0xa6, 0x80, 0xfb, 0x1a,
0x80, 0xa4, 0x09, 0x11, 0x12, 0x27, 0x43, 0xe1, 0x55, 0x9a, 0xc5, 0x56, 0x09, 0xcd, 0x20, 0xca,
0x45, 0x39, 0x3b, 0x08, 0xda, 0x46, 0xc8, 0x22, 0xb2, 0x2b, 0xf7, 0x7d, 0xa8, 0x72, 0x76, 0x10,
0x46, 0x58, 0x62, 0xaf, 0xa6, 0x25, 0xbe, 0x98, 0x2b, 0xc9, 0x46, 0xcc, 0xba, 0xa8, 0xc2, 0xd9,
0x41, 0x1b, 0x4b, 0xec, 0x7f, 0xe7, 0xc0, 0xe2, 0x1e, 0xc1, 0xbc, 0x37, 0x38, 0xbd, 0xac, 0x6f,
0xc3, 0x32, 0x27, 0x62, 0x14, 0xcb, 0x70, 0x4a, 0xcb, 0xe8, 0x7b, 0xc6, 0xe0, 0x9b, 0x13, 0x72,
0xab, 0xb0, 0xf0, 0x70, 0x44, 0xf8, 0x58, 0xcb, 0x7b, 0x6c, 0x85, 0x26, 0xce, 0xff, 0x7d, 0xa6,
0x3e, 0x95, 0x4a, 0x9c, 0xa2, 0xbe, 0xeb, 0x50, 0x56, 0xdf, 0x2c, 0x23, 0xa1, 0xab, 0xaa, 0xaf,
0xbd, 0x92, 0x7b, 0xcf, 0x9e, 0x0e, 0x41, 0x36, 0x34, 0x97, 0x54, 0x31, 0x9f, 0xd4, 0x65, 0xa8,
0x27, 0x44, 0x72, 0xda, 0x0b, 0xe5, 0x78, 0x98, 0x4d, 0x04, 0x18, 0xa8, 0x33, 0x1e, 0xea, 0x33,
0x35, 0xa0, 0x52, 0x78, 0x0b, 0xcd, 0x62, 0xab, 0x81, 0xf4, 0xb5, 0xff, 0xab, 0x03, 0x8b, 0x6d,
0x12, 0x13, 0x49, 0x4e, 0x2f, 0x7c, 0xce, 0xd8, 0x16, 0x72, 0xc7, 0x76, 0x6e, 0xe2, 0x8a, 0xc7,
0x4f, 0x5c, 0xe9, 0x89, 0x89, 0xbb, 0x02, 0x8d, 0x21, 0xa7, 0x09, 0xe6, 0xe3, 0xf0, 0x01, 0x19,
0x1b, 0x1a, 0xca, 0xdf, 0x0c, 0x76, 0x97, 0x8c, 0x85, 0xff, 0xbd, 0x03, 0xd5, 0xdb, 0xf1, 0x48,
0x0c, 0x4e, 0xf5, 0x81, 0x33, 0x7f, 0x5e, 0x0a, 0x47, 0xcf, 0xcb, 0x51, 0xfb, 0x29, 0xe6, 0xd8,
0x8f, 0x0f, 0x8d, 0xc9, 0x11, 0xec, 0xe0, 0xbe, 0x6d, 0xc2, 0x1c, 0xe6, 0xff, 0xe9, 0x40, 0x6d,
0x8b, 0xe1, 0x48, 0x5b, 0xf0, 0x3f, 0x5e, 0xe5, 0x25, 0x98, 0xba, 0x68, 0xa6, 0xf1, 0xd4, 0x56,
0x67, 0xec, 0xb1, 0x34, 0x6f, 0x8f, 0x97, 0xa1, 0x4e, 0x55, 0x41, 0xe1, 0x10, 0xcb, 0x81, 0x11,
0xb7, 0x86, 0x40, 0x43, 0xbb, 0x0a, 0x51, 0xfe, 0x99, 0x05, 0x68, 0xff, 0x2c, 0x9f, 0xd8, 0x3f,
0x6d, 0x12, 0xed, 0x9f, 0x5f, 0x3a, 0xea, 0xbb, 0x38, 0x22, 0x8f, 0xd5, 0x9c, 0x3f, 0x99, 0xd4,
0x39, 0x4d, 0x52, 0xf7, 0x1a, 0x9c, 0x4d, 0x47, 0x49, 0xc8, 0x49, 0x8c, 0x25, 0x89, 0x42, 0x2b,
0x86, 0xb0, 0xe2, 0xb8, 0xe9, 0x28, 0x41, 0x66, 0x6b, 0xcf, 0xee, 0xf8, 0xdf, 0x38, 0x00, 0xb7,
0x15, 0x73, 0x53, 0xc6, 0xd1, 0xd6, 0x3a, 0xc7, 0xbf, 0x59, 0x0a, 0xf3, 0xd2, 0x6d, 0x64, 0xd2,
0xa9, 0x33, 0x2b, 0xbc, 0x62, 0x1e, 0x87, 0xc9, 0x1f, 0xc9, 0x94, 0xbc, 0x55, 0x57, 0x5f, 0xfb,
0xdf, 0x3a, 0xd0, 0xb0, 0xd5, 0x99, 0x92, 0xe6, 0xba, 0xec, 0x1c, 0xed, 0xb2, 0x3e, 0xeb, 0x09,
0xe3, 0xe3, 0x50, 0xd0, 0x43, 0x62, 0x0b, 0x02, 0x03, 0xed, 0xd1, 0x43, 0xe2, 0x5e, 0x84, 0xaa,
0x96, 0x84, 0x1d, 0x08, 0x3b, 0xa8, 0x15, 0x25, 0x03, 0x3b, 0x10, 0xee, 0xbb, 0xf0, 0x22, 0x27,
0x3d, 0x92, 0xca, 0x78, 0x1c, 0x26, 0x2c, 0xa2, 0xfb, 0x94, 0x44, 0x7a, 0x1a, 0xaa, 0x68, 0x39,
0xdb, 0xd8, 0xb6, 0xb8, 0xff, 0x8b, 0x03, 0x4b, 0x9f, 0x2a, 0x0b, 0x54, 0x3f, 0x49, 0xa6, 0xb2,
0x67, 0x9f, 0xd8, 0x9b, 0x9a, 0x8b, 0x95, 0xc7, 0xfc, 0xe2, 0xbc, 0xfe, 0xb4, 0x9f, 0xd1, 0x19,
0x0d, 0x50, 0x55, 0x90, 0xbe, 0x79, 0xe6, 0x06, 0xd4, 0xcd, 0xd7, 0xc2, 0x49, 0x24, 0x9e, 0x36,
0x16, 0x99, 0x6f, 0x0c, 0x23, 0x71, 0x04, 0xf5, 0x6d, 0xd1, 0xdf, 0x65, 0x42, 0x9f, 0x44, 0x65,
0x27, 0xd6, 0x7b, 0x8c, 0x65, 0x39, 0xfa, 0xac, 0xd4, 0x2d, 0xa6, 0x4f, 0xcb, 0x59, 0x58, 0x48,
0x44, 0x7f, 0xf2, 0x1a, 0x31, 0x0b, 0xd5, 0x99, 0x89, 0x2b, 0x69, 0x6d, 0x4b, 0x68, 0x0a, 0xbc,
0xf3, 0x21, 0xd4, 0x26, 0x3f, 0xd4, 0xee, 0x32, 0x34, 0x82, 0x9d, 0xa0, 0x13, 0xac, 0x6f, 0x05,
0xf7, 0x83, 0x9d, 0x8f, 0x97, 0x5f, 0x70, 0xeb, 0x50, 0xb9, 0x73, 0x6b, 0x7d, 0xab, 0x73, 0xe7,
0xf3, 0x65, 0xc7, 0x6d, 0x40, 0x75, 0x7d, 0x63, 0xe7, 0x13, 0xb4, 0xbd, 0xbe, 0xb5, 0x5c, 0xd8,
0xb8, 0x75, 0x7f, 0xb3, 0x4f, 0xe5, 0x60, 0xd4, 0x55, 0x22, 0xae, 0x1e, 0xd2, 0x38, 0xa6, 0x87,
0x92, 0xf4, 0x06, 0xab, 0x86, 0xe5, 0x7b, 0x11, 0x15, 0x92, 0xd3, 0xee, 0x48, 0x92, 0x68, 0x35,
0xe3, 0xba, 0xaa, 0xa9, 0x4f, 0x96, 0xc3, 0xee, 0x5a, 0xb7, 0xac, 0xa1, 0xeb, 0x7f, 0x05, 0x00,
0x00, 0xff, 0xff, 0xa4, 0x3b, 0x4f, 0xdd, 0x76, 0x10, 0x00, 0x00,
}

View File

@ -74,7 +74,15 @@ type RocksMQ struct {
channels map[string]*Channel
cgCtxs map[string]ConsumerGroupContext
idAllocator master.IDAllocator
mu sync.Mutex
produceMu sync.Mutex
consumeMu sync.Mutex
//ctx context.Context
//serverLoopWg sync.WaitGroup
//serverLoopCtx context.Context
//serverLoopCancel func()
//// tso ticker
//tsoTicker *time.Ticker
}
func NewRocksMQ(name string, idAllocator master.IDAllocator) (*RocksMQ, error) {
@ -97,6 +105,7 @@ func NewRocksMQ(name string, idAllocator master.IDAllocator) (*RocksMQ, error) {
kv: mkv,
idAllocator: idAllocator,
}
rmq.channels = make(map[string]*Channel)
return rmq, nil
}
@ -174,6 +183,8 @@ func (rmq *RocksMQ) DestroyConsumerGroup(groupName string, channelName string) e
}
func (rmq *RocksMQ) Produce(channelName string, messages []ProducerMessage) error {
rmq.produceMu.Lock()
defer rmq.produceMu.Unlock()
msgLen := len(messages)
idStart, idEnd, err := rmq.idAllocator.Alloc(uint32(msgLen))
@ -222,6 +233,8 @@ func (rmq *RocksMQ) Produce(channelName string, messages []ProducerMessage) erro
}
func (rmq *RocksMQ) Consume(groupName string, channelName string, n int) ([]ConsumerMessage, error) {
rmq.consumeMu.Lock()
defer rmq.consumeMu.Unlock()
metaKey := groupName + "/" + channelName + "/current_id"
currentID, err := rmq.kv.Load(metaKey)
if err != nil {

View File

@ -3,6 +3,7 @@ package rocksmq
import (
"os"
"strconv"
"sync"
"testing"
"github.com/stretchr/testify/assert"
@ -36,6 +37,10 @@ func TestRocksMQ(t *testing.T) {
assert.Nil(t, err)
channelName := "channel_a"
err = rmq.CreateChannel(channelName)
assert.Nil(t, err)
defer rmq.DestroyChannel(channelName)
msgA := "a_message"
pMsgs := make([]ProducerMessage, 1)
pMsgA := ProducerMessage{payload: []byte(msgA)}
@ -54,7 +59,7 @@ func TestRocksMQ(t *testing.T) {
err = rmq.Produce(channelName, pMsgs)
assert.Nil(t, err)
groupName := "query_node"
groupName := "test_group"
_ = rmq.DestroyConsumerGroup(groupName, channelName)
err = rmq.CreateConsumerGroup(groupName, channelName)
assert.Nil(t, err)
@ -89,6 +94,10 @@ func TestRocksMQ_Loop(t *testing.T) {
loopNum := 100
channelName := "channel_test"
err = rmq.CreateChannel(channelName)
assert.Nil(t, err)
defer rmq.DestroyChannel(channelName)
// Produce one message once
for i := 0; i < loopNum; i++ {
msg := "message_" + strconv.Itoa(i)
@ -133,57 +142,62 @@ func TestRocksMQ_Loop(t *testing.T) {
assert.Equal(t, len(cMsgs), 0)
}
//func TestRocksMQ_Goroutines(t *testing.T) {
// master.Init()
//
// etcdAddr := master.Params.EtcdAddress
// cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
// assert.Nil(t, err)
// etcdKV := etcdkv.NewEtcdKV(cli, "/etcd/test/root")
// defer etcdKV.Close()
// idAllocator := master.NewGlobalIDAllocator("dummy", etcdKV)
// _ = idAllocator.Initialize()
//
// name := "/tmp/rocksmq"
// defer os.RemoveAll(name)
// rmq, err := NewRocksMQ(name, idAllocator)
// assert.Nil(t, err)
//
// loopNum := 100
// channelName := "channel_test"
// // Produce two message in each goroutine
// var wg sync.WaitGroup
// wg.Add(1)
// for i := 0; i < loopNum/2; i++ {
// go func() {
// wg.Add(2)
// msg_0 := "message_" + strconv.Itoa(i)
// msg_1 := "message_" + strconv.Itoa(i+1)
// pMsg_0 := ProducerMessage{payload: []byte(msg_0)}
// pMsg_1 := ProducerMessage{payload: []byte(msg_1)}
// pMsgs := make([]ProducerMessage, 2)
// pMsgs[0] = pMsg_0
// pMsgs[1] = pMsg_1
//
// err := rmq.Produce(channelName, pMsgs)
// assert.Nil(t, err)
// }()
// }
//
// groupName := "test_group"
// _ = rmq.DestroyConsumerGroup(groupName, channelName)
// err = rmq.CreateConsumerGroup(groupName, channelName)
// assert.Nil(t, err)
// // Consume one message in each goroutine
// for i := 0; i < loopNum; i++ {
// go func() {
// wg.Done()
// cMsgs, err := rmq.Consume(groupName, channelName, 1)
// fmt.Println(string(cMsgs[0].payload))
// assert.Nil(t, err)
// assert.Equal(t, len(cMsgs), 1)
// }()
// }
// wg.Done()
// wg.Wait()
//}
func TestRocksMQ_Goroutines(t *testing.T) {
master.Init()
etcdAddr := master.Params.EtcdAddress
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
assert.Nil(t, err)
etcdKV := etcdkv.NewEtcdKV(cli, "/etcd/test/root")
defer etcdKV.Close()
idAllocator := master.NewGlobalIDAllocator("dummy", etcdKV)
_ = idAllocator.Initialize()
name := "/tmp/rocksmq_2"
defer os.RemoveAll(name)
rmq, err := NewRocksMQ(name, idAllocator)
assert.Nil(t, err)
loopNum := 100
channelName := "channel_test"
err = rmq.CreateChannel(channelName)
assert.Nil(t, err)
defer rmq.DestroyChannel(channelName)
// Produce two message in each goroutine
msgChan := make(chan string, loopNum)
var wg sync.WaitGroup
for i := 0; i < loopNum; i += 2 {
go func(i int, group *sync.WaitGroup, mq *RocksMQ) {
group.Add(2)
msg0 := "message_" + strconv.Itoa(i)
msg1 := "message_" + strconv.Itoa(i+1)
pMsg0 := ProducerMessage{payload: []byte(msg0)}
pMsg1 := ProducerMessage{payload: []byte(msg1)}
pMsgs := make([]ProducerMessage, 2)
pMsgs[0] = pMsg0
pMsgs[1] = pMsg1
err := mq.Produce(channelName, pMsgs)
assert.Nil(t, err)
msgChan <- msg0
msgChan <- msg1
}(i, &wg, rmq)
}
groupName := "test_group"
_ = rmq.DestroyConsumerGroup(groupName, channelName)
err = rmq.CreateConsumerGroup(groupName, channelName)
assert.Nil(t, err)
// Consume one message in each goroutine
for i := 0; i < loopNum; i++ {
go func(group *sync.WaitGroup, mq *RocksMQ) {
defer group.Done()
<-msgChan
cMsgs, err := mq.Consume(groupName, channelName, 1)
assert.Nil(t, err)
assert.Equal(t, len(cMsgs), 1)
}(&wg, rmq)
}
wg.Wait()
}