enhance: [2.4] try to speed up the loading of small collections (#33746)

- issue: #33569
- pr: #33570

Signed-off-by: SimFG <bang.fu@zilliz.com>
SimFG 2024-06-11 15:07:55 +08:00 committed by GitHub
parent c331aa4ad3
commit f664b51ebe
11 changed files with 148 additions and 16 deletions

View File

@@ -285,6 +285,8 @@ queryCoord:
channelTaskTimeout: 60000 # 1 minute
segmentTaskTimeout: 120000 # 2 minute
distPullInterval: 500
collectionObserverInterval: 200
checkExecutedFlagInterval: 100
heartbeatAvailableInterval: 10000 # 10s, Only QueryNodes which fetched heartbeats within the duration are available
loadTimeoutSeconds: 600
distRequestTimeout: 5000 # the request timeout for querycoord fetching data distribution from querynodes, in milliseconds
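
Both new keys are millisecond values, read later in this commit via GetAsDuration(time.Millisecond): collectionObserverInterval (default 200 ms) replaces the observer's hard-coded one-second period, and checkExecutedFlagInterval (default 100 ms) drives how often the dist handler polls the scheduler's executed flag for an early distribution pull. Roughly, the worst-case delay before a just-finished task's new distribution is pulled drops from one distPullInterval (500 ms) to one flag-check period. A minimal, self-contained Go sketch of that arithmetic with the defaults hard-coded (illustrative only, not the Milvus paramtable code):

package main

import (
    "fmt"
    "time"
)

func main() {
    // Defaults from the config above, hard-coded here for illustration.
    distPullInterval := 500 * time.Millisecond           // existing periodic pull
    checkExecutedFlagInterval := 100 * time.Millisecond  // new fast executed-flag check
    collectionObserverInterval := 200 * time.Millisecond // new observer period (was 1s)

    // Rough worst-case wait before a just-finished task's distribution is pulled.
    fmt.Println("dist pull wait: before =", distPullInterval, ", after =", checkExecutedFlagInterval)
    // Rough worst-case wait before the collection observer re-checks load progress.
    fmt.Println("observer wait: before =", time.Second, ", after =", collectionObserverInterval)
}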

View File

@@ -78,7 +78,7 @@ func (dc *ControllerImpl) SyncAll(ctx context.Context) {
if err != nil {
log.Warn("SyncAll come across err when getting data distribution", zap.Error(err))
} else {
handler.handleDistResp(resp)
handler.handleDistResp(resp, true)
}
}(h)
}

View File

@@ -77,6 +77,7 @@ func (suite *DistControllerTestSuite) SetupTest() {
suite.broker = meta.NewMockBroker(suite.T())
targetManager := meta.NewTargetManager(suite.broker, suite.meta)
suite.mockScheduler = task.NewMockScheduler(suite.T())
suite.mockScheduler.EXPECT().GetExecutedFlag(mock.Anything).Return(nil).Maybe()
suite.controller = NewDistController(suite.mockCluster, nodeManager, distManager, targetManager, suite.mockScheduler)
}

View File

@@ -58,6 +58,8 @@ func (dh *distHandler) start(ctx context.Context) {
log.Info("start dist handler")
ticker := time.NewTicker(Params.QueryCoordCfg.DistPullInterval.GetAsDuration(time.Millisecond))
defer ticker.Stop()
checkExecutedFlagTicker := time.NewTicker(Params.QueryCoordCfg.CheckExecutedFlagInterval.GetAsDuration(time.Millisecond))
defer checkExecutedFlagTicker.Stop()
failures := 0
for {
select {
@@ -67,25 +69,39 @@ func (dh *distHandler) start(ctx context.Context) {
case <-dh.c:
log.Info("close dist handler")
return
case <-ticker.C:
resp, err := dh.getDistribution(ctx)
if err != nil {
node := dh.nodeManager.Get(dh.nodeID)
fields := []zap.Field{zap.Int("times", failures)}
if node != nil {
fields = append(fields, zap.Time("lastHeartbeat", node.LastHeartbeat()))
case <-checkExecutedFlagTicker.C:
executedFlagChan := dh.scheduler.GetExecutedFlag(dh.nodeID)
if executedFlagChan != nil {
select {
case <-executedFlagChan:
dh.pullDist(ctx, &failures, false)
default:
}
fields = append(fields, zap.Error(err))
log.RatedWarn(30.0, "failed to get data distribution", fields...)
} else {
failures = 0
dh.handleDistResp(resp)
}
case <-ticker.C:
dh.pullDist(ctx, &failures, true)
}
}
}
func (dh *distHandler) handleDistResp(resp *querypb.GetDataDistributionResponse) {
func (dh *distHandler) pullDist(ctx context.Context, failures *int, dispatchTask bool) {
resp, err := dh.getDistribution(ctx)
if err != nil {
node := dh.nodeManager.Get(dh.nodeID)
*failures = *failures + 1
fields := []zap.Field{zap.Int("times", *failures)}
if node != nil {
fields = append(fields, zap.Time("lastHeartbeat", node.LastHeartbeat()))
}
fields = append(fields, zap.Error(err))
log.RatedWarn(30.0, "failed to get data distribution", fields...)
} else {
*failures = 0
dh.handleDistResp(resp, dispatchTask)
}
}
func (dh *distHandler) handleDistResp(resp *querypb.GetDataDistributionResponse, dispatchTask bool) {
node := dh.nodeManager.Get(resp.GetNodeID())
if node != nil {
node.UpdateStats(
@@ -103,7 +119,9 @@ func (dh *distHandler) handleDistResp(resp *querypb.GetDataDistributionResponse)
dh.updateChannelsDistribution(resp)
dh.updateLeaderView(resp)
dh.scheduler.Dispatch(dh.nodeID)
if dispatchTask {
dh.scheduler.Dispatch(dh.nodeID)
}
}
func (dh *distHandler) updateSegmentsDistribution(resp *querypb.GetDataDistributionResponse) {
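
The hunk above splits distribution pulling into a helper (pullDist) and drives it from two tickers: the existing distPullInterval ticker keeps pulling and dispatching as before (dispatchTask=true), while a new, faster checkExecutedFlagInterval ticker does a non-blocking receive on the scheduler's executed flag and pulls early, without dispatching, whenever a task has just executed. A self-contained sketch of that two-ticker pattern; pull, executed, and stop are illustrative stand-ins, not the real Milvus types:

package main

import (
    "fmt"
    "time"
)

func main() {
    executed := make(chan struct{}, 1) // signalled by the executor when a task finishes
    stop := make(chan struct{})

    pull := func(dispatch bool) { fmt.Println("pull distribution, dispatch:", dispatch) }

    go func() {
        regular := time.NewTicker(500 * time.Millisecond) // distPullInterval
        defer regular.Stop()
        fast := time.NewTicker(100 * time.Millisecond) // checkExecutedFlagInterval
        defer fast.Stop()
        for {
            select {
            case <-stop:
                return
            case <-fast.C:
                select {
                case <-executed:
                    // a task just finished: pull early, but do not dispatch new tasks
                    pull(false)
                default:
                    // nothing executed since the last check: skip
                }
            case <-regular.C:
                pull(true) // periodic pull that also dispatches tasks
            }
        }
    }()

    executed <- struct{}{} // simulate a finished task
    time.Sleep(time.Second)
    close(stop)
}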

View File

@@ -88,7 +88,7 @@ func (ob *CollectionObserver) Start() {
ctx, cancel := context.WithCancel(context.Background())
ob.cancel = cancel
const observePeriod = time.Second
observePeriod := Params.QueryCoordCfg.CollectionObserverInterval.GetAsDuration(time.Millisecond)
ob.wg.Add(1)
go func() {
defer ob.wg.Done()
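
The one-second constant is replaced by the configurable queryCoord.collectionObserverInterval (200 ms by default), so the observer notices load progress for small collections sooner. The loop body itself is outside this hunk; the sketch below is a hypothetical reconstruction of the Start/Stop shape visible here (cancelable context, WaitGroup, ticker at the configured period), not the actual CollectionObserver implementation:

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// observer is an illustrative stand-in mirroring the lifecycle visible above.
type observer struct {
    cancel context.CancelFunc
    wg     sync.WaitGroup
}

func (ob *observer) Start(observePeriod time.Duration) {
    ctx, cancel := context.WithCancel(context.Background())
    ob.cancel = cancel
    ob.wg.Add(1)
    go func() {
        defer ob.wg.Done()
        ticker := time.NewTicker(observePeriod)
        defer ticker.Stop()
        for {
            select {
            case <-ctx.Done():
                return
            case <-ticker.C:
                fmt.Println("observe collection load progress") // placeholder for the real check
            }
        }
    }()
}

func (ob *observer) Stop() {
    ob.cancel()
    ob.wg.Wait()
}

func main() {
    ob := &observer{}
    ob.Start(200 * time.Millisecond) // queryCoord.collectionObserverInterval default
    time.Sleep(time.Second)
    ob.Stop()
}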

View File

@@ -63,6 +63,7 @@ type Executor struct {
executingTasks *typeutil.ConcurrentSet[string] // task index
executingTaskNum atomic.Int32
executedFlag chan struct{}
}
func NewExecutor(meta *meta.Meta,
@@ -82,6 +83,7 @@ func NewExecutor(meta *meta.Meta,
nodeMgr: nodeMgr,
executingTasks: typeutil.NewConcurrentSet[string](),
executedFlag: make(chan struct{}, 1),
}
}
@@ -131,12 +133,21 @@ func (ex *Executor) Execute(task Task, step int) bool {
return true
}
func (ex *Executor) GetExecutedFlag() <-chan struct{} {
return ex.executedFlag
}
func (ex *Executor) removeTask(task Task, step int) {
if task.Err() != nil {
log.Info("execute action done, remove it",
zap.Int64("taskID", task.ID()),
zap.Int("step", step),
zap.Error(task.Err()))
} else {
select {
case ex.executedFlag <- struct{}{}:
default:
}
}
ex.executingTasks.Remove(task.Index())
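
The new executedFlag channel has capacity 1 and is written with a non-blocking send, and only when the task finished without error, so signalling never blocks the executor and back-to-back completions coalesce into a single pending signal until the dist handler drains it. A self-contained sketch of that pattern; notify and consume are illustrative names, not the real API:

package main

import "fmt"

func main() {
    executedFlag := make(chan struct{}, 1) // capacity 1: at most one pending signal

    // notify records "something executed" without ever blocking the caller;
    // extra signals while one is already pending are simply dropped.
    notify := func() {
        select {
        case executedFlag <- struct{}{}:
        default:
        }
    }

    // consume drains at most one pending signal and reports whether there was one.
    consume := func() bool {
        select {
        case <-executedFlag:
            return true
        default:
            return false
        }
    }

    notify()
    notify()                 // coalesces with the first signal
    fmt.Println(consume())   // true: one pending signal
    fmt.Println(consume())   // false: already drained
}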

View File

@@ -166,6 +166,50 @@ func (_c *MockScheduler_GetChannelTaskNum_Call) RunAndReturn(run func() int) *Mo
return _c
}
// GetExecutedFlag provides a mock function with given fields: nodeID
func (_m *MockScheduler) GetExecutedFlag(nodeID int64) <-chan struct{} {
ret := _m.Called(nodeID)
var r0 <-chan struct{}
if rf, ok := ret.Get(0).(func(int64) <-chan struct{}); ok {
r0 = rf(nodeID)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(<-chan struct{})
}
}
return r0
}
// MockScheduler_GetExecutedFlag_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetExecutedFlag'
type MockScheduler_GetExecutedFlag_Call struct {
*mock.Call
}
// GetExecutedFlag is a helper method to define mock.On call
// - nodeID int64
func (_e *MockScheduler_Expecter) GetExecutedFlag(nodeID interface{}) *MockScheduler_GetExecutedFlag_Call {
return &MockScheduler_GetExecutedFlag_Call{Call: _e.mock.On("GetExecutedFlag", nodeID)}
}
func (_c *MockScheduler_GetExecutedFlag_Call) Run(run func(nodeID int64)) *MockScheduler_GetExecutedFlag_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64))
})
return _c
}
func (_c *MockScheduler_GetExecutedFlag_Call) Return(_a0 <-chan struct{}) *MockScheduler_GetExecutedFlag_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockScheduler_GetExecutedFlag_Call) RunAndReturn(run func(int64) <-chan struct{}) *MockScheduler_GetExecutedFlag_Call {
_c.Call.Return(run)
return _c
}
// GetNodeChannelDelta provides a mock function with given fields: nodeID
func (_m *MockScheduler) GetNodeChannelDelta(nodeID int64) int {
ret := _m.Called(nodeID)

View File

@@ -143,6 +143,7 @@ type Scheduler interface {
RemoveByNode(node int64)
GetNodeSegmentDelta(nodeID int64) int
GetNodeChannelDelta(nodeID int64) int
GetExecutedFlag(nodeID int64) <-chan struct{}
GetChannelTaskNum() int
GetSegmentTaskNum() int
}
@@ -485,6 +486,18 @@ func (scheduler *taskScheduler) GetNodeChannelDelta(nodeID int64) int {
return calculateNodeDelta(nodeID, scheduler.channelTasks)
}
func (scheduler *taskScheduler) GetExecutedFlag(nodeID int64) <-chan struct{} {
scheduler.rwmutex.RLock()
defer scheduler.rwmutex.RUnlock()
executor, ok := scheduler.executors[nodeID]
if !ok {
return nil
}
return executor.GetExecutedFlag()
}
func (scheduler *taskScheduler) GetChannelTaskNum() int {
scheduler.rwmutex.RLock()
defer scheduler.rwmutex.RUnlock()
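
GetExecutedFlag looks up the node's executor under the scheduler's read lock and returns nil when the node has no executor, so callers (the dist handler above, and the test suite later in this commit) must treat nil as "nothing to wait on". A self-contained sketch of that nil-safe lookup; the executor and scheduler types here are illustrative stand-ins:

package main

import (
    "fmt"
    "sync"
)

// executor is an illustrative stand-in exposing a read-only completion channel.
type executor struct{ executedFlag chan struct{} }

func (e *executor) GetExecutedFlag() <-chan struct{} { return e.executedFlag }

// scheduler is an illustrative stand-in for the per-node executor registry.
type scheduler struct {
    rwmutex   sync.RWMutex
    executors map[int64]*executor
}

// GetExecutedFlag returns the node's completion channel, or nil when the node
// has no executor registered.
func (s *scheduler) GetExecutedFlag(nodeID int64) <-chan struct{} {
    s.rwmutex.RLock()
    defer s.rwmutex.RUnlock()
    ex, ok := s.executors[nodeID]
    if !ok {
        return nil
    }
    return ex.GetExecutedFlag()
}

func main() {
    s := &scheduler{executors: map[int64]*executor{
        1: {executedFlag: make(chan struct{}, 1)},
    }}
    fmt.Println(s.GetExecutedFlag(1) != nil) // true: node 1 has an executor
    fmt.Println(s.GetExecutedFlag(2) != nil) // false: unknown node returns nil
}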

View File

@@ -471,6 +471,7 @@ func (suite *TaskSuite) TestLoadSegmentTask() {
// Process tasks
suite.dispatchAndWait(targetNode)
suite.assertExecutedFlagChan(targetNode)
suite.AssertTaskNum(segmentsNum, 0, 0, segmentsNum)
// Process tasks done
@@ -1536,6 +1537,17 @@ func (suite *TaskSuite) dispatchAndWait(node int64) {
suite.FailNow("executor hangs in executing tasks", "count=%d keys=%+v", count, keys)
}
func (suite *TaskSuite) assertExecutedFlagChan(targetNode int64) {
flagChan := suite.scheduler.GetExecutedFlag(targetNode)
if flagChan != nil {
select {
case <-flagChan:
default:
suite.FailNow("task not executed")
}
}
}
func (suite *TaskSuite) TestLeaderTaskRemove() {
ctx := context.Background()
timeout := 10 * time.Second

View File

@@ -1510,6 +1510,9 @@ type queryCoordConfig struct {
GracefulStopTimeout ParamItem `refreshable:"true"`
EnableStoppingBalance ParamItem `refreshable:"true"`
ChannelExclusiveNodeFactor ParamItem `refreshable:"true"`
CollectionObserverInterval ParamItem `refreshable:"false"`
CheckExecutedFlagInterval ParamItem `refreshable:"false"`
}
func (p *queryCoordConfig) init(base *BaseTable) {
@@ -1995,6 +1998,24 @@ func (p *queryCoordConfig) init(base *BaseTable) {
Export: true,
}
p.ChannelExclusiveNodeFactor.Init(base.mgr)
p.CollectionObserverInterval = ParamItem{
Key: "queryCoord.collectionObserverInterval",
Version: "2.4.4",
DefaultValue: "200",
Doc: "the interval of collection observer",
Export: false,
}
p.CollectionObserverInterval.Init(base.mgr)
p.CheckExecutedFlagInterval = ParamItem{
Key: "queryCoord.checkExecutedFlagInterval",
Version: "2.4.4",
DefaultValue: "100",
Doc: "the interval to check the executed flag and force a dist pull",
Export: false,
}
p.CheckExecutedFlagInterval.Init(base.mgr)
}
// /////////////////////////////////////////////////////////////////////////////

View File

@@ -307,6 +307,16 @@ func TestComponentParam(t *testing.T) {
assert.Equal(t, true, Params.EnableStoppingBalance.GetAsBool())
assert.Equal(t, 4, Params.ChannelExclusiveNodeFactor.GetAsInt())
assert.Equal(t, 200, Params.CollectionObserverInterval.GetAsInt())
params.Save("queryCoord.collectionObserverInterval", "100")
assert.Equal(t, 100, Params.CollectionObserverInterval.GetAsInt())
params.Reset("queryCoord.collectionObserverInterval")
assert.Equal(t, 100, Params.CheckExecutedFlagInterval.GetAsInt())
params.Save("queryCoord.checkExecutedFlagInterval", "200")
assert.Equal(t, 200, Params.CheckExecutedFlagInterval.GetAsInt())
params.Reset("queryCoord.checkExecutedFlagInterval")
})
t.Run("test queryNodeConfig", func(t *testing.T) {