From db9957d48b89dee010d3c7f4f083b057229ae5df Mon Sep 17 00:00:00 2001 From: "zhenshan.cao" Date: Sat, 7 Nov 2020 18:03:52 +0800 Subject: [PATCH] Add unittest for master Signed-off-by: zhenshan.cao --- cmd/mock/master/main.go | 53 +++++++ internal/master/master.go | 13 +- internal/master/mock/grpc_service.go | 112 ++++++++++++++ internal/master/mock/master.go | 219 +++++++++++++++++++++++++++ internal/proxy/mock/master_grpc.go | 22 --- internal/proxy/mock/master_tso.go | 1 - 6 files changed, 388 insertions(+), 32 deletions(-) create mode 100644 cmd/mock/master/main.go create mode 100644 internal/master/mock/grpc_service.go create mode 100644 internal/master/mock/master.go delete mode 100644 internal/proxy/mock/master_grpc.go diff --git a/cmd/mock/master/main.go b/cmd/mock/master/main.go new file mode 100644 index 0000000000..c4966fede6 --- /dev/null +++ b/cmd/mock/master/main.go @@ -0,0 +1,53 @@ +package main + +import ( + "context" + "log" + "os" + "os/signal" + "syscall" + + "github.com/zilliztech/milvus-distributed/internal/master/mock" + "go.uber.org/zap" +) + +func main() { + // Creates server. + ctx, cancel := context.WithCancel(context.Background()) + svr, err := mockmaster.CreateServer(ctx) + if err != nil { + log.Print("create server failed", zap.Error(err)) + } + + sc := make(chan os.Signal, 1) + signal.Notify(sc, + syscall.SIGHUP, + syscall.SIGINT, + syscall.SIGTERM, + syscall.SIGQUIT) + + var sig os.Signal + go func() { + sig = <-sc + cancel() + }() + + if err := svr.Run(); err != nil { + log.Fatal("run server failed", zap.Error(err)) + } + + <-ctx.Done() + log.Print("Got signal to exit", zap.String("signal", sig.String())) + + svr.Close() + switch sig { + case syscall.SIGTERM: + exit(0) + default: + exit(1) + } +} + +func exit(code int) { + os.Exit(code) +} diff --git a/internal/master/master.go b/internal/master/master.go index 8ccb934df3..506dd1b8f7 100644 --- a/internal/master/master.go +++ b/internal/master/master.go @@ -182,6 +182,9 @@ func (s *Master) startServerLoop(ctx context.Context) { } func (s *Master) stopServerLoop() { + if s.grpcServer != nil{ + s.grpcServer.GracefulStop() + } s.serverLoopCancel() s.serverLoopWg.Wait() } @@ -203,17 +206,9 @@ func (s *Master) grpcLoop() { } if err := s.grpcServer.Serve(lis); err != nil { + panic("grpcServer Start Failed!!") } - ctx, cancel := context.WithCancel(s.serverLoopCtx) - defer cancel() - for { - select { - case <-ctx.Done(): - log.Print("server is closed, exit etcd leader loop") - return - } - } } // todo use messagestream diff --git a/internal/master/mock/grpc_service.go b/internal/master/mock/grpc_service.go new file mode 100644 index 0000000000..d94c3f90e8 --- /dev/null +++ b/internal/master/mock/grpc_service.go @@ -0,0 +1,112 @@ +package mockmaster + +import ( + "context" + + "github.com/zilliztech/milvus-distributed/internal/master/id" + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" + "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" + "github.com/zilliztech/milvus-distributed/internal/proto/servicepb" +) + +func (s *Master) CreateCollection(ctx context.Context, in *internalpb.CreateCollectionRequest) (*commonpb.Status, error) { + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + }, nil +} + +func (s *Master) DropCollection(ctx context.Context, in *internalpb.DropCollectionRequest) (*commonpb.Status, error) { + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + }, nil +} + +func (s *Master) HasCollection(ctx context.Context, in *internalpb.HasCollectionRequest) (*servicepb.BoolResponse, error) { + return &servicepb.BoolResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + }, + Value: false, + }, nil +} + +func (s *Master) DescribeCollection(ctx context.Context, in *internalpb.DescribeCollectionRequest) (*servicepb.CollectionDescription, error) { + return &servicepb.CollectionDescription{ + }, nil +} + +func (s *Master) ShowCollections(ctx context.Context, in *internalpb.ShowCollectionRequest) (*servicepb.StringListResponse, error) { + return &servicepb.StringListResponse{ + }, nil +} + +////////////////////////////////////////////////////////////////////////// +func (s *Master) CreatePartition(ctx context.Context, in *internalpb.CreatePartitionRequest) (*commonpb.Status, error) { + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + }, nil +} + +func (s *Master) DropPartition(ctx context.Context, in *internalpb.DropPartitionRequest) (*commonpb.Status, error) { + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + }, nil +} + +func (s *Master) HasPartition(ctx context.Context, in *internalpb.HasPartitionRequest) (*servicepb.BoolResponse, error) { + + return &servicepb.BoolResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + }, + }, nil +} + +func (s *Master) DescribePartition(ctx context.Context, in *internalpb.DescribePartitionRequest) (*servicepb.PartitionDescription, error) { + return &servicepb.PartitionDescription{}, nil +} + +func (s *Master) ShowPartitions(ctx context.Context, in *internalpb.ShowPartitionRequest) (*servicepb.StringListResponse, error) { + return &servicepb.StringListResponse{}, nil +} + +//----------------------------------------Internal GRPC Service-------------------------------- + +func (s *Master) AllocTimestamp(ctx context.Context, request *internalpb.TsoRequest) (*internalpb.TsoResponse, error) { + count := request.GetCount() + ts, err := s.tsoAllocator.GenerateTSO(count) + + if err != nil { + return &internalpb.TsoResponse{ + Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR}, + }, err + } + + response := &internalpb.TsoResponse{ + Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR}, + Timestamp: ts, + Count: count, + } + + return response, nil +} + +func (s *Master) AllocId(ctx context.Context, request *internalpb.IdRequest) (*internalpb.IdResponse, error) { + panic("implement me") + count := request.GetCount() + ts, err := id.AllocOne() + + if err != nil { + return &internalpb.IdResponse{ + Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR}, + }, err + } + + response := &internalpb.IdResponse{ + Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR}, + Id: ts, + Count: count, + } + + return response, nil +} \ No newline at end of file diff --git a/internal/master/mock/master.go b/internal/master/mock/master.go new file mode 100644 index 0000000000..5ca4e189c9 --- /dev/null +++ b/internal/master/mock/master.go @@ -0,0 +1,219 @@ +package mockmaster + +import ( + "context" + "fmt" + "github.com/zilliztech/milvus-distributed/internal/kv" + "github.com/zilliztech/milvus-distributed/internal/kv/mockkv" + "github.com/zilliztech/milvus-distributed/internal/master/id" + "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" + "google.golang.org/grpc" + "log" + "math/rand" + "net" + "sync" + "sync/atomic" + "time" + + "github.com/zilliztech/milvus-distributed/internal/master/tso" +) + +const ( + MOCK_GRPC_PORT=":0" +) + +var GrpcServerAddr net.Addr + +// Server is the pd server. +type Master struct { + isServing int64 + + ctx context.Context + serverLoopCtx context.Context + serverLoopCancel func() + serverLoopWg sync.WaitGroup + + //grpc server + grpcServer *grpc.Server + + // for tso. + tsoAllocator tso.Allocator + + kvBase kv.KVBase + + // Add callback functions at different stages + startCallbacks []func() + closeCallbacks []func() + + grpcAddr net.Addr +} + +// CreateServer creates the UNINITIALIZED pd server with given configuration. +func CreateServer(ctx context.Context) (*Master, error) { + rand.Seed(time.Now().UnixNano()) + id.InitGlobalIdAllocator("idTimestamp", mockkv.NewEtcdKV()) + + m := &Master{ + ctx: ctx, + kvBase: mockkv.NewEtcdKV(), + tsoAllocator: tso.NewGlobalTSOAllocator("timestamp", mockkv.NewEtcdKV()), + } + + m.grpcServer = grpc.NewServer() + masterpb.RegisterMasterServer(m.grpcServer, m) + return m, nil +} + +// AddStartCallback adds a callback in the startServer phase. +func (s *Master) AddStartCallback(callbacks ...func()) { + s.startCallbacks = append(s.startCallbacks, callbacks...) +} + +// for unittest, get the grpc server addr +func (s *Master) GetGRPCAddr() net.Addr{ + return s.grpcAddr +} + +func (s *Master) startServer(ctx context.Context) error { + + // Run callbacks + for _, cb := range s.startCallbacks { + cb() + } + + // Server has started. + atomic.StoreInt64(&s.isServing, 1) + return nil +} + +// AddCloseCallback adds a callback in the Close phase. +func (s *Master) AddCloseCallback(callbacks ...func()) { + s.closeCallbacks = append(s.closeCallbacks, callbacks...) +} + +// Close closes the server. +func (s *Master) Close() { + if !atomic.CompareAndSwapInt64(&s.isServing, 1, 0) { + // server is already closed + return + } + + log.Print("closing server") + + s.stopServerLoop() + + if s.kvBase != nil { + s.kvBase.Close() + } + + // Run callbacks + for _, cb := range s.closeCallbacks { + cb() + } + + log.Print("close server") +} + +// IsClosed checks whether server is closed or not. +func (s *Master) IsClosed() bool { + return atomic.LoadInt64(&s.isServing) == 0 +} + +// Run runs the pd server. +func (s *Master) Run() error { + + if err := s.startServer(s.ctx); err != nil { + return err + } + + s.startServerLoop(s.ctx) + + return nil +} + +// Context returns the context of server. +func (s *Master) Context() context.Context { + return s.ctx +} + +// LoopContext returns the loop context of server. +func (s *Master) LoopContext() context.Context { + return s.serverLoopCtx +} + +func (s *Master) startServerLoop(ctx context.Context) { + s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(ctx) + s.serverLoopWg.Add(3) + go s.grpcLoop() + go s.pulsarLoop() + go s.segmentStatisticsLoop() +} + +func (s *Master) stopServerLoop() { + if s.grpcServer != nil{ + s.grpcServer.GracefulStop() + } + s.serverLoopCancel() + s.serverLoopWg.Wait() +} + + +func (s *Master) grpcLoop() { + defer s.serverLoopWg.Done() + lis, err := net.Listen("tcp", MOCK_GRPC_PORT) + if err != nil { + log.Printf("failed to listen: %v", err) + return + } + + s.grpcAddr = lis.Addr() + + fmt.Printf("Start MockMaster grpc server , addr:%v\n", s.grpcAddr) + + if err := s.grpcServer.Serve(lis); err != nil { + panic("grpcServer Startup Failed!") + } +} + +// todo use messagestream +func (s *Master) pulsarLoop() { + defer s.serverLoopWg.Done() + + ctx, cancel := context.WithCancel(s.serverLoopCtx) + defer cancel() + for { + select { + case <-ctx.Done(): + log.Print("server is closed, exit pulsar loop") + return + } + } +} + +func (s *Master) tasksExecutionLoop() { + defer s.serverLoopWg.Done() + ctx, _ := context.WithCancel(s.serverLoopCtx) + + for { + select { + case <-ctx.Done(): + log.Print("server is closed, exit task execution loop") + return + } + } +} + +func (s *Master) segmentStatisticsLoop() { + defer s.serverLoopWg.Done() + + ctx, cancel := context.WithCancel(s.serverLoopCtx) + defer cancel() + + for { + select { + case <-ctx.Done(): + log.Print("server is closed, exit segmentStatistics loop") + return + } + } +} diff --git a/internal/proxy/mock/master_grpc.go b/internal/proxy/mock/master_grpc.go deleted file mode 100644 index 977f8a0b25..0000000000 --- a/internal/proxy/mock/master_grpc.go +++ /dev/null @@ -1,22 +0,0 @@ -package mock - -import ( - "context" - - "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" - "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" - - mpb "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" -) - -type testMasterServer struct { - mpb.UnimplementedMasterServer -} - -func (s *testMasterServer) CreateCollection(ctx context.Context, in *internalpb.CreateCollectionRequest) (*commonpb.Status, error) { - - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_SUCCESS, - }, nil - -} diff --git a/internal/proxy/mock/master_tso.go b/internal/proxy/mock/master_tso.go index 294e57c4e2..b2af512275 100644 --- a/internal/proxy/mock/master_tso.go +++ b/internal/proxy/mock/master_tso.go @@ -17,7 +17,6 @@ type TSOClient struct { mux sync.Mutex } -// window is 1000ms default func (c *TSOClient) GetTimeStamp(ctx context.Context, n Timestamp) (ts Timestamp, count uint64, window time.Duration, err error) { c.mux.Lock() defer c.mux.Unlock()