mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-11-30 10:59:32 +08:00
Set query node thread pool size in milvus.yaml (#23286)
Signed-off-by: cqy123456 <qianya.cheng@zilliz.com>
This commit is contained in:
parent
3e33954f9e
commit
7ff6a3a246
@ -201,6 +201,10 @@ queryNode:
|
|||||||
stats:
|
stats:
|
||||||
publishInterval: 1000 # Interval for querynode to report node information (milliseconds)
|
publishInterval: 1000 # Interval for querynode to report node information (milliseconds)
|
||||||
segcore:
|
segcore:
|
||||||
|
knowhereThreadPoolNumRatio: 4
|
||||||
|
# Use more threads to make better use of SSD throughput in disk index.
|
||||||
|
# This parameter is only useful when enable-disk = true.
|
||||||
|
# And this value should be a number greater than 1 and less than 32.
|
||||||
chunkRows: 1024 # The number of vectors in a chunk.
|
chunkRows: 1024 # The number of vectors in a chunk.
|
||||||
smallIndex:
|
smallIndex:
|
||||||
nlist: 128 # small index nlist, recommend to set sqrt(chunkRows), must smaller than chunkRows/8
|
nlist: 128 # small index nlist, recommend to set sqrt(chunkRows), must smaller than chunkRows/8
|
||||||
|
@ -43,7 +43,7 @@ SegcoreSetNprobe(const int64_t value) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
extern "C" void
|
extern "C" void
|
||||||
SegcoreSetThreadPoolNum(const uint32_t num_threads) {
|
SegcoreSetKnowhereThreadPoolNum(const uint32_t num_threads) {
|
||||||
milvus::config::KnowhereInitThreadPool(num_threads);
|
milvus::config::KnowhereInitThreadPool(num_threads);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -32,7 +32,7 @@ char*
|
|||||||
SegcoreSetSimdType(const char*);
|
SegcoreSetSimdType(const char*);
|
||||||
|
|
||||||
void
|
void
|
||||||
SegcoreSetThreadPoolNum(const uint32_t num_threads);
|
SegcoreSetKnowhereThreadPoolNum(const uint32_t num_threads);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
@ -182,6 +182,9 @@ func (node *QueryNode) InitSegcore() {
|
|||||||
cChunkRows := C.int64_t(paramtable.Get().QueryNodeCfg.ChunkRows.GetAsInt64())
|
cChunkRows := C.int64_t(paramtable.Get().QueryNodeCfg.ChunkRows.GetAsInt64())
|
||||||
C.SegcoreSetChunkRows(cChunkRows)
|
C.SegcoreSetChunkRows(cChunkRows)
|
||||||
|
|
||||||
|
cKnowhereThreadPoolSize := C.uint32_t(paramtable.Get().QueryNodeCfg.KnowhereThreadPoolSize.GetAsUint32())
|
||||||
|
C.SegcoreSetKnowhereThreadPoolNum(cKnowhereThreadPoolSize)
|
||||||
|
|
||||||
nlist := C.int64_t(paramtable.Get().QueryNodeCfg.SmallIndexNlist.GetAsInt64())
|
nlist := C.int64_t(paramtable.Get().QueryNodeCfg.SmallIndexNlist.GetAsInt64())
|
||||||
C.SegcoreSetNlist(nlist)
|
C.SegcoreSetNlist(nlist)
|
||||||
|
|
||||||
|
@ -1367,9 +1367,10 @@ type queryNodeConfig struct {
|
|||||||
StatsPublishInterval ParamItem `refreshable:"true"`
|
StatsPublishInterval ParamItem `refreshable:"true"`
|
||||||
|
|
||||||
// segcore
|
// segcore
|
||||||
ChunkRows ParamItem `refreshable:"false"`
|
KnowhereThreadPoolSize ParamItem `refreshable:"false"`
|
||||||
SmallIndexNlist ParamItem `refreshable:"false"`
|
ChunkRows ParamItem `refreshable:"false"`
|
||||||
SmallIndexNProbe ParamItem `refreshable:"false"`
|
SmallIndexNlist ParamItem `refreshable:"false"`
|
||||||
|
SmallIndexNProbe ParamItem `refreshable:"false"`
|
||||||
|
|
||||||
// memory limit
|
// memory limit
|
||||||
LoadMemoryUsageFactor ParamItem `refreshable:"true"`
|
LoadMemoryUsageFactor ParamItem `refreshable:"true"`
|
||||||
@ -1442,6 +1443,25 @@ func (p *queryNodeConfig) init(base *BaseTable) {
|
|||||||
}
|
}
|
||||||
p.StatsPublishInterval.Init(base.mgr)
|
p.StatsPublishInterval.Init(base.mgr)
|
||||||
|
|
||||||
|
p.KnowhereThreadPoolSize = ParamItem{
|
||||||
|
Key: "queryNode.segcore.knowhereThreadPoolNumRatio",
|
||||||
|
Version: "2.0.0",
|
||||||
|
DefaultValue: "4",
|
||||||
|
Formatter: func(v string) string {
|
||||||
|
factor := getAsInt64(v)
|
||||||
|
if factor <= 0 || !p.EnableDisk.GetAsBool() {
|
||||||
|
factor = 1
|
||||||
|
} else if factor > 32 {
|
||||||
|
factor = 32
|
||||||
|
}
|
||||||
|
knowhereThreadPoolSize := uint32(runtime.GOMAXPROCS(0)) * uint32(factor)
|
||||||
|
return strconv.FormatUint(uint64(knowhereThreadPoolSize), 10)
|
||||||
|
},
|
||||||
|
Doc: "The number of threads in knowhere's thread pool. If disk is enabled, the pool size will multiply with knowhereThreadPoolNumRatio([1, 32]).",
|
||||||
|
Export: true,
|
||||||
|
}
|
||||||
|
p.KnowhereThreadPoolSize.Init(base.mgr)
|
||||||
|
|
||||||
p.ChunkRows = ParamItem{
|
p.ChunkRows = ParamItem{
|
||||||
Key: "queryNode.segcore.chunkRows",
|
Key: "queryNode.segcore.chunkRows",
|
||||||
Version: "2.0.0",
|
Version: "2.0.0",
|
||||||
|
@ -298,6 +298,7 @@ func TestComponentParam(t *testing.T) {
|
|||||||
assert.Equal(t, int64(1000), Params.MaxGroupNQ.GetAsInt64())
|
assert.Equal(t, int64(1000), Params.MaxGroupNQ.GetAsInt64())
|
||||||
assert.Equal(t, 10.0, Params.TopKMergeRatio.GetAsFloat())
|
assert.Equal(t, 10.0, Params.TopKMergeRatio.GetAsFloat())
|
||||||
assert.Equal(t, 10.0, Params.CPURatio.GetAsFloat())
|
assert.Equal(t, 10.0, Params.CPURatio.GetAsFloat())
|
||||||
|
assert.Equal(t, uint32(runtime.GOMAXPROCS(0)*4), Params.KnowhereThreadPoolSize.GetAsUint32())
|
||||||
|
|
||||||
// test small indexNlist/NProbe default
|
// test small indexNlist/NProbe default
|
||||||
params.Remove("queryNode.segcore.smallIndex.nlist")
|
params.Remove("queryNode.segcore.smallIndex.nlist")
|
||||||
|
Loading…
Reference in New Issue
Block a user