From 0978b93a212d09a1335dee3d0a78c5653dcd2a90 Mon Sep 17 00:00:00 2001
From: sunby
Date: Tue, 22 Jun 2021 18:24:08 +0800
Subject: [PATCH] Refactor data coordinator (#5982)

Rename variables and make error messages clearer

Signed-off-by: sunby
---
 internal/datacoord/allocator.go               |  14 +-
 internal/datacoord/allocator_test.go          |   3 +-
 internal/datacoord/cluster_session_manager.go |   9 +-
 internal/datacoord/flush_monitor_test.go      |  12 +-
 internal/datacoord/grpc_services.go           |  27 ++--
 internal/datacoord/meta.go                    | 141 +++++++++++++-----
 internal/datacoord/meta_test.go               |  10 +-
 .../datacoord/segment_allocation_policy.go    |   2 +-
 internal/datacoord/segment_manager.go         |  70 +++++----
 internal/datacoord/server.go                  |  54 ++++---
 internal/datacoord/server_test.go             |   6 +-
 .../distributed/datanode/client/client.go     |   4 +-
 12 files changed, 213 insertions(+), 139 deletions(-)

diff --git a/internal/datacoord/allocator.go b/internal/datacoord/allocator.go
index 8802c6f1cb..8502cd845f 100644
--- a/internal/datacoord/allocator.go
+++ b/internal/datacoord/allocator.go
@@ -25,18 +25,19 @@ type allocator interface {
 }
 
 type rootCoordAllocator struct {
+	ctx             context.Context
 	rootCoordClient types.RootCoord
 }
 
-func newAllocator(rootCoordClient types.RootCoord) *rootCoordAllocator {
+func newRootCoordAllocator(ctx context.Context, rootCoordClient types.RootCoord) *rootCoordAllocator {
 	return &rootCoordAllocator{
+		ctx:             ctx,
 		rootCoordClient: rootCoordClient,
 	}
 }
 
-func (allocator *rootCoordAllocator) allocTimestamp() (Timestamp, error) {
-	ctx := context.TODO()
-	resp, err := allocator.rootCoordClient.AllocTimestamp(ctx, &rootcoordpb.AllocTimestampRequest{
+func (alloc *rootCoordAllocator) allocTimestamp() (Timestamp, error) {
+	resp, err := alloc.rootCoordClient.AllocTimestamp(alloc.ctx, &rootcoordpb.AllocTimestampRequest{
 		Base: &commonpb.MsgBase{
 			MsgType:   commonpb.MsgType_RequestTSO,
 			MsgID:     -1, // todo add msg id
@@ -51,9 +52,8 @@ func (allocator *rootCoordAllocator) allocTimestamp() (Timestamp, error) {
 	return resp.Timestamp, nil
 }
 
-func (allocator *rootCoordAllocator) allocID() (UniqueID, error) {
-	ctx := context.TODO()
-	resp, err := allocator.rootCoordClient.AllocID(ctx, &rootcoordpb.AllocIDRequest{
+func (alloc *rootCoordAllocator) allocID() (UniqueID, error) {
+	resp, err := alloc.rootCoordClient.AllocID(alloc.ctx, &rootcoordpb.AllocIDRequest{
 		Base: &commonpb.MsgBase{
 			MsgType:   commonpb.MsgType_RequestID,
 			MsgID:     -1, // todo add msg id
diff --git a/internal/datacoord/allocator_test.go b/internal/datacoord/allocator_test.go
index e30529d6ed..5e2bfc6d46 100644
--- a/internal/datacoord/allocator_test.go
+++ b/internal/datacoord/allocator_test.go
@@ -15,11 +15,12 @@ import (
 	"testing"
 
 	"github.com/stretchr/testify/assert"
+	"golang.org/x/net/context"
 )
 
 func TestAllocator_Basic(t *testing.T) {
 	ms := newMockRootCoordService()
-	allocator := newAllocator(ms)
+	allocator := newRootCoordAllocator(context.Background(), ms)
 
 	t.Run("Test allocTimestamp", func(t *testing.T) {
 		_, err := allocator.allocTimestamp()
diff --git a/internal/datacoord/cluster_session_manager.go b/internal/datacoord/cluster_session_manager.go
index 2fa16441f3..125a7e69b4 100644
--- a/internal/datacoord/cluster_session_manager.go
+++ b/internal/datacoord/cluster_session_manager.go
@@ -11,6 +11,7 @@
 package datacoord
 
 import (
+	"context"
 	"sync"
 
 	"github.com/milvus-io/milvus/internal/types"
@@ -26,12 +27,14 @@ type sessionManager interface {
 type clusterSessionManager struct {
 	sync.RWMutex
+	ctx               context.Context
 	sessions          map[string]types.DataNode
-	dataClientCreator 
func(addr string) (types.DataNode, error) + dataClientCreator dataNodeCreatorFunc } -func newClusterSessionManager(dataClientCreator func(addr string) (types.DataNode, error)) *clusterSessionManager { +func newClusterSessionManager(ctx context.Context, dataClientCreator dataNodeCreatorFunc) *clusterSessionManager { return &clusterSessionManager{ + ctx: ctx, sessions: make(map[string]types.DataNode), dataClientCreator: dataClientCreator, } @@ -39,7 +42,7 @@ func newClusterSessionManager(dataClientCreator func(addr string) (types.DataNod // lock acquired func (m *clusterSessionManager) createSession(addr string) (types.DataNode, error) { - cli, err := m.dataClientCreator(addr) + cli, err := m.dataClientCreator(m.ctx, addr) if err != nil { return nil, err } diff --git a/internal/datacoord/flush_monitor_test.go b/internal/datacoord/flush_monitor_test.go index 91299fa2eb..ad27461a15 100644 --- a/internal/datacoord/flush_monitor_test.go +++ b/internal/datacoord/flush_monitor_test.go @@ -42,15 +42,15 @@ func TestFlushMonitor(t *testing.T) { // create seg0 for partition0, seg0/seg1 for partition1 segID0_0, err := mockAllocator.allocID() assert.Nil(t, err) - segInfo0_0, err := BuildSegment(collID, partID0, segID0_0, channelName) + segInfo0_0, err := buildSegment(collID, partID0, segID0_0, channelName) assert.Nil(t, err) segID1_0, err := mockAllocator.allocID() assert.Nil(t, err) - segInfo1_0, err := BuildSegment(collID, partID1, segID1_0, channelName) + segInfo1_0, err := buildSegment(collID, partID1, segID1_0, channelName) assert.Nil(t, err) segID1_1, err := mockAllocator.allocID() assert.Nil(t, err) - segInfo1_1, err := BuildSegment(collID, partID1, segID1_1, channelName) + segInfo1_1, err := buildSegment(collID, partID1, segID1_1, channelName) assert.Nil(t, err) // check AddSegment @@ -77,7 +77,7 @@ func TestFlushMonitor(t *testing.T) { fm.segmentPolicy = estSegmentSizePolicy(1024*1024, 1024*1024*2) // row size 1Mib Limit 2 MB segID3Rows, err := mockAllocator.allocID() assert.Nil(t, err) - segInfo3Rows, err := BuildSegment(collID, partID1, segID3Rows, channelName) + segInfo3Rows, err := buildSegment(collID, partID1, segID3Rows, channelName) segInfo3Rows.NumOfRows = 3 assert.Nil(t, err) @@ -95,7 +95,7 @@ func TestFlushMonitor(t *testing.T) { for i := 0; i < 100; i++ { segID, err := mockAllocator.allocID() assert.Nil(t, err) - seg, err := BuildSegment(collID, partID0, segID, channelName2) + seg, err := buildSegment(collID, partID0, segID, channelName2) assert.Nil(t, err) seg.DmlPosition = &internalpb.MsgPosition{ Timestamp: uint64(i + 1), @@ -108,7 +108,7 @@ func TestFlushMonitor(t *testing.T) { exSegID, err := mockAllocator.allocID() assert.Nil(t, err) - seg, err := BuildSegment(collID, partID0, exSegID, channelName2) + seg, err := buildSegment(collID, partID0, exSegID, channelName2) assert.Nil(t, err) seg.DmlPosition = &internalpb.MsgPosition{ Timestamp: uint64(0), // the oldest diff --git a/internal/datacoord/grpc_services.go b/internal/datacoord/grpc_services.go index b68734803f..c57aedd75a 100644 --- a/internal/datacoord/grpc_services.go +++ b/internal/datacoord/grpc_services.go @@ -16,6 +16,8 @@ import ( "go.uber.org/zap" ) +const serverNotServingErrMsg = "server is not serving" + func (s *Server) isClosed() bool { return atomic.LoadInt64(&s.isServing) != 2 } @@ -44,11 +46,11 @@ func (s *Server) Flush(ctx context.Context, req *datapb.FlushRequest) (*commonpb ErrorCode: commonpb.ErrorCode_UnexpectedError, } if s.isClosed() { - resp.Reason = "server is closed" + resp.Reason = 
serverNotServingErrMsg return resp, nil } if err := s.segmentManager.SealAllSegments(ctx, req.CollectionID); err != nil { - resp.Reason = fmt.Sprintf("Seal all segments error %s", err) + resp.Reason = fmt.Sprintf("Failed to flush %d, %s", req.CollectionID, err) return resp, nil } return &commonpb.Status{ @@ -60,6 +62,7 @@ func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentI if s.isClosed() { return &datapb.AssignSegmentIDResponse{ Status: &commonpb.Status{ + Reason: serverNotServingErrMsg, ErrorCode: commonpb.ErrorCode_UnexpectedError, }, }, nil @@ -85,7 +88,7 @@ func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentI if !s.meta.HasCollection(r.CollectionID) { if err := s.loadCollectionFromRootCoord(ctx, r.CollectionID); err != nil { - errMsg := fmt.Sprintf("can not load collection %d", r.CollectionID) + errMsg := fmt.Sprintf("Can not load collection %d", r.CollectionID) appendFailedAssignment(errMsg) log.Error("load collection from rootcoord error", zap.Int64("collectionID", r.CollectionID), @@ -104,7 +107,7 @@ func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentI segmentID, retCount, expireTs, err := s.segmentManager.AllocSegment(ctx, r.CollectionID, r.PartitionID, r.ChannelName, int64(r.Count)) if err != nil { - errMsg := fmt.Sprintf("allocation of collection %d, partition %d, channel %s, count %d error: %s", + errMsg := fmt.Sprintf("Allocation of collection %d, partition %d, channel %s, count %d error: %s", r.CollectionID, r.PartitionID, r.ChannelName, r.Count, err.Error()) appendFailedAssignment(errMsg) continue @@ -142,7 +145,7 @@ func (s *Server) GetSegmentStates(ctx context.Context, req *datapb.GetSegmentSta }, } if s.isClosed() { - resp.Status.Reason = "server is initializing" + resp.Status.Reason = serverNotServingErrMsg return resp, nil } @@ -154,7 +157,7 @@ func (s *Server) GetSegmentStates(ctx context.Context, req *datapb.GetSegmentSta segmentInfo, err := s.meta.GetSegment(segmentID) if err != nil { state.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError - state.Status.Reason = "get segment states error: " + err.Error() + state.Status.Reason = fmt.Sprintf("Failed to get segment %d, %s", segmentID, err.Error()) } else { state.Status.ErrorCode = commonpb.ErrorCode_Success state.State = segmentInfo.GetState() @@ -174,7 +177,7 @@ func (s *Server) GetInsertBinlogPaths(ctx context.Context, req *datapb.GetInsert }, } if s.isClosed() { - resp.Status.Reason = "server is initializing" + resp.Status.Reason = serverNotServingErrMsg return resp, nil } p := path.Join(Params.SegmentBinlogSubPath, strconv.FormatInt(req.SegmentID, 10)) + "/" // prefix/id/ instead of prefix/id @@ -212,7 +215,7 @@ func (s *Server) GetCollectionStatistics(ctx context.Context, req *datapb.GetCol }, } if s.isClosed() { - resp.Status.Reason = "server is initializing" + resp.Status.Reason = serverNotServingErrMsg return resp, nil } nums, err := s.meta.GetNumRowsOfCollection(req.CollectionID) @@ -232,7 +235,7 @@ func (s *Server) GetPartitionStatistics(ctx context.Context, req *datapb.GetPart }, } if s.isClosed() { - resp.Status.Reason = "server is initializing" + resp.Status.Reason = serverNotServingErrMsg return resp, nil } nums, err := s.meta.GetNumRowsOfPartition(req.CollectionID, req.PartitionID) @@ -261,7 +264,7 @@ func (s *Server) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoR }, } if s.isClosed() { - resp.Status.Reason = "data service is not healthy" + resp.Status.Reason = serverNotServingErrMsg return resp, nil 
} infos := make([]*datapb.SegmentInfo, 0, len(req.SegmentIDs)) @@ -283,7 +286,7 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath ErrorCode: commonpb.ErrorCode_UnexpectedError, } if s.isClosed() { - resp.Reason = "server is closed" + resp.Reason = serverNotServingErrMsg return resp, nil } log.Debug("Receive SaveBinlogPaths request", @@ -364,7 +367,7 @@ func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInf }, } if s.isClosed() { - resp.Status.Reason = "server is initializing" + resp.Status.Reason = serverNotServingErrMsg return resp, nil } segmentIDs := s.meta.GetSegmentsOfPartition(collectionID, partitionID) diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index 6081a2d820..792141a57c 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -32,15 +32,6 @@ const ( type errSegmentNotFound struct { segmentID UniqueID } -type errCollectionNotFound struct { - collectionID UniqueID -} -type meta struct { - sync.RWMutex - client kv.TxnKV // client of a reliable kv service, i.e. etcd client - collections map[UniqueID]*datapb.CollectionInfo // collection id to collection info - segments map[UniqueID]*datapb.SegmentInfo // segment id to segment info -} func newErrSegmentNotFound(segmentID UniqueID) errSegmentNotFound { return errSegmentNotFound{segmentID: segmentID} @@ -50,6 +41,10 @@ func (err errSegmentNotFound) Error() string { return fmt.Sprintf("segment %d not found", err.segmentID) } +type errCollectionNotFound struct { + collectionID UniqueID +} + func newErrCollectionNotFound(collectionID UniqueID) errCollectionNotFound { return errCollectionNotFound{collectionID: collectionID} } @@ -58,6 +53,62 @@ func (err errCollectionNotFound) Error() string { return fmt.Sprintf("collection %d not found", err.collectionID) } +type errPartitionNotFound struct { + partitionID UniqueID +} + +func newErrPartitionNotFound(partitionID UniqueID) errPartitionNotFound { + return errPartitionNotFound{partitionID: partitionID} +} + +func (err errPartitionNotFound) Error() string { + return fmt.Sprintf("partition %d not found", err.partitionID) +} + +type errCollectionExist struct { + name string + id UniqueID +} + +func newErrCollectionExist(collectionName string, collectionID UniqueID) errCollectionExist { + return errCollectionExist{name: collectionName, id: collectionID} +} + +func (err errCollectionExist) Error() string { + return fmt.Sprintf("collection %s with id %d already exist", err.name, err.id) +} + +type errPartitionExist struct { + id UniqueID +} + +func newErrPartitionExist(partitionID UniqueID) errPartitionExist { + return errPartitionExist{id: partitionID} +} + +func (err errPartitionExist) Error() string { + return fmt.Sprintf("partition %d already exist", err.id) +} + +type errSegmentExist struct { + id UniqueID +} + +func newErrSegmentExist(segmentID UniqueID) errSegmentExist { + return errSegmentExist{id: segmentID} +} + +func (err errSegmentExist) Error() string { + return fmt.Sprintf("segment %d already exist", err.id) +} + +type meta struct { + sync.RWMutex + client kv.TxnKV // client of a reliable kv service, i.e. 
etcd client + collections map[UniqueID]*datapb.CollectionInfo // collection id to collection info + segments map[UniqueID]*datapb.SegmentInfo // segment id to segment info +} + func newMeta(kv kv.TxnKV) (*meta, error) { mt := &meta{ client: kv, @@ -93,27 +144,27 @@ func (m *meta) AddCollection(collection *datapb.CollectionInfo) error { m.Lock() defer m.Unlock() if _, ok := m.collections[collection.ID]; ok { - return fmt.Errorf("collection %s with id %d already exist", collection.Schema.Name, collection.ID) + return newErrCollectionExist(collection.GetSchema().GetName(), collection.GetID()) } m.collections[collection.ID] = collection return nil } -func (m *meta) DropCollection(collID UniqueID) error { +func (m *meta) DropCollection(collectionID UniqueID) error { m.Lock() defer m.Unlock() - if _, ok := m.collections[collID]; !ok { - return newErrCollectionNotFound(collID) + if _, ok := m.collections[collectionID]; !ok { + return newErrCollectionNotFound(collectionID) } - key := fmt.Sprintf("%s/%d/", segmentPrefix, collID) + key := buildCollectionPath(collectionID) if err := m.client.RemoveWithPrefix(key); err != nil { return err } - delete(m.collections, collID) + delete(m.collections, collectionID) for i, info := range m.segments { - if info.CollectionID == collID { + if info.CollectionID == collectionID { delete(m.segments, i) } } @@ -141,9 +192,9 @@ func (m *meta) GetNumRowsOfCollection(collectionID UniqueID) (int64, error) { m.RLock() defer m.RUnlock() var ret int64 = 0 - for _, info := range m.segments { - if info.CollectionID == collectionID { - ret += info.NumOfRows + for _, segment := range m.segments { + if segment.CollectionID == collectionID { + ret += segment.GetNumOfRows() } } return ret, nil @@ -153,7 +204,7 @@ func (m *meta) AddSegment(segment *datapb.SegmentInfo) error { m.Lock() defer m.Unlock() if _, ok := m.segments[segment.ID]; ok { - return fmt.Errorf("segment %d already exist", segment.ID) + return newErrSegmentExist(segment.GetID()) } m.segments[segment.ID] = segment if err := m.saveSegmentInfo(segment); err != nil { @@ -179,13 +230,13 @@ func (m *meta) UpdateSegmentStatistic(stats *internalpb.SegmentStatisticsUpdates func (m *meta) SetLastExpireTime(segmentID UniqueID, expireTs Timestamp) error { m.Lock() defer m.Unlock() - seg, ok := m.segments[segmentID] + segment, ok := m.segments[segmentID] if !ok { return newErrSegmentNotFound(segmentID) } - seg.LastExpireTime = expireTs + segment.LastExpireTime = expireTs - if err := m.saveSegmentInfo(seg); err != nil { + if err := m.saveSegmentInfo(segment); err != nil { return err } return nil @@ -238,7 +289,7 @@ func (m *meta) SaveBinlogAndCheckPoints(segID UniqueID, flushed bool, startPositions []*datapb.SegmentStartPosition) error { m.Lock() defer m.Unlock() - segInfo, ok := m.segments[segID] + segment, ok := m.segments[segID] if !ok { return newErrSegmentNotFound(segID) } @@ -247,7 +298,7 @@ func (m *meta) SaveBinlogAndCheckPoints(segID UniqueID, flushed bool, kv[k] = v } if flushed { - segInfo.State = commonpb.SegmentState_Flushing + segment.State = commonpb.SegmentState_Flushing } modifiedSegments := make(map[UniqueID]struct{}) @@ -281,9 +332,9 @@ func (m *meta) SaveBinlogAndCheckPoints(segID UniqueID, flushed bool, } for id := range modifiedSegments { - segInfo = m.segments[id] - segBytes := proto.MarshalTextString(segInfo) - key := m.prepareSegmentPath(segInfo) + segment = m.segments[id] + segBytes := proto.MarshalTextString(segment) + key := buildSegmentPath(segment.GetCollectionID(), segment.GetPartitionID(), 
segment.GetID()) kv[key] = segBytes } @@ -355,20 +406,20 @@ func (m *meta) AddPartition(collectionID UniqueID, partitionID UniqueID) error { for _, t := range coll.Partitions { if t == partitionID { - return fmt.Errorf("partition %d already exists", partitionID) + return newErrPartitionExist(partitionID) } } coll.Partitions = append(coll.Partitions, partitionID) return nil } -func (m *meta) DropPartition(collID UniqueID, partitionID UniqueID) error { +func (m *meta) DropPartition(collectionID UniqueID, partitionID UniqueID) error { m.Lock() defer m.Unlock() - collection, ok := m.collections[collID] + collection, ok := m.collections[collectionID] if !ok { - return newErrCollectionNotFound(collID) + return newErrCollectionNotFound(collectionID) } idx := -1 for i, id := range collection.Partitions { @@ -378,10 +429,10 @@ func (m *meta) DropPartition(collID UniqueID, partitionID UniqueID) error { } } if idx == -1 { - return fmt.Errorf("cannot find partition id %d", partitionID) + return newErrPartitionNotFound(partitionID) } - prefix := fmt.Sprintf("%s/%d/%d/", segmentPrefix, collID, partitionID) + prefix := buildPartitionPath(collectionID, partitionID) if err := m.client.RemoveWithPrefix(prefix); err != nil { return err } @@ -451,24 +502,32 @@ func (m *meta) GetFlushingSegments() []*datapb.SegmentInfo { func (m *meta) saveSegmentInfo(segment *datapb.SegmentInfo) error { segBytes := proto.MarshalTextString(segment) - key := m.prepareSegmentPath(segment) + key := buildSegmentPath(segment.GetCollectionID(), segment.GetPartitionID(), segment.GetID()) return m.client.Save(key, segBytes) } func (m *meta) removeSegmentInfo(segment *datapb.SegmentInfo) error { - key := m.prepareSegmentPath(segment) + key := buildSegmentPath(segment.GetCollectionID(), segment.GetPartitionID(), segment.GetID()) return m.client.Remove(key) } -func (m *meta) prepareSegmentPath(segInfo *datapb.SegmentInfo) string { - return fmt.Sprintf("%s/%d/%d/%d", segmentPrefix, segInfo.CollectionID, segInfo.PartitionID, segInfo.ID) -} - func (m *meta) saveKvTxn(kv map[string]string) error { return m.client.MultiSave(kv) } -func BuildSegment(collectionID UniqueID, partitionID UniqueID, segmentID UniqueID, channelName string) (*datapb.SegmentInfo, error) { +func buildSegmentPath(collectionID UniqueID, partitionID UniqueID, segmentID UniqueID) string { + return fmt.Sprintf("%s/%d/%d/%d", segmentPrefix, collectionID, partitionID, segmentID) +} + +func buildCollectionPath(collectionID UniqueID) string { + return fmt.Sprintf("%s/%d/", segmentPrefix, collectionID) +} + +func buildPartitionPath(collectionID UniqueID, partitionID UniqueID) string { + return fmt.Sprintf("%s/%d/%d/", segmentPrefix, collectionID, partitionID) +} + +func buildSegment(collectionID UniqueID, partitionID UniqueID, segmentID UniqueID, channelName string) (*datapb.SegmentInfo, error) { return &datapb.SegmentInfo{ ID: segmentID, CollectionID: collectionID, diff --git a/internal/datacoord/meta_test.go b/internal/datacoord/meta_test.go index 65e7b6dcd9..2c173489f2 100644 --- a/internal/datacoord/meta_test.go +++ b/internal/datacoord/meta_test.go @@ -122,15 +122,15 @@ func TestMeta_Basic(t *testing.T) { // create seg0 for partition0, seg0/seg1 for partition1 segID0_0, err := mockAllocator.allocID() assert.Nil(t, err) - segInfo0_0, err := BuildSegment(collID, partID0, segID0_0, channelName) + segInfo0_0, err := buildSegment(collID, partID0, segID0_0, channelName) assert.Nil(t, err) segID1_0, err := mockAllocator.allocID() assert.Nil(t, err) - segInfo1_0, err := 
BuildSegment(collID, partID1, segID1_0, channelName) + segInfo1_0, err := buildSegment(collID, partID1, segID1_0, channelName) assert.Nil(t, err) segID1_1, err := mockAllocator.allocID() assert.Nil(t, err) - segInfo1_1, err := BuildSegment(collID, partID1, segID1_1, channelName) + segInfo1_1, err := buildSegment(collID, partID1, segID1_1, channelName) assert.Nil(t, err) // check AddSegment @@ -202,7 +202,7 @@ func TestMeta_Basic(t *testing.T) { // add seg1 with 100 rows segID0, err := mockAllocator.allocID() assert.Nil(t, err) - segInfo0, err := BuildSegment(collID, partID0, segID0, channelName) + segInfo0, err := buildSegment(collID, partID0, segID0, channelName) assert.Nil(t, err) segInfo0.NumOfRows = rowCount0 err = meta.AddSegment(segInfo0) @@ -211,7 +211,7 @@ func TestMeta_Basic(t *testing.T) { // add seg2 with 300 rows segID1, err := mockAllocator.allocID() assert.Nil(t, err) - segInfo1, err := BuildSegment(collID, partID0, segID1, channelName) + segInfo1, err := buildSegment(collID, partID0, segID1, channelName) assert.Nil(t, err) segInfo1.NumOfRows = rowCount1 err = meta.AddSegment(segInfo1) diff --git a/internal/datacoord/segment_allocation_policy.go b/internal/datacoord/segment_allocation_policy.go index 758f62a9a9..02443fc682 100644 --- a/internal/datacoord/segment_allocation_policy.go +++ b/internal/datacoord/segment_allocation_policy.go @@ -70,7 +70,7 @@ func getSegmentCapacityPolicy(sizeFactor float64) segmentSealPolicy { return func(status *segmentStatus, ts Timestamp) bool { var allocSize int64 for _, allocation := range status.allocations { - allocSize += allocation.rowNums + allocSize += allocation.numOfRows } // max, written, allocated := status.total, status.currentRows, allocSize // float64(writtenCount) >= Params.SegmentSizeFactor*float64(maxCount) diff --git a/internal/datacoord/segment_manager.go b/internal/datacoord/segment_manager.go index dd38a22355..2f0967efc1 100644 --- a/internal/datacoord/segment_manager.go +++ b/internal/datacoord/segment_manager.go @@ -68,7 +68,7 @@ type segmentStatus struct { } type allocation struct { - rowNums int64 + numOfRows int64 expireTime Timestamp } @@ -250,16 +250,15 @@ func (s *SegmentManager) AllocSegment(ctx context.Context, collectionID UniqueID segID = status.id retCount = requestRows expireTime = status.lastExpireTime - return } -func (s *SegmentManager) alloc(segStatus *segmentStatus, numRows int64) (bool, error) { +func (s *SegmentManager) alloc(status *segmentStatus, numOfRows int64) (bool, error) { var allocSize int64 - for _, allocation := range segStatus.allocations { - allocSize += allocation.rowNums + for _, allocItem := range status.allocations { + allocSize += allocItem.numOfRows } - if !s.allocPolicy.apply(segStatus.total, segStatus.currentRows, allocSize, numRows) { + if !s.allocPolicy.apply(status.total, status.currentRows, allocSize, numOfRows) { return false, nil } @@ -269,13 +268,13 @@ func (s *SegmentManager) alloc(segStatus *segmentStatus, numRows int64) (bool, e } alloc := &allocation{ - rowNums: numRows, + numOfRows: numOfRows, expireTime: expireTs, } - segStatus.lastExpireTime = expireTs - segStatus.allocations = append(segStatus.allocations, alloc) + status.lastExpireTime = expireTs + status.allocations = append(status.allocations, alloc) - if err := s.meta.SetLastExpireTime(segStatus.id, expireTs); err != nil { + if err := s.meta.SetLastExpireTime(status.id, expireTs); err != nil { return false, err } return true, nil @@ -299,22 +298,22 @@ func (s *SegmentManager) openNewSegment(ctx context.Context, 
collectionID Unique if err != nil { return nil, err } - totalRows, err := s.estimateTotalRows(collectionID) + maxNumOfRows, err := s.estimateMaxNumOfRows(collectionID) if err != nil { return nil, err } - segStatus := &segmentStatus{ + status := &segmentStatus{ id: id, collectionID: collectionID, partitionID: partitionID, sealed: false, - total: int64(totalRows), + total: int64(maxNumOfRows), insertChannel: channelName, allocations: []*allocation{}, lastExpireTime: 0, currentRows: 0, } - s.stats[id] = segStatus + s.stats[id] = status segmentInfo := &datapb.SegmentInfo{ ID: id, @@ -323,7 +322,7 @@ func (s *SegmentManager) openNewSegment(ctx context.Context, collectionID Unique InsertChannel: channelName, NumOfRows: 0, State: commonpb.SegmentState_Growing, - MaxRowNum: int64(totalRows), + MaxRowNum: int64(maxNumOfRows), LastExpireTime: 0, StartPosition: &internalpb.MsgPosition{ ChannelName: channelName, @@ -339,15 +338,14 @@ func (s *SegmentManager) openNewSegment(ctx context.Context, collectionID Unique log.Debug("datacoord: estimateTotalRows: ", zap.Int64("CollectionID", segmentInfo.CollectionID), zap.Int64("SegmentID", segmentInfo.ID), - zap.Int("Rows", totalRows), - zap.String("channel", segmentInfo.InsertChannel)) + zap.Int("Rows", maxNumOfRows), + zap.String("Channel", segmentInfo.InsertChannel)) s.helper.afterCreateSegment(segmentInfo) - - return segStatus, nil + return status, nil } -func (s *SegmentManager) estimateTotalRows(collectionID UniqueID) (int, error) { +func (s *SegmentManager) estimateMaxNumOfRows(collectionID UniqueID) (int, error) { collMeta, err := s.meta.GetCollection(collectionID) if err != nil { return -1, err @@ -391,12 +389,12 @@ func (s *SegmentManager) GetFlushableSegments(ctx context.Context, channel strin } ret := make([]UniqueID, 0) - for _, segStatus := range s.stats { - if segStatus.insertChannel != channel { + for _, status := range s.stats { + if status.insertChannel != channel { continue } - if s.flushPolicy.apply(segStatus, t) { - ret = append(ret, segStatus.id) + if s.flushPolicy.apply(status, t) { + ret = append(ret, status.id) } } @@ -416,34 +414,34 @@ func (s *SegmentManager) UpdateSegmentStats(stat *internalpb.SegmentStatisticsUp // tryToSealSegment applies segment & channel seal policies func (s *SegmentManager) tryToSealSegment(ts Timestamp) error { channelInfo := make(map[string][]*segmentStatus) - for _, segStatus := range s.stats { - channelInfo[segStatus.insertChannel] = append(channelInfo[segStatus.insertChannel], segStatus) - if segStatus.sealed { + for _, status := range s.stats { + channelInfo[status.insertChannel] = append(channelInfo[status.insertChannel], status) + if status.sealed { continue } // change shouldSeal to segment seal policy logic for _, policy := range s.segmentSealPolicies { - if policy(segStatus, ts) { - if err := s.meta.SealSegment(segStatus.id); err != nil { + if policy(status, ts) { + if err := s.meta.SealSegment(status.id); err != nil { return err } - segStatus.sealed = true + status.sealed = true break } } } - for channel, segs := range channelInfo { + for channel, segmentStats := range channelInfo { for _, policy := range s.channelSealPolicies { - vs := policy(channel, segs, ts) - for _, seg := range vs { - if seg.sealed { + vs := policy(channel, segmentStats, ts) + for _, status := range vs { + if status.sealed { continue } - if err := s.meta.SealSegment(seg.id); err != nil { + if err := s.meta.SealSegment(status.id); err != nil { return err } - seg.sealed = true + status.sealed = true } } } diff --git 
a/internal/datacoord/server.go b/internal/datacoord/server.go index ff739e214d..bf4715009e 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -37,12 +37,20 @@ import ( "github.com/milvus-io/milvus/internal/proto/milvuspb" ) -const rootCoordClientTimout = 20 * time.Second +const ( + rootCoordClientTimout = 20 * time.Second + connEtcdMaxRetryTime = 100000 + connEtcdRetryInterval = 200 * time.Millisecond +) type ( UniqueID = typeutil.UniqueID Timestamp = typeutil.Timestamp ) + +type dataNodeCreatorFunc func(ctx context.Context, addr string) (types.DataNode, error) +type rootCoordCreatorFunc func(ctx context.Context) (types.RootCoord, error) + type Server struct { ctx context.Context serverLoopCtx context.Context @@ -67,27 +75,30 @@ type Server struct { activeCh <-chan bool eventCh <-chan *sessionutil.SessionEvent - dataClientCreator func(addr string) (types.DataNode, error) - rootCoordClientCreator func(addr string) (types.RootCoord, error) + dataClientCreator dataNodeCreatorFunc + rootCoordClientCreator rootCoordCreatorFunc } func CreateServer(ctx context.Context, factory msgstream.Factory) (*Server, error) { rand.Seed(time.Now().UnixNano()) s := &Server{ - ctx: ctx, - msFactory: factory, - flushCh: make(chan UniqueID, 1024), + ctx: ctx, + msFactory: factory, + flushCh: make(chan UniqueID, 1024), + dataClientCreator: defaultDataNodeCreatorFunc, + rootCoordClientCreator: defaultRootCoordCreatorFunc, } - s.dataClientCreator = func(addr string) (types.DataNode, error) { - return datanodeclient.NewClient(addr, 3*time.Second) - } - s.rootCoordClientCreator = func(addr string) (types.RootCoord, error) { - return rootcoordclient.NewClient(ctx, Params.MetaRootPath, Params.EtcdEndpoints, rootCoordClientTimout) - } - return s, nil } +func defaultDataNodeCreatorFunc(ctx context.Context, addr string) (types.DataNode, error) { + return datanodeclient.NewClient(ctx, addr, 3*time.Second) +} + +func defaultRootCoordCreatorFunc(ctx context.Context) (types.RootCoord, error) { + return rootcoordclient.NewClient(ctx, Params.MetaRootPath, Params.EtcdEndpoints, rootCoordClientTimout) +} + // Register register data service at etcd func (s *Server) Register() error { s.session = sessionutil.NewSession(s.ctx, Params.MetaRootPath, Params.EtcdEndpoints) @@ -127,7 +138,7 @@ func (s *Server) Start() error { return err } - s.allocator = newAllocator(s.rootCoordClient) + s.allocator = newRootCoordAllocator(s.ctx, s.rootCoordClient) s.startSegmentManager() if err = s.initFlushMsgStream(); err != nil { @@ -141,7 +152,7 @@ func (s *Server) Start() error { s.startServerLoop() atomic.StoreInt64(&s.isServing, 2) - log.Debug("start success") + log.Debug("DataCoordinator startup success") return nil } @@ -150,7 +161,7 @@ func (s *Server) initCluster() error { if err != nil { return err } - sManager := newClusterSessionManager(s.dataClientCreator) + sManager := newClusterSessionManager(s.ctx, s.dataClientCreator) s.cluster = newCluster(s.ctx, dManager, sManager, s) return nil } @@ -211,7 +222,7 @@ func (s *Server) initMeta() error { } return nil } - return retry.Retry(100000, time.Millisecond*200, connectEtcdFn) + return retry.Retry(connEtcdMaxRetryTime, connEtcdRetryInterval, connectEtcdFn) } func (s *Server) initFlushMsgStream() error { @@ -224,7 +235,6 @@ func (s *Server) initFlushMsgStream() error { s.flushMsgStream.AsProducer([]string{Params.SegmentInfoChannelName}) log.Debug("DataCoord AsProducer:" + Params.SegmentInfoChannelName) s.flushMsgStream.Start() - return nil } @@ -251,6 +261,7 @@ func 
(s *Server) startStatsChannel(ctx context.Context) { for { select { case <-ctx.Done(): + log.Debug("stats channel shutdown") return default: } @@ -290,7 +301,7 @@ func (s *Server) startDataNodeTtLoop(ctx context.Context) { for { select { case <-ctx.Done(): - log.Debug("data node tt loop done") + log.Debug("data node tt loop shutdown") return default: } @@ -381,7 +392,7 @@ func (s *Server) startActiveCheck(ctx context.Context) { continue } s.Stop() - log.Debug("disconnect with etcd") + log.Debug("disconnect with etcd and shutdown data coordinator") return case <-ctx.Done(): log.Debug("connection check shutdown") @@ -438,8 +449,7 @@ func (s *Server) handleFlushingSegments(ctx context.Context) { func (s *Server) initRootCoordClient() error { var err error - s.rootCoordClient, err = s.rootCoordClientCreator("") - if err != nil { + if s.rootCoordClient, err = s.rootCoordClientCreator(s.ctx); err != nil { return err } if err = s.rootCoordClient.Init(); err != nil { diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index 87f580b562..5d135fd67d 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -636,7 +636,7 @@ func TestGetRecoveryInfo(t *testing.T) { svr := newTestServer(t, nil) defer closeTestServer(t, svr) - svr.rootCoordClientCreator = func(addr string) (types.RootCoord, error) { + svr.rootCoordClientCreator = func(ctx context.Context) (types.RootCoord, error) { return newMockRootCoordService(), nil } @@ -773,10 +773,10 @@ func newTestServer(t *testing.T, receiveCh chan interface{}) *Server { svr, err := CreateServer(context.TODO(), factory) assert.Nil(t, err) - svr.dataClientCreator = func(addr string) (types.DataNode, error) { + svr.dataClientCreator = func(ctx context.Context, addr string) (types.DataNode, error) { return newMockDataNodeClient(0, receiveCh) } - svr.rootCoordClientCreator = func(addr string) (types.RootCoord, error) { + svr.rootCoordClientCreator = func(ctx context.Context) (types.RootCoord, error) { return newMockRootCoordService(), nil } assert.Nil(t, err) diff --git a/internal/distributed/datanode/client/client.go b/internal/distributed/datanode/client/client.go index 4492f3c207..e425941172 100644 --- a/internal/distributed/datanode/client/client.go +++ b/internal/distributed/datanode/client/client.go @@ -45,7 +45,7 @@ type Client struct { recallTry int } -func NewClient(addr string, timeout time.Duration) (*Client, error) { +func NewClient(ctx context.Context, addr string, timeout time.Duration) (*Client, error) { if addr == "" { return nil, fmt.Errorf("address is empty") } @@ -54,7 +54,7 @@ func NewClient(addr string, timeout time.Duration) (*Client, error) { grpc: nil, conn: nil, addr: addr, - ctx: context.Background(), + ctx: ctx, timeout: timeout, recallTry: 3, reconnTry: 10,
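Note: the sketch below is not part of the patch. It is a minimal, self-contained illustration of the context-plus-creator-func injection pattern this refactor applies to clusterSessionManager and the data node client; DataNode, node and defaultCreator here are toy stand-ins so the example compiles on its own, not the real Milvus types.

// Illustrative stand-alone sketch (assumptions noted above), mirroring the
// dataNodeCreatorFunc type and the ctx field added to clusterSessionManager.
package main

import (
	"context"
	"fmt"
)

type DataNode interface {
	Addr() string
}

// mirrors the dataNodeCreatorFunc type introduced in server.go
type dataNodeCreatorFunc func(ctx context.Context, addr string) (DataNode, error)

type node struct{ addr string }

func (n node) Addr() string { return n.addr }

// defaultCreator stands in for defaultDataNodeCreatorFunc; a real client would dial addr using ctx.
func defaultCreator(ctx context.Context, addr string) (DataNode, error) {
	return node{addr: addr}, nil
}

type sessionManager struct {
	ctx      context.Context
	creator  dataNodeCreatorFunc
	sessions map[string]DataNode
}

func newSessionManager(ctx context.Context, creator dataNodeCreatorFunc) *sessionManager {
	return &sessionManager{ctx: ctx, creator: creator, sessions: make(map[string]DataNode)}
}

// getOrCreate reuses a cached session or builds one with the injected creator and the
// manager's context, the same shape as clusterSessionManager.createSession after this patch.
func (m *sessionManager) getOrCreate(addr string) (DataNode, error) {
	if n, ok := m.sessions[addr]; ok {
		return n, nil
	}
	n, err := m.creator(m.ctx, addr)
	if err != nil {
		return nil, err
	}
	m.sessions[addr] = n
	return n, nil
}

func main() {
	m := newSessionManager(context.Background(), defaultCreator)
	n, _ := m.getOrCreate("localhost:21124")
	fmt.Println(n.Addr()) // tests can swap in a mock creator, as server_test.go does with newMockDataNodeClient
}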