mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-11-30 02:48:45 +08:00
Remove recollect segment stats during starting datacoord (#27410)
Signed-off-by: jaime <yun.zhang@zilliz.com>
This commit is contained in:
parent
ec1fe3549e
commit
e386a62fae
@ -136,17 +136,6 @@ func (c *Cluster) Import(ctx context.Context, nodeID int64, it *datapb.ImportTas
|
||||
c.sessionManager.Import(ctx, nodeID, it)
|
||||
}
|
||||
|
||||
// ReCollectSegmentStats triggers a ReCollectSegmentStats call from session manager.
|
||||
func (c *Cluster) ReCollectSegmentStats(ctx context.Context) error {
|
||||
for _, node := range c.sessionManager.getLiveNodeIDs() {
|
||||
err := c.sessionManager.ReCollectSegmentStats(ctx, node)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetSessions returns all sessions
|
||||
func (c *Cluster) GetSessions() []*Session {
|
||||
return c.sessionManager.GetSessions()
|
||||
|
@ -618,66 +618,3 @@ func TestCluster_Import(t *testing.T) {
|
||||
})
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
}
|
||||
|
||||
func TestCluster_ReCollectSegmentStats(t *testing.T) {
|
||||
kv := getWatchKV(t)
|
||||
defer func() {
|
||||
kv.RemoveWithPrefix("")
|
||||
kv.Close()
|
||||
}()
|
||||
|
||||
t.Run("recollect succeed", func(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.TODO())
|
||||
defer cancel()
|
||||
mockSessionCreator := func(ctx context.Context, addr string, nodeID int64) (types.DataNodeClient, error) {
|
||||
return newMockDataNodeClient(1, nil)
|
||||
}
|
||||
sessionManager := NewSessionManager(withSessionCreator(mockSessionCreator))
|
||||
channelManager, err := NewChannelManager(kv, newMockHandler())
|
||||
assert.NoError(t, err)
|
||||
cluster := NewCluster(sessionManager, channelManager)
|
||||
defer cluster.Close()
|
||||
addr := "localhost:8080"
|
||||
info := &NodeInfo{
|
||||
Address: addr,
|
||||
NodeID: 1,
|
||||
}
|
||||
nodes := []*NodeInfo{info}
|
||||
err = cluster.Startup(ctx, nodes)
|
||||
assert.NoError(t, err)
|
||||
|
||||
err = cluster.Watch("chan-1", 1)
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.NotPanics(t, func() {
|
||||
cluster.ReCollectSegmentStats(ctx)
|
||||
})
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
})
|
||||
|
||||
t.Run("recollect failed", func(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.TODO())
|
||||
defer cancel()
|
||||
sessionManager := NewSessionManager()
|
||||
channelManager, err := NewChannelManager(kv, newMockHandler())
|
||||
assert.NoError(t, err)
|
||||
cluster := NewCluster(sessionManager, channelManager)
|
||||
defer cluster.Close()
|
||||
addr := "localhost:8080"
|
||||
info := &NodeInfo{
|
||||
Address: addr,
|
||||
NodeID: 1,
|
||||
}
|
||||
nodes := []*NodeInfo{info}
|
||||
err = cluster.Startup(ctx, nodes)
|
||||
assert.NoError(t, err)
|
||||
|
||||
err = cluster.Watch("chan-1", 1)
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.NotPanics(t, func() {
|
||||
cluster.ReCollectSegmentStats(ctx)
|
||||
})
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
})
|
||||
}
|
||||
|
@ -394,12 +394,6 @@ func (s *Server) startDataCoord() {
|
||||
s.compactionTrigger.start()
|
||||
}
|
||||
s.startServerLoop()
|
||||
// DataCoord (re)starts successfully and starts to collection segment stats
|
||||
// data from all DataNode.
|
||||
// This will prevent DataCoord from missing out any important segment stats
|
||||
// data while offline.
|
||||
log.Info("DataCoord (re)starts successfully and re-collecting segment stats from DataNodes")
|
||||
s.reCollectSegmentStats(s.ctx)
|
||||
s.stateCode.Store(commonpb.StateCode_Healthy)
|
||||
sessionutil.SaveServerInfo(typeutil.DataCoordRole, s.session.ServerID)
|
||||
}
|
||||
@ -1112,25 +1106,3 @@ func (s *Server) loadCollectionFromRootCoord(ctx context.Context, collectionID i
|
||||
s.meta.AddCollection(collInfo)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) reCollectSegmentStats(ctx context.Context) {
|
||||
if s.channelManager == nil {
|
||||
log.Error("null channel manager found, which should NOT happen in non-testing environment")
|
||||
return
|
||||
}
|
||||
nodes := s.sessionManager.getLiveNodeIDs()
|
||||
log.Info("re-collecting segment stats from DataNodes",
|
||||
zap.Int64s("DataNode IDs", nodes))
|
||||
|
||||
reCollectFunc := func() error {
|
||||
err := s.cluster.ReCollectSegmentStats(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := retry.Do(ctx, reCollectFunc, retry.Attempts(20), retry.Sleep(time.Millisecond*100), retry.MaxSleepTime(5*time.Second)); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
@ -40,8 +40,7 @@ import (
|
||||
const (
|
||||
flushTimeout = 15 * time.Second
|
||||
// TODO: evaluate and update import timeout.
|
||||
importTimeout = 3 * time.Hour
|
||||
reCollectTimeout = 5 * time.Second
|
||||
importTimeout = 3 * time.Hour
|
||||
)
|
||||
|
||||
// SessionManager provides the grpc interfaces of cluster
|
||||
@ -227,32 +226,6 @@ func (c *SessionManager) execImport(ctx context.Context, nodeID int64, itr *data
|
||||
log.Info("success to import", zap.Int64("node", nodeID), zap.Any("import task", itr))
|
||||
}
|
||||
|
||||
// ReCollectSegmentStats collects segment stats info from DataNodes, after DataCoord reboots.
|
||||
func (c *SessionManager) ReCollectSegmentStats(ctx context.Context, nodeID int64) error {
|
||||
cli, err := c.getClient(ctx, nodeID)
|
||||
if err != nil {
|
||||
log.Warn("failed to get dataNode client", zap.Int64("DataNode ID", nodeID), zap.Error(err))
|
||||
return err
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(ctx, reCollectTimeout)
|
||||
defer cancel()
|
||||
resp, err := cli.ResendSegmentStats(ctx, &datapb.ResendSegmentStatsRequest{
|
||||
Base: commonpbutil.NewMsgBase(
|
||||
commonpbutil.WithMsgType(commonpb.MsgType_ResendSegmentStats),
|
||||
commonpbutil.WithSourceID(paramtable.GetNodeID()),
|
||||
),
|
||||
})
|
||||
if err := VerifyResponse(resp, err); err != nil {
|
||||
log.Warn("re-collect segment stats call failed",
|
||||
zap.Int64("DataNode ID", nodeID), zap.Error(err))
|
||||
return err
|
||||
}
|
||||
log.Info("re-collect segment stats call succeeded",
|
||||
zap.Int64("DataNode ID", nodeID),
|
||||
zap.Int64s("segment stat collected", resp.GetSegResent()))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *SessionManager) GetCompactionState() map[int64]*datapb.CompactionStateResult {
|
||||
wg := sync.WaitGroup{}
|
||||
ctx := context.Background()
|
||||
|
@ -192,26 +192,6 @@ func (fm *flowgraphManager) getChannel(segID UniqueID) (Channel, error) {
|
||||
return nil, fmt.Errorf("cannot find segment %d in all flowgraphs", segID)
|
||||
}
|
||||
|
||||
// resendTT loops through flow graphs, looks for segments that are not flushed,
|
||||
// and sends them to that flow graph's `resendTTCh` channel so stats of
|
||||
// these segments will be resent.
|
||||
func (fm *flowgraphManager) resendTT() []UniqueID {
|
||||
var unFlushedSegments []UniqueID
|
||||
fm.flowgraphs.Range(func(key string, fg *dataSyncService) bool {
|
||||
segIDs := fg.channel.listNotFlushedSegmentIDs()
|
||||
if len(segIDs) > 0 {
|
||||
log.Info("un-flushed segments found, stats will be resend",
|
||||
zap.Int64s("segment IDs", segIDs))
|
||||
unFlushedSegments = append(unFlushedSegments, segIDs...)
|
||||
fg.resendTTCh <- resendTTMsg{
|
||||
segmentIDs: segIDs,
|
||||
}
|
||||
}
|
||||
return true
|
||||
})
|
||||
return unFlushedSegments
|
||||
}
|
||||
|
||||
func (fm *flowgraphManager) getFlowgraphService(vchan string) (*dataSyncService, bool) {
|
||||
return fm.flowgraphs.Get(vchan)
|
||||
}
|
||||
|
@ -162,17 +162,13 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen
|
||||
return merr.Success(), nil
|
||||
}
|
||||
|
||||
// ResendSegmentStats resend un-flushed segment stats back upstream to DataCoord by resending DataNode time tick message.
|
||||
// ResendSegmentStats . ResendSegmentStats resend un-flushed segment stats back upstream to DataCoord by resending DataNode time tick message.
|
||||
// It returns a list of segments to be sent.
|
||||
// Deprecated in 2.3.2, reversed it just for compatibility during rolling back
|
||||
func (node *DataNode) ResendSegmentStats(ctx context.Context, req *datapb.ResendSegmentStatsRequest) (*datapb.ResendSegmentStatsResponse, error) {
|
||||
log.Info("start resending segment stats, if any",
|
||||
zap.Int64("DataNode ID", paramtable.GetNodeID()))
|
||||
segResent := node.flowgraphManager.resendTT()
|
||||
log.Info("found segment(s) with stats to resend",
|
||||
zap.Int64s("segment IDs", segResent))
|
||||
return &datapb.ResendSegmentStatsResponse{
|
||||
Status: merr.Success(),
|
||||
SegResent: segResent,
|
||||
SegResent: make([]int64, 0),
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -804,13 +804,13 @@ func (s *DataNodeServicesSuite) TestResendSegmentStats() {
|
||||
resp, err := s.node.ResendSegmentStats(s.ctx, req)
|
||||
s.Assert().NoError(err)
|
||||
s.Assert().True(merr.Ok(resp.GetStatus()))
|
||||
s.Assert().ElementsMatch([]UniqueID{0, 1, 2}, resp.GetSegResent())
|
||||
s.Assert().Empty(resp.GetSegResent())
|
||||
|
||||
// Duplicate call.
|
||||
resp, err = s.node.ResendSegmentStats(s.ctx, req)
|
||||
s.Assert().NoError(err)
|
||||
s.Assert().True(merr.Ok(resp.GetStatus()))
|
||||
s.Assert().ElementsMatch([]UniqueID{0, 1, 2}, resp.GetSegResent())
|
||||
s.Assert().Empty(resp.GetSegResent())
|
||||
}
|
||||
|
||||
func (s *DataNodeServicesSuite) TestFlushChannels() {
|
||||
|
@ -111,6 +111,7 @@ service DataNode {
|
||||
// https://wiki.lfaidata.foundation/display/MIL/MEP+24+--+Support+bulk+load
|
||||
rpc Import(ImportTaskRequest) returns(common.Status) {}
|
||||
|
||||
// Deprecated
|
||||
rpc ResendSegmentStats(ResendSegmentStatsRequest) returns(ResendSegmentStatsResponse) {}
|
||||
|
||||
rpc AddImportSegment(AddImportSegmentRequest) returns(AddImportSegmentResponse) {}
|
||||
|
@ -7346,6 +7346,7 @@ type DataNodeClient interface {
|
||||
SyncSegments(ctx context.Context, in *SyncSegmentsRequest, opts ...grpc.CallOption) (*commonpb.Status, error)
|
||||
// https://wiki.lfaidata.foundation/display/MIL/MEP+24+--+Support+bulk+load
|
||||
Import(ctx context.Context, in *ImportTaskRequest, opts ...grpc.CallOption) (*commonpb.Status, error)
|
||||
// Deprecated
|
||||
ResendSegmentStats(ctx context.Context, in *ResendSegmentStatsRequest, opts ...grpc.CallOption) (*ResendSegmentStatsResponse, error)
|
||||
AddImportSegment(ctx context.Context, in *AddImportSegmentRequest, opts ...grpc.CallOption) (*AddImportSegmentResponse, error)
|
||||
FlushChannels(ctx context.Context, in *FlushChannelsRequest, opts ...grpc.CallOption) (*commonpb.Status, error)
|
||||
@ -7510,6 +7511,7 @@ type DataNodeServer interface {
|
||||
SyncSegments(context.Context, *SyncSegmentsRequest) (*commonpb.Status, error)
|
||||
// https://wiki.lfaidata.foundation/display/MIL/MEP+24+--+Support+bulk+load
|
||||
Import(context.Context, *ImportTaskRequest) (*commonpb.Status, error)
|
||||
// Deprecated
|
||||
ResendSegmentStats(context.Context, *ResendSegmentStatsRequest) (*ResendSegmentStatsResponse, error)
|
||||
AddImportSegment(context.Context, *AddImportSegmentRequest) (*AddImportSegmentResponse, error)
|
||||
FlushChannels(context.Context, *FlushChannelsRequest) (*commonpb.Status, error)
|
||||
|
Loading…
Reference in New Issue
Block a user