diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 1b51675ca2..fd09b133da 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -201,6 +201,10 @@ queryNode: stats: publishInterval: 1000 # Interval for querynode to report node information (milliseconds) segcore: + knowhereThreadPoolNumRatio: 4 + # Use more threads to make better use of SSD throughput in disk index. + # This parameter is only useful when enable-disk = true. + # The value should be between 1 and 32 (inclusive); values outside this range are clamped. chunkRows: 1024 # The number of vectors in a chunk. smallIndex: nlist: 128 # small index nlist, recommend to set sqrt(chunkRows), must smaller than chunkRows/8 diff --git a/internal/core/src/segcore/segcore_init_c.cpp b/internal/core/src/segcore/segcore_init_c.cpp index b79c247bb5..26f4a486b0 100644 --- a/internal/core/src/segcore/segcore_init_c.cpp +++ b/internal/core/src/segcore/segcore_init_c.cpp @@ -43,7 +43,7 @@ SegcoreSetNprobe(const int64_t value) { } extern "C" void -SegcoreSetThreadPoolNum(const uint32_t num_threads) { +SegcoreSetKnowhereThreadPoolNum(const uint32_t num_threads) { milvus::config::KnowhereInitThreadPool(num_threads); } diff --git a/internal/core/src/segcore/segcore_init_c.h b/internal/core/src/segcore/segcore_init_c.h index b16daf8a0c..006de8c75b 100644 --- a/internal/core/src/segcore/segcore_init_c.h +++ b/internal/core/src/segcore/segcore_init_c.h @@ -32,7 +32,7 @@ char* SegcoreSetSimdType(const char*); void -SegcoreSetThreadPoolNum(const uint32_t num_threads); +SegcoreSetKnowhereThreadPoolNum(const uint32_t num_threads); #ifdef __cplusplus } diff --git a/internal/querynodev2/server.go b/internal/querynodev2/server.go index 68e61d966d..6a298c994d 100644 --- a/internal/querynodev2/server.go +++ b/internal/querynodev2/server.go @@ -182,6 +182,9 @@ func (node *QueryNode) InitSegcore() { cChunkRows := C.int64_t(paramtable.Get().QueryNodeCfg.ChunkRows.GetAsInt64()) C.SegcoreSetChunkRows(cChunkRows) + cKnowhereThreadPoolSize := 
C.uint32_t(paramtable.Get().QueryNodeCfg.KnowhereThreadPoolSize.GetAsUint32()) + C.SegcoreSetKnowhereThreadPoolNum(cKnowhereThreadPoolSize) + nlist := C.int64_t(paramtable.Get().QueryNodeCfg.SmallIndexNlist.GetAsInt64()) C.SegcoreSetNlist(nlist) diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 27961d7839..41e6755182 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -1367,9 +1367,10 @@ type queryNodeConfig struct { StatsPublishInterval ParamItem `refreshable:"true"` // segcore - ChunkRows ParamItem `refreshable:"false"` - SmallIndexNlist ParamItem `refreshable:"false"` - SmallIndexNProbe ParamItem `refreshable:"false"` + KnowhereThreadPoolSize ParamItem `refreshable:"false"` + ChunkRows ParamItem `refreshable:"false"` + SmallIndexNlist ParamItem `refreshable:"false"` + SmallIndexNProbe ParamItem `refreshable:"false"` // memory limit LoadMemoryUsageFactor ParamItem `refreshable:"true"` @@ -1442,6 +1443,25 @@ func (p *queryNodeConfig) init(base *BaseTable) { } p.StatsPublishInterval.Init(base.mgr) + p.KnowhereThreadPoolSize = ParamItem{ + Key: "queryNode.segcore.knowhereThreadPoolNumRatio", + Version: "2.0.0", + DefaultValue: "4", + Formatter: func(v string) string { + factor := getAsInt64(v) + if factor <= 0 || !p.EnableDisk.GetAsBool() { + factor = 1 + } else if factor > 32 { + factor = 32 + } + knowhereThreadPoolSize := uint32(runtime.GOMAXPROCS(0)) * uint32(factor) + return strconv.FormatUint(uint64(knowhereThreadPoolSize), 10) + }, + Doc: "The number of threads in knowhere's thread pool. 
If disk is enabled, the pool size will multiply with knowhereThreadPoolNumRatio([1, 32]).", + Export: true, + } + p.KnowhereThreadPoolSize.Init(base.mgr) + p.ChunkRows = ParamItem{ Key: "queryNode.segcore.chunkRows", Version: "2.0.0", diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index 72c27b850b..c3ab37b432 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -298,6 +298,7 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, int64(1000), Params.MaxGroupNQ.GetAsInt64()) assert.Equal(t, 10.0, Params.TopKMergeRatio.GetAsFloat()) assert.Equal(t, 10.0, Params.CPURatio.GetAsFloat()) + assert.Equal(t, uint32(runtime.GOMAXPROCS(0)*4), Params.KnowhereThreadPoolSize.GetAsUint32()) // test small indexNlist/NProbe default params.Remove("queryNode.segcore.smallIndex.nlist")