mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-02 20:09:57 +08:00
enhance: make segments SQPool & LoadPool resizable (#29239)
See also #29223 --------- Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
1f1a8b7770
commit
88b4b8b77c
@ -17,12 +17,16 @@
|
||||
package segments
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math"
|
||||
"runtime"
|
||||
"sync"
|
||||
|
||||
"go.uber.org/atomic"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus/pkg/config"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/util/conc"
|
||||
"github.com/milvus-io/milvus/pkg/util/hardware"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
@ -45,14 +49,17 @@ var (
|
||||
func initSQPool() {
|
||||
sqOnce.Do(func() {
|
||||
pt := paramtable.Get()
|
||||
initPoolSize := int(math.Ceil(pt.QueryNodeCfg.MaxReadConcurrency.GetAsFloat() * pt.QueryNodeCfg.CGOPoolSizeRatio.GetAsFloat()))
|
||||
pool := conc.NewPool[any](
|
||||
int(math.Ceil(pt.QueryNodeCfg.MaxReadConcurrency.GetAsFloat()*pt.QueryNodeCfg.CGOPoolSizeRatio.GetAsFloat())),
|
||||
conc.WithPreAlloc(true),
|
||||
initPoolSize,
|
||||
conc.WithPreAlloc(false), // pre alloc must be false to resize pool dynamically, use warmup to alloc worker here
|
||||
conc.WithDisablePurge(true),
|
||||
)
|
||||
conc.WarmupPool(pool, runtime.LockOSThread)
|
||||
|
||||
sqp.Store(pool)
|
||||
|
||||
pt.Watch(pt.QueryNodeCfg.MaxReadConcurrency.Key, config.NewHandler("qn.sqpool.maxconc", ResizeSQPool))
|
||||
pt.Watch(pt.QueryNodeCfg.CGOPoolSizeRatio.Key, config.NewHandler("qn.sqpool.cgopoolratio", ResizeSQPool))
|
||||
})
|
||||
}
|
||||
|
||||
@ -71,14 +78,17 @@ func initDynamicPool() {
|
||||
|
||||
func initLoadPool() {
|
||||
loadOnce.Do(func() {
|
||||
pt := paramtable.Get()
|
||||
pool := conc.NewPool[any](
|
||||
hardware.GetCPUNum()*paramtable.Get().CommonCfg.MiddlePriorityThreadCoreCoefficient.GetAsInt(),
|
||||
hardware.GetCPUNum()*pt.CommonCfg.MiddlePriorityThreadCoreCoefficient.GetAsInt(),
|
||||
conc.WithPreAlloc(false),
|
||||
conc.WithDisablePurge(false),
|
||||
conc.WithPreHandler(runtime.LockOSThread), // lock os thread for cgo thread disposal
|
||||
)
|
||||
|
||||
loadPool.Store(pool)
|
||||
|
||||
pt.Watch(pt.CommonCfg.MiddlePriorityThreadCoreCoefficient.Key, config.NewHandler("qn.loadpool.middlepriority", ResizeLoadPool))
|
||||
})
|
||||
}
|
||||
|
||||
@ -98,3 +108,41 @@ func GetLoadPool() *conc.Pool[any] {
|
||||
initLoadPool()
|
||||
return loadPool.Load()
|
||||
}
|
||||
|
||||
func ResizeSQPool(evt *config.Event) {
|
||||
if evt.HasUpdated {
|
||||
pt := paramtable.Get()
|
||||
newSize := int(math.Ceil(pt.QueryNodeCfg.MaxReadConcurrency.GetAsFloat() * pt.QueryNodeCfg.CGOPoolSizeRatio.GetAsFloat()))
|
||||
pool := GetSQPool()
|
||||
resizePool(pool, newSize, "SQPool")
|
||||
conc.WarmupPool(pool, runtime.LockOSThread)
|
||||
}
|
||||
}
|
||||
|
||||
func ResizeLoadPool(evt *config.Event) {
|
||||
if evt.HasUpdated {
|
||||
pt := paramtable.Get()
|
||||
newSize := hardware.GetCPUNum() * pt.CommonCfg.MiddlePriorityThreadCoreCoefficient.GetAsInt()
|
||||
resizePool(GetLoadPool(), newSize, "LoadPool")
|
||||
}
|
||||
}
|
||||
|
||||
func resizePool(pool *conc.Pool[any], newSize int, tag string) {
|
||||
log := log.Ctx(context.Background()).
|
||||
With(
|
||||
zap.String("poolTag", tag),
|
||||
zap.Int("newSize", newSize),
|
||||
)
|
||||
|
||||
if newSize <= 0 {
|
||||
log.Warn("cannot set pool size to non-positive value")
|
||||
return
|
||||
}
|
||||
|
||||
err := pool.Resize(newSize)
|
||||
if err != nil {
|
||||
log.Warn("failed to resize pool", zap.Error(err))
|
||||
return
|
||||
}
|
||||
log.Info("pool resize successfully")
|
||||
}
|
||||
|
93
internal/querynodev2/segments/pool_test.go
Normal file
93
internal/querynodev2/segments/pool_test.go
Normal file
@ -0,0 +1,93 @@
|
||||
// Licensed to the LF AI & Data foundation under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package segments
|
||||
|
||||
import (
|
||||
"math"
|
||||
"strconv"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"github.com/milvus-io/milvus/pkg/config"
|
||||
"github.com/milvus-io/milvus/pkg/util/conc"
|
||||
"github.com/milvus-io/milvus/pkg/util/hardware"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
)
|
||||
|
||||
func TestResizePools(t *testing.T) {
|
||||
paramtable.Get().Init(paramtable.NewBaseTable(paramtable.SkipRemote(true)))
|
||||
pt := paramtable.Get()
|
||||
|
||||
defer func() {
|
||||
pt.Reset(pt.QueryNodeCfg.MaxReadConcurrency.Key)
|
||||
pt.Reset(pt.QueryNodeCfg.CGOPoolSizeRatio.Key)
|
||||
pt.Reset(pt.CommonCfg.MiddlePriorityThreadCoreCoefficient.Key)
|
||||
}()
|
||||
|
||||
t.Run("SQPool", func(t *testing.T) {
|
||||
expectedCap := int(math.Ceil(pt.QueryNodeCfg.MaxReadConcurrency.GetAsFloat() * pt.QueryNodeCfg.CGOPoolSizeRatio.GetAsFloat()))
|
||||
|
||||
ResizeSQPool(&config.Event{
|
||||
HasUpdated: true,
|
||||
})
|
||||
assert.Equal(t, expectedCap, GetSQPool().Cap())
|
||||
|
||||
pt.Save(pt.QueryNodeCfg.CGOPoolSizeRatio.Key, strconv.FormatFloat(pt.QueryNodeCfg.CGOPoolSizeRatio.GetAsFloat()*2, 'f', 10, 64))
|
||||
expectedCap = int(math.Ceil(pt.QueryNodeCfg.MaxReadConcurrency.GetAsFloat() * pt.QueryNodeCfg.CGOPoolSizeRatio.GetAsFloat()))
|
||||
ResizeSQPool(&config.Event{
|
||||
HasUpdated: true,
|
||||
})
|
||||
assert.Equal(t, expectedCap, GetSQPool().Cap())
|
||||
|
||||
pt.Save(pt.QueryNodeCfg.CGOPoolSizeRatio.Key, "0")
|
||||
ResizeSQPool(&config.Event{
|
||||
HasUpdated: true,
|
||||
})
|
||||
assert.Equal(t, expectedCap, GetSQPool().Cap(), "pool shall not be resized when newSize is 0")
|
||||
})
|
||||
|
||||
t.Run("LoadPool", func(t *testing.T) {
|
||||
expectedCap := hardware.GetCPUNum() * pt.CommonCfg.MiddlePriorityThreadCoreCoefficient.GetAsInt()
|
||||
|
||||
ResizeLoadPool(&config.Event{
|
||||
HasUpdated: true,
|
||||
})
|
||||
assert.Equal(t, expectedCap, GetLoadPool().Cap())
|
||||
|
||||
pt.Save(pt.CommonCfg.MiddlePriorityThreadCoreCoefficient.Key, strconv.FormatFloat(pt.CommonCfg.MiddlePriorityThreadCoreCoefficient.GetAsFloat()*2, 'f', 10, 64))
|
||||
ResizeLoadPool(&config.Event{
|
||||
HasUpdated: true,
|
||||
})
|
||||
assert.Equal(t, expectedCap, GetLoadPool().Cap())
|
||||
|
||||
pt.Save(pt.CommonCfg.MiddlePriorityThreadCoreCoefficient.Key, "0")
|
||||
ResizeLoadPool(&config.Event{
|
||||
HasUpdated: true,
|
||||
})
|
||||
assert.Equal(t, expectedCap, GetLoadPool().Cap())
|
||||
})
|
||||
|
||||
t.Run("error_pool", func(*testing.T) {
|
||||
pool := conc.NewDefaultPool[any]()
|
||||
c := pool.Cap()
|
||||
|
||||
resizePool(pool, c*2, "debug")
|
||||
|
||||
assert.Equal(t, c, pool.Cap())
|
||||
})
|
||||
}
|
@ -1703,7 +1703,7 @@ type queryNodeConfig struct {
|
||||
SchedulePolicyMaxPendingTaskPerUser ParamItem `refreshable:"true"`
|
||||
|
||||
// CGOPoolSize ratio to MaxReadConcurrency
|
||||
CGOPoolSizeRatio ParamItem `refreshable:"false"`
|
||||
CGOPoolSizeRatio ParamItem `refreshable:"true"`
|
||||
|
||||
EnableWorkerSQCostMetrics ParamItem `refreshable:"true"`
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user