Add building datanode cmd in Makefile

Signed-off-by: XuanYang-cn <xuan.yang@zilliz.com>
This commit is contained in:
XuanYang-cn 2021-01-26 17:40:26 +08:00 committed by yefu.chen
parent 9f72633fd9
commit 0b1c4f0420
4 changed files with 34 additions and 154 deletions

View File

@ -139,6 +139,8 @@ build-go: build-cpp
@mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/indexservice $(PWD)/cmd/distributed/indexservice/main.go 1>/dev/null
@echo "Building distributed indexnode ..."
@mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/indexnode $(PWD)/cmd/distributed/indexnode/main.go 1>/dev/null
@echo "Building data node ..."
@mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/datanode $(PWD)/cmd/datanode/main.go 1>/dev/null
@echo "Building dataservice ..."
@mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/dataservice $(PWD)/cmd/dataservice/main.go 1>/dev/null

View File

@ -1,89 +0,0 @@
package dataservice
import (
"fmt"
"github.com/golang/protobuf/proto"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
)
type ddHandler struct {
meta *meta
segmentAllocator segmentAllocator
}
func newDDHandler(meta *meta, allocator segmentAllocator) *ddHandler {
return &ddHandler{
meta: meta,
segmentAllocator: allocator,
}
}
func (handler *ddHandler) HandleDDMsg(msg msgstream.TsMsg) error {
switch msg.Type() {
case commonpb.MsgType_kCreateCollection:
realMsg := msg.(*msgstream.CreateCollectionMsg)
return handler.handleCreateCollection(realMsg)
case commonpb.MsgType_kDropCollection:
realMsg := msg.(*msgstream.DropCollectionMsg)
return handler.handleDropCollection(realMsg)
case commonpb.MsgType_kCreatePartition:
realMsg := msg.(*msgstream.CreatePartitionMsg)
return handler.handleCreatePartition(realMsg)
case commonpb.MsgType_kDropPartition:
realMsg := msg.(*msgstream.DropPartitionMsg)
return handler.handleDropPartition(realMsg)
default:
return fmt.Errorf("unknown msg type: %v", msg.Type())
}
}
func (handler *ddHandler) handleCreateCollection(msg *msgstream.CreateCollectionMsg) error {
schema := &schemapb.CollectionSchema{}
if err := proto.Unmarshal(msg.Schema, schema); err != nil {
return err
}
err := handler.meta.AddCollection(&collectionInfo{
ID: msg.CollectionID,
Schema: schema,
})
if err != nil {
return err
}
return nil
}
func (handler *ddHandler) handleDropCollection(msg *msgstream.DropCollectionMsg) error {
ids := handler.meta.GetSegmentsByCollectionID(msg.CollectionID)
for _, id := range ids {
if err := handler.meta.DropSegment(id); err != nil {
continue
}
handler.segmentAllocator.DropSegment(id)
}
if err := handler.meta.DropCollection(msg.CollectionID); err != nil {
return err
}
return nil
}
func (handler *ddHandler) handleDropPartition(msg *msgstream.DropPartitionMsg) error {
ids := handler.meta.GetSegmentsByCollectionAndPartitionID(msg.CollectionID, msg.PartitionID)
for _, id := range ids {
if err := handler.meta.DropSegment(id); err != nil {
return err
}
handler.segmentAllocator.DropSegment(id)
}
if err := handler.meta.DropPartition(msg.CollectionID, msg.PartitionID); err != nil {
return err
}
return nil
}
func (handler *ddHandler) handleCreatePartition(msg *msgstream.CreatePartitionMsg) error {
return handler.meta.AddPartition(msg.CollectionID, msg.PartitionID)
}

View File

@ -65,26 +65,26 @@ type (
UniqueID = typeutil.UniqueID
Timestamp = typeutil.Timestamp
Server struct {
ctx context.Context
serverLoopCtx context.Context
serverLoopCancel context.CancelFunc
serverLoopWg sync.WaitGroup
state atomic.Value
client *etcdkv.EtcdKV
meta *meta
segAllocator segmentAllocator
statsHandler *statsHandler
ddHandler *ddHandler
insertChannelMgr *insertChannelManager
allocator allocator
cluster *dataNodeCluster
msgProducer *timesync.MsgProducer
registerFinishCh chan struct{}
masterClient MasterClient
ttMsgStream msgstream.MsgStream
k2sMsgStream msgstream.MsgStream
ddChannelName string
segmentInfoStream msgstream.MsgStream
ctx context.Context
serverLoopCtx context.Context
serverLoopCancel context.CancelFunc
serverLoopWg sync.WaitGroup
state atomic.Value
client *etcdkv.EtcdKV
meta *meta
segAllocator segmentAllocator
statsHandler *statsHandler
insertChannelMgr *insertChannelManager
allocator allocator
cluster *dataNodeCluster
msgProducer *timesync.MsgProducer
registerFinishCh chan struct{}
masterClient MasterClient
ttMsgStream msgstream.MsgStream
k2sMsgStream msgstream.MsgStream
ddChannelName string
segmentInfoStream msgstream.MsgStream
segmentFlushStream msgstream.MsgStream
}
)
@ -97,6 +97,7 @@ func CreateServer(ctx context.Context) (*Server, error) {
registerFinishCh: ch,
cluster: newDataNodeCluster(ch),
}
s.state.Store(internalpb2.StateCode_INITIALIZING)
return s, nil
}
@ -105,7 +106,6 @@ func (s *Server) SetMasterClient(masterClient MasterClient) {
}
func (s *Server) Init() error {
s.state.Store(internalpb2.StateCode_INITIALIZING)
return nil
}
@ -120,7 +120,6 @@ func (s *Server) Start() error {
if err != nil {
return err
}
s.ddHandler = newDDHandler(s.meta, s.segAllocator)
s.initSegmentInfoChannel()
if err = s.initMsgProducer(); err != nil {
return err
@ -188,13 +187,6 @@ func (s *Server) loadMetaFromMaster() error {
if err := s.checkMasterIsHealthy(); err != nil {
return err
}
if s.ddChannelName == "" {
channel, err := s.masterClient.GetDdChannel()
if err != nil {
return err
}
s.ddChannelName = channel
}
collections, err := s.masterClient.ShowCollections(&milvuspb.ShowCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kShowCollections,
@ -282,10 +274,9 @@ func (s *Server) checkMasterIsHealthy() error {
func (s *Server) startServerLoop() {
s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
s.serverLoopWg.Add(3)
s.serverLoopWg.Add(2)
go s.startStatsChannel(s.serverLoopCtx)
go s.startSegmentFlushChannel(s.serverLoopCtx)
go s.startDDChannel(s.serverLoopCtx)
}
func (s *Server) startStatsChannel(ctx context.Context) {
@ -349,30 +340,6 @@ func (s *Server) startSegmentFlushChannel(ctx context.Context) {
}
}
func (s *Server) startDDChannel(ctx context.Context) {
defer s.serverLoopWg.Done()
ddStream := pulsarms.NewPulsarMsgStream(ctx, 1024)
ddStream.SetPulsarClient(Params.PulsarAddress)
ddStream.CreatePulsarConsumers([]string{s.ddChannelName}, Params.DataServiceSubscriptionName, util.NewUnmarshalDispatcher(), 1024)
ddStream.Start()
defer ddStream.Close()
for {
select {
case <-ctx.Done():
log.Println("dd channel shut down")
return
default:
}
msgPack := ddStream.Consume()
for _, msg := range msgPack.Msgs {
if err := s.ddHandler.HandleDDMsg(msg); err != nil {
log.Println(err.Error())
continue
}
}
}
}
func (s *Server) waitDataNodeRegister() {
log.Println("waiting data node to register")
<-s.registerFinishCh
@ -545,8 +512,8 @@ func (s *Server) openNewSegment(collectionID UniqueID, partitionID UniqueID, cha
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kSegmentInfo,
MsgID: 0,
Timestamp: 0,
SourceID: Params.NodeID,
Timestamp: 0, // todo
SourceID: 0,
},
Segment: segmentInfo,
},

View File

@ -35,14 +35,6 @@ func NewGrpcService(ctx context.Context) *Service {
log.Fatalf("create server error: %s", err.Error())
return nil
}
return s
}
func (s *Service) SetMasterClient(masterClient dataservice.MasterClient) {
s.server.SetMasterClient(masterClient)
}
func (s *Service) Init() error {
s.grpcServer = grpc.NewServer()
datapb.RegisterDataServiceServer(s.grpcServer, s)
lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", dataservice.Params.Address, dataservice.Params.Port))
@ -54,6 +46,14 @@ func (s *Service) Init() error {
log.Fatal(err.Error())
return nil
}
return s
}
func (s *Service) SetMasterClient(masterClient dataservice.MasterClient) {
s.server.SetMasterClient(masterClient)
}
func (s *Service) Init() error {
return s.server.Init()
}